From c1c127e6bd5693c0b8e93082e48ca27b3e51db9f Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Tue, 3 Oct 2023 21:35:25 -0700 Subject: [PATCH] try_send_all_synced_connections for getting transactions --- web3_proxy/src/app/mod.rs | 31 +++++++++++++++---------------- web3_proxy/src/rpcs/many.rs | 15 +++++++-------- 2 files changed, 22 insertions(+), 24 deletions(-) diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 9b94d846..e0bd63a4 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -1365,15 +1365,15 @@ impl Web3ProxyApp { // TODO: timeout // TODO: change this to send serially until we get a success - let result = self + let mut result = self .balanced_rpcs - .try_proxy_connection::( + .try_send_all_synced_connections::( web3_request, + Some(Duration::from_secs(30)), + Some(crate::rpcs::request::RequestErrorHandler::DebugLevel), + None, ) - .await? - .parsed() - .await? - .into_result(); + .await; // if we got "null" or "", it is probably because the tx is old. retry on nodes with old block data // TODO: this feels fragile. how should we do this better/ @@ -1391,22 +1391,21 @@ impl Web3ProxyApp { .archive_request .store(true, atomic::Ordering::Relaxed); - self + // TODO: we don't actually want try_send_all. we want the first non-null, non-error response + result = self .balanced_rpcs - .try_proxy_connection::>( + .try_send_all_synced_connections::( web3_request, - // Some(Duration::from_secs(30)), - // // TODO: should this be block 0 instead? - // Some(&U64::one()), - // // TODO: is this a good way to allow lagged archive nodes a try - // Some(&head_block_num.saturating_sub(5.into()).clamp(U64::one(), U64::MAX)), + Some(Duration::from_secs(30)), + Some(crate::rpcs::request::RequestErrorHandler::DebugLevel), + None, ) - .await? - } else { - jsonrpc::ParsedResponse::from_value(result?, web3_request.id()).into() + .await; } // TODO: if parsed is an error, return a null instead + + jsonrpc::ParsedResponse::from_value(result?, web3_request.id()).into() } // TODO: eth_gasPrice that does awesome magic to predict the future "eth_hashrate" => jsonrpc::ParsedResponse::from_value(json!(U64::zero()), web3_request.id()).into(), diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index fb6046c1..508c3b54 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -22,7 +22,6 @@ use moka::future::CacheBuilder; use parking_lot::RwLock; use serde::ser::{SerializeStruct, Serializer}; use serde::Serialize; -use serde_json::value::RawValue; use std::borrow::Cow; use std::cmp::min_by_key; use std::fmt::{self, Display}; @@ -370,12 +369,12 @@ impl Web3Rpcs { } /// Send the same request to all the handles. Returning the most common success or most common error. - /// TODO: option to return the fastest response and handles for all the others instead? - pub async fn try_send_parallel_requests( + /// TODO: option to return the fastest (non-null, non-error) response and handles for all the others instead? + pub async fn try_send_parallel_requests( &self, active_request_handles: Vec, max_wait: Option, - ) -> Result, Web3ProxyError> { + ) -> Result { // TODO: if only 1 active_request_handles, do self.try_send_request? let max_wait = max_wait.unwrap_or_else(|| Duration::from_secs(300)); @@ -384,7 +383,7 @@ impl Web3Rpcs { let responses = active_request_handles .into_iter() .map(|active_request_handle| async move { - let result: Result, Web3ProxyError>, Web3ProxyError> = + let result: Result, Web3ProxyError> = timeout(max_wait, async { match active_request_handle.request().await { Ok(response) => match response.parsed().await { @@ -400,7 +399,7 @@ impl Web3Rpcs { result.flatten() }) .collect::>() - .collect::, Web3ProxyError>>>() + .collect::>>() .await; // TODO: Strings are not great keys, but we can't use RawValue or ProviderError as keys because they don't implement Hash or Eq @@ -1049,13 +1048,13 @@ impl Web3Rpcs { /// be sure there is a timeout on this or it might loop forever #[allow(clippy::too_many_arguments)] - pub async fn try_send_all_synced_connections( + pub async fn try_send_all_synced_connections( self: &Arc, web3_request: &Arc, max_wait: Option, error_level: Option, max_sends: Option, - ) -> Web3ProxyResult> { + ) -> Web3ProxyResult { let mut watch_consensus_rpcs = self.watch_ranked_rpcs.subscribe(); // todo!() we are inconsistent with max_wait and web3_request.expires_at