diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs
index a1df6c2a..063c2144 100644
--- a/web3_proxy/src/app/mod.rs
+++ b/web3_proxy/src/app/mod.rs
@@ -516,19 +516,14 @@ impl Web3ProxyApp {
         // responses can be very different in sizes, so this is a cache with a max capacity and a weigher
         // TODO: we should emit stats to calculate a more accurate expected cache size
         // TODO: do we actually want a TTL on this?
-        // TODO: configurable max item weight instead of using ~0.1%
+        // TODO: configurable max item weight
         // TODO: resize the cache automatically
-        let response_cache: JsonRpcResponseCache =
+        let jsonrpc_response_cache: JsonRpcResponseCache =
             CacheBuilder::new(top_config.app.response_cache_max_bytes)
+                .name("jsonrpc_response_cache")
+                .time_to_idle(Duration::from_secs(3600))
                 .weigher(json_rpc_response_weigher)
                 .build();
-        // (top_config.app.response_cache_max_bytes / 16_384) as usize,
-        // NonZeroU32::try_from((top_config.app.response_cache_max_bytes / 1024) as u32).unwrap(),
-        // top_config.app.response_cache_max_bytes,
-        // JsonRpcResponseWeigher,
-        // Duration::from_secs(3600),
-        // )
-        // .await;
 
         // TODO: how should we handle hitting this max?
         let max_users = 20_000;
@@ -652,7 +647,7 @@ impl Web3ProxyApp {
             influxdb_client,
             internal_provider,
             ip_semaphores,
-            jsonrpc_response_cache: response_cache,
+            jsonrpc_response_cache,
             kafka_producer,
             login_rate_limiter,
             pending_transactions,
diff --git a/web3_proxy/src/bin/web3_proxy_cli/popularity_contest.rs b/web3_proxy/src/bin/web3_proxy_cli/popularity_contest.rs
index 0fd08af4..dc562c51 100644
--- a/web3_proxy/src/bin/web3_proxy_cli/popularity_contest.rs
+++ b/web3_proxy/src/bin/web3_proxy_cli/popularity_contest.rs
@@ -1,13 +1,11 @@
-use std::{cmp::Reverse, collections::BTreeMap, str::FromStr};
-
-// show what nodes are used most often
 use argh::FromArgs;
 use ethers::types::U64;
-use log::trace;
+use ordered_float::OrderedFloat;
 use prettytable::{row, Table};
+use std::{cmp::Reverse, str::FromStr};
 
 #[derive(FromArgs, PartialEq, Debug)]
-/// Second subcommand.
+/// show what nodes are used most often
 #[argh(subcommand, name = "popularity_contest")]
 pub struct PopularityContestSubCommand {
     #[argh(positional)]
@@ -19,14 +17,16 @@ pub struct PopularityContestSubCommand {
 #[derive(Debug)]
 struct BackendRpcData<'a> {
     name: &'a str,
-    // tier: u64,
-    // backup: bool,
-    // block_data_limit: u64,
+    tier: u64,
+    backup: bool,
+    block_data_limit: u64,
     head_block: u64,
-    requests: u64,
+    active_requests: u64,
+    internal_requests: u64,
+    external_requests: u64,
     head_latency_ms: f64,
     peak_latency_ms: f64,
-    peak_ewma_ms: f64,
+    weighted_latency_ms: f64,
 }
 
 impl PopularityContestSubCommand {
@@ -48,10 +48,9 @@
             .as_array()
             .unwrap();
 
-        let mut by_tier = BTreeMap::<u64, Vec<_>>::new();
-        let mut tier_requests = BTreeMap::<u64, u64>::new();
-        let mut total_requests = 0;
         let mut highest_block = 0;
+        let mut rpc_data = vec![];
+        let mut total_external_requests = 0;
 
         for conn in conns {
             let conn = conn.as_object().unwrap();
@@ -60,19 +59,31 @@
             let name = conn
                 .get("display_name")
                 .unwrap_or_else(|| conn.get("name").unwrap())
                 .as_str()
-                .unwrap();
+                .unwrap_or("unknown");
 
             let tier = conn.get("tier").unwrap().as_u64().unwrap();
 
-            // let backup = conn.get("backup").unwrap().as_bool().unwrap();
+            let backup = conn.get("backup").unwrap().as_bool().unwrap();
 
-            // let block_data_limit = conn
-            //     .get("block_data_limit")
-            //     .unwrap()
-            //     .as_u64()
-            //     .unwrap_or(u64::MAX);
+            let block_data_limit = conn
+                .get("block_data_limit")
+                .and_then(|x| x.as_u64())
+                .unwrap_or(u64::MAX);
 
-            let requests = conn.get("total_requests").unwrap().as_u64().unwrap();
+            let internal_requests = conn
+                .get("internal_requests")
+                .and_then(|x| x.as_u64())
+                .unwrap_or_default();
+
+            let external_requests = conn
+                .get("external_requests")
+                .and_then(|x| x.as_u64())
+                .unwrap_or_default();
+
+            let active_requests = conn
+                .get("active_requests")
+                .and_then(|x| x.as_u64())
+                .unwrap_or_default();
 
             let head_block = conn
                 .get("head_block")
@@ -88,85 +99,90 @@
             let peak_latency_ms = conn
                 .get("peak_latency_ms")
-                .unwrap_or(&serde_json::Value::Null)
-                .as_f64()
+                .and_then(|x| x.as_f64())
                 .unwrap_or_default();
 
-            let peak_ewma_ms = conn
-                .get("peak_ewma_s")
-                .unwrap_or(&serde_json::Value::Null)
-                .as_f64()
-                .unwrap_or_default()
-                * 1000.0;
+            let weighted_latency_ms = conn
+                .get("weighted_latency_ms")
+                .and_then(|x| x.as_f64())
+                .unwrap_or_default();
 
-            let rpc_data = BackendRpcData {
+            let x = BackendRpcData {
                 name,
-                // tier,
-                // backup,
-                // block_data_limit,
-                requests,
+                tier,
+                backup,
+                block_data_limit,
+                active_requests,
+                internal_requests,
+                external_requests,
                 head_block,
                 head_latency_ms,
                 peak_latency_ms,
-                peak_ewma_ms,
+                weighted_latency_ms,
             };
 
-            total_requests += rpc_data.requests;
+            total_external_requests += x.external_requests;
 
-            *tier_requests.entry(tier).or_default() += rpc_data.requests;
-
-            by_tier.entry(tier).or_default().push(rpc_data);
+            rpc_data.push(x);
         }
 
-        trace!("tier_requests: {:#?}", tier_requests);
-        trace!("by_tier: {:#?}", by_tier);
+        rpc_data.sort_by_key(|x| {
+            (
+                Reverse(x.external_requests),
+                OrderedFloat(x.weighted_latency_ms),
+            )
+        });
 
         let mut table = Table::new();
 
         table.add_row(row![
             "name",
+            "external %",
+            "external",
+            "internal",
+            "active",
+            "lag",
+            "head_ms",
+            "peak_ms",
+            "weighted_ms",
             "tier",
-            "rpc_requests",
-            "tier_request_pct",
-            "total_pct",
-            "head_lag",
-            "head_latency_ms",
-            "peak_latency_ms",
-            "peak_ewma_ms",
+            "block_data_limit",
         ]);
 
-        let total_requests = total_requests as f32;
+        for rpc in rpc_data.into_iter() {
+            let external_request_pct = if total_external_requests == 0 {
+                0.0
+            } else {
+                (rpc.external_requests as f32) / (total_external_requests as f32) * 100.0
+            };
 
-        for (tier, rpcs) in by_tier.iter_mut() {
-            let t = (*tier_requests.get(tier).unwrap()) as f32;
+            let block_data_limit = if rpc.block_data_limit == u64::MAX {
+                "archive".to_string()
+            } else {
+                format!("{}", rpc.block_data_limit)
+            };
 
-            rpcs.sort_by_cached_key(|x| Reverse(x.requests));
+            let tier = if rpc.backup {
+                format!("{}B", rpc.tier)
+            } else {
+                rpc.tier.to_string()
+            };
 
-            for rpc in rpcs.iter() {
-                let tier_request_pct = if t == 0.0 {
-                    0.0
-                } else {
-                    (rpc.requests as f32) / t * 100.0
-                };
+            let lag = highest_block - rpc.head_block;
 
-                let total_request_pct = if total_requests == 0.0 {
-                    0.0
-                } else {
-                    (rpc.requests as f32) / total_requests * 100.0
-                };
-
-                table.add_row(row![
-                    rpc.name,
-                    tier,
-                    rpc.requests,
-                    tier_request_pct,
-                    total_request_pct,
-                    highest_block - rpc.head_block,
-                    format!("{:.3}", rpc.head_latency_ms),
-                    format!("{:.3}", rpc.peak_latency_ms),
-                    format!("{:.3}", rpc.peak_ewma_ms),
-                ]);
-            }
+            table.add_row(row![
+                rpc.name,
+                external_request_pct,
+                rpc.external_requests,
+                rpc.internal_requests,
+                rpc.active_requests,
+                lag,
+                format!("{:.3}", rpc.head_latency_ms),
+                format!("{:.3}", rpc.peak_latency_ms),
+                format!("{:.3}", rpc.weighted_latency_ms),
+                tier,
+                block_data_limit,
+            ]);
         }
 
         table.printstd();
diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs
index 4af35499..f33710ee 100644
--- a/web3_proxy/src/rpcs/one.rs
+++ b/web3_proxy/src/rpcs/one.rs
@@ -71,8 +71,10 @@ pub struct Web3Rpc {
     /// Automatically set priority
     pub(super) tier: AtomicU32,
     /// Track total requests served
-    /// TODO: maybe move this to graphana
-    pub(super) total_requests: AtomicUsize,
+    pub(super) internal_requests: AtomicUsize,
+    /// Track total requests served
+    pub(super) external_requests: AtomicUsize,
+    /// Track in-flight requests
     pub(super) active_requests: AtomicUsize,
     /// disconnect_watch is only inside an Option so that the "Default" derive works. it will always be set.
     pub(super) disconnect_watch: Option<watch::Sender<bool>>,
@@ -689,7 +691,8 @@
 
         // errors here should not cause the loop to exit!
         while !(*subscribe_stop_rx.borrow()) {
-            new_total_requests = rpc.total_requests.load(atomic::Ordering::Relaxed);
+            new_total_requests = rpc.internal_requests.load(atomic::Ordering::Relaxed)
+                + rpc.external_requests.load(atomic::Ordering::Relaxed);
 
             if new_total_requests - old_total_requests < 5 {
                 // TODO: if this fails too many times, reset the connection
@@ -1091,7 +1094,7 @@
         S: Serializer,
     {
         // 3 is the number of fields in the struct.
-        let mut state = serializer.serialize_struct("Web3Rpc", 12)?;
+        let mut state = serializer.serialize_struct("Web3Rpc", 13)?;
 
         // the url is excluded because it likely includes private information. just show the name that we use in keys
         state.serialize_field("name", &self.name)?;
@@ -1122,8 +1125,13 @@
         }
 
         state.serialize_field(
-            "total_requests",
-            &self.total_requests.load(atomic::Ordering::Acquire),
+            "external_requests",
+            &self.external_requests.load(atomic::Ordering::Relaxed),
+        )?;
+
+        state.serialize_field(
+            "internal_requests",
+            &self.internal_requests.load(atomic::Ordering::Relaxed),
         )?;
 
         state.serialize_field(
@@ -1138,7 +1146,10 @@
             &self.peak_latency.as_ref().unwrap().latency().as_millis(),
         )?;
 
-        state.serialize_field("peak_ewma_s", self.weighted_peak_ewma_seconds().as_ref())?;
+        {
+            let weighted_latency_ms = self.weighted_peak_ewma_seconds() * 1000.0;
+            state.serialize_field("weighted_latency_ms", weighted_latency_ms.as_ref())?;
+        }
 
         state.end()
     }
diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs
index 89c24727..25c75e2a 100644
--- a/web3_proxy/src/rpcs/request.rs
+++ b/web3_proxy/src/rpcs/request.rs
@@ -1,6 +1,6 @@
 use super::one::Web3Rpc;
 use crate::errors::Web3ProxyResult;
-use crate::frontend::authorization::Authorization;
+use crate::frontend::authorization::{Authorization, AuthorizationType};
 use crate::jsonrpc::{JsonRpcParams, JsonRpcResultData};
 use anyhow::Context;
 use chrono::Utc;
@@ -181,9 +181,18 @@
         // trace!(rpc=%self.rpc, %method, "request");
         trace!("requesting from {}", self.rpc);
 
-        self.rpc
-            .total_requests
-            .fetch_add(1, std::sync::atomic::Ordering::AcqRel);
+        match self.authorization.authorization_type {
+            AuthorizationType::Frontend => {
+                self.rpc
+                    .external_requests
+                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+            }
+            AuthorizationType::Internal => {
+                self.rpc
+                    .internal_requests
+                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+            }
+        }
 
         // we used to fetch_add the active_request count here, but sometimes a request is made without going through this function (like with subscriptions)
@@ -231,7 +240,8 @@
         } else if log_revert_chance == u16::MAX {
             // trace!(%method, "gaurenteed chance. SAVING on revert");
             self.error_handler
-        } else if nanorand::tls_rng().generate_range(0u16..u16::MAX) < log_revert_chance {
+        } else if nanorand::tls_rng().generate_range(0u16..u16::MAX) < log_revert_chance
+        {
             // trace!(%method, "missed chance. skipping save on revert");
             RequestErrorHandler::TraceLevel
         } else {
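
Reviewer note on the app/mod.rs change: the hand-rolled constructor arguments are replaced by moka's builder API, combining a weigher (size-aware eviction) with `time_to_idle`. Below is a minimal, self-contained sketch of that pattern; the 64 MiB budget, the `u64`/`String` key and value types, and the byte-length weigher are illustrative stand-ins, not the proxy's real `JsonRpcResponseCache` types.

```rust
use std::time::Duration;

use moka::sync::{Cache, CacheBuilder};

fn main() {
    // hypothetical budget standing in for top_config.app.response_cache_max_bytes
    let max_bytes: u64 = 64 * 1024 * 1024;

    let cache: Cache<u64, String> = CacheBuilder::new(max_bytes)
        .name("jsonrpc_response_cache")
        // entries unread for an hour are evicted, mirroring the diff's time_to_idle
        .time_to_idle(Duration::from_secs(3600))
        // the weigher reports each entry's cost (here, the value's byte length),
        // so the capacity is interpreted as total weight instead of entry count
        .weigher(|_key, value: &String| value.len().try_into().unwrap_or(u32::MAX))
        .build();

    cache.insert(1, "0x0".to_string());
    assert_eq!(cache.get(&1).as_deref(), Some("0x0"));
}
```

With a weigher in place, `time_to_idle` also keeps cold-but-heavy entries from pinning the weight budget indefinitely.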
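The new single-pass sort in popularity_contest.rs orders by external request count descending, then by weighted latency ascending. Since `f64` is not `Ord`, the diff pulls in `ordered_float` to make the latency usable inside a `sort_by_key` tuple. A self-contained example of the same key construction follows; the `RpcRow` struct and the sample numbers are hypothetical.

```rust
use std::cmp::Reverse;

use ordered_float::OrderedFloat;

// hypothetical row type standing in for BackendRpcData
struct RpcRow {
    name: &'static str,
    external_requests: u64,
    weighted_latency_ms: f64,
}

fn main() {
    let mut rows = vec![
        RpcRow { name: "a", external_requests: 10, weighted_latency_ms: 40.0 },
        RpcRow { name: "b", external_requests: 25, weighted_latency_ms: 90.0 },
        RpcRow { name: "c", external_requests: 25, weighted_latency_ms: 15.0 },
    ];

    // busiest first (Reverse), ties broken by lowest weighted latency;
    // OrderedFloat gives f64 the total order that sort_by_key requires
    rows.sort_by_key(|x| (Reverse(x.external_requests), OrderedFloat(x.weighted_latency_ms)));

    let names: Vec<&str> = rows.iter().map(|x| x.name).collect();
    assert_eq!(names, ["c", "b", "a"]);
}
```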
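Finally, the accounting change in request.rs and one.rs splits the single `total_requests` counter into per-origin counters selected by the request's authorization type, and downgrades the `AcqRel`/`Acquire` orderings to `Relaxed`, which is sufficient because nothing synchronizes through these values; they are pure statistics. A trimmed-down sketch of that pattern with stand-in types (`Counters` here plays the role of `Web3Rpc`'s atomic fields):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// stand-in for the proxy's AuthorizationType enum
enum AuthorizationType {
    Internal,
    Frontend,
}

#[derive(Default)]
struct Counters {
    internal_requests: AtomicUsize,
    external_requests: AtomicUsize,
}

impl Counters {
    fn count(&self, auth: &AuthorizationType) {
        // Relaxed is enough for independent statistics counters; no other
        // memory accesses are ordered relative to these increments
        match auth {
            AuthorizationType::Frontend => {
                self.external_requests.fetch_add(1, Ordering::Relaxed);
            }
            AuthorizationType::Internal => {
                self.internal_requests.fetch_add(1, Ordering::Relaxed);
            }
        }
    }

    // consumers that still want the old total (like the health-check loop
    // in one.rs) just sum the two counters
    fn total(&self) -> usize {
        self.internal_requests.load(Ordering::Relaxed)
            + self.external_requests.load(Ordering::Relaxed)
    }
}

fn main() {
    let c = Counters::default();
    c.count(&AuthorizationType::Frontend);
    c.count(&AuthorizationType::Internal);
    assert_eq!(c.total(), 2);
}
```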