diff --git a/latency/src/ewma.rs b/latency/src/ewma.rs index 82c7d764..5b47a38c 100644 --- a/latency/src/ewma.rs +++ b/latency/src/ewma.rs @@ -29,6 +29,7 @@ impl EwmaLatency { #[inline] pub fn record_secs(&mut self, secs: f32) { + // TODO: we could change this to use a channel like the peak_ewma and rolling_quantile code, but this is fine if it updates on insert instead of async self.seconds.update(secs); } diff --git a/latency/src/rolling_quantile.rs b/latency/src/rolling_quantile.rs index f2514454..5d0f0e6d 100644 --- a/latency/src/rolling_quantile.rs +++ b/latency/src/rolling_quantile.rs @@ -18,50 +18,61 @@ pub struct RollingQuantileLatency { } /// Task to be spawned per-RollingMedianLatency for calculating the value -#[derive(Debug)] struct RollingQuantileLatencyTask { /// Receive to update each request duration latency_rx: flume::Receiver<f32>, /// Current estimate and update time seconds: Arc<AtomicF32>, /// quantile value. - /// WARNING! should be between 0 and 1. - q: f32, - /// Rolling window size. - window_size: usize, + quantile: RollingQuantile<f32>, } impl RollingQuantileLatencyTask { - /// Run the loop for updating latency. - async fn run(self) { - let mut q = RollingQuantile::new(self.q, self.window_size).unwrap(); + fn new( + latency_rx: flume::Receiver<f32>, + seconds: Arc<AtomicF32>, + q: f32, + window_size: usize, + ) -> Self { + let quantile = RollingQuantile::new(q, window_size).unwrap(); + Self { + latency_rx, + seconds, + quantile, + } + } + + /// Run the loop for updating latency. + async fn run(mut self) { while let Ok(rtt) = self.latency_rx.recv_async().await { - self.update(&mut q, rtt); + self.update(rtt); } } /// Update the estimate object atomically. 
- fn update(&self, q: &mut RollingQuantile<f32>, rtt: f32) { - q.update(rtt); + fn update(&mut self, rtt: f32) { + self.quantile.update(rtt); self.seconds - .store(q.get(), portable_atomic::Ordering::Relaxed); + .store(self.quantile.get(), portable_atomic::Ordering::Relaxed); } } impl RollingQuantileLatency { #[inline] - pub async fn record(&self, duration: Duration) { - self.record_secs(duration.as_secs_f32()).await + pub fn record(&self, duration: Duration) { + self.record_secs(duration.as_secs_f32()) } + /// if the channel is full (unlikely), entries will be silently dropped #[inline] - pub async fn record_secs(&self, secs: f32) { - self.latency_tx.send_async(secs).await.unwrap() + pub fn record_secs(&self, secs: f32) { + // self.latency_tx.send_async(secs).await.unwrap() + let _ = self.latency_tx.try_send(secs); } - /// Current median. + /// Current quantile value. #[inline] pub fn seconds(&self) -> f32 { self.seconds.load(portable_atomic::Ordering::Relaxed) @@ -81,12 +92,12 @@ impl RollingQuantileLatency { let seconds = Arc::new(AtomicF32::new(0.0)); - let task = RollingQuantileLatencyTask { + let task = RollingQuantileLatencyTask::new( latency_rx, - seconds: seconds.clone(), - q: quantile_value, + seconds.clone(), + quantile_value, window_size, - }; + ); let join_handle = tokio::spawn(task.run()); diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index 5f2306eb..16e07617 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -10,7 +10,7 @@ use hashbrown::{HashMap, HashSet}; use hdrhistogram::serialization::{Serializer, V2DeflateSerializer}; use hdrhistogram::Histogram; use itertools::{Itertools, MinMaxResult}; -use log::{debug, log_enabled, trace, warn, Level}; +use log::{log_enabled, trace, warn, Level}; use moka::future::Cache; use serde::Serialize; use std::cmp::{Ordering, Reverse}; diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index 7ea6eaf1..d6cfa951 100644 --- 
a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -381,13 +381,9 @@ impl OpenRequestHandle { } } + // TODO: benchmark spawning both of these self.rpc.peak_latency.as_ref().unwrap().report(latency); - self.rpc - .median_latency - .as_ref() - .unwrap() - .record(latency) - .await; + self.rpc.median_latency.as_ref().unwrap().record(latency); response }