From c27f0d065b8fc0962e1649af7e31da553bb5537b Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Tue, 10 Oct 2023 18:41:31 -0700 Subject: [PATCH] try wait_for_sync a new way with a min sleep --- web3_proxy/src/rpcs/consensus.rs | 107 +++++++++++-------------------- 1 file changed, 39 insertions(+), 68 deletions(-) diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index 6b0b5b89..9fc5f7c6 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -9,7 +9,8 @@ use async_stream::stream; use base64::engine::general_purpose; use derive_more::Constructor; use ethers::prelude::{H256, U64}; -use futures_util::Stream; +use futures::stream::FuturesUnordered; +use futures::{Stream, StreamExt}; use hashbrown::{HashMap, HashSet}; use hdrhistogram::serialization::{Serializer, V2DeflateSerializer}; use hdrhistogram::Histogram; @@ -21,8 +22,8 @@ use std::sync::{atomic, Arc}; use std::time::Duration; use tokio::select; use tokio::task::yield_now; -use tokio::time::{sleep, sleep_until, Instant}; -use tracing::{debug, enabled, info, trace, warn, Level}; +use tokio::time::{sleep_until, Instant}; +use tracing::{debug, enabled, error, info, trace, warn, Level}; #[derive(Clone, Debug, Serialize)] struct ConsensusRpcData { @@ -1036,7 +1037,7 @@ impl RpcsForRequest { } let mut earliest_retry_at = None; - let mut wait_for_sync = None; + let mut wait_for_sync = FuturesUnordered::new(); // first check the inners, then the outers for rpcs in [&self.inner, &self.outer] { @@ -1080,27 +1081,21 @@ impl RpcsForRequest { ); attempted.insert(best_rpc); earliest_retry_at = earliest_retry_at.min(Some(retry_at, )); - continue; } Ok(OpenRequestResult::Lagged(x)) => { // this will probably always be the same block, right? trace!("{} is lagged. will not work now", best_rpc); attempted.insert(best_rpc); - if wait_for_sync.is_none() { - wait_for_sync = Some(x); - } - continue; + wait_for_sync.push(x); } Ok(OpenRequestResult::Failed) => { // TODO: log a warning? emit a stat? trace!("best_rpc not ready: {}", best_rpc); completed.insert(best_rpc); - continue; } Err(err) => { trace!("No request handle for {}. err={:?}", best_rpc, err); completed.insert(best_rpc); - continue; } } } @@ -1113,69 +1108,45 @@ impl RpcsForRequest { // if we got this far, no inner or outer rpcs are ready. thats suprising since an inner should have been + let min_wait_until = Instant::now() + Duration::from_millis(100); + // clear earliest_retry_at if it is too far in the future to help us if let Some(retry_at) = earliest_retry_at { - if retry_at > self.request.connect_timeout_at() { - // no point in waiting. it wants us to wait too long - earliest_retry_at = None; - } - - let now = Instant::now(); - if retry_at < now { - warn!("this seems like a problem and not something we should just sleep on"); - earliest_retry_at = Some(now + Duration::from_millis(100)) - } + // set a minimum of 100ms. this is probably actually a bug we should figure out. + earliest_retry_at = Some(retry_at.max(min_wait_until)); } - // TODO: i think theres a bug here - match (wait_for_sync, earliest_retry_at) { - (None, None) => { - // we have nothing to wait for. uh oh! - break; - } - (None, Some(retry_at)) => { - // try again after rate limits are done - sleep_until(retry_at).await; - } - (Some(wait_for_sync), None) => { - select! { - x = wait_for_sync => { - match x { - Ok(rpc) => { - trace!(%rpc, "rpc ready. it might be used on the next loop"); - // TODO: try a handle now? - continue; - }, - Err(err) => { - trace!(?err, "problem while waiting for an rpc for a request"); - break; - }, + let retry_at = earliest_retry_at.min(Some(self.request.connect_timeout_at())).expect("retry_at always set"); + + if wait_for_sync.is_empty() { + sleep_until(retry_at).await; + } else { + select!{ + x = wait_for_sync.next() => { + match x { + Some(Ok(rpc)) => { + trace!(%rpc, "rpc ready. it might be used on the next loop"); + + // TODO: i don't think this sleep should be necessary. but i just want the cpus to cool down + sleep_until(min_wait_until).await; + }, + Some(Err(err)) => { + error!(?err, "problem while waiting for an rpc for a request"); + + // TODO: break or continue? + // TODO: i don't think this sleep should be necessary. but i just want the cpus to cool down + sleep_until(min_wait_until).await; + }, + None => { + // this would only happen if we got to the end of wait_for_sync. but we stop after the first result + warn!("wait_for_sync is empty. how'd this happen?"); + break; } } - _ = sleep_until(self.request.connect_timeout_at()) => { - break; - } - } - } - (Some(wait_for_sync), Some(retry_at)) => { - select! { - x = wait_for_sync => { - match x { - Ok(rpc) => { - trace!(%rpc, "rpc ready. it might be used on the next loop"); - // TODO: try a handle now? - continue; - }, - Err(err) => { - trace!(?err, "problem while waiting for an rpc for a request"); - break; - }, - } - } - _ = sleep_until(retry_at) => { - continue; - } - } + }, + _ = sleep_until(retry_at) => { + // we've waited long enough that trying again might work + }, } } }