Peak ewma (#63)

* Use peak-EWMA instead of head latency for the latency calculation

* Implement some suggested changes from PR review

* Move latency to a new package in the workspace root

* Fix unit tests, which now require peak_latency on Web3Rpc

* Switch to atomics for peak-EWMA

This change avoids the locking that comes with tokio::sync::watch.

* Add decay calculation to latency reads in peak-EWMA

* Add some tests for peak-EWMA

* Use sensible latency defaults and avoid blocking when the channel is full

* Clean up and add a couple of additional comments
Rory Trent 2023-05-11 13:09:15 -07:00 committed by GitHub
parent 0531e2f8dd
commit ec11e210ee
14 changed files with 586 additions and 82 deletions

Cargo.lock (generated, 12 lines changed)

@ -3020,6 +3020,16 @@ dependencies = [
"regex",
]
[[package]]
name = "latency"
version = "0.1.0"
dependencies = [
"ewma",
"log",
"serde",
"tokio",
]
[[package]]
name = "lazy_static"
version = "1.4.0"
@ -6657,7 +6667,6 @@ dependencies = [
"entities",
"env_logger",
"ethers",
"ewma",
"fdlimit",
"flume",
"fstrings",
@ -6673,6 +6682,7 @@ dependencies = [
"influxdb2-structmap",
"ipnet",
"itertools",
"latency",
"log",
"migration",
"moka",

Cargo.toml (workspace)

@ -2,6 +2,7 @@
members = [
"deferred-rate-limiter",
"entities",
"latency",
"migration",
"rate-counter",
"redis-rate-limiter",

latency/Cargo.toml (new file, 15 lines)

@ -0,0 +1,15 @@
[package]
name = "latency"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
ewma = "0.1.1"
log = "0.4.17"
serde = { version = "1.0.159", features = [] }
tokio = { version = "1.27.0", features = ["full"] }
[dev-dependencies]
tokio = { version = "1.27.0", features = ["full", "test-util"] }

latency/src/ewma.rs (new file, 67 lines)

@ -0,0 +1,67 @@
use serde::ser::Serializer;
use serde::Serialize;
use tokio::time::Duration;
pub struct EwmaLatency {
/// exponentially weighted moving average of how many milliseconds behind the fastest node we are
ewma: ewma::EWMA,
}
impl Serialize for EwmaLatency {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_f64(self.ewma.value())
}
}
impl EwmaLatency {
#[inline(always)]
pub fn record(&mut self, duration: Duration) {
self.record_ms(duration.as_secs_f64() * 1000.0);
}
#[inline(always)]
pub fn record_ms(&mut self, milliseconds: f64) {
self.ewma.add(milliseconds);
}
/// Current EWMA value in milliseconds
#[inline(always)]
pub fn value(&self) -> f64 {
self.ewma.value()
}
}
impl Default for EwmaLatency {
fn default() -> Self {
// TODO: what should the default span be? 25 requests?
let span = 25.0;
let start = 1000.0;
Self::new(span, start)
}
}
impl EwmaLatency {
// depending on the span, start might not be perfect
pub fn new(span: f64, start_ms: f64) -> Self {
let alpha = Self::span_to_alpha(span);
let mut ewma = ewma::EWMA::new(alpha);
if start_ms > 0.0 {
for _ in 0..(span as u64) {
ewma.add(start_ms);
}
}
Self { ewma }
}
fn span_to_alpha(span: f64) -> f64 {
2.0 / (span + 1.0)
}
}
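A minimal usage sketch for this type, assuming the latency crate as declared above (the printed numbers are illustrative):

use latency::EwmaLatency;
use tokio::time::Duration;

fn main() {
    // Default span of 25 gives alpha = 2 / (25 + 1) ~= 0.077, and the
    // constructor seeds the average near the 1000ms start value (only
    // approximately, as the comment on new() notes).
    let mut lat = EwmaLatency::default();
    println!("seeded: {:.0}ms", lat.value());

    // Each new sample pulls the average ~7.7% toward the observation.
    lat.record(Duration::from_millis(100));
    println!("after one 100ms sample: {:.0}ms", lat.value());
}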

latency/src/lib.rs (new file, 5 lines)

@ -0,0 +1,5 @@
mod ewma;
mod peak_ewma;
mod util;
pub use self::{ewma::EwmaLatency, peak_ewma::PeakEwmaLatency};

latency/src/peak_ewma/mod.rs (new file, 149 lines)

@ -0,0 +1,149 @@
mod rtt_estimate;
use std::sync::Arc;
use log::error;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
use tokio::task::JoinHandle;
use tokio::time::{Duration, Instant};
use self::rtt_estimate::AtomicRttEstimate;
use crate::util::nanos::nanos;
/// Latency calculation using Peak EWMA algorithm
///
/// Updates are done in a separate task to avoid locking or race
/// conditions. Reads may happen on any thread.
#[derive(Debug)]
pub struct PeakEwmaLatency {
/// Join handle for the latency calculation task
pub join_handle: JoinHandle<()>,
/// Send to update with each request duration
request_tx: mpsc::Sender<Duration>,
/// Latency average and last update time
rtt_estimate: Arc<AtomicRttEstimate>,
/// Decay time
decay_ns: f64,
}
impl PeakEwmaLatency {
/// Spawn the task for calculating peak request latency
///
/// Returns a handle that can also be used to read the current
/// average latency.
pub fn spawn(decay_ns: f64, buf_size: usize, start_latency: Duration) -> Self {
debug_assert!(decay_ns > 0.0, "decay_ns must be positive");
let (request_tx, request_rx) = mpsc::channel(buf_size);
let rtt_estimate = Arc::new(AtomicRttEstimate::new(start_latency));
let task = PeakEwmaLatencyTask {
request_rx,
rtt_estimate: rtt_estimate.clone(),
update_at: Instant::now(),
decay_ns,
};
let join_handle = tokio::spawn(task.run());
Self {
join_handle,
request_tx,
rtt_estimate,
decay_ns,
}
}
/// Get the current peak-ewma latency estimate
pub fn latency(&self) -> Duration {
let mut estimate = self.rtt_estimate.load();
let now = Instant::now();
debug_assert!(
estimate.update_at <= now,
"update_at={:?} in the future",
estimate.update_at,
);
// Update the RTT estimate to account for decay since the last update.
estimate.update(0.0, self.decay_ns, now)
}
/// Report latency from a single request
///
/// Should only be called from the Web3Rpc that owns it.
pub fn report(&self, duration: Duration) {
match self.request_tx.try_send(duration) {
Ok(()) => {}
Err(TrySendError::Full(_)) => {
// We don't want to block if the channel is full, just
// report the error
error!("Latency report channel full");
// TODO: could we spawn a new tokio task to report this later?
}
Err(TrySendError::Closed(_)) => {
unreachable!("Owner should keep channel open");
}
};
}
}
/// Task to be spawned per-Web3Rpc for calculating the peak request latency
#[derive(Debug)]
struct PeakEwmaLatencyTask {
/// Receive new request timings for update
request_rx: mpsc::Receiver<Duration>,
/// Current estimate and update time
rtt_estimate: Arc<AtomicRttEstimate>,
/// Last update time, used for decay calculation
update_at: Instant,
/// Decay time
decay_ns: f64,
}
impl PeakEwmaLatencyTask {
/// Run the loop for updating latency
async fn run(mut self) {
while let Some(rtt) = self.request_rx.recv().await {
self.update(rtt);
}
}
/// Update the estimate object atomically.
fn update(&mut self, rtt: Duration) {
let rtt = nanos(rtt);
let now = Instant::now();
debug_assert!(
self.update_at <= now,
"update_at={:?} in the future",
self.update_at,
);
self.rtt_estimate
.fetch_update(|mut rtt_estimate| rtt_estimate.update(rtt, self.decay_ns, now));
}
}
#[cfg(test)]
mod tests {
use tokio::time::{self, Duration};
use crate::util::nanos::NANOS_PER_MILLI;
/// The default RTT estimate decays, so that new nodes are considered if the
/// default RTT is too high.
#[tokio::test(start_paused = true)]
async fn default_decay() {
let estimate =
super::PeakEwmaLatency::spawn(NANOS_PER_MILLI * 1_000.0, 8, Duration::from_millis(10));
let load = estimate.latency();
assert_eq!(load, Duration::from_millis(10));
time::advance(Duration::from_millis(100)).await;
let load = estimate.latency();
assert!(Duration::from_millis(9) < load && load < Duration::from_millis(10));
time::advance(Duration::from_millis(100)).await;
let load = estimate.latency();
assert!(Duration::from_millis(8) < load && load < Duration::from_millis(9));
}
}
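Taken together, a hedged end-to-end sketch of this API (illustrative parameters; report() feeds the channel, the spawned task folds samples into the shared atomic, and latency() reads with decay applied):

use latency::PeakEwmaLatency;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    // 1s decay, room for 8 pending reports, 10ms starting estimate
    let peak = PeakEwmaLatency::spawn(1_000_000_000.0, 8, Duration::from_millis(10));

    // Never blocks: try_send into the channel, dropped (with an error
    // log) if the buffer is full.
    peak.report(Duration::from_millis(25));

    // Give the background task a moment, then read from any thread.
    sleep(Duration::from_millis(5)).await;
    println!("peak-ewma estimate: {:?}", peak.latency());
}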

latency/src/peak_ewma/rtt_estimate.rs (new file, 168 lines)

@ -0,0 +1,168 @@
use std::sync::atomic::Ordering;
use log::trace;
use tokio::time::{Duration, Instant};
use crate::util::atomic_f32_pair::AtomicF32Pair;
use crate::util::nanos::{nanos, NANOS_PER_MILLI};
/// Holds the current RTT estimate and the last time this value was updated.
#[derive(Debug)]
pub struct RttEstimate {
pub update_at: Instant,
pub rtt: Duration,
}
impl RttEstimate {
/// Update the estimate with a new rtt value. Use rtt=0.0 to simply
/// decay the current value.
pub fn update(&mut self, rtt: f64, decay_ns: f64, now: Instant) -> Duration {
let ewma = nanos(self.rtt);
self.rtt = if ewma < rtt {
// For Peak-EWMA, always use the worst-case (peak) value as the estimate for
// subsequent requests.
trace!(
"update peak rtt={}ms prior={}ms",
rtt / NANOS_PER_MILLI,
ewma / NANOS_PER_MILLI,
);
Duration::from_nanos(rtt as u64)
} else {
// When a latency is observed that is less than the estimated latency, we decay the
// prior estimate according to how much time has elapsed since the last
// update. The inverse of the decay is used to scale the estimate towards the
// observed latency value.
let elapsed = nanos(now.saturating_duration_since(self.update_at));
let decay = (-elapsed / decay_ns).exp();
let recency = 1.0 - decay;
let next_estimate = (ewma * decay) + (rtt * recency);
trace!(
"update duration={:03.0}ms decay={:06.0}ns; next={:03.0}ms",
rtt / NANOS_PER_MILLI,
ewma - next_estimate,
next_estimate / NANOS_PER_MILLI,
);
Duration::from_nanos(next_estimate as u64)
};
self.rtt
}
/// Build a new estimate object using current time.
fn new(start_duration: Duration) -> Self {
Self {
update_at: Instant::now(),
rtt: start_duration,
}
}
/// Convert to pair of f32
fn as_pair(&self, start_time: Instant) -> [f32; 2] {
let update_at = self
.update_at
.saturating_duration_since(start_time)
.as_secs_f32();
let rtt = self.rtt.as_secs_f32();
[update_at, rtt]
}
/// Build from pair of f32
fn from_pair(pair: [f32; 2], start_time: Instant) -> Self {
let update_at = start_time + Duration::from_secs_f32(pair[0]);
let rtt = Duration::from_secs_f32(pair[1]);
Self { update_at, rtt }
}
}
/// Atomic storage of RttEstimate using AtomicF32Pair
///
/// Start time is needed to (de-)serialize the update_at Instant.
#[derive(Debug)]
pub struct AtomicRttEstimate {
pair: AtomicF32Pair,
start_time: Instant,
}
impl AtomicRttEstimate {
/// Creates a new atomic rtt estimate.
pub fn new(start_duration: Duration) -> Self {
let estimate = RttEstimate::new(start_duration);
Self {
pair: AtomicF32Pair::new(estimate.as_pair(estimate.update_at)),
start_time: estimate.update_at,
}
}
/// Loads a value from the atomic rtt estimate.
///
/// This method omits the ordering argument since loads may use
/// slightly stale data to avoid adding additional latency.
pub fn load(&self) -> RttEstimate {
RttEstimate::from_pair(self.pair.load(Ordering::Relaxed), self.start_time)
}
/// Fetches the value and applies a function to it that returns a
/// new rtt. Returns the new RttEstimate with the new update_at.
///
/// Automatically updates the update_at with Instant::now(). This
/// method omits ordering arguments, defaulting to Relaxed since
/// all writes are serial and any reads may rely on slightly stale
/// data.
pub fn fetch_update<F>(&self, mut f: F) -> RttEstimate
where
F: FnMut(RttEstimate) -> Duration,
{
let mut update_at = Instant::now();
let mut rtt = Duration::ZERO;
self.pair
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |pair| {
rtt = f(RttEstimate::from_pair(pair, self.start_time));
// Save the new update_at inside the function in case it
// is run multiple times
update_at = Instant::now();
Some(RttEstimate { rtt, update_at }.as_pair(self.start_time))
})
.expect("Should never Err");
RttEstimate { update_at, rtt }
}
}
#[cfg(test)]
mod tests {
use tokio::time::{self, Duration, Instant};
use super::{AtomicRttEstimate, RttEstimate};
#[test]
fn test_rtt_estimate_f32_conversions() {
let rtt = Duration::from_secs(1);
let expected = RttEstimate::new(rtt);
let actual =
RttEstimate::from_pair(expected.as_pair(expected.update_at), expected.update_at);
assert_eq!(expected.update_at, actual.update_at);
assert_eq!(expected.rtt, actual.rtt);
}
#[tokio::test(start_paused = true)]
async fn test_atomic_rtt_estimate_load() {
let rtt = Duration::from_secs(1);
let estimate = AtomicRttEstimate::new(rtt);
let actual = estimate.load();
assert_eq!(Instant::now(), actual.update_at);
assert_eq!(rtt, actual.rtt);
}
#[tokio::test(start_paused = true)]
async fn test_atomic_rtt_estimate_fetch_update() {
let start_time = Instant::now();
let rtt = Duration::from_secs(1);
let estimate = AtomicRttEstimate::new(rtt);
time::advance(Duration::from_secs(1)).await;
let rv = estimate.fetch_update(|value| {
assert_eq!(start_time, value.update_at);
assert_eq!(rtt, value.rtt);
Duration::from_secs(2)
});
assert_eq!(start_time + Duration::from_secs(1), rv.update_at);
assert_eq!(Duration::from_secs(2), rv.rtt);
}
}
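To make the decay branch concrete, here is the same arithmetic as a standalone sketch with assumed numbers: a 100ms prior estimate, a faster 40ms observation arriving 500ms after the last update, and decay_ns = 1s give decay = exp(-0.5) ~= 0.607, so the next estimate is 100 * 0.607 + 40 * 0.393 ~= 76ms.

// Standalone reproduction of the decay branch of RttEstimate::update,
// with illustrative values (not taken from the diff above).
fn next_estimate_ns(prior_ns: f64, rtt_ns: f64, elapsed_ns: f64, decay_ns: f64) -> f64 {
    let decay = (-elapsed_ns / decay_ns).exp();
    let recency = 1.0 - decay;
    (prior_ns * decay) + (rtt_ns * recency)
}

fn main() {
    let prior = 100e6; // 100ms prior estimate, in ns
    let rtt = 40e6; // new 40ms observation
    let elapsed = 500e6; // 500ms since last update
    let decay_ns = 1e9; // 1s decay constant
    // exp(-0.5) ~= 0.607 => 100 * 0.607 + 40 * 0.393 ~= 76.4ms
    println!("{:.1}ms", next_estimate_ns(prior, rtt, elapsed, decay_ns) / 1e6);
}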

latency/src/util/atomic_f32_pair.rs (new file, 88 lines)

@ -0,0 +1,88 @@
use std::sync::atomic::{AtomicU64, Ordering};
/// Implements an atomic pair of f32s
///
/// This uses an AtomicU64 internally.
#[derive(Debug)]
pub struct AtomicF32Pair(AtomicU64);
impl AtomicF32Pair {
/// Creates a new atomic pair.
pub fn new(pair: [f32; 2]) -> Self {
Self(AtomicU64::new(to_bits(pair)))
}
/// Loads a value from the atomic pair.
pub fn load(&self, ordering: Ordering) -> [f32; 2] {
from_bits(self.0.load(ordering))
}
/// Fetches the value, and applies a function to it that returns an
/// optional new value. Returns a Result of Ok(previous_value) if
/// the function returned Some(_), else Err(previous_value).
pub fn fetch_update<F>(
&self,
set_order: Ordering,
fetch_order: Ordering,
mut f: F,
) -> Result<[f32; 2], [f32; 2]>
where
F: FnMut([f32; 2]) -> Option<[f32; 2]>,
{
self.0
.fetch_update(set_order, fetch_order, |bits| {
f(from_bits(bits)).map(to_bits)
})
.map(from_bits)
.map_err(from_bits)
}
}
/// Convert a f32 pair to its bit-representation as u64
fn to_bits(pair: [f32; 2]) -> u64 {
let f1 = pair[0].to_bits() as u64;
let f2 = pair[1].to_bits() as u64;
(f1 << 32) | f2
}
/// Build a f32 pair from its bit-representation as u64
fn from_bits(bits: u64) -> [f32; 2] {
let f1 = f32::from_bits((bits >> 32) as u32);
let f2 = f32::from_bits(bits as u32);
[f1, f2]
}
#[cfg(test)]
mod tests {
use std::sync::atomic::Ordering;
use super::{from_bits, to_bits, AtomicF32Pair};
#[test]
fn test_f32_pair_bit_conversions() {
let pair = [3.14159, 2.71828];
assert_eq!(pair, from_bits(to_bits(pair)));
}
#[test]
fn test_atomic_f32_pair_load() {
let pair = [3.14159, 2.71828];
let atomic = AtomicF32Pair::new(pair);
assert_eq!(pair, atomic.load(Ordering::Relaxed));
}
#[test]
fn test_atomic_f32_pair_fetch_update() {
let pair = [3.14159, 2.71828];
let atomic = AtomicF32Pair::new(pair);
atomic
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |[f1, f2]| {
Some([f1 + 1.0, f2 + 1.0])
})
.unwrap();
assert_eq!(
[pair[0] + 1.0, pair[1] + 1.0],
atomic.load(Ordering::Relaxed)
);
}
}
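The point of packing both f32s into one AtomicU64 is that update_at and rtt always change together; a single compare-and-swap keeps them consistent without a mutex. A small sketch of that behavior under contention, re-declaring the bit helpers since the type above is crate-private:

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

fn to_bits(pair: [f32; 2]) -> u64 {
    ((pair[0].to_bits() as u64) << 32) | pair[1].to_bits() as u64
}

fn from_bits(bits: u64) -> [f32; 2] {
    [f32::from_bits((bits >> 32) as u32), f32::from_bits(bits as u32)]
}

fn main() {
    let atomic = Arc::new(AtomicU64::new(to_bits([0.0, 0.0])));
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let atomic = Arc::clone(&atomic);
            thread::spawn(move || {
                for _ in 0..1000 {
                    // fetch_update retries the closure on CAS failure, so
                    // both halves always advance together.
                    atomic
                        .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |bits| {
                            let [a, b] = from_bits(bits);
                            Some(to_bits([a + 1.0, b + 1.0]))
                        })
                        .unwrap();
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    // 4 threads x 1000 increments; both halves stayed in lockstep.
    assert_eq!(from_bits(atomic.load(Ordering::Relaxed)), [4000.0, 4000.0]);
}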

latency/src/util/mod.rs (new file, 2 lines)

@ -0,0 +1,2 @@
pub(crate) mod atomic_f32_pair;
pub(crate) mod nanos;

latency/src/util/nanos.rs (new file, 30 lines)

@ -0,0 +1,30 @@
use tokio::time::Duration;
pub const NANOS_PER_MILLI: f64 = 1_000_000.0;
/// Utility that converts durations to nanos in f64.
///
/// Due to a lossy transformation, the maximum value that can be represented is ~585 years,
/// which, I hope, is more than enough to represent request latencies.
pub fn nanos(d: Duration) -> f64 {
const NANOS_PER_SEC: u64 = 1_000_000_000;
let n = f64::from(d.subsec_nanos());
let s = d.as_secs().saturating_mul(NANOS_PER_SEC) as f64;
n + s
}
#[cfg(test)]
mod tests {
use tokio::time::Duration;
#[test]
fn nanos() {
assert_eq!(super::nanos(Duration::new(0, 0)), 0.0);
assert_eq!(super::nanos(Duration::new(0, 123)), 123.0);
assert_eq!(super::nanos(Duration::new(1, 23)), 1_000_000_023.0);
assert_eq!(
super::nanos(Duration::new(::std::u64::MAX, 999_999_999)),
18446744074709553000.0
);
}
}
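For reference, the ~585-year figure works out as follows: as_secs().saturating_mul(NANOS_PER_SEC) caps at u64::MAX ~= 1.84 * 10^19 ns ~= 1.84 * 10^10 s, and dividing by ~3.156 * 10^7 s per 365.25-day year gives ~584.5 years; anything longer saturates rather than overflowing.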

web3_proxy/Cargo.toml

@ -15,6 +15,7 @@ rdkafka-src = ["rdkafka/cmake-build", "rdkafka/libz", "rdkafka/ssl", "rdkafka/zs
[dependencies]
deferred-rate-limiter = { path = "../deferred-rate-limiter" }
entities = { path = "../entities" }
latency = { path = "../latency" }
migration = { path = "../migration" }
redis-rate-limiter = { path = "../redis-rate-limiter" }
thread-fast-rng = { path = "../thread-fast-rng" }
@ -37,7 +38,6 @@ derive_more = "0.99.17"
dotenv = "0.15.0"
env_logger = "0.10.0"
ethers = { version = "2.0.3", default-features = false, features = ["rustls", "ws"] }
ewma = "0.1.1"
fdlimit = "0.2.1"
flume = "0.10.14"
fstrings = "0.2"

@ -1282,13 +1282,16 @@ fn rpc_sync_status_sort_key(x: &Arc<Web3Rpc>) -> (Reverse<U64>, u64, bool, Order
// TODO: use request latency instead of head latency
// TODO: have the latency decay automatically
let head_ewma = x.head_latency.read().value();
let peak_latency = x
.peak_latency
.as_ref()
.expect("peak_latency uniniialized")
.latency();
let active_requests = x.active_requests.load(atomic::Ordering::Relaxed) as f64;
// TODO: i'm not sure head * active is exactly right. but we'll see
// TODO: i don't think this actually counts as peak. investigate with atomics.rs and peak_ewma.rs
let peak_ewma = OrderedFloat(head_ewma * active_requests);
let peak_ewma = OrderedFloat(peak_latency.as_millis() as f64 * (active_requests + 1.0));
let backup = x.backup;
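A worked example of the new ordering term (hypothetical nodes; the + 1.0 keeps an idle node's score equal to its raw latency instead of collapsing to zero):

use ordered_float::OrderedFloat;

fn peak_ewma_score(peak_latency_ms: f64, active_requests: f64) -> OrderedFloat<f64> {
    OrderedFloat(peak_latency_ms * (active_requests + 1.0))
}

fn main() {
    // Node A: 100ms peak latency with 2 requests in flight -> 300
    // Node B: 120ms peak latency, idle                     -> 120
    // Lower is better, so B sorts ahead despite higher raw latency.
    assert!(peak_ewma_score(120.0, 0.0) < peak_ewma_score(100.0, 2.0));
}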
@ -1296,18 +1299,26 @@ fn rpc_sync_status_sort_key(x: &Arc<Web3Rpc>) -> (Reverse<U64>, u64, bool, Order
}
mod tests {
// TODO: why is this allow needed? does tokio::test get in the way somehow?
#![allow(unused_imports)]
use std::time::{SystemTime, UNIX_EPOCH};
use super::*;
use crate::rpcs::consensus::ConsensusFinder;
use crate::rpcs::{blockchain::Web3ProxyBlock, provider::Web3Provider};
use ethers::types::{Block, U256};
use latency::PeakEwmaLatency;
use log::{trace, LevelFilter};
use parking_lot::RwLock;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::RwLock as AsyncRwLock;
#[cfg(test)]
fn new_peak_latency() -> PeakEwmaLatency {
const NANOS_PER_MILLI: f64 = 1_000_000.0;
PeakEwmaLatency::spawn(1_000.0 * NANOS_PER_MILLI, 4, Duration::from_secs(1))
}
#[tokio::test]
async fn test_sort_connections_by_sync_status() {
let block_0 = Block {
@ -1338,36 +1349,42 @@ mod tests {
name: "a".to_string(),
tier: 0,
head_block: RwLock::new(None),
peak_latency: Some(new_peak_latency()),
..Default::default()
},
Web3Rpc {
name: "b".to_string(),
tier: 0,
head_block: RwLock::new(blocks.get(1).cloned()),
peak_latency: Some(new_peak_latency()),
..Default::default()
},
Web3Rpc {
name: "c".to_string(),
tier: 0,
head_block: RwLock::new(blocks.get(2).cloned()),
peak_latency: Some(new_peak_latency()),
..Default::default()
},
Web3Rpc {
name: "d".to_string(),
tier: 1,
head_block: RwLock::new(None),
peak_latency: Some(new_peak_latency()),
..Default::default()
},
Web3Rpc {
name: "e".to_string(),
tier: 1,
head_block: RwLock::new(blocks.get(1).cloned()),
peak_latency: Some(new_peak_latency()),
..Default::default()
},
Web3Rpc {
name: "f".to_string(),
tier: 1,
head_block: RwLock::new(blocks.get(2).cloned()),
peak_latency: Some(new_peak_latency()),
..Default::default()
},
]
@ -1425,6 +1442,7 @@ mod tests {
tier: 0,
head_block: RwLock::new(Some(head_block.clone())),
provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))),
peak_latency: Some(new_peak_latency()),
..Default::default()
};
@ -1437,6 +1455,7 @@ mod tests {
tier: 0,
head_block: RwLock::new(Some(lagged_block.clone())),
provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))),
peak_latency: Some(new_peak_latency()),
..Default::default()
};
@ -1781,6 +1800,7 @@ mod tests {
tier: 0,
head_block: RwLock::new(Some(block_1.clone())),
provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))),
peak_latency: Some(new_peak_latency()),
..Default::default()
};
@ -1793,6 +1813,7 @@ mod tests {
tier: 1,
head_block: RwLock::new(Some(block_2.clone())),
provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))),
peak_latency: Some(new_peak_latency()),
..Default::default()
};

@ -12,6 +12,7 @@ use ethers::prelude::{Bytes, Middleware, ProviderError, TxHash, H256, U64};
use ethers::types::{Address, Transaction, U256};
use futures::future::try_join_all;
use futures::StreamExt;
use latency::{EwmaLatency, PeakEwmaLatency};
use log::{debug, error, info, trace, warn, Level};
use migration::sea_orm::DatabaseConnection;
use ordered_float::OrderedFloat;
@ -30,69 +31,6 @@ use thread_fast_rng::thread_fast_rng;
use tokio::sync::{broadcast, oneshot, watch, RwLock as AsyncRwLock};
use tokio::time::{sleep, sleep_until, timeout, Duration, Instant};
pub struct Latency {
/// exponentially weighted moving average of how many milliseconds behind the fastest node we are
ewma: ewma::EWMA,
}
impl Serialize for Latency {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_f64(self.ewma.value())
}
}
impl Latency {
#[inline(always)]
pub fn record(&mut self, duration: Duration) {
self.record_ms(duration.as_secs_f64() * 1000.0);
}
#[inline(always)]
pub fn record_ms(&mut self, milliseconds: f64) {
self.ewma.add(milliseconds);
}
#[inline(always)]
pub fn value(&self) -> f64 {
self.ewma.value()
}
}
impl Default for Latency {
fn default() -> Self {
// TODO: what should the default span be? 25 requests? have a "new"
let span = 25.0;
let start = 1000.0;
Self::new(span, start)
}
}
impl Latency {
// depending on the span, start might not be perfect
pub fn new(span: f64, start: f64) -> Self {
let alpha = Self::span_to_alpha(span);
let mut ewma = ewma::EWMA::new(alpha);
if start > 0.0 {
for _ in 0..(span as u64) {
ewma.add(start);
}
}
Self { ewma }
}
fn span_to_alpha(span: f64) -> f64 {
2.0 / (span + 1.0)
}
}
/// An active connection to a Web3 RPC server like geth or erigon.
#[derive(Default)]
pub struct Web3Rpc {
@ -127,10 +65,11 @@ pub struct Web3Rpc {
/// TODO: change this to a watch channel so that http providers can subscribe and take action on change.
pub(super) head_block: RwLock<Option<Web3ProxyBlock>>,
/// Track head block latency
pub(super) head_latency: RwLock<Latency>,
// /// Track request latency
// /// TODO: refactor this. this lock kills perf. for now just use head_latency
// pub(super) request_latency: RwLock<Latency>,
pub(super) head_latency: RwLock<EwmaLatency>,
/// Track peak request latency
///
/// This is only inside an Option so that the "Default" derive works. It will always be set.
pub(super) peak_latency: Option<PeakEwmaLatency>,
/// Track total requests served
/// TODO: maybe move this to Grafana
pub(super) total_requests: AtomicUsize,
@ -227,6 +166,18 @@ impl Web3Rpc {
let (disconnect_sender, disconnect_receiver) = watch::channel(false);
let reconnect = reconnect.into();
// Spawn the task for calculating average peak latency
// TODO: Should these defaults be in the config?
let peak_latency = PeakEwmaLatency::spawn(
// Decay over 15s (decay_ns is in nanoseconds)
Duration::from_secs(15).as_nanos() as f64,
// Peak requests so far around 5k, we will use an order of magnitude
// more to be safe. Should only use about 50mb RAM
50_000,
// Start latency at 1 second
Duration::from_secs(1),
);
let new_connection = Self {
name,
db_conn: db_conn.clone(),
@ -245,6 +196,7 @@ impl Web3Rpc {
disconnect_watch: Some(disconnect_sender),
created_at: Some(created_at),
head_block: RwLock::new(Default::default()),
peak_latency: Some(peak_latency),
..Default::default()
};
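Since spawn() takes the decay constant in nanoseconds, a quick sanity check on the value used above (a sketch, not part of the diff):

use tokio::time::Duration;

fn main() {
    // 15 s = 15 * 10^9 ns, i.e. 1.5e10 passed as decay_ns.
    assert_eq!(Duration::from_secs(15).as_nanos(), 15_000_000_000);
    assert_eq!(Duration::from_secs(15).as_nanos() as f64, 1.5e10);
}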

@ -194,7 +194,7 @@ impl OpenRequestHandle {
.active_requests
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
// let latency = Instant::now();
let start = Instant::now();
// TODO: replace ethers-rs providers with our own that supports streaming the responses
let response = match provider.as_ref() {
@ -366,14 +366,10 @@ impl OpenRequestHandle {
tokio::spawn(f);
}
}
} else if let Some(peak_latency) = &self.rpc.peak_latency {
peak_latency.report(start.elapsed());
} else {
// TODO: record request latency
// let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
// TODO: is this lock here a problem? should this be done through a channel? i started to code it, but it didn't seem to matter
// let mut latency_recording = self.rpc.request_latency.write();
// latency_recording.record(latency_ms);
unreachable!("peak_latency not initialized");
}
response