improve eth_sendRawTransaction

This commit is contained in:
Bryan Stitt 2022-12-23 17:32:58 -08:00
parent ce1b0da1e3
commit 4a837b35cc
6 changed files with 103 additions and 66 deletions

@ -287,6 +287,9 @@ These are not yet ordered. There might be duplicates. We might not actually need
- [x] sometimes when fetching a txid through the proxy it fails, but fetching from the backends works fine
- check flashprofits logs for examples
- we were caching too aggressively
- [x] BUG! if sending transactions gets "INTERNAL_ERROR: existing tx with same hash", create a success message
- we just want to be sure that the server has our tx and in this case, it does.
- ERROR http_request:request:try_send_all_upstream_servers: web3_proxy::rpcs::request: bad response! err=JsonRpcClientError(JsonRpcError(JsonRpcError { code: -32000, message: "INTERNAL_ERROR: existing tx with same hash", data: None })) method=eth_sendRawTransaction rpc=local_erigon_alpha_archive id=01GF4HV03Y4ZNKQV8DW5NDQ5CG method=POST authorized_request=User(Some(SqlxMySqlPoolConnection), AuthorizedKey { ip: 10.11.12.15, origin: None, user_key_id: 4, log_revert_chance: 0.0000 }) self=Web3Connections { conns: {"local_erigon_alpha_archive_ws": Web3Connection { name: "local_erigon_alpha_archive_ws", blocks: "all", .. }, "local_geth_ws": Web3Connection { name: "local_geth_ws", blocks: 64, .. }, "local_erigon_alpha_archive": Web3Connection { name: "local_erigon_alpha_archive", blocks: "all", .. }}, .. } authorized_request=Some(User(Some(SqlxMySqlPoolConnection), AuthorizedKey { ip: 10.11.12.15, origin: None, user_key_id: 4, log_revert_chance: 0.0000 })) request=JsonRpcRequest { id: RawValue(39), method: "eth_sendRawTransaction", .. } request_metadata=Some(RequestMetadata { datetime: 2022-10-11T22:14:57.406829095Z, period_seconds: 60, request_bytes: 633, backend_requests: 0, no_servers: 0, error_response: false, response_bytes: 0, response_millis: 0 }) block_needed=None
- [-] let users choose a % to log (or maybe x/second). someone like curve logging all reverts will be a BIG database very quickly
- this must be opt-in or spawned since it will slow things down and will make their calls less private
- [ ] automatic pruning of old revert logs once too many are collected
@ -384,8 +387,6 @@ These are not yet ordered. There might be duplicates. We might not actually need
- [ ] somehow the proxy thought latest was hours behind. need internal health check that forces reconnect if this happens
- [ ] display concurrent requests per api key (only with authentication!)
- [ ] change "remember me" to last until 4 weeks of no use, rather than 4 weeks since login? that will be a lot more database writes
- [ ] BUG! if sending transactions gets "INTERNAL_ERROR: existing tx with same hash", fake a success message
- ERROR http_request:request:try_send_all_upstream_servers: web3_proxy::rpcs::request: bad response! err=JsonRpcClientError(JsonRpcError(JsonRpcError { code: -32000, message: "INTERNAL_ERROR: existing tx with same hash", data: None })) method=eth_sendRawTransaction rpc=local_erigon_alpha_archive id=01GF4HV03Y4ZNKQV8DW5NDQ5CG method=POST authorized_request=User(Some(SqlxMySqlPoolConnection), AuthorizedKey { ip: 10.11.12.15, origin: None, user_key_id: 4, log_revert_chance: 0.0000 }) self=Web3Connections { conns: {"local_erigon_alpha_archive_ws": Web3Connection { name: "local_erigon_alpha_archive_ws", blocks: "all", .. }, "local_geth_ws": Web3Connection { name: "local_geth_ws", blocks: 64, .. }, "local_erigon_alpha_archive": Web3Connection { name: "local_erigon_alpha_archive", blocks: "all", .. }}, .. } authorized_request=Some(User(Some(SqlxMySqlPoolConnection), AuthorizedKey { ip: 10.11.12.15, origin: None, user_key_id: 4, log_revert_chance: 0.0000 })) request=JsonRpcRequest { id: RawValue(39), method: "eth_sendRawTransaction", .. } request_metadata=Some(RequestMetadata { datetime: 2022-10-11T22:14:57.406829095Z, period_seconds: 60, request_bytes: 633, backend_requests: 0, no_servers: 0, error_response: false, response_bytes: 0, response_millis: 0 }) block_needed=None
- [ ] BUG? WARN http_request:request: web3_proxy::block_number: could not get block from params err=unexpected params length id=01GF4HTRKM4JV6NX52XSF9AYMW method=POST authorized_request=User(Some(SqlxMySqlPoolConnection), AuthorizedKey { ip: 10.11.12.15, origin: None, user_key_id: 4, log_revert_chance: 0.0000 })
- why is it failing to get the block from params when its set to None? That should be the simple case
- [ ] BUG: i think if all backend servers stop, the server doesn't properly reconnect. It appears to stop listening on 8854, but not shut down.
@ -406,8 +407,6 @@ These are not yet ordered. There might be duplicates. We might not actually need
- [ ] geth sometimes gives an empty response instead of an error response. figure out a good way to catch this and not serve it
- [ ] GET balance endpoint
- [ ] POST balance endpoint
- [ ] eth_1 | 2022-10-11T22:14:57.408114Z ERROR http_request:request:try_send_all_upstream_servers: web3_proxy::rpcs::request: bad response! err=JsonRpcClientError(JsonRpcError(JsonRpcError { code: -32000, message: "INTERNAL_ERROR: existing tx with same hash", data: None })) method=eth_sendRawTransaction rpc=local_erigon_alpha_archive id=01GF4HV03Y4ZNKQV8DW5NDQ5CG method=POST authorized_request=User(Some(SqlxMySqlPoolConnection), AuthorizedKey { ip: 10.11.12.15, origin: None, user_key_id: 4, log_revert_chance: 0.0000 }) self=Web3Connections { conns: {"local_erigon_alpha_archive_ws": Web3Connection { name: "local_erigon_alpha_archive_ws", blocks: "all", .. }, "local_geth_ws": Web3Connection { name: "local_geth_ws", blocks: 64, .. }, "local_erigon_alpha_archive": Web3Connection { name: "local_erigon_alpha_archive", blocks: "all", .. }}, .. } authorized_request=Some(User(Some(SqlxMySqlPoolConnection), AuthorizedKey { ip: 10.11.12.15, origin: None, user_key_id: 4, log_revert_chance: 0.0000 })) request=JsonRpcRequest { id: RawValue(39), method: "eth_sendRawTransaction", .. } request_metadata=Some(RequestMetadata { datetime: 2022-10-11T22:14:57.406829095Z, period_seconds: 60, request_bytes: 633, backend_requests: 0, no_servers: 0, error_response: false, response_bytes: 0, response_millis: 0 }) block_needed=None
- eth_sendRawTransaction should accept "INTERNAL_ERROR: existing tx with same hash" as a successful response. we just want to be sure that the server has our tx and in this case, it does.
- [ ] EIP1271 for siwe
- [ ] Limited throughput during high traffic
- [ ] instead of Option<...> in our frontend function signatures, use result and then the try operator so that we get our errors wrapped in json

@ -23,6 +23,8 @@ use derive_more::From;
use entities::sea_orm_active_enums::LogLevel;
use ethers::core::utils::keccak256;
use ethers::prelude::{Address, Block, Bytes, TxHash, H256, U64};
use ethers::types::Transaction;
use ethers::utils::rlp::{Decodable, Rlp};
use futures::future::join_all;
use futures::stream::FuturesUnordered;
use futures::stream::StreamExt;
@ -38,6 +40,7 @@ use moka::future::Cache;
use redis_rate_limiter::{DeadpoolRuntime, RedisConfig, RedisPool, RedisRateLimiter};
use serde::Serialize;
use serde_json::json;
use serde_json::value::to_raw_value;
use std::fmt;
use std::hash::{Hash, Hasher};
use std::net::IpAddr;
@ -984,17 +987,54 @@ impl Web3ProxyApp {
// emit stats
let private_rpcs = self.private_rpcs.as_ref().unwrap_or(&self.balanced_rpcs);
// try_send_all_upstream_servers puts the request id into the response. no need to do that ourselves here.
let mut response = private_rpcs
.try_send_all_upstream_servers(
authorization,
request,
&request,
Some(request_metadata.clone()),
None,
Level::Trace,
)
.await?;
response.id = request_id;
// sometimes we get an error that the transaction is already known by our nodes,
// that's not really an error. Just return the hash like a successful response would.
if let Some(response_error) = response.error.as_ref() {
if response_error.code == -32000
&& (response_error.message == "ALREADY_EXISTS: already known"
|| response_error.message
== "INTERNAL_ERROR: existing tx with same hash")
{
let params = request
.params
.context("there must be params if we got this far")?;
let params = params
.as_array()
.context("there must be an array if we got this far")?
.get(0)
.context("there must be an item if we got this far")?
.as_str()
.context("there must be a string if we got this far")?;
let params = Bytes::from_str(params)
.expect("there must be Bytes if we got this far");
let rlp = Rlp::new(params.as_ref());
if let Ok(tx) = Transaction::decode(&rlp) {
let tx_hash = json!(tx.hash());
debug!("tx_hash: {:#?}", tx_hash);
let tx_hash = to_raw_value(&tx_hash).unwrap();
response.error = None;
response.result = Some(tx_hash);
}
}
}
let rpcs = request_metadata.backend_requests.lock().clone();
@ -1143,7 +1183,7 @@ impl Web3ProxyApp {
// discard their id by replacing it with an empty
response.id = Default::default();
// TODO: only cache the inner response (or error)
// TODO: only cache the inner response
Ok::<_, anyhow::Error>(response)
})
.await
@ -1152,23 +1192,17 @@ impl Web3ProxyApp {
// TODO: emit a stat for an error
anyhow::anyhow!(err)
})
.context("caching response")?
.context("error while forwarding and caching response")?
} else {
let mut response = self
.balanced_rpcs
self.balanced_rpcs
.try_send_best_upstream_server(
&authorization,
request,
Some(&request_metadata),
None,
)
.await?;
// discard their id by replacing it with an empty
response.id = Default::default();
// TODO: only cache the inner response (or error)
response
.await
.context("error while forwarding response")?
}
};

@ -4,7 +4,7 @@ use ethers::{
prelude::{BlockNumber, U64},
types::H256,
};
use log::{debug, trace, warn};
use log::{trace, warn};
use serde_json::json;
use std::sync::Arc;

@ -28,7 +28,8 @@ use handlebars::Handlebars;
use hashbrown::HashMap;
use http::StatusCode;
use log::{error, info, trace, warn};
use serde_json::{json, value::RawValue};
use serde_json::json;
use serde_json::value::to_raw_value;
use std::sync::Arc;
use std::{str::from_utf8_mut, sync::atomic::AtomicUsize};
@ -264,7 +265,8 @@ async fn handle_socket_payload(
}
Err(err) => {
// TODO: move this logic somewhere else and just set id to None here
let id = RawValue::from_string("null".to_string()).expect("null can always be a value");
let id =
to_raw_value(&json!(None::<Option::<()>>)).expect("None can always be a RawValue");
(id, Err(err.into()))
}
};

@ -2,7 +2,8 @@ use derive_more::From;
use ethers::prelude::{HttpClientError, ProviderError, WsClientError};
use serde::de::{self, Deserializer, MapAccess, SeqAccess, Visitor};
use serde::{Deserialize, Serialize};
use serde_json::value::RawValue;
use serde_json::json;
use serde_json::value::{to_raw_value, RawValue};
use std::fmt;
// this is used by serde
@ -161,7 +162,8 @@ pub struct JsonRpcErrorData {
}
/// A complete response
#[derive(Clone, Deserialize, Serialize)]
/// TODO: better Debug response
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct JsonRpcForwardedResponse {
// TODO: jsonrpc a &str?
#[serde(default = "default_jsonrpc")]
@ -173,15 +175,6 @@ pub struct JsonRpcForwardedResponse {
pub error: Option<JsonRpcErrorData>,
}
/// TODO: the default formatter takes forever to write. this is too quiet though
impl fmt::Debug for JsonRpcForwardedResponse {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("JsonRpcForwardedResponse")
.field("id", &self.id)
.finish_non_exhaustive()
}
}
impl JsonRpcRequest {
pub fn num_bytes(&self) -> usize {
// TODO: not sure how to do this without wasting a ton of allocations
@ -212,7 +205,7 @@ impl JsonRpcForwardedResponse {
JsonRpcForwardedResponse {
jsonrpc: "2.0".to_string(),
id: id.unwrap_or_else(|| {
RawValue::from_string("null".to_string()).expect("null id should always work")
to_raw_value(&json!(None::<Option<()>>)).expect("null id should always work")
}),
result: None,
error: Some(JsonRpcErrorData {
@ -235,10 +228,7 @@ impl JsonRpcForwardedResponse {
pub fn from_value(partial_response: serde_json::Value, id: Box<RawValue>) -> Self {
let partial_response =
serde_json::to_string(&partial_response).expect("this should always work");
let partial_response =
RawValue::from_string(partial_response).expect("this should always work");
to_raw_value(&partial_response).expect("Value to RawValue should always work");
JsonRpcForwardedResponse {
jsonrpc: "2.0".to_string(),

@ -310,11 +310,12 @@ impl Web3Connections {
pub async fn try_send_parallel_requests(
&self,
active_request_handles: Vec<OpenRequestHandle>,
id: Box<RawValue>,
method: &str,
params: Option<&serde_json::Value>,
error_level: Level,
// TODO: remove this box once i figure out how to do the options
) -> Result<Box<RawValue>, ProviderError> {
) -> anyhow::Result<JsonRpcForwardedResponse> {
// TODO: if only 1 active_request_handles, do self.try_send_request?
let responses = active_request_handles
@ -330,17 +331,24 @@ impl Web3Connections {
.await;
// TODO: Strings are not great keys, but we can't use RawValue or ProviderError as keys because they don't implement Hash or Eq
let mut count_map: HashMap<String, Result<Box<RawValue>, ProviderError>> = HashMap::new();
let mut count_map: HashMap<String, _> = HashMap::new();
let mut counts: Counter<String> = Counter::new();
let mut any_ok = false;
for response in responses {
// TODO: i think we need to do something smarter with provider error. we at least need to wrap it up as JSON
// TODO: emit stats errors?
let mut any_ok_with_json_result = false;
let mut any_ok_but_maybe_json_error = false;
for partial_response in responses {
if partial_response.is_ok() {
any_ok_with_json_result = true;
}
let response =
JsonRpcForwardedResponse::try_from_response_result(partial_response, id.clone());
// TODO: better key?
let s = format!("{:?}", response);
if count_map.get(&s).is_none() {
if response.is_ok() {
any_ok = true;
any_ok_but_maybe_json_error = true;
}
count_map.insert(s.clone(), response);
@ -350,14 +358,26 @@ impl Web3Connections {
}
for (most_common, _) in counts.most_common_ordered() {
let most_common = count_map.remove(&most_common).unwrap();
let most_common = count_map
.remove(&most_common)
.expect("most_common key must exist");
if any_ok && most_common.is_err() {
// errors were more common, but we are going to skip them because we got an okay
continue;
} else {
// return the most common
return most_common;
match most_common {
Ok(x) => {
if any_ok_with_json_result && x.error.is_some() {
// this one may be an "Ok", but the json has an error inside it
continue;
}
// return the most common success
return Ok(x);
}
Err(err) => {
if any_ok_but_maybe_json_error {
// the most common is an error, but there is an Ok in here somewhere. loop to find it
continue;
}
return Err(err);
}
}
}
@ -719,18 +739,18 @@ impl Web3Connections {
.store(true, Ordering::Release);
}
warn!("No synced servers! {:?}", self);
let num_conns = self.conns.len();
// TODO: what error code? 502?
Err(anyhow::anyhow!("all {} tries exhausted", skip_rpcs.len()))
error!("No servers synced ({} known)", num_conns);
Err(anyhow::anyhow!("No servers synced ({} known)", num_conns))
}
/// be sure there is a timeout on this or it might loop forever
pub async fn try_send_all_upstream_servers(
&self,
authorization: &Arc<Authorization>,
request: JsonRpcRequest,
request: &JsonRpcRequest,
request_metadata: Option<Arc<RequestMetadata>>,
block_needed: Option<&U64>,
error_level: Level,
@ -752,23 +772,15 @@ impl Web3Connections {
.extend(active_request_handles.iter().map(|x| x.clone_connection()));
}
let quorum_response = self
return self
.try_send_parallel_requests(
active_request_handles,
request.id.clone(),
request.method.as_ref(),
request.params.as_ref(),
error_level,
)
.await?;
let response = JsonRpcForwardedResponse {
jsonrpc: "2.0".to_string(),
id: request.id,
result: Some(quorum_response),
error: None,
};
return Ok(response);
.await;
}
Err(None) => {
warn!("No servers in sync on {:?}! Retrying", self);