From 8eff48611c50090f1646839dcfe115c52e436831 Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Tue, 21 Mar 2023 11:16:18 -0700
Subject: [PATCH] more direct consensus finding code

this hopefully has fewer bugs. speed isn't super important since this isn't
on the hot path.
---
 TODO.md                           |   2 +
 web3_proxy/src/frontend/mod.rs    |   5 +-
 web3_proxy/src/rpcs/blockchain.rs |  59 ++-
 web3_proxy/src/rpcs/consensus.rs  | 575 ++++++++++--------------------
 web3_proxy/src/rpcs/many.rs       |  13 +-
 web3_proxy/src/rpcs/request.rs    |   2 +-
 6 files changed, 223 insertions(+), 433 deletions(-)

diff --git a/TODO.md b/TODO.md
index 9b10ae12..4336e32f 100644
--- a/TODO.md
+++ b/TODO.md
@@ -390,6 +390,7 @@ These are not yet ordered. There might be duplicates. We might not actually need
 - [x] maybe we shouldn't route eth_getLogs to syncing nodes. serving queries slows down sync significantly
   - change the send_best function to only include servers that are at least close to fully synced
 - [-] have private transactions be enabled by a url setting rather than a setting on the key
+- [ ] eth_sendRawTransaction should only forward if the chain_id matches what we are running
 - [ ] cli for adding rpc keys to an existing user
 - [ ] rate limiting/throttling on query_user_stats
 - [ ] web3rpc configs should have a max_concurrent_requests
@@ -425,6 +426,7 @@ These are not yet ordered. There might be duplicates. We might not actually need
 - [ ] implement remaining subscriptions
   - would be nice if our subscriptions had better gaurentees than geth/erigon do, but maybe simpler to just setup a broadcast channel and proxy all the respones to a backend instead
 - [ ] tests should use `test-env-log = "0.2.8"`
+- [ ] eth_sendRawTransaction should only forward if the chain_id matches what we are running
 - [ ] weighted random choice should still prioritize non-archive servers
   - maybe shuffle randomly and then sort by (block_limit, random_index)?
   - maybe sum available_requests grouped by archive/non-archive. only limit to non-archive if they have enough?
diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs
index bfa7256d..0d0e5146 100644
--- a/web3_proxy/src/frontend/mod.rs
+++ b/web3_proxy/src/frontend/mod.rs
@@ -232,6 +232,7 @@ pub async fn serve(port: u16, proxy_app: Arc<Web3ProxyApp>) -> anyhow::Result<()
     axum::Server::bind(&addr)
         // TODO: option to use with_connect_info.
        //       we want it in dev, but not when running behind a proxy, but not
         .serve(service)
-        .await
-        .map_err(Into::into)
+        .await?;
+
+    Ok(())
 }
diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs
index be220d29..b90b9958 100644
--- a/web3_proxy/src/rpcs/blockchain.rs
+++ b/web3_proxy/src/rpcs/blockchain.rs
@@ -12,6 +12,7 @@ use log::{debug, trace, warn, Level};
 use moka::future::Cache;
 use serde::Serialize;
 use serde_json::json;
+use std::hash::Hash;
 use std::time::{SystemTime, UNIX_EPOCH};
 use std::{cmp::Ordering, fmt::Display, sync::Arc};
 use tokio::sync::broadcast;
@@ -42,6 +43,14 @@ impl PartialEq for Web3ProxyBlock {
     }
 }
 
+impl Eq for Web3ProxyBlock {}
+
+impl Hash for Web3ProxyBlock {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        self.block.hash.hash(state);
+    }
+}
+
 impl Web3ProxyBlock {
     /// A new block has arrived over a subscription
     pub fn try_new(block: ArcBlock) -> Option<Self> {
@@ -393,36 +402,28 @@ impl Web3Rpcs {
             return Ok(());
         }
 
-        let new_synced_connections = consensus_finder
-            .best_consensus_connections(authorization, self)
-            .await
-            .context("no consensus head block!")
-            .map_err(|err| {
-                self.watch_consensus_rpcs_sender.send_replace(None);
+        let new_synced_connections = match consensus_finder
+            .find_consensus_connections(authorization, self)
+            .await {
+            Err(err) => {
+                let err = err.context("error while finding consensus head block!");
 
-                err
-            })?;
-
-        // TODO: what should we do if the block number of new_synced_connections is < old_synced_connections? wait?
+                return Err(err);
+            }
+            Ok(None) => {
+                return Err(anyhow!("no consensus head block!"));
+            }
+            Ok(Some(x)) => x
+        };
 
         let watch_consensus_head_sender = self.watch_consensus_head_sender.as_ref().unwrap();
         let consensus_tier = new_synced_connections.tier;
-        let total_tiers = consensus_finder.len();
+        // TODO: think more about this unwrap
+        let total_tiers = consensus_finder.worst_tier().unwrap_or(10);
         let backups_needed = new_synced_connections.backups_needed;
        let consensus_head_block = new_synced_connections.head_block.clone();
         let num_consensus_rpcs = new_synced_connections.num_conns();
-        let mut num_synced_rpcs = 0;
-        let num_active_rpcs = consensus_finder
-            .all_rpcs_group()
-            .map(|x| {
-                for v in x.rpc_to_block.values() {
-                    if *v == consensus_head_block {
-                        num_synced_rpcs += 1;
-                    }
-                }
-                x.len()
-            })
-            .unwrap_or_default();
+        let num_active_rpcs = consensus_finder.len();
         let total_rpcs = self.by_name.read().len();
 
         let old_consensus_head_connections = self
@@ -474,12 +475,11 @@ impl Web3Rpcs {
                     // no change in hash.
                    //   no need to use watch_consensus_head_sender
                     // TODO: trace level if rpc is backup
                     debug!(
-                        "con {}/{} {}{}/{}/{}/{} con={} rpc={}@{}",
+                        "con {}/{} {}{}/{}/{} con={} rpc={}@{}",
                         consensus_tier,
                         total_tiers,
                         backups_voted_str,
                         num_consensus_rpcs,
-                        num_synced_rpcs,
                         num_active_rpcs,
                         total_rpcs,
                         consensus_head_block,
@@ -490,12 +490,11 @@ impl Web3Rpcs {
 
                     // hash changed
                     debug!(
-                        "unc {}/{} {}{}/{}/{}/{} con_head={} old={} rpc={}@{}",
+                        "unc {}/{} {}{}/{}/{} con_head={} old={} rpc={}@{}",
                         consensus_tier,
                         total_tiers,
                         backups_voted_str,
                         num_consensus_rpcs,
-                        num_synced_rpcs,
                         num_active_rpcs,
                         total_rpcs,
                         consensus_head_block,
@@ -518,12 +517,11 @@ impl Web3Rpcs {
                     // this is unlikely but possible
                     // TODO: better log
                     warn!(
-                        "chain rolled back {}/{} {}{}/{}/{}/{} con={} old={} rpc={}@{}",
+                        "chain rolled back t{}/{} {}{}/{}/{} con={} old={} rpc={}@{}",
                         consensus_tier,
                         total_tiers,
                         backups_voted_str,
                         num_consensus_rpcs,
-                        num_synced_rpcs,
                         num_active_rpcs,
                         total_rpcs,
                         consensus_head_block,
@@ -549,12 +547,11 @@ impl Web3Rpcs {
                 }
                 Ordering::Greater => {
                     debug!(
-                        "new {}/{} {}{}/{}/{}/{} con={} rpc={}@{}",
+                        "new {}/{} {}{}/{}/{} con={} rpc={}@{}",
                         consensus_tier,
                         total_tiers,
                         backups_voted_str,
                         num_consensus_rpcs,
-                        num_synced_rpcs,
                         num_active_rpcs,
                         total_rpcs,
                         consensus_head_block,
diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs
index 6fb596bf..943dd051 100644
--- a/web3_proxy/src/rpcs/consensus.rs
+++ b/web3_proxy/src/rpcs/consensus.rs
@@ -6,10 +6,11 @@ use super::one::Web3Rpc;
 use anyhow::Context;
 use ethers::prelude::{H256, U64};
 use hashbrown::{HashMap, HashSet};
-use log::{debug, trace, warn};
+use itertools::{Itertools, MinMaxResult};
+use log::{trace, warn};
 use moka::future::Cache;
 use serde::Serialize;
-use std::collections::BTreeMap;
+use std::cmp::Reverse;
 use std::fmt;
 use std::sync::Arc;
 use tokio::time::Instant;
@@ -18,23 +19,19 @@
 /// Serialize is so we can print it on our debug endpoint
 #[derive(Clone, Serialize)]
 pub struct ConsensusWeb3Rpcs {
-    // TODO: tier should be an option, or we should have consensus be stored as an Option
     pub(super) tier: u64,
     pub(super) head_block: Web3ProxyBlock,
     // TODO: this should be able to serialize, but it isn't
     #[serde(skip_serializing)]
-    pub(super) rpcs: Vec<Arc<Web3Rpc>>,
-    pub(super) backups_voted: Option<Web3ProxyBlock>,
+    pub(super) best_rpcs: Vec<Arc<Web3Rpc>>,
+    // TODO: functions like "compare_backup_vote()"
+    // pub(super) backups_voted: Option<Web3ProxyBlock>,
     pub(super) backups_needed: bool,
 }
 
 impl ConsensusWeb3Rpcs {
     pub fn num_conns(&self) -> usize {
-        self.rpcs.len()
-    }
-
-    pub fn sum_soft_limit(&self) -> u32 {
-        self.rpcs.iter().fold(0, |sum, rpc| sum + rpc.soft_limit)
+        self.best_rpcs.len()
     }
 
     // TODO: sum_hard_limit?
@@ -44,9 +41,9 @@ impl fmt::Debug for ConsensusWeb3Rpcs {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         // TODO: the default formatter takes forever to write. this is too quiet though
         // TODO: print the actual conns?
- f.debug_struct("ConsensusConnections") + f.debug_struct("ConsensusWeb3Rpcs") .field("head_block", &self.head_block) - .field("num_conns", &self.rpcs.len()) + .field("num_conns", &self.best_rpcs.len()) .finish_non_exhaustive() } } @@ -73,7 +70,7 @@ impl Web3Rpcs { let consensus = self.watch_consensus_rpcs_sender.borrow(); if let Some(consensus) = consensus.as_ref() { - !consensus.rpcs.is_empty() + !consensus.best_rpcs.is_empty() } else { false } @@ -83,7 +80,7 @@ impl Web3Rpcs { let consensus = self.watch_consensus_rpcs_sender.borrow(); if let Some(consensus) = consensus.as_ref() { - consensus.rpcs.len() + consensus.best_rpcs.len() } else { 0 } @@ -92,50 +89,51 @@ impl Web3Rpcs { type FirstSeenCache = Cache; -pub struct ConnectionsGroup { - pub rpc_to_block: HashMap, Web3ProxyBlock>, - // TODO: what if there are two blocks with the same number? - pub highest_block: Option, +/// A ConsensusConnections builder that tracks all connection heads across multiple groups of servers +pub struct ConsensusFinder { + /// backups for all tiers are only used if necessary + /// tiers[0] = only tier 0. + /// tiers[1] = tier 0 and tier 1 + /// tiers[n] = tier 0..=n + /// This is a BTreeMap and not a Vec because sometimes a tier is empty + rpc_heads: HashMap, Web3ProxyBlock>, + /// never serve blocks that are too old + max_block_age: Option, + /// tier 0 will be prefered as long as the distance between it and the other tiers is <= max_tier_lag + max_block_lag: Option, /// used to track rpc.head_latency. The same cache should be shared between all ConnectionsGroups first_seen: FirstSeenCache, } -impl ConnectionsGroup { - pub fn new(first_seen: FirstSeenCache) -> Self { +impl ConsensusFinder { + pub fn new(max_block_age: Option, max_block_lag: Option) -> Self { + // TODO: what's a good capacity for this? it shouldn't need to be very large + // TODO: if we change Web3ProxyBlock to store the instance, i think we could use the block_by_hash cache + let first_seen = Cache::builder() + .max_capacity(16) + .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); + + // TODO: hard coding 0-9 isn't great, but its easier than refactoring this to be smart about config reloading + let rpc_heads = HashMap::new(); + Self { - rpc_to_block: Default::default(), - highest_block: Default::default(), + rpc_heads, + max_block_age, + max_block_lag, first_seen, } } pub fn len(&self) -> usize { - self.rpc_to_block.len() + self.rpc_heads.len() } pub fn is_empty(&self) -> bool { - self.rpc_to_block.is_empty() + self.rpc_heads.is_empty() } fn remove(&mut self, rpc: &Arc) -> Option { - if let Some(removed_block) = self.rpc_to_block.remove(rpc) { - match self.highest_block.as_mut() { - None => {} - Some(current_highest_block) => { - if removed_block.hash() == current_highest_block.hash() { - for maybe_highest_block in self.rpc_to_block.values() { - if maybe_highest_block.number() > current_highest_block.number() { - *current_highest_block = maybe_highest_block.clone(); - }; - } - } - } - } - - Some(removed_block) - } else { - None - } + self.rpc_heads.remove(rpc) } async fn insert(&mut self, rpc: Arc, block: Web3ProxyBlock) -> Option { @@ -150,322 +148,7 @@ impl ConnectionsGroup { rpc.head_latency.write().record(latency); - // TODO: what about a reorg to the same height? - if Some(block.number()) > self.highest_block.as_ref().map(|x| x.number()) { - self.highest_block = Some(block.clone()); - } - - self.rpc_to_block.insert(rpc, block) - } - - /// min_consensus_block_num keeps us from ever going backwards. 
-    /// TODO: think about min_consensus_block_num more. i think this might cause an outage if the chain is doing weird things. but 503s is probably better than broken data.
-    pub(self) async fn consensus_head_connections(
-        &self,
-        authorization: &Arc<Authorization>,
-        web3_rpcs: &Web3Rpcs,
-        min_consensus_block_num: Option<U64>,
-        tier: &u64,
-    ) -> anyhow::Result<ConsensusWeb3Rpcs> {
-        let mut maybe_head_block = match self.highest_block.clone() {
-            None => return Err(anyhow::anyhow!("no blocks known")),
-            Some(x) => x,
-        };
-
-        // TODO: take max_distance_consensus_to_highest as an argument?
-        // TODO: what if someone's backup node is misconfigured and goes on a really fast forked chain?
-        let max_lag_consensus_to_highest =
-            if let Some(min_consensus_block_num) = min_consensus_block_num {
-                maybe_head_block
-                    .number()
-                    .saturating_add(1.into())
-                    .saturating_sub(min_consensus_block_num)
-                    .as_u64()
-            } else {
-                10
-            };
-
-        trace!(
-            "max_lag_consensus_to_highest: {}",
-            max_lag_consensus_to_highest
-        );
-
-        let num_known = self.rpc_to_block.len();
-
-        if num_known < web3_rpcs.min_head_rpcs {
-            return Err(anyhow::anyhow!(
-                "not enough rpcs connected: {}/{}",
-                num_known,
-                web3_rpcs.min_head_rpcs,
-            ));
-        }
-
-        let mut primary_rpcs_voted: Option<Web3ProxyBlock> = None;
-        let mut backup_rpcs_voted: Option<Web3ProxyBlock> = None;
-
-        // track rpcs on this heaviest chain so we can build a new ConsensusConnections
-        let mut primary_consensus_rpcs = HashSet::<&str>::new();
-        let mut backup_consensus_rpcs = HashSet::<&str>::new();
-        let mut skip_rpcs = HashSet::<&str>::new();
-
-        // a running total of the soft limits covered by the rpcs that agree on the head block
-        let mut primary_sum_soft_limit: u32 = 0;
-        let mut backup_sum_soft_limit: u32 = 0;
-
-        // TODO: also track the sum of *available* hard_limits. if any servers have no hard limits, use their soft limit or no limit?
-
-        // check the highest work block for a set of rpcs that can serve our request load
-        // if it doesn't have enough rpcs for our request load, check the parent block
-        // TODO: loop for how many parent blocks? we don't want to serve blocks that are too far behind. probably different per chain
-        // TODO: this loop is pretty long. any way to clean up this code?
-        for _ in 0..max_lag_consensus_to_highest {
-            let maybe_head_hash = maybe_head_block.hash();
-
-            // find all rpcs with maybe_head_hash as their current head
-            for (rpc, rpc_head) in self.rpc_to_block.iter() {
-                if rpc_head.hash() != maybe_head_hash {
-                    // connection is not on the desired block
-                    continue;
-                }
-                let rpc_name = rpc.name.as_str();
-                if backup_consensus_rpcs.contains(rpc_name) {
-                    // connection is on a later block in this same chain
-                    continue;
-                }
-                if primary_consensus_rpcs.contains(rpc_name) {
-                    // connection is on a later block in this same chain
-                    continue;
-                }
-                if skip_rpcs.contains(rpc_name) {
-                    // connection is missing or theres some other reason to skip this rpc
-                    continue;
-                }
-
-                if let Some(rpc) = web3_rpcs.by_name.read().get(rpc_name) {
-                    if backup_rpcs_voted.is_some() {
-                        // backups already voted for a head block on another tier. don't change it
-                    } else {
-                        backup_consensus_rpcs.insert(rpc_name);
-                        backup_sum_soft_limit += rpc.soft_limit;
-                    }
-                    if !rpc.backup {
-                        primary_consensus_rpcs.insert(rpc_name);
-                        primary_sum_soft_limit += rpc.soft_limit;
-                    }
-                } else {
-                    // i don't think this is an error.
-                    //   i think its just if a reconnect is currently happening
-                    if web3_rpcs.synced() {
-                        warn!("connection missing: {}", rpc_name);
-                        debug!("web3_rpcs.by_name: {:#?}", web3_rpcs.by_name);
-                    } else {
-                        debug!("connection missing: {}", rpc_name);
-                    }
-                    skip_rpcs.insert(rpc_name);
-                }
-            }
-
-            if primary_sum_soft_limit >= web3_rpcs.min_sum_soft_limit
-                && primary_consensus_rpcs.len() >= web3_rpcs.min_head_rpcs
-            {
-                // we have enough servers with enough requests! yey!
-                primary_rpcs_voted = Some(maybe_head_block.clone());
-                break;
-            }
-
-            if backup_rpcs_voted.is_none()
-                && backup_sum_soft_limit >= web3_rpcs.min_sum_soft_limit
-                && backup_consensus_rpcs.len() >= web3_rpcs.min_head_rpcs
-            {
-                // if we include backup servers, we have enough servers with high enough limits
-                backup_rpcs_voted = Some(maybe_head_block.clone());
-            }
-
-            // not enough rpcs on this block. check the parent block
-            match web3_rpcs
-                .block(authorization, maybe_head_block.parent_hash(), None)
-                .await
-            {
-                Ok(parent_block) => {
-                    // trace!(
-                    //     child=%maybe_head_hash, parent=%parent_block.hash.unwrap(), "avoiding thundering herd. checking consensus on parent block",
-                    // );
-                    maybe_head_block = parent_block;
-                    continue;
-                }
-                Err(err) => {
-                    let soft_limit_percent = (primary_sum_soft_limit as f32
-                        / web3_rpcs.min_sum_soft_limit as f32)
-                        * 100.0;
-
-                    let err_msg = format!("ran out of parents to check. rpcs {}/{}/{}. soft limit: {:.2}% ({}/{}). err: {:#?}",
-                        primary_consensus_rpcs.len(),
-                        num_known,
-                        web3_rpcs.min_head_rpcs,
-                        soft_limit_percent,
-                        primary_sum_soft_limit,
-                        web3_rpcs.min_sum_soft_limit,
-                        err,
-                    );
-
-                    if backup_rpcs_voted.is_some() {
-                        warn!("{}. using backup vote", err_msg);
-                        break;
-                    } else {
-                        return Err(anyhow::anyhow!(err_msg));
-                    }
-                }
-            }
-        }
-
-        // TODO: if consensus_head_rpcs.is_empty, try another method of finding the head block. will need to change the return Err above into breaks.
-
-        // we've done all the searching for the heaviest block that we can
-        if (primary_consensus_rpcs.len() < web3_rpcs.min_head_rpcs
-            || primary_sum_soft_limit < web3_rpcs.min_sum_soft_limit)
-            && backup_rpcs_voted.is_none()
-        {
-            // if we get here, not enough servers are synced. return an error
-            let soft_limit_percent =
-                (primary_sum_soft_limit as f32 / web3_rpcs.min_sum_soft_limit as f32) * 100.0;
-
-            return Err(anyhow::anyhow!(
-                "Not enough resources. rpcs {}/{}/{}. soft limit: {:.2}% ({}/{})",
-                primary_consensus_rpcs.len(),
-                num_known,
-                web3_rpcs.min_head_rpcs,
-                primary_sum_soft_limit,
-                web3_rpcs.min_sum_soft_limit,
-                soft_limit_percent,
-            ));
-        }
-
-        // success!
-        //   a block has enough soft limit and nodes on it (or on later blocks)
-        let backups_needed;
-        let (head_block, consensus_rpcs) = if let Some(head_block) = primary_rpcs_voted {
-            backups_needed = false;
-            (head_block, primary_consensus_rpcs)
-        } else if let Some(head_block) = backup_rpcs_voted.clone() {
-            backups_needed = true;
-            (head_block, backup_consensus_rpcs)
-        } else {
-            return Err(anyhow::anyhow!("No head voted!"));
-        };
-
-        #[cfg(debug_assertions)]
-        {
-            let _ = head_block.hash();
-            let _ = head_block.number();
-        }
-
-        let consensus_rpcs: Vec<Arc<Web3Rpc>> = consensus_rpcs
-            .into_iter()
-            .filter_map(|conn_name| web3_rpcs.by_name.read().get(conn_name).cloned())
-            .collect();
-
-        Ok(ConsensusWeb3Rpcs {
-            tier: *tier,
-            head_block,
-            rpcs: consensus_rpcs,
-            backups_voted: backup_rpcs_voted,
-            backups_needed,
-        })
-    }
-}
-
-/// A ConsensusConnections builder that tracks all connection heads across multiple groups of servers
-pub struct ConsensusFinder {
-    /// backups for all tiers are only used if necessary
-    /// tiers[0] = only tier 0.
-    /// tiers[1] = tier 0 and tier 1
-    /// tiers[n] = tier 0..=n
-    /// This is a BTreeMap and not a Vec because sometimes a tier is empty
-    tiers: BTreeMap<u64, ConnectionsGroup>,
-    /// never serve blocks that are too old
-    max_block_age: Option<u64>,
-    /// tier 0 will be prefered as long as the distance between it and the other tiers is <= max_tier_lag
-    max_block_lag: Option<U64>,
-}
-
-impl ConsensusFinder {
-    pub fn new(max_block_age: Option<u64>, max_block_lag: Option<U64>) -> Self {
-        // TODO: what's a good capacity for this? it shouldn't need to be very large
-        // TODO: if we change Web3ProxyBlock to store the instance, i think we could use the block_by_hash cache
-        let first_seen = Cache::builder()
-            .max_capacity(16)
-            .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
-
-        // TODO: hard coding 0-9 isn't great, but its easier than refactoring this to be smart about config reloading
-        let tiers = (0..10)
-            .map(|x| (x, ConnectionsGroup::new(first_seen.clone())))
-            .collect();
-
-        Self {
-            tiers,
-            max_block_age,
-            max_block_lag,
-        }
-    }
-
-    pub fn len(&self) -> usize {
-        self.tiers.len()
-    }
-
-    pub fn is_empty(&self) -> bool {
-        self.tiers.is_empty()
-    }
-
-    /// get the ConnectionsGroup that contains all rpcs
-    /// panics if there are no tiers
-    pub fn all_rpcs_group(&self) -> Option<&ConnectionsGroup> {
-        self.tiers.values().last()
-    }
-
-    /// get the mutable ConnectionsGroup that contains all rpcs
-    pub fn all_mut(&mut self) -> Option<&mut ConnectionsGroup> {
-        self.tiers.values_mut().last()
-    }
-
-    pub fn remove(&mut self, rpc: &Arc<Web3Rpc>) -> Option<Web3ProxyBlock> {
-        let mut removed = None;
-
-        for (i, tier_group) in self.tiers.iter_mut().rev() {
-            if i < &rpc.tier {
-                break;
-            }
-            let x = tier_group.remove(rpc);
-
-            if removed.is_none() && x.is_some() {
-                removed = x;
-            }
-        }
-
-        removed
-    }
-
-    /// returns the block that the rpc was on before updating to the new_block
-    pub async fn insert(
-        &mut self,
-        rpc: &Arc<Web3Rpc>,
-        new_block: Web3ProxyBlock,
-    ) -> Option<Web3ProxyBlock> {
-        let mut old = None;
-
-        // TODO: error if rpc.tier is not in self.tiers
-
-        for (i, tier_group) in self.tiers.iter_mut().rev() {
-            if i < &rpc.tier {
-                break;
-            }
-
-            // TODO: should new_block be a ref?
-            let x = tier_group.insert(rpc.clone(), new_block.clone()).await;
-
-            if old.is_none() && x.is_some() {
-                old = x;
-            }
-        }
-
-        old
+        self.rpc_heads.insert(rpc, block)
     }
 
     /// Update our tracking of the rpc and return true if something changed
@@ -499,8 +182,8 @@ impl ConsensusFinder {
             }
         }
 
-        if let Some(prev_block) = self.insert(&rpc, rpc_head_block.clone()).await {
-            // false if this block was already sent by this rpc. return early
+        if let Some(prev_block) = self.insert(rpc, rpc_head_block.clone()).await {
+            // false if this block was already sent by this rpc
             // true if new block for this rpc
             prev_block.hash() != rpc_head_block.hash()
         } else {
@@ -518,47 +201,155 @@ impl ConsensusFinder {
         Ok(changed)
     }
 
-    pub async fn best_consensus_connections(
+    pub async fn find_consensus_connections(
         &mut self,
         authorization: &Arc<Authorization>,
-        web3_connections: &Web3Rpcs,
-    ) -> anyhow::Result<ConsensusWeb3Rpcs> {
-        // TODO: attach context to these?
-        let highest_known_block = self
-            .all_rpcs_group()
-            .context("no rpcs")?
-            .highest_block
-            .as_ref()
-            .context("no highest block")?;
+        web3_rpcs: &Web3Rpcs,
+    ) -> anyhow::Result<Option<ConsensusWeb3Rpcs>> {
+        let minmax_block = self
+            .rpc_heads
+            .values().minmax_by_key(|&x| x.number());
 
-        trace!("highest_known_block: {}", highest_known_block);
+        let (lowest_block, highest_block) = match minmax_block {
+            MinMaxResult::NoElements => return Ok(None),
+            MinMaxResult::OneElement(x) => (x, x),
+            MinMaxResult::MinMax(min, max) => (min, max),
+        };
 
-        let min_block_num = self
-            .max_block_lag
-            .map(|x| highest_known_block.number().saturating_sub(x))
-            // we also want to be sure we don't ever go backwards!
-            .max(web3_connections.head_block_num());
+        let highest_block_number = highest_block.number();
 
-        trace!("min_block_num: {:#?}", min_block_num);
+        trace!("highest_block_number: {}", highest_block_number);
 
-        // TODO Should this be a Vec>>?
-        // TODO: how should errors be handled?
-        // TODO: find the best tier with a connectionsgroup. best case, this only queries the first tier
-        // TODO: do we need to calculate all of them? I think having highest_known_block included as part of min_block_num should make that unnecessary
-        for (tier, x) in self.tiers.iter() {
-            trace!("checking tier {}: {:#?}", tier, x.rpc_to_block);
-            if let Ok(consensus_head_connections) = x
-                .consensus_head_connections(authorization, web3_connections, min_block_num, tier)
-                .await
-            {
-                trace!("success on tier {}", tier);
-                // we got one! hopefully it didn't need to use any backups.
-                // but even if it did need backup servers, that is better than going to a worse tier
-                return Ok(consensus_head_connections);
+        trace!("lowest_block_number: {}", lowest_block.number());
+
+        let max_lag_block_number = highest_block_number.saturating_sub(self.max_block_lag.unwrap_or_else(|| U64::from(10)));
+
+        trace!("max_lag_block_number: {}", max_lag_block_number);
+
+        let lowest_block_number = lowest_block.number().max(&max_lag_block_number);
+
+        trace!("safe lowest_block_number: {}", lowest_block_number);
+
+        let num_known = self.rpc_heads.len();
+
+        if num_known < web3_rpcs.min_head_rpcs {
+            // this keeps us from serving requests when the proxy first starts
+            return Ok(None);
+        }
+
+        // TODO: also track the sum of *available* hard_limits? if any servers have no hard limits, use their soft limit or no limit?
+        // TODO: struct for the value of the votes hashmap?
+        let mut primary_votes: HashMap<Web3ProxyBlock, (HashSet<&str>, u32)> = Default::default();
+        let mut backup_votes: HashMap<Web3ProxyBlock, (HashSet<&str>, u32)> = Default::default();
+
+        let mut backup_consensus = None;
+
+        let mut rpc_heads_by_tier: Vec<_> = self.rpc_heads.iter().collect();
+        rpc_heads_by_tier.sort_by_cached_key(|(rpc, _)| rpc.tier);
+
+        let current_tier = rpc_heads_by_tier.first().expect("rpc_heads_by_tier should never be empty").0.tier;
+
+        // loop over all the rpc heads (grouped by tier) and their parents to find consensus
+        // TODO: i'm sure there's a lot of shortcuts that could be taken, but this is simplest to implement
+        for (rpc, rpc_head) in self.rpc_heads.iter() {
+            if current_tier != rpc.tier {
+                // we finished processing a tier. check for primary results
+                if let Some(consensus) = self.count_votes(&primary_votes, web3_rpcs) {
+                    return Ok(Some(consensus))
+                }
+
+                // only set backup consensus once. we don't want it to keep checking on worse tiers if it already found consensus
+                if backup_consensus.is_none() {
+                    if let Some(consensus) = self.count_votes(&backup_votes, web3_rpcs) {
+                        backup_consensus = Some(consensus)
+                    }
+                }
+            }
+
+            let mut block_to_check = rpc_head.clone();
+
+            while block_to_check.number() >= lowest_block_number {
+                if !rpc.backup {
+                    // backup nodes are excluded from the primary voting
+                    let entry = primary_votes.entry(block_to_check.clone()).or_default();
+
+                    entry.0.insert(&rpc.name);
+                    entry.1 += rpc.soft_limit;
+                }
+
+                // both primary and backup rpcs get included in the backup voting
+                let backup_entry = backup_votes.entry(block_to_check.clone()).or_default();
+
+                backup_entry.0.insert(&rpc.name);
+                backup_entry.1 += rpc.soft_limit;
+
+                match web3_rpcs.block(authorization, block_to_check.parent_hash(), Some(rpc)).await {
+                    Ok(parent_block) => block_to_check = parent_block,
+                    Err(err) => {
+                        warn!("Problem fetching parent block of {:#?} during consensus finding: {:#?}", block_to_check, err);
+                        break;
+                    }
+                }
             }
         }
 
-        return Err(anyhow::anyhow!("failed finding consensus on all tiers"));
+        // we finished processing all tiers. check for primary results (if anything but the last tier found consensus, we already returned above)
+        if let Some(consensus) = self.count_votes(&primary_votes, web3_rpcs) {
+            return Ok(Some(consensus))
+        }
+
+        // only set backup consensus once. we don't want it to keep checking on worse tiers if it already found consensus
+        if let Some(consensus) = backup_consensus {
+            return Ok(Some(consensus));
+        }
+
+        // count votes one last time
+        Ok(self.count_votes(&backup_votes, web3_rpcs))
+    }
+
+    // TODO: have min_sum_soft_limit and min_head_rpcs on self instead of on Web3Rpcs
+    fn count_votes(&self, votes: &HashMap<Web3ProxyBlock, (HashSet<&str>, u32)>, web3_rpcs: &Web3Rpcs) -> Option<ConsensusWeb3Rpcs> {
+        // sort the primary votes ascending by tier and descending by block num
+        let mut votes: Vec<_> = votes.iter().map(|(block, (rpc_names, sum_soft_limit))| (block, sum_soft_limit, rpc_names)).collect();
+        votes.sort_by_cached_key(|(block, sum_soft_limit, rpc_names)| (Reverse(*block.number()), Reverse(*sum_soft_limit), Reverse(rpc_names.len())));
+
+        // return the first result that exceeds configured minimums (if any)
+        for (maybe_head_block, sum_soft_limit, rpc_names) in votes {
+            if *sum_soft_limit < web3_rpcs.min_sum_soft_limit {
+                continue;
+            }
+            // TODO: different mins for backup vs primary
+            if rpc_names.len() < web3_rpcs.min_head_rpcs {
+                continue;
+            }
+
+            // consensus likely found!
+            //   load the rpcs to make sure they all have active connections
+            let consensus_rpcs: Vec<_> = rpc_names.into_iter().filter_map(|x| web3_rpcs.get(x)).collect();
+
+            if consensus_rpcs.len() < web3_rpcs.min_head_rpcs {
+                continue;
+            }
+            // consensus found!
+
+            let tier = consensus_rpcs.iter().map(|x| x.tier).max().expect("there should always be a max");
+
+            let backups_needed = consensus_rpcs.iter().any(|x| x.backup);
+
+            let consensus = ConsensusWeb3Rpcs {
+                tier,
+                head_block: maybe_head_block.clone(),
+                best_rpcs: consensus_rpcs,
+                backups_needed,
+            };
+
+            return Some(consensus);
+        }
+
+        None
+    }
+
+    pub fn worst_tier(&self) -> Option<u64> {
+        self.rpc_heads.iter().map(|(x, _)| x.tier).max()
+    }
 }
diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs
index 24f84678..98a58391 100644
--- a/web3_proxy/src/rpcs/many.rs
+++ b/web3_proxy/src/rpcs/many.rs
@@ -27,7 +27,7 @@ use serde::ser::{SerializeStruct, Serializer};
 use serde::Serialize;
 use serde_json::json;
 use serde_json::value::RawValue;
-use std::cmp::min_by_key;
+use std::cmp::{min_by_key, Reverse};
 use std::collections::BTreeMap;
 use std::sync::atomic::{self, Ordering};
 use std::sync::Arc;
@@ -611,7 +611,7 @@ impl Web3Rpcs {
                 // they are all at the same block and it is already sized to what we need
                 let key = (0, None);
 
-                for x in synced_connections.rpcs.iter() {
+                for x in synced_connections.best_rpcs.iter() {
                     if skip.contains(x) {
                         trace!("skipping: {}", x);
                         continue;
                     }
@@ -741,7 +741,7 @@ impl Web3Rpcs {
         let synced_rpcs = self.watch_consensus_rpcs_sender.borrow();
 
         if let Some(synced_rpcs) = synced_rpcs.as_ref() {
-            synced_rpcs.rpcs.clone()
+            synced_rpcs.best_rpcs.clone()
         } else {
             vec![]
         }
@@ -1219,9 +1219,8 @@ impl Serialize for Web3Rpcs {
 /// TODO: should this be moved into a `impl Web3Rpc`?
 /// TODO: i think we still have sorts scattered around the code that should use this
 /// TODO: take AsRef or something like that? We don't need an Arc here
-fn rpc_sync_status_sort_key(x: &Arc<Web3Rpc>) -> (U64, u64, bool, OrderedFloat<f64>) {
-    let reversed_head_block = U64::MAX
-        - x.head_block
+fn rpc_sync_status_sort_key(x: &Arc<Web3Rpc>) -> (Reverse<U64>, u64, bool, OrderedFloat<f64>) {
+    let head_block = x.head_block
         .read()
         .as_ref()
         .map(|x| *x.number())
@@ -1241,7 +1240,7 @@ fn rpc_sync_status_sort_key(x: &Arc<Web3Rpc>) -> (U64, u64, bool, OrderedFloat
diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs
--- a/web3_proxy/src/rpcs/request.rs
+++ b/web3_proxy/src/rpcs/request.rs
-                Web3Provider::Mock => unimplemented!(),
+                Web3Provider::Mock => return Err(ProviderError::CustomError("mock provider can't respond".to_string())),
                 Web3Provider::Ws(p) => p.request(method, params).await,
                 Web3Provider::Http(p) | Web3Provider::Both(p, _) => {
                     // TODO: i keep hearing that http is faster. but ws has always been better for me. investigate more with actual benchmarks
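
For readers skimming the new consensus.rs flow above, the core idea is: every rpc votes for its head block (and, by walking parent_hash, for that block's ancestors down to the lowest allowed number), then the highest block whose votes clear min_sum_soft_limit and min_head_rpcs wins. Below is a small standalone sketch of that vote counting. The names here (BlockId, RpcInfo, count_votes's signature, the example thresholds) are simplified stand-ins rather than the crate's real Web3ProxyBlock/Web3Rpc types, and the tier handling and the separate backup-vote pass are left out.

    // minimal sketch of the vote-counting idea; simplified stand-in types,
    // NOT the crate's real implementation.
    use std::cmp::Reverse;
    use std::collections::{HashMap, HashSet};

    #[derive(Clone, Debug, PartialEq, Eq, Hash)]
    struct BlockId {
        number: u64,
        hash: &'static str,
    }

    struct RpcInfo {
        name: &'static str,
        soft_limit: u32,
    }

    /// sort candidate blocks by (number desc, summed soft limit desc, rpc count desc)
    /// and return the first one that clears both minimums.
    fn count_votes<'a>(
        votes: &'a HashMap<BlockId, (HashSet<&'a str>, u32)>,
        min_sum_soft_limit: u32,
        min_head_rpcs: usize,
    ) -> Option<(&'a BlockId, Vec<&'a str>)> {
        let mut candidates: Vec<_> = votes
            .iter()
            .map(|(block, (rpc_names, sum_soft_limit))| (block, *sum_soft_limit, rpc_names))
            .collect();

        candidates.sort_by_key(|(block, sum_soft_limit, rpc_names)| {
            (
                Reverse(block.number),
                Reverse(*sum_soft_limit),
                Reverse(rpc_names.len()),
            )
        });

        for (block, sum_soft_limit, rpc_names) in candidates {
            if sum_soft_limit < min_sum_soft_limit || rpc_names.len() < min_head_rpcs {
                continue;
            }
            return Some((block, rpc_names.iter().copied().collect()));
        }

        None
    }

    fn main() {
        let rpcs = [
            RpcInfo { name: "a", soft_limit: 100 },
            RpcInfo { name: "b", soft_limit: 200 },
            RpcInfo { name: "c", soft_limit: 300 },
        ];

        let parent = BlockId { number: 100, hash: "0xparent" };
        let head = BlockId { number: 101, hash: "0xhead" };

        // each rpc votes for its head block and (in the real code, by walking
        // parent_hash) for that block's ancestors down to the lowest allowed number.
        let mut votes: HashMap<BlockId, (HashSet<&str>, u32)> = HashMap::new();
        for rpc in &rpcs {
            let heads: Vec<&BlockId> = if rpc.name == "c" {
                // rpc "c" is lagging one block behind
                vec![&parent]
            } else {
                vec![&head, &parent]
            };

            for block in heads {
                let entry = votes.entry(block.clone()).or_default();
                entry.0.insert(rpc.name);
                entry.1 += rpc.soft_limit;
            }
        }

        // block 101 has 300 soft limit from 2 rpcs; block 100 has 600 from 3.
        // with these minimums the newer block wins; raise min_sum_soft_limit to
        // 400 and the result falls back to block 100.
        let consensus = count_votes(&votes, 250, 2);
        println!("{:?}", consensus);
    }

Because older blocks accumulate the votes of every rpc that has seen a descendant, sorting with Reverse keys naturally prefers the newest block that still has enough capacity behind it, and falls back one block at a time when it doesn't.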