From 690601643def77da6fd7b0e798053925dfc7553d Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Thu, 3 Aug 2023 11:39:48 -0700 Subject: [PATCH] improve eth_sendRawTransaction and other timeouts --- web3_proxy/src/app/mod.rs | 114 +++++++++++++++++++----------------- web3_proxy/src/lib.rs | 1 + web3_proxy/src/rpcs/many.rs | 35 +++++++---- 3 files changed, 86 insertions(+), 64 deletions(-) diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 2767cbd5..0c029579 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -1106,9 +1106,9 @@ impl Web3ProxyApp { Some(request_metadata), None, None, - Some(Duration::from_secs(30)), + Some(Duration::from_secs(10)), Some(Level::TRACE.into()), - None, + Some(3), ) .await; @@ -1136,7 +1136,7 @@ impl Web3ProxyApp { Some(request_metadata), None, None, - Some(Duration::from_secs(30)), + Some(Duration::from_secs(10)), Some(Level::TRACE.into()), num_public_rpcs, ) @@ -1164,56 +1164,66 @@ impl Web3ProxyApp { // turn some of the Web3ProxyErrors into Ok results // TODO: move this into a helper function - let (code, response_data) = match self - ._proxy_request_with_caching( - &request.method, - &mut request.params, - head_block, - &request_metadata, - ) - .await - { - Ok(response_data) => { - request_metadata - .error_response - .store(false, Ordering::Release); + let max_tries = 3; + let mut tries = 0; + loop { + let (code, response_data) = match self + ._proxy_request_with_caching( + &request.method, + &mut request.params, + head_block, + &request_metadata, + ) + .await + { + Ok(response_data) => { + request_metadata + .error_response + .store(false, Ordering::Release); - (StatusCode::OK, response_data) - } - Err(err @ Web3ProxyError::NullJsonRpcResult) => { - request_metadata - .error_response - .store(false, Ordering::Release); + (StatusCode::OK, response_data) + } + Err(err @ Web3ProxyError::NullJsonRpcResult) => { + request_metadata + .error_response + .store(false, Ordering::Release); - err.as_response_parts() - } - Err(Web3ProxyError::JsonRpcResponse(response_data)) => { - request_metadata - .error_response - .store(response_data.is_error(), Ordering::Release); + err.as_response_parts() + } + Err(Web3ProxyError::JsonRpcResponse(response_data)) => { + request_metadata + .error_response + .store(response_data.is_error(), Ordering::Release); - (StatusCode::OK, response_data) - } - Err(err) => { - request_metadata - .error_response - .store(true, Ordering::Release); + (StatusCode::OK, response_data) + } + Err(err) => { + tries += 1; + if tries < max_tries { + // try again + continue + } - err.as_response_parts() - } - }; + request_metadata + .error_response + .store(true, Ordering::Release); - let response = JsonRpcForwardedResponse::from_response_data(response_data, response_id); + err.as_response_parts() + } + }; - // TODO: this serializes twice :/ - request_metadata.add_response(ResponseOrBytes::Response(&response)); + let response = JsonRpcForwardedResponse::from_response_data(response_data, response_id); - let rpcs = request_metadata.backend_rpcs_used(); + // TODO: this serializes twice :/ + request_metadata.add_response(ResponseOrBytes::Response(&response)); - // there might be clones in the background, so this isn't a sure thing - let _ = request_metadata.try_send_arc_stat(); + let rpcs = request_metadata.backend_rpcs_used(); - (code, response, rpcs) + // there might be clones in the background, so this isn't a sure thing + let _ = request_metadata.try_send_arc_stat(); + + return (code, response, rpcs) + } } /// main logic for proxy_cached_request but in a dedicated function so the try operator is easy to use @@ -1453,16 +1463,12 @@ impl Web3ProxyApp { // TODO: error if the chain_id is incorrect - let response = timeout( - Duration::from_secs(30), - self - .try_send_protected( - method, - params, - request_metadata, - ) - ) - .await?; + let response = self + .try_send_protected( + method, + params, + request_metadata, + ).await; let mut response = response.try_into()?; diff --git a/web3_proxy/src/lib.rs b/web3_proxy/src/lib.rs index dbde96d5..e5ad7bc1 100644 --- a/web3_proxy/src/lib.rs +++ b/web3_proxy/src/lib.rs @@ -1,6 +1,7 @@ #![feature(lazy_cell)] #![feature(let_chains)] #![feature(trait_alias)] +#![feature(result_flattening)] #![forbid(unsafe_code)] pub mod admin_queries; diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 70cac655..96e88c06 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -12,10 +12,10 @@ use crate::frontend::status::MokaCacheSerializer; use crate::jsonrpc::{JsonRpcErrorData, JsonRpcParams, JsonRpcResultData}; use counter::Counter; use derive_more::From; -use ethers::prelude::{ProviderError, U64}; +use ethers::prelude::U64; use futures::future::try_join_all; use futures::stream::FuturesUnordered; -use futures::StreamExt; +use futures::{StreamExt, TryFutureExt}; use hashbrown::HashMap; use itertools::Itertools; use moka::future::CacheBuilder; @@ -31,7 +31,7 @@ use std::sync::atomic::Ordering; use std::sync::Arc; use tokio::select; use tokio::sync::{mpsc, watch}; -use tokio::time::{sleep, sleep_until, Duration, Instant}; +use tokio::time::{sleep, sleep_until, timeout, Duration, Instant}; use tracing::{debug, error, info, instrument, trace, warn}; /// A collection of web3 connections. Sends requests either the current best server or all servers. @@ -379,20 +379,30 @@ impl Web3Rpcs { active_request_handles: Vec, method: &str, params: &P, - // TODO: remove this box once i figure out how to do the options - ) -> Result, ProviderError> { + max_wait: Option, + ) -> Result, Web3ProxyError> { // TODO: if only 1 active_request_handles, do self.try_send_request? + let max_wait = max_wait.unwrap_or_else(|| Duration::from_secs(300)); + // TODO: iter stream let responses = active_request_handles .into_iter() .map(|active_request_handle| async move { - let result: Result, _> = - active_request_handle.request(method, &json!(¶ms)).await; - result + let result: Result, Web3ProxyError>, Web3ProxyError> = + timeout( + max_wait, + active_request_handle + .request(method, &json!(¶ms)) + .map_err(Web3ProxyError::EthersProvider), + ) + .await + .map_err(Web3ProxyError::from); + + result.flatten() }) .collect::>() - .collect::, ProviderError>>>() + .collect::, Web3ProxyError>>>() .await; // TODO: Strings are not great keys, but we can't use RawValue or ProviderError as keys because they don't implement Hash or Eq @@ -1123,7 +1133,12 @@ impl Web3Rpcs { } let x = self - .try_send_parallel_requests(active_request_handles, method, params) + .try_send_parallel_requests( + active_request_handles, + method, + params, + max_wait, + ) .await?; return Ok(x);