diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index 11176be9..6b0b5b89 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -21,7 +21,7 @@ use std::sync::{atomic, Arc}; use std::time::Duration; use tokio::select; use tokio::task::yield_now; -use tokio::time::{sleep_until, Instant}; +use tokio::time::{sleep, sleep_until, Instant}; use tracing::{debug, enabled, info, trace, warn, Level}; #[derive(Clone, Debug, Serialize)] @@ -92,7 +92,8 @@ impl PartialOrd for RpcRanking { pub enum ShouldWaitForBlock { Ready, // BackupReady, - Wait { current: Option }, + /// how many blocks you will have to wait + Wait(U64), // WaitForBackup { current: Option }, NeverReady, } @@ -337,6 +338,10 @@ impl RankedRpcs { min_block_num: Option, max_block_num: Option, ) -> ShouldWaitForBlock { + if !rpc.healthy.load(atomic::Ordering::Relaxed) { + return ShouldWaitForBlock::NeverReady; + } + if let Some(min_block_num) = min_block_num { if !self.has_block_data(rpc, min_block_num) { trace!( @@ -354,9 +359,9 @@ impl RankedRpcs { Ordering::Less => { trace!("{} is behind. let it catch up", rpc); // TODO: what if this is a pruned rpc that is behind by a lot, and the block is old, too? - return ShouldWaitForBlock::Wait { - current: Some(rpc_data.head_block_num), - }; + return ShouldWaitForBlock::Wait( + needed_block_num - rpc_data.head_block_num, + ); } Ordering::Greater | Ordering::Equal => { // rpc is synced past the needed block. make sure the block isn't too old for it @@ -391,6 +396,10 @@ impl RankedRpcs { return false; } + if !rpc.healthy.load(atomic::Ordering::Relaxed) { + return false; + } + if let Some(min_block_needed) = min_block_needed { if !self.has_block_data(rpc, min_block_needed) { trace!( @@ -1015,13 +1024,15 @@ impl RpcsForRequest { // TODO: do this without having 3 Vecs let mut filtered = Vec::with_capacity(max_len); - let mut attempted = Vec::with_capacity(max_len); - let mut failed = Vec::with_capacity(max_len); + let mut attempted = HashSet::with_capacity(max_len); + let mut completed = HashSet::with_capacity(max_len); // todo!("be sure to set server_error if we exit without any rpcs!"); loop { if self.request.connect_timeout() { break; + } else { + yield_now().await; } let mut earliest_retry_at = None; @@ -1032,11 +1043,11 @@ impl RpcsForRequest { attempted.clear(); - while attempted.len() + failed.len() < rpcs.len() { + while attempted.len() + completed.len() < rpcs.len() { filtered.clear(); // TODO: i'd like to do this without the collect, but since we push into `attempted`, having `attempted.contains` causes issues - filtered.extend(rpcs.iter().filter(|x| !(attempted.contains(x) || failed.contains(x)))); + filtered.extend(rpcs.iter().filter(|x| !(attempted.contains(x) || completed.contains(x)))); // tuple_windows doesn't do anything for single item iters. make the code DRY by just having it compare itself if filtered.len() == 1 { @@ -1052,14 +1063,13 @@ impl RpcsForRequest { // TODO: should x.next_available should be Reverse<_>? let best_rpc = best_rpc(rpc_a, rpc_b); - attempted.push(best_rpc); - match best_rpc .try_request_handle(&self.request, error_handler, false) .await { Ok(OpenRequestResult::Handle(handle)) => { trace!("opened handle: {}", best_rpc); + completed.insert(best_rpc); yield handle; } Ok(OpenRequestResult::RetryAt(retry_at)) => { @@ -1068,13 +1078,14 @@ impl RpcsForRequest { best_rpc, retry_at.duration_since(Instant::now()).as_secs_f32() ); - - earliest_retry_at = earliest_retry_at.min(Some(retry_at)); + attempted.insert(best_rpc); + earliest_retry_at = earliest_retry_at.min(Some(retry_at, )); continue; } Ok(OpenRequestResult::Lagged(x)) => { - trace!("{} is lagged. will not work now", best_rpc); // this will probably always be the same block, right? + trace!("{} is lagged. will not work now", best_rpc); + attempted.insert(best_rpc); if wait_for_sync.is_none() { wait_for_sync = Some(x); } @@ -1083,34 +1094,40 @@ impl RpcsForRequest { Ok(OpenRequestResult::Failed) => { // TODO: log a warning? emit a stat? trace!("best_rpc not ready: {}", best_rpc); - attempted.pop(); - failed.push(best_rpc); + completed.insert(best_rpc); continue; } Err(err) => { trace!("No request handle for {}. err={:?}", best_rpc, err); - attempted.pop(); - failed.push(best_rpc); + completed.insert(best_rpc); continue; } } } - debug_assert!(!attempted.is_empty()); + debug_assert!(!(attempted.is_empty() && completed.is_empty())); } + + yield_now().await; } // if we got this far, no inner or outer rpcs are ready. thats suprising since an inner should have been // clear earliest_retry_at if it is too far in the future to help us if let Some(retry_at) = earliest_retry_at { - if self.request.connect_timeout_at() <= retry_at { + if retry_at > self.request.connect_timeout_at() { // no point in waiting. it wants us to wait too long earliest_retry_at = None; } + + let now = Instant::now(); + if retry_at < now { + warn!("this seems like a problem and not something we should just sleep on"); + earliest_retry_at = Some(now + Duration::from_millis(100)) + } } - // TODO: i think theres a bug here so i + // TODO: i think theres a bug here match (wait_for_sync, earliest_retry_at) { (None, None) => { // we have nothing to wait for. uh oh! @@ -1118,12 +1135,7 @@ impl RpcsForRequest { } (None, Some(retry_at)) => { // try again after rate limits are done - if retry_at > Instant::now() { - sleep_until(retry_at).await; - } else { - // TODO: why is this happening? why would we get rate limited to now? it should be like a second at minimum - yield_now().await; - } + sleep_until(retry_at).await; } (Some(wait_for_sync), None) => { select! { @@ -1140,7 +1152,7 @@ impl RpcsForRequest { }, } } - _ = sleep_until(self.request.expire_at()) => { + _ = sleep_until(self.request.connect_timeout_at()) => { break; } } @@ -1161,8 +1173,6 @@ impl RpcsForRequest { } } _ = sleep_until(retry_at) => { - // if sleep didn't have to wait at all, something seems wrong. have a minimum wait? - yield_now().await; continue; } }