head latency instead of peak latency for now
This commit is contained in:
parent
685c1d039a
commit
c66eb6d864
|
@ -17,18 +17,19 @@ impl Serialize for EwmaLatency {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl EwmaLatency {
|
impl EwmaLatency {
|
||||||
#[inline(always)]
|
#[inline]
|
||||||
pub fn record(&mut self, duration: Duration) {
|
pub fn record(&mut self, duration: Duration) {
|
||||||
self.record_ms(duration.as_secs_f64() * 1000.0);
|
self.record_ms(duration.as_secs_f64() * 1000.0);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline(always)]
|
#[inline]
|
||||||
pub fn record_ms(&mut self, milliseconds: f64) {
|
pub fn record_ms(&mut self, milliseconds: f64) {
|
||||||
self.ewma.add(milliseconds);
|
// don't let it go under 0.1ms
|
||||||
|
self.ewma.add(milliseconds.max(0.1));
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Current EWMA value in milliseconds
|
/// Current EWMA value in milliseconds
|
||||||
#[inline(always)]
|
#[inline]
|
||||||
pub fn value(&self) -> f64 {
|
pub fn value(&self) -> f64 {
|
||||||
self.ewma.value()
|
self.ewma.value()
|
||||||
}
|
}
|
||||||
|
@ -36,10 +37,11 @@ impl EwmaLatency {
|
||||||
|
|
||||||
impl Default for EwmaLatency {
|
impl Default for EwmaLatency {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
// TODO: what should the default span be? 25 requests?
|
// TODO: what should the default span be? 10 requests?
|
||||||
let span = 25.0;
|
let span = 10.0;
|
||||||
|
|
||||||
let start = 1000.0;
|
// TODO: what should the default start be?
|
||||||
|
let start = 1.0;
|
||||||
|
|
||||||
Self::new(span, start)
|
Self::new(span, start)
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,7 +7,7 @@ use derive_more::Constructor;
|
||||||
use ethers::prelude::{H256, U64};
|
use ethers::prelude::{H256, U64};
|
||||||
use hashbrown::{HashMap, HashSet};
|
use hashbrown::{HashMap, HashSet};
|
||||||
use itertools::{Itertools, MinMaxResult};
|
use itertools::{Itertools, MinMaxResult};
|
||||||
use log::{trace, warn};
|
use log::{debug, info, trace, warn};
|
||||||
use moka::future::Cache;
|
use moka::future::Cache;
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use std::cmp::Reverse;
|
use std::cmp::Reverse;
|
||||||
|
@ -266,15 +266,16 @@ impl ConsensusFinder {
|
||||||
async fn insert(&mut self, rpc: Arc<Web3Rpc>, block: Web3ProxyBlock) -> Option<Web3ProxyBlock> {
|
async fn insert(&mut self, rpc: Arc<Web3Rpc>, block: Web3ProxyBlock) -> Option<Web3ProxyBlock> {
|
||||||
let first_seen = self
|
let first_seen = self
|
||||||
.first_seen
|
.first_seen
|
||||||
.get_with_by_ref(block.hash(), async move { Instant::now() })
|
.get_with_by_ref(block.hash(), async { Instant::now() })
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
// TODO: this should be 0 if we are first seen, but i think it will be slightly non-zero.
|
// calculate elapsed time before trying to lock
|
||||||
// calculate elapsed time before trying to lock.
|
|
||||||
let latency = first_seen.elapsed();
|
let latency = first_seen.elapsed();
|
||||||
|
|
||||||
|
// record the time behind the fastest node
|
||||||
rpc.head_latency.write().record(latency);
|
rpc.head_latency.write().record(latency);
|
||||||
|
|
||||||
|
// update the local mapping of rpc -> block
|
||||||
self.rpc_heads.insert(rpc, block)
|
self.rpc_heads.insert(rpc, block)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -70,7 +70,6 @@ pub struct Web3Rpc {
|
||||||
/// Track head block latency
|
/// Track head block latency
|
||||||
pub(super) head_latency: RwLock<EwmaLatency>,
|
pub(super) head_latency: RwLock<EwmaLatency>,
|
||||||
/// Track peak request latency
|
/// Track peak request latency
|
||||||
///
|
|
||||||
/// This is only inside an Option so that the "Default" derive works. it will always be set.
|
/// This is only inside an Option so that the "Default" derive works. it will always be set.
|
||||||
pub(super) peak_latency: Option<PeakEwmaLatency>,
|
pub(super) peak_latency: Option<PeakEwmaLatency>,
|
||||||
/// Track total requests served
|
/// Track total requests served
|
||||||
|
@ -236,16 +235,18 @@ impl Web3Rpc {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn peak_ewma(&self) -> OrderedFloat<f64> {
|
pub fn peak_ewma(&self) -> OrderedFloat<f64> {
|
||||||
let peak_latency = if let Some(peak_latency) = self.peak_latency.as_ref() {
|
// TODO: bug inside peak ewma somewhere. possible with atomics being relaxed or the conversion to pair and back
|
||||||
peak_latency.latency().as_secs_f64()
|
// let peak_latency = if let Some(peak_latency) = self.peak_latency.as_ref() {
|
||||||
} else {
|
// peak_latency.latency().as_secs_f64()
|
||||||
0.0
|
// } else {
|
||||||
};
|
// 0.0
|
||||||
|
// };
|
||||||
|
let head_latency = self.head_latency.read().value();
|
||||||
|
|
||||||
// TODO: what ordering?
|
// TODO: what ordering?
|
||||||
let active_requests = self.active_requests.load(atomic::Ordering::Acquire) as f64 + 1.0;
|
let active_requests = self.active_requests.load(atomic::Ordering::Acquire) as f64 + 1.0;
|
||||||
|
|
||||||
OrderedFloat(peak_latency * active_requests)
|
OrderedFloat(head_latency * active_requests)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: would be great if rpcs exposed this. see https://github.com/ledgerwatch/erigon/issues/6391
|
// TODO: would be great if rpcs exposed this. see https://github.com/ledgerwatch/erigon/issues/6391
|
||||||
|
|
Loading…
Reference in New Issue