From e54299beca90eb2c6b8e1f20e6703551b89c539e Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Sun, 18 Jun 2023 09:47:40 -0700 Subject: [PATCH] rolling median for choosing tier --- Cargo.lock | 36 +++++++++- latency/Cargo.toml | 3 +- latency/src/ewma.rs | 44 +++++++----- latency/src/lib.rs | 5 +- latency/src/rolling_quantile.rs | 114 +++++++++++++++++++++++++++++++ latency/src/util/mod.rs | 1 + latency/src/util/span.rs | 4 ++ web3_proxy/Cargo.toml | 1 + web3_proxy/src/rpcs/consensus.rs | 87 +++++++++++------------ web3_proxy/src/rpcs/one.rs | 34 +++++++-- web3_proxy/src/rpcs/request.rs | 12 ++-- 11 files changed, 267 insertions(+), 74 deletions(-) create mode 100644 latency/src/rolling_quantile.rs create mode 100644 latency/src/util/span.rs diff --git a/Cargo.lock b/Cargo.lock index 78bea86a..c5a61493 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2058,6 +2058,16 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2f20267f3a8b678b7151c0c508002e79126144a5d47badddec7f31ddc1f4c754" +[[package]] +name = "exponential-decay-histogram" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55d9dc1064c0b1bc8c691c0ea539385bc6f299f5e5e6050583d34fdb032e9935" +dependencies = [ + "ordered-float", + "rand", +] + [[package]] name = "eyre" version = "0.6.8" @@ -3145,11 +3155,12 @@ checksum = "d3c48237b9604c5a4702de6b824e02006c3214327564636aef27c1028a8fa0ed" name = "latency" version = "0.1.0" dependencies = [ - "ewma", "flume", "log", + "portable-atomic", "serde", "tokio", + "watermill", ] [[package]] @@ -3733,6 +3744,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2fc2dbde8f8a79f2102cc474ceb0ad68e3b80b85289ea62389b60e66777e4213" dependencies = [ "num-traits", + "rand", + "serde", ] [[package]] @@ -4161,6 +4174,12 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "portable-atomic" +version = "1.3.3" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "767eb9f07d4a5ebcb39bbf2d452058a93c011373abf6832e24194a1c3f004794" + [[package]] name = "ppv-lite86" version = "0.2.17" @@ -4395,6 +4414,7 @@ dependencies = [ "libc", "rand_chacha", "rand_core", + "serde", ] [[package]] @@ -4414,6 +4434,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" dependencies = [ "getrandom", + "serde", ] [[package]] @@ -6898,6 +6919,18 @@ dependencies = [ "web-sys", ] +[[package]] +name = "watermill" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b85594e8299160103ff39146412bf32711b11af3856d70c9b5539211adcbc502" +dependencies = [ + "num", + "ordered-float", + "serde", + "serde_json", +] + [[package]] name = "web-sys" version = "0.3.64" @@ -6931,6 +6964,7 @@ dependencies = [ "ethbloom", "ethers", "ewma", + "exponential-decay-histogram", "fdlimit", "flume", "fstrings", diff --git a/latency/Cargo.toml b/latency/Cargo.toml index 7ef84f29..243df974 100644 --- a/latency/Cargo.toml +++ b/latency/Cargo.toml @@ -6,11 +6,12 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -ewma = "0.1.1" flume = "0.10.14" log = "0.4.19" +portable-atomic = { version = "1.3.3", features = ["float"] } serde = { version = "1.0.164", features = [] } tokio = { version = "1.28.2", features = ["full"] } +watermill = "0.1.1" [dev-dependencies] tokio = { version = "1.28.2", features = ["full", "test-util"] } diff --git a/latency/src/ewma.rs b/latency/src/ewma.rs index ae201549..549564b1 100644 --- a/latency/src/ewma.rs +++ b/latency/src/ewma.rs @@ -1,37 +1,49 @@ +use crate::util::span::span_to_alpha; use serde::ser::Serializer; use serde::Serialize; use tokio::time::Duration; +use watermill::ewmean::EWMean; +use watermill::stats::Univariate; pub struct EwmaLatency { /// exponentially 
weighted of some latency in milliseconds - ewma: ewma::EWMA, + /// TODO: compare crates: ewma vs watermill + seconds: EWMean, } +/// serialize as milliseconds impl Serialize for EwmaLatency { fn serialize(&self, serializer: S) -> Result where S: Serializer, { - serializer.serialize_f64(self.ewma.value()) + serializer.serialize_f32(self.seconds.get() * 1000.0) } } impl EwmaLatency { #[inline] pub fn record(&mut self, duration: Duration) { - self.record_ms(duration.as_secs_f64() * 1000.0); + self.record_secs(duration.as_secs_f32()); } #[inline] - pub fn record_ms(&mut self, milliseconds: f64) { - // don't let it go under 0.1ms - self.ewma.add(milliseconds.max(0.1)); + pub fn record_secs(&mut self, secs: f32) { + self.seconds.update(secs); } - /// Current EWMA value in milliseconds + /// Current EWMA value in seconds #[inline] - pub fn value(&self) -> f64 { - self.ewma.value() + pub fn value(&self) -> f32 { + self.seconds.get() + } + + /// Current EWMA value in seconds + #[inline] + pub fn duration(&self) -> Duration { + let x = self.seconds.get(); + + Duration::from_secs_f32(x) } } @@ -49,21 +61,17 @@ impl Default for EwmaLatency { impl EwmaLatency { // depending on the span, start might not be perfect - pub fn new(span: f64, start_ms: f64) -> Self { - let alpha = Self::span_to_alpha(span); + pub fn new(span: f32, start_ms: f32) -> Self { + let alpha = span_to_alpha(span); - let mut ewma = ewma::EWMA::new(alpha); + let mut seconds = EWMean::new(alpha); if start_ms > 0.0 { for _ in 0..(span as u64) { - ewma.add(start_ms); + seconds.update(start_ms); } } - Self { ewma } - } - - fn span_to_alpha(span: f64) -> f64 { - 2.0 / (span + 1.0) + Self { seconds } } } diff --git a/latency/src/lib.rs b/latency/src/lib.rs index 4f305e85..e3f0421b 100644 --- a/latency/src/lib.rs +++ b/latency/src/lib.rs @@ -1,5 +1,8 @@ mod ewma; mod peak_ewma; +mod rolling_quantile; mod util; -pub use self::{ewma::EwmaLatency, peak_ewma::PeakEwmaLatency}; +pub use self::ewma::EwmaLatency; +pub use 
self::peak_ewma::PeakEwmaLatency; +pub use self::rolling_quantile::RollingQuantileLatency; diff --git a/latency/src/rolling_quantile.rs b/latency/src/rolling_quantile.rs new file mode 100644 index 00000000..17b4d962 --- /dev/null +++ b/latency/src/rolling_quantile.rs @@ -0,0 +1,114 @@ +use std::sync::Arc; + +use portable_atomic::{AtomicF32, Ordering}; +use serde::ser::Serializer; +use serde::Serialize; +use tokio::task::JoinHandle; +use tokio::time::Duration; +use watermill::quantile::RollingQuantile; +use watermill::stats::Univariate; + +pub struct RollingQuantileLatency { + /// Join handle for the latency calculation task. + pub join_handle: JoinHandle<()>, + /// Send to update with each request duration. + latency_tx: flume::Sender, + /// rolling quantile latency in seconds. Updated async. + seconds: Arc, +} + +/// Task to be spawned per-RollingMedianLatency for calculating the value +#[derive(Debug)] +struct RollingQuantileLatencyTask { + /// Receive to update each request duration + latency_rx: flume::Receiver, + /// Current estimate and update time + seconds: Arc, + /// quantile value. + /// WARNING! should be between 0 and 1. + q: f32, + /// Rolling window size. + window_size: usize, +} + +impl RollingQuantileLatencyTask { + /// Run the loop for updating latency. + async fn run(self) { + let mut q = RollingQuantile::new(self.q, self.window_size).unwrap(); + + while let Ok(rtt) = self.latency_rx.recv_async().await { + self.update(&mut q, rtt); + } + } + + /// Update the estimate object atomically. + fn update(&self, q: &mut RollingQuantile, rtt: f32) { + q.update(rtt); + + self.seconds + .store(q.get(), portable_atomic::Ordering::Relaxed); + } +} + +impl RollingQuantileLatency { + #[inline] + pub async fn record(&self, duration: Duration) { + self.record_secs(duration.as_secs_f32()).await + } + + #[inline] + pub async fn record_secs(&self, secs: f32) { + self.latency_tx.send_async(secs).await.unwrap() + } + + /// Current median. 
+ #[inline] + pub fn seconds(&self) -> f32 { + self.seconds.load(portable_atomic::Ordering::Relaxed) + } + + /// Current median. + #[inline] + pub fn duration(&self) -> Duration { + Duration::from_secs_f32(self.seconds()) + } +} + +impl RollingQuantileLatency { + pub async fn spawn(quantile_value: f32, window_size: usize) -> Self { + // TODO: how should queue size and window size be related? + let (latency_tx, latency_rx) = flume::bounded(window_size); + + let seconds = Arc::new(AtomicF32::new(0.0)); + + let task = RollingQuantileLatencyTask { + latency_rx, + seconds: seconds.clone(), + q: quantile_value, + window_size, + }; + + let join_handle = tokio::spawn(task.run()); + + Self { + join_handle, + latency_tx, + seconds, + } + } + + #[inline] + pub async fn spawn_median(window_size: usize) -> Self { + Self::spawn(0.5, window_size).await + } +} + +/// serialize as seconds +impl Serialize for RollingQuantileLatency { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + serializer.serialize_f32(self.seconds.load(Ordering::Relaxed)) + } +} diff --git a/latency/src/util/mod.rs b/latency/src/util/mod.rs index 531c7f56..2335c688 100644 --- a/latency/src/util/mod.rs +++ b/latency/src/util/mod.rs @@ -1,2 +1,3 @@ pub(crate) mod atomic_f32_pair; pub(crate) mod nanos; +pub(crate) mod span; diff --git a/latency/src/util/span.rs b/latency/src/util/span.rs new file mode 100644 index 00000000..f1350f34 --- /dev/null +++ b/latency/src/util/span.rs @@ -0,0 +1,4 @@ +// TODO: generic for any float +pub fn span_to_alpha(span: f32) -> f32 { + 2.0 / (span + 1.0) +} diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index bc9300df..d54f6a01 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -103,6 +103,7 @@ tracing-subscriber = "0.3" ulid = { version = "1.0.0", features = ["rand", "uuid", "serde"] } url = "2.4.0" uuid = { version = "1.3.4", default-features = false, features = ["fast-rng", "serde", "v4", "zerocopy"] } 
+exponential-decay-histogram = "0.1.10" [dev-dependencies] tokio = { version = "1.28.2", features = ["full", "test-util"] } diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index abb5e769..e20fa38c 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -3,7 +3,6 @@ use super::many::Web3Rpcs; use super::one::Web3Rpc; use crate::errors::{Web3ProxyErrorContext, Web3ProxyResult}; use crate::frontend::authorization::Authorization; -use anyhow::Context; use base64::engine::general_purpose; use derive_more::Constructor; use ethers::prelude::{H256, U64}; @@ -384,7 +383,9 @@ impl ConsensusFinder { let latency = first_seen.elapsed(); // record the time behind the fastest node - rpc.head_latency.write().record(latency); + rpc.head_latency_ms + .write() + .record_secs(latency.as_secs_f32()); // update the local mapping of rpc -> block self.rpc_heads.insert(rpc, block) @@ -438,51 +439,57 @@ impl ConsensusFinder { 0 => {} 1 => { for rpc in self.rpc_heads.keys() { - rpc.tier.store(0, atomic::Ordering::Relaxed) + rpc.tier.store(1, atomic::Ordering::Relaxed) } } _ => { // iterate first to find bounds - let mut min_latency = u64::MAX; - let mut max_latency = u64::MIN; - let mut weighted_latencies = HashMap::new(); + // track the min and max of the per-rpc median latencies + let mut min_median_latency_sec = f32::MAX; + let mut max_median_latency_sec = f32::MIN; + let mut median_latencies_sec = HashMap::new(); for rpc in self.rpc_heads.keys() { - let weighted_latency_seconds = rpc.weighted_peak_ewma_seconds(); + let median_latency_sec = rpc + .request_latency + .as_ref() + .map(|x| x.seconds()) + .unwrap_or_default(); - let weighted_latency_ms = (weighted_latency_seconds * 1000.0).round() as i64; + min_median_latency_sec = min_median_latency_sec.min(median_latency_sec); + max_median_latency_sec = max_median_latency_sec.max(median_latency_sec); - let weighted_latency_ms: u64 = weighted_latency_ms - .try_into() - 
.context("weighted_latency_ms does not fit in a u64")?; - - min_latency = min_latency.min(weighted_latency_ms); - max_latency = min_latency.max(weighted_latency_ms); - - weighted_latencies.insert(rpc, weighted_latency_ms); + median_latencies_sec.insert(rpc, median_latency_sec); } - // // histogram requires high to be at least 2 x low - // // using min_latency for low does not work how we want it though - max_latency = max_latency.max(1000); - - // create the histogram - let mut hist = Histogram::::new_with_bounds(1, max_latency, 3).unwrap(); - - // TODO: resize shouldn't be necessary, but i've seen it error - hist.auto(true); - - for weighted_latency_ms in weighted_latencies.values() { - hist.record(*weighted_latency_ms)?; - } - - // dev logging + // dev logging of a histogram if log_enabled!(Level::Trace) { + // convert to ms because the histogram needs ints + let max_median_latency_ms = (max_median_latency_sec * 1000.0).ceil() as u64; + + // create the histogram + // histogram requires high to be at least 2 x low + // using min_latency for low does not work how we want it though + // so just set the default range = 1ms..1s + let hist_low = 1; + let hist_high = max_median_latency_ms.max(1_000); + let mut hist_ms = + Histogram::::new_with_bounds(hist_low, hist_high, 3).unwrap(); + + // TODO: resize shouldn't be necessary, but i've seen it error + hist_ms.auto(true); + + for median_sec in median_latencies_sec.values() { + let median_ms = (median_sec * 1000.0).round() as u64; + + hist_ms.record(median_ms)?; + } + // print the histogram. see docs/histograms.txt for more info let mut encoder = base64::write::EncoderWriter::new(Vec::new(), &general_purpose::STANDARD); V2DeflateSerializer::new() - .serialize(&hist, &mut encoder) + .serialize(&hist_ms, &mut encoder) .unwrap(); let encoded = encoder.finish().unwrap(); @@ -493,20 +500,16 @@ impl ConsensusFinder { } // TODO: get someone who is better at math to do something smarter. maybe involving stddev? 
- let divisor = 30f64.max(min_latency as f64 / 2.0); + // bucket sizes of the larger of 30ms or 1/2 the lowest latency + let tier_sec_size = 0.030f32.max(min_median_latency_sec / 2.0); - for (rpc, weighted_latency_ms) in weighted_latencies.into_iter() { - let tier = (weighted_latency_ms - min_latency) as f64 / divisor; + for (rpc, median_latency_sec) in median_latencies_sec.into_iter() { + let tier = (median_latency_sec - min_median_latency_sec) / tier_sec_size; + // start tiers at 1 let tier = (tier.floor() as u32).saturating_add(1); - // TODO: this should be trace - trace!( - "{} - weighted_latency: {}ms, tier {}", - rpc, - weighted_latency_ms, - tier - ); + trace!("{} - p50_sec: {}, tier {}", rpc, median_latency_sec, tier); rpc.tier.store(tier, atomic::Ordering::Relaxed); } diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 679788d4..f0b03127 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -14,7 +14,7 @@ use ethers::prelude::{Bytes, Middleware, TxHash, U64}; use ethers::types::{Address, Transaction, U256}; use futures::future::try_join_all; use futures::StreamExt; -use latency::{EwmaLatency, PeakEwmaLatency}; +use latency::{EwmaLatency, PeakEwmaLatency, RollingQuantileLatency}; use log::{debug, info, trace, warn, Level}; use migration::sea_orm::DatabaseConnection; use nanorand::Rng; @@ -62,17 +62,21 @@ pub struct Web3Rpc { pub(super) block_data_limit: AtomicU64, /// head_block is only inside an Option so that the "Default" derive works. it will always be set. pub(super) head_block: Option>>, - /// Track head block latency - pub(super) head_latency: RwLock, + /// Track head block latency. + /// RwLock is fine because this isn't updated often and is for monitoring. It is not used on the hot path. + pub(super) head_latency_ms: RwLock, /// Track peak request latency /// peak_latency is only inside an Option so that the "Default" derive works. it will always be set.
pub(super) peak_latency: Option, /// Automatically set priority pub(super) tier: AtomicU32, - /// Track total requests served + /// Track total internal requests served pub(super) internal_requests: AtomicUsize, - /// Track total requests served + /// Track total external requests served pub(super) external_requests: AtomicUsize, + /// Track time used by external requests served + /// request_ms_histogram is only inside an Option so that the "Default" derive works. it will always be set. + pub(super) request_latency: Option, /// Track in-flight requests pub(super) active_requests: AtomicUsize, /// disconnect_watch is only inside an Option so that the "Default" derive works. it will always be set. @@ -168,6 +172,8 @@ impl Web3Rpc { Duration::from_secs(1), ); + let request_latency = RollingQuantileLatency::spawn_median(1_000).await; + let http_provider = if let Some(http_url) = config.http_url { let http_url = http_url.parse::()?; @@ -202,6 +208,7 @@ impl Web3Rpc { http_provider, name, peak_latency: Some(peak_latency), + request_latency: Some(request_latency), soft_limit: config.soft_limit, ws_url, disconnect_watch: Some(disconnect_watch), @@ -1121,7 +1128,7 @@ impl Serialize for Web3Rpc { S: Serializer, { // 3 is the number of fields in the struct. - let mut state = serializer.serialize_struct("Web3Rpc", 13)?; + let mut state = serializer.serialize_struct("Web3Rpc", 14)?; // the url is excluded because it likely includes private information. 
just show the name that we use in keys state.serialize_field("name", &self.name)?; @@ -1166,7 +1173,20 @@ impl Serialize for Web3Rpc { &self.active_requests.load(atomic::Ordering::Relaxed), )?; - state.serialize_field("head_latency_ms", &self.head_latency.read().value())?; + state.serialize_field( + "head_latency_ms", + &self.head_latency_ms.read().duration().as_millis(), + )?; + + state.serialize_field( + "request_latency_ms", + &self + .request_latency + .as_ref() + .unwrap() + .duration() + .as_millis(), + )?; state.serialize_field( "peak_latency_ms", diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index 25c75e2a..80b7a941 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -379,12 +379,16 @@ impl OpenRequestHandle { } } } - } else if let Some(peak_latency) = &self.rpc.peak_latency { - peak_latency.report(latency); - } else { - unreachable!("peak_latency not initialized"); } + self.rpc.peak_latency.as_ref().unwrap().report(latency); + self.rpc + .request_latency + .as_ref() + .unwrap() + .record(latency) + .await; + response } }