fix decay time

The decay parameter was passed to PeakEwmaLatency::spawn as as_millis()
where nanoseconds were expected, so latency estimates decayed far too
fast. spawn now takes a Duration and converts to nanoseconds internally.
Also: the latency report channel moves from tokio::sync::mpsc to flume,
reads no longer panic when update_at is slightly in the future, and peak
latency reporting is re-enabled.

Bryan Stitt 2023-05-22 15:43:39 -07:00
parent 91eeee23e2
commit 3ac15558e3
8 changed files with 40 additions and 34 deletions

Cargo.lock (generated)

@@ -2941,6 +2941,7 @@ name = "latency"
 version = "0.1.0"
 dependencies = [
  "ewma",
+ "flume",
  "log",
  "serde",
  "tokio",

@@ -7,6 +7,7 @@ edition = "2021"
 [dependencies]
 ewma = "0.1.1"
+flume = "0.10.14"
 log = "0.4.17"
 serde = { version = "1.0.163", features = [] }
 tokio = { version = "1.28.1", features = ["full"] }
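For context on the new dependency: unlike tokio::sync::mpsc, a flume channel can be used from both sync code (try_send) and async code (recv_async), which is what the changes below rely on. A minimal sketch, assuming flume 0.10 and tokio with the "full" feature set declared above:

use std::time::Duration;

#[tokio::main]
async fn main() {
    // A bounded channel, like the buf_size-bounded one created in spawn().
    let (tx, rx) = flume::bounded::<Duration>(8);

    // try_send works from non-async code and never blocks; a single Err
    // covers both the "full" and "disconnected" cases.
    if let Err(err) = tx.try_send(Duration::from_millis(5)) {
        eprintln!("latency report dropped: {err}");
    }
    drop(tx);

    // recv_async() yields Ok(value) until every sender is dropped, then
    // returns Err(RecvError::Disconnected) and the loop ends; this is the
    // same shutdown path as the "latency loop exited" trace further down.
    while let Ok(rtt) = rx.recv_async().await {
        println!("got rtt: {rtt:?}");
    }
}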

@@ -2,9 +2,7 @@ mod rtt_estimate;
 use std::sync::Arc;

-use log::{error, trace};
-use tokio::sync::mpsc;
-use tokio::sync::mpsc::error::TrySendError;
+use log::{error, log_enabled, trace};
 use tokio::task::JoinHandle;
 use tokio::time::{Duration, Instant};
@@ -20,7 +18,7 @@ pub struct PeakEwmaLatency {
     /// Join handle for the latency calculation task
     pub join_handle: JoinHandle<()>,
     /// Send to update with each request duration
-    request_tx: mpsc::Sender<Duration>,
+    request_tx: flume::Sender<Duration>,
     /// Latency average and last update time
     rtt_estimate: Arc<AtomicRttEstimate>,
     /// Decay time
@@ -32,9 +30,12 @@ impl PeakEwmaLatency {
     ///
     /// Returns a handle that can also be used to read the current
     /// average latency.
-    pub fn spawn(decay_ns: f64, buf_size: usize, start_latency: Duration) -> Self {
-        let (request_tx, request_rx) = mpsc::channel(buf_size);
+    pub fn spawn(decay: Duration, buf_size: usize, start_latency: Duration) -> Self {
+        let decay_ns = decay.as_nanos() as f64;
+
+        debug_assert!(decay_ns > 0.0, "decay_ns must be positive");
+
+        let (request_tx, request_rx) = flume::bounded(buf_size);
         let rtt_estimate = Arc::new(AtomicRttEstimate::new(start_latency));
         let task = PeakEwmaLatencyTask {
             request_rx,
@@ -56,15 +57,19 @@ impl PeakEwmaLatency {
         let mut estimate = self.rtt_estimate.load();

         let now = Instant::now();
-        assert!(
-            estimate.update_at <= now,
-            "update_at is {}ns in the future",
-            estimate.update_at.duration_since(now).as_nanos(),
-        );
-        // Update the RTT estimate to account for decay since the last update.
-        // TODO: having an update here means we don't actually write from just one thread!! That's how we get partially written stuff, I think.
-        estimate.update(0.0, self.decay_ns, now)
+        if estimate.update_at > now {
+            if log_enabled!(log::Level::Trace) {
+                trace!(
+                    "update_at is {}ns in the future",
+                    estimate.update_at.duration_since(now).as_nanos()
+                );
+            }
+            estimate.rtt
+        } else {
+            // Update the RTT estimate to account for decay since the last update.
+            estimate.update(0.0, self.decay_ns, now)
+        }
     }

     /// Report latency from a single request
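The rewritten read path no longer panics when update_at sits slightly in the future (which can happen when the updater task races a reader); it just returns the last estimate. The decay itself is an exponentially weighted moving average. A rough self-contained sketch of the usual peak-EWMA step, assuming the same shape as rtt_estimate's update (not the actual code):

// Sketch of a peak-EWMA update: latency spikes are adopted immediately,
// while lower samples are blended in with a weight that decays
// exponentially over decay_ns.
fn peak_ewma_update(rtt_ns: f64, sample_ns: f64, elapsed_ns: f64, decay_ns: f64) -> f64 {
    if sample_ns > rtt_ns {
        // Peak: one slow request raises the estimate instantly.
        sample_ns
    } else {
        // Decay: weight the old estimate by e^(-elapsed / decay).
        let w = (-elapsed_ns / decay_ns).exp();
        rtt_ns * w + sample_ns * (1.0 - w)
    }
}

fn main() {
    // With a 15s decay, a 10ms sample taken 1s after a 100ms estimate
    // only pulls the estimate partway down.
    let rtt = peak_ewma_update(100e6, 10e6, 1e9, 15e9);
    println!("new rtt: {:.1}ms", rtt / 1e6);
}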
@@ -73,15 +78,12 @@ impl PeakEwmaLatency {
     pub fn report(&self, duration: Duration) {
         match self.request_tx.try_send(duration) {
             Ok(()) => {}
-            Err(TrySendError::Full(_)) => {
+            Err(err) => {
                 // We don't want to block if the channel is full, just
                 // report the error
-                error!("Latency report channel full");
+                error!("Latency report channel full. {}", err);
+                // TODO: could we spawn a new tokio task to report this later?
             }
-            Err(TrySendError::Closed(_)) => {
-                unreachable!("Owner should keep channel open");
-            }
         };
     }
 }
@@ -90,7 +92,7 @@ impl PeakEwmaLatency {
 #[derive(Debug)]
 struct PeakEwmaLatencyTask {
     /// Receive new request timings for update
-    request_rx: mpsc::Receiver<Duration>,
+    request_rx: flume::Receiver<Duration>,
     /// Current estimate and update time
     rtt_estimate: Arc<AtomicRttEstimate>,
     /// Last update time, used for decay calculation
@@ -101,8 +103,8 @@ struct PeakEwmaLatencyTask {
 impl PeakEwmaLatencyTask {
     /// Run the loop for updating latency
-    async fn run(mut self) {
-        while let Some(rtt) = self.request_rx.recv().await {
+    async fn run(self) {
+        while let Ok(rtt) = self.request_rx.recv_async().await {
             self.update(rtt);
         }
         trace!("latency loop exited");
@@ -128,14 +130,15 @@ impl PeakEwmaLatencyTask {
 mod tests {
     use tokio::time::{self, Duration};

-    use crate::util::nanos::NANOS_PER_MILLI;
-
     /// The default RTT estimate decays, so that new nodes are considered if the
     /// default RTT is too high.
     #[tokio::test(start_paused = true)]
     async fn default_decay() {
-        let estimate =
-            super::PeakEwmaLatency::spawn(NANOS_PER_MILLI * 1_000.0, 8, Duration::from_millis(10));
+        let estimate = super::PeakEwmaLatency::spawn(
+            Duration::from_millis(1_000),
+            8,
+            Duration::from_millis(10),
+        );
         let load = estimate.latency();
         assert_eq!(load, Duration::from_millis(10));

@@ -44,6 +44,7 @@ impl RttEstimate {
             );
             Duration::from_nanos(next_estimate as u64)
         };
+        self.rtt
     }
@@ -101,7 +102,7 @@ impl AtomicRttEstimate {
     }

     /// Fetches the value, and applies a function to it that returns a
-    /// new rtt. Retrns the new RttEstimate with new update_at.
+    /// new rtt. Returns the new RttEstimate with new update_at.
     ///
     /// Automatically updates the update_at with Instant::now(). This
     /// method omits ordering arguments, defaulting to Relaxed since

@@ -91,3 +91,6 @@ tower-http = { version = "0.4.0", features = ["cors", "sensitive-headers"] }
 ulid = { version = "1.0.0", features = ["uuid", "serde"] }
 url = "2.3.1"
 uuid = "1.3.3"
+
+[dev-dependencies]
+tokio = { version = "1.28.1", features = ["full", "test-util"] }
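The new dev-dependency turns on tokio's test-util feature, used by paused-clock tests like the start_paused one above. A self-contained sketch of the pattern (hypothetical test, not one from this diff):

use tokio::time::{self, Duration, Instant};

// With start_paused = true the tokio clock begins frozen, so a test can
// advance time deterministically instead of sleeping for real.
#[tokio::test(start_paused = true)]
async fn clock_can_be_advanced() {
    let begin = Instant::now();

    // Jump the paused clock forward 15s; advance() requires the
    // "test-util" feature added in [dev-dependencies] above.
    time::advance(Duration::from_secs(15)).await;

    assert!(begin.elapsed() >= Duration::from_secs(15));
}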

@@ -1318,7 +1318,7 @@ mod tests {
     #[cfg(test)]
     fn new_peak_latency() -> PeakEwmaLatency {
-        const NANOS_PER_MILLI: f64 = 1_000_000.0;
-
-        PeakEwmaLatency::spawn(1_000.0 * NANOS_PER_MILLI, 4, Duration::from_secs(1))
+        PeakEwmaLatency::spawn(Duration::from_secs(1), 4, Duration::from_secs(1))
     }

     #[tokio::test]

@@ -159,8 +159,7 @@ impl Web3Rpc {
         // TODO Should these defaults be in config
         let peak_latency = PeakEwmaLatency::spawn(
             // Decay over 15s
-            // TODO! This is wrong! needs to be as_nanos!
-            Duration::from_secs(15).as_millis() as f64,
+            Duration::from_secs(15),
             // Peak requests so far around 5k, we will use an order of magnitude
             // more to be safe. Should only use about 50mb RAM
             50_000,
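This call site is the bug named in the commit title: the removed line fed as_millis() into a parameter measured in nanoseconds. A quick self-contained check of the size of that mistake:

use std::time::Duration;

fn main() {
    let decay = Duration::from_secs(15);

    // What the old code passed (milliseconds, misread as nanoseconds):
    let wrong_ns = decay.as_millis() as f64; // 15_000.0
    // What the parameter actually expected:
    let right_ns = decay.as_nanos() as f64; // 15_000_000_000.0

    // The intended 15s decay behaved like a 15µs decay.
    assert_eq!(right_ns / wrong_ns, 1_000_000.0);
    println!("decay ran {}x too fast", right_ns / wrong_ns);
}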
@@ -1220,7 +1219,7 @@ impl Serialize for Web3Rpc {
         state.serialize_field(
             "peak_latency",
-            &self.peak_latency.as_ref().unwrap().latency(),
+            &self.peak_latency.as_ref().unwrap().latency().as_millis(),
         )?;

         state.serialize_field("peak_ewma", self.peak_ewma().as_ref())?;

@@ -356,9 +356,7 @@ impl OpenRequestHandle {
             }
         }
     } else if let Some(peak_latency) = &self.rpc.peak_latency {
-        // trace!("updating peak_latency: {}", latency.as_secs_f64());
-        // peak_latency.report(latency);
-        trace!("peak latency disabled for now");
+        peak_latency.report(latency);
     } else {
         unreachable!("peak_latency not initialized");
     }