rolling median for choosing tier

This commit is contained in:
Bryan Stitt 2023-06-18 09:47:40 -07:00
parent 8b22c9861b
commit e54299beca
11 changed files with 267 additions and 74 deletions

36
Cargo.lock generated

@ -2058,6 +2058,16 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f20267f3a8b678b7151c0c508002e79126144a5d47badddec7f31ddc1f4c754"
[[package]]
name = "exponential-decay-histogram"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55d9dc1064c0b1bc8c691c0ea539385bc6f299f5e5e6050583d34fdb032e9935"
dependencies = [
"ordered-float",
"rand",
]
[[package]]
name = "eyre"
version = "0.6.8"
@ -3145,11 +3155,12 @@ checksum = "d3c48237b9604c5a4702de6b824e02006c3214327564636aef27c1028a8fa0ed"
name = "latency"
version = "0.1.0"
dependencies = [
"ewma",
"flume",
"log",
"portable-atomic",
"serde",
"tokio",
"watermill",
]
[[package]]
@ -3733,6 +3744,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2fc2dbde8f8a79f2102cc474ceb0ad68e3b80b85289ea62389b60e66777e4213"
dependencies = [
"num-traits",
"rand",
"serde",
]
[[package]]
@ -4161,6 +4174,12 @@ dependencies = [
"windows-sys 0.48.0",
]
[[package]]
name = "portable-atomic"
version = "1.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "767eb9f07d4a5ebcb39bbf2d452058a93c011373abf6832e24194a1c3f004794"
[[package]]
name = "ppv-lite86"
version = "0.2.17"
@ -4395,6 +4414,7 @@ dependencies = [
"libc",
"rand_chacha",
"rand_core",
"serde",
]
[[package]]
@ -4414,6 +4434,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
dependencies = [
"getrandom",
"serde",
]
[[package]]
@ -6898,6 +6919,18 @@ dependencies = [
"web-sys",
]
[[package]]
name = "watermill"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b85594e8299160103ff39146412bf32711b11af3856d70c9b5539211adcbc502"
dependencies = [
"num",
"ordered-float",
"serde",
"serde_json",
]
[[package]]
name = "web-sys"
version = "0.3.64"
@ -6931,6 +6964,7 @@ dependencies = [
"ethbloom",
"ethers",
"ewma",
"exponential-decay-histogram",
"fdlimit",
"flume",
"fstrings",

@ -6,11 +6,12 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
ewma = "0.1.1"
flume = "0.10.14"
log = "0.4.19"
portable-atomic = { version = "1.3.3", features = ["float"] }
serde = { version = "1.0.164", features = [] }
tokio = { version = "1.28.2", features = ["full"] }
watermill = "0.1.1"
[dev-dependencies]
tokio = { version = "1.28.2", features = ["full", "test-util"] }

@ -1,37 +1,49 @@
use crate::util::span::span_to_alpha;
use serde::ser::Serializer;
use serde::Serialize;
use tokio::time::Duration;
use watermill::ewmean::EWMean;
use watermill::stats::Univariate;
pub struct EwmaLatency {
/// exponentially weighted of some latency in milliseconds
ewma: ewma::EWMA,
/// TODO: compare crates: ewma vs watermill
seconds: EWMean<f32>,
}
/// serialize as milliseconds
impl Serialize for EwmaLatency {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_f64(self.ewma.value())
serializer.serialize_f32(self.seconds.get() * 1000.0)
}
}
impl EwmaLatency {
#[inline]
pub fn record(&mut self, duration: Duration) {
self.record_ms(duration.as_secs_f64() * 1000.0);
self.record_secs(duration.as_secs_f32());
}
#[inline]
pub fn record_ms(&mut self, milliseconds: f64) {
// don't let it go under 0.1ms
self.ewma.add(milliseconds.max(0.1));
pub fn record_secs(&mut self, secs: f32) {
self.seconds.update(secs);
}
/// Current EWMA value in milliseconds
/// Current EWMA value in seconds
#[inline]
pub fn value(&self) -> f64 {
self.ewma.value()
pub fn value(&self) -> f32 {
self.seconds.get()
}
/// Current EWMA value in seconds
#[inline]
pub fn duration(&self) -> Duration {
let x = self.seconds.get();
Duration::from_secs_f32(x)
}
}
@ -49,21 +61,17 @@ impl Default for EwmaLatency {
impl EwmaLatency {
// depending on the span, start might not be perfect
pub fn new(span: f64, start_ms: f64) -> Self {
let alpha = Self::span_to_alpha(span);
pub fn new(span: f32, start_ms: f32) -> Self {
let alpha = span_to_alpha(span);
let mut ewma = ewma::EWMA::new(alpha);
let mut seconds = EWMean::new(alpha);
if start_ms > 0.0 {
for _ in 0..(span as u64) {
ewma.add(start_ms);
seconds.update(start_ms);
}
}
Self { ewma }
}
fn span_to_alpha(span: f64) -> f64 {
2.0 / (span + 1.0)
Self { seconds }
}
}

@ -1,5 +1,8 @@
mod ewma;
mod peak_ewma;
mod rolling_quantile;
mod util;
pub use self::{ewma::EwmaLatency, peak_ewma::PeakEwmaLatency};
pub use self::ewma::EwmaLatency;
pub use self::peak_ewma::PeakEwmaLatency;
pub use self::rolling_quantile::RollingQuantileLatency;

@ -0,0 +1,114 @@
use std::sync::Arc;
use portable_atomic::{AtomicF32, Ordering};
use serde::ser::Serializer;
use serde::Serialize;
use tokio::task::JoinHandle;
use tokio::time::Duration;
use watermill::quantile::RollingQuantile;
use watermill::stats::Univariate;
/// Tracks a rolling quantile (e.g. the median) of request latencies.
///
/// Samples are sent over a channel to a background task which owns the
/// rolling-quantile state and publishes the current estimate into `seconds`
/// for lock-free reads.
pub struct RollingQuantileLatency {
    /// Join handle for the latency calculation task.
    pub join_handle: JoinHandle<()>,
    /// Send to update with each request duration (in seconds).
    latency_tx: flume::Sender<f32>,
    /// rolling quantile latency in seconds. Updated async by the task.
    seconds: Arc<AtomicF32>,
}
/// Task to be spawned per-`RollingQuantileLatency` for calculating the value.
///
/// Consumes raw latency samples from `latency_rx` and publishes the rolling
/// quantile estimate into the shared `seconds` atomic.
#[derive(Debug)]
struct RollingQuantileLatencyTask {
    /// Receive each request duration (in seconds).
    latency_rx: flume::Receiver<f32>,
    /// Current estimate, shared with the `RollingQuantileLatency` handle.
    seconds: Arc<AtomicF32>,
    /// quantile value (0.5 = median).
    /// WARNING! should be between 0 and 1.
    q: f32,
    /// Rolling window size (number of most-recent samples considered).
    window_size: usize,
}
impl RollingQuantileLatencyTask {
/// Run the loop for updating latency.
async fn run(self) {
let mut q = RollingQuantile::new(self.q, self.window_size).unwrap();
while let Ok(rtt) = self.latency_rx.recv_async().await {
self.update(&mut q, rtt);
}
}
/// Update the estimate object atomically.
fn update(&self, q: &mut RollingQuantile<f32>, rtt: f32) {
q.update(rtt);
self.seconds
.store(q.get(), portable_atomic::Ordering::Relaxed);
}
}
impl RollingQuantileLatency {
#[inline]
pub async fn record(&self, duration: Duration) {
self.record_secs(duration.as_secs_f32()).await
}
#[inline]
pub async fn record_secs(&self, secs: f32) {
self.latency_tx.send_async(secs).await.unwrap()
}
/// Current median.
#[inline]
pub fn seconds(&self) -> f32 {
self.seconds.load(portable_atomic::Ordering::Relaxed)
}
/// Current median.
#[inline]
pub fn duration(&self) -> Duration {
Duration::from_secs_f32(self.seconds())
}
}
impl RollingQuantileLatency {
    /// Spawn the background estimator task tracking `quantile_value` over a
    /// rolling window of the most recent `window_size` samples, and return a
    /// handle for recording samples and reading the estimate.
    pub async fn spawn(quantile_value: f32, window_size: usize) -> Self {
        // TODO: how should queue size and window size be related?
        let (tx, rx) = flume::bounded(window_size);

        let shared_seconds = Arc::new(AtomicF32::new(0.0));

        let worker = RollingQuantileLatencyTask {
            latency_rx: rx,
            seconds: Arc::clone(&shared_seconds),
            q: quantile_value,
            window_size,
        };

        Self {
            join_handle: tokio::spawn(worker.run()),
            latency_tx: tx,
            seconds: shared_seconds,
        }
    }

    /// Convenience constructor tracking the median (p50).
    #[inline]
    pub async fn spawn_median(window_size: usize) -> Self {
        Self::spawn(0.5, window_size).await
    }
}
/// serialize as the current latency estimate in (fractional) seconds
impl Serialize for RollingQuantileLatency {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let secs = self.seconds.load(Ordering::Relaxed);

        serializer.serialize_f32(secs)
    }
}

@ -1,2 +1,3 @@
pub(crate) mod atomic_f32_pair;
pub(crate) mod nanos;
pub(crate) mod span;

4
latency/src/util/span.rs Normal file

@ -0,0 +1,4 @@
// TODO: generic for any float
/// Convert an EWMA "span" into the equivalent smoothing factor `alpha`
/// using the standard relation `alpha = 2 / (span + 1)`.
///
/// A span of 1 yields `alpha = 1.0` (no smoothing); larger spans yield
/// smaller alphas (more smoothing).
pub fn span_to_alpha(span: f32) -> f32 {
    let denominator = span + 1.0;

    2.0 / denominator
}

@ -103,6 +103,7 @@ tracing-subscriber = "0.3"
ulid = { version = "1.0.0", features = ["rand", "uuid", "serde"] }
url = "2.4.0"
uuid = { version = "1.3.4", default-features = false, features = ["fast-rng", "serde", "v4", "zerocopy"] }
exponential-decay-histogram = "0.1.10"
[dev-dependencies]
tokio = { version = "1.28.2", features = ["full", "test-util"] }

@ -3,7 +3,6 @@ use super::many::Web3Rpcs;
use super::one::Web3Rpc;
use crate::errors::{Web3ProxyErrorContext, Web3ProxyResult};
use crate::frontend::authorization::Authorization;
use anyhow::Context;
use base64::engine::general_purpose;
use derive_more::Constructor;
use ethers::prelude::{H256, U64};
@ -384,7 +383,9 @@ impl ConsensusFinder {
let latency = first_seen.elapsed();
// record the time behind the fastest node
rpc.head_latency.write().record(latency);
rpc.head_latency_ms
.write()
.record_secs(latency.as_secs_f32());
// update the local mapping of rpc -> block
self.rpc_heads.insert(rpc, block)
@ -438,51 +439,57 @@ impl ConsensusFinder {
0 => {}
1 => {
for rpc in self.rpc_heads.keys() {
rpc.tier.store(0, atomic::Ordering::Relaxed)
rpc.tier.store(1, atomic::Ordering::Relaxed)
}
}
_ => {
// iterate first to find bounds
let mut min_latency = u64::MAX;
let mut max_latency = u64::MIN;
let mut weighted_latencies = HashMap::new();
// min_latency_sec is actual min_median_latency_sec
let mut min_median_latency_sec = f32::MAX;
let mut max_median_latency_sec = f32::MIN;
let mut median_latencies_sec = HashMap::new();
for rpc in self.rpc_heads.keys() {
let weighted_latency_seconds = rpc.weighted_peak_ewma_seconds();
let median_latency_sec = rpc
.request_latency
.as_ref()
.map(|x| x.seconds())
.unwrap_or_default();
let weighted_latency_ms = (weighted_latency_seconds * 1000.0).round() as i64;
min_median_latency_sec = min_median_latency_sec.min(median_latency_sec);
max_median_latency_sec = min_median_latency_sec.max(median_latency_sec);
let weighted_latency_ms: u64 = weighted_latency_ms
.try_into()
.context("weighted_latency_ms does not fit in a u64")?;
min_latency = min_latency.min(weighted_latency_ms);
max_latency = min_latency.max(weighted_latency_ms);
weighted_latencies.insert(rpc, weighted_latency_ms);
median_latencies_sec.insert(rpc, median_latency_sec);
}
// // histogram requires high to be at least 2 x low
// // using min_latency for low does not work how we want it though
max_latency = max_latency.max(1000);
// create the histogram
let mut hist = Histogram::<u32>::new_with_bounds(1, max_latency, 3).unwrap();
// TODO: resize shouldn't be necessary, but i've seen it error
hist.auto(true);
for weighted_latency_ms in weighted_latencies.values() {
hist.record(*weighted_latency_ms)?;
}
// dev logging
// dev logging of a histogram
if log_enabled!(Level::Trace) {
// convert to ms because the histogram needs ints
let max_median_latency_ms = (max_median_latency_sec * 1000.0).ceil() as u64;
// create the histogram
// histogram requires high to be at least 2 x low
// using min_latency for low does not work how we want it though
// so just set the default range = 1ms..1s
let hist_low = 1;
let hist_high = max_median_latency_ms.max(1_000);
let mut hist_ms =
Histogram::<u32>::new_with_bounds(hist_low, hist_high, 3).unwrap();
// TODO: resize shouldn't be necessary, but i've seen it error
hist_ms.auto(true);
for median_sec in median_latencies_sec.values() {
let median_ms = (median_sec * 1000.0).round() as u64;
hist_ms.record(median_ms)?;
}
// print the histogram. see docs/histograms.txt for more info
let mut encoder =
base64::write::EncoderWriter::new(Vec::new(), &general_purpose::STANDARD);
V2DeflateSerializer::new()
.serialize(&hist, &mut encoder)
.serialize(&hist_ms, &mut encoder)
.unwrap();
let encoded = encoder.finish().unwrap();
@ -493,20 +500,16 @@ impl ConsensusFinder {
}
// TODO: get someone who is better at math to do something smarter. maybe involving stddev?
let divisor = 30f64.max(min_latency as f64 / 2.0);
// bucket sizes of the larger of 30ms or 1/2 the lowest latency
let tier_sec_size = 30f32.max(min_median_latency_sec / 2.0);
for (rpc, weighted_latency_ms) in weighted_latencies.into_iter() {
let tier = (weighted_latency_ms - min_latency) as f64 / divisor;
for (rpc, median_latency_sec) in median_latencies_sec.into_iter() {
let tier = (median_latency_sec - min_median_latency_sec) / tier_sec_size;
// start tiers at 1
let tier = (tier.floor() as u32).saturating_add(1);
// TODO: this should be trace
trace!(
"{} - weighted_latency: {}ms, tier {}",
rpc,
weighted_latency_ms,
tier
);
trace!("{} - p50_sec: {}, tier {}", rpc, median_latency_sec, tier);
rpc.tier.store(tier, atomic::Ordering::Relaxed);
}

@ -14,7 +14,7 @@ use ethers::prelude::{Bytes, Middleware, TxHash, U64};
use ethers::types::{Address, Transaction, U256};
use futures::future::try_join_all;
use futures::StreamExt;
use latency::{EwmaLatency, PeakEwmaLatency};
use latency::{EwmaLatency, PeakEwmaLatency, RollingQuantileLatency};
use log::{debug, info, trace, warn, Level};
use migration::sea_orm::DatabaseConnection;
use nanorand::Rng;
@ -62,17 +62,21 @@ pub struct Web3Rpc {
pub(super) block_data_limit: AtomicU64,
/// head_block is only inside an Option so that the "Default" derive works. it will always be set.
pub(super) head_block: Option<watch::Sender<Option<Web3ProxyBlock>>>,
/// Track head block latency
pub(super) head_latency: RwLock<EwmaLatency>,
/// Track head block latency.
/// RwLock is fine because this isn't updated often and is for monitoring. It is not used on the hot path.
pub(super) head_latency_ms: RwLock<EwmaLatency>,
/// Track peak request latency
/// peak_latency is only inside an Option so that the "Default" derive works. it will always be set.
pub(super) peak_latency: Option<PeakEwmaLatency>,
/// Automatically set priority
pub(super) tier: AtomicU32,
/// Track total requests served
/// Track total internal requests served
pub(super) internal_requests: AtomicUsize,
/// Track total requests served
/// Track total external requests served
pub(super) external_requests: AtomicUsize,
/// Track time used by external requests served
/// request_ms_histogram is only inside an Option so that the "Default" derive works. it will always be set.
pub(super) request_latency: Option<RollingQuantileLatency>,
/// Track in-flight requests
pub(super) active_requests: AtomicUsize,
/// disconnect_watch is only inside an Option so that the "Default" derive works. it will always be set.
@ -168,6 +172,8 @@ impl Web3Rpc {
Duration::from_secs(1),
);
let request_latency = RollingQuantileLatency::spawn_median(1_000).await;
let http_provider = if let Some(http_url) = config.http_url {
let http_url = http_url.parse::<Url>()?;
@ -202,6 +208,7 @@ impl Web3Rpc {
http_provider,
name,
peak_latency: Some(peak_latency),
request_latency: Some(request_latency),
soft_limit: config.soft_limit,
ws_url,
disconnect_watch: Some(disconnect_watch),
@ -1121,7 +1128,7 @@ impl Serialize for Web3Rpc {
S: Serializer,
{
// 3 is the number of fields in the struct.
let mut state = serializer.serialize_struct("Web3Rpc", 13)?;
let mut state = serializer.serialize_struct("Web3Rpc", 14)?;
// the url is excluded because it likely includes private information. just show the name that we use in keys
state.serialize_field("name", &self.name)?;
@ -1166,7 +1173,20 @@ impl Serialize for Web3Rpc {
&self.active_requests.load(atomic::Ordering::Relaxed),
)?;
state.serialize_field("head_latency_ms", &self.head_latency.read().value())?;
state.serialize_field(
"head_latency_ms",
&self.head_latency_ms.read().duration().as_millis(),
)?;
state.serialize_field(
"request_latency_ms",
&self
.request_latency
.as_ref()
.unwrap()
.duration()
.as_millis(),
)?;
state.serialize_field(
"peak_latency_ms",

@ -379,12 +379,16 @@ impl OpenRequestHandle {
}
}
}
} else if let Some(peak_latency) = &self.rpc.peak_latency {
peak_latency.report(latency);
} else {
unreachable!("peak_latency not initialized");
}
self.rpc.peak_latency.as_ref().unwrap().report(latency);
self.rpc
.request_latency
.as_ref()
.unwrap()
.record(latency)
.await;
response
}
}