improve eth_sendRawTransaction and other timeouts

This commit is contained in:
Bryan Stitt 2023-08-03 11:39:48 -07:00
parent 74224977b7
commit 690601643d
3 changed files with 86 additions and 64 deletions

@ -1106,9 +1106,9 @@ impl Web3ProxyApp {
Some(request_metadata),
None,
None,
Some(Duration::from_secs(30)),
Some(Duration::from_secs(10)),
Some(Level::TRACE.into()),
None,
Some(3),
)
.await;
@ -1136,7 +1136,7 @@ impl Web3ProxyApp {
Some(request_metadata),
None,
None,
Some(Duration::from_secs(30)),
Some(Duration::from_secs(10)),
Some(Level::TRACE.into()),
num_public_rpcs,
)
@ -1164,56 +1164,66 @@ impl Web3ProxyApp {
// turn some of the Web3ProxyErrors into Ok results
// TODO: move this into a helper function
let (code, response_data) = match self
._proxy_request_with_caching(
&request.method,
&mut request.params,
head_block,
&request_metadata,
)
.await
{
Ok(response_data) => {
request_metadata
.error_response
.store(false, Ordering::Release);
let max_tries = 3;
let mut tries = 0;
loop {
let (code, response_data) = match self
._proxy_request_with_caching(
&request.method,
&mut request.params,
head_block,
&request_metadata,
)
.await
{
Ok(response_data) => {
request_metadata
.error_response
.store(false, Ordering::Release);
(StatusCode::OK, response_data)
}
Err(err @ Web3ProxyError::NullJsonRpcResult) => {
request_metadata
.error_response
.store(false, Ordering::Release);
(StatusCode::OK, response_data)
}
Err(err @ Web3ProxyError::NullJsonRpcResult) => {
request_metadata
.error_response
.store(false, Ordering::Release);
err.as_response_parts()
}
Err(Web3ProxyError::JsonRpcResponse(response_data)) => {
request_metadata
.error_response
.store(response_data.is_error(), Ordering::Release);
err.as_response_parts()
}
Err(Web3ProxyError::JsonRpcResponse(response_data)) => {
request_metadata
.error_response
.store(response_data.is_error(), Ordering::Release);
(StatusCode::OK, response_data)
}
Err(err) => {
request_metadata
.error_response
.store(true, Ordering::Release);
(StatusCode::OK, response_data)
}
Err(err) => {
tries += 1;
if tries < max_tries {
// try again
continue
}
err.as_response_parts()
}
};
request_metadata
.error_response
.store(true, Ordering::Release);
let response = JsonRpcForwardedResponse::from_response_data(response_data, response_id);
err.as_response_parts()
}
};
// TODO: this serializes twice :/
request_metadata.add_response(ResponseOrBytes::Response(&response));
let response = JsonRpcForwardedResponse::from_response_data(response_data, response_id);
let rpcs = request_metadata.backend_rpcs_used();
// TODO: this serializes twice :/
request_metadata.add_response(ResponseOrBytes::Response(&response));
// there might be clones in the background, so this isn't a sure thing
let _ = request_metadata.try_send_arc_stat();
let rpcs = request_metadata.backend_rpcs_used();
(code, response, rpcs)
// there might be clones in the background, so this isn't a sure thing
let _ = request_metadata.try_send_arc_stat();
return (code, response, rpcs)
}
}
/// main logic for proxy_cached_request but in a dedicated function so the try operator is easy to use
@ -1453,16 +1463,12 @@ impl Web3ProxyApp {
// TODO: error if the chain_id is incorrect
let response = timeout(
Duration::from_secs(30),
self
.try_send_protected(
method,
params,
request_metadata,
)
)
.await?;
let response = self
.try_send_protected(
method,
params,
request_metadata,
).await;
let mut response = response.try_into()?;

@ -1,6 +1,7 @@
#![feature(lazy_cell)]
#![feature(let_chains)]
#![feature(trait_alias)]
#![feature(result_flattening)]
#![forbid(unsafe_code)]
pub mod admin_queries;

@ -12,10 +12,10 @@ use crate::frontend::status::MokaCacheSerializer;
use crate::jsonrpc::{JsonRpcErrorData, JsonRpcParams, JsonRpcResultData};
use counter::Counter;
use derive_more::From;
use ethers::prelude::{ProviderError, U64};
use ethers::prelude::U64;
use futures::future::try_join_all;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use futures::{StreamExt, TryFutureExt};
use hashbrown::HashMap;
use itertools::Itertools;
use moka::future::CacheBuilder;
@ -31,7 +31,7 @@ use std::sync::atomic::Ordering;
use std::sync::Arc;
use tokio::select;
use tokio::sync::{mpsc, watch};
use tokio::time::{sleep, sleep_until, Duration, Instant};
use tokio::time::{sleep, sleep_until, timeout, Duration, Instant};
use tracing::{debug, error, info, instrument, trace, warn};
/// A collection of web3 connections. Sends requests either the current best server or all servers.
@ -379,20 +379,30 @@ impl Web3Rpcs {
active_request_handles: Vec<OpenRequestHandle>,
method: &str,
params: &P,
// TODO: remove this box once i figure out how to do the options
) -> Result<Box<RawValue>, ProviderError> {
max_wait: Option<Duration>,
) -> Result<Box<RawValue>, Web3ProxyError> {
// TODO: if only 1 active_request_handles, do self.try_send_request?
let max_wait = max_wait.unwrap_or_else(|| Duration::from_secs(300));
// TODO: iter stream
let responses = active_request_handles
.into_iter()
.map(|active_request_handle| async move {
let result: Result<Box<RawValue>, _> =
active_request_handle.request(method, &json!(&params)).await;
result
let result: Result<Result<Box<RawValue>, Web3ProxyError>, Web3ProxyError> =
timeout(
max_wait,
active_request_handle
.request(method, &json!(&params))
.map_err(Web3ProxyError::EthersProvider),
)
.await
.map_err(Web3ProxyError::from);
result.flatten()
})
.collect::<FuturesUnordered<_>>()
.collect::<Vec<Result<Box<RawValue>, ProviderError>>>()
.collect::<Vec<Result<Box<RawValue>, Web3ProxyError>>>()
.await;
// TODO: Strings are not great keys, but we can't use RawValue or ProviderError as keys because they don't implement Hash or Eq
@ -1123,7 +1133,12 @@ impl Web3Rpcs {
}
let x = self
.try_send_parallel_requests(active_request_handles, method, params)
.try_send_parallel_requests(
active_request_handles,
method,
params,
max_wait,
)
.await?;
return Ok(x);