diff --git a/Cargo.lock b/Cargo.lock index ae05a0fb..7a56dc18 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3020,6 +3020,16 @@ dependencies = [ "regex", ] +[[package]] +name = "latency" +version = "0.1.0" +dependencies = [ + "ewma", + "log", + "serde", + "tokio", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -6657,7 +6667,6 @@ dependencies = [ "entities", "env_logger", "ethers", - "ewma", "fdlimit", "flume", "fstrings", @@ -6673,6 +6682,7 @@ dependencies = [ "influxdb2-structmap", "ipnet", "itertools", + "latency", "log", "migration", "moka", diff --git a/Cargo.toml b/Cargo.toml index 9c7db66d..3c0a9c22 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,6 +2,7 @@ members = [ "deferred-rate-limiter", "entities", + "latency", "migration", "rate-counter", "redis-rate-limiter", diff --git a/latency/Cargo.toml b/latency/Cargo.toml new file mode 100644 index 00000000..daf303e3 --- /dev/null +++ b/latency/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "latency" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +ewma = "0.1.1" +log = "0.4.17" +serde = { version = "1.0.159", features = [] } +tokio = { version = "1.27.0", features = ["full"] } + +[dev-dependencies] +tokio = { version = "1.27.0", features = ["full", "test-util"] } diff --git a/latency/src/ewma.rs b/latency/src/ewma.rs new file mode 100644 index 00000000..073dad54 --- /dev/null +++ b/latency/src/ewma.rs @@ -0,0 +1,67 @@ +use serde::ser::Serializer; +use serde::Serialize; +use tokio::time::Duration; + +pub struct EwmaLatency { + /// exponentially weighted moving average of how many milliseconds behind the fastest node we are + ewma: ewma::EWMA, +} + +impl Serialize for EwmaLatency { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + serializer.serialize_f64(self.ewma.value()) + } +} + +impl EwmaLatency { + #[inline(always)] + pub fn record(&mut self, duration: Duration) { + self.record_ms(duration.as_secs_f64() * 1000.0); + } + + #[inline(always)] + pub fn record_ms(&mut self, milliseconds: f64) { + self.ewma.add(milliseconds); + } + + /// Current EWMA value in milliseconds + #[inline(always)] + pub fn value(&self) -> f64 { + self.ewma.value() + } +} + +impl Default for EwmaLatency { + fn default() -> Self { + // TODO: what should the default span be? 25 requests? + let span = 25.0; + + let start = 1000.0; + + Self::new(span, start) + } +} + +impl EwmaLatency { + // depending on the span, start might not be perfect + pub fn new(span: f64, start_ms: f64) -> Self { + let alpha = Self::span_to_alpha(span); + + let mut ewma = ewma::EWMA::new(alpha); + + if start_ms > 0.0 { + for _ in 0..(span as u64) { + ewma.add(start_ms); + } + } + + Self { ewma } + } + + fn span_to_alpha(span: f64) -> f64 { + 2.0 / (span + 1.0) + } +} diff --git a/latency/src/lib.rs b/latency/src/lib.rs new file mode 100644 index 00000000..4f305e85 --- /dev/null +++ b/latency/src/lib.rs @@ -0,0 +1,5 @@ +mod ewma; +mod peak_ewma; +mod util; + +pub use self::{ewma::EwmaLatency, peak_ewma::PeakEwmaLatency}; diff --git a/latency/src/peak_ewma/mod.rs b/latency/src/peak_ewma/mod.rs new file mode 100644 index 00000000..9b6f2f8b --- /dev/null +++ b/latency/src/peak_ewma/mod.rs @@ -0,0 +1,149 @@ +mod rtt_estimate; + +use std::sync::Arc; + +use log::error; +use tokio::sync::mpsc; +use tokio::sync::mpsc::error::TrySendError; +use tokio::task::JoinHandle; +use tokio::time::{Duration, Instant}; + +use self::rtt_estimate::AtomicRttEstimate; +use crate::util::nanos::nanos; + +/// Latency calculation using Peak EWMA algorithm +/// +/// Updates are done in a separate task to avoid locking or race +/// conditions. Reads may happen on any thread. +#[derive(Debug)] +pub struct PeakEwmaLatency { + /// Join handle for the latency calculation task + pub join_handle: JoinHandle<()>, + /// Send to update with each request duration + request_tx: mpsc::Sender, + /// Latency average and last update time + rtt_estimate: Arc, + /// Decay time + decay_ns: f64, +} + +impl PeakEwmaLatency { + /// Spawn the task for calculating peak request latency + /// + /// Returns a handle that can also be used to read the current + /// average latency. + pub fn spawn(decay_ns: f64, buf_size: usize, start_latency: Duration) -> Self { + debug_assert!(decay_ns > 0.0, "decay_ns must be positive"); + let (request_tx, request_rx) = mpsc::channel(buf_size); + let rtt_estimate = Arc::new(AtomicRttEstimate::new(start_latency)); + let task = PeakEwmaLatencyTask { + request_rx, + rtt_estimate: rtt_estimate.clone(), + update_at: Instant::now(), + decay_ns, + }; + let join_handle = tokio::spawn(task.run()); + Self { + join_handle, + request_tx, + rtt_estimate, + decay_ns, + } + } + + /// Get the current peak-ewma latency estimate + pub fn latency(&self) -> Duration { + let mut estimate = self.rtt_estimate.load(); + + let now = Instant::now(); + debug_assert!( + estimate.update_at <= now, + "update_at={:?} in the future", + estimate.update_at, + ); + + // Update the RTT estimate to account for decay since the last update. + estimate.update(0.0, self.decay_ns, now) + } + + /// Report latency from a single request + /// + /// Should only be called from the Web3Rpc that owns it. + pub fn report(&self, duration: Duration) { + match self.request_tx.try_send(duration) { + Ok(()) => {} + Err(TrySendError::Full(_)) => { + // We don't want to block if the channel is full, just + // report the error + error!("Latency report channel full"); + // TODO: could we spawn a new tokio task to report tthis later? + } + Err(TrySendError::Closed(_)) => { + unreachable!("Owner should keep channel open"); + } + }; + //.expect("Owner should keep channel open"); + } +} + +/// Task to be spawned per-Web3Rpc for calculating the peak request latency +#[derive(Debug)] +struct PeakEwmaLatencyTask { + /// Receive new request timings for update + request_rx: mpsc::Receiver, + /// Current estimate and update time + rtt_estimate: Arc, + /// Last update time, used for decay calculation + update_at: Instant, + /// Decay time + decay_ns: f64, +} + +impl PeakEwmaLatencyTask { + /// Run the loop for updating latency + async fn run(mut self) { + while let Some(rtt) = self.request_rx.recv().await { + self.update(rtt); + } + } + + /// Update the estimate object atomically. + fn update(&mut self, rtt: Duration) { + let rtt = nanos(rtt); + + let now = Instant::now(); + debug_assert!( + self.update_at <= now, + "update_at={:?} in the future", + self.update_at, + ); + + self.rtt_estimate + .fetch_update(|mut rtt_estimate| rtt_estimate.update(rtt, self.decay_ns, now)); + } +} + +#[cfg(test)] +mod tests { + use tokio::time::{self, Duration}; + + use crate::util::nanos::NANOS_PER_MILLI; + + /// The default RTT estimate decays, so that new nodes are considered if the + /// default RTT is too high. + #[tokio::test(start_paused = true)] + async fn default_decay() { + let estimate = + super::PeakEwmaLatency::spawn(NANOS_PER_MILLI * 1_000.0, 8, Duration::from_millis(10)); + let load = estimate.latency(); + assert_eq!(load, Duration::from_millis(10)); + + time::advance(Duration::from_millis(100)).await; + let load = estimate.latency(); + assert!(Duration::from_millis(9) < load && load < Duration::from_millis(10)); + + time::advance(Duration::from_millis(100)).await; + let load = estimate.latency(); + assert!(Duration::from_millis(8) < load && load < Duration::from_millis(9)); + } +} diff --git a/latency/src/peak_ewma/rtt_estimate.rs b/latency/src/peak_ewma/rtt_estimate.rs new file mode 100644 index 00000000..be56fe9c --- /dev/null +++ b/latency/src/peak_ewma/rtt_estimate.rs @@ -0,0 +1,168 @@ +use std::sync::atomic::Ordering; + +use log::trace; +use tokio::time::{Duration, Instant}; + +use crate::util::atomic_f32_pair::AtomicF32Pair; +use crate::util::nanos::{nanos, NANOS_PER_MILLI}; + +/// Holds the current RTT estimate and the last time this value was updated. +#[derive(Debug)] +pub struct RttEstimate { + pub update_at: Instant, + pub rtt: Duration, +} + +impl RttEstimate { + /// Update the estimate with a new rtt value. Use rtt=0.0 for simply + /// decaying the current value. + pub fn update(&mut self, rtt: f64, decay_ns: f64, now: Instant) -> Duration { + let ewma = nanos(self.rtt); + self.rtt = if ewma < rtt { + // For Peak-EWMA, always use the worst-case (peak) value as the estimate for + // subsequent requests. + trace!( + "update peak rtt={}ms prior={}ms", + rtt / NANOS_PER_MILLI, + ewma / NANOS_PER_MILLI, + ); + Duration::from_nanos(rtt as u64) + } else { + // When a latency is observed that is less than the estimated latency, we decay the + // prior estimate according to how much time has elapsed since the last + // update. The inverse of the decay is used to scale the estimate towards the + // observed latency value. + let elapsed = nanos(now.saturating_duration_since(self.update_at)); + let decay = (-elapsed / decay_ns).exp(); + let recency = 1.0 - decay; + let next_estimate = (ewma * decay) + (rtt * recency); + trace!( + "update duration={:03.0}ms decay={:06.0}ns; next={:03.0}ms", + rtt / NANOS_PER_MILLI, + ewma - next_estimate, + next_estimate / NANOS_PER_MILLI, + ); + Duration::from_nanos(next_estimate as u64) + }; + self.rtt + } + + /// Build a new estimate object using current time. + fn new(start_duration: Duration) -> Self { + Self { + update_at: Instant::now(), + rtt: start_duration, + } + } + + /// Convert to pair of f32 + fn as_pair(&self, start_time: Instant) -> [f32; 2] { + let update_at = self + .update_at + .saturating_duration_since(start_time) + .as_secs_f32(); + let rtt = self.rtt.as_secs_f32(); + [update_at, rtt] + } + + /// Build from pair of f32 + fn from_pair(pair: [f32; 2], start_time: Instant) -> Self { + let update_at = start_time + Duration::from_secs_f32(pair[0]); + let rtt = Duration::from_secs_f32(pair[1]); + Self { update_at, rtt } + } +} + +/// Atomic storage of RttEstimate using AtomicF32Pair +/// +/// Start time is needed to (de-)serialize the update_at instance. +#[derive(Debug)] +pub struct AtomicRttEstimate { + pair: AtomicF32Pair, + start_time: Instant, +} + +impl AtomicRttEstimate { + /// Creates a new atomic rtt estimate. + pub fn new(start_duration: Duration) -> Self { + let estimate = RttEstimate::new(start_duration); + Self { + pair: AtomicF32Pair::new(estimate.as_pair(estimate.update_at)), + start_time: estimate.update_at, + } + } + + /// Loads a value from the atomic rtt estimate. + /// + /// This method omits the ordering argument since loads may use + /// slightly stale data to avoid adding additional latency. + pub fn load(&self) -> RttEstimate { + RttEstimate::from_pair(self.pair.load(Ordering::Relaxed), self.start_time) + } + + /// Fetches the value, and applies a function to it that returns an + /// new rtt. Retrns the new RttEstimate with new update_at. + /// + /// Automatically updates the update_at with Instant::now(). This + /// method omits ordering arguments, defaulting to Relaxed since + /// all writes are serial and any reads may rely on slightly stale + /// data. + pub fn fetch_update(&self, mut f: F) -> RttEstimate + where + F: FnMut(RttEstimate) -> Duration, + { + let mut update_at = Instant::now(); + let mut rtt = Duration::ZERO; + self.pair + .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |pair| { + rtt = f(RttEstimate::from_pair(pair, self.start_time)); + // Save the new update_at inside the function in case it + // is run multiple times + update_at = Instant::now(); + Some(RttEstimate { rtt, update_at }.as_pair(self.start_time)) + }) + .expect("Should never Err"); + RttEstimate { update_at, rtt } + } +} + +#[cfg(test)] +mod tests { + use tokio::time::{self, Duration, Instant}; + + use super::{AtomicRttEstimate, RttEstimate}; + + #[test] + fn test_rtt_estimate_f32_conversions() { + let rtt = Duration::from_secs(1); + let expected = RttEstimate::new(rtt); + let actual = + RttEstimate::from_pair(expected.as_pair(expected.update_at), expected.update_at); + assert_eq!(expected.update_at, actual.update_at); + assert_eq!(expected.rtt, actual.rtt); + } + + #[tokio::test(start_paused = true)] + async fn test_atomic_rtt_estimate_load() { + let rtt = Duration::from_secs(1); + let estimate = AtomicRttEstimate::new(rtt); + let actual = estimate.load(); + assert_eq!(Instant::now(), actual.update_at); + assert_eq!(rtt, actual.rtt); + } + + #[tokio::test(start_paused = true)] + async fn test_atomic_rtt_estimate_fetch_update() { + let start_time = Instant::now(); + let rtt = Duration::from_secs(1); + let estimate = AtomicRttEstimate::new(rtt); + time::advance(Duration::from_secs(1)).await; + let rv = estimate.fetch_update(|value| { + assert_eq!(start_time, value.update_at); + assert_eq!(rtt, value.rtt); + Duration::from_secs(2) + }); + assert_eq!(start_time + Duration::from_secs(1), rv.update_at); + assert_eq!(Duration::from_secs(2), rv.rtt); + } +} diff --git a/latency/src/util/atomic_f32_pair.rs b/latency/src/util/atomic_f32_pair.rs new file mode 100644 index 00000000..08ee3953 --- /dev/null +++ b/latency/src/util/atomic_f32_pair.rs @@ -0,0 +1,88 @@ +use std::sync::atomic::{AtomicU64, Ordering}; + +/// Implements an atomic pair of f32s +/// +/// This uses an AtomicU64 internally. +#[derive(Debug)] +pub struct AtomicF32Pair(AtomicU64); + +impl AtomicF32Pair { + /// Creates a new atomic pair. + pub fn new(pair: [f32; 2]) -> Self { + Self(AtomicU64::new(to_bits(pair))) + } + + /// Loads a value from the atomic pair. + pub fn load(&self, ordering: Ordering) -> [f32; 2] { + from_bits(self.0.load(ordering)) + } + + /// Fetches the value, and applies a function to it that returns an + /// optional new value. Returns a Result of Ok(previous_value) if + /// the function returned Some(_), else Err(previous_value). + pub fn fetch_update( + &self, + set_order: Ordering, + fetch_order: Ordering, + mut f: F, + ) -> Result<[f32; 2], [f32; 2]> + where + F: FnMut([f32; 2]) -> Option<[f32; 2]>, + { + self.0 + .fetch_update(set_order, fetch_order, |bits| { + f(from_bits(bits)).map(to_bits) + }) + .map(from_bits) + .map_err(from_bits) + } +} + +/// Convert a f32 pair to its bit-representation as u64 +fn to_bits(pair: [f32; 2]) -> u64 { + let f1 = pair[0].to_bits() as u64; + let f2 = pair[1].to_bits() as u64; + (f1 << 32) | f2 +} + +/// Build a f32 pair from its bit-representation as u64 +fn from_bits(bits: u64) -> [f32; 2] { + let f1 = f32::from_bits((bits >> 32) as u32); + let f2 = f32::from_bits(bits as u32); + [f1, f2] +} + +#[cfg(test)] +mod tests { + use std::sync::atomic::Ordering; + + use super::{from_bits, to_bits, AtomicF32Pair}; + + #[test] + fn test_f32_pair_bit_conversions() { + let pair = [3.14159, 2.71828]; + assert_eq!(pair, from_bits(to_bits(pair))); + } + + #[test] + fn test_atomic_f32_pair_load() { + let pair = [3.14159, 2.71828]; + let atomic = AtomicF32Pair::new(pair); + assert_eq!(pair, atomic.load(Ordering::Relaxed)); + } + + #[test] + fn test_atomic_f32_pair_fetch_update() { + let pair = [3.14159, 2.71828]; + let atomic = AtomicF32Pair::new(pair); + atomic + .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |[f1, f2]| { + Some([f1 + 1.0, f2 + 1.0]) + }) + .unwrap(); + assert_eq!( + [pair[0] + 1.0, pair[1] + 1.0], + atomic.load(Ordering::Relaxed) + ); + } +} diff --git a/latency/src/util/mod.rs b/latency/src/util/mod.rs new file mode 100644 index 00000000..531c7f56 --- /dev/null +++ b/latency/src/util/mod.rs @@ -0,0 +1,2 @@ +pub(crate) mod atomic_f32_pair; +pub(crate) mod nanos; diff --git a/latency/src/util/nanos.rs b/latency/src/util/nanos.rs new file mode 100644 index 00000000..d6947a70 --- /dev/null +++ b/latency/src/util/nanos.rs @@ -0,0 +1,30 @@ +use tokio::time::Duration; + +pub const NANOS_PER_MILLI: f64 = 1_000_000.0; + +/// Utility that converts durations to nanos in f64. +/// +/// Due to a lossy transformation, the maximum value that can be represented is ~585 years, +/// which, I hope, is more than enough to represent request latencies. +pub fn nanos(d: Duration) -> f64 { + const NANOS_PER_SEC: u64 = 1_000_000_000; + let n = f64::from(d.subsec_nanos()); + let s = d.as_secs().saturating_mul(NANOS_PER_SEC) as f64; + n + s +} + +#[cfg(test)] +mod tests { + use tokio::time::Duration; + + #[test] + fn nanos() { + assert_eq!(super::nanos(Duration::new(0, 0)), 0.0); + assert_eq!(super::nanos(Duration::new(0, 123)), 123.0); + assert_eq!(super::nanos(Duration::new(1, 23)), 1_000_000_023.0); + assert_eq!( + super::nanos(Duration::new(::std::u64::MAX, 999_999_999)), + 18446744074709553000.0 + ); + } +} diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index 1136ad26..0bd41831 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -15,6 +15,7 @@ rdkafka-src = ["rdkafka/cmake-build", "rdkafka/libz", "rdkafka/ssl", "rdkafka/zs [dependencies] deferred-rate-limiter = { path = "../deferred-rate-limiter" } entities = { path = "../entities" } +latency = { path = "../latency" } migration = { path = "../migration" } redis-rate-limiter = { path = "../redis-rate-limiter" } thread-fast-rng = { path = "../thread-fast-rng" } @@ -37,7 +38,6 @@ derive_more = "0.99.17" dotenv = "0.15.0" env_logger = "0.10.0" ethers = { version = "2.0.3", default-features = false, features = ["rustls", "ws"] } -ewma = "0.1.1" fdlimit = "0.2.1" flume = "0.10.14" fstrings = "0.2" diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 8fce98df..3ee83fa3 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -1282,13 +1282,16 @@ fn rpc_sync_status_sort_key(x: &Arc) -> (Reverse, u64, bool, Order // TODO: use request latency instead of head latency // TODO: have the latency decay automatically - let head_ewma = x.head_latency.read().value(); + let peak_latency = x + .peak_latency + .as_ref() + .expect("peak_latency uniniialized") + .latency(); let active_requests = x.active_requests.load(atomic::Ordering::Relaxed) as f64; // TODO: i'm not sure head * active is exactly right. but we'll see - // TODO: i don't think this actually counts as peak. investigate with atomics.rs and peak_ewma.rs - let peak_ewma = OrderedFloat(head_ewma * active_requests); + let peak_ewma = OrderedFloat(peak_latency.as_millis() as f64 * (active_requests + 1.0)); let backup = x.backup; @@ -1296,18 +1299,26 @@ fn rpc_sync_status_sort_key(x: &Arc) -> (Reverse, u64, bool, Order } mod tests { - // TODO: why is this allow needed? does tokio::test get in the way somehow? #![allow(unused_imports)] + use std::time::{SystemTime, UNIX_EPOCH}; + use super::*; use crate::rpcs::consensus::ConsensusFinder; use crate::rpcs::{blockchain::Web3ProxyBlock, provider::Web3Provider}; + use ethers::types::{Block, U256}; + use latency::PeakEwmaLatency; use log::{trace, LevelFilter}; use parking_lot::RwLock; - use std::time::{SystemTime, UNIX_EPOCH}; use tokio::sync::RwLock as AsyncRwLock; + #[cfg(test)] + fn new_peak_latency() -> PeakEwmaLatency { + const NANOS_PER_MILLI: f64 = 1_000_000.0; + PeakEwmaLatency::spawn(1_000.0 * NANOS_PER_MILLI, 4, Duration::from_secs(1)) + } + #[tokio::test] async fn test_sort_connections_by_sync_status() { let block_0 = Block { @@ -1338,36 +1349,42 @@ mod tests { name: "a".to_string(), tier: 0, head_block: RwLock::new(None), + peak_latency: Some(new_peak_latency()), ..Default::default() }, Web3Rpc { name: "b".to_string(), tier: 0, head_block: RwLock::new(blocks.get(1).cloned()), + peak_latency: Some(new_peak_latency()), ..Default::default() }, Web3Rpc { name: "c".to_string(), tier: 0, head_block: RwLock::new(blocks.get(2).cloned()), + peak_latency: Some(new_peak_latency()), ..Default::default() }, Web3Rpc { name: "d".to_string(), tier: 1, head_block: RwLock::new(None), + peak_latency: Some(new_peak_latency()), ..Default::default() }, Web3Rpc { name: "e".to_string(), tier: 1, head_block: RwLock::new(blocks.get(1).cloned()), + peak_latency: Some(new_peak_latency()), ..Default::default() }, Web3Rpc { name: "f".to_string(), tier: 1, head_block: RwLock::new(blocks.get(2).cloned()), + peak_latency: Some(new_peak_latency()), ..Default::default() }, ] @@ -1425,6 +1442,7 @@ mod tests { tier: 0, head_block: RwLock::new(Some(head_block.clone())), provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))), + peak_latency: Some(new_peak_latency()), ..Default::default() }; @@ -1437,6 +1455,7 @@ mod tests { tier: 0, head_block: RwLock::new(Some(lagged_block.clone())), provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))), + peak_latency: Some(new_peak_latency()), ..Default::default() }; @@ -1781,6 +1800,7 @@ mod tests { tier: 0, head_block: RwLock::new(Some(block_1.clone())), provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))), + peak_latency: Some(new_peak_latency()), ..Default::default() }; @@ -1793,6 +1813,7 @@ mod tests { tier: 1, head_block: RwLock::new(Some(block_2.clone())), provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))), + peak_latency: Some(new_peak_latency()), ..Default::default() }; diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 1b7246e2..bd92293d 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -12,6 +12,7 @@ use ethers::prelude::{Bytes, Middleware, ProviderError, TxHash, H256, U64}; use ethers::types::{Address, Transaction, U256}; use futures::future::try_join_all; use futures::StreamExt; +use latency::{EwmaLatency, PeakEwmaLatency}; use log::{debug, error, info, trace, warn, Level}; use migration::sea_orm::DatabaseConnection; use ordered_float::OrderedFloat; @@ -30,69 +31,6 @@ use thread_fast_rng::thread_fast_rng; use tokio::sync::{broadcast, oneshot, watch, RwLock as AsyncRwLock}; use tokio::time::{sleep, sleep_until, timeout, Duration, Instant}; -pub struct Latency { - /// exponentially weighted moving average of how many milliseconds behind the fastest node we are - ewma: ewma::EWMA, -} - -impl Serialize for Latency { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - serializer.serialize_f64(self.ewma.value()) - } -} - -impl Latency { - #[inline(always)] - pub fn record(&mut self, duration: Duration) { - self.record_ms(duration.as_secs_f64() * 1000.0); - } - - #[inline(always)] - pub fn record_ms(&mut self, milliseconds: f64) { - self.ewma.add(milliseconds); - } - - #[inline(always)] - pub fn value(&self) -> f64 { - self.ewma.value() - } -} - -impl Default for Latency { - fn default() -> Self { - // TODO: what should the default span be? 25 requests? have a "new" - let span = 25.0; - - let start = 1000.0; - - Self::new(span, start) - } -} - -impl Latency { - // depending on the span, start might not be perfect - pub fn new(span: f64, start: f64) -> Self { - let alpha = Self::span_to_alpha(span); - - let mut ewma = ewma::EWMA::new(alpha); - - if start > 0.0 { - for _ in 0..(span as u64) { - ewma.add(start); - } - } - - Self { ewma } - } - - fn span_to_alpha(span: f64) -> f64 { - 2.0 / (span + 1.0) - } -} - /// An active connection to a Web3 RPC server like geth or erigon. #[derive(Default)] pub struct Web3Rpc { @@ -127,10 +65,11 @@ pub struct Web3Rpc { /// TODO: change this to a watch channel so that http providers can subscribe and take action on change. pub(super) head_block: RwLock>, /// Track head block latency - pub(super) head_latency: RwLock, - // /// Track request latency - // /// TODO: refactor this. this lock kills perf. for now just use head_latency - // pub(super) request_latency: RwLock, + pub(super) head_latency: RwLock, + /// Track peak request latency + /// + /// This is only inside an Option so that the "Default" derive works. it will always be set. + pub(super) peak_latency: Option, /// Track total requests served /// TODO: maybe move this to graphana pub(super) total_requests: AtomicUsize, @@ -227,6 +166,18 @@ impl Web3Rpc { let (disconnect_sender, disconnect_receiver) = watch::channel(false); let reconnect = reconnect.into(); + // Spawn the task for calculting average peak latency + // TODO Should these defaults be in config + let peak_latency = PeakEwmaLatency::spawn( + // Decay over 15s + Duration::from_secs(15).as_millis() as f64, + // Peak requests so far around 5k, we will use an order of magnitude + // more to be safe. Should only use about 50mb RAM + 50_000, + // Start latency at 1 second + Duration::from_secs(1), + ); + let new_connection = Self { name, db_conn: db_conn.clone(), @@ -245,6 +196,7 @@ impl Web3Rpc { disconnect_watch: Some(disconnect_sender), created_at: Some(created_at), head_block: RwLock::new(Default::default()), + peak_latency: Some(peak_latency), ..Default::default() }; diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index 4c6b2dbf..7620716d 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -194,7 +194,7 @@ impl OpenRequestHandle { .active_requests .fetch_add(1, std::sync::atomic::Ordering::Relaxed); - // let latency = Instant::now(); + let start = Instant::now(); // TODO: replace ethers-rs providers with our own that supports streaming the responses let response = match provider.as_ref() { @@ -366,14 +366,10 @@ impl OpenRequestHandle { tokio::spawn(f); } } + } else if let Some(peak_latency) = &self.rpc.peak_latency { + peak_latency.report(start.elapsed()); } else { - // TODO: record request latency - // let latency_ms = start.elapsed().as_secs_f64() * 1000.0; - - // TODO: is this lock here a problem? should this be done through a channel? i started to code it, but it didn't seem to matter - // let mut latency_recording = self.rpc.request_latency.write(); - - // latency_recording.record(latency_ms); + unreachable!("peak_latency not initialized"); } response