try_send_all_synced_connections for getting transactions

This commit is contained in:
Bryan Stitt 2023-10-03 21:35:25 -07:00
parent 633177c0a6
commit c1c127e6bd
2 changed files with 22 additions and 24 deletions

View File

@ -1365,15 +1365,15 @@ impl Web3ProxyApp {
// TODO: timeout // TODO: timeout
// TODO: change this to send serially until we get a success // TODO: change this to send serially until we get a success
let result = self let mut result = self
.balanced_rpcs .balanced_rpcs
.try_proxy_connection::<serde_json::Value>( .try_send_all_synced_connections::<serde_json::Value>(
web3_request, web3_request,
Some(Duration::from_secs(30)),
Some(crate::rpcs::request::RequestErrorHandler::DebugLevel),
None,
) )
.await? .await;
.parsed()
.await?
.into_result();
// if we got "null" or "", it is probably because the tx is old. retry on nodes with old block data // if we got "null" or "", it is probably because the tx is old. retry on nodes with old block data
// TODO: this feels fragile. how should we do this better? // TODO: this feels fragile. how should we do this better?
@ -1391,22 +1391,21 @@ impl Web3ProxyApp {
.archive_request .archive_request
.store(true, atomic::Ordering::Relaxed); .store(true, atomic::Ordering::Relaxed);
self // TODO: we don't actually want try_send_all. we want the first non-null, non-error response
result = self
.balanced_rpcs .balanced_rpcs
.try_proxy_connection::<Arc<RawValue>>( .try_send_all_synced_connections::<serde_json::Value>(
web3_request, web3_request,
// Some(Duration::from_secs(30)), Some(Duration::from_secs(30)),
// // TODO: should this be block 0 instead? Some(crate::rpcs::request::RequestErrorHandler::DebugLevel),
// Some(&U64::one()), None,
// // TODO: is this a good way to allow lagged archive nodes a try
// Some(&head_block_num.saturating_sub(5.into()).clamp(U64::one(), U64::MAX)),
) )
.await? .await;
} else {
jsonrpc::ParsedResponse::from_value(result?, web3_request.id()).into()
} }
// TODO: if parsed is an error, return a null instead // TODO: if parsed is an error, return a null instead
jsonrpc::ParsedResponse::from_value(result?, web3_request.id()).into()
} }
// TODO: eth_gasPrice that does awesome magic to predict the future // TODO: eth_gasPrice that does awesome magic to predict the future
"eth_hashrate" => jsonrpc::ParsedResponse::from_value(json!(U64::zero()), web3_request.id()).into(), "eth_hashrate" => jsonrpc::ParsedResponse::from_value(json!(U64::zero()), web3_request.id()).into(),

View File

@ -22,7 +22,6 @@ use moka::future::CacheBuilder;
use parking_lot::RwLock; use parking_lot::RwLock;
use serde::ser::{SerializeStruct, Serializer}; use serde::ser::{SerializeStruct, Serializer};
use serde::Serialize; use serde::Serialize;
use serde_json::value::RawValue;
use std::borrow::Cow; use std::borrow::Cow;
use std::cmp::min_by_key; use std::cmp::min_by_key;
use std::fmt::{self, Display}; use std::fmt::{self, Display};
@ -370,12 +369,12 @@ impl Web3Rpcs {
} }
/// Send the same request to all the handles. Returning the most common success or most common error. /// Send the same request to all the handles. Returning the most common success or most common error.
/// TODO: option to return the fastest response and handles for all the others instead? /// TODO: option to return the fastest (non-null, non-error) response and handles for all the others instead?
pub async fn try_send_parallel_requests( pub async fn try_send_parallel_requests<R: JsonRpcResultData>(
&self, &self,
active_request_handles: Vec<OpenRequestHandle>, active_request_handles: Vec<OpenRequestHandle>,
max_wait: Option<Duration>, max_wait: Option<Duration>,
) -> Result<Arc<RawValue>, Web3ProxyError> { ) -> Result<R, Web3ProxyError> {
// TODO: if only 1 active_request_handles, do self.try_send_request? // TODO: if only 1 active_request_handles, do self.try_send_request?
let max_wait = max_wait.unwrap_or_else(|| Duration::from_secs(300)); let max_wait = max_wait.unwrap_or_else(|| Duration::from_secs(300));
@ -384,7 +383,7 @@ impl Web3Rpcs {
let responses = active_request_handles let responses = active_request_handles
.into_iter() .into_iter()
.map(|active_request_handle| async move { .map(|active_request_handle| async move {
let result: Result<Result<Arc<RawValue>, Web3ProxyError>, Web3ProxyError> = let result: Result<Result<R, Web3ProxyError>, Web3ProxyError> =
timeout(max_wait, async { timeout(max_wait, async {
match active_request_handle.request().await { match active_request_handle.request().await {
Ok(response) => match response.parsed().await { Ok(response) => match response.parsed().await {
@ -400,7 +399,7 @@ impl Web3Rpcs {
result.flatten() result.flatten()
}) })
.collect::<FuturesUnordered<_>>() .collect::<FuturesUnordered<_>>()
.collect::<Vec<Result<Arc<RawValue>, Web3ProxyError>>>() .collect::<Vec<Result<R, Web3ProxyError>>>()
.await; .await;
// TODO: Strings are not great keys, but we can't use RawValue or ProviderError as keys because they don't implement Hash or Eq // TODO: Strings are not great keys, but we can't use RawValue or ProviderError as keys because they don't implement Hash or Eq
@ -1049,13 +1048,13 @@ impl Web3Rpcs {
/// be sure there is a timeout on this or it might loop forever /// be sure there is a timeout on this or it might loop forever
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub async fn try_send_all_synced_connections( pub async fn try_send_all_synced_connections<R: JsonRpcResultData>(
self: &Arc<Self>, self: &Arc<Self>,
web3_request: &Arc<Web3Request>, web3_request: &Arc<Web3Request>,
max_wait: Option<Duration>, max_wait: Option<Duration>,
error_level: Option<RequestErrorHandler>, error_level: Option<RequestErrorHandler>,
max_sends: Option<usize>, max_sends: Option<usize>,
) -> Web3ProxyResult<Arc<RawValue>> { ) -> Web3ProxyResult<R> {
let mut watch_consensus_rpcs = self.watch_ranked_rpcs.subscribe(); let mut watch_consensus_rpcs = self.watch_ranked_rpcs.subscribe();
// todo!() we are inconsistent with max_wait and web3_request.expires_at // todo!() we are inconsistent with max_wait and web3_request.expires_at