diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index 31ffe385..5a943f67 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -7,10 +7,10 @@ use derive_more::Constructor; use ethers::prelude::{H256, U64}; use hashbrown::{HashMap, HashSet}; use itertools::{Itertools, MinMaxResult}; -use log::{trace, warn}; +use log::{debug, trace, warn}; use moka::future::Cache; use serde::Serialize; -use std::cmp::Reverse; +use std::cmp::{Ordering, Reverse}; use std::collections::BTreeMap; use std::fmt; use std::sync::Arc; @@ -115,24 +115,76 @@ impl ConsensusWeb3Rpcs { self.best_rpcs.len() } - pub fn best_block_num(&self, skip_rpcs: &[Arc]) -> Option<&U64> { - if self.best_rpcs.iter().all(|rpc| skip_rpcs.contains(rpc)) { + pub fn best_block_num( + &self, + needed_block_num: Option<&U64>, + skip_rpcs: &[Arc], + ) -> Option<&U64> { + // TODO: dry this up with `filter`? + fn _best_block_num_filter( + x: &ConsensusWeb3Rpcs, + rpc: &Arc, + needed_block_num: Option<&U64>, + skip_rpcs: &[Arc], + ) -> bool { + // return true if this rpc will never work for us. "false" is good + if skip_rpcs.contains(rpc) { + // if rpc is skipped, it must have already been determined it is unable to serve the request + true + } else if let Some(needed_block_num) = needed_block_num { + if let Some(rpc_data) = x.rpc_data.get(rpc).as_ref() { + match rpc_data.head_block_num.cmp(needed_block_num) { + Ordering::Less => { + // rpc is not synced. let it catch up + false + } + Ordering::Greater | Ordering::Equal => { + // rpc is synced past the needed block. make sure the block isn't too old for it + !x.has_block_data(rpc, needed_block_num) + } + } + } else { + // no rpc data for this rpc. thats not promising + true + } + } else { + false + } + } + + if self + .best_rpcs + .iter() + .all(|rpc| _best_block_num_filter(self, rpc, needed_block_num, skip_rpcs)) + { // all of the consensus rpcs are skipped // iterate the other rpc tiers to find the next best block - let mut best_block = None; + let mut best_num = None; for (next_ranking, next_rpcs) in self.other_rpcs.iter() { - if next_rpcs.iter().all(|rpc| skip_rpcs.contains(rpc)) { - // everything in this ranking is skipped + if next_rpcs + .iter() + .all(|rpc| _best_block_num_filter(self, rpc, needed_block_num, skip_rpcs)) + { + // TODO: too verbose + debug!("everything in this ranking ({:?}) is skipped", next_ranking); continue; } - best_block = best_block.max(next_ranking.head_num.as_ref()); + best_num = best_num.max(next_ranking.head_num.as_ref()); } - best_block + // TODO: too verbose + debug!("best (old) block: {:?}", best_num); + + best_num } else { // not all the best synced rpcs are skipped yet. use the best head block - Some(self.head_block.number()) + let best_num = self.head_block.number(); + + // TODO: too verbose + debug!("best (head) block: {}", best_num); + + Some(best_num) } } @@ -143,6 +195,7 @@ impl ConsensusWeb3Rpcs { .unwrap_or(false) } + // TODO: better name for this pub fn filter( &self, skip: &[Arc], diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index e761f691..cf8e7f1c 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -1933,12 +1933,12 @@ async fn watch_for_block( skip_rpcs: &[Arc], watch_consensus_rpcs: &mut watch::Receiver>>, ) -> Web3ProxyResult { - info!("waiting for {:?}", needed_block_num); + debug!("waiting for {:?}", needed_block_num); let mut best_block_num: Option = watch_consensus_rpcs .borrow_and_update() .as_ref() - .and_then(|x| x.best_block_num(skip_rpcs).copied()); + .and_then(|x| x.best_block_num(needed_block_num, skip_rpcs).copied()); match (needed_block_num, best_block_num.as_ref()) { (Some(x), Some(best)) => { @@ -1977,7 +1977,7 @@ async fn watch_for_block( best_block_num = consensus_rpcs .as_ref() - .and_then(|x| x.best_block_num(skip_rpcs).copied()); + .and_then(|x| x.best_block_num(needed_block_num, skip_rpcs).copied()); } Ok(true)