From cdea61cb6bf070bfd57af89e298d1f422930c75d Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Tue, 27 Jun 2023 12:36:41 -0700 Subject: [PATCH] one list for ranked rpcs --- Cargo.lock | 23 +- deferred-rate-limiter/Cargo.toml | 2 +- latency/Cargo.toml | 4 +- migration/Cargo.toml | 2 +- rate-counter/Cargo.toml | 2 +- redis-rate-limiter/Cargo.toml | 2 +- web3_proxy/Cargo.toml | 10 +- web3_proxy/src/app/mod.rs | 8 +- web3_proxy/src/bin/web3_proxy_cli/proxyd.rs | 2 +- web3_proxy/src/frontend/status.rs | 64 +-- web3_proxy/src/rpcs/blockchain.rs | 2 +- web3_proxy/src/rpcs/consensus.rs | 398 +++++++++---------- web3_proxy/src/rpcs/many.rs | 410 ++++++-------------- web3_proxy/src/rpcs/one.rs | 4 + 14 files changed, 389 insertions(+), 544 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 84afb0c9..bdc639d3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1676,6 +1676,19 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "env_logger" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85cdab6a89accf66733ad5a1693a4dcced6aeff64602b634530dd73c1f3ee9f0" +dependencies = [ + "humantime", + "is-terminal", + "log", + "regex", + "termcolor", +] + [[package]] name = "equivalent" version = "1.0.0" @@ -6102,11 +6115,12 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.28.2" +version = "1.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94d7b1cfd2aa4011f2de74c2c4c63665e27a71006b0a192dcd2710272e73dfa2" +checksum = "374442f06ee49c3a28a8fc9f01a2596fed7559c6b99b31279c3261778e77d84f" dependencies = [ "autocfg", + "backtrace", "bytes", "libc", "mio 0.8.8", @@ -6418,9 +6432,9 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.24" +version = "0.1.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f57e3ca2a01450b1a921183a9c9cbfda207fd822cef4ccb00a65402cbba7a74" +checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab" dependencies = [ "proc-macro2", "quote", @@ -6937,6 +6951,7 @@ dependencies = [ "derivative", "derive_more", "entities", + "env_logger", "ethbloom", "ethers", "fdlimit", diff --git a/deferred-rate-limiter/Cargo.toml b/deferred-rate-limiter/Cargo.toml index e765510f..8f2fd057 100644 --- a/deferred-rate-limiter/Cargo.toml +++ b/deferred-rate-limiter/Cargo.toml @@ -11,6 +11,6 @@ anyhow = "1.0.71" hashbrown = "0.14.0" log = "0.4.19" moka = { version = "0.11.2", features = ["future"] } -tokio = "1.28.2" +tokio = "1.29.0" tracing = "0.1.37" workspace-hack = { version = "0.1", path = "../workspace-hack" } diff --git a/latency/Cargo.toml b/latency/Cargo.toml index fb8522c4..fe09f165 100644 --- a/latency/Cargo.toml +++ b/latency/Cargo.toml @@ -10,10 +10,10 @@ flume = "0.10.14" log = "0.4.19" portable-atomic = { version = "1.3.3", features = ["float"] } serde = { version = "1.0.164", features = [] } -tokio = { version = "1.28.2", features = ["full"] } +tokio = { version = "1.29.0", features = ["full"] } tracing = "0.1.37" watermill = "0.1.1" workspace-hack = { version = "0.1", path = "../workspace-hack" } [dev-dependencies] -tokio = { version = "1.28.2", features = ["full", "test-util"] } +tokio = { version = "1.29.0", features = ["full", "test-util"] } diff --git a/migration/Cargo.toml b/migration/Cargo.toml index 6e573b8a..819d3e08 100644 --- a/migration/Cargo.toml +++ b/migration/Cargo.toml @@ -9,7 +9,7 @@ name = "migration" path = "src/lib.rs" [dependencies] -tokio = { version 
= "1.28.2", features = ["full", "tracing"] } +tokio = { version = "1.29.0", features = ["full", "tracing"] } workspace-hack = { version = "0.1", path = "../workspace-hack" } [dependencies.sea-orm-migration] diff --git a/rate-counter/Cargo.toml b/rate-counter/Cargo.toml index 20c3b692..a34c183a 100644 --- a/rate-counter/Cargo.toml +++ b/rate-counter/Cargo.toml @@ -5,5 +5,5 @@ authors = ["Bryan Stitt "] edition = "2021" [dependencies] -tokio = { version = "1.28.2", features = ["time"] } +tokio = { version = "1.29.0", features = ["time"] } workspace-hack = { version = "0.1", path = "../workspace-hack" } diff --git a/redis-rate-limiter/Cargo.toml b/redis-rate-limiter/Cargo.toml index ab549fe5..3d580c94 100644 --- a/redis-rate-limiter/Cargo.toml +++ b/redis-rate-limiter/Cargo.toml @@ -8,5 +8,5 @@ edition = "2021" anyhow = "1.0.71" chrono = "0.4.26" deadpool-redis = { version = "0.12.0", features = ["rt_tokio_1", "serde"] } -tokio = "1.28.2" +tokio = "1.29.0" workspace-hack = { version = "0.1", path = "../workspace-hack" } diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index af3e7939..94e868d2 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -90,7 +90,7 @@ serde_json = { version = "1.0.99", default-features = false, features = ["raw_va serde_prometheus = "0.2.3" strum = { version = "0.25.0", features = ["derive"] } time = { version = "0.3.22" } -tokio = { version = "1.28.2", features = ["full", "tracing"] } +tokio = { version = "1.29.0", features = ["full", "tracing"] } tokio-console = { version = "0.1.8", optional = true } tokio-stream = { version = "0.1.14", features = ["sync"] } tokio-uring = { version = "0.4.0", optional = true } @@ -105,8 +105,12 @@ uuid = { version = "1.4.0", default-features = false, features = ["fast-rng", "v derivative = "2.2.0" workspace-hack = { version = "0.1", path = "../workspace-hack" } -[dev-dependencies] +# TODO: why doesn't this work in dev-dependencies test-log = { version = "0.2.12", default-features = false, features = ["trace"] } -tokio = { version = "1.28.2", features = ["full", "test-util"] } + +[dev-dependencies] +env_logger = "0.10" +test-log = "0.2.12" +tokio = { version = "1.29.0", features = ["full", "test-util"] } tracing = {version = "0.1", default-features = false} tracing-subscriber = {version = "0.3", default-features = false, features = ["env-filter", "fmt"]} diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index ce43a2df..9c8150db 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -17,7 +17,7 @@ use crate::response_cache::{ JsonRpcQueryCacheKey, JsonRpcResponseCache, JsonRpcResponseEnum, JsonRpcResponseWeigher, }; use crate::rpcs::blockchain::Web3ProxyBlock; -use crate::rpcs::consensus::ConsensusWeb3Rpcs; +use crate::rpcs::consensus::RankedRpcs; use crate::rpcs::many::Web3Rpcs; use crate::rpcs::one::Web3Rpc; use crate::rpcs::provider::{connect_http, EthersHttpProvider}; @@ -175,9 +175,9 @@ pub struct Web3ProxyAppSpawn { /// these are important and must be allowed to finish pub background_handles: FuturesUnordered>, /// config changes are sent here - pub new_top_config_sender: watch::Sender, + pub new_top_config: watch::Sender, /// watch this to know when the app is ready to serve requests - pub consensus_connections_watcher: watch::Receiver>>, + pub ranked_rpcs: watch::Receiver>>, } impl Web3ProxyApp { @@ -1083,7 +1083,6 @@ impl Web3ProxyApp { Some(Duration::from_secs(30)), Some(Level::TRACE.into()), None, - true, ) .await; @@ -1114,7 +1113,6 @@ impl Web3ProxyApp { 
Some(Duration::from_secs(30)), Some(Level::TRACE.into()), num_public_rpcs, - true, ) .await } diff --git a/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs b/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs index a212fdfa..09ed4ad2 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs @@ -89,7 +89,7 @@ async fn run( // start thread for watching config if let Some(top_config_path) = top_config_path { - let config_sender = spawned_app.new_top_config_sender; + let config_sender = spawned_app.new_top_config; { let mut current_config = config_sender.borrow().clone(); diff --git a/web3_proxy/src/frontend/status.rs b/web3_proxy/src/frontend/status.rs index ec5318a8..4a77c144 100644 --- a/web3_proxy/src/frontend/status.rs +++ b/web3_proxy/src/frontend/status.rs @@ -4,7 +4,10 @@ //! They will eventually move to another port. use super::{ResponseCache, ResponseCacheKey}; -use crate::app::{Web3ProxyApp, APP_USER_AGENT}; +use crate::{ + app::{Web3ProxyApp, APP_USER_AGENT}, + errors::Web3ProxyError, +}; use axum::{ body::{Bytes, Full}, http::StatusCode, @@ -19,7 +22,8 @@ use moka::future::Cache; use once_cell::sync::Lazy; use serde::{ser::SerializeStruct, Serialize}; use serde_json::json; -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; +use tokio::time::timeout; use tracing::trace; static HEALTH_OK: Lazy = Lazy::new(|| Bytes::from("OK\n")); @@ -74,16 +78,20 @@ pub async fn debug_request( pub async fn health( Extension(app): Extension>, Extension(cache): Extension>, -) -> impl IntoResponse { - let (code, content_type, body) = cache - .get_with(ResponseCacheKey::Health, async move { _health(app).await }) - .await; +) -> Result { + let (code, content_type, body) = timeout( + Duration::from_secs(3), + cache.get_with(ResponseCacheKey::Health, async move { _health(app).await }), + ) + .await?; - Response::builder() + let x = Response::builder() .status(code) .header("content-type", content_type) .body(Full::from(body)) - .unwrap() + .unwrap(); + + Ok(x) } // TODO: _health doesn't need to be async, but _quick_cache_ttl needs an async function @@ -107,18 +115,22 @@ async fn _health(app: Arc) -> (StatusCode, &'static str, Bytes) { pub async fn backups_needed( Extension(app): Extension>, Extension(cache): Extension>, -) -> impl IntoResponse { - let (code, content_type, body) = cache - .get_with(ResponseCacheKey::BackupsNeeded, async move { +) -> Result { + let (code, content_type, body) = timeout( + Duration::from_secs(3), + cache.get_with(ResponseCacheKey::BackupsNeeded, async move { _backups_needed(app).await - }) - .await; + }), + ) + .await?; - Response::builder() + let x = Response::builder() .status(code) .header("content-type", content_type) .body(Full::from(body)) - .unwrap() + .unwrap(); + + Ok(x) } #[inline] @@ -126,11 +138,7 @@ async fn _backups_needed(app: Arc) -> (StatusCode, &'static str, B trace!("backups_needed is not cached"); let code = { - let consensus_rpcs = app - .balanced_rpcs - .watch_consensus_rpcs_sender - .borrow() - .clone(); + let consensus_rpcs = app.balanced_rpcs.watch_ranked_rpcs.borrow().clone(); if let Some(ref consensus_rpcs) = consensus_rpcs { if consensus_rpcs.backups_needed { @@ -158,16 +166,20 @@ async fn _backups_needed(app: Arc) -> (StatusCode, &'static str, B pub async fn status( Extension(app): Extension>, Extension(cache): Extension>, -) -> impl IntoResponse { - let (code, content_type, body) = cache - .get_with(ResponseCacheKey::Status, async move { _status(app).await }) - .await; +) -> Result { + let (code, 
content_type, body) = timeout( + Duration::from_secs(3), + cache.get_with(ResponseCacheKey::Status, async move { _status(app).await }), + ) + .await?; - Response::builder() + let x = Response::builder() .status(code) .header("content-type", content_type) .body(Full::from(body)) - .unwrap() + .unwrap(); + + Ok(x) } // TODO: _status doesn't need to be async, but _quick_cache_ttl needs an async function diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index 507bbe04..5a9250be 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -351,7 +351,7 @@ impl Web3Rpcs { // if theres multiple, use petgraph to find the one on the main chain (and remove the others if they have enough confirmations) let mut consensus_head_receiver = self - .watch_consensus_head_sender + .watch_head_block .as_ref() .web3_context("need new head subscriptions to fetch cannonical_block")? .subscribe(); diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index 8d5206ba..e7b19715 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -14,7 +14,6 @@ use itertools::{Itertools, MinMaxResult}; use moka::future::Cache; use serde::Serialize; use std::cmp::{Ordering, Reverse}; -use std::collections::BTreeMap; use std::fmt; use std::sync::{atomic, Arc}; use std::time::Duration; @@ -51,9 +50,10 @@ impl ConsensusRpcData { #[derive(Constructor, Clone, Copy, Debug, Default, Eq, PartialEq, Serialize)] pub struct RpcRanking { - tier: u32, backup: bool, - head_num: Option, + /// note: the servers in this tier might have blocks higher than this + consensus_head_num: Option, + tier: u32, } impl RpcRanking { @@ -65,11 +65,11 @@ impl RpcRanking { } fn sort_key(&self) -> (bool, Reverse>, u32) { - // TODO: add soft_limit here? add peak_ewma here? - // TODO: should backup or tier be checked first? now that tiers are automated, backups + // TODO: add sum_soft_limit here? add peak_ewma here? + // TODO: should backup or tier be checked first? now that tiers are automated, backups should be more reliable, but still leave them last + // while tier might give us better latency, giving requests to a server that is behind by a block will get in the way of it syncing. better to only query synced servers // TODO: should we include a random number in here? - // TODO: should we include peak_ewma_latency or weighted_peak_ewma_latency? - (!self.backup, Reverse(self.head_num), self.tier) + (!self.backup, Reverse(self.consensus_head_num), self.tier) } } @@ -85,8 +85,6 @@ impl PartialOrd for RpcRanking { } } -pub type RankedRpcMap = BTreeMap>>; - pub enum ShouldWaitForBlock { Ready, // BackupReady, @@ -99,31 +97,125 @@ pub enum ShouldWaitForBlock { /// Serialize is so we can print it on our /status endpoint /// TODO: remove head_block/head_rpcs/tier and replace with one RankedRpcMap /// TODO: add `best_rpc(method_data_kind, min_block_needed, max_block_needed, include_backups)` +/// TODO: make serializing work. the key needs to be a string. I think we need `serialize_with` #[derive(Clone, Serialize)] -pub struct ConsensusWeb3Rpcs { - pub(crate) tier: u32, - pub(crate) backups_needed: bool, +pub struct RankedRpcs { + pub head_block: Web3ProxyBlock, + pub num_synced: usize, + pub backups_needed: bool, - // TODO: Don't skip serializing, instead make a shorter serialize - #[serde(skip_serializing)] - pub(crate) head_block: Web3ProxyBlock, - - // TODO: make a shorter serialize - pub(crate) head_rpcs: Vec>, - - // TODO: make serializing work. 
the key needs to be a string. I think we need `serialize_with` - #[serde(skip_serializing)] - pub(crate) other_rpcs: RankedRpcMap, + inner: Vec>, // TODO: make serializing work. the key needs to be a string. I think we need `serialize_with` #[serde(skip_serializing)] rpc_data: HashMap, ConsensusRpcData>, } -impl ConsensusWeb3Rpcs { +impl RankedRpcs { + // /// useful when Web3Rpcs does not track the head block + // pub fn from_all(rpcs: &Web3Rpcs) -> Self { + // let inner = vec![( + // RpcRanking::default(), + // rpcs.by_name + // .read() + // .values() + // .cloned() + // .collect::>>(), + // )]; + + // todo!() + // } + + pub fn from_votes( + min_synced_rpcs: usize, + min_sum_soft_limit: u32, + max_lag_block: U64, + votes: HashMap>, u32)>, + heads: HashMap, Web3ProxyBlock>, + ) -> Option { + // find the blocks that meets our min_sum_soft_limit and min_synced_rpcs + let mut votes: Vec<_> = votes + .into_iter() + .filter_map(|(block, (rpcs, sum_soft_limit))| { + if *block.number() < max_lag_block + || sum_soft_limit < min_sum_soft_limit + || rpcs.len() < min_synced_rpcs + { + None + } else { + Some((block, sum_soft_limit, rpcs)) + } + }) + .collect(); + + // sort the votes + votes.sort_by_key(|(block, sum_soft_limit, _)| { + ( + Reverse(*block.number()), + // TODO: block total difficulty (if we have it) + Reverse(*sum_soft_limit), + // TODO: median/peak latency here? + ) + }); + + // return the first result that exceededs confgured minimums (if any) + if let Some((best_block, _, best_rpcs)) = votes.into_iter().next() { + let mut ranked_rpcs: Vec<_> = best_rpcs.into_iter().map(Arc::clone).collect(); + let mut rpc_data = HashMap::new(); + + let backups_needed = ranked_rpcs.iter().any(|x| x.backup); + let num_synced = ranked_rpcs.len(); + + // TODO: add all the unsynced rpcs + for (x, x_head) in heads.iter() { + let data = ConsensusRpcData::new(x, x_head); + + rpc_data.insert(x.clone(), data); + + if ranked_rpcs.contains(x) { + continue; + } + + if *x_head.number() < max_lag_block { + // server is too far behind + continue; + } + + ranked_rpcs.push(x.clone()); + } + + ranked_rpcs + .sort_by_cached_key(|x| x.sort_for_load_balancing_on(Some(*best_block.number()))); + + // consensus found! + trace!(?ranked_rpcs); + + let consensus = RankedRpcs { + backups_needed, + head_block: best_block, + rpc_data, + inner: ranked_rpcs, + num_synced, + }; + + return Some(consensus); + } + + None + } + + pub fn all(&self) -> &[Arc] { + &self.inner + } + + pub fn is_empty(&self) -> bool { + self.inner.is_empty() + } + #[inline] pub fn num_consensus_rpcs(&self) -> usize { - self.head_rpcs.len() + // TODO! wrong! this is now the total num of rpcs. we should keep the number on the head block saved + self.inner.len() } /// will tell you if waiting will eventually should wait for a block @@ -135,56 +227,17 @@ impl ConsensusWeb3Rpcs { needed_block_num: Option<&U64>, skip_rpcs: &[Arc], ) -> ShouldWaitForBlock { - // TODO: i think checking synced is always a waste of time. 
though i guess there could be a race - if self - .head_rpcs - .iter() - .any(|rpc| self.rpc_will_work_eventually(rpc, needed_block_num, skip_rpcs)) - { - let head_num = self.head_block.number(); - - if Some(head_num) >= needed_block_num { - trace!("best (head) block: {}", head_num); - return ShouldWaitForBlock::Ready; + for rpc in self.inner.iter() { + match self.rpc_will_work_eventually(rpc, needed_block_num, skip_rpcs) { + ShouldWaitForBlock::NeverReady => continue, + x => return x, } } - // all of the head rpcs are skipped - - let mut best_num = None; - - // iterate the other rpc tiers to find the next best block - for (next_ranking, next_rpcs) in self.other_rpcs.iter() { - if !next_rpcs - .iter() - .any(|rpc| self.rpc_will_work_eventually(rpc, needed_block_num, skip_rpcs)) - { - trace!("everything in this ranking ({:?}) is skipped", next_ranking); - continue; - } - - let next_head_num = next_ranking.head_num.as_ref(); - - if next_head_num >= needed_block_num { - trace!("best (head) block: {:?}", next_head_num); - return ShouldWaitForBlock::Ready; - } - - best_num = next_head_num; - } - - // TODO: this seems wrong - if best_num.is_some() { - trace!("best (old) block: {:?}", best_num); - ShouldWaitForBlock::Wait { - current: best_num.copied(), - } - } else { - trace!("never ready"); - ShouldWaitForBlock::NeverReady - } + ShouldWaitForBlock::NeverReady } + /// TODO: change this to take a min and a max pub fn has_block_data(&self, rpc: &Web3Rpc, block_num: &U64) -> bool { self.rpc_data .get(rpc) @@ -193,15 +246,17 @@ impl ConsensusWeb3Rpcs { } // TODO: take method as a param, too. mark nodes with supported methods (maybe do it optimistically? on) + // TODO: move this onto Web3Rpc? + // TODO: this needs min and max block on it pub fn rpc_will_work_eventually( &self, rpc: &Arc, needed_block_num: Option<&U64>, skip_rpcs: &[Arc], - ) -> bool { + ) -> ShouldWaitForBlock { if skip_rpcs.contains(rpc) { // if rpc is skipped, it must have already been determined it is unable to serve the request - return false; + return ShouldWaitForBlock::NeverReady; } if let Some(needed_block_num) = needed_block_num { @@ -210,26 +265,27 @@ impl ConsensusWeb3Rpcs { Ordering::Less => { trace!("{} is behind. let it catch up", rpc); // TODO: what if this is a pruned rpc that is behind by a lot, and the block is old, too? - return true; + return ShouldWaitForBlock::Wait { + current: Some(rpc_data.head_block_num), + }; } Ordering::Greater | Ordering::Equal => { // rpc is synced past the needed block. make sure the block isn't too old for it if self.has_block_data(rpc, needed_block_num) { trace!("{} has {}", rpc, needed_block_num); - return true; + return ShouldWaitForBlock::Ready; } else { trace!("{} does not have {}", rpc, needed_block_num); - return false; + return ShouldWaitForBlock::NeverReady; } } } } - - // no rpc data for this rpc. thats not promising - false + warn!(%rpc, "no rpc data for this rpc. thats not promising"); + ShouldWaitForBlock::NeverReady } else { // if no needed_block_num was specified, then this should work - true + ShouldWaitForBlock::Ready } } @@ -284,14 +340,10 @@ impl ConsensusWeb3Rpcs { // TODO: sum_hard_limit? } -impl fmt::Debug for ConsensusWeb3Rpcs { +impl fmt::Debug for RankedRpcs { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - // TODO: the default formatter takes forever to write. this is too quiet though - // TODO: print the actual conns? 
- f.debug_struct("ConsensusWeb3Rpcs") - .field("head_block", &self.head_block) - .field("num_conns", &self.head_rpcs.len()) - .finish_non_exhaustive() + // TODO: the default formatter takes forever to write. we need to print something though + f.debug_struct("RankedRpcs").finish_non_exhaustive() } } @@ -299,7 +351,7 @@ impl fmt::Debug for ConsensusWeb3Rpcs { impl Web3Rpcs { // TODO: return a ref? pub fn head_block(&self) -> Option { - self.watch_consensus_head_sender + self.watch_head_block .as_ref() .and_then(|x| x.borrow().clone()) } @@ -315,20 +367,20 @@ impl Web3Rpcs { } pub fn synced(&self) -> bool { - let consensus = self.watch_consensus_rpcs_sender.borrow(); + let consensus = self.watch_ranked_rpcs.borrow(); if let Some(consensus) = consensus.as_ref() { - !consensus.head_rpcs.is_empty() + !consensus.is_empty() } else { false } } pub fn num_synced_rpcs(&self) -> usize { - let consensus = self.watch_consensus_rpcs_sender.borrow(); + let consensus = self.watch_ranked_rpcs.borrow(); if let Some(consensus) = consensus.as_ref() { - consensus.head_rpcs.len() + consensus.num_synced } else { 0 } @@ -381,26 +433,22 @@ impl ConsensusFinder { authorization: &Arc, rpc: Option<&Arc>, new_block: Option, - ) -> Web3ProxyResult<()> { + ) -> Web3ProxyResult { let new_consensus_rpcs = match self .find_consensus_connections(authorization, web3_rpcs) .await + .web3_context("error while finding consensus head block!")? { - Err(err) => { - return Err(err).web3_context("error while finding consensus head block!"); - } - Ok(None) => { - return Err(Web3ProxyError::NoConsensusHeadBlock); - } - Ok(Some(x)) => x, + None => return Ok(false), + Some(x) => x, }; - trace!("new_synced_connections: {:#?}", new_consensus_rpcs); + trace!(?new_consensus_rpcs); - let watch_consensus_head_sender = web3_rpcs.watch_consensus_head_sender.as_ref().unwrap(); - let consensus_tier = new_consensus_rpcs.tier; - // TODO: think more about the default for total_tiers - let total_tiers = self.worst_tier().unwrap_or_default(); + let watch_consensus_head_sender = web3_rpcs.watch_head_block.as_ref().unwrap(); + // TODO: think more about the default for tiers + let best_tier = self.best_tier().unwrap_or_default(); + let worst_tier = self.worst_tier().unwrap_or_default(); let backups_needed = new_consensus_rpcs.backups_needed; let consensus_head_block = new_consensus_rpcs.head_block.clone(); let num_consensus_rpcs = new_consensus_rpcs.num_consensus_rpcs(); @@ -410,7 +458,7 @@ impl ConsensusFinder { let new_consensus_rpcs = Arc::new(new_consensus_rpcs); let old_consensus_head_connections = web3_rpcs - .watch_consensus_rpcs_sender + .watch_ranked_rpcs .send_replace(Some(new_consensus_rpcs.clone())); let backups_voted_str = if backups_needed { "B " } else { "" }; @@ -431,8 +479,8 @@ impl ConsensusFinder { None => { debug!( "first {}/{} {}{}/{}/{} block={}, rpc={}", - consensus_tier, - total_tiers, + best_tier, + worst_tier, backups_voted_str, num_consensus_rpcs, num_active_rpcs, @@ -469,8 +517,8 @@ impl ConsensusFinder { // TODO: trace level if rpc is backup debug!( "con {}/{} {}{}/{}/{} con={} rpc={}", - consensus_tier, - total_tiers, + best_tier, + worst_tier, backups_voted_str, num_consensus_rpcs, num_active_rpcs, @@ -483,8 +531,8 @@ impl ConsensusFinder { debug!( "unc {}/{} {}{}/{}/{} con={} old={} rpc={}", - consensus_tier, - total_tiers, + best_tier, + worst_tier, backups_voted_str, num_consensus_rpcs, num_active_rpcs, @@ -510,8 +558,8 @@ impl ConsensusFinder { // TODO: better log that includes all the votes warn!( "chain rolled back 
{}/{} {}{}/{}/{} con={} old={} rpc={}", - consensus_tier, - total_tiers, + best_tier, + worst_tier, backups_voted_str, num_consensus_rpcs, num_active_rpcs, @@ -542,8 +590,8 @@ impl ConsensusFinder { Ordering::Greater => { info!( "new {}/{} {}{}/{}/{} con={} rpc={}", - consensus_tier, - total_tiers, + best_tier, + worst_tier, backups_voted_str, num_consensus_rpcs, num_active_rpcs, @@ -569,7 +617,7 @@ impl ConsensusFinder { } } - Ok(()) + Ok(true) } pub(super) async fn process_block_from_rpc( @@ -579,7 +627,7 @@ impl ConsensusFinder { new_block: Option, rpc: Arc, _pending_tx_sender: &Option>, - ) -> Web3ProxyResult<()> { + ) -> Web3ProxyResult { // TODO: how should we handle an error here? if !self .update_rpc(new_block.clone(), rpc.clone(), web3_rpcs) @@ -587,7 +635,8 @@ impl ConsensusFinder { .web3_context("failed to update rpc")? { // nothing changed. no need to scan for a new consensus head - return Ok(()); + // TODO: this should this be true if there is an existing consensus? + return Ok(false); } self.refresh(web3_rpcs, authorization, Some(&rpc), new_block) @@ -754,7 +803,7 @@ impl ConsensusFinder { &mut self, authorization: &Arc, web3_rpcs: &Web3Rpcs, - ) -> Web3ProxyResult> { + ) -> Web3ProxyResult> { self.update_tiers().await?; let minmax_block = self.rpc_heads.values().minmax_by_key(|&x| x.number()); @@ -801,6 +850,12 @@ impl ConsensusFinder { let mut block_to_check = rpc_head.clone(); while block_to_check.number() >= lowest_block_number { + if let Some(max_age) = self.max_head_block_age { + if block_to_check.age() > max_age { + break; + } + } + if !rpc.backup { // backup nodes are excluded from the primary voting let entry = primary_votes.entry(block_to_check.clone()).or_default(); @@ -815,7 +870,6 @@ impl ConsensusFinder { backup_entry.0.insert(rpc); backup_entry.1 += rpc.soft_limit; - // we used to specify rpc on this, but it shouldn't be necessary let parent_hash = block_to_check.parent_hash(); // TODO: i flip flop on passing rpc to this or not match web3_rpcs @@ -837,99 +891,31 @@ impl ConsensusFinder { } // we finished processing all tiers. check for primary results (if anything but the last tier found consensus, we already returned above) - if let Some(consensus) = self.count_votes(&primary_votes, web3_rpcs) { + if let Some(consensus) = RankedRpcs::from_votes( + web3_rpcs.min_synced_rpcs, + web3_rpcs.min_sum_soft_limit, + max_lag_block_number, + primary_votes, + self.rpc_heads.clone(), + ) { return Ok(Some(consensus)); } // primary votes didn't work. 
hopefully backup tiers are synced - Ok(self.count_votes(&backup_votes, web3_rpcs)) + Ok(RankedRpcs::from_votes( + web3_rpcs.min_synced_rpcs, + web3_rpcs.min_sum_soft_limit, + max_lag_block_number, + backup_votes, + self.rpc_heads.clone(), + )) } - // TODO: have min_sum_soft_limit and min_head_rpcs on self instead of on Web3Rpcs - fn count_votes( - &self, - votes: &HashMap>, u32)>, - web3_rpcs: &Web3Rpcs, - ) -> Option { - // sort the primary votes ascending by tier and descending by block num - let mut votes: Vec<_> = votes - .into_iter() - .map(|(block, (rpc_names, sum_soft_limit))| (block, sum_soft_limit, rpc_names)) - .collect(); - votes.sort_by_key(|(block, sum_soft_limit, _)| { - (Reverse(*block.number()), Reverse(*sum_soft_limit)) - }); - - // return the first result that exceededs confgured minimums (if any) - for (maybe_head_block, sum_soft_limit, rpc_names) in votes { - if *sum_soft_limit < web3_rpcs.min_sum_soft_limit { - continue; - } - // TODO: different mins for backup vs primary - if rpc_names.len() < web3_rpcs.min_synced_rpcs { - continue; - } - - trace!("rpc_names: {:#?}", rpc_names); - - if rpc_names.len() < web3_rpcs.min_synced_rpcs { - continue; - } - - // consensus found! - let consensus_rpcs: Vec> = rpc_names.iter().map(|x| Arc::clone(x)).collect(); - - let tier = consensus_rpcs - .iter() - .map(|x| x.tier.load(atomic::Ordering::Relaxed)) - .max() - .expect("there should always be a max"); - - let backups_needed = consensus_rpcs.iter().any(|x| x.backup); - - let mut other_rpcs = BTreeMap::new(); - - for (x, x_head) in self - .rpc_heads - .iter() - .filter(|(k, _)| !consensus_rpcs.contains(k)) - { - let x_head_num = *x_head.number(); - - let key: RpcRanking = RpcRanking::new( - x.tier.load(atomic::Ordering::Relaxed), - x.backup, - Some(x_head_num), - ); - - other_rpcs - .entry(key) - .or_insert_with(Vec::new) - .push(x.clone()); - } - - // TODO: how should we populate this? - let mut rpc_data = HashMap::with_capacity(self.rpc_heads.len()); - - for (x, x_head) in self.rpc_heads.iter() { - let y = ConsensusRpcData::new(x, x_head); - - rpc_data.insert(x.clone(), y); - } - - let consensus = ConsensusWeb3Rpcs { - tier, - head_block: maybe_head_block.clone(), - head_rpcs: consensus_rpcs, - other_rpcs, - backups_needed, - rpc_data, - }; - - return Some(consensus); - } - - None + pub fn best_tier(&self) -> Option { + self.rpc_heads + .iter() + .map(|(x, _)| x.tier.load(atomic::Ordering::Relaxed)) + .min() } pub fn worst_tier(&self) -> Option { diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 6a737a4e..1af95ecc 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -1,6 +1,6 @@ //! 
Load balanced communication with a group of web3 rpc providers use super::blockchain::{BlocksByHashCache, BlocksByNumberCache, Web3ProxyBlock}; -use super::consensus::{ConsensusWeb3Rpcs, ShouldWaitForBlock}; +use super::consensus::{RankedRpcs, ShouldWaitForBlock}; use super::one::Web3Rpc; use super::request::{OpenRequestHandle, OpenRequestResult, RequestErrorHandler}; use crate::app::{flatten_handle, Web3ProxyApp, Web3ProxyJoinHandle}; @@ -17,7 +17,7 @@ use ethers::prelude::{ProviderError, TxHash, U64}; use futures::future::try_join_all; use futures::stream::FuturesUnordered; use futures::StreamExt; -use hashbrown::{HashMap, HashSet}; +use hashbrown::HashMap; use itertools::Itertools; use migration::sea_orm::DatabaseConnection; use moka::future::{Cache, CacheBuilder}; @@ -50,9 +50,9 @@ pub struct Web3Rpcs { /// TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed /// TODO: why is watch_consensus_head_sender in an Option, but this one isn't? /// Geth's subscriptions have the same potential for skipping blocks. - pub(crate) watch_consensus_rpcs_sender: watch::Sender>>, + pub(crate) watch_ranked_rpcs: watch::Sender>>, /// this head receiver makes it easy to wait until there is a new block - pub(super) watch_consensus_head_sender: Option>>, + pub(super) watch_head_block: Option>>, /// keep track of transactions that we have sent through subscriptions pub(super) pending_transaction_cache: Cache, pub(super) pending_tx_id_receiver: flume::Receiver, @@ -89,8 +89,7 @@ impl Web3Rpcs { ) -> anyhow::Result<( Arc, Web3ProxyJoinHandle<()>, - watch::Receiver>>, - // watch::Receiver>, + watch::Receiver>>, )> { let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded(); let (block_sender, block_receiver) = flume::unbounded::(); @@ -136,8 +135,8 @@ impl Web3Rpcs { pending_transaction_cache, pending_tx_id_receiver, pending_tx_id_sender, - watch_consensus_head_sender, - watch_consensus_rpcs_sender, + watch_head_block: watch_consensus_head_sender, + watch_ranked_rpcs: watch_consensus_rpcs_sender, }); let authorization = Arc::new(Authorization::internal(db_conn)?); @@ -198,7 +197,7 @@ impl Web3Rpcs { let http_client = app.http_client.clone(); let vredis_pool = app.vredis_pool.clone(); - let block_sender = if self.watch_consensus_head_sender.is_some() { + let block_sender = if self.watch_head_block.is_some() { Some(self.block_sender.clone()) } else { None @@ -342,7 +341,7 @@ impl Web3Rpcs { } // setup the block funnel - if self.watch_consensus_head_sender.is_some() { + if self.watch_head_block.is_some() { let connections = Arc::clone(&self); let pending_tx_sender = pending_tx_sender.clone(); @@ -526,211 +525,92 @@ impl Web3Rpcs { .and_then(|x| x.authorization.clone()) .unwrap_or_default(); - if self.watch_consensus_head_sender.is_none() { - // this Web3Rpcs is not tracking head blocks. 
pick any server + let mut watch_ranked_rpcs = self.watch_ranked_rpcs.subscribe(); - let mut potential_rpcs: Vec<_> = self - .by_name - .read() - .values() - .filter(|rpc| !skip_rpcs.contains(rpc)) - .filter(|rpc| { - min_block_needed - .map(|x| rpc.has_block_data(x)) - .unwrap_or(true) - }) - .filter(|rpc| { - max_block_needed - .map(|x| rpc.has_block_data(x)) - .unwrap_or(true) - }) - .cloned() - .collect(); + let mut potential_rpcs = Vec::with_capacity(self.len()); - potential_rpcs - .sort_by_cached_key(|x| x.shuffle_for_load_balancing_on(max_block_needed.copied())); + loop { + // TODO: need a change so that protected and 4337 rpcs set watch_consensus_rpcs on start + let ranked_rpcs: Option> = + watch_ranked_rpcs.borrow_and_update().clone(); - match self - ._best_available_rpc(&authorization, error_handler, &potential_rpcs, skip_rpcs) - .await - { - OpenRequestResult::Handle(x) => return Ok(OpenRequestResult::Handle(x)), - OpenRequestResult::NotReady => {} - OpenRequestResult::RetryAt(retry_at) => { - if earliest_retry_at.is_none() { - earliest_retry_at = Some(retry_at); - } else { - earliest_retry_at = earliest_retry_at.min(Some(retry_at)); + // first check everything that is synced + // even though we might be querying an old block that an unsynced server can handle, + // it is best to not send queries to a syncing server. that slows down sync and can bloat erigon's disk usage. + if let Some(ranked_rpcs) = ranked_rpcs { + potential_rpcs.extend( + ranked_rpcs + .all() + .iter() + .filter(|rpc| { + ranked_rpcs.rpc_will_work_now( + skip_rpcs, + min_block_needed, + max_block_needed, + rpc, + ) + }) + .cloned(), + ); + + if potential_rpcs.len() >= self.min_synced_rpcs { + // we have enough potential rpcs. try to load balance + potential_rpcs.sort_by_cached_key(|x| { + x.shuffle_for_load_balancing_on(max_block_needed.copied()) + }); + + match self + ._best_available_rpc( + &authorization, + error_handler, + &potential_rpcs, + skip_rpcs, + ) + .await + { + OpenRequestResult::Handle(x) => return Ok(OpenRequestResult::Handle(x)), + OpenRequestResult::NotReady => {} + OpenRequestResult::RetryAt(retry_at) => { + if earliest_retry_at.is_none() { + earliest_retry_at = Some(retry_at); + } else { + earliest_retry_at = earliest_retry_at.min(Some(retry_at)); + } + } } } - } - } else { - let mut watch_consensus_rpcs = self.watch_consensus_rpcs_sender.subscribe(); - let mut potential_rpcs = Vec::with_capacity(self.len()); + let waiting_for = min_block_needed.max(max_block_needed); - loop { - let consensus_rpcs = watch_consensus_rpcs.borrow_and_update().clone(); - - potential_rpcs.clear(); - - // first check everything that is synced - // even though we might be querying an old block that an unsynced server can handle, - // it is best to not send queries to a syncing server. that slows down sync and can bloat erigon's disk usage. - if let Some(consensus_rpcs) = consensus_rpcs { - potential_rpcs.extend( - consensus_rpcs - .head_rpcs - .iter() - .filter(|rpc| { - consensus_rpcs.rpc_will_work_now( - skip_rpcs, - min_block_needed, - max_block_needed, - rpc, - ) - }) - .cloned(), - ); - - if potential_rpcs.len() >= self.min_synced_rpcs { - // we have enough potential rpcs. 
try to load balance - potential_rpcs.sort_by_cached_key(|x| { - x.shuffle_for_load_balancing_on(max_block_needed.copied()) - }); - - match self - ._best_available_rpc( - &authorization, - error_handler, - &potential_rpcs, - skip_rpcs, - ) - .await - { - OpenRequestResult::Handle(x) => { - return Ok(OpenRequestResult::Handle(x)) - } - OpenRequestResult::NotReady => {} - OpenRequestResult::RetryAt(retry_at) => { - if earliest_retry_at.is_none() { - earliest_retry_at = Some(retry_at); - } else { - earliest_retry_at = earliest_retry_at.min(Some(retry_at)); - } + if let Some(max_wait) = max_wait { + match ranked_rpcs.should_wait_for_block(waiting_for, skip_rpcs) { + ShouldWaitForBlock::NeverReady => break, + ShouldWaitForBlock::Ready => { + if start.elapsed() > max_wait { + break; } } - - // these rpcs were tried. don't try them again - potential_rpcs.clear(); - } - - for next_rpcs in consensus_rpcs.other_rpcs.values() { - let more_rpcs = next_rpcs - .iter() - .filter(|rpc| { - consensus_rpcs.rpc_will_work_now( - skip_rpcs, - min_block_needed, - max_block_needed, - rpc, - ) - }) - .cloned(); - - potential_rpcs.extend(more_rpcs); - - if potential_rpcs.len() >= self.min_synced_rpcs { - // we have enough potential rpcs. try to load balance - potential_rpcs.sort_by_cached_key(|x| { - x.shuffle_for_load_balancing_on(max_block_needed.copied()) - }); - - match self - ._best_available_rpc( - &authorization, - error_handler, - &potential_rpcs, - skip_rpcs, - ) - .await - { - OpenRequestResult::Handle(x) => { - return Ok(OpenRequestResult::Handle(x)) - } - OpenRequestResult::NotReady => {} - OpenRequestResult::RetryAt(retry_at) => { - if earliest_retry_at.is_none() { - earliest_retry_at = Some(retry_at); - } else { - earliest_retry_at = earliest_retry_at.min(Some(retry_at)); - } - } - } - - // these rpcs were tried. don't try them again - potential_rpcs.clear(); - } - } - - if !potential_rpcs.is_empty() { - // even after scanning all the tiers, there are not enough rpcs that can serve this request. try anyways - potential_rpcs.sort_by_cached_key(|x| { - x.shuffle_for_load_balancing_on(max_block_needed.copied()) - }); - - match self - ._best_available_rpc( - &authorization, - error_handler, - &potential_rpcs, - skip_rpcs, - ) - .await - { - OpenRequestResult::Handle(x) => { - return Ok(OpenRequestResult::Handle(x)) - } - OpenRequestResult::NotReady => {} - OpenRequestResult::RetryAt(retry_at) => { - if earliest_retry_at.is_none() { - earliest_retry_at = Some(retry_at); - } else { - earliest_retry_at = earliest_retry_at.min(Some(retry_at)); - } - } - } - } - - let waiting_for = min_block_needed.max(max_block_needed); - - if let Some(max_wait) = max_wait { - match consensus_rpcs.should_wait_for_block(waiting_for, skip_rpcs) { - ShouldWaitForBlock::NeverReady => break, - ShouldWaitForBlock::Ready => { - if start.elapsed() > max_wait { - break; - } - } - ShouldWaitForBlock::Wait { .. } => select! { - _ = watch_consensus_rpcs.changed() => { - // no need to borrow_and_update because we do that at the top of the loop - }, - _ = sleep_until(start + max_wait) => break, + ShouldWaitForBlock::Wait { .. } => select! { + _ = watch_ranked_rpcs.changed() => { + // no need to borrow_and_update because we do that at the top of the loop }, - } - } - } else if let Some(max_wait) = max_wait { - select! 
{ - _ = watch_consensus_rpcs.changed() => { - // no need to borrow_and_update because we do that at the top of the loop + _ = sleep_until(start + max_wait) => break, }, - _ = sleep_until(start + max_wait) => break, } - } else { - break; } + } else if let Some(max_wait) = max_wait { + select! { + _ = watch_ranked_rpcs.changed() => { + // no need to borrow_and_update because we do that at the top of the loop + }, + _ = sleep_until(start + max_wait) => break, + } + } else { + break; } + + // clear for the next loop + potential_rpcs.clear(); } if let Some(request_metadata) = request_metadata { @@ -765,7 +645,6 @@ impl Web3Rpcs { min_block_needed: Option<&U64>, max_block_needed: Option<&U64>, max_count: Option, - allow_backups: bool, error_level: Option, ) -> Result, Option> { let mut earliest_retry_at = None; @@ -785,27 +664,10 @@ impl Web3Rpcs { let mut selected_rpcs = Vec::with_capacity(max_count); - let mut tried = HashSet::new(); - - let mut synced_rpcs = { - let synced_rpcs = self.watch_consensus_rpcs_sender.borrow(); - - if let Some(synced_rpcs) = synced_rpcs.as_ref() { - synced_rpcs.head_rpcs.clone() - } else { - // TODO: make this an Option instead of making an empty vec? - vec![] - } - }; - - // synced connections are all on the same block. sort them by tier with higher soft limits first - synced_rpcs.sort_by_cached_key(|x| x.sort_for_load_balancing_on(max_block_needed.copied())); - - trace!("synced_rpcs: {:#?}", synced_rpcs); - - // if there aren't enough synced connections, include more connections - // TODO: only do this sorting if the synced_rpcs isn't enough + // TODO: filter the rpcs here let mut all_rpcs: Vec<_> = self.by_name.read().values().cloned().collect(); + + // TODO: this sorts them all even though we probably won't need all of them. think about this more all_rpcs.sort_by_cached_key(|x| x.sort_for_load_balancing_on(max_block_needed.copied())); trace!("all_rpcs: {:#?}", all_rpcs); @@ -814,21 +676,10 @@ impl Web3Rpcs { .and_then(|x| x.authorization.clone()) .unwrap_or_default(); - for rpc in itertools::chain(synced_rpcs, all_rpcs) { - if tried.contains(&rpc) { - continue; - } - + for rpc in all_rpcs { trace!("trying {}", rpc); - tried.insert(rpc.clone()); - - if !allow_backups && rpc.backup { - trace!("{} is a backup. skipping", rpc); - continue; - } - - // TODO: this has_block_data check is in a few places now. move it onto the rpc + // TODO: use a helper function for these if let Some(block_needed) = min_block_needed { if !rpc.has_block_data(block_needed) { trace!("{} is missing min_block_needed. 
skipping", rpc); @@ -900,7 +751,7 @@ impl Web3Rpcs { let mut skip_rpcs = vec![]; let mut method_not_available_response = None; - let mut watch_consensus_rpcs = self.watch_consensus_rpcs_sender.subscribe(); + let mut watch_consensus_rpcs = self.watch_ranked_rpcs.subscribe(); let start = Instant::now(); @@ -1182,9 +1033,8 @@ impl Web3Rpcs { max_wait: Option, error_level: Option, max_sends: Option, - include_backups: bool, ) -> Web3ProxyResult> { - let mut watch_consensus_rpcs = self.watch_consensus_rpcs_sender.subscribe(); + let mut watch_consensus_rpcs = self.watch_ranked_rpcs.subscribe(); let start = Instant::now(); @@ -1201,7 +1051,6 @@ impl Web3Rpcs { min_block_needed, max_block_needed, max_sends, - include_backups, error_level, ) .await @@ -1359,7 +1208,7 @@ impl Serialize for Web3Rpcs { } { - let consensus_rpcs = self.watch_consensus_rpcs_sender.borrow().clone(); + let consensus_rpcs = self.watch_ranked_rpcs.borrow().clone(); // TODO: rename synced_connections to consensus_rpcs if let Some(consensus_rpcs) = consensus_rpcs.as_ref() { @@ -1382,10 +1231,10 @@ impl Serialize for Web3Rpcs { state.serialize_field( "watch_consensus_rpcs_receivers", - &self.watch_consensus_rpcs_sender.receiver_count(), + &self.watch_ranked_rpcs.receiver_count(), )?; - if let Some(ref x) = self.watch_consensus_head_sender { + if let Some(ref x) = self.watch_head_block { state.serialize_field("watch_consensus_head_receivers", &x.receiver_count())?; } else { state.serialize_field("watch_consensus_head_receivers", &None::<()>)?; @@ -1415,16 +1264,8 @@ mod tests { PeakEwmaLatency::spawn(Duration::from_secs(1), 4, Duration::from_secs(1)) } - // TODO: logging - #[tokio::test(start_paused = true)] + #[test_log::test(tokio::test)] async fn test_sort_connections_by_sync_status() { - // TODO: how should we do test logging setup with tracing? - // let _ = env_logger::builder() - // .filter_level(LevelFilter::Error) - // .filter_module("web3_proxy", LevelFilter::Trace) - // .is_test(true) - // .try_init(); - let block_0 = Block { number: Some(0.into()), hash: Some(H256::random()), @@ -1511,15 +1352,8 @@ mod tests { assert_eq!(names_in_sort_order, ["c", "f", "b", "e", "a", "d"]); } - #[tokio::test(start_paused = true)] + #[test_log::test(tokio::test)] async fn test_server_selection_by_height() { - // // TODO: how should we do test logging setup with tracing? 
- // let _ = env_logger::builder() - // .filter_level(LevelFilter::Error) - // .filter_module("web3_proxy", LevelFilter::Trace) - // .is_test(true) - // .try_init(); - let now = chrono::Utc::now().timestamp().into(); let lagged_block = Block { @@ -1550,7 +1384,6 @@ mod tests { automatic_block_limit: false, backup: false, block_data_limit: block_data_limit.into(), - // tier: 0, head_block: Some(tx_synced), peak_latency: Some(new_peak_latency()), ..Default::default() @@ -1564,7 +1397,6 @@ mod tests { automatic_block_limit: false, backup: false, block_data_limit: block_data_limit.into(), - // tier: 0, head_block: Some(tx_lagged), peak_latency: Some(new_peak_latency()), ..Default::default() @@ -1586,7 +1418,7 @@ mod tests { let (block_sender, _block_receiver) = flume::unbounded(); let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded(); - let (watch_consensus_rpcs_sender, _watch_consensus_rpcs_receiver) = watch::channel(None); + let (watch_ranked_rpcs, _watch_consensus_rpcs_receiver) = watch::channel(None); let (watch_consensus_head_sender, _watch_consensus_head_receiver) = watch::channel(None); let chain_id = 1; @@ -1597,8 +1429,8 @@ mod tests { by_name: RwLock::new(rpcs_by_name), chain_id, name: "test".to_string(), - watch_consensus_head_sender: Some(watch_consensus_head_sender), - watch_consensus_rpcs_sender, + watch_head_block: Some(watch_consensus_head_sender), + watch_ranked_rpcs, pending_transaction_cache: CacheBuilder::new(100) .time_to_live(Duration::from_secs(60)) .build(), @@ -1638,7 +1470,7 @@ mod tests { // all_backend_connections gives all non-backup servers regardless of sync status assert_eq!( - rpcs.all_connections(None, None, None, None, false, None) + rpcs.all_connections(None, None, None, None, None) .await .unwrap() .len(), @@ -1802,15 +1634,8 @@ mod tests { assert!(matches!(future_rpc, Ok(OpenRequestResult::NotReady))); } - #[tokio::test(start_paused = true)] + #[test_log::test(tokio::test)] async fn test_server_selection_by_archive() { - // // TODO: how should we do test logging setup with tracing? 
- // let _ = env_logger::builder() - // .filter_level(LevelFilter::Error) - // .filter_module("web3_proxy", LevelFilter::Trace) - // .is_test(true) - // .try_init(); - let now = chrono::Utc::now().timestamp().into(); let head_block = Block { @@ -1831,7 +1656,7 @@ mod tests { automatic_block_limit: false, backup: false, block_data_limit: 64.into(), - // tier: 1, + tier: 1.into(), head_block: Some(tx_pruned), ..Default::default() }; @@ -1844,7 +1669,7 @@ mod tests { automatic_block_limit: false, backup: false, block_data_limit: u64::MAX.into(), - // tier: 2, + tier: 2.into(), head_block: Some(tx_archive), ..Default::default() }; @@ -1864,7 +1689,7 @@ mod tests { let (block_sender, _) = flume::unbounded(); let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded(); - let (watch_consensus_rpcs_sender, _) = watch::channel(None); + let (watch_ranked_rpcs, _) = watch::channel(None); let (watch_consensus_head_sender, _watch_consensus_head_receiver) = watch::channel(None); let chain_id = 1; @@ -1874,8 +1699,8 @@ mod tests { by_name: RwLock::new(rpcs_by_name), chain_id, name: "test".to_string(), - watch_consensus_head_sender: Some(watch_consensus_head_sender), - watch_consensus_rpcs_sender, + watch_head_block: Some(watch_consensus_head_sender), + watch_ranked_rpcs, pending_transaction_cache: CacheBuilder::new(100) .time_to_live(Duration::from_secs(120)) .build(), @@ -1897,8 +1722,8 @@ mod tests { let mut connection_heads = ConsensusFinder::new(None, None); - // min sum soft limit will require tier 2 - connection_heads + // min sum soft limit will require 2 servers + let x = connection_heads .process_block_from_rpc( &rpcs, &authorization, @@ -1907,9 +1732,12 @@ mod tests { &None, ) .await - .unwrap_err(); + .unwrap(); + assert!(!x); - connection_heads + assert_eq!(rpcs.num_synced_rpcs(), 0); + + let x = connection_heads .process_block_from_rpc( &rpcs, &authorization, @@ -1919,6 +1747,7 @@ mod tests { ) .await .unwrap(); + assert!(x); assert_eq!(rpcs.num_synced_rpcs(), 2); @@ -1976,8 +1805,7 @@ mod tests { } } - // TODO: #[test_log::test(tokio::test)] - #[tokio::test] + #[test_log::test(tokio::test)] async fn test_all_connections() { // TODO: use chrono, not SystemTime let now: U256 = SystemTime::now() @@ -2049,7 +1877,7 @@ mod tests { let (block_sender, _) = flume::unbounded(); let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded(); - let (watch_consensus_rpcs_sender, _) = watch::channel(None); + let (watch_ranked_rpcs, _) = watch::channel(None); let (watch_consensus_head_sender, _watch_consensus_head_receiver) = watch::channel(None); let chain_id = 1; @@ -2060,8 +1888,8 @@ mod tests { by_name: RwLock::new(rpcs_by_name), chain_id, name: "test".to_string(), - watch_consensus_head_sender: Some(watch_consensus_head_sender), - watch_consensus_rpcs_sender, + watch_head_block: Some(watch_consensus_head_sender), + watch_ranked_rpcs, pending_transaction_cache: Cache::new(10_000), pending_tx_id_receiver, pending_tx_id_sender, @@ -2104,7 +1932,7 @@ mod tests { // best_synced_backend_connection requires servers to be synced with the head block // TODO: test with and without passing the head_block.number? 
let head_connections = rpcs - .all_connections(None, Some(block_2.number()), None, None, false, None) + .all_connections(None, Some(block_2.number()), None, None, None) .await; debug!("head_connections: {:#?}", head_connections); @@ -2116,7 +1944,7 @@ mod tests { ); let all_connections = rpcs - .all_connections(None, Some(block_1.number()), None, None, false, None) + .all_connections(None, Some(block_1.number()), None, None, None) .await; debug!("all_connections: {:#?}", all_connections); @@ -2127,9 +1955,7 @@ mod tests { "wrong number of connections" ); - let all_connections = rpcs - .all_connections(None, None, None, None, false, None) - .await; + let all_connections = rpcs.all_connections(None, None, None, None, None).await; debug!("all_connections: {:#?}", all_connections); diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 98294a23..3b39280a 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -234,6 +234,7 @@ impl Web3Rpc { /// TODO: tests on this! /// TODO: should tier or block number take priority? /// TODO: should this return a struct that implements sorting traits? + /// TODO: move this to consensus.rs fn sort_on(&self, max_block: Option) -> (bool, Reverse, u32) { let mut head_block = self .head_block @@ -252,6 +253,7 @@ impl Web3Rpc { (!backup, Reverse(head_block), tier) } + /// TODO: move this to consensus.rs pub fn sort_for_load_balancing_on( &self, max_block: Option, @@ -268,6 +270,7 @@ impl Web3Rpc { } /// like sort_for_load_balancing, but shuffles tiers randomly instead of sorting by weighted_peak_latency + /// TODO: move this to consensus.rs pub fn shuffle_for_load_balancing_on( &self, max_block: Option, @@ -444,6 +447,7 @@ impl Web3Rpc { .into()); } + // TODO: only do this for balanced_rpcs. this errors on 4337 rpcs self.check_block_data_limit() .await .context(format!("unable to check_block_data_limit of {}", self))?;