move floats and durations around for latency tracking

This commit is contained in:
Bryan Stitt 2023-06-18 10:46:22 -07:00
parent 2f0e6103ac
commit 825ba006f1
9 changed files with 53 additions and 53 deletions

@ -430,7 +430,7 @@ These are not yet ordered. There might be duplicates. We might not actually need
- [ ] cli for adding rpc keys to an existing user - [ ] cli for adding rpc keys to an existing user
- [ ] rename "private" to "mev protected" to avoid confusion about private transactions being public once they are mined - [ ] rename "private" to "mev protected" to avoid confusion about private transactions being public once they are mined
- [ ] allow restricting an rpc key to specific chains - [ ] allow restricting an rpc key to specific chains
- [-] writes to request_latency should be handled by a background task so they don't slow down the request - [-] writes to median_request_latency should be handled by a background task so they don't slow down the request
- [ ] keep re-broadcasting transactions until they are confirmed - [ ] keep re-broadcasting transactions until they are confirmed
- [ ] if mev protection is disabled, we should send to *both* balanced_rpcs *and* private_rps - [ ] if mev protection is disabled, we should send to *both* balanced_rpcs *and* private_rps
- [x] if mev protection is enabled, we should sent to *only* private_rpcs - [x] if mev protection is enabled, we should sent to *only* private_rpcs

@ -40,7 +40,7 @@ impl EwmaLatency {
/// Current EWMA value in seconds /// Current EWMA value in seconds
#[inline] #[inline]
pub fn duration(&self) -> Duration { pub fn latency(&self) -> Duration {
let x = self.seconds.get(); let x = self.seconds.get();
Duration::from_secs_f32(x) Duration::from_secs_f32(x)

@ -69,7 +69,7 @@ impl RollingQuantileLatency {
/// Current median. /// Current median.
#[inline] #[inline]
pub fn duration(&self) -> Duration { pub fn latency(&self) -> Duration {
Duration::from_secs_f32(self.seconds()) Duration::from_secs_f32(self.seconds())
} }
} }

@ -495,9 +495,11 @@ impl Web3ProxyApp {
// create semaphores for concurrent connection limits // create semaphores for concurrent connection limits
// TODO: how can we implement time til idle? // TODO: how can we implement time til idle?
// TODO: what should tti be for semaphores? // TODO: what should tti be for semaphores?
let bearer_token_semaphores = Cache::new(max_users); let bearer_token_semaphores = CacheBuilder::new(max_users)
let ip_semaphores = Cache::new(max_users); .name("bearer_token_semaphores")
let user_semaphores = Cache::new(max_users); .build();
let ip_semaphores = CacheBuilder::new(max_users).name("ip_semaphores").build();
let user_semaphores = CacheBuilder::new(max_users).name("user_semaphores").build();
let chain_id = top_config.app.chain_id; let chain_id = top_config.app.chain_id;

@ -24,7 +24,7 @@ struct BackendRpcData<'a> {
active_requests: u64, active_requests: u64,
internal_requests: u64, internal_requests: u64,
external_requests: u64, external_requests: u64,
head_latency_ms: f64, head_delay_ms: f64,
peak_latency_ms: f64, peak_latency_ms: f64,
weighted_latency_ms: f64, weighted_latency_ms: f64,
} }
@ -95,7 +95,7 @@ impl PopularityContestSubCommand {
highest_block = highest_block.max(head_block); highest_block = highest_block.max(head_block);
let head_latency_ms = conn.get("head_latency_ms").unwrap().as_f64().unwrap(); let head_delay_ms = conn.get("head_delay_ms").unwrap().as_f64().unwrap();
let peak_latency_ms = conn let peak_latency_ms = conn
.get("peak_latency_ms") .get("peak_latency_ms")
@ -116,7 +116,7 @@ impl PopularityContestSubCommand {
internal_requests, internal_requests,
external_requests, external_requests,
head_block, head_block,
head_latency_ms, head_delay_ms,
peak_latency_ms, peak_latency_ms,
weighted_latency_ms, weighted_latency_ms,
}; };
@ -178,7 +178,7 @@ impl PopularityContestSubCommand {
rpc.active_requests, rpc.active_requests,
lag, lag,
block_data_limit, block_data_limit,
format!("{:.3}", rpc.head_latency_ms), format!("{:.3}", rpc.head_delay_ms),
rpc.peak_latency_ms, rpc.peak_latency_ms,
format!("{:.3}", rpc.weighted_latency_ms), format!("{:.3}", rpc.weighted_latency_ms),
tier, tier,

@ -342,7 +342,7 @@ pub struct ConsensusFinder {
max_head_block_age: Option<Duration>, max_head_block_age: Option<Duration>,
/// tier 0 will be prefered as long as the distance between it and the other tiers is <= max_tier_lag /// tier 0 will be prefered as long as the distance between it and the other tiers is <= max_tier_lag
max_head_block_lag: Option<U64>, max_head_block_lag: Option<U64>,
/// Block Hash -> First Seen Instant. used to track rpc.head_latency. The same cache should be shared between all ConnectionsGroups /// Block Hash -> First Seen Instant. used to track rpc.head_delay. The same cache should be shared between all ConnectionsGroups
first_seen: FirstSeenCache, first_seen: FirstSeenCache,
} }
@ -383,9 +383,7 @@ impl ConsensusFinder {
let latency = first_seen.elapsed(); let latency = first_seen.elapsed();
// record the time behind the fastest node // record the time behind the fastest node
rpc.head_latency_ms rpc.head_delay.write().record_secs(latency.as_secs_f32());
.write()
.record_secs(latency.as_secs_f32());
// update the local mapping of rpc -> block // update the local mapping of rpc -> block
self.rpc_heads.insert(rpc, block) self.rpc_heads.insert(rpc, block)
@ -450,7 +448,7 @@ impl ConsensusFinder {
let mut median_latencies_sec = HashMap::new(); let mut median_latencies_sec = HashMap::new();
for rpc in self.rpc_heads.keys() { for rpc in self.rpc_heads.keys() {
let median_latency_sec = rpc let median_latency_sec = rpc
.request_latency .median_latency
.as_ref() .as_ref()
.map(|x| x.seconds()) .map(|x| x.seconds())
.unwrap_or_default(); .unwrap_or_default();

@ -460,7 +460,7 @@ impl Web3Rpcs {
trace!("{} vs {}", rpc_a, rpc_b); trace!("{} vs {}", rpc_a, rpc_b);
// TODO: cached key to save a read lock // TODO: cached key to save a read lock
// TODO: ties to the server with the smallest block_data_limit // TODO: ties to the server with the smallest block_data_limit
let faster_rpc = min_by_key(rpc_a, rpc_b, |x| x.weighted_peak_ewma_seconds()); let faster_rpc = min_by_key(rpc_a, rpc_b, |x| x.weighted_peak_latency());
trace!("winner: {}", faster_rpc); trace!("winner: {}", faster_rpc);
// add to the skip list in case this one fails // add to the skip list in case this one fails

@ -18,7 +18,6 @@ use latency::{EwmaLatency, PeakEwmaLatency, RollingQuantileLatency};
use log::{debug, info, trace, warn, Level}; use log::{debug, info, trace, warn, Level};
use migration::sea_orm::DatabaseConnection; use migration::sea_orm::DatabaseConnection;
use nanorand::Rng; use nanorand::Rng;
use ordered_float::OrderedFloat;
use parking_lot::RwLock; use parking_lot::RwLock;
use redis_rate_limiter::{RedisPool, RedisRateLimitResult, RedisRateLimiter}; use redis_rate_limiter::{RedisPool, RedisRateLimitResult, RedisRateLimiter};
use serde::ser::{SerializeStruct, Serializer}; use serde::ser::{SerializeStruct, Serializer};
@ -64,7 +63,7 @@ pub struct Web3Rpc {
pub(super) head_block: Option<watch::Sender<Option<Web3ProxyBlock>>>, pub(super) head_block: Option<watch::Sender<Option<Web3ProxyBlock>>>,
/// Track head block latency. /// Track head block latency.
/// RwLock is fine because this isn't updated often and is for monitoring. It is not used on the hot path. /// RwLock is fine because this isn't updated often and is for monitoring. It is not used on the hot path.
pub(super) head_latency_ms: RwLock<EwmaLatency>, pub(super) head_delay: RwLock<EwmaLatency>,
/// Track peak request latency /// Track peak request latency
/// peak_latency is only inside an Option so that the "Default" derive works. it will always be set. /// peak_latency is only inside an Option so that the "Default" derive works. it will always be set.
pub(super) peak_latency: Option<PeakEwmaLatency>, pub(super) peak_latency: Option<PeakEwmaLatency>,
@ -76,7 +75,7 @@ pub struct Web3Rpc {
pub(super) external_requests: AtomicUsize, pub(super) external_requests: AtomicUsize,
/// Track time used by external requests served /// Track time used by external requests served
/// request_ms_histogram is only inside an Option so that the "Default" derive works. it will always be set. /// request_ms_histogram is only inside an Option so that the "Default" derive works. it will always be set.
pub(super) request_latency: Option<RollingQuantileLatency>, pub(super) median_latency: Option<RollingQuantileLatency>,
/// Track in-flight requests /// Track in-flight requests
pub(super) active_requests: AtomicUsize, pub(super) active_requests: AtomicUsize,
/// disconnect_watch is only inside an Option so that the "Default" derive works. it will always be set. /// disconnect_watch is only inside an Option so that the "Default" derive works. it will always be set.
@ -172,7 +171,7 @@ impl Web3Rpc {
Duration::from_secs(1), Duration::from_secs(1),
); );
let request_latency = RollingQuantileLatency::spawn_median(1_000).await; let median_request_latency = RollingQuantileLatency::spawn_median(1_000).await;
let http_provider = if let Some(http_url) = config.http_url { let http_provider = if let Some(http_url) = config.http_url {
let http_url = http_url.parse::<Url>()?; let http_url = http_url.parse::<Url>()?;
@ -208,7 +207,7 @@ impl Web3Rpc {
http_provider, http_provider,
name, name,
peak_latency: Some(peak_latency), peak_latency: Some(peak_latency),
request_latency: Some(request_latency), median_latency: Some(median_request_latency),
soft_limit: config.soft_limit, soft_limit: config.soft_limit,
ws_url, ws_url,
disconnect_watch: Some(disconnect_watch), disconnect_watch: Some(disconnect_watch),
@ -266,19 +265,19 @@ impl Web3Rpc {
pub fn sort_for_load_balancing_on( pub fn sort_for_load_balancing_on(
&self, &self,
max_block: Option<U64>, max_block: Option<U64>,
) -> ((bool, u32, Reverse<U64>), OrderedFloat<f64>) { ) -> ((bool, u32, Reverse<U64>), Duration) {
let sort_on = self.sort_on(max_block); let sort_on = self.sort_on(max_block);
let weighted_peak_ewma_seconds = self.weighted_peak_ewma_seconds(); let weighted_peak_latency = self.weighted_peak_latency();
let x = (sort_on, weighted_peak_ewma_seconds); let x = (sort_on, weighted_peak_latency);
trace!("sort_for_load_balancing {}: {:?}", self, x); trace!("sort_for_load_balancing {}: {:?}", self, x);
x x
} }
/// like sort_for_load_balancing, but shuffles tiers randomly instead of sorting by weighted_peak_ewma_seconds /// like sort_for_load_balancing, but shuffles tiers randomly instead of sorting by weighted_peak_latency
pub fn shuffle_for_load_balancing_on( pub fn shuffle_for_load_balancing_on(
&self, &self,
max_block: Option<U64>, max_block: Option<U64>,
@ -292,17 +291,17 @@ impl Web3Rpc {
(sort_on, r) (sort_on, r)
} }
pub fn weighted_peak_ewma_seconds(&self) -> OrderedFloat<f64> { pub fn weighted_peak_latency(&self) -> Duration {
let peak_latency = if let Some(peak_latency) = self.peak_latency.as_ref() { let peak_latency = if let Some(peak_latency) = self.peak_latency.as_ref() {
peak_latency.latency().as_secs_f64() peak_latency.latency()
} else { } else {
1.0 Duration::from_secs(1)
}; };
// TODO: what ordering? // TODO: what ordering?
let active_requests = self.active_requests.load(atomic::Ordering::Acquire) as f64 + 1.0; let active_requests = self.active_requests.load(atomic::Ordering::Acquire) as f32 + 1.0;
OrderedFloat(peak_latency * active_requests) peak_latency.mul_f32(active_requests)
} }
// TODO: would be great if rpcs exposed this. see https://github.com/ledgerwatch/erigon/issues/6391 // TODO: would be great if rpcs exposed this. see https://github.com/ledgerwatch/erigon/issues/6391
@ -697,7 +696,7 @@ impl Web3Rpc {
let rpc = self.clone(); let rpc = self.clone();
// TODO: how often? different depending on the chain? // TODO: how often? different depending on the chain?
// TODO: reset this timeout when a new block is seen? we need to keep request_latency updated though // TODO: reset this timeout when a new block is seen? we need to keep median_request_latency updated though
let health_sleep_seconds = 5; let health_sleep_seconds = 5;
// health check loop // health check loop
@ -1173,29 +1172,30 @@ impl Serialize for Web3Rpc {
&self.active_requests.load(atomic::Ordering::Relaxed), &self.active_requests.load(atomic::Ordering::Relaxed),
)?; )?;
state.serialize_field( {
"head_latency_ms", let head_delay_ms = self.head_delay.read().latency().as_secs_f32() * 1000.0;
&self.head_latency_ms.read().duration().as_millis(), state.serialize_field("head_delay_ms", &(head_delay_ms))?;
)?; }
state.serialize_field(
"request_latency_ms",
&self
.request_latency
.as_ref()
.unwrap()
.duration()
.as_millis(),
)?;
state.serialize_field(
"peak_latency_ms",
&self.peak_latency.as_ref().unwrap().latency().as_millis(),
)?;
{ {
let weighted_latency_ms = self.weighted_peak_ewma_seconds() * 1000.0; let median_latency_ms = self
state.serialize_field("weighted_latency_ms", weighted_latency_ms.as_ref())?; .median_latency
.as_ref()
.unwrap()
.latency()
.as_secs_f32()
* 1000.0;
state.serialize_field("median_latency_ms", &(median_latency_ms))?;
}
{
let peak_latency_ms =
self.peak_latency.as_ref().unwrap().latency().as_secs_f32() * 1000.0;
state.serialize_field("peak_latency_ms", &peak_latency_ms)?;
}
{
let weighted_latency_ms = self.weighted_peak_latency().as_secs_f32() * 1000.0;
state.serialize_field("weighted_latency_ms", &weighted_latency_ms)?;
} }
state.end() state.end()

@ -383,7 +383,7 @@ impl OpenRequestHandle {
self.rpc.peak_latency.as_ref().unwrap().report(latency); self.rpc.peak_latency.as_ref().unwrap().report(latency);
self.rpc self.rpc
.request_latency .median_latency
.as_ref() .as_ref()
.unwrap() .unwrap()
.record(latency) .record(latency)