improvements to popularity_contesst

This commit is contained in:
Bryan Stitt 2023-06-13 11:51:19 -07:00
parent f5a1ac274a
commit b8f429d70a
4 changed files with 130 additions and 98 deletions

@ -516,19 +516,14 @@ impl Web3ProxyApp {
// responses can be very different in sizes, so this is a cache with a max capacity and a weigher
// TODO: we should emit stats to calculate a more accurate expected cache size
// TODO: do we actually want a TTL on this?
// TODO: configurable max item weight instead of using ~0.1%
// TODO: configurable max item weight
// TODO: resize the cache automatically
let response_cache: JsonRpcResponseCache =
let jsonrpc_response_cache: JsonRpcResponseCache =
CacheBuilder::new(top_config.app.response_cache_max_bytes)
.name("jsonrpc_response_cache")
.time_to_idle(Duration::from_secs(3600))
.weigher(json_rpc_response_weigher)
.build();
// (top_config.app.response_cache_max_bytes / 16_384) as usize,
// NonZeroU32::try_from((top_config.app.response_cache_max_bytes / 1024) as u32).unwrap(),
// top_config.app.response_cache_max_bytes,
// JsonRpcResponseWeigher,
// Duration::from_secs(3600),
// )
// .await;
// TODO: how should we handle hitting this max?
let max_users = 20_000;
@ -652,7 +647,7 @@ impl Web3ProxyApp {
influxdb_client,
internal_provider,
ip_semaphores,
jsonrpc_response_cache: response_cache,
jsonrpc_response_cache,
kafka_producer,
login_rate_limiter,
pending_transactions,

@ -1,13 +1,11 @@
use std::{cmp::Reverse, collections::BTreeMap, str::FromStr};
// show what nodes are used most often
use argh::FromArgs;
use ethers::types::U64;
use log::trace;
use ordered_float::OrderedFloat;
use prettytable::{row, Table};
use std::{cmp::Reverse, str::FromStr};
#[derive(FromArgs, PartialEq, Debug)]
/// Second subcommand.
/// show what nodes are used most often
#[argh(subcommand, name = "popularity_contest")]
pub struct PopularityContestSubCommand {
#[argh(positional)]
@ -19,14 +17,16 @@ pub struct PopularityContestSubCommand {
#[derive(Debug)]
struct BackendRpcData<'a> {
name: &'a str,
// tier: u64,
// backup: bool,
// block_data_limit: u64,
tier: u64,
backup: bool,
block_data_limit: u64,
head_block: u64,
requests: u64,
active_requests: u64,
internal_requests: u64,
external_requests: u64,
head_latency_ms: f64,
peak_latency_ms: f64,
peak_ewma_ms: f64,
weighted_latency_ms: f64,
}
impl PopularityContestSubCommand {
@ -48,10 +48,9 @@ impl PopularityContestSubCommand {
.as_array()
.unwrap();
let mut by_tier = BTreeMap::<u64, Vec<_>>::new();
let mut tier_requests = BTreeMap::<u64, u64>::new();
let mut total_requests = 0;
let mut highest_block = 0;
let mut rpc_data = vec![];
let mut total_external_requests = 0;
for conn in conns {
let conn = conn.as_object().unwrap();
@ -60,19 +59,31 @@ impl PopularityContestSubCommand {
.get("display_name")
.unwrap_or_else(|| conn.get("name").unwrap())
.as_str()
.unwrap();
.unwrap_or("unknown");
let tier = conn.get("tier").unwrap().as_u64().unwrap();
// let backup = conn.get("backup").unwrap().as_bool().unwrap();
let backup = conn.get("backup").unwrap().as_bool().unwrap();
// let block_data_limit = conn
// .get("block_data_limit")
// .unwrap()
// .as_u64()
// .unwrap_or(u64::MAX);
let block_data_limit = conn
.get("block_data_limit")
.and_then(|x| x.as_u64())
.unwrap_or(u64::MAX);
let requests = conn.get("total_requests").unwrap().as_u64().unwrap();
let internal_requests = conn
.get("internal_requests")
.and_then(|x| x.as_u64())
.unwrap_or_default();
let external_requests = conn
.get("external_requests")
.and_then(|x| x.as_u64())
.unwrap_or_default();
let active_requests = conn
.get("active_requests")
.and_then(|x| x.as_u64())
.unwrap_or_default();
let head_block = conn
.get("head_block")
@ -88,86 +99,91 @@ impl PopularityContestSubCommand {
let peak_latency_ms = conn
.get("peak_latency_ms")
.unwrap_or(&serde_json::Value::Null)
.as_f64()
.and_then(|x| x.as_f64())
.unwrap_or_default();
let peak_ewma_ms = conn
.get("peak_ewma_s")
.unwrap_or(&serde_json::Value::Null)
.as_f64()
.unwrap_or_default()
* 1000.0;
let weighted_latency_ms = conn
.get("weighted_latency_ms")
.and_then(|x| x.as_f64())
.unwrap_or_default();
let rpc_data = BackendRpcData {
let x = BackendRpcData {
name,
// tier,
// backup,
// block_data_limit,
requests,
tier,
backup,
block_data_limit,
active_requests,
internal_requests,
external_requests,
head_block,
head_latency_ms,
peak_latency_ms,
peak_ewma_ms,
weighted_latency_ms,
};
total_requests += rpc_data.requests;
total_external_requests += x.external_requests;
*tier_requests.entry(tier).or_default() += rpc_data.requests;
by_tier.entry(tier).or_default().push(rpc_data);
rpc_data.push(x);
}
trace!("tier_requests: {:#?}", tier_requests);
trace!("by_tier: {:#?}", by_tier);
rpc_data.sort_by_key(|x| {
(
Reverse(x.external_requests),
OrderedFloat(x.weighted_latency_ms),
)
});
let mut table = Table::new();
table.add_row(row![
"name",
"external %",
"external",
"internal",
"active",
"lag",
"head_ms",
"peak_ms",
"weighted_ms",
"tier",
"rpc_requests",
"tier_request_pct",
"total_pct",
"head_lag",
"head_latency_ms",
"peak_latency_ms",
"peak_ewma_ms",
"block_data_limit",
]);
let total_requests = total_requests as f32;
for (tier, rpcs) in by_tier.iter_mut() {
let t = (*tier_requests.get(tier).unwrap()) as f32;
rpcs.sort_by_cached_key(|x| Reverse(x.requests));
for rpc in rpcs.iter() {
let tier_request_pct = if t == 0.0 {
for rpc in rpc_data.into_iter() {
let external_request_pct = if total_external_requests == 0 {
0.0
} else {
(rpc.requests as f32) / t * 100.0
(rpc.external_requests as f32) / (total_external_requests as f32) * 100.0
};
let total_request_pct = if total_requests == 0.0 {
0.0
let block_data_limit = if rpc.block_data_limit == u64::MAX {
"archive".to_string()
} else {
(rpc.requests as f32) / total_requests * 100.0
format!("{}", rpc.block_data_limit)
};
let tier = if rpc.backup {
format!("{}B", rpc.tier)
} else {
rpc.tier.to_string()
};
let lag = highest_block - rpc.head_block;
table.add_row(row![
rpc.name,
tier,
rpc.requests,
tier_request_pct,
total_request_pct,
highest_block - rpc.head_block,
external_request_pct,
rpc.external_requests,
rpc.internal_requests,
rpc.active_requests,
lag,
format!("{:.3}", rpc.head_latency_ms),
format!("{:.3}", rpc.peak_latency_ms),
format!("{:.3}", rpc.peak_ewma_ms),
format!("{:.3}", rpc.weighted_latency_ms),
tier,
block_data_limit,
]);
}
}
table.printstd();

@ -71,8 +71,10 @@ pub struct Web3Rpc {
/// Automatically set priority
pub(super) tier: AtomicU32,
/// Track total requests served
/// TODO: maybe move this to graphana
pub(super) total_requests: AtomicUsize,
pub(super) internal_requests: AtomicUsize,
/// Track total requests served
pub(super) external_requests: AtomicUsize,
/// Track in-flight requests
pub(super) active_requests: AtomicUsize,
/// disconnect_watch is only inside an Option so that the "Default" derive works. it will always be set.
pub(super) disconnect_watch: Option<watch::Sender<bool>>,
@ -689,7 +691,8 @@ impl Web3Rpc {
// errors here should not cause the loop to exit!
while !(*subscribe_stop_rx.borrow()) {
new_total_requests = rpc.total_requests.load(atomic::Ordering::Relaxed);
new_total_requests = rpc.internal_requests.load(atomic::Ordering::Relaxed)
+ rpc.external_requests.load(atomic::Ordering::Relaxed);
if new_total_requests - old_total_requests < 5 {
// TODO: if this fails too many times, reset the connection
@ -1091,7 +1094,7 @@ impl Serialize for Web3Rpc {
S: Serializer,
{
// 3 is the number of fields in the struct.
let mut state = serializer.serialize_struct("Web3Rpc", 12)?;
let mut state = serializer.serialize_struct("Web3Rpc", 13)?;
// the url is excluded because it likely includes private information. just show the name that we use in keys
state.serialize_field("name", &self.name)?;
@ -1122,8 +1125,13 @@ impl Serialize for Web3Rpc {
}
state.serialize_field(
"total_requests",
&self.total_requests.load(atomic::Ordering::Acquire),
"external_requests",
&self.external_requests.load(atomic::Ordering::Relaxed),
)?;
state.serialize_field(
"internal_requests",
&self.internal_requests.load(atomic::Ordering::Relaxed),
)?;
state.serialize_field(
@ -1138,7 +1146,10 @@ impl Serialize for Web3Rpc {
&self.peak_latency.as_ref().unwrap().latency().as_millis(),
)?;
state.serialize_field("peak_ewma_s", self.weighted_peak_ewma_seconds().as_ref())?;
{
let weighted_latency_ms = self.weighted_peak_ewma_seconds() * 1000.0;
state.serialize_field("weighted_latency_ms", weighted_latency_ms.as_ref())?;
}
state.end()
}

@ -1,6 +1,6 @@
use super::one::Web3Rpc;
use crate::errors::Web3ProxyResult;
use crate::frontend::authorization::Authorization;
use crate::frontend::authorization::{Authorization, AuthorizationType};
use crate::jsonrpc::{JsonRpcParams, JsonRpcResultData};
use anyhow::Context;
use chrono::Utc;
@ -181,9 +181,18 @@ impl OpenRequestHandle {
// trace!(rpc=%self.rpc, %method, "request");
trace!("requesting from {}", self.rpc);
match self.authorization.authorization_type {
AuthorizationType::Frontend => {
self.rpc
.total_requests
.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
.external_requests
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
AuthorizationType::Internal => {
self.rpc
.internal_requests
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
}
// we used to fetch_add the active_request count here, but sometimes a request is made without going through this function (like with subscriptions)
@ -231,7 +240,8 @@ impl OpenRequestHandle {
} else if log_revert_chance == u16::MAX {
// trace!(%method, "gaurenteed chance. SAVING on revert");
self.error_handler
} else if nanorand::tls_rng().generate_range(0u16..u16::MAX) < log_revert_chance {
} else if nanorand::tls_rng().generate_range(0u16..u16::MAX) < log_revert_chance
{
// trace!(%method, "missed chance. skipping save on revert");
RequestErrorHandler::TraceLevel
} else {