use super::blockchain::Web3ProxyBlock; use super::many::Web3Rpcs; use super::one::Web3Rpc; use crate::frontend::authorization::Authorization; use crate::frontend::errors::{Web3ProxyErrorContext, Web3ProxyResult}; use derive_more::Constructor; use ethers::prelude::{H256, U64}; use hashbrown::{HashMap, HashSet}; use itertools::{Itertools, MinMaxResult}; use log::{debug, trace, warn}; use quick_cache_ttl::Cache; use serde::Serialize; use std::cmp::{Ordering, Reverse}; use std::collections::BTreeMap; use std::convert::Infallible; use std::fmt; use std::sync::Arc; use tokio::time::Instant; #[derive(Clone, Serialize)] struct RpcData { head_block_num: U64, // TODO: this is too simple. erigon has 4 prune levels (hrct) oldest_block_num: U64, } impl RpcData { fn new(rpc: &Web3Rpc, head: &Web3ProxyBlock) -> Self { let head_block_num = *head.number(); let block_data_limit = rpc.block_data_limit(); let oldest_block_num = head_block_num.saturating_sub(block_data_limit); Self { head_block_num, oldest_block_num, } } // TODO: take an enum for the type of data (hrtc) fn data_available(&self, block_num: &U64) -> bool { *block_num >= self.oldest_block_num && *block_num <= self.head_block_num } } #[derive(Constructor, Clone, Copy, Debug, Default, Eq, PartialEq, Serialize)] pub struct RpcRanking { tier: u64, backup: bool, head_num: Option, } impl RpcRanking { pub fn add_offset(&self, offset: u64) -> Self { Self { tier: self.tier + offset, backup: self.backup, head_num: self.head_num, } } pub fn default_with_backup(backup: bool) -> Self { Self { backup, ..Default::default() } } fn sort_key(&self) -> (u64, bool, Reverse>) { // TODO: add soft_limit here? add peak_ewma here? (self.tier, !self.backup, Reverse(self.head_num)) } } impl Ord for RpcRanking { fn cmp(&self, other: &Self) -> std::cmp::Ordering { self.sort_key().cmp(&other.sort_key()) } } impl PartialOrd for RpcRanking { fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) } } pub type RankedRpcMap = BTreeMap>>; pub enum ShouldWaitForBlock { Ready, Wait { current: Option }, NeverReady, } /// A collection of Web3Rpcs that are on the same block. /// Serialize is so we can print it on our /status endpoint #[derive(Clone, Serialize)] pub struct ConsensusWeb3Rpcs { pub(crate) tier: u64, pub(crate) backups_needed: bool, // TODO: this is already inside best_rpcs. Don't skip, instead make a shorter serialize #[serde(skip_serializing)] pub(crate) head_block: Web3ProxyBlock, // TODO: smaller serialize pub(crate) head_rpcs: Vec>, // TODO: make this work. the key needs to be a string. I think we need `serialize_with` #[serde(skip_serializing)] pub(crate) other_rpcs: RankedRpcMap, // TODO: make this work. the key needs to be a string. I think we need `serialize_with` #[serde(skip_serializing)] rpc_data: HashMap, RpcData>, } impl ConsensusWeb3Rpcs { #[inline] pub fn num_consensus_rpcs(&self) -> usize { self.head_rpcs.len() } /// will tell you if you should wait for a block /// TODO: also include method (or maybe an enum representing the different prune types) pub fn should_wait_for_block( &self, needed_block_num: Option<&U64>, skip_rpcs: &[Arc], ) -> ShouldWaitForBlock { // TODO: i think checking synced is always a waste of time. though i guess there could be a race if self .head_rpcs .iter() .any(|rpc| self.rpc_will_work_eventually(rpc, needed_block_num, skip_rpcs)) { let head_num = self.head_block.number(); if Some(head_num) >= needed_block_num { debug!("best (head) block: {}", head_num); return ShouldWaitForBlock::Ready; } } // all of the head rpcs are skipped let mut best_num = None; // iterate the other rpc tiers to find the next best block for (next_ranking, next_rpcs) in self.other_rpcs.iter() { if !next_rpcs .iter() .any(|rpc| self.rpc_will_work_eventually(rpc, needed_block_num, skip_rpcs)) { // TODO: too verbose debug!("everything in this ranking ({:?}) is skipped", next_ranking); continue; } let next_head_num = next_ranking.head_num.as_ref(); if next_head_num >= needed_block_num { debug!("best (head) block: {:?}", next_head_num); return ShouldWaitForBlock::Ready; } best_num = next_head_num; } // TODO: this seems wrong if best_num.is_some() { // TODO: too verbose debug!("best (old) block: {:?}", best_num); ShouldWaitForBlock::Wait { current: best_num.copied(), } } else { // TODO: too verbose debug!("never ready"); ShouldWaitForBlock::NeverReady } } pub fn has_block_data(&self, rpc: &Web3Rpc, block_num: &U64) -> bool { self.rpc_data .get(rpc) .map(|x| x.data_available(block_num)) .unwrap_or(false) } // TODO: take method as a param, too. mark nodes with supported methods (maybe do it optimistically? on) fn rpc_will_work_eventually( &self, rpc: &Arc, needed_block_num: Option<&U64>, skip_rpcs: &[Arc], ) -> bool { // return true if this rpc will never work for us. "false" is good if skip_rpcs.contains(rpc) { // if rpc is skipped, it must have already been determined it is unable to serve the request return false; } if let Some(needed_block_num) = needed_block_num { if let Some(rpc_data) = self.rpc_data.get(rpc) { match rpc_data.head_block_num.cmp(needed_block_num) { Ordering::Less => { debug!("{} is behind. let it catch up", rpc); return true; } Ordering::Greater | Ordering::Equal => { // rpc is synced past the needed block. make sure the block isn't too old for it if self.has_block_data(rpc, needed_block_num) { debug!("{} has {}", rpc, needed_block_num); return true; } else { debug!("{} does not have {}", rpc, needed_block_num); return false; } } } } // no rpc data for this rpc. thats not promising return true; } false } // TODO: better name for this pub fn rpc_will_work_now( &self, skip: &[Arc], min_block_needed: Option<&U64>, max_block_needed: Option<&U64>, rpc: &Arc, ) -> bool { if skip.contains(rpc) { trace!("skipping {}", rpc); return false; } if let Some(min_block_needed) = min_block_needed { if !self.has_block_data(rpc, min_block_needed) { trace!( "{} is missing min_block_needed ({}). skipping", rpc, min_block_needed, ); return false; } } if let Some(max_block_needed) = max_block_needed { if !self.has_block_data(rpc, max_block_needed) { trace!( "{} is missing max_block_needed ({}). skipping", rpc, max_block_needed, ); return false; } } // we could check hard rate limits here, but i think it is faster to do later true } // TODO: sum_hard_limit? } impl fmt::Debug for ConsensusWeb3Rpcs { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { // TODO: the default formatter takes forever to write. this is too quiet though // TODO: print the actual conns? f.debug_struct("ConsensusWeb3Rpcs") .field("head_block", &self.head_block) .field("num_conns", &self.head_rpcs.len()) .finish_non_exhaustive() } } // TODO: refs for all of these. borrow on a Sender is cheap enough impl Web3Rpcs { // TODO: return a ref? pub fn head_block(&self) -> Option { self.watch_consensus_head_sender .as_ref() .and_then(|x| x.borrow().clone()) } // TODO: return a ref? pub fn head_block_hash(&self) -> Option { self.head_block().map(|x| *x.hash()) } // TODO: return a ref? pub fn head_block_num(&self) -> Option { self.head_block().map(|x| *x.number()) } pub fn synced(&self) -> bool { let consensus = self.watch_consensus_rpcs_sender.borrow(); if let Some(consensus) = consensus.as_ref() { !consensus.head_rpcs.is_empty() } else { false } } pub fn num_synced_rpcs(&self) -> usize { let consensus = self.watch_consensus_rpcs_sender.borrow(); if let Some(consensus) = consensus.as_ref() { consensus.head_rpcs.len() } else { 0 } } } type FirstSeenCache = Cache; /// A ConsensusConnections builder that tracks all connection heads across multiple groups of servers pub struct ConsensusFinder { /// backups for all tiers are only used if necessary /// `tiers[0] = only tier 0` /// `tiers[1] = tier 0 and tier 1` /// `tiers[n] = tier 0..=n` rpc_heads: HashMap, Web3ProxyBlock>, /// never serve blocks that are too old max_block_age: Option, /// tier 0 will be prefered as long as the distance between it and the other tiers is <= max_tier_lag max_block_lag: Option, /// used to track rpc.head_latency. The same cache should be shared between all ConnectionsGroups first_seen: FirstSeenCache, } impl ConsensusFinder { pub fn new(max_block_age: Option, max_block_lag: Option) -> Self { // TODO: what's a good capacity for this? it shouldn't need to be very large // TODO: if we change Web3ProxyBlock to store the instance, i think we could use the block_by_hash cache let first_seen = Cache::new(16); // TODO: hard coding 0-9 isn't great, but its easier than refactoring this to be smart about config reloading let rpc_heads = HashMap::new(); Self { rpc_heads, max_block_age, max_block_lag, first_seen, } } pub fn len(&self) -> usize { self.rpc_heads.len() } pub fn is_empty(&self) -> bool { self.rpc_heads.is_empty() } fn remove(&mut self, rpc: &Arc) -> Option { self.rpc_heads.remove(rpc) } async fn insert(&mut self, rpc: Arc, block: Web3ProxyBlock) -> Option { let first_seen = self .first_seen .get_or_insert_async::(block.hash(), async { Ok(Instant::now()) }) .await .unwrap(); // calculate elapsed time before trying to lock let latency = first_seen.elapsed(); // record the time behind the fastest node rpc.head_latency.write().record(latency); // update the local mapping of rpc -> block self.rpc_heads.insert(rpc, block) } /// Update our tracking of the rpc and return true if something changed pub(crate) async fn update_rpc( &mut self, rpc_head_block: Option, rpc: Arc, // we need this so we can save the block to caches. i don't like it though. maybe we should use a lazy_static Cache wrapper that has a "save_block" method?. i generally dislike globals but i also dislike all the types having to pass eachother around web3_connections: &Web3Rpcs, ) -> Web3ProxyResult { // add the rpc's block to connection_heads, or remove the rpc from connection_heads let changed = match rpc_head_block { Some(mut rpc_head_block) => { // we don't know if its on the heaviest chain yet rpc_head_block = web3_connections .try_cache_block(rpc_head_block, false) .await .web3_context("failed caching block")?; if let Some(max_age) = self.max_block_age { if rpc_head_block.age() > max_age { trace!("rpc_head_block from {} is too old! {}", rpc, rpc_head_block); return Ok(self.remove(&rpc).is_some()); } } if let Some(prev_block) = self.insert(rpc, rpc_head_block.clone()).await { // false if this block was already sent by this rpc // true if new block for this rpc prev_block.hash() != rpc_head_block.hash() } else { // first block for this rpc true } } None => { // false if this rpc was already removed // true if rpc head changed from being synced to not self.remove(&rpc).is_some() } }; Ok(changed) } pub async fn find_consensus_connections( &mut self, authorization: &Arc, web3_rpcs: &Web3Rpcs, ) -> Web3ProxyResult> { let minmax_block = self.rpc_heads.values().minmax_by_key(|&x| x.number()); let (lowest_block, highest_block) = match minmax_block { MinMaxResult::NoElements => return Ok(None), MinMaxResult::OneElement(x) => (x, x), MinMaxResult::MinMax(min, max) => (min, max), }; let highest_block_number = highest_block.number(); trace!("highest_block_number: {}", highest_block_number); trace!("lowest_block_number: {}", lowest_block.number()); let max_lag_block_number = highest_block_number.saturating_sub(self.max_block_lag.unwrap_or_else(|| U64::from(5))); trace!("max_lag_block_number: {}", max_lag_block_number); let lowest_block_number = lowest_block.number().max(&max_lag_block_number); trace!("safe lowest_block_number: {}", lowest_block_number); let num_known = self.rpc_heads.len(); if num_known < web3_rpcs.min_head_rpcs { // this keeps us from serving requests when the proxy first starts trace!("not enough servers known"); return Ok(None); } // TODO: also track the sum of *available* hard_limits? if any servers have no hard limits, use their soft limit or no limit? // TODO: struct for the value of the votes hashmap? let mut primary_votes: HashMap, u32)> = Default::default(); let mut backup_votes: HashMap, u32)> = Default::default(); let mut backup_consensus = None; let mut rpc_heads_by_tier: Vec<_> = self.rpc_heads.iter().collect(); rpc_heads_by_tier.sort_by_cached_key(|(rpc, _)| rpc.tier); let current_tier = rpc_heads_by_tier .first() .expect("rpc_heads_by_tier should never be empty") .0 .tier; // trace!("first_tier: {}", current_tier); // trace!("rpc_heads_by_tier: {:#?}", rpc_heads_by_tier); // loop over all the rpc heads (grouped by tier) and their parents to find consensus // TODO: i'm sure theres a lot of shortcuts that could be taken, but this is simplest to implement for (rpc, rpc_head) in rpc_heads_by_tier.into_iter() { if current_tier != rpc.tier { // we finished processing a tier. check for primary results if let Some(consensus) = self.count_votes(&primary_votes, web3_rpcs) { trace!("found enough votes on tier {}", current_tier); return Ok(Some(consensus)); } // only set backup consensus once. we don't want it to keep checking on worse tiers if it already found consensus if backup_consensus.is_none() { if let Some(consensus) = self.count_votes(&backup_votes, web3_rpcs) { trace!("found backup votes on tier {}", current_tier); backup_consensus = Some(consensus) } } } let mut block_to_check = rpc_head.clone(); while block_to_check.number() >= lowest_block_number { if !rpc.backup { // backup nodes are excluded from the primary voting let entry = primary_votes.entry(block_to_check.clone()).or_default(); entry.0.insert(&rpc.name); entry.1 += rpc.soft_limit; } // both primary and backup rpcs get included in the backup voting let backup_entry = backup_votes.entry(block_to_check.clone()).or_default(); backup_entry.0.insert(&rpc.name); backup_entry.1 += rpc.soft_limit; match web3_rpcs .block(authorization, block_to_check.parent_hash(), Some(rpc)) .await { Ok(parent_block) => block_to_check = parent_block, Err(err) => { warn!( "Problem fetching parent block of {:?} during consensus finding: {:#?}", block_to_check.hash(), err ); break; } } } } // we finished processing all tiers. check for primary results (if anything but the last tier found consensus, we already returned above) if let Some(consensus) = self.count_votes(&primary_votes, web3_rpcs) { return Ok(Some(consensus)); } // only set backup consensus once. we don't want it to keep checking on worse tiers if it already found consensus if let Some(consensus) = backup_consensus { return Ok(Some(consensus)); } // count votes one last time Ok(self.count_votes(&backup_votes, web3_rpcs)) } // TODO: have min_sum_soft_limit and min_head_rpcs on self instead of on Web3Rpcs fn count_votes( &self, votes: &HashMap, u32)>, web3_rpcs: &Web3Rpcs, ) -> Option { // sort the primary votes ascending by tier and descending by block num let mut votes: Vec<_> = votes .into_iter() .map(|(block, (rpc_names, sum_soft_limit))| (block, sum_soft_limit, rpc_names)) .collect(); votes.sort_by_cached_key(|(block, sum_soft_limit, rpc_names)| { ( Reverse(*block.number()), Reverse(*sum_soft_limit), Reverse(rpc_names.len()), ) }); // return the first result that exceededs confgured minimums (if any) for (maybe_head_block, sum_soft_limit, rpc_names) in votes { if *sum_soft_limit < web3_rpcs.min_sum_soft_limit { continue; } // TODO: different mins for backup vs primary if rpc_names.len() < web3_rpcs.min_head_rpcs { continue; } trace!("rpc_names: {:#?}", rpc_names); // consensus likely found! load the rpcs to make sure they all have active connections let consensus_rpcs: Vec<_> = rpc_names .into_iter() .filter_map(|x| web3_rpcs.get(x)) .collect(); if consensus_rpcs.len() < web3_rpcs.min_head_rpcs { continue; } // consensus found! let tier = consensus_rpcs .iter() .map(|x| x.tier) .max() .expect("there should always be a max"); let backups_needed = consensus_rpcs.iter().any(|x| x.backup); let mut other_rpcs = BTreeMap::new(); for (x, x_head) in self .rpc_heads .iter() .filter(|(k, _)| !consensus_rpcs.contains(k)) { let x_head_num = *x_head.number(); let key: RpcRanking = RpcRanking::new(x.tier, x.backup, Some(x_head_num)); other_rpcs .entry(key) .or_insert_with(Vec::new) .push(x.clone()); } // TODO: how should we populate this? let mut rpc_data = HashMap::with_capacity(self.rpc_heads.len()); for (x, x_head) in self.rpc_heads.iter() { let y = RpcData::new(x, x_head); rpc_data.insert(x.clone(), y); } let consensus = ConsensusWeb3Rpcs { tier, head_block: maybe_head_block.clone(), head_rpcs: consensus_rpcs, other_rpcs, backups_needed, rpc_data, }; return Some(consensus); } None } pub fn worst_tier(&self) -> Option { self.rpc_heads.iter().map(|(x, _)| x.tier).max() } } #[cfg(test)] mod test { // #[test] // fn test_simplest_case_consensus_head_connections() { // todo!(); // } }