diff --git a/Cargo.lock b/Cargo.lock
index 95aec00b..922bdae7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2941,6 +2941,7 @@ name = "latency"
 version = "0.1.0"
 dependencies = [
  "ewma",
+ "flume",
  "log",
  "serde",
  "tokio",
diff --git a/latency/Cargo.toml b/latency/Cargo.toml
index eb51eba9..7cea2c16 100644
--- a/latency/Cargo.toml
+++ b/latency/Cargo.toml
@@ -7,6 +7,7 @@ edition = "2021"
 
 [dependencies]
 ewma = "0.1.1"
+flume = "0.10.14"
 log = "0.4.17"
 serde = { version = "1.0.163", features = [] }
 tokio = { version = "1.28.1", features = ["full"] }
diff --git a/latency/src/peak_ewma/mod.rs b/latency/src/peak_ewma/mod.rs
index a795c383..0d4dd25b 100644
--- a/latency/src/peak_ewma/mod.rs
+++ b/latency/src/peak_ewma/mod.rs
@@ -2,9 +2,7 @@ mod rtt_estimate;
 
 use std::sync::Arc;
 
-use log::{error, trace};
-use tokio::sync::mpsc;
-use tokio::sync::mpsc::error::TrySendError;
+use log::{error, log_enabled, trace};
 use tokio::task::JoinHandle;
 use tokio::time::{Duration, Instant};
 
@@ -20,7 +18,7 @@ pub struct PeakEwmaLatency {
     /// Join handle for the latency calculation task
     pub join_handle: JoinHandle<()>,
     /// Send to update with each request duration
-    request_tx: mpsc::Sender<Duration>,
+    request_tx: flume::Sender<Duration>,
     /// Latency average and last update time
     rtt_estimate: Arc<AtomicRttEstimate>,
     /// Decay time
@@ -32,9 +30,12 @@ impl PeakEwmaLatency {
     ///
     /// Returns a handle that can also be used to read the current
     /// average latency.
-    pub fn spawn(decay_ns: f64, buf_size: usize, start_latency: Duration) -> Self {
+    pub fn spawn(decay: Duration, buf_size: usize, start_latency: Duration) -> Self {
+        let decay_ns = decay.as_nanos() as f64;
+
         debug_assert!(decay_ns > 0.0, "decay_ns must be positive");
-        let (request_tx, request_rx) = mpsc::channel(buf_size);
+
+        let (request_tx, request_rx) = flume::bounded(buf_size);
         let rtt_estimate = Arc::new(AtomicRttEstimate::new(start_latency));
         let task = PeakEwmaLatencyTask {
             request_rx,
@@ -56,15 +57,19 @@ impl PeakEwmaLatency {
         let mut estimate = self.rtt_estimate.load();
 
         let now = Instant::now();
-        assert!(
-            estimate.update_at <= now,
-            "update_at is {}ns in the future",
-            estimate.update_at.duration_since(now).as_nanos(),
-        );
 
-        // Update the RTT estimate to account for decay since the last update.
-        // TODO: having an update here means we don't actually write from just one thread!! Thats how we get partially written stuff i think
-        estimate.update(0.0, self.decay_ns, now)
+        if estimate.update_at > now {
+            if log_enabled!(log::Level::Trace) {
+                trace!(
+                    "update_at is {}ns in the future",
+                    estimate.update_at.duration_since(now).as_nanos()
+                );
+            }
+            estimate.rtt
+        } else {
+            // Update the RTT estimate to account for decay since the last update.
+            estimate.update(0.0, self.decay_ns, now)
+        }
     }
 
     /// Report latency from a single request
@@ -73,15 +78,12 @@
     pub fn report(&self, duration: Duration) {
         match self.request_tx.try_send(duration) {
             Ok(()) => {}
-            Err(TrySendError::Full(_)) => {
+            Err(err) => {
                 // We don't want to block if the channel is full, just
                 // report the error
-                error!("Latency report channel full");
+                error!("Latency report channel full. {}", err);
                 // TODO: could we spawn a new tokio task to report this later?
             }
-            Err(TrySendError::Closed(_)) => {
-                unreachable!("Owner should keep channel open");
-            }
         };
     }
 }
@@ -90,7 +92,7 @@ impl PeakEwmaLatency {
 #[derive(Debug)]
 struct PeakEwmaLatencyTask {
     /// Receive new request timings for update
-    request_rx: mpsc::Receiver<Duration>,
+    request_rx: flume::Receiver<Duration>,
     /// Current estimate and update time
     rtt_estimate: Arc<AtomicRttEstimate>,
     /// Last update time, used for decay calculation
@@ -101,8 +103,8 @@ struct PeakEwmaLatencyTask {
 
 impl PeakEwmaLatencyTask {
     /// Run the loop for updating latency
-    async fn run(mut self) {
-        while let Some(rtt) = self.request_rx.recv().await {
+    async fn run(self) {
+        while let Ok(rtt) = self.request_rx.recv_async().await {
             self.update(rtt);
         }
         trace!("latency loop exited");
@@ -128,14 +130,15 @@
 mod tests {
     use tokio::time::{self, Duration};
 
-    use crate::util::nanos::NANOS_PER_MILLI;
-
     /// The default RTT estimate decays, so that new nodes are considered if the
     /// default RTT is too high.
    #[tokio::test(start_paused = true)]
     async fn default_decay() {
-        let estimate =
-            super::PeakEwmaLatency::spawn(NANOS_PER_MILLI * 1_000.0, 8, Duration::from_millis(10));
+        let estimate = super::PeakEwmaLatency::spawn(
+            Duration::from_millis(1_000),
+            8,
+            Duration::from_millis(10),
+        );
 
         let load = estimate.latency();
         assert_eq!(load, Duration::from_millis(10));
diff --git a/latency/src/peak_ewma/rtt_estimate.rs b/latency/src/peak_ewma/rtt_estimate.rs
index be56fe9c..1850d9e4 100644
--- a/latency/src/peak_ewma/rtt_estimate.rs
+++ b/latency/src/peak_ewma/rtt_estimate.rs
@@ -44,6 +44,7 @@ impl RttEstimate {
             );
             Duration::from_nanos(next_estimate as u64)
         };
+
         self.rtt
     }
 
@@ -101,7 +102,7 @@ impl AtomicRttEstimate {
     }
 
     /// Fetches the value, and applies a function to it that returns a
-    /// new rtt. Retrns the new RttEstimate with new update_at.
+    /// new rtt. Returns the new RttEstimate with new update_at.
     ///
     /// Automatically updates the update_at with Instant::now(). This
     /// method omits ordering arguments, defaulting to Relaxed since
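The `latency()` rewrite above turns a panic into graceful handling: if `update_at` is somehow in the future, the stored `rtt` is returned untouched; otherwise the estimate is blended toward a zero-latency sample so an idle node's reported latency decays over time. A minimal sketch of that decay-on-read weighting, assuming the conventional peak-EWMA exponential decay; `decayed_rtt` is a hypothetical helper, not the crate's actual `RttEstimate::update`:

```rust
use std::time::Duration;

/// Hypothetical helper illustrating decay-on-read: the longer it has been
/// since the last sample, the less the stored estimate counts.
fn decayed_rtt(stored: Duration, elapsed: Duration, decay: Duration) -> Duration {
    // Weight of the stored estimate shrinks exponentially with elapsed time.
    let w = (-(elapsed.as_nanos() as f64) / (decay.as_nanos() as f64)).exp();
    // Blending toward a zero-latency sample mirrors the
    // `estimate.update(0.0, self.decay_ns, now)` call in the diff.
    Duration::from_nanos((stored.as_nanos() as f64 * w) as u64)
}

fn main() {
    // After one full decay period (15s elapsed, 15s decay), about 37% (1/e)
    // of the old estimate remains: 100ms decays to roughly 36.8ms.
    let rtt = decayed_rtt(
        Duration::from_millis(100),
        Duration::from_secs(15),
        Duration::from_secs(15),
    );
    println!("{rtt:?}");
}
```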
diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml
index c51e9c8e..90764578 100644
--- a/web3_proxy/Cargo.toml
+++ b/web3_proxy/Cargo.toml
@@ -91,3 +91,6 @@ tower-http = { version = "0.4.0", features = ["cors", "sensitive-headers"] }
 ulid = { version = "1.0.0", features = ["uuid", "serde"] }
 url = "2.3.1"
 uuid = "1.3.3"
+
+[dev-dependencies]
+tokio = { version = "1.28.1", features = ["full", "test-util"] }
diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs
index 74ef6776..b6f924de 100644
--- a/web3_proxy/src/rpcs/many.rs
+++ b/web3_proxy/src/rpcs/many.rs
@@ -1318,7 +1318,7 @@ mod tests {
     #[cfg(test)]
     fn new_peak_latency() -> PeakEwmaLatency {
         const NANOS_PER_MILLI: f64 = 1_000_000.0;
-        PeakEwmaLatency::spawn(1_000.0 * NANOS_PER_MILLI, 4, Duration::from_secs(1))
+        PeakEwmaLatency::spawn(Duration::from_secs(1), 4, Duration::from_secs(1))
     }
 
     #[tokio::test]
diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs
index dd84fa4b..9015dd18 100644
--- a/web3_proxy/src/rpcs/one.rs
+++ b/web3_proxy/src/rpcs/one.rs
@@ -159,8 +159,7 @@ impl Web3Rpc {
         // TODO Should these defaults be in config
         let peak_latency = PeakEwmaLatency::spawn(
             // Decay over 15s
-            // TODO! This is wrong! needs to be as_nanos!
-            Duration::from_secs(15).as_millis() as f64,
+            Duration::from_secs(15),
             // Peak requests so far around 5k, we will use an order of magnitude
             // more to be safe. Should only use about 50mb RAM
             50_000,
@@ -1220,7 +1219,7 @@ impl Serialize for Web3Rpc {
 
             state.serialize_field(
                 "peak_latency",
-                &self.peak_latency.as_ref().unwrap().latency(),
+                &self.peak_latency.as_ref().unwrap().latency().as_millis(),
             )?;
 
             state.serialize_field("peak_ewma", self.peak_ewma().as_ref())?;
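The `one.rs` hunk above fixes the units bug flagged by the removed TODO: `Duration::from_secs(15).as_millis()` yields 15,000, which the old `f64` parameter interpreted as nanoseconds, i.e. a 15µs decay instead of 15s. Taking a `Duration` and converting inside `spawn` makes that mistake unrepresentable. The magnitudes, as a quick check:

```rust
use std::time::Duration;

fn main() {
    let decay = Duration::from_secs(15);

    // What the old call site passed into a nanosecond-denominated parameter:
    let wrong_ns = decay.as_millis() as f64; // 15_000 "ns" ~= 15 microseconds

    // What spawn(decay: Duration) now computes internally:
    let right_ns = decay.as_nanos() as f64; // 15_000_000_000 ns == 15 seconds

    // The old value was a factor of one million too small, so the EWMA
    // decayed almost instantly instead of over 15 seconds.
    assert_eq!(right_ns / wrong_ns, 1_000_000.0);
}
```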
diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs
index 7a13335f..34110bf7 100644
--- a/web3_proxy/src/rpcs/request.rs
+++ b/web3_proxy/src/rpcs/request.rs
@@ -356,9 +356,7 @@ impl OpenRequestHandle {
                 }
             }
         } else if let Some(peak_latency) = &self.rpc.peak_latency {
-            // trace!("updating peak_latency: {}", latency.as_secs_f64());
-            // peak_latency.report(latency);
-            trace!("peak latency disabled for now");
+            peak_latency.report(latency);
         } else {
             unreachable!("peak_latency not initialized");
         }
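Re-enabling `peak_latency.report(latency)` here relies on the non-blocking report path introduced earlier in the patch: flume's `try_send` works from synchronous code, and the single `Err` arm logs and drops the sample (whether the buffer is full or the receiver is gone) instead of blocking the request path, while the owning task drains the channel with `recv_async`. A minimal sketch of that producer/consumer shape, assuming the flume 0.10 and tokio APIs used in the diff; the channel size and names are illustrative:

```rust
use tokio::time::Duration;

#[tokio::main]
async fn main() {
    let (tx, rx) = flume::bounded::<Duration>(4);

    // Mirrors PeakEwmaLatencyTask::run: loop until every sender is dropped.
    let consumer = tokio::spawn(async move {
        while let Ok(rtt) = rx.recv_async().await {
            println!("observed rtt: {rtt:?}");
        }
    });

    // Mirrors PeakEwmaLatency::report: never block; drop the sample and
    // log if the bounded buffer cannot accept it.
    if let Err(err) = tx.try_send(Duration::from_millis(25)) {
        eprintln!("Latency report channel full. {err}");
    }

    drop(tx);
    consumer.await.unwrap();
}
```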