From 825ba006f188ad965ec79e1a86f0be99dab7c6e7 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Sun, 18 Jun 2023 10:46:22 -0700 Subject: [PATCH] move floats and durations around for latency tracking --- TODO.md | 2 +- latency/src/ewma.rs | 2 +- latency/src/rolling_quantile.rs | 2 +- web3_proxy/src/app/mod.rs | 8 ++- .../bin/web3_proxy_cli/popularity_contest.rs | 8 +-- web3_proxy/src/rpcs/consensus.rs | 8 +-- web3_proxy/src/rpcs/many.rs | 2 +- web3_proxy/src/rpcs/one.rs | 72 +++++++++---------- web3_proxy/src/rpcs/request.rs | 2 +- 9 files changed, 53 insertions(+), 53 deletions(-) diff --git a/TODO.md b/TODO.md index 631b9c2a..fc124fb4 100644 --- a/TODO.md +++ b/TODO.md @@ -430,7 +430,7 @@ These are not yet ordered. There might be duplicates. We might not actually need - [ ] cli for adding rpc keys to an existing user - [ ] rename "private" to "mev protected" to avoid confusion about private transactions being public once they are mined - [ ] allow restricting an rpc key to specific chains -- [-] writes to request_latency should be handled by a background task so they don't slow down the request +- [-] writes to median_request_latency should be handled by a background task so they don't slow down the request - [ ] keep re-broadcasting transactions until they are confirmed - [ ] if mev protection is disabled, we should send to *both* balanced_rpcs *and* private_rps - [x] if mev protection is enabled, we should sent to *only* private_rpcs diff --git a/latency/src/ewma.rs b/latency/src/ewma.rs index 549564b1..82c7d764 100644 --- a/latency/src/ewma.rs +++ b/latency/src/ewma.rs @@ -40,7 +40,7 @@ impl EwmaLatency { /// Current EWMA value in seconds #[inline] - pub fn duration(&self) -> Duration { + pub fn latency(&self) -> Duration { let x = self.seconds.get(); Duration::from_secs_f32(x) diff --git a/latency/src/rolling_quantile.rs b/latency/src/rolling_quantile.rs index 17b4d962..f2514454 100644 --- a/latency/src/rolling_quantile.rs +++ b/latency/src/rolling_quantile.rs @@ -69,7 +69,7 @@ impl RollingQuantileLatency { /// Current median. #[inline] - pub fn duration(&self) -> Duration { + pub fn latency(&self) -> Duration { Duration::from_secs_f32(self.seconds()) } } diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 6c0cd669..79e6cd0f 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -495,9 +495,11 @@ impl Web3ProxyApp { // create semaphores for concurrent connection limits // TODO: how can we implement time til idle? // TODO: what should tti be for semaphores? 
-        let bearer_token_semaphores = Cache::new(max_users);
-        let ip_semaphores = Cache::new(max_users);
-        let user_semaphores = Cache::new(max_users);
+        let bearer_token_semaphores = CacheBuilder::new(max_users)
+            .name("bearer_token_semaphores")
+            .build();
+        let ip_semaphores = CacheBuilder::new(max_users).name("ip_semaphores").build();
+        let user_semaphores = CacheBuilder::new(max_users).name("user_semaphores").build();
 
         let chain_id = top_config.app.chain_id;
 
diff --git a/web3_proxy/src/bin/web3_proxy_cli/popularity_contest.rs b/web3_proxy/src/bin/web3_proxy_cli/popularity_contest.rs
index fecc2523..b1dc161e 100644
--- a/web3_proxy/src/bin/web3_proxy_cli/popularity_contest.rs
+++ b/web3_proxy/src/bin/web3_proxy_cli/popularity_contest.rs
@@ -24,7 +24,7 @@ struct BackendRpcData<'a> {
     active_requests: u64,
     internal_requests: u64,
     external_requests: u64,
-    head_latency_ms: f64,
+    head_delay_ms: f64,
     peak_latency_ms: f64,
     weighted_latency_ms: f64,
 }
@@ -95,7 +95,7 @@ impl PopularityContestSubCommand {
 
         highest_block = highest_block.max(head_block);
 
-        let head_latency_ms = conn.get("head_latency_ms").unwrap().as_f64().unwrap();
+        let head_delay_ms = conn.get("head_delay_ms").unwrap().as_f64().unwrap();
 
         let peak_latency_ms = conn
             .get("peak_latency_ms")
@@ -116,7 +116,7 @@ impl PopularityContestSubCommand {
             internal_requests,
             external_requests,
             head_block,
-            head_latency_ms,
+            head_delay_ms,
             peak_latency_ms,
             weighted_latency_ms,
         };
@@ -178,7 +178,7 @@ impl PopularityContestSubCommand {
                 rpc.active_requests,
                 lag,
                 block_data_limit,
-                format!("{:.3}", rpc.head_latency_ms),
+                format!("{:.3}", rpc.head_delay_ms),
                 rpc.peak_latency_ms,
                 format!("{:.3}", rpc.weighted_latency_ms),
                 tier,
diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs
index e20fa38c..8b10bc19 100644
--- a/web3_proxy/src/rpcs/consensus.rs
+++ b/web3_proxy/src/rpcs/consensus.rs
@@ -342,7 +342,7 @@ pub struct ConsensusFinder {
     max_head_block_age: Option<Duration>,
     /// tier 0 will be prefered as long as the distance between it and the other tiers is <= max_tier_lag
     max_head_block_lag: Option<U64>,
-    /// Block Hash -> First Seen Instant. used to track rpc.head_latency. The same cache should be shared between all ConnectionsGroups
+    /// Block Hash -> First Seen Instant. used to track rpc.head_delay. The same cache should be shared between all ConnectionsGroups
     first_seen: FirstSeenCache,
 }
 
@@ -383,9 +383,7 @@ impl ConsensusFinder {
                     let latency = first_seen.elapsed();
 
                     // record the time behind the fastest node
-                    rpc.head_latency_ms
-                        .write()
-                        .record_secs(latency.as_secs_f32());
+                    rpc.head_delay.write().record_secs(latency.as_secs_f32());
 
                     // update the local mapping of rpc -> block
                     self.rpc_heads.insert(rpc, block)
@@ -450,7 +448,7 @@ impl ConsensusFinder {
         let mut median_latencies_sec = HashMap::new();
         for rpc in self.rpc_heads.keys() {
             let median_latency_sec = rpc
-                .request_latency
+                .median_latency
                 .as_ref()
                 .map(|x| x.seconds())
                 .unwrap_or_default();
diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs
index 21db4d8e..e2fe0da6 100644
--- a/web3_proxy/src/rpcs/many.rs
+++ b/web3_proxy/src/rpcs/many.rs
@@ -460,7 +460,7 @@ impl Web3Rpcs {
                     trace!("{} vs {}", rpc_a, rpc_b);
                     // TODO: cached key to save a read lock
                     // TODO: ties to the server with the smallest block_data_limit
-                    let faster_rpc = min_by_key(rpc_a, rpc_b, |x| x.weighted_peak_ewma_seconds());
+                    let faster_rpc = min_by_key(rpc_a, rpc_b, |x| x.weighted_peak_latency());
                     trace!("winner: {}", faster_rpc);
 
                     // add to the skip list in case this one fails
diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs
index f0b03127..38c12c32 100644
--- a/web3_proxy/src/rpcs/one.rs
+++ b/web3_proxy/src/rpcs/one.rs
@@ -18,7 +18,6 @@ use latency::{EwmaLatency, PeakEwmaLatency, RollingQuantileLatency};
 use log::{debug, info, trace, warn, Level};
 use migration::sea_orm::DatabaseConnection;
 use nanorand::Rng;
-use ordered_float::OrderedFloat;
 use parking_lot::RwLock;
 use redis_rate_limiter::{RedisPool, RedisRateLimitResult, RedisRateLimiter};
 use serde::ser::{SerializeStruct, Serializer};
@@ -64,7 +63,7 @@ pub struct Web3Rpc {
     pub(super) head_block: Option<watch::Sender<Option<Web3ProxyBlock>>>,
     /// Track head block latency.
     /// RwLock is fine because this isn't updated often and is for monitoring. It is not used on the hot path.
-    pub(super) head_latency_ms: RwLock<EwmaLatency>,
+    pub(super) head_delay: RwLock<EwmaLatency>,
     /// Track peak request latency
     /// peak_latency is only inside an Option so that the "Default" derive works. it will always be set.
     pub(super) peak_latency: Option<PeakEwmaLatency>,
@@ -76,7 +75,7 @@ pub struct Web3Rpc {
     pub(super) external_requests: AtomicUsize,
     /// Track time used by external requests served
     /// request_ms_histogram is only inside an Option so that the "Default" derive works. it will always be set.
-    pub(super) request_latency: Option<RollingQuantileLatency>,
+    pub(super) median_latency: Option<RollingQuantileLatency>,
     /// Track in-flight requests
     pub(super) active_requests: AtomicUsize,
     /// disconnect_watch is only inside an Option so that the "Default" derive works. it will always be set.
@@ -172,7 +171,7 @@ impl Web3Rpc {
             Duration::from_secs(1),
         );
 
-        let request_latency = RollingQuantileLatency::spawn_median(1_000).await;
+        let median_request_latency = RollingQuantileLatency::spawn_median(1_000).await;
 
         let http_provider = if let Some(http_url) = config.http_url {
             let http_url = http_url.parse::<Url>()?;
@@ -208,7 +207,7 @@ impl Web3Rpc {
             http_provider,
             name,
             peak_latency: Some(peak_latency),
-            request_latency: Some(request_latency),
+            median_latency: Some(median_request_latency),
             soft_limit: config.soft_limit,
             ws_url,
             disconnect_watch: Some(disconnect_watch),
@@ -266,19 +265,19 @@ impl Web3Rpc {
     pub fn sort_for_load_balancing_on(
         &self,
         max_block: Option<U64>,
-    ) -> ((bool, u32, Reverse<U64>), OrderedFloat<f64>) {
+    ) -> ((bool, u32, Reverse<U64>), Duration) {
         let sort_on = self.sort_on(max_block);
 
-        let weighted_peak_ewma_seconds = self.weighted_peak_ewma_seconds();
+        let weighted_peak_latency = self.weighted_peak_latency();
 
-        let x = (sort_on, weighted_peak_ewma_seconds);
+        let x = (sort_on, weighted_peak_latency);
 
         trace!("sort_for_load_balancing {}: {:?}", self, x);
 
         x
     }
 
-    /// like sort_for_load_balancing, but shuffles tiers randomly instead of sorting by weighted_peak_ewma_seconds
+    /// like sort_for_load_balancing, but shuffles tiers randomly instead of sorting by weighted_peak_latency
     pub fn shuffle_for_load_balancing_on(
         &self,
         max_block: Option<U64>,
@@ -292,17 +291,17 @@ impl Web3Rpc {
         (sort_on, r)
     }
 
-    pub fn weighted_peak_ewma_seconds(&self) -> OrderedFloat<f64> {
+    pub fn weighted_peak_latency(&self) -> Duration {
         let peak_latency = if let Some(peak_latency) = self.peak_latency.as_ref() {
-            peak_latency.latency().as_secs_f64()
+            peak_latency.latency()
         } else {
-            1.0
+            Duration::from_secs(1)
         };
 
         // TODO: what ordering?
-        let active_requests = self.active_requests.load(atomic::Ordering::Acquire) as f64 + 1.0;
+        let active_requests = self.active_requests.load(atomic::Ordering::Acquire) as f32 + 1.0;
 
-        OrderedFloat(peak_latency * active_requests)
+        peak_latency.mul_f32(active_requests)
     }
 
     // TODO: would be great if rpcs exposed this. see https://github.com/ledgerwatch/erigon/issues/6391
@@ -697,7 +696,7 @@ impl Web3Rpc {
         let rpc = self.clone();
 
         // TODO: how often? different depending on the chain?
-        // TODO: reset this timeout when a new block is seen? we need to keep request_latency updated though
+        // TODO: reset this timeout when a new block is seen? we need to keep median_request_latency updated though
         let health_sleep_seconds = 5;
 
         // health check loop
@@ -1173,29 +1172,30 @@ impl Serialize for Web3Rpc {
             &self.active_requests.load(atomic::Ordering::Relaxed),
         )?;
 
-        state.serialize_field(
-            "head_latency_ms",
-            &self.head_latency_ms.read().duration().as_millis(),
-        )?;
-
-        state.serialize_field(
-            "request_latency_ms",
-            &self
-                .request_latency
-                .as_ref()
-                .unwrap()
-                .duration()
-                .as_millis(),
-        )?;
-
-        state.serialize_field(
-            "peak_latency_ms",
-            &self.peak_latency.as_ref().unwrap().latency().as_millis(),
-        )?;
+        {
+            let head_delay_ms = self.head_delay.read().latency().as_secs_f32() * 1000.0;
+            state.serialize_field("head_delay_ms", &(head_delay_ms))?;
+        }
 
         {
-            let weighted_latency_ms = self.weighted_peak_ewma_seconds() * 1000.0;
-            state.serialize_field("weighted_latency_ms", weighted_latency_ms.as_ref())?;
+            let median_latency_ms = self
+                .median_latency
+                .as_ref()
+                .unwrap()
+                .latency()
+                .as_secs_f32()
+                * 1000.0;
+            state.serialize_field("median_latency_ms", &(median_latency_ms))?;
+        }
+
+        {
+            let peak_latency_ms =
+                self.peak_latency.as_ref().unwrap().latency().as_secs_f32() * 1000.0;
+            state.serialize_field("peak_latency_ms", &peak_latency_ms)?;
+        }
+        {
+            let weighted_latency_ms = self.weighted_peak_latency().as_secs_f32() * 1000.0;
+            state.serialize_field("weighted_latency_ms", &weighted_latency_ms)?;
         }
 
         state.end()
diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs
index 80b7a941..7ea6eaf1 100644
--- a/web3_proxy/src/rpcs/request.rs
+++ b/web3_proxy/src/rpcs/request.rs
@@ -383,7 +383,7 @@ impl OpenRequestHandle {
                 self.rpc.peak_latency.as_ref().unwrap().report(latency);
 
                 self.rpc
-                    .request_latency
+                    .median_latency
                     .as_ref()
                    .unwrap()
                    .record(latency)
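
A note on the app/mod.rs hunk: switching from Cache::new to the builder gives each semaphore cache a name, so the three caches can be told apart in logs and metrics. A minimal sketch of the pattern, assuming moka's future cache; the u64 key and the helper itself are illustrative placeholders (the real caches key by user id, IP address, and bearer token):

use std::sync::Arc;

use moka::future::{Cache, CacheBuilder};
use tokio::sync::Semaphore;

// Hypothetical helper mirroring the change above: a named, bounded cache
// of per-key semaphores. Cache::new(max_users) builds the same cache,
// but anonymously.
fn named_semaphore_cache(name: &str, max_users: u64) -> Cache<u64, Arc<Semaphore>> {
    CacheBuilder::new(max_users).name(name).build()
}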
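
The load-balancing hunks in many.rs and one.rs are the heart of the patch: OrderedFloat<f64> seconds become Duration. Duration already implements Ord, so min_by_key can compare servers directly, and Duration::mul_f32 applies the active-request weighting without converting to floats and back. A self-contained sketch of that scoring, with a hypothetical Backend type standing in for Web3Rpc:

use std::cmp::min_by_key;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;

// Hypothetical stand-in for Web3Rpc: a peak latency estimate plus the
// number of requests currently in flight.
struct Backend {
    peak_latency: Duration,
    active_requests: AtomicUsize,
}

impl Backend {
    // Weight peak latency by load, like the patched weighted_peak_latency.
    // Returning Duration (which is Ord) removes the OrderedFloat wrapper.
    fn weighted_peak_latency(&self) -> Duration {
        let active_requests = self.active_requests.load(Ordering::Acquire) as f32 + 1.0;
        self.peak_latency.mul_f32(active_requests)
    }
}

fn main() {
    let a = Backend {
        peak_latency: Duration::from_millis(80),
        active_requests: AtomicUsize::new(3),
    };
    let b = Backend {
        peak_latency: Duration::from_millis(120),
        active_requests: AtomicUsize::new(1),
    };

    // a scores roughly 320ms (80ms * 4); b scores roughly 240ms (120ms * 2), so b wins.
    let faster = min_by_key(&a, &b, |x| x.weighted_peak_latency());
    assert_eq!(faster.weighted_peak_latency(), b.weighted_peak_latency());
}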
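
The Serialize hunk stops calling as_millis(), which truncates to a whole number of milliseconds, and reports fractional milliseconds via as_secs_f32() * 1000.0 instead, so sub-millisecond latencies no longer collapse to an integer. A small sketch of the difference:

use std::time::Duration;

// Fractional milliseconds, as the patched Serialize impl computes them.
fn to_ms(latency: Duration) -> f32 {
    latency.as_secs_f32() * 1000.0
}

fn main() {
    let latency = Duration::from_micros(1_250);

    // as_millis() truncates: the 0.25ms is lost.
    assert_eq!(latency.as_millis(), 1);

    // The f32 path keeps the sub-millisecond detail (within float precision).
    assert!((to_ms(latency) - 1.25).abs() < 1e-6);
}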