From 89961331af3a4ca3141ec9f9d4d975fc6f5d543d Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Tue, 16 May 2023 22:29:36 -0700
Subject: [PATCH] refactor wait_for_block into should_wait_for_block

---
 web3_proxy/src/rpcs/consensus.rs | 168 ++++++++++++++++++-------
 web3_proxy/src/rpcs/many.rs      |  90 ++++-------------
 web3_proxy/src/rpcs/one.rs       |   3 +
 3 files changed, 121 insertions(+), 140 deletions(-)

diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs
index 5a943f67..90385901 100644
--- a/web3_proxy/src/rpcs/consensus.rs
+++ b/web3_proxy/src/rpcs/consensus.rs
@@ -86,25 +86,31 @@ impl PartialOrd for RpcRanking {
 
 pub type RankedRpcMap = BTreeMap<RpcRanking, Vec<Arc<Web3Rpc>>>;
 
+pub enum ShouldWaitForBlock {
+    Ready,
+    Wait { current: Option<U64> },
+    NeverReady,
+}
+
 /// A collection of Web3Rpcs that are on the same block.
-/// Serialize is so we can print it on our debug endpoint
+/// Serialize is so we can print it on our /status endpoint
 #[derive(Clone, Serialize)]
 pub struct ConsensusWeb3Rpcs {
     pub(crate) tier: u64,
     pub(crate) backups_needed: bool,
 
-    // TODO: this is already inside best_rpcs. give that a shorter serialize here and then include this again
+    // TODO: this is already inside best_rpcs. Don't skip, instead make a shorter serialize
     #[serde(skip_serializing)]
    pub(crate) head_block: Web3ProxyBlock,
 
     // TODO: smaller serialize
-    pub(crate) best_rpcs: Vec<Arc<Web3Rpc>>,
+    pub(crate) head_rpcs: Vec<Arc<Web3Rpc>>,
 
-    // TODO: make this work. the key needs to be a string
+    // TODO: make this work. the key needs to be a string. I think we need `serialize_with`
     #[serde(skip_serializing)]
     pub(crate) other_rpcs: RankedRpcMap,
 
-    // TODO: make this work. the key needs to be a string
+    // TODO: make this work. the key needs to be a string. I think we need `serialize_with`
     #[serde(skip_serializing)]
     rpc_data: HashMap<Arc<Web3Rpc>, RpcData>,
 }
@@ -112,79 +118,65 @@ pub struct ConsensusWeb3Rpcs {
 impl ConsensusWeb3Rpcs {
     #[inline]
     pub fn num_consensus_rpcs(&self) -> usize {
-        self.best_rpcs.len()
+        self.head_rpcs.len()
     }
 
-    pub fn best_block_num(
+    /// will tell you if you should wait for a block
+    /// TODO: also include method (or maybe an enum representing the different prune types)
+    pub fn should_wait_for_block(
         &self,
         needed_block_num: Option<&U64>,
         skip_rpcs: &[Arc<Web3Rpc>],
-    ) -> Option<&U64> {
-        // TODO: dry this up with `filter`?
-        fn _best_block_num_filter(
-            x: &ConsensusWeb3Rpcs,
-            rpc: &Arc<Web3Rpc>,
-            needed_block_num: Option<&U64>,
-            skip_rpcs: &[Arc<Web3Rpc>],
-        ) -> bool {
-            // return true if this rpc will never work for us. "false" is good
-            if skip_rpcs.contains(rpc) {
-                // if rpc is skipped, it must have already been determined it is unable to serve the request
-                true
-            } else if let Some(needed_block_num) = needed_block_num {
-                if let Some(rpc_data) = x.rpc_data.get(rpc).as_ref() {
-                    match rpc_data.head_block_num.cmp(needed_block_num) {
-                        Ordering::Less => {
-                            // rpc is not synced. let it catch up
-                            false
-                        }
-                        Ordering::Greater | Ordering::Equal => {
-                            // rpc is synced past the needed block. make sure the block isn't too old for it
-                            !x.has_block_data(rpc, needed_block_num)
-                        }
-                    }
-                } else {
-                    // no rpc data for this rpc. thats not promising
-                    true
-                }
-            } else {
-                false
+    ) -> ShouldWaitForBlock {
+        if self
+            .head_rpcs
+            .iter()
+            .any(|rpc| self.rpc_will_work_eventually(rpc, needed_block_num, skip_rpcs))
+        {
+            let head_num = self.head_block.number();
+
+            if Some(head_num) >= needed_block_num {
+                debug!("best (head) block: {}", head_num);
+                return ShouldWaitForBlock::Ready;
             }
         }
 
-        if self
-            .best_rpcs
-            .iter()
-            .all(|rpc| _best_block_num_filter(self, rpc, needed_block_num, skip_rpcs))
-        {
-            // all of the consensus rpcs are skipped
-            // iterate the other rpc tiers to find the next best block
-            let mut best_num = None;
-            for (next_ranking, next_rpcs) in self.other_rpcs.iter() {
-                if next_rpcs
-                    .iter()
-                    .all(|rpc| _best_block_num_filter(self, rpc, needed_block_num, skip_rpcs))
-                {
-                    // TODO: too verbose
-                    debug!("everything in this ranking ({:?}) is skipped", next_ranking);
-                    continue;
-                }
+        // all of the head rpcs are skipped
 
-                best_num = best_num.max(next_ranking.head_num.as_ref());
+        let mut best_num = None;
+
+        // iterate the other rpc tiers to find the next best block
+        for (next_ranking, next_rpcs) in self.other_rpcs.iter() {
+            if !next_rpcs
+                .iter()
+                .any(|rpc| self.rpc_will_work_eventually(rpc, needed_block_num, skip_rpcs))
+            {
+                // TODO: too verbose
+                debug!("everything in this ranking ({:?}) is skipped", next_ranking);
+                continue;
             }
 
+            let next_head_num = next_ranking.head_num.as_ref();
+
+            if next_head_num >= needed_block_num {
+                debug!("best (head) block: {:?}", next_head_num);
+                return ShouldWaitForBlock::Ready;
+            }
+
+            best_num = next_head_num;
+        }
+
+        // TODO: this seems wrong
+        if best_num.is_some() {
             // TODO: too verbose
             debug!("best (old) block: {:?}", best_num);
-
-            best_num
+            ShouldWaitForBlock::Wait {
+                current: best_num.copied(),
+            }
         } else {
-            // not all the best synced rpcs are skipped yet. use the best head block
-            let best_num = self.head_block.number();
-            // TODO: too verbose
-            debug!("best (head) block: {}", best_num);
-
-            Some(best_num)
+            debug!("never ready");
+            ShouldWaitForBlock::NeverReady
         }
     }
 
@@ -195,8 +187,48 @@ impl ConsensusWeb3Rpcs {
             .unwrap_or(false)
     }
 
+    // TODO: take method as a param, too. mark nodes with supported methods (maybe do it optimistically? on)
+    fn rpc_will_work_eventually(
+        &self,
+        rpc: &Arc<Web3Rpc>,
+        needed_block_num: Option<&U64>,
+        skip_rpcs: &[Arc<Web3Rpc>],
+    ) -> bool {
+        // return true if this rpc will work for us eventually. "false" means it will never work
+        if skip_rpcs.contains(rpc) {
+            // if rpc is skipped, it must have already been determined it is unable to serve the request
+            return false;
+        }
+
+        if let Some(needed_block_num) = needed_block_num {
+            if let Some(rpc_data) = self.rpc_data.get(rpc) {
+                match rpc_data.head_block_num.cmp(needed_block_num) {
+                    Ordering::Less => {
+                        debug!("{} is behind. let it catch up", rpc);
+                        return true;
+                    }
+                    Ordering::Greater | Ordering::Equal => {
+                        // rpc is synced past the needed block. make sure the block isn't too old for it
+                        if self.has_block_data(rpc, needed_block_num) {
+                            debug!("{} has {}", rpc, needed_block_num);
+                            return true;
+                        } else {
+                            debug!("{} does not have {}", rpc, needed_block_num);
+                            return false;
+                        }
+                    }
+                }
+            }
+
+            // no rpc data for this rpc. that's not promising
+            return true;
+        }
+
+        false
+    }
+
     // TODO: better name for this
-    pub fn filter(
+    pub fn rpc_will_work_now(
         &self,
         skip: &[Arc<Web3Rpc>],
         min_block_needed: Option<&U64>,
@@ -244,7 +276,7 @@ impl fmt::Debug for ConsensusWeb3Rpcs {
         // TODO: print the actual conns?
         f.debug_struct("ConsensusWeb3Rpcs")
             .field("head_block", &self.head_block)
-            .field("num_conns", &self.best_rpcs.len())
+            .field("num_conns", &self.head_rpcs.len())
             .finish_non_exhaustive()
     }
 }
@@ -272,7 +304,7 @@ impl Web3Rpcs {
         let consensus = self.watch_consensus_rpcs_sender.borrow();
 
         if let Some(consensus) = consensus.as_ref() {
-            !consensus.best_rpcs.is_empty()
+            !consensus.head_rpcs.is_empty()
         } else {
             false
         }
@@ -282,7 +314,7 @@ impl Web3Rpcs {
         let consensus = self.watch_consensus_rpcs_sender.borrow();
 
         if let Some(consensus) = consensus.as_ref() {
-            consensus.best_rpcs.len()
+            consensus.head_rpcs.len()
         } else {
             0
         }
@@ -598,7 +630,7 @@ impl ConsensusFinder {
         let consensus = ConsensusWeb3Rpcs {
             tier,
             head_block: maybe_head_block.clone(),
-            best_rpcs: consensus_rpcs,
+            head_rpcs: consensus_rpcs,
             other_rpcs,
             backups_needed,
             rpc_data,
diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs
index a1b67f7d..d5440115 100644
--- a/web3_proxy/src/rpcs/many.rs
+++ b/web3_proxy/src/rpcs/many.rs
@@ -1,6 +1,6 @@
 ///! Load balanced communication with a group of web3 rpc providers
 use super::blockchain::{BlocksByHashCache, Web3ProxyBlock};
-use super::consensus::ConsensusWeb3Rpcs;
+use super::consensus::{ConsensusWeb3Rpcs, ShouldWaitForBlock};
 use super::one::Web3Rpc;
 use super::request::{OpenRequestHandle, OpenRequestResult, RequestErrorHandler};
 use crate::app::{flatten_handle, AnyhowJoinHandle, Web3ProxyApp};
@@ -521,8 +521,8 @@ impl Web3Rpcs {
         );
 
         // todo: for now, build the map m here. once that works, do as much of it as possible while building ConsensusWeb3Rpcs
-        for x in consensus_rpcs.best_rpcs.iter().filter(|rpc| {
-            consensus_rpcs.filter(skip, min_block_needed, max_block_needed, rpc)
+        for x in consensus_rpcs.head_rpcs.iter().filter(|rpc| {
+            consensus_rpcs.rpc_will_work_now(skip, min_block_needed, max_block_needed, rpc)
         }) {
             m.entry(best_key).or_insert_with(Vec::new).push(x.clone());
         }
@@ -533,7 +533,12 @@ impl Web3Rpcs {
                 let v: Vec<_> = v
                     .iter()
                     .filter(|rpc| {
-                        consensus_rpcs.filter(skip, min_block_needed, max_block_needed, rpc)
+                        consensus_rpcs.rpc_will_work_now(
+                            skip,
+                            min_block_needed,
+                            max_block_needed,
+                            rpc,
+                        )
                     })
                     .cloned()
                     .collect();
@@ -698,7 +703,7 @@ impl Web3Rpcs {
         let synced_rpcs = self.watch_consensus_rpcs_sender.borrow();
 
         if let Some(synced_rpcs) = synced_rpcs.as_ref() {
-            synced_rpcs.best_rpcs.clone()
+            synced_rpcs.head_rpcs.clone()
         } else {
             vec![]
         }
@@ -967,13 +972,15 @@ impl Web3Rpcs {
 
                     let waiting_for = min_block_needed.max(max_block_needed);
 
-                    if watch_for_block(waiting_for, &skip_rpcs, &mut watch_consensus_rpcs).await? {
-                        // block found! continue so we can check for another rpc
-                    } else {
-                        // rate limits are likely keeping us from serving the head block
-                        watch_consensus_rpcs.changed().await?;
-                        watch_consensus_rpcs.borrow_and_update();
+                    if let Some(consensus_rpcs) = watch_consensus_rpcs.borrow_and_update().as_ref()
+                    {
+                        match consensus_rpcs.should_wait_for_block(waiting_for, &skip_rpcs) {
+                            ShouldWaitForBlock::NeverReady => break,
+                            ShouldWaitForBlock::Ready => continue,
+                            ShouldWaitForBlock::Wait { .. } => {}
+                        }
                     }
+                    watch_consensus_rpcs.changed().await?;
                 }
             }
         }
@@ -1926,67 +1933,6 @@ mod tests {
     }
 }
 
-/// returns `true` when the desired block number is available
-/// TODO: max wait time? max number of blocks to wait for? time is probably best
-async fn watch_for_block(
-    needed_block_num: Option<&U64>,
-    skip_rpcs: &[Arc<Web3Rpc>],
-    watch_consensus_rpcs: &mut watch::Receiver<Option<Arc<ConsensusWeb3Rpcs>>>,
-) -> Web3ProxyResult<bool> {
-    let mut best_block_num: Option<U64> = watch_consensus_rpcs
-        .borrow_and_update()
-        .as_ref()
-        .and_then(|x| x.best_block_num(needed_block_num, skip_rpcs).copied());
-
-    debug!(
-        "waiting for {:?}. best {:?}",
-        needed_block_num, best_block_num
-    );
-
-    match (needed_block_num, best_block_num.as_ref()) {
-        (Some(x), Some(best)) => {
-            if x <= best {
-                // the best block is past the needed block and no servers have the needed data
-                // this happens if the block is old and all archive servers are offline
-                // there is no chance we will get this block without adding an archive server to the config
-                // TODO: i think this can also happen if we are being rate limited! but then waiting might work. need skip_rpcs to be smarter
-                warn!("watching for block {} will never succeed. best {}", x, best);
-                return Ok(false);
-            }
-        }
-        (None, None) => {
-            // i don't think this is possible
-            // maybe if we internally make a request for the latest block and all our servers are disconnected?
-            warn!("how'd this None/None happen?");
-            return Ok(false);
-        }
-        (Some(_), None) => {
-            // block requested but no servers synced. we will wait
-            // TODO: if the web3rpcs connected to this consensus isn't watching head blocks, exit with an erorr (waiting for blocks won't ever work)
-        }
-        (None, Some(head)) => {
-            // i don't think this is possible
-            // maybe if we internally make a request for the latest block and all our servers are disconnected?
-            warn!("how'd this None/Some({}) happen?", head);
-            return Ok(false);
-        }
-    };
-
-    // future block is requested
-    // wait for the block to arrive
-    while best_block_num.as_ref() < needed_block_num {
-        watch_consensus_rpcs.changed().await?;
-
-        let consensus_rpcs = watch_consensus_rpcs.borrow_and_update();
-
-        best_block_num = consensus_rpcs
-            .as_ref()
-            .and_then(|x| x.best_block_num(needed_block_num, skip_rpcs).copied());
-    }
-
-    Ok(true)
-}
-
 #[cfg(test)]
 mod test {
     use std::cmp::Reverse;
diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs
index 686e20d2..b37a4425 100644
--- a/web3_proxy/src/rpcs/one.rs
+++ b/web3_proxy/src/rpcs/one.rs
@@ -47,6 +47,8 @@ pub struct Web3Rpc {
     /// it is an async lock because we hold it open across awaits
     /// this provider is only used for new heads subscriptions
     /// TODO: benchmark ArcSwapOption and a watch::Sender
+    /// TODO: only the websocket provider needs to be behind an asyncrwlock!
+    /// TODO: the http provider is just an http_client
     pub(super) provider: AsyncRwLock<Option<Arc<Web3Provider>>>,
     /// keep track of hard limits
     /// this is only inside an Option so that the "Default" derive works. it will always be set.
@@ -1216,6 +1218,7 @@ impl Web3Rpc {
         if unlocked_provider.is_some() || self.provider.read().await.is_some() {
             // we already have an unlocked provider. no need to lock
         } else {
+            warn!("no provider on {}", self);
             return Ok(OpenRequestResult::NotReady);
         }
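
Reviewer note: the heart of this refactor is replacing the old `Ok(bool)`-plus-side-effects contract of `watch_for_block` with a three-way answer the caller can branch on. Below is a minimal, self-contained sketch of how a retry loop consumes `ShouldWaitForBlock`; `poll_consensus` and its hard-coded block numbers are hypothetical stand-ins for `watch_consensus_rpcs.borrow_and_update()` plus `should_wait_for_block()`, not code from this patch.

// Sketch only: mirrors the enum added in consensus.rs, simplified to u64.
enum ShouldWaitForBlock {
    // a connection can serve the request right now
    Ready,
    // nothing can serve it yet, but a connection is catching up
    Wait { current: Option<u64> },
    // no connection will ever have the block (e.g. pruned everywhere)
    NeverReady,
}

// Hypothetical stand-in for checking the watched consensus state.
fn poll_consensus(attempt: u32, needed: u64) -> ShouldWaitForBlock {
    // pretend blocks below 50 were pruned from every server: waiting can never help
    if needed < 50 {
        return ShouldWaitForBlock::NeverReady;
    }
    // pretend the consensus head advances one block per poll
    let head = 99 + u64::from(attempt);
    if head >= needed {
        ShouldWaitForBlock::Ready
    } else {
        ShouldWaitForBlock::Wait { current: Some(head) }
    }
}

fn main() {
    let needed = 101;
    for attempt in 0u32.. {
        // same three-way branch as the rewritten loop in many.rs
        match poll_consensus(attempt, needed) {
            ShouldWaitForBlock::Ready => {
                println!("block {needed} available; retry the request");
                break;
            }
            ShouldWaitForBlock::NeverReady => {
                println!("block {needed} will never be available; give up");
                break;
            }
            ShouldWaitForBlock::Wait { current } => {
                // the real code awaits watch_consensus_rpcs.changed()? here
                println!("waiting; consensus head is {current:?}");
            }
        }
    }
}

Compared with the old boolean, the enum lets the caller distinguish "retry immediately" from "give up" without a second read of the watch channel, which is exactly how the rewritten loop in many.rs uses it.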