From 552f3dbffc469babc1153623af672e4e4cc79db8 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Tue, 11 Oct 2022 19:58:25 +0000 Subject: [PATCH] proper sizes for caches and emit all stats --- Cargo.lock | 4 +- TODO.md | 1 + web3_proxy/Cargo.toml | 2 +- web3_proxy/src/app.rs | 57 ++++++++++++------ web3_proxy/src/frontend/authorization.rs | 31 ++++++---- web3_proxy/src/jsonrpc.rs | 2 +- web3_proxy/src/rpcs/blockchain.rs | 6 +- web3_proxy/src/rpcs/connections.rs | 19 +++++- web3_proxy/src/stats.rs | 77 ++++++++++++++++-------- 9 files changed, 137 insertions(+), 62 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e63e8869..871a5bb9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4967,9 +4967,9 @@ dependencies = [ [[package]] name = "tokio-stream" -version = "0.1.10" +version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6edf2d6bc038a43d31353570e27270603f4648d18f5ed10c0e179abe43255af" +checksum = "d660770404473ccd7bc9f8b28494a811bc18542b915c0855c51e8f419d5223ce" dependencies = [ "futures-core", "pin-project-lite", diff --git a/TODO.md b/TODO.md index 677da5ba..e2445e20 100644 --- a/TODO.md +++ b/TODO.md @@ -422,3 +422,4 @@ in another repo: event subscriber - [ ] hit counts seem wrong. how are we hitting the backend so much more than the frontend? retries on disconnect don't seem to fit that web3_proxy_hit_count{path = "app/proxy_web3_rpc_request"} 857270 web3_proxy_hit_count{path = "backend_rpc/request"} 1396127 +- [ ] replace serde_json::Value with https://lib.rs/crates/ijson (more memory efficient) diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index 1d2a89a5..8888986e 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -62,7 +62,7 @@ serde_prometheus = "0.1.6" time = "0.3.15" tokio = { version = "1.21.2", features = ["full", "tracing"] } # TODO: make sure this uuid version matches sea-orm. PR to put this in their prelude -tokio-stream = { version = "0.1.10", features = ["sync"] } +tokio-stream = { version = "0.1.11", features = ["sync"] } toml = "0.5.9" tower = "0.4.13" # TODO: i don't think we need this. we can use it from tower-http instead. though this seems to use ulid and not uuid? diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs index 72a7071d..62fc156d 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app.rs @@ -13,7 +13,6 @@ use crate::rpcs::request::OpenRequestHandleMetrics; use crate::rpcs::transactions::TxStatus; use crate::stats::{ProxyResponseStat, StatEmitter, Web3ProxyStat}; use anyhow::Context; -use atomic::{AtomicBool, Ordering}; use axum::extract::ws::Message; use axum::headers::{Referer, UserAgent}; use deferred_rate_limiter::DeferredRateLimiter; @@ -36,7 +35,6 @@ use sea_orm::DatabaseConnection; use serde::Serialize; use serde_json::json; use std::fmt; -use std::mem::size_of_val; use std::net::IpAddr; use std::pin::Pin; use std::str::FromStr; @@ -297,7 +295,7 @@ impl Web3ProxyApp { // TODO: capacity from configs // all these are the same size, so no need for a weigher - // TODO: ttl on this? + // TODO: ttl on this? or is max_capacity fine? let pending_transactions = Cache::builder() .max_capacity(10_000) .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new()); @@ -305,9 +303,13 @@ impl Web3ProxyApp { // keep 1GB of blocks in the cache // TODO: limits from config // these blocks don't have full transactions, but they do have rather variable amounts of transaction hashes + // TODO: how can we do the weigher better? this is going to be slow! 
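// Illustrative sketch, not part of this patch: what installing a weigher does.
// With a weigher, moka treats max_capacity as a total weight budget in
// whatever units the closure returns (summed across entries), not as an entry
// count, and the closure runs on every insert, so it has to stay cheap. That
// is why the weigher below only counts transaction hashes. The names in this
// sketch are invented for the example.
async fn weigher_demo() {
    let cache = moka::future::Cache::builder()
        // a budget of roughly 1024 weight units, not 1024 entries
        .max_capacity(1024)
        .weigher(|_key: &u64, value: &Vec<u8>| {
            // cheap O(1) size estimate
            value.len().try_into().unwrap_or(u32::MAX)
        })
        .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new());

    // this single entry consumes 512 of the 1024-unit budget
    cache.insert(1u64, vec![0u8; 512]).await;
}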
let block_map = Cache::builder() .max_capacity(1024 * 1024 * 1024) - .weigher(|_k, v| size_of_val(v) as u32) + .weigher(|_k, v: &Arc>| { + // TODO: is this good enough? + v.transactions.len().try_into().unwrap_or(u32::MAX) + }) .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new()); // connect to the load balanced rpcs @@ -405,7 +407,24 @@ impl Web3ProxyApp { // TODO: don't allow any response to be bigger than X% of the cache let response_cache = Cache::builder() .max_capacity(1024 * 1024 * 1024) - .weigher(|k, v| (size_of_val(k) + size_of_val(v)) as u32) + .weigher(|k: &(H256, String, Option), v| { + // TODO: make this weigher past. serializing json is not fast + let mut size = (k.1).len(); + + if let Some(params) = &k.2 { + size += params.len() + } + + if let Ok(v) = serde_json::to_string(v) { + size += v.len(); + + // the or in unwrap_or is probably never called + size.try_into().unwrap_or(u32::MAX) + } else { + // this seems impossible + u32::MAX + } + }) .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new()); // all the users are the same size, so no need for a weigher @@ -763,7 +782,8 @@ impl Web3ProxyApp { ) -> anyhow::Result { trace!("Received request: {:?}", request); - let request_metadata = RequestMetadata::new(&request); + // TODO: allow customizing the period? + let request_metadata = Arc::new(RequestMetadata::new(60, &request)?); // save the id so we can attach it to the response // TODO: instead of cloning, take the id out @@ -894,7 +914,12 @@ impl Web3ProxyApp { let rpcs = self.private_rpcs.as_ref().unwrap_or(&self.balanced_rpcs); return rpcs - .try_send_all_upstream_servers(Some(&authorized_request), request, None) + .try_send_all_upstream_servers( + Some(&authorized_request), + request, + Some(request_metadata), + None, + ) .await; } "eth_syncing" => { @@ -982,17 +1007,13 @@ impl Web3ProxyApp { request.params.clone().map(|x| x.to_string()), ); - let cache_hit = Arc::new(AtomicBool::new(true)); - let mut response = { - let cache_hit = cache_hit.clone(); + let request_metadata = request_metadata.clone(); let authorized_request = authorized_request.clone(); self.response_cache .try_get_with(cache_key, async move { - cache_hit.store(false, Ordering::Release); - // TODO: retry some failures automatically! // TODO: try private_rpcs if all the balanced_rpcs fail! // TODO: put the hash here instead? @@ -1001,6 +1022,7 @@ impl Web3ProxyApp { .try_send_best_upstream_server( Some(&authorized_request), request, + Some(&request_metadata), Some(&request_block_id.num), ) .await?; @@ -1019,6 +1041,11 @@ impl Web3ProxyApp { .context("caching response")? }; + // since this data came likely out of a cache, the id is not going to match + // replace the id with our request's id. + // TODO: cache without the id + response.id = request_id; + if let (Some(stat_sender), Ok(AuthorizedRequest::User(Some(_), authorized_key))) = ( self.stat_sender.as_ref(), Arc::try_unwrap(authorized_request), @@ -1027,16 +1054,12 @@ impl Web3ProxyApp { method.to_string(), authorized_key, request_metadata, + &response, ); stat_sender.send_async(response_stat.into()).await?; } - // since this data came likely out of a cache, the id is not going to match - // replace the id with our request's id. 
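// Illustrative sketch, not part of this patch: the weigher and RequestMetadata
// TODOs above note that serializing to a String just to measure a payload's
// size is slow. One alternative is to serialize into a counting io::Write sink
// so only the byte count is produced and no String is allocated.
// `CountingWriter` and `approximate_json_size` are hypothetical helpers, not
// items this crate defines.
use std::io;

#[derive(Default)]
struct CountingWriter {
    bytes: usize,
}

impl io::Write for CountingWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // pretend to write, but only remember how many bytes went by
        self.bytes += buf.len();
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

fn approximate_json_size<T: serde::Serialize>(value: &T) -> usize {
    let mut counter = CountingWriter::default();
    // no intermediate String; serde streams straight into the counter
    if serde_json::to_writer(&mut counter, value).is_ok() {
        counter.bytes
    } else {
        0
    }
}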
- // TODO: cache without the id - response.id = request_id; - return Ok(response); } }; diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index 3b43cb92..e5f78612 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -12,7 +12,7 @@ use redis_rate_limiter::RedisRateLimitResult; use sea_orm::{prelude::Decimal, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter}; use serde::Serialize; use std::fmt::Display; -use std::mem::size_of_val; +use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64}; use std::{net::IpAddr, str::FromStr, sync::Arc}; use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use tokio::time::Instant; @@ -53,12 +53,14 @@ pub struct AuthorizedKey { #[derive(Debug, Default, Serialize)] pub struct RequestMetadata { - pub timestamp: u64, + pub datetime: chrono::DateTime, + pub period_seconds: u64, pub request_bytes: u64, - pub backend_requests: u32, - pub error_response: bool, - pub response_bytes: u64, - pub response_millis: u64, + /// if this is 0, there was a cache_hit + pub backend_requests: AtomicU32, + pub error_response: AtomicBool, + pub response_bytes: AtomicU64, + pub response_millis: AtomicU64, } #[derive(Clone, Debug, Serialize)] @@ -72,14 +74,21 @@ pub enum AuthorizedRequest { } impl RequestMetadata { - pub fn new(request: &JsonRpcRequest) -> Self { - let request_bytes = size_of_val(request) as u64; + pub fn new(period_seconds: u64, request: &JsonRpcRequest) -> anyhow::Result { + // TODO: how can we do this without turning it into a string first. this is going to slow us down! + let request_bytes = serde_json::to_string(request) + .context("finding request size")? + .len() + .try_into()?; - Self { + let new = Self { + period_seconds, request_bytes, - timestamp: Utc::now().timestamp() as u64, + datetime: Utc::now(), ..Default::default() - } + }; + + Ok(new) } } diff --git a/web3_proxy/src/jsonrpc.rs b/web3_proxy/src/jsonrpc.rs index d3b802bf..5cd2706a 100644 --- a/web3_proxy/src/jsonrpc.rs +++ b/web3_proxy/src/jsonrpc.rs @@ -11,7 +11,7 @@ fn default_jsonrpc() -> String { "2.0".to_string() } -#[derive(Clone, Deserialize)] +#[derive(Clone, Deserialize, Serialize)] pub struct JsonRpcRequest { // TODO: skip jsonrpc entirely? its against spec to drop it, but some servers bad #[serde(default = "default_jsonrpc")] diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index ef6ed6ac..ed1a45be 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -116,8 +116,9 @@ impl Web3Connections { let request = json!({ "id": "1", "method": "eth_getBlockByHash", "params": get_block_params }); let request: JsonRpcRequest = serde_json::from_value(request)?; + // TODO: request_metadata? maybe we should put it in the authorized_request? let response = self - .try_send_best_upstream_server(authorized_request, request, None) + .try_send_best_upstream_server(authorized_request, request, None, None) .await?; let block = response.result.unwrap(); @@ -176,8 +177,9 @@ impl Web3Connections { let request: JsonRpcRequest = serde_json::from_value(request)?; // TODO: if error, retry? + // TODO: request_metadata or authorized_request? 
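// Illustrative sketch, not part of this patch: why RequestMetadata now uses
// atomic fields. The metadata is created once per frontend request, wrapped in
// an Arc, and shared with the backend-request path, so counters must be
// updated through shared references; atomics allow that without a Mutex.
// `Meta` and the function names are invented for the example.
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;

#[derive(Default)]
struct Meta {
    backend_requests: AtomicU32,
    error_response: AtomicBool,
}

fn record_backend_attempt(meta: &Arc<Meta>) {
    // fetch_add works through a shared reference; no &mut or lock is needed
    meta.backend_requests.fetch_add(1, Ordering::SeqCst);
}

fn record_error(meta: &Arc<Meta>) {
    meta.error_response.store(true, Ordering::SeqCst);
}

fn snapshot(meta: &Meta) -> (u32, bool) {
    (
        meta.backend_requests.load(Ordering::SeqCst),
        meta.error_response.load(Ordering::SeqCst),
    )
}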
let response = self - .try_send_best_upstream_server(None, request, Some(num)) + .try_send_best_upstream_server(None, request, None, Some(num)) .await?; let raw_block = response.result.context("no block result")?; diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs index 601e6d85..769b726a 100644 --- a/web3_proxy/src/rpcs/connections.rs +++ b/web3_proxy/src/rpcs/connections.rs @@ -7,7 +7,7 @@ use super::request::{ use super::synced_connections::SyncedConnections; use crate::app::{flatten_handle, AnyhowJoinHandle}; use crate::config::{BlockAndRpc, TxHashAndRpc, Web3ConnectionConfig}; -use crate::frontend::authorization::AuthorizedRequest; +use crate::frontend::authorization::{AuthorizedRequest, RequestMetadata}; use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest}; use crate::rpcs::transactions::TxStatus; use arc_swap::ArcSwap; @@ -27,6 +27,7 @@ use serde_json::value::RawValue; use std::cmp; use std::cmp::Reverse; use std::fmt; +use std::sync::atomic::Ordering; use std::sync::Arc; use tokio::sync::RwLock as AsyncRwLock; use tokio::sync::{broadcast, watch}; @@ -501,11 +502,12 @@ impl Web3Connections { &self, authorized_request: Option<&Arc>, request: JsonRpcRequest, + request_metadata: Option<&Arc>, min_block_needed: Option<&U64>, ) -> anyhow::Result { let mut skip_rpcs = vec![]; - // TODO: maximum retries? + // TODO: maximum retries? right now its the total number of servers loop { if skip_rpcs.len() == self.conns.len() { break; @@ -518,6 +520,12 @@ impl Web3Connections { // save the rpc in case we get an error and want to retry on another server skip_rpcs.push(active_request_handle.clone_connection()); + if let Some(request_metadata) = request_metadata { + request_metadata + .backend_requests + .fetch_add(1, Ordering::Acquire); + } + // TODO: get the log percent from the user data let response_result = active_request_handle .request( @@ -535,6 +543,12 @@ impl Web3Connections { if let Some(error) = &response.error { trace!(?response, "rpc error"); + if let Some(request_metadata) = request_metadata { + request_metadata + .error_response + .store(true, Ordering::Release); + } + // some errors should be retried on other nodes if error.code == -32000 { let error_msg = error.message.as_str(); @@ -603,6 +617,7 @@ impl Web3Connections { &self, authorized_request: Option<&Arc>, request: JsonRpcRequest, + request_metadata: Option>, block_needed: Option<&U64>, ) -> anyhow::Result { loop { diff --git a/web3_proxy/src/stats.rs b/web3_proxy/src/stats.rs index 0255391b..d2611efa 100644 --- a/web3_proxy/src/stats.rs +++ b/web3_proxy/src/stats.rs @@ -1,4 +1,5 @@ use crate::frontend::authorization::{AuthorizedKey, RequestMetadata}; +use crate::jsonrpc::JsonRpcForwardedResponse; use anyhow::Context; use chrono::{TimeZone, Utc}; use derive_more::From; @@ -19,7 +20,14 @@ use tracing::{error, info, trace}; pub struct ProxyResponseStat { user_key_id: u64, method: String, - metadata: AsyncMutex, + period_seconds: u64, + period_timestamp: u64, + request_bytes: u64, + /// if this is 0, there was a cache_hit + backend_requests: u32, + error_response: bool, + response_bytes: u64, + response_millis: u64, } pub type TimeBucketTimestamp = u64; @@ -98,13 +106,38 @@ pub enum Web3ProxyStat { impl ProxyResponseStat { // TODO: should RequestMetadata be in an arc? or can we handle refs here? 
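// Illustrative sketch, not part of this patch: the period bucketing that the
// constructor below performs, plus one way to derive an elapsed-milliseconds
// figure from the stored start datetime. `bucket_timestamp` and
// `elapsed_millis` are hypothetical helper names.
use chrono::{DateTime, Utc};

/// Truncate a unix timestamp down to the start of its aggregation period.
/// With period_seconds = 60, 1_665_519_505 becomes 1_665_519_480.
fn bucket_timestamp(timestamp: u64, period_seconds: u64) -> u64 {
    // integer division floors, so this rounds down to a period boundary
    timestamp / period_seconds * period_seconds
}

/// Milliseconds elapsed since a request started.
fn elapsed_millis(start: DateTime<Utc>) -> u64 {
    Utc::now()
        .signed_duration_since(start)
        .num_milliseconds()
        .max(0) as u64
}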
- pub fn new(method: String, authorized_key: AuthorizedKey, metadata: RequestMetadata) -> Self { - let metadata = AsyncMutex::new(metadata); + pub fn new( + method: String, + authorized_key: AuthorizedKey, + metadata: Arc, + response: &JsonRpcForwardedResponse, + ) -> Self { + // TODO: do this without serializing to a string. this is going to slow us down! + let response_bytes = serde_json::to_string(response) + .expect("serializing here should always work") + .len() as u64; + + let backend_requests = metadata.backend_requests.load(Ordering::Acquire); + let period_seconds = metadata.period_seconds; + let period_timestamp = + (metadata.datetime.timestamp() as u64) / period_seconds * period_seconds; + let request_bytes = metadata.request_bytes; + let response_millis = metadata + .datetime + .signed_duration_since(Utc::now()) + .num_seconds() as u64; + let error_response = metadata.error_response.load(Ordering::Acquire); Self { user_key_id: authorized_key.user_key_id, method, - metadata, + backend_requests, + period_seconds, + period_timestamp, + request_bytes, + error_response, + response_bytes, + response_millis, } } } @@ -188,8 +221,7 @@ impl StatEmitter { while let Ok(x) = self.save_rx.recv_async().await { // TODO: batch these for (k, v) in x.into_iter() { - info!(?k, "saving"); - + // TODO: this is a lot of variables let period_datetime = Utc.timestamp(v.period_timestamp as i64, 0); let frontend_requests = v.frontend_requests.load(Ordering::Acquire); let backend_requests = v.backend_requests.load(Ordering::Acquire); @@ -289,31 +321,28 @@ impl StatEmitter { pub async fn aggregate_stat(&self, stat: Web3ProxyStat) -> anyhow::Result<()> { trace!(?stat, "aggregating"); match stat { - Web3ProxyStat::ProxyResponse(x) => { + Web3ProxyStat::ProxyResponse(stat) => { // TODO: move this whole closure to another function? - let metadata = x.metadata.lock().await; - // TODO: move period calculation into another function? 
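// Illustrative sketch, not part of this patch: the get_with pattern the nested
// aggregate caches below rely on. get_with returns the cached value for a key
// or runs the supplied init future and caches its output; concurrent callers
// for the same key wait on a single initialization instead of racing. The
// names here are invented for the example.
use std::sync::atomic::AtomicU64;
use std::sync::Arc;

async fn get_or_init_counter(
    counters: &moka::future::Cache<u64, Arc<AtomicU64>, hashbrown::hash_map::DefaultHashBuilder>,
    period_timestamp: u64,
) -> Arc<AtomicU64> {
    counters
        .get_with(period_timestamp, async move {
            // only runs when period_timestamp is not already cached
            Arc::new(AtomicU64::new(0))
        })
        .await
}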
- let period_timestamp = - metadata.timestamp / self.period_seconds * self.period_seconds; + debug_assert_eq!(stat.period_seconds, self.period_seconds); // get the user cache for the current period let user_cache = self .aggregated_proxy_responses - .get_with(period_timestamp, async move { + .get_with(stat.period_timestamp, async move { CacheBuilder::default() .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new()) }) .await; - let key = (x.user_key_id, x.method, metadata.error_response).into(); + let key = (stat.user_key_id, stat.method, stat.error_response).into(); let user_aggregate = user_cache .get_with(key, async move { let histograms = ProxyResponseHistograms::default(); let aggregate = ProxyResponseAggregate { - period_timestamp, + period_timestamp: stat.period_timestamp, // start most things at 0 because we add outside this getter frontend_requests: 0.into(), backend_requests: 0.into(), @@ -338,31 +367,27 @@ impl StatEmitter { // a stat might have multiple backend requests user_aggregate .backend_requests - .fetch_add(metadata.backend_requests, Ordering::Acquire); + .fetch_add(stat.backend_requests, Ordering::Acquire); user_aggregate .sum_request_bytes - .fetch_add(metadata.request_bytes, Ordering::Release); + .fetch_add(stat.request_bytes, Ordering::Release); user_aggregate .sum_response_bytes - .fetch_add(metadata.response_bytes, Ordering::Release); + .fetch_add(stat.response_bytes, Ordering::Release); user_aggregate .sum_response_millis - .fetch_add(metadata.response_millis, Ordering::Release); + .fetch_add(stat.response_millis, Ordering::Release); { let mut histograms = user_aggregate.histograms.lock().await; - // TODO: record_correct? - histograms.request_bytes.record(metadata.request_bytes)?; - - histograms.response_bytes.record(metadata.response_bytes)?; - - histograms - .response_millis - .record(metadata.response_millis)?; + // TODO: use `record_correct`? + histograms.request_bytes.record(stat.request_bytes)?; + histograms.response_bytes.record(stat.response_bytes)?; + histograms.response_millis.record(stat.response_millis)?; } } }
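
For context on the histogram recording at the end of this patch: the per-aggregate histograms take one sample per stat so latency and size percentiles can be summarized later. The sketch below assumes the hdrhistogram crate, which matches the `record` / `record_correct` calls referenced above; the variable names are invented, and `record_correct` is the recording variant that compensates for coordinated omission given an expected interval between samples.

use hdrhistogram::Histogram;

fn histogram_demo() -> Result<(), Box<dyn std::error::Error>> {
    // 3 significant figures; the value range grows automatically as needed
    let mut response_millis: Histogram<u64> = Histogram::new(3)?;

    // one sample per aggregated request
    response_millis.record(12)?;
    response_millis.record(48)?;
    response_millis.record(250)?;

    // percentile summaries can be read out whenever the aggregate is reported
    let p99 = response_millis.value_at_quantile(0.99);
    println!("p99={} max={}", p99, response_millis.max());

    Ok(())
}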