diff --git a/TODO.md b/TODO.md index b405b251..195c293d 100644 --- a/TODO.md +++ b/TODO.md @@ -745,4 +745,4 @@ in another repo: event subscriber - [ ] tests for config reloading - [ ] use pin instead of arc for a bunch of things? - https://fasterthanli.me/articles/pin-and-suffering -- [ ] calculate archive depth automatically based on block_data_limits +- [ ] calculate archive depth automatically based on block_data_limits \ No newline at end of file diff --git a/latency/src/ewma.rs b/latency/src/ewma.rs index 073dad54..fe5b51f9 100644 --- a/latency/src/ewma.rs +++ b/latency/src/ewma.rs @@ -17,18 +17,19 @@ impl Serialize for EwmaLatency { } impl EwmaLatency { - #[inline(always)] + #[inline] pub fn record(&mut self, duration: Duration) { self.record_ms(duration.as_secs_f64() * 1000.0); } - #[inline(always)] + #[inline] pub fn record_ms(&mut self, milliseconds: f64) { - self.ewma.add(milliseconds); + // don't let it go under 0.1ms + self.ewma.add(milliseconds.max(0.1)); } /// Current EWMA value in milliseconds - #[inline(always)] + #[inline] pub fn value(&self) -> f64 { self.ewma.value() } @@ -36,10 +37,11 @@ impl EwmaLatency { impl Default for EwmaLatency { fn default() -> Self { - // TODO: what should the default span be? 25 requests? - let span = 25.0; + // TODO: what should the default span be? 10 requests? + let span = 10.0; - let start = 1000.0; + // TODO: what should the defautt start be? + let start = 1.0; Self::new(span, start) } diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index fbe10fa6..f2a4b576 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -7,7 +7,7 @@ use derive_more::Constructor; use ethers::prelude::{H256, U64}; use hashbrown::{HashMap, HashSet}; use itertools::{Itertools, MinMaxResult}; -use log::{trace, warn}; +use log::{debug, info, trace, warn}; use moka::future::Cache; use serde::Serialize; use std::cmp::Reverse; @@ -266,15 +266,16 @@ impl ConsensusFinder { async fn insert(&mut self, rpc: Arc, block: Web3ProxyBlock) -> Option { let first_seen = self .first_seen - .get_with_by_ref(block.hash(), async move { Instant::now() }) + .get_with_by_ref(block.hash(), async { Instant::now() }) .await; - // TODO: this should be 0 if we are first seen, but i think it will be slightly non-zero. - // calculate elapsed time before trying to lock. + // calculate elapsed time before trying to lock let latency = first_seen.elapsed(); + // record the time behind the fastest node rpc.head_latency.write().record(latency); + // update the local mapping of rpc -> block self.rpc_heads.insert(rpc, block) } diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index b9faea75..686e20d2 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -70,7 +70,6 @@ pub struct Web3Rpc { /// Track head block latency pub(super) head_latency: RwLock, /// Track peak request latency - /// /// This is only inside an Option so that the "Default" derive works. it will always be set. pub(super) peak_latency: Option, /// Track total requests served @@ -236,16 +235,18 @@ impl Web3Rpc { } pub fn peak_ewma(&self) -> OrderedFloat { - let peak_latency = if let Some(peak_latency) = self.peak_latency.as_ref() { - peak_latency.latency().as_secs_f64() - } else { - 0.0 - }; + // TODO: bug inside peak ewma somewhere. possible with atomics being relaxed or the conversion to pair and back + // let peak_latency = if let Some(peak_latency) = self.peak_latency.as_ref() { + // peak_latency.latency().as_secs_f64() + // } else { + // 0.0 + // }; + let head_latency = self.head_latency.read().value(); // TODO: what ordering? let active_requests = self.active_requests.load(atomic::Ordering::Acquire) as f64 + 1.0; - OrderedFloat(peak_latency * active_requests) + OrderedFloat(head_latency * active_requests) } // TODO: would be great if rpcs exposed this. see https://github.com/ledgerwatch/erigon/issues/6391