From 4a837b35ccc0b11f50fb2ce52519df9911022663 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Fri, 23 Dec 2022 17:32:58 -0800 Subject: [PATCH] improve eth_sendRawTransaction --- TODO.md | 7 ++- web3_proxy/src/app/mod.rs | 60 ++++++++++++++++----- web3_proxy/src/block_number.rs | 2 +- web3_proxy/src/frontend/rpc_proxy_ws.rs | 6 ++- web3_proxy/src/jsonrpc.rs | 22 +++----- web3_proxy/src/rpcs/connections.rs | 72 ++++++++++++++----------- 6 files changed, 103 insertions(+), 66 deletions(-) diff --git a/TODO.md b/TODO.md index e00e735f..883282f3 100644 --- a/TODO.md +++ b/TODO.md @@ -287,6 +287,9 @@ These are not yet ordered. There might be duplicates. We might not actually need - [x] sometimes when fetching a txid through the proxy it fails, but fetching from the backends works fine - check flashprofits logs for examples - we were caching too aggressively +- [x] BUG! if sending transactions gets "INTERNAL_ERROR: existing tx with same hash", create a success message + - we just want to be sure that the server has our tx and in this case, it does. + - ERROR http_request:request:try_send_all_upstream_servers: web3_proxy::rpcs::request: bad response! err=JsonRpcClientError(JsonRpcError(JsonRpcError { code: -32000, message: "INTERNAL_ERROR: existing tx with same hash", data: None })) method=eth_sendRawTransaction rpc=local_erigon_alpha_archive id=01GF4HV03Y4ZNKQV8DW5NDQ5CG method=POST authorized_request=User(Some(SqlxMySqlPoolConnection), AuthorizedKey { ip: 10.11.12.15, origin: None, user_key_id: 4, log_revert_chance: 0.0000 }) self=Web3Connections { conns: {"local_erigon_alpha_archive_ws": Web3Connection { name: "local_erigon_alpha_archive_ws", blocks: "all", .. }, "local_geth_ws": Web3Connection { name: "local_geth_ws", blocks: 64, .. }, "local_erigon_alpha_archive": Web3Connection { name: "local_erigon_alpha_archive", blocks: "all", .. }}, .. } authorized_request=Some(User(Some(SqlxMySqlPoolConnection), AuthorizedKey { ip: 10.11.12.15, origin: None, user_key_id: 4, log_revert_chance: 0.0000 })) request=JsonRpcRequest { id: RawValue(39), method: "eth_sendRawTransaction", .. } request_metadata=Some(RequestMetadata { datetime: 2022-10-11T22:14:57.406829095Z, period_seconds: 60, request_bytes: 633, backend_requests: 0, no_servers: 0, error_response: false, response_bytes: 0, response_millis: 0 }) block_needed=None - [-] let users choose a % to log (or maybe x/second). someone like curve logging all reverts will be a BIG database very quickly - this must be opt-in or spawned since it will slow things down and will make their calls less private - [ ] automatic pruning of old revert logs once too many are collected @@ -384,8 +387,6 @@ These are not yet ordered. There might be duplicates. We might not actually need - [ ] somehow the proxy thought latest was hours behind. need internal health check that forces reconnect if this happens - [ ] display concurrent requests per api key (only with authentication!) - [ ] change "remember me" to last until 4 weeks of no use, rather than 4 weeks since login? that will be a lot more database writes -- [ ] BUG! if sending transactions gets "INTERNAL_ERROR: existing tx with same hash", fake a success message - - ERROR http_request:request:try_send_all_upstream_servers: web3_proxy::rpcs::request: bad response! err=JsonRpcClientError(JsonRpcError(JsonRpcError { code: -32000, message: "INTERNAL_ERROR: existing tx with same hash", data: None })) method=eth_sendRawTransaction rpc=local_erigon_alpha_archive id=01GF4HV03Y4ZNKQV8DW5NDQ5CG method=POST authorized_request=User(Some(SqlxMySqlPoolConnection), AuthorizedKey { ip: 10.11.12.15, origin: None, user_key_id: 4, log_revert_chance: 0.0000 }) self=Web3Connections { conns: {"local_erigon_alpha_archive_ws": Web3Connection { name: "local_erigon_alpha_archive_ws", blocks: "all", .. }, "local_geth_ws": Web3Connection { name: "local_geth_ws", blocks: 64, .. }, "local_erigon_alpha_archive": Web3Connection { name: "local_erigon_alpha_archive", blocks: "all", .. }}, .. } authorized_request=Some(User(Some(SqlxMySqlPoolConnection), AuthorizedKey { ip: 10.11.12.15, origin: None, user_key_id: 4, log_revert_chance: 0.0000 })) request=JsonRpcRequest { id: RawValue(39), method: "eth_sendRawTransaction", .. } request_metadata=Some(RequestMetadata { datetime: 2022-10-11T22:14:57.406829095Z, period_seconds: 60, request_bytes: 633, backend_requests: 0, no_servers: 0, error_response: false, response_bytes: 0, response_millis: 0 }) block_needed=None - [ ] BUG? WARN http_request:request: web3_proxy::block_number: could not get block from params err=unexpected params length id=01GF4HTRKM4JV6NX52XSF9AYMW method=POST authorized_request=User(Some(SqlxMySqlPoolConnection), AuthorizedKey { ip: 10.11.12.15, origin: None, user_key_id: 4, log_revert_chance: 0.0000 }) - why is it failing to get the block from params when its set to None? That should be the simple case - [ ] BUG: i think if all backend servers stop, the server doesn't properly reconnect. It appears to stop listening on 8854, but not shut down. @@ -406,8 +407,6 @@ These are not yet ordered. There might be duplicates. We might not actually need - [ ] geth sometimes gives an empty response instead of an error response. figure out a good way to catch this and not serve it - [ ] GET balance endpoint - [ ] POST balance endpoint -- [ ] eth_1 | 2022-10-11T22:14:57.408114Z ERROR http_request:request:try_send_all_upstream_servers: web3_proxy::rpcs::request: bad response! err=JsonRpcClientError(JsonRpcError(JsonRpcError { code: -32000, message: "INTERNAL_ERROR: existing tx with same hash", data: None })) method=eth_sendRawTransaction rpc=local_erigon_alpha_archive id=01GF4HV03Y4ZNKQV8DW5NDQ5CG method=POST authorized_request=User(Some(SqlxMySqlPoolConnection), AuthorizedKey { ip: 10.11.12.15, origin: None, user_key_id: 4, log_revert_chance: 0.0000 }) self=Web3Connections { conns: {"local_erigon_alpha_archive_ws": Web3Connection { name: "local_erigon_alpha_archive_ws", blocks: "all", .. }, "local_geth_ws": Web3Connection { name: "local_geth_ws", blocks: 64, .. }, "local_erigon_alpha_archive": Web3Connection { name: "local_erigon_alpha_archive", blocks: "all", .. }}, .. } authorized_request=Some(User(Some(SqlxMySqlPoolConnection), AuthorizedKey { ip: 10.11.12.15, origin: None, user_key_id: 4, log_revert_chance: 0.0000 })) request=JsonRpcRequest { id: RawValue(39), method: "eth_sendRawTransaction", .. } request_metadata=Some(RequestMetadata { datetime: 2022-10-11T22:14:57.406829095Z, period_seconds: 60, request_bytes: 633, backend_requests: 0, no_servers: 0, error_response: false, response_bytes: 0, response_millis: 0 }) block_needed=None - - eth_sendRawTransaction should accept "INTERNAL_ERROR: existing tx with same hash" as a successful response. we just want to be sure that the server has our tx and in this case, it does. - [ ] EIP1271 for siwe - [ ] Limited throughput during high traffic - [ ] instead of Option<...> in our frontend function signatures, use result and then the try operator so that we get our errors wrapped in json diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index ab8377cf..b6c63d6b 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -23,6 +23,8 @@ use derive_more::From; use entities::sea_orm_active_enums::LogLevel; use ethers::core::utils::keccak256; use ethers::prelude::{Address, Block, Bytes, TxHash, H256, U64}; +use ethers::types::Transaction; +use ethers::utils::rlp::{Decodable, Rlp}; use futures::future::join_all; use futures::stream::FuturesUnordered; use futures::stream::StreamExt; @@ -38,6 +40,7 @@ use moka::future::Cache; use redis_rate_limiter::{DeadpoolRuntime, RedisConfig, RedisPool, RedisRateLimiter}; use serde::Serialize; use serde_json::json; +use serde_json::value::to_raw_value; use std::fmt; use std::hash::{Hash, Hasher}; use std::net::IpAddr; @@ -984,17 +987,54 @@ impl Web3ProxyApp { // emit stats let private_rpcs = self.private_rpcs.as_ref().unwrap_or(&self.balanced_rpcs); + // try_send_all_upstream_servers puts the request id into the response. no need to do that ourselves here. let mut response = private_rpcs .try_send_all_upstream_servers( authorization, - request, + &request, Some(request_metadata.clone()), None, Level::Trace, ) .await?; - response.id = request_id; + // sometimes we get an error that the transaction is already known by our nodes, + // that's not really an error. Just return the hash like a successful response would. + if let Some(response_error) = response.error.as_ref() { + if response_error.code == -32000 + && (response_error.message == "ALREADY_EXISTS: already known" + || response_error.message + == "INTERNAL_ERROR: existing tx with same hash") + { + let params = request + .params + .context("there must be params if we got this far")?; + + let params = params + .as_array() + .context("there must be an array if we got this far")? + .get(0) + .context("there must be an item if we got this far")? + .as_str() + .context("there must be a string if we got this far")?; + + let params = Bytes::from_str(params) + .expect("there must be Bytes if we got this far"); + + let rlp = Rlp::new(params.as_ref()); + + if let Ok(tx) = Transaction::decode(&rlp) { + let tx_hash = json!(tx.hash()); + + debug!("tx_hash: {:#?}", tx_hash); + + let tx_hash = to_raw_value(&tx_hash).unwrap(); + + response.error = None; + response.result = Some(tx_hash); + } + } + } let rpcs = request_metadata.backend_requests.lock().clone(); @@ -1143,7 +1183,7 @@ impl Web3ProxyApp { // discard their id by replacing it with an empty response.id = Default::default(); - // TODO: only cache the inner response (or error) + // TODO: only cache the inner response Ok::<_, anyhow::Error>(response) }) .await @@ -1152,23 +1192,17 @@ impl Web3ProxyApp { // TODO: emit a stat for an error anyhow::anyhow!(err) }) - .context("caching response")? + .context("error while forwarding and caching response")? } else { - let mut response = self - .balanced_rpcs + self.balanced_rpcs .try_send_best_upstream_server( &authorization, request, Some(&request_metadata), None, ) - .await?; - - // discard their id by replacing it with an empty - response.id = Default::default(); - - // TODO: only cache the inner response (or error) - response + .await + .context("error while forwarding response")? } }; diff --git a/web3_proxy/src/block_number.rs b/web3_proxy/src/block_number.rs index 95be5dc5..4e52788f 100644 --- a/web3_proxy/src/block_number.rs +++ b/web3_proxy/src/block_number.rs @@ -4,7 +4,7 @@ use ethers::{ prelude::{BlockNumber, U64}, types::H256, }; -use log::{debug, trace, warn}; +use log::{trace, warn}; use serde_json::json; use std::sync::Arc; diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs index fedc222a..b1f70e9f 100644 --- a/web3_proxy/src/frontend/rpc_proxy_ws.rs +++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs @@ -28,7 +28,8 @@ use handlebars::Handlebars; use hashbrown::HashMap; use http::StatusCode; use log::{error, info, trace, warn}; -use serde_json::{json, value::RawValue}; +use serde_json::json; +use serde_json::value::to_raw_value; use std::sync::Arc; use std::{str::from_utf8_mut, sync::atomic::AtomicUsize}; @@ -264,7 +265,8 @@ async fn handle_socket_payload( } Err(err) => { // TODO: move this logic somewhere else and just set id to None here - let id = RawValue::from_string("null".to_string()).expect("null can always be a value"); + let id = + to_raw_value(&json!(None::>)).expect("None can always be a RawValue"); (id, Err(err.into())) } }; diff --git a/web3_proxy/src/jsonrpc.rs b/web3_proxy/src/jsonrpc.rs index 6395a208..0356362a 100644 --- a/web3_proxy/src/jsonrpc.rs +++ b/web3_proxy/src/jsonrpc.rs @@ -2,7 +2,8 @@ use derive_more::From; use ethers::prelude::{HttpClientError, ProviderError, WsClientError}; use serde::de::{self, Deserializer, MapAccess, SeqAccess, Visitor}; use serde::{Deserialize, Serialize}; -use serde_json::value::RawValue; +use serde_json::json; +use serde_json::value::{to_raw_value, RawValue}; use std::fmt; // this is used by serde @@ -161,7 +162,8 @@ pub struct JsonRpcErrorData { } /// A complete response -#[derive(Clone, Deserialize, Serialize)] +/// TODO: better Debug response +#[derive(Clone, Debug, Deserialize, Serialize)] pub struct JsonRpcForwardedResponse { // TODO: jsonrpc a &str? #[serde(default = "default_jsonrpc")] @@ -173,15 +175,6 @@ pub struct JsonRpcForwardedResponse { pub error: Option, } -/// TODO: the default formatter takes forever to write. this is too quiet though -impl fmt::Debug for JsonRpcForwardedResponse { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("JsonRpcForwardedResponse") - .field("id", &self.id) - .finish_non_exhaustive() - } -} - impl JsonRpcRequest { pub fn num_bytes(&self) -> usize { // TODO: not sure how to do this without wasting a ton of allocations @@ -212,7 +205,7 @@ impl JsonRpcForwardedResponse { JsonRpcForwardedResponse { jsonrpc: "2.0".to_string(), id: id.unwrap_or_else(|| { - RawValue::from_string("null".to_string()).expect("null id should always work") + to_raw_value(&json!(None::>)).expect("null id should always work") }), result: None, error: Some(JsonRpcErrorData { @@ -235,10 +228,7 @@ impl JsonRpcForwardedResponse { pub fn from_value(partial_response: serde_json::Value, id: Box) -> Self { let partial_response = - serde_json::to_string(&partial_response).expect("this should always work"); - - let partial_response = - RawValue::from_string(partial_response).expect("this should always work"); + to_raw_value(&partial_response).expect("Value to RawValue should always work"); JsonRpcForwardedResponse { jsonrpc: "2.0".to_string(), diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs index bdce0589..9fac1150 100644 --- a/web3_proxy/src/rpcs/connections.rs +++ b/web3_proxy/src/rpcs/connections.rs @@ -310,11 +310,12 @@ impl Web3Connections { pub async fn try_send_parallel_requests( &self, active_request_handles: Vec, + id: Box, method: &str, params: Option<&serde_json::Value>, error_level: Level, // TODO: remove this box once i figure out how to do the options - ) -> Result, ProviderError> { + ) -> anyhow::Result { // TODO: if only 1 active_request_handles, do self.try_send_request? let responses = active_request_handles @@ -330,17 +331,24 @@ impl Web3Connections { .await; // TODO: Strings are not great keys, but we can't use RawValue or ProviderError as keys because they don't implement Hash or Eq - let mut count_map: HashMap, ProviderError>> = HashMap::new(); + let mut count_map: HashMap = HashMap::new(); let mut counts: Counter = Counter::new(); - let mut any_ok = false; - for response in responses { - // TODO: i think we need to do something smarter with provider error. we at least need to wrap it up as JSON - // TODO: emit stats errors? + let mut any_ok_with_json_result = false; + let mut any_ok_but_maybe_json_error = false; + for partial_response in responses { + if partial_response.is_ok() { + any_ok_with_json_result = true; + } + + let response = + JsonRpcForwardedResponse::try_from_response_result(partial_response, id.clone()); + + // TODO: better key? let s = format!("{:?}", response); if count_map.get(&s).is_none() { if response.is_ok() { - any_ok = true; + any_ok_but_maybe_json_error = true; } count_map.insert(s.clone(), response); @@ -350,14 +358,26 @@ impl Web3Connections { } for (most_common, _) in counts.most_common_ordered() { - let most_common = count_map.remove(&most_common).unwrap(); + let most_common = count_map + .remove(&most_common) + .expect("most_common key must exist"); - if any_ok && most_common.is_err() { - // errors were more common, but we are going to skip them because we got an okay - continue; - } else { - // return the most common - return most_common; + match most_common { + Ok(x) => { + if any_ok_with_json_result && x.error.is_some() { + // this one may be an "Ok", but the json has an error inside it + continue; + } + // return the most common success + return Ok(x); + } + Err(err) => { + if any_ok_but_maybe_json_error { + // the most common is an error, but there is an Ok in here somewhere. loop to find it + continue; + } + return Err(err); + } } } @@ -719,18 +739,18 @@ impl Web3Connections { .store(true, Ordering::Release); } - warn!("No synced servers! {:?}", self); + let num_conns = self.conns.len(); - // TODO: what error code? 502? - Err(anyhow::anyhow!("all {} tries exhausted", skip_rpcs.len())) + error!("No servers synced ({} known)", num_conns); + + Err(anyhow::anyhow!("No servers synced ({} known)", num_conns)) } /// be sure there is a timeout on this or it might loop forever - pub async fn try_send_all_upstream_servers( &self, authorization: &Arc, - request: JsonRpcRequest, + request: &JsonRpcRequest, request_metadata: Option>, block_needed: Option<&U64>, error_level: Level, @@ -752,23 +772,15 @@ impl Web3Connections { .extend(active_request_handles.iter().map(|x| x.clone_connection())); } - let quorum_response = self + return self .try_send_parallel_requests( active_request_handles, + request.id.clone(), request.method.as_ref(), request.params.as_ref(), error_level, ) - .await?; - - let response = JsonRpcForwardedResponse { - jsonrpc: "2.0".to_string(), - id: request.id, - result: Some(quorum_response), - error: None, - }; - - return Ok(response); + .await; } Err(None) => { warn!("No servers in sync on {:?}! Retrying", self);