From c66eb6d864cd87f1b1f18d04abf354ff5fe483a8 Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Tue, 16 May 2023 12:18:59 -0700
Subject: [PATCH 1/2] head latency instead of peak latency for now

---
 TODO.md                          |  2 +-
 latency/src/ewma.rs              | 16 +++++++++-------
 web3_proxy/src/rpcs/consensus.rs |  9 +++++----
 web3_proxy/src/rpcs/one.rs       | 15 ++++++++-------
 4 files changed, 23 insertions(+), 19 deletions(-)

diff --git a/TODO.md b/TODO.md
index b405b251..195c293d 100644
--- a/TODO.md
+++ b/TODO.md
@@ -745,4 +745,4 @@ in another repo: event subscriber
 - [ ] tests for config reloading
 - [ ] use pin instead of arc for a bunch of things?
     - https://fasterthanli.me/articles/pin-and-suffering
-- [ ] calculate archive depth automatically based on block_data_limits
+- [ ] calculate archive depth automatically based on block_data_limits
\ No newline at end of file
diff --git a/latency/src/ewma.rs b/latency/src/ewma.rs
index 073dad54..fe5b51f9 100644
--- a/latency/src/ewma.rs
+++ b/latency/src/ewma.rs
@@ -17,18 +17,19 @@ impl Serialize for EwmaLatency {
 }
 
 impl EwmaLatency {
-    #[inline(always)]
+    #[inline]
     pub fn record(&mut self, duration: Duration) {
         self.record_ms(duration.as_secs_f64() * 1000.0);
     }
 
-    #[inline(always)]
+    #[inline]
     pub fn record_ms(&mut self, milliseconds: f64) {
-        self.ewma.add(milliseconds);
+        // don't let it go under 0.1ms
+        self.ewma.add(milliseconds.max(0.1));
     }
 
     /// Current EWMA value in milliseconds
-    #[inline(always)]
+    #[inline]
     pub fn value(&self) -> f64 {
         self.ewma.value()
     }
@@ -36,10 +37,11 @@ impl EwmaLatency {
 
 impl Default for EwmaLatency {
     fn default() -> Self {
-        // TODO: what should the default span be? 25 requests?
-        let span = 25.0;
+        // TODO: what should the default span be? 10 requests?
+        let span = 10.0;
 
-        let start = 1000.0;
+        // TODO: what should the default start be?
+        let start = 1.0;
 
         Self::new(span, start)
     }
diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs
index fbe10fa6..f2a4b576 100644
--- a/web3_proxy/src/rpcs/consensus.rs
+++ b/web3_proxy/src/rpcs/consensus.rs
@@ -7,7 +7,7 @@ use derive_more::Constructor;
 use ethers::prelude::{H256, U64};
 use hashbrown::{HashMap, HashSet};
 use itertools::{Itertools, MinMaxResult};
-use log::{trace, warn};
+use log::{debug, info, trace, warn};
 use moka::future::Cache;
 use serde::Serialize;
 use std::cmp::Reverse;
@@ -266,15 +266,16 @@ impl ConsensusFinder {
     async fn insert(&mut self, rpc: Arc<Web3Rpc>, block: Web3ProxyBlock) -> Option<Web3ProxyBlock> {
         let first_seen = self
             .first_seen
-            .get_with_by_ref(block.hash(), async move { Instant::now() })
+            .get_with_by_ref(block.hash(), async { Instant::now() })
             .await;
 
-        // TODO: this should be 0 if we are first seen, but i think it will be slightly non-zero.
-        // calculate elapsed time before trying to lock.
+        // calculate elapsed time before trying to lock
         let latency = first_seen.elapsed();
 
+        // record the time behind the fastest node
         rpc.head_latency.write().record(latency);
 
+        // update the local mapping of rpc -> block
         self.rpc_heads.insert(rpc, block)
     }
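The consensus.rs hunk above is what feeds the new head-latency EWMA: the first server to announce a block hash sets the reference instant, and every later announcement records how far behind it arrived. A minimal sketch of that bookkeeping, assuming a plain HashMap keyed by block hash in place of the moka cache the proxy uses (the `HeadLag` type and `record_seen` name are illustrative, not part of the patch):

    use std::collections::HashMap;
    use std::time::{Duration, Instant};

    /// Tracks how far behind the fastest server each block announcement arrives.
    #[derive(Default)]
    struct HeadLag {
        /// block hash -> when the first server announced it
        first_seen: HashMap<[u8; 32], Instant>,
    }

    impl HeadLag {
        /// Returns how long after the first announcement this announcement arrived.
        /// The server that announces a block first measures (roughly) zero.
        fn record_seen(&mut self, block_hash: [u8; 32]) -> Duration {
            let first_seen = *self
                .first_seen
                .entry(block_hash)
                .or_insert_with(Instant::now);

            first_seen.elapsed()
        }
    }

Feeding the returned duration into `EwmaLatency::record` (with the 0.1ms floor added above) is what `rpc.head_latency.write().record(latency)` does in the patch.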
diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs
index b9faea75..686e20d2 100644
--- a/web3_proxy/src/rpcs/one.rs
+++ b/web3_proxy/src/rpcs/one.rs
@@ -70,7 +70,6 @@ pub struct Web3Rpc {
     /// Track head block latency
    pub(super) head_latency: RwLock<EwmaLatency>,
     /// Track peak request latency
-    ///
     /// This is only inside an Option so that the "Default" derive works. it will always be set.
     pub(super) peak_latency: Option<PeakEwmaLatency>,
     /// Track total requests served
@@ -236,16 +235,18 @@ impl Web3Rpc {
     }
 
     pub fn peak_ewma(&self) -> OrderedFloat<f64> {
-        let peak_latency = if let Some(peak_latency) = self.peak_latency.as_ref() {
-            peak_latency.latency().as_secs_f64()
-        } else {
-            0.0
-        };
+        // TODO: bug inside peak ewma somewhere. possible with atomics being relaxed or the conversion to pair and back
+        // let peak_latency = if let Some(peak_latency) = self.peak_latency.as_ref() {
+        //     peak_latency.latency().as_secs_f64()
+        // } else {
+        //     0.0
+        // };
+        let head_latency = self.head_latency.read().value();
 
         // TODO: what ordering?
         let active_requests = self.active_requests.load(atomic::Ordering::Acquire) as f64 + 1.0;
 
-        OrderedFloat(peak_latency * active_requests)
+        OrderedFloat(head_latency * active_requests)
     }
 
     // TODO: would be great if rpcs exposed this. see https://github.com/ledgerwatch/erigon/issues/6391
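The new `peak_ewma()` body reduces to "smoothed head latency times in-flight load". A standalone sketch of that ranking, assuming the latency is already an EWMA in milliseconds and using `OrderedFloat` (already in use above) so scores are totally ordered; the `RpcScore` struct and `load_adjusted_latency` name are illustrative:

    use std::sync::atomic::{AtomicUsize, Ordering};

    use ordered_float::OrderedFloat;

    /// The per-server numbers the ranking boils down to.
    struct RpcScore {
        /// smoothed head-block latency in milliseconds (the EWMA value)
        head_latency_ms: f64,
        /// requests currently in flight to this server
        active_requests: AtomicUsize,
    }

    impl RpcScore {
        /// Lower is better: a fast server that is already busy can score worse
        /// than a slightly slower idle one.
        fn load_adjusted_latency(&self) -> OrderedFloat<f64> {
            // +1.0 so an idle server still pays its base latency instead of scoring zero
            let load = self.active_requests.load(Ordering::Acquire) as f64 + 1.0;

            OrderedFloat(self.head_latency_ms * load)
        }
    }

    /// Pick the server with the lowest load-adjusted latency.
    fn best(rpcs: &[RpcScore]) -> Option<&RpcScore> {
        rpcs.iter().min_by_key(|rpc| rpc.load_adjusted_latency())
    }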
From 978c385b3c277852d9c81ff0e8e0b0ea36fce12b Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Tue, 16 May 2023 13:26:39 -0700
Subject: [PATCH 2/2] improve wait for block

---
 web3_proxy/src/rpcs/blockchain.rs |  2 +-
 web3_proxy/src/rpcs/consensus.rs  | 25 ++++++++++++--
 web3_proxy/src/rpcs/many.rs       | 54 +++++++++++++++----------------
 web3_proxy/src/stats/mod.rs       |  6 ++--
 4 files changed, 53 insertions(+), 34 deletions(-)

diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs
index 1ec3bc2c..828eacb3 100644
--- a/web3_proxy/src/rpcs/blockchain.rs
+++ b/web3_proxy/src/rpcs/blockchain.rs
@@ -444,7 +444,7 @@ impl Web3Rpcs {
         let total_tiers = consensus_finder.worst_tier().unwrap_or(10);
         let backups_needed = new_synced_connections.backups_needed;
         let consensus_head_block = new_synced_connections.head_block.clone();
-        let num_consensus_rpcs = new_synced_connections.num_conns();
+        let num_consensus_rpcs = new_synced_connections.num_consensus_rpcs();
         let num_active_rpcs = consensus_finder.len();
         let total_rpcs = self.by_name.load().len();
diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs
index f2a4b576..31ffe385 100644
--- a/web3_proxy/src/rpcs/consensus.rs
+++ b/web3_proxy/src/rpcs/consensus.rs
@@ -7,7 +7,7 @@ use derive_more::Constructor;
 use ethers::prelude::{H256, U64};
 use hashbrown::{HashMap, HashSet};
 use itertools::{Itertools, MinMaxResult};
-use log::{debug, info, trace, warn};
+use log::{trace, warn};
 use moka::future::Cache;
 use serde::Serialize;
 use std::cmp::Reverse;
@@ -111,10 +111,31 @@ pub struct ConsensusWeb3Rpcs {
 
 impl ConsensusWeb3Rpcs {
     #[inline]
-    pub fn num_conns(&self) -> usize {
+    pub fn num_consensus_rpcs(&self) -> usize {
         self.best_rpcs.len()
     }
 
+    pub fn best_block_num(&self, skip_rpcs: &[Arc<Web3Rpc>]) -> Option<&U64> {
+        if self.best_rpcs.iter().all(|rpc| skip_rpcs.contains(rpc)) {
+            // all of the consensus rpcs are skipped
+            // iterate the other rpc tiers to find the next best block
+            let mut best_block = None;
+            for (next_ranking, next_rpcs) in self.other_rpcs.iter() {
+                if next_rpcs.iter().all(|rpc| skip_rpcs.contains(rpc)) {
+                    // everything in this ranking is skipped
+                    continue;
+                }
+
+                best_block = best_block.max(next_ranking.head_num.as_ref());
+            }
+
+            best_block
+        } else {
+            // not all the best synced rpcs are skipped yet. use the best head block
+            Some(self.head_block.number())
+        }
+    }
+
     pub fn has_block_data(&self, rpc: &Web3Rpc, block_num: &U64) -> bool {
         self.rpc_data
             .get(rpc)
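`best_block_num` answers: given the rpcs we have already tried or skipped, what is the best block we can still reach? A simplified sketch of the same tier walk, with plain tiers and rpc names standing in for the proxy's `ConsensusWeb3Rpcs` internals (every name here is illustrative):

    /// One tier of rpcs as the consensus finder ranks them.
    struct Tier {
        /// the head block number this tier agrees on, if any
        head_num: Option<u64>,
        /// the servers serving that block
        rpcs: Vec<String>,
    }

    /// The best block number still reachable once `skip` (servers that already
    /// failed or are rate limited) is taken into account.
    fn best_block_num(best_tier: &Tier, other_tiers: &[Tier], skip: &[String]) -> Option<u64> {
        let all_skipped = |rpcs: &[String]| rpcs.iter().all(|rpc| skip.contains(rpc));

        if !all_skipped(&best_tier.rpcs) {
            // a consensus rpc is still usable: serve from the consensus head
            return best_tier.head_num;
        }

        // every consensus rpc is skipped: fall back to the best head among the
        // tiers that still have an unskipped server
        other_tiers
            .iter()
            .filter(|tier| !all_skipped(&tier.rpcs))
            .filter_map(|tier| tier.head_num)
            .max()
    }

Returning `None` when every tier is skipped is what lets the caller below wait on the consensus watch channel instead of looping over servers it has already given up on.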
diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs
index e9183d86..35dd5d2e 100644
--- a/web3_proxy/src/rpcs/many.rs
+++ b/web3_proxy/src/rpcs/many.rs
@@ -37,7 +37,6 @@ use std::fmt;
 use std::sync::atomic::Ordering;
 use std::sync::Arc;
 use thread_fast_rng::rand::seq::SliceRandom;
-use tokio;
 use tokio::sync::{broadcast, watch};
 use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior};
@@ -630,7 +629,7 @@ impl Web3Rpcs {
         match earliest_retry_at {
             None => {
                 // none of the servers gave us a time to retry at
-                debug!("no servers on {:?} gave a retry time", self);
+                debug!("no servers on {:?} gave a retry time. {:?}", self, skip);
 
                 // TODO: bring this back? need to think about how to do this with `allow_backups`
                 // we could return an error here, but maybe waiting a second will fix the problem
@@ -784,7 +783,7 @@ impl Web3Rpcs {
         let mut skip_rpcs = vec![];
         let mut method_not_available_response = None;
 
-        let mut watch_consensus_connections = self.watch_consensus_rpcs_sender.subscribe();
+        let mut watch_consensus_rpcs = self.watch_consensus_rpcs_sender.subscribe();
 
         let start = Instant::now();
@@ -949,8 +948,8 @@ impl Web3Rpcs {
                         trace!("slept!");
                         skip_rpcs.pop();
                     }
-                    _ = watch_consensus_connections.changed() => {
-                        watch_consensus_connections.borrow_and_update();
+                    _ = watch_consensus_rpcs.changed() => {
+                        watch_consensus_rpcs.borrow_and_update();
                     }
                 }
             }
@@ -961,14 +960,12 @@ impl Web3Rpcs {
 
             let waiting_for = min_block_needed.max(max_block_needed);
 
-            info!("waiting for {:?}", waiting_for);
-
-            if watch_for_block(waiting_for, &mut watch_consensus_connections).await? {
+            if watch_for_block(waiting_for, &skip_rpcs, &mut watch_consensus_rpcs).await? {
                 // block found! continue so we can check for another rpc
             } else {
                 // rate limits are likely keeping us from serving the head block
-                watch_consensus_connections.changed().await?;
-                watch_consensus_connections.borrow_and_update();
+                watch_consensus_rpcs.changed().await?;
+                watch_consensus_rpcs.borrow_and_update();
            }
         }
     }
@@ -994,7 +991,7 @@ impl Web3Rpcs {
 
         let needed = min_block_needed.max(max_block_needed);
 
-        let head_block_num = watch_consensus_connections
+        let head_block_num = watch_consensus_rpcs
             .borrow()
             .as_ref()
             .map(|x| *x.head_block.number());
@@ -1536,7 +1533,6 @@ mod tests {
         assert!(lagged_rpc.has_block_data(lagged_block.number.as_ref().unwrap()));
         assert!(!lagged_rpc.has_block_data(head_block.number.as_ref().unwrap()));
 
-        // todo!("this doesn't work anymore. send_head_block_result doesn't do anything when rpcs isn't watching the block_receiver")
         assert_eq!(rpcs.num_synced_rpcs(), 2);
 
         // add head block to the rpcs. lagged_rpc should not be available
@@ -1917,22 +1913,24 @@ mod tests {
 }
 
 /// returns `true` when the desired block number is available
 /// TODO: max wait time? max number of blocks to wait for? time is probably best
 async fn watch_for_block(
-    block_num: Option<&U64>,
-    watch_consensus_connections: &mut watch::Receiver<Option<Arc<ConsensusWeb3Rpcs>>>,
+    needed_block_num: Option<&U64>,
+    skip_rpcs: &[Arc<Web3Rpc>],
+    watch_consensus_rpcs: &mut watch::Receiver<Option<Arc<ConsensusWeb3Rpcs>>>,
 ) -> Web3ProxyResult<bool> {
-    let mut head_block_num = watch_consensus_connections
+    info!("waiting for {:?}", needed_block_num);
+
+    let mut best_block_num: Option<U64> = watch_consensus_rpcs
         .borrow_and_update()
         .as_ref()
-        .map(|x| *x.head_block.number());
+        .and_then(|x| x.best_block_num(skip_rpcs).copied());
 
-    match (block_num, head_block_num) {
-        (Some(x), Some(ref head)) => {
-            if x <= head {
-                // we are past this block and no servers have this block
+    match (needed_block_num, best_block_num.as_ref()) {
+        (Some(x), Some(best)) => {
+            if x <= best {
+                // the best block is past the needed block and no servers have the needed data
                 // this happens if the block is old and all archive servers are offline
                 // there is no chance we will get this block without adding an archive server to the config
-
-                // TODO: i think this can also happen if we are being rate limited!
+                // TODO: i think this can also happen if we are being rate limited! but then waiting might work. need skip_rpcs to be smarter
                 return Ok(false);
             }
         }
         (Some(_), None) => {
             // block requested but no servers synced. we will wait
+            // TODO: if the web3rpcs connected to this consensus isn't watching head blocks, exit with an error (waiting for blocks won't ever work)
         }
         (None, Some(head)) => {
             // i don't think this is possible
         }
     }
 
     // future block is requested
     // wait for the block to arrive
-    while head_block_num < block_num.copied() {
-        watch_consensus_connections.changed().await?;
+    while best_block_num.as_ref() < needed_block_num {
+        watch_consensus_rpcs.changed().await?;
 
-        head_block_num = watch_consensus_connections
-            .borrow_and_update()
+        let consensus_rpcs = watch_consensus_rpcs.borrow_and_update();
+
+        best_block_num = consensus_rpcs
             .as_ref()
-            .map(|x| *x.head_block.number());
+            .and_then(|x| x.best_block_num(skip_rpcs).copied());
     }
 
     Ok(true)
 }
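`watch_for_block` is ultimately "check the value already in the watch channel, then park until the consensus head is new enough". A minimal sketch of that pattern with a bare `u64` head number standing in for `Arc<ConsensusWeb3Rpcs>` (the `wait_for_block_num` helper and its channel payload are illustrative):

    use std::time::Duration;

    use tokio::sync::watch;

    /// Wait until the consensus head published on the watch channel reaches `needed`.
    /// Errors only if every sender has been dropped.
    async fn wait_for_block_num(
        needed: u64,
        head: &mut watch::Receiver<Option<u64>>,
    ) -> Result<(), watch::error::RecvError> {
        loop {
            // copy the latest value out so the watch lock isn't held across an .await
            let current: Option<u64> = *head.borrow_and_update();

            if current.map_or(false, |num| num >= needed) {
                return Ok(());
            }

            // wakes whenever a new consensus head is published
            head.changed().await?;
        }
    }

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        let (tx, mut rx) = watch::channel(None);

        // simulate head blocks arriving from the block watcher
        tokio::spawn(async move {
            for num in 1..=5u64 {
                tx.send(Some(num)).ok();
                tokio::time::sleep(Duration::from_millis(10)).await;
            }
        });

        wait_for_block_num(3, &mut rx).await?;
        println!("a head block >= 3 is available");

        Ok(())
    }

Like the patched function, the current value is read with `borrow_and_update()` before the first `changed().await`, so a head that is already new enough returns immediately.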
diff --git a/web3_proxy/src/stats/mod.rs b/web3_proxy/src/stats/mod.rs
index 930f9e04..352adb58 100644
--- a/web3_proxy/src/stats/mod.rs
+++ b/web3_proxy/src/stats/mod.rs
@@ -17,7 +17,7 @@ use derive_more::From;
 use entities::sea_orm_active_enums::TrackingLevel;
 use entities::{balance, referee, referrer, rpc_accounting_v2, rpc_key, user, user_tier};
 use influxdb2::models::DataPoint;
-use log::{trace, warn};
+use log::{error, trace, warn};
 use migration::sea_orm::prelude::Decimal;
 use migration::sea_orm::{
     self, ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, IntoActiveModel,
@@ -406,7 +406,7 @@ impl BufferedRpcQueryStats {
         if new_available_balance < Decimal::from(10u64) && downgrade_user_role.title == "Premium" {
             // TODO: we could do this outside the balance low block, but I think its fine. or better, update the cache if <$10 and downgrade if <$1
             if let Some(rpc_secret_key_cache) = rpc_secret_key_cache {
-                todo!("expire (or probably better to update) the user cache now that the balance is low");
+                error!("expire (or probably better to update) the user cache now that the balance is low");
                 // actually i think we need to have 2 caches. otherwise users with 2 keys are going to have separate caches
                 // 1. rpc_secret_key_id -> AuthorizationChecks (cuz we don't want to hit the db every time)
                 // 2. user_id -> Balance
@@ -419,8 +419,6 @@ impl BufferedRpcQueryStats {
             // active_downgrade_user.save(db_conn).await?;
         }
 
-        // TODO:
-
         // Get the referee, and the referrer
         // (2) Look up the code that this user used. This is the referee table
         let referee_object = match referee::Entity::find()
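The cache note in the stats change above ("2 caches ... otherwise users with 2 keys are going to have separate caches") is about keying: authorization data per rpc secret key, but balance per user, so a low balance can be pushed to every key the user owns in one update. A rough sketch of that split using moka (already used elsewhere in the proxy); every type and field name here is illustrative, not the proxy's real API:

    use std::sync::Arc;

    use moka::future::Cache;

    /// Stand-in for the proxy's per-key authorization data.
    struct AuthorizationChecks;

    /// Hypothetical split of the two caches described in the comment above.
    struct UserCaches {
        /// rpc_secret_key_id -> AuthorizationChecks, so the db isn't hit on every request
        authorization_by_key: Cache<u64, Arc<AuthorizationChecks>>,
        /// user_id -> remaining balance (a plain f64 here instead of the Decimal the stats code uses)
        balance_by_user: Cache<u64, f64>,
    }

    impl UserCaches {
        fn new() -> Self {
            Self {
                authorization_by_key: Cache::new(10_000),
                balance_by_user: Cache::new(10_000),
            }
        }

        /// When a flushed stat (or a payment) changes a balance, update the
        /// user-keyed entry once; every rpc key owned by that user sees it.
        async fn balance_changed(&self, user_id: u64, new_balance: f64) {
            self.balance_by_user.insert(user_id, new_balance).await;
        }
    }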