move data to the right places

This commit is contained in:
Bryan Stitt 2023-06-18 21:42:16 -07:00
parent 58cc129837
commit c3ae4ded2b
4 changed files with 36 additions and 28 deletions

@ -29,6 +29,7 @@ impl EwmaLatency {
#[inline] #[inline]
pub fn record_secs(&mut self, secs: f32) { pub fn record_secs(&mut self, secs: f32) {
// TODO: we could change this to use a channel like the peak_ewma and rolling_quantile code, but this is fine if it updates on insert instead of async
self.seconds.update(secs); self.seconds.update(secs);
} }

@ -18,50 +18,61 @@ pub struct RollingQuantileLatency {
} }
/// Task to be spawned per-RollingMedianLatency for calculating the value /// Task to be spawned per-RollingMedianLatency for calculating the value
#[derive(Debug)]
struct RollingQuantileLatencyTask { struct RollingQuantileLatencyTask {
/// Receive to update each request duration /// Receive to update each request duration
latency_rx: flume::Receiver<f32>, latency_rx: flume::Receiver<f32>,
/// Current estimate and update time /// Current estimate and update time
seconds: Arc<AtomicF32>, seconds: Arc<AtomicF32>,
/// quantile value. /// quantile value.
/// WARNING! should be between 0 and 1. quantile: RollingQuantile<f32>,
q: f32,
/// Rolling window size.
window_size: usize,
} }
impl RollingQuantileLatencyTask { impl RollingQuantileLatencyTask {
/// Run the loop for updating latency. fn new(
async fn run(self) { latency_rx: flume::Receiver<f32>,
let mut q = RollingQuantile::new(self.q, self.window_size).unwrap(); seconds: Arc<AtomicF32>,
q: f32,
window_size: usize,
) -> Self {
let quantile = RollingQuantile::new(q, window_size).unwrap();
Self {
latency_rx,
seconds,
quantile,
}
}
/// Run the loop for updating latency.
async fn run(mut self) {
while let Ok(rtt) = self.latency_rx.recv_async().await { while let Ok(rtt) = self.latency_rx.recv_async().await {
self.update(&mut q, rtt); self.update(rtt);
} }
} }
/// Update the estimate object atomically. /// Update the estimate object atomically.
fn update(&self, q: &mut RollingQuantile<f32>, rtt: f32) { fn update(&mut self, rtt: f32) {
q.update(rtt); self.quantile.update(rtt);
self.seconds self.seconds
.store(q.get(), portable_atomic::Ordering::Relaxed); .store(self.quantile.get(), portable_atomic::Ordering::Relaxed);
} }
} }
impl RollingQuantileLatency { impl RollingQuantileLatency {
#[inline] #[inline]
pub async fn record(&self, duration: Duration) { pub fn record(&self, duration: Duration) {
self.record_secs(duration.as_secs_f32()).await self.record_secs(duration.as_secs_f32())
} }
/// if the channel is full (unlikely), entries will be silently dropped
#[inline] #[inline]
pub async fn record_secs(&self, secs: f32) { pub fn record_secs(&self, secs: f32) {
self.latency_tx.send_async(secs).await.unwrap() // self.latency_tx.send_async(secs).await.unwrap()
let _ = self.latency_tx.try_send(secs);
} }
/// Current median. /// Current quantile value.
#[inline] #[inline]
pub fn seconds(&self) -> f32 { pub fn seconds(&self) -> f32 {
self.seconds.load(portable_atomic::Ordering::Relaxed) self.seconds.load(portable_atomic::Ordering::Relaxed)
@ -81,12 +92,12 @@ impl RollingQuantileLatency {
let seconds = Arc::new(AtomicF32::new(0.0)); let seconds = Arc::new(AtomicF32::new(0.0));
let task = RollingQuantileLatencyTask { let task = RollingQuantileLatencyTask::new(
latency_rx, latency_rx,
seconds: seconds.clone(), seconds.clone(),
q: quantile_value, quantile_value,
window_size, window_size,
}; );
let join_handle = tokio::spawn(task.run()); let join_handle = tokio::spawn(task.run());

@ -10,7 +10,7 @@ use hashbrown::{HashMap, HashSet};
use hdrhistogram::serialization::{Serializer, V2DeflateSerializer}; use hdrhistogram::serialization::{Serializer, V2DeflateSerializer};
use hdrhistogram::Histogram; use hdrhistogram::Histogram;
use itertools::{Itertools, MinMaxResult}; use itertools::{Itertools, MinMaxResult};
use log::{debug, log_enabled, trace, warn, Level}; use log::{log_enabled, trace, warn, Level};
use moka::future::Cache; use moka::future::Cache;
use serde::Serialize; use serde::Serialize;
use std::cmp::{Ordering, Reverse}; use std::cmp::{Ordering, Reverse};

@ -381,13 +381,9 @@ impl OpenRequestHandle {
} }
} }
// TODO: benchmark spawning both of these
self.rpc.peak_latency.as_ref().unwrap().report(latency); self.rpc.peak_latency.as_ref().unwrap().report(latency);
self.rpc self.rpc.median_latency.as_ref().unwrap().record(latency);
.median_latency
.as_ref()
.unwrap()
.record(latency)
.await;
response response
} }