From 1fb4dd6ccc021ee22c92091c395e95c9198387c1 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 15 Feb 2023 12:33:43 -0800 Subject: [PATCH] i think it works --- web3_proxy/src/rpcs/blockchain.rs | 30 ++++++-- web3_proxy/src/rpcs/consensus.rs | 88 ++++++++++++++++-------- web3_proxy/src/rpcs/many.rs | 5 +- web3_proxy/src/rpcs/one.rs | 109 +++++++++++++++++++++--------- web3_proxy/src/rpcs/request.rs | 12 ++++ 5 files changed, 174 insertions(+), 70 deletions(-) diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index 9aa018a0..cd8957f5 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -415,6 +415,8 @@ impl Web3Rpcs { // TODO: what should we do if the block number of new_synced_connections is < old_synced_connections? wait? + let consensus_tier = new_synced_connections.tier; + let total_tiers = consensus_finder.len(); let backups_needed = new_synced_connections.backups_needed; let consensus_head_block = new_synced_connections.head_block.clone(); let num_consensus_rpcs = new_synced_connections.num_conns(); @@ -434,7 +436,9 @@ impl Web3Rpcs { match &old_consensus_head_connections.head_block { None => { debug!( - "first {}{}/{}/{} block={}, rpc={}", + "first {}/{} {}{}/{}/{} block={}, rpc={}", + consensus_tier, + total_tiers, backups_voted_str, num_consensus_rpcs, num_active_rpcs, @@ -469,7 +473,9 @@ impl Web3Rpcs { // no change in hash. no need to use head_block_sender // TODO: trace level if rpc is backup debug!( - "con {}{}/{}/{} con={} rpc={}@{}", + "con {}/{} {}{}/{}/{} con={} rpc={}@{}", + consensus_tier, + total_tiers, backups_voted_str, num_consensus_rpcs, num_active_rpcs, @@ -486,7 +492,9 @@ impl Web3Rpcs { } debug!( - "unc {}{}/{}/{} con_head={} old={} rpc={}@{}", + "unc {}/{} {}{}/{}/{} con_head={} old={} rpc={}@{}", + consensus_tier, + total_tiers, backups_voted_str, num_consensus_rpcs, num_active_rpcs, @@ -511,7 +519,9 @@ impl Web3Rpcs { // this is unlikely but possible // TODO: better log warn!( - "chain rolled back {}{}/{}/{} con={} old={} rpc={}@{}", + "chain rolled back {}/{} {}{}/{}/{} con={} old={} rpc={}@{}", + consensus_tier, + total_tiers, backups_voted_str, num_consensus_rpcs, num_active_rpcs, @@ -541,7 +551,9 @@ impl Web3Rpcs { } Ordering::Greater => { debug!( - "new {}{}/{}/{} con={} rpc={}@{}", + "new {}/{} {}{}/{}/{} con={} rpc={}@{}", + consensus_tier, + total_tiers, backups_voted_str, num_consensus_rpcs, num_active_rpcs, @@ -573,7 +585,9 @@ impl Web3Rpcs { if num_active_rpcs >= self.min_head_rpcs { // no consensus!!! error!( - "non {}{}/{}/{} rpc={}@{}", + "non {}/{} {}{}/{}/{} rpc={}@{}", + consensus_tier, + total_tiers, backups_voted_str, num_consensus_rpcs, num_active_rpcs, @@ -584,7 +598,9 @@ impl Web3Rpcs { } else { // no consensus, but we do not have enough rpcs connected yet to panic debug!( - "non {}{}/{}/{} rpc={}@{}", + "non {}/{} {}{}/{}/{} rpc={}@{}", + consensus_tier, + total_tiers, backups_voted_str, num_consensus_rpcs, num_active_rpcs, diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index 847892cf..62901b59 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -7,15 +7,18 @@ use anyhow::Context; use ethers::prelude::{H256, U64}; use hashbrown::{HashMap, HashSet}; use log::{debug, trace, warn}; +use moka::future::Cache; use serde::Serialize; use std::collections::BTreeMap; use std::fmt; use std::sync::Arc; +use tokio::time::Instant; /// A collection of Web3Rpcs that are on the same block. /// Serialize is so we can print it on our debug endpoint #[derive(Clone, Default, Serialize)] pub struct ConsensusWeb3Rpcs { + pub(super) tier: u64, pub(super) head_block: Option, // TODO: this should be able to serialize, but it isn't #[serde(skip_serializing)] @@ -74,22 +77,25 @@ impl Web3Rpcs { } } +type FirstSeenCache = Cache; + pub struct ConnectionsGroup { rpc_name_to_block: HashMap, // TODO: what if there are two blocks with the same number? highest_block: Option, -} - -impl Default for ConnectionsGroup { - fn default() -> Self { - Self { - rpc_name_to_block: Default::default(), - highest_block: Default::default(), - } - } + /// used to track rpc.head_latency. The same cache should be shared between all ConnectionsGroups + first_seen: FirstSeenCache, } impl ConnectionsGroup { + pub fn new(first_seen: FirstSeenCache) -> Self { + Self { + rpc_name_to_block: Default::default(), + highest_block: Default::default(), + first_seen, + } + } + pub fn len(&self) -> usize { self.rpc_name_to_block.len() } @@ -115,7 +121,17 @@ impl ConnectionsGroup { } } - fn insert(&mut self, rpc: &Web3Rpc, block: Web3ProxyBlock) -> Option { + async fn insert(&mut self, rpc: &Web3Rpc, block: Web3ProxyBlock) -> Option { + let first_seen = self + .first_seen + .get_with(*block.hash(), async move { Instant::now() }) + .await; + + // TODO: this should be 0 if we are first seen, but i think it will be slightly non-zero + rpc.head_latency + .write() + .record(first_seen.elapsed().as_secs_f64() * 1000.0); + // TODO: what about a reorg to the same height? if Some(block.number()) > self.highest_block.as_ref().map(|x| x.number()) { self.highest_block = Some(block.clone()); @@ -179,6 +195,7 @@ impl ConnectionsGroup { authorization: &Arc, web3_rpcs: &Web3Rpcs, min_consensus_block_num: Option, + tier: &u64, ) -> anyhow::Result { let mut maybe_head_block = match self.highest_block.clone() { None => return Err(anyhow::anyhow!("no blocks known")), @@ -191,13 +208,18 @@ impl ConnectionsGroup { if let Some(min_consensus_block_num) = min_consensus_block_num { maybe_head_block .number() + .saturating_add(1.into()) .saturating_sub(min_consensus_block_num) .as_u64() } else { - // TODO: get from app config? different chains probably should have different values. 10 is probably too much 10 }; + trace!( + "max_lag_consensus_to_highest: {}", + max_lag_consensus_to_highest + ); + let num_known = self.rpc_name_to_block.len(); if num_known < web3_rpcs.min_head_rpcs { @@ -338,7 +360,7 @@ impl ConnectionsGroup { } // success! this block has enough soft limit and nodes on it (or on later blocks) - let conns: Vec> = primary_consensus_rpcs + let rpcs: Vec> = primary_consensus_rpcs .into_iter() .filter_map(|conn_name| web3_rpcs.by_name.get(conn_name).cloned()) .collect(); @@ -349,8 +371,9 @@ impl ConnectionsGroup { let _ = maybe_head_block.number(); Ok(ConsensusWeb3Rpcs { + tier: *tier, head_block: Some(maybe_head_block), - rpcs: conns, + rpcs, backups_voted: backup_rpcs_voted, backups_needed: primary_rpcs_voted.is_none(), }) @@ -377,10 +400,15 @@ impl ConsensusFinder { max_block_age: Option, max_block_lag: Option, ) -> Self { + // TODO: what's a good capacity for this? + let first_seen = Cache::builder() + .max_capacity(16) + .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); + // TODO: this will need some thought when config reloading is written let tiers = configured_tiers .iter() - .map(|x| (*x, Default::default())) + .map(|x| (*x, ConnectionsGroup::new(first_seen.clone()))) .collect(); Self { @@ -389,9 +417,11 @@ impl ConsensusFinder { max_block_lag, } } -} -impl ConsensusFinder { + pub fn len(&self) -> usize { + self.tiers.len() + } + /// get the ConnectionsGroup that contains all rpcs /// panics if there are no tiers pub fn all_rpcs_group(&self) -> Option<&ConnectionsGroup> { @@ -421,7 +451,11 @@ impl ConsensusFinder { } /// returns the block that the rpc was on before updating to the new_block - pub fn insert(&mut self, rpc: &Web3Rpc, new_block: Web3ProxyBlock) -> Option { + pub async fn insert( + &mut self, + rpc: &Web3Rpc, + new_block: Web3ProxyBlock, + ) -> Option { let mut old = None; // TODO: error if rpc.tier is not in self.tiers @@ -432,7 +466,7 @@ impl ConsensusFinder { } // TODO: should new_block be a ref? - let x = tier_group.insert(rpc, new_block.clone()); + let x = tier_group.insert(rpc, new_block.clone()).await; if old.is_none() && x.is_some() { old = x; @@ -473,7 +507,7 @@ impl ConsensusFinder { } } - if let Some(prev_block) = self.insert(&rpc, rpc_head_block.clone()) { + if let Some(prev_block) = self.insert(&rpc, rpc_head_block.clone()).await { if prev_block.hash() == rpc_head_block.hash() { // this block was already sent by this rpc. return early false @@ -527,13 +561,13 @@ impl ConsensusFinder { // TODO: how should errors be handled? // TODO: find the best tier with a connectionsgroup. best case, this only queries the first tier // TODO: do we need to calculate all of them? I think having highest_known_block included as part of min_block_num should make that unnecessary - for (i, x) in self.tiers.iter() { - trace!("checking tier {}: {:#?}", i, x.rpc_name_to_block); + for (tier, x) in self.tiers.iter() { + trace!("checking tier {}: {:#?}", tier, x.rpc_name_to_block); if let Ok(consensus_head_connections) = x - .consensus_head_connections(authorization, web3_connections, min_block_num) + .consensus_head_connections(authorization, web3_connections, min_block_num, tier) .await { - trace!("success on tier {}", i); + trace!("success on tier {}", tier); // we got one! hopefully it didn't need to use any backups. // but even if it did need backup servers, that is better than going to a worse tier return Ok(consensus_head_connections); @@ -546,8 +580,8 @@ impl ConsensusFinder { #[cfg(test)] mod test { - #[test] - fn test_simplest_case_consensus_head_connections() { - todo!(); - } + // #[test] + // fn test_simplest_case_consensus_head_connections() { + // todo!(); + // } } diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 19958016..4a4d1995 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -458,10 +458,7 @@ impl Web3Rpcs { max_block_needed )) } - cmp::Ordering::Less => { - // hmmmm - todo!("now what do we do?"); - } + cmp::Ordering::Less => min_block_needed.cmp(head_block_num), } } }; diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 5b030bad..8bc94243 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -21,33 +21,74 @@ use serde_json::json; use std::cmp::min; use std::fmt; use std::hash::{Hash, Hasher}; -use std::sync::atomic::{self, AtomicU64}; +use std::sync::atomic::{self, AtomicU64, AtomicUsize}; use std::{cmp::Ordering, sync::Arc}; use thread_fast_rng::rand::Rng; use thread_fast_rng::thread_fast_rng; use tokio::sync::{broadcast, oneshot, watch, RwLock as AsyncRwLock}; use tokio::time::{sleep, sleep_until, timeout, Duration, Instant}; -pub struct Web3RpcLatencies { - /// Traack how far behind the fastest node we are - pub new_head: Histogram, - /// exponentially weighted moving average of how far behind the fastest node we are - pub new_head_ewma: u32, - /// Track how long an rpc call takes on average - pub request: Histogram, - /// exponentially weighted moving average of how far behind the fastest node we are - pub request_ewma: u32, +pub struct Latency { + /// Track how many milliseconds slower we are than the fastest node + pub histogram: Histogram, + /// exponentially weighted moving average of how many milliseconds behind the fastest node we are + pub ewma: ewma::EWMA, } -impl Default for Web3RpcLatencies { +impl Serialize for Latency { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let mut state = serializer.serialize_struct("latency", 6)?; + + state.serialize_field("ewma_ms", &self.ewma.value())?; + + state.serialize_field("histogram_len", &self.histogram.len())?; + state.serialize_field("mean_ms", &self.histogram.mean())?; + state.serialize_field("p50_ms", &self.histogram.value_at_quantile(0.50))?; + state.serialize_field("p75_ms", &self.histogram.value_at_quantile(0.75))?; + state.serialize_field("p99_ms", &self.histogram.value_at_quantile(0.99))?; + + state.end() + } +} + +impl Latency { + pub fn record(&mut self, milliseconds: f64) { + self.ewma.add(milliseconds); + + // histogram needs ints and not floats + self.histogram.record(milliseconds as u64).unwrap(); + } +} + +impl Default for Latency { fn default() -> Self { - todo!("use ewma crate, not u32"); - Self { - new_head: Histogram::new(3).unwrap(), - new_head_ewma: 0, - request: Histogram::new(3).unwrap(), - request_ewma: 0, - } + // TODO: what should the default sigfig be? + let sigfig = 0; + + // TODO: what should the default span be? 25 requests? have a "new" + let span = 25.0; + + Self::new(sigfig, span).expect("default histogram sigfigs should always work") + } +} + +impl Latency { + pub fn new(sigfig: u8, span: f64) -> Result { + let alpha = Self::span_to_alpha(span); + + let histogram = Histogram::new(sigfig)?; + + Ok(Self { + histogram, + ewma: ewma::EWMA::new(alpha), + }) + } + + fn span_to_alpha(span: f64) -> f64 { + 2.0 / (span + 1.0) } } @@ -83,8 +124,13 @@ pub struct Web3Rpc { pub(super) tier: u64, /// TODO: change this to a watch channel so that http providers can subscribe and take action on change. pub(super) head_block: RwLock>, - /// Track how fast this RPC is - pub(super) latency: Web3RpcLatencies, + /// Track head block latency + pub(super) head_latency: RwLock, + /// Track request latency + pub(super) request_latency: RwLock, + /// Track total requests served + /// TODO: maybe move this to graphana + pub(super) total_requests: AtomicUsize, } impl Web3Rpc { @@ -1081,7 +1127,7 @@ impl Serialize for Web3Rpc { S: Serializer, { // 3 is the number of fields in the struct. - let mut state = serializer.serialize_struct("Web3Rpc", 9)?; + let mut state = serializer.serialize_struct("Web3Rpc", 10)?; // the url is excluded because it likely includes private information. just show the name that we use in keys state.serialize_field("name", &self.name)?; @@ -1103,17 +1149,17 @@ impl Serialize for Web3Rpc { state.serialize_field("soft_limit", &self.soft_limit)?; - // TODO: keep this for the "popularity_contest" command? or maybe better to just use graphana? - // state.serialize_field( - // "frontend_requests", - // &self.frontend_requests.load(atomic::Ordering::Relaxed), - // )?; + // TODO: maybe this is too much data. serialize less? + state.serialize_field("head_block", &*self.head_block.read())?; - { - // TODO: maybe this is too much data. serialize less? - let head_block = &*self.head_block.read(); - state.serialize_field("head_block", head_block)?; - } + state.serialize_field("head_latency", &*self.head_latency.read())?; + + state.serialize_field("request_latency", &*self.request_latency.read())?; + + state.serialize_field( + "total_requests", + &self.total_requests.load(atomic::Ordering::Relaxed), + )?; state.end() } @@ -1207,7 +1253,6 @@ mod tests { let block_data_limit = 64; - // TODO: this is getting long. have a `impl Default` let x = Web3Rpc { name: "name".to_string(), soft_limit: 1_000, diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index b3f4864a..7a2d735d 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -183,6 +183,12 @@ impl OpenRequestHandle { let provider = provider.expect("provider was checked already"); + self.rpc + .total_requests + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + + let start = Instant::now(); + // TODO: replace ethers-rs providers with our own that supports streaming the responses let response = match provider.as_ref() { #[cfg(test)] @@ -367,6 +373,12 @@ impl OpenRequestHandle { tokio::spawn(f); } } + } else { + // TODO: locking now will slow us down. send latency into a channel instead + self.rpc + .request_latency + .write() + .record(start.elapsed().as_secs_f64() * 1000.0); } response