diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs
index 613e259a..744ce25f 100644
--- a/web3_proxy/src/rpcs/consensus.rs
+++ b/web3_proxy/src/rpcs/consensus.rs
@@ -21,36 +21,10 @@ use std::cmp::{min_by_key, Ordering, Reverse};
 use std::sync::{atomic, Arc};
 use std::time::Duration;
 use tokio::select;
+use tokio::task::yield_now;
 use tokio::time::{sleep_until, Instant};
 use tracing::{debug, enabled, error, info, trace, warn, Level};
 
-#[derive(Clone, Debug, Serialize)]
-struct ConsensusRpcData {
-    head_block_num: U64,
-    // TODO: this is too simple. erigon has 4 prune levels (hrct)
-    oldest_block_num: U64,
-}
-
-impl ConsensusRpcData {
-    fn new(rpc: &Web3Rpc, head: &Web3ProxyBlock) -> Self {
-        let head_block_num = head.number();
-
-        let block_data_limit = rpc.block_data_limit();
-
-        let oldest_block_num = head_block_num.saturating_sub(block_data_limit);
-
-        Self {
-            head_block_num,
-            oldest_block_num,
-        }
-    }
-
-    // TODO: take an enum for the type of data (hrtc)
-    fn data_available(&self, block_num: U64) -> bool {
-        block_num >= self.oldest_block_num && block_num <= self.head_block_num
-    }
-}
-
 #[derive(Constructor, Clone, Copy, Debug, Default, Eq, PartialEq, Serialize)]
 pub struct RpcRanking {
     backup: bool,
@@ -117,17 +91,12 @@ pub struct RankedRpcs {
     inner: Vec<Arc<Web3Rpc>>,
 
-    // TODO: make serializing work. the key needs to be a string. I think we need `serialize_with`
-    #[serde(skip_serializing)]
-    rpc_data: HashMap<Arc<Web3Rpc>, ConsensusRpcData>,
-
     sort_mode: SortMethod,
 }
 
 // TODO: could these be refs? The owning RankedRpcs lifetime might work. `stream!` might make it complicated
 pub struct RpcsForRequest {
     inner: Vec<Arc<Web3Rpc>>,
-    outer: Vec<Arc<Web3Rpc>>,
     request: Arc<Web3Request>,
 }
 
@@ -142,14 +111,6 @@ impl RankedRpcs {
 
         let num_synced = rpcs.len();
 
-        let rpc_data = Default::default();
-        // TODO: do we need real data in rpc_data? if we are calling from_rpcs, we probably don't even track their block
-        // let mut rpc_data = HashMap::<Arc<Web3Rpc>, ConsensusRpcData>::with_capacity(num_synced);
-        // for rpc in rpcs.iter().cloned() {
-        //     let data = ConsensusRpcData::new(&rpc, &head_block);
-        //     rpc_data.insert(rpc, data);
-        // }
-
         let sort_mode = SortMethod::Shuffle;
 
         let ranked_rpcs = RankedRpcs {
@@ -157,7 +118,6 @@ impl RankedRpcs {
             head_block,
             inner: rpcs,
             num_synced,
-            rpc_data,
            sort_mode,
        };
 
@@ -199,17 +159,12 @@ impl RankedRpcs {
         // return the first result that exceededs confgured minimums (if any)
         if let Some((best_block, _, best_rpcs)) = votes.into_iter().next() {
             let mut ranked_rpcs: Vec<_> = best_rpcs.into_iter().map(Arc::clone).collect();
-            let mut rpc_data = HashMap::new();
 
             let backups_needed = ranked_rpcs.iter().any(|x| x.backup);
             let num_synced = ranked_rpcs.len();
 
             // TODO: add all the unsynced rpcs
             for (x, x_head) in heads.iter() {
-                let data = ConsensusRpcData::new(x, x_head);
-
-                rpc_data.insert(x.clone(), data);
-
                 if ranked_rpcs.contains(x) {
                     continue;
                 }
@@ -219,6 +174,8 @@ impl RankedRpcs {
                     continue;
                 }
 
+                // TODO: max age here too?
+
                 ranked_rpcs.push(x.clone());
             }
 
@@ -230,7 +187,6 @@ impl RankedRpcs {
             let consensus = RankedRpcs {
                 backups_needed,
                 head_block: best_block,
-                rpc_data,
                 sort_mode,
                 inner: ranked_rpcs,
                 num_synced,
@@ -250,64 +206,59 @@ impl RankedRpcs {
         let head_block = self.head_block.number();
 
         // these are bigger than we need, but how much does that matter?
-        let mut inner = Vec::with_capacity(self.num_active_rpcs());
-        let mut outer = Vec::with_capacity(self.num_active_rpcs());
+        let mut inner_for_request = Vec::<Arc<Web3Rpc>>::with_capacity(self.num_active_rpcs());
 
+        // TODO: what if min is set to some future block?
+        // TODO: what if max is set to some future block?
+        let min_block_needed = web3_request.min_block_needed();
         let max_block_needed = web3_request.max_block_needed();
 
+        // TODO: max lag was already handled
+        for rpc in self.inner.iter() {
+            if let Some(block_needed) = min_block_needed {
+                if !rpc.has_block_data(block_needed) {
+                    continue;
+                }
+            }
+            if let Some(block_needed) = max_block_needed {
+                if !rpc.has_block_data(block_needed) {
+                    continue;
+                }
+            }
+
+            inner_for_request.push(rpc.clone());
+        }
+
+        let now = Instant::now();
+
         match self.sort_mode {
             SortMethod::Shuffle => {
                 // if we are shuffling, it is because we don't watch the head_blocks of the rpcs
                 // clone all of the rpcs
-                self.inner.clone_into(&mut inner);
-
                 let mut rng = nanorand::tls_rng();
 
-                // we use shuffle instead of sort. we will compare weights during `RpcsForRequest::to_stream`
+                // we use shuffle instead of sort so that the load gets spread around more
+                // we will still compare weights during `RpcsForRequest::to_stream`
                 // TODO: use web3_request.start_instant? I think we want it to be as recent as possible
-                let now = Instant::now();
-
-                inner.sort_by_cached_key(|x| {
+                inner_for_request.sort_by_cached_key(|x| {
                     x.shuffle_for_load_balancing_on(max_block_needed, &mut rng, now)
                 });
             }
             SortMethod::Sort => {
-                // TODO: what if min is set to some future block?
-                let min_block_needed = web3_request.min_block_needed();
-
-                // TODO: max lag from config
-                let recent_block_needed = head_block.saturating_sub(U64::from(5));
-
-                for rpc in &self.inner {
-                    if self.has_block_data(rpc, recent_block_needed) {
-                        match self.rpc_will_work_eventually(rpc, min_block_needed, max_block_needed)
-                        {
-                            ShouldWaitForBlock::NeverReady => continue,
-                            ShouldWaitForBlock::Ready => inner.push(rpc.clone()),
-                            ShouldWaitForBlock::Wait { .. } => outer.push(rpc.clone()),
-                        }
-                    }
-                }
-
-                let now = Instant::now();
-
-                // we use shuffle instead of sort. we will compare weights during `RpcsForRequest::to_stream`
-                inner.sort_by_cached_key(|x| x.sort_for_load_balancing_on(max_block_needed, now));
-                outer.sort_by_cached_key(|x| x.sort_for_load_balancing_on(max_block_needed, now));
+                // we sort so that the best nodes are always preferred. we will compare weights during `RpcsForRequest::to_stream`
+                inner_for_request
+                    .sort_by_cached_key(|x| x.sort_for_load_balancing_on(max_block_needed, now));
             }
         }
 
-        // TODO: turn these back on
-        outer.clear();
-
-        if inner.is_empty() && outer.is_empty() {
-            warn!(?inner, ?outer, %web3_request, %head_block, "no rpcs for request");
+        if inner_for_request.is_empty() {
+            warn!(?inner_for_request, %web3_request, %head_block, "no rpcs for request");
             None
         } else {
-            trace!(?inner, ?outer, %web3_request, "for_request");
+            trace!(?inner_for_request, %web3_request, "for_request");
 
             Some(RpcsForRequest {
-                inner,
-                outer,
+                inner: inner_for_request,
                 request: web3_request.clone(),
             })
         }
@@ -327,109 +278,6 @@ impl RankedRpcs {
         self.inner.len()
     }
 
-    pub fn has_block_data(&self, rpc: &Web3Rpc, block_num: U64) -> bool {
-        self.rpc_data
-            .get(rpc)
-            .map(|x| x.data_available(block_num))
-            .unwrap_or(false)
-    }
-
-    // TODO: take method as a param, too. mark nodes with supported methods (maybe do it optimistically? on error mark them as not supporting it)
-    pub fn rpc_will_work_eventually(
-        &self,
-        rpc: &Arc<Web3Rpc>,
-        min_block_num: Option<U64>,
-        max_block_num: Option<U64>,
-    ) -> ShouldWaitForBlock {
-        if !rpc.healthy.load(atomic::Ordering::Relaxed) {
-            return ShouldWaitForBlock::NeverReady;
-        }
-
-        if let Some(min_block_num) = min_block_num {
-            if !self.has_block_data(rpc, min_block_num) {
-                trace!(
-                    "{} is missing min_block_num ({}). will not work eventually",
-                    rpc,
-                    min_block_num,
-                );
-                return ShouldWaitForBlock::NeverReady;
-            }
-        }
-
-        if let Some(needed_block_num) = max_block_num {
-            if let Some(rpc_data) = self.rpc_data.get(rpc) {
-                match rpc_data.head_block_num.cmp(&needed_block_num) {
-                    Ordering::Less => {
-                        trace!("{} is behind. let it catch up", rpc);
-                        // TODO: what if this is a pruned rpc that is behind by a lot, and the block is old, too?
-                        return ShouldWaitForBlock::Wait(
-                            needed_block_num - rpc_data.head_block_num,
-                        );
-                    }
-                    Ordering::Greater | Ordering::Equal => {
-                        // rpc is synced past the needed block. make sure the block isn't too old for it
-                        if self.has_block_data(rpc, needed_block_num) {
-                            trace!("{} has {}", rpc, needed_block_num);
-                            return ShouldWaitForBlock::Ready;
-                        } else {
-                            trace!("{} does not have {}", rpc, needed_block_num);
-                            return ShouldWaitForBlock::NeverReady;
-                        }
-                    }
-                }
-            }
-            warn!("no rpc data for this {}. thats not promising", rpc);
-            ShouldWaitForBlock::NeverReady
-        } else {
-            // if no needed_block_num was specified, then this should work
-            ShouldWaitForBlock::Ready
-        }
-    }
-
-    // TODO: this should probably be on the rpcs as "can_serve_request"
-    // TODO: and it should take the method into account, too
-    pub fn rpc_will_work_now(
-        &self,
-        min_block_needed: Option<U64>,
-        max_block_needed: Option<U64>,
-        rpc: &Arc<Web3Rpc>,
-    ) -> bool {
-        if rpc.backup && !self.backups_needed {
-            // skip backups unless backups are needed for ranked_rpcs to exist
-            return false;
-        }
-
-        if !rpc.healthy.load(atomic::Ordering::Relaxed) {
-            return false;
-        }
-
-        if let Some(min_block_needed) = min_block_needed {
-            if !self.has_block_data(rpc, min_block_needed) {
-                trace!(
-                    "{} is missing min_block_needed ({}). will not work now",
-                    rpc,
-                    min_block_needed,
-                );
-                return false;
-            }
-        }
-
-        if let Some(max_block_needed) = max_block_needed {
-            if !self.has_block_data(rpc, max_block_needed) {
-                trace!(
-                    "{} is missing max_block_needed ({}). will not work now",
-                    rpc,
-                    max_block_needed,
-                );
-                return false;
-            }
-        }
-
-        // rate limit are handled by sort order
-
-        true
-    }
-
     // TODO: sum_hard_limit?
 }
 
@@ -1018,7 +866,7 @@ impl RpcsForRequest {
         // let error_handler = web3_request.authorization.error_handler;
         let error_handler = None;
 
-        let max_len = self.inner.len() + self.outer.len();
+        let max_len = self.inner.len();
 
         // TODO: do this without having 3 Vecs
         let mut filtered = Vec::with_capacity(max_len);
@@ -1029,73 +877,74 @@ impl RpcsForRequest {
         loop {
             if self.request.connect_timeout() {
                 break;
+            } else {
+                yield_now().await;
             }
 
             let mut earliest_retry_at = None;
             let mut wait_for_sync = FuturesUnordered::new();
 
             // first check the inners, then the outers
-            for rpcs in [&self.inner, &self.outer] {
-                attempted.clear();
+            attempted.clear();
 
-                while attempted.len() + completed.len() < rpcs.len() {
-                    filtered.clear();
+            while attempted.len() + completed.len() < self.inner.len() {
+                filtered.clear();
 
-                    // TODO: i'd like to do this without the collect, but since we push into `attempted`, having `attempted.contains` causes issues
-                    filtered.extend(rpcs.iter().filter(|x| !(attempted.contains(x) || completed.contains(x))));
+                // TODO: i'd like to do this without the collect, but since we push into `attempted`, having `attempted.contains` causes issues
+                filtered.extend(self.inner.iter().filter(|x| !(attempted.contains(x) || completed.contains(x))));
 
-                    // tuple_windows doesn't do anything for single item iters. make the code DRY by just having it compare itself
-                    if filtered.len() == 1 {
-                        filtered.push(filtered[0]);
-                    }
+                // tuple_windows doesn't do anything for single item iters. make the code DRY by just having it compare itself
+                if filtered.len() == 1 {
+                    filtered.push(filtered[0]);
+                }
 
-                    for (rpc_a, rpc_b) in filtered.iter().tuple_windows() {
-                        // TODO: ties within X% to the server with the smallest block_data_limit?
-                        // find rpc with the lowest weighted peak latency. backups always lose. rate limits always lose
-                        // TODO: should next_available be reversed?
-                        // TODO: this is similar to sort_for_load_balancing_on, but at this point we don't want to prefer tiers
-                        // TODO: move ethis to a helper function just so we can test it
-                        // TODO: should x.next_available should be Reverse<_>?
-                        let best_rpc = best_rpc(rpc_a, rpc_b);
+                for (rpc_a, rpc_b) in filtered.iter().tuple_windows() {
+                    // TODO: ties within X% to the server with the smallest block_data_limit?
+                    // find rpc with the lowest weighted peak latency. backups always lose. rate limits always lose
+                    // TODO: should next_available be reversed?
+                    // TODO: this is similar to sort_for_load_balancing_on, but at this point we don't want to prefer tiers
+                    // TODO: move this to a helper function just so we can test it
+                    // TODO: should x.next_available be Reverse<_>?
+                    let best_rpc = best_rpc(rpc_a, rpc_b);
 
-                        match best_rpc
-                            .try_request_handle(&self.request, error_handler, false)
-                            .await
-                        {
-                            Ok(OpenRequestResult::Handle(handle)) => {
-                                trace!("opened handle: {}", best_rpc);
-                                completed.insert(best_rpc);
-                                yield handle;
-                            }
-                            Ok(OpenRequestResult::RetryAt(retry_at)) => {
-                                trace!(
-                                    "retry on {} @ {}",
-                                    best_rpc,
-                                    retry_at.duration_since(Instant::now()).as_secs_f32()
-                                );
-                                attempted.insert(best_rpc);
-                                earliest_retry_at = earliest_retry_at.min(Some(retry_at, ));
-                            }
-                            Ok(OpenRequestResult::Lagged(x)) => {
-                                // this will probably always be the same block, right?
-                                trace!("{} is lagged. will not work now", best_rpc);
-                                attempted.insert(best_rpc);
-                                wait_for_sync.push(x);
-                            }
-                            Ok(OpenRequestResult::Failed) => {
-                                // TODO: log a warning? emit a stat?
-                                trace!("best_rpc not ready: {}", best_rpc);
-                                completed.insert(best_rpc);
-                            }
-                            Err(err) => {
-                                trace!("No request handle for {}. err={:?}", best_rpc, err);
-                                completed.insert(best_rpc);
-                            }
+                    match best_rpc
+                        .try_request_handle(&self.request, error_handler, false)
+                        .await
+                    {
+                        Ok(OpenRequestResult::Handle(handle)) => {
+                            trace!("opened handle: {}", best_rpc);
+                            completed.insert(best_rpc);
+                            yield handle;
+                        }
+                        Ok(OpenRequestResult::RetryAt(retry_at)) => {
+                            trace!(
+                                "retry on {} @ {}",
+                                best_rpc,
+                                retry_at.duration_since(Instant::now()).as_secs_f32()
+                            );
+                            attempted.insert(best_rpc);
+                            earliest_retry_at = earliest_retry_at.min(Some(retry_at, ));
+                        }
+                        Ok(OpenRequestResult::Lagged(x)) => {
+                            // this will probably always be the same block, right?
+                            trace!("{} is lagged. will not work now", best_rpc);
+                            attempted.insert(best_rpc);
+                            wait_for_sync.push(x);
+                        }
+                        Ok(OpenRequestResult::Failed) => {
+                            // TODO: log a warning? emit a stat?
+                            trace!("best_rpc not ready: {}", best_rpc);
+                            completed.insert(best_rpc);
+                        }
+                        Err(err) => {
+                            trace!("No request handle for {}. err={:?}", best_rpc, err);
+                            completed.insert(best_rpc);
                         }
                     }
-
-                    debug_assert!(!(attempted.is_empty() && completed.is_empty()));
                 }
+
+                debug_assert!(!(attempted.is_empty() && completed.is_empty()));
+            }
 
             // if we got this far, no inner or outer rpcs are ready. thats suprising since an inner should have been ready. maybe it got rate limited
@@ -1105,11 +954,15 @@ impl RpcsForRequest {
 
             // clear earliest_retry_at if it is too far in the future to help us
             if let Some(retry_at) = earliest_retry_at {
+                let corrected = retry_at.max(min_wait_until).min(self.request.connect_timeout_at());
+
                 // set a minimum of 100ms. this is probably actually a bug we should figure out.
-                earliest_retry_at = Some(retry_at.max(min_wait_until));
+                earliest_retry_at = Some(corrected);
+            } else {
+                earliest_retry_at = Some(self.request.connect_timeout_at());
             }
 
-            let retry_at = earliest_retry_at.min(Some(self.request.connect_timeout_at())).expect("retry_at always set");
+            let retry_at = earliest_retry_at.expect("retry_at should always be set by now");
 
             if wait_for_sync.is_empty() {
                 sleep_until(retry_at).await;