//! web3-proxy/latency/src/rolling_quantile.rs
//!
//! A rolling quantile (e.g. the median) of observed request latencies,
//! maintained by a background tokio task and read via an atomic f32.
use std::sync::Arc;
use portable_atomic::{AtomicF32, Ordering};
use serde::ser::Serializer;
use serde::Serialize;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio::time::Duration;
use watermill::quantile::RollingQuantile;
use watermill::stats::Univariate;
pub struct RollingQuantileLatency {
/// Join handle for the latency calculation task.
pub join_handle: JoinHandle<()>,
/// Send to update with each request duration.
latency_tx: mpsc::Sender<f32>,
2023-06-18 19:47:40 +03:00
/// rolling quantile latency in seconds. Updated async.
seconds: Arc<AtomicF32>,
}
/// Task to be spawned per-RollingMedianLatency for calculating the value
struct RollingQuantileLatencyTask {
/// Receive to update each request duration
latency_rx: mpsc::Receiver<f32>,
2023-06-18 19:47:40 +03:00
/// Current estimate and update time
seconds: Arc<AtomicF32>,
/// quantile value.
2023-06-19 07:42:16 +03:00
quantile: RollingQuantile<f32>,
2023-06-18 19:47:40 +03:00
}
impl RollingQuantileLatencyTask {
2023-06-19 07:42:16 +03:00
fn new(
latency_rx: mpsc::Receiver<f32>,
2023-06-19 07:42:16 +03:00
seconds: Arc<AtomicF32>,
q: f32,
window_size: usize,
) -> Self {
let quantile = RollingQuantile::new(q, window_size).unwrap();
2023-06-18 19:47:40 +03:00
2023-06-19 07:42:16 +03:00
Self {
latency_rx,
seconds,
quantile,
}
}
/// Run the loop for updating latency.
async fn run(mut self) {
while let Some(rtt) = self.latency_rx.recv().await {
2023-06-19 07:42:16 +03:00
self.update(rtt);
2023-06-18 19:47:40 +03:00
}
}
/// Update the estimate object atomically.
2023-06-19 07:42:16 +03:00
fn update(&mut self, rtt: f32) {
self.quantile.update(rtt);
2023-06-18 19:47:40 +03:00
self.seconds
2023-06-19 07:42:16 +03:00
.store(self.quantile.get(), portable_atomic::Ordering::Relaxed);
2023-06-18 19:47:40 +03:00
}
}
impl RollingQuantileLatency {
#[inline]
2023-06-19 07:42:16 +03:00
pub fn record(&self, duration: Duration) {
self.record_secs(duration.as_secs_f32())
2023-06-18 19:47:40 +03:00
}
2023-06-19 07:42:16 +03:00
/// if the channel is full (unlikely), entries will be silently dropped
2023-06-18 19:47:40 +03:00
#[inline]
2023-06-19 07:42:16 +03:00
pub fn record_secs(&self, secs: f32) {
// self.latency_tx.send_async(secs).await.unwrap()
let _ = self.latency_tx.try_send(secs);
2023-06-18 19:47:40 +03:00
}
2023-06-19 07:42:16 +03:00
/// Current .
2023-06-18 19:47:40 +03:00
#[inline]
pub fn seconds(&self) -> f32 {
self.seconds.load(portable_atomic::Ordering::Relaxed)
}
/// Current median.
#[inline]
pub fn latency(&self) -> Duration {
2023-06-18 19:47:40 +03:00
Duration::from_secs_f32(self.seconds())
}
}
impl RollingQuantileLatency {
pub async fn spawn(quantile_value: f32, window_size: usize) -> Self {
// TODO: how should queue size and window size be related?
let (latency_tx, latency_rx) = mpsc::channel(window_size);
2023-06-18 19:47:40 +03:00
let seconds = Arc::new(AtomicF32::new(0.0));
2023-06-19 07:42:16 +03:00
let task = RollingQuantileLatencyTask::new(
2023-06-18 19:47:40 +03:00
latency_rx,
2023-06-19 07:42:16 +03:00
seconds.clone(),
quantile_value,
2023-06-18 19:47:40 +03:00
window_size,
2023-06-19 07:42:16 +03:00
);
2023-06-18 19:47:40 +03:00
let join_handle = tokio::spawn(task.run());
Self {
join_handle,
latency_tx,
seconds,
}
}
#[inline]
pub async fn spawn_median(window_size: usize) -> Self {
Self::spawn(0.5, window_size).await
}
}
/// Serializes as the current quantile estimate in seconds (a bare `f32`).
impl Serialize for RollingQuantileLatency {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let secs = self.seconds.load(Ordering::Relaxed);
        serializer.serialize_f32(secs)
    }
}