proper sizes for caches and emit all stats

Bryan Stitt 2022-10-11 19:58:25 +00:00
parent 8f3d31869f
commit 552f3dbffc
9 changed files with 137 additions and 62 deletions

Cargo.lock generated

@@ -4967,9 +4967,9 @@ dependencies = [

 [[package]]
 name = "tokio-stream"
-version = "0.1.10"
+version = "0.1.11"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f6edf2d6bc038a43d31353570e27270603f4648d18f5ed10c0e179abe43255af"
+checksum = "d660770404473ccd7bc9f8b28494a811bc18542b915c0855c51e8f419d5223ce"
 dependencies = [
  "futures-core",
  "pin-project-lite",

@@ -422,3 +422,4 @@ in another repo: event subscriber
 - [ ] hit counts seem wrong. how are we hitting the backend so much more than the frontend? retries on disconnect don't seem to fit that
   web3_proxy_hit_count{path = "app/proxy_web3_rpc_request"} 857270
   web3_proxy_hit_count{path = "backend_rpc/request"} 1396127
+- [ ] replace serde_json::Value with https://lib.rs/crates/ijson (more memory efficient)

@@ -62,7 +62,7 @@ serde_prometheus = "0.1.6"
 time = "0.3.15"
 tokio = { version = "1.21.2", features = ["full", "tracing"] }
 # TODO: make sure this uuid version matches sea-orm. PR to put this in their prelude
-tokio-stream = { version = "0.1.10", features = ["sync"] }
+tokio-stream = { version = "0.1.11", features = ["sync"] }
 toml = "0.5.9"
 tower = "0.4.13"
 # TODO: i don't think we need this. we can use it from tower-http instead. though this seems to use ulid and not uuid?

@@ -13,7 +13,6 @@ use crate::rpcs::request::OpenRequestHandleMetrics;
 use crate::rpcs::transactions::TxStatus;
 use crate::stats::{ProxyResponseStat, StatEmitter, Web3ProxyStat};
 use anyhow::Context;
-use atomic::{AtomicBool, Ordering};
 use axum::extract::ws::Message;
 use axum::headers::{Referer, UserAgent};
 use deferred_rate_limiter::DeferredRateLimiter;
@@ -36,7 +35,6 @@ use sea_orm::DatabaseConnection;
 use serde::Serialize;
 use serde_json::json;
 use std::fmt;
-use std::mem::size_of_val;
 use std::net::IpAddr;
 use std::pin::Pin;
 use std::str::FromStr;
@@ -297,7 +295,7 @@ impl Web3ProxyApp {
         // TODO: capacity from configs
         // all these are the same size, so no need for a weigher
-        // TODO: ttl on this?
+        // TODO: ttl on this? or is max_capacity fine?
         let pending_transactions = Cache::builder()
             .max_capacity(10_000)
             .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new());

@@ -305,9 +303,13 @@
         // keep 1GB of blocks in the cache
         // TODO: limits from config
         // these blocks don't have full transactions, but they do have rather variable numbers of transaction hashes
+        // TODO: how can we do the weigher better? this is going to be slow!
         let block_map = Cache::builder()
             .max_capacity(1024 * 1024 * 1024)
-            .weigher(|_k, v| size_of_val(v) as u32)
+            .weigher(|_k, v: &Arc<Block<TxHash>>| {
+                // TODO: is this good enough?
+                v.transactions.len().try_into().unwrap_or(u32::MAX)
+            })
             .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new());

         // connect to the load balanced rpcs
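
Note: with the new weigher, the 1024 * 1024 * 1024 budget counts cached transaction hashes rather than bytes, so the "keep 1GB of blocks" comment above is now approximate at best. For readers unfamiliar with moka: once a weigher is set, max_capacity becomes a total-weight budget instead of an entry count. A minimal sketch of that behavior, assuming the moka crate and using a Vec as a stand-in for the block's transaction list:

    use moka::sync::Cache;

    fn main() {
        let cache: Cache<&'static str, Vec<u8>> = Cache::builder()
            // with a weigher, this is a weight budget, not an entry count
            .max_capacity(1_000)
            .weigher(|_k, v: &Vec<u8>| {
                // same saturating pattern as the block_map weigher above
                v.len().try_into().unwrap_or(u32::MAX)
            })
            .build();

        cache.insert("small block", vec![0; 10]); // weight 10
        cache.insert("big block", vec![0; 900]); // weight 900

        // moka evicts to keep the summed weights under max_capacity
    }
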
@@ -405,7 +407,24 @@
         // TODO: don't allow any response to be bigger than X% of the cache
         let response_cache = Cache::builder()
             .max_capacity(1024 * 1024 * 1024)
-            .weigher(|k, v| (size_of_val(k) + size_of_val(v)) as u32)
+            .weigher(|k: &(H256, String, Option<String>), v| {
+                // TODO: make this weigher fast. serializing json is not fast
+                let mut size = (k.1).len();
+
+                if let Some(params) = &k.2 {
+                    size += params.len()
+                }
+
+                if let Ok(v) = serde_json::to_string(v) {
+                    size += v.len();
+
+                    // the fallback in unwrap_or is probably never taken
+                    size.try_into().unwrap_or(u32::MAX)
+                } else {
+                    // this seems impossible
+                    u32::MAX
+                }
+            })
             .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new());

         // all the users are the same size, so no need for a weigher
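
The TODO above is right that serializing every value just to weigh it is expensive. One way to avoid building the intermediate String is a counting io::Write sink: serde_json::to_writer streams into it and only the byte count is kept. A sketch under that assumption, not what this commit does (the same trick would apply to RequestMetadata::new below, which carries an identical TODO):

    use std::io;

    #[derive(Default)]
    struct CountingWriter(u64);

    impl io::Write for CountingWriter {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            // count the bytes instead of storing them
            self.0 += buf.len() as u64;
            Ok(buf.len())
        }

        fn flush(&mut self) -> io::Result<()> {
            Ok(())
        }
    }

    fn serialized_len<T: serde::Serialize>(value: &T) -> serde_json::Result<u64> {
        let mut counter = CountingWriter::default();
        serde_json::to_writer(&mut counter, value)?;
        Ok(counter.0)
    }
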
@@ -763,7 +782,8 @@ impl Web3ProxyApp {
     ) -> anyhow::Result<JsonRpcForwardedResponse> {
         trace!("Received request: {:?}", request);

-        let request_metadata = RequestMetadata::new(&request);
+        // TODO: allow customizing the period?
+        let request_metadata = Arc::new(RequestMetadata::new(60, &request)?);

         // save the id so we can attach it to the response
         // TODO: instead of cloning, take the id out
@@ -894,7 +914,12 @@
                 let rpcs = self.private_rpcs.as_ref().unwrap_or(&self.balanced_rpcs);

                 return rpcs
-                    .try_send_all_upstream_servers(Some(&authorized_request), request, None)
+                    .try_send_all_upstream_servers(
+                        Some(&authorized_request),
+                        request,
+                        Some(request_metadata),
+                        None,
+                    )
                     .await;
             }
             "eth_syncing" => {
@@ -982,17 +1007,13 @@
                     request.params.clone().map(|x| x.to_string()),
                 );

-                let cache_hit = Arc::new(AtomicBool::new(true));
-
                 let mut response = {
-                    let cache_hit = cache_hit.clone();
+                    let request_metadata = request_metadata.clone();
                     let authorized_request = authorized_request.clone();

                     self.response_cache
                         .try_get_with(cache_key, async move {
-                            cache_hit.store(false, Ordering::Release);
-
                             // TODO: retry some failures automatically!
                             // TODO: try private_rpcs if all the balanced_rpcs fail!
                             // TODO: put the hash here instead?
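
Context for why the cache_hit flag could be deleted: moka's try_get_with coalesces concurrent misses, running the init future at most once per key, and backend_requests (incremented inside try_send_best_upstream_server) now records whether a backend was actually hit, so backend_requests == 0 implies a cache hit. A toy demonstration of the coalescing, assuming moka and tokio:

    use moka::future::Cache;

    // stand-in for try_send_best_upstream_server
    async fn fetch(id: u32) -> Result<String, std::io::Error> {
        Ok(format!("response for {id}"))
    }

    #[tokio::main]
    async fn main() {
        let cache: Cache<u32, String> = Cache::new(100);

        // concurrent lookups of the same key share one init future
        let (a, b) = tokio::join!(
            cache.try_get_with(1, fetch(1)),
            cache.try_get_with(1, fetch(1)),
        );

        assert_eq!(a.unwrap(), b.unwrap());
    }
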
@@ -1001,6 +1022,7 @@
                             .try_send_best_upstream_server(
                                 Some(&authorized_request),
                                 request,
+                                Some(&request_metadata),
                                 Some(&request_block_id.num),
                             )
                             .await?;
@@ -1019,6 +1041,11 @@
                         .context("caching response")?
                 };

+                // since this data likely came out of a cache, the id is not going to match
+                // replace the id with our request's id.
+                // TODO: cache without the id
+                response.id = request_id;
+
                 if let (Some(stat_sender), Ok(AuthorizedRequest::User(Some(_), authorized_key))) = (
                     self.stat_sender.as_ref(),
                     Arc::try_unwrap(authorized_request),
@@ -1027,16 +1054,12 @@
                         method.to_string(),
                         authorized_key,
                         request_metadata,
+                        &response,
                     );

                     stat_sender.send_async(response_stat.into()).await?;
                 }

-                // since this data came likely out of a cache, the id is not going to match
-                // replace the id with our request's id.
-                // TODO: cache without the id
-                response.id = request_id;
-
                 return Ok(response);
             }
         };

@@ -12,7 +12,7 @@ use redis_rate_limiter::RedisRateLimitResult;
 use sea_orm::{prelude::Decimal, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter};
 use serde::Serialize;
 use std::fmt::Display;
-use std::mem::size_of_val;
+use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64};
 use std::{net::IpAddr, str::FromStr, sync::Arc};
 use tokio::sync::{OwnedSemaphorePermit, Semaphore};
 use tokio::time::Instant;
@@ -53,12 +53,14 @@ pub struct AuthorizedKey {

 #[derive(Debug, Default, Serialize)]
 pub struct RequestMetadata {
-    pub timestamp: u64,
+    pub datetime: chrono::DateTime<Utc>,
+    pub period_seconds: u64,
     pub request_bytes: u64,
-    pub backend_requests: u32,
-    pub error_response: bool,
-    pub response_bytes: u64,
-    pub response_millis: u64,
+    /// if this is 0, there was a cache_hit
+    pub backend_requests: AtomicU32,
+    pub error_response: AtomicBool,
+    pub response_bytes: AtomicU64,
+    pub response_millis: AtomicU64,
 }

 #[derive(Clone, Debug, Serialize)]
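
The switch from plain integers to atomics is what lets RequestMetadata live in an Arc: the frontend handler and the retry loop in connections.rs both update it through shared references. A minimal illustration of the pattern with a toy struct, not the real one:

    use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
    use std::sync::Arc;

    #[derive(Default)]
    struct Metadata {
        backend_requests: AtomicU32,
        error_response: AtomicBool,
    }

    fn main() {
        let metadata = Arc::new(Metadata::default());

        let m = metadata.clone();
        let handle = std::thread::spawn(move || {
            // a retry loop can bump counters without &mut access
            m.backend_requests.fetch_add(1, Ordering::Relaxed);
            m.error_response.store(true, Ordering::Relaxed);
        });

        handle.join().unwrap();
        assert_eq!(metadata.backend_requests.load(Ordering::Relaxed), 1);
    }
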
@@ -72,14 +74,21 @@ pub enum AuthorizedRequest {
 }

 impl RequestMetadata {
-    pub fn new(request: &JsonRpcRequest) -> Self {
-        let request_bytes = size_of_val(request) as u64;
+    pub fn new(period_seconds: u64, request: &JsonRpcRequest) -> anyhow::Result<Self> {
+        // TODO: how can we do this without turning it into a string first? this is going to slow us down!
+        let request_bytes = serde_json::to_string(request)
+            .context("finding request size")?
+            .len()
+            .try_into()?;

-        Self {
+        let new = Self {
+            period_seconds,
             request_bytes,
-            timestamp: Utc::now().timestamp() as u64,
+            datetime: Utc::now(),
             ..Default::default()
-        }
+        };
+
+        Ok(new)
     }
 }
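
On the String round-trip flagged in the TODO: serde_json::to_vec yields the same byte count without the UTF-8 String construction, and the counting-writer sketch earlier avoids the allocation entirely. A hypothetical variant of the measurement:

    use anyhow::Context;

    // hypothetical stand-in for the measurement in RequestMetadata::new
    fn request_size<T: serde::Serialize>(request: &T) -> anyhow::Result<u64> {
        // to_vec skips building a String; the JSON byte length is identical
        let bytes = serde_json::to_vec(request).context("finding request size")?;

        Ok(bytes.len() as u64)
    }
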

@@ -11,7 +11,7 @@ fn default_jsonrpc() -> String {
     "2.0".to_string()
 }

-#[derive(Clone, Deserialize)]
+#[derive(Clone, Deserialize, Serialize)]
 pub struct JsonRpcRequest {
     // TODO: skip jsonrpc entirely? it's against spec to drop it, but some servers are bad
     #[serde(default = "default_jsonrpc")]

@@ -116,8 +116,9 @@ impl Web3Connections {
         let request = json!({ "id": "1", "method": "eth_getBlockByHash", "params": get_block_params });
         let request: JsonRpcRequest = serde_json::from_value(request)?;

+        // TODO: request_metadata? maybe we should put it in the authorized_request?
         let response = self
-            .try_send_best_upstream_server(authorized_request, request, None)
+            .try_send_best_upstream_server(authorized_request, request, None, None)
             .await?;

         let block = response.result.unwrap();
@@ -176,8 +177,9 @@
         let request: JsonRpcRequest = serde_json::from_value(request)?;

         // TODO: if error, retry?
+        // TODO: request_metadata or authorized_request?
         let response = self
-            .try_send_best_upstream_server(None, request, Some(num))
+            .try_send_best_upstream_server(None, request, None, Some(num))
             .await?;

         let raw_block = response.result.context("no block result")?;

@@ -7,7 +7,7 @@ use super::request::{
 use super::synced_connections::SyncedConnections;
 use crate::app::{flatten_handle, AnyhowJoinHandle};
 use crate::config::{BlockAndRpc, TxHashAndRpc, Web3ConnectionConfig};
-use crate::frontend::authorization::AuthorizedRequest;
+use crate::frontend::authorization::{AuthorizedRequest, RequestMetadata};
 use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest};
 use crate::rpcs::transactions::TxStatus;
 use arc_swap::ArcSwap;
@@ -27,6 +27,7 @@ use serde_json::value::RawValue;
 use std::cmp;
 use std::cmp::Reverse;
 use std::fmt;
+use std::sync::atomic::Ordering;
 use std::sync::Arc;
 use tokio::sync::RwLock as AsyncRwLock;
 use tokio::sync::{broadcast, watch};
@@ -501,11 +502,12 @@
         &self,
         authorized_request: Option<&Arc<AuthorizedRequest>>,
         request: JsonRpcRequest,
+        request_metadata: Option<&Arc<RequestMetadata>>,
         min_block_needed: Option<&U64>,
     ) -> anyhow::Result<JsonRpcForwardedResponse> {
         let mut skip_rpcs = vec![];

-        // TODO: maximum retries?
+        // TODO: maximum retries? right now it's the total number of servers
         loop {
             if skip_rpcs.len() == self.conns.len() {
                 break;
@@ -518,6 +520,12 @@
             // save the rpc in case we get an error and want to retry on another server
             skip_rpcs.push(active_request_handle.clone_connection());

+            if let Some(request_metadata) = request_metadata {
+                request_metadata
+                    .backend_requests
+                    .fetch_add(1, Ordering::Acquire);
+            }
+
             // TODO: get the log percent from the user data
             let response_result = active_request_handle
                 .request(
@@ -535,6 +543,12 @@
                 if let Some(error) = &response.error {
                     trace!(?response, "rpc error");

+                    if let Some(request_metadata) = request_metadata {
+                        request_metadata
+                            .error_response
+                            .store(true, Ordering::Release);
+                    }
+
                     // some errors should be retried on other nodes
                     if error.code == -32000 {
                         let error_msg = error.message.as_str();
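
A note on the orderings in these two hunks: fetch_add(1, Ordering::Acquire) and store(true, Ordering::Release) are valid, but these fields are independent counters and flags that are only read after the request finishes, so Relaxed would likely suffice; it is the atomicity, not the ordering, that keeps the counts correct. A sketch:

    use std::sync::atomic::{AtomicU32, Ordering};

    static BACKEND_REQUESTS: AtomicU32 = AtomicU32::new(0);

    fn main() {
        // Relaxed still makes the increment atomic; stronger orderings only
        // matter when other memory must be synchronized around this access
        BACKEND_REQUESTS.fetch_add(1, Ordering::Relaxed);

        assert_eq!(BACKEND_REQUESTS.load(Ordering::Relaxed), 1);
    }
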
@@ -603,6 +617,7 @@
         &self,
         authorized_request: Option<&Arc<AuthorizedRequest>>,
         request: JsonRpcRequest,
+        request_metadata: Option<Arc<RequestMetadata>>,
         block_needed: Option<&U64>,
     ) -> anyhow::Result<JsonRpcForwardedResponse> {
         loop {

@@ -1,4 +1,5 @@
 use crate::frontend::authorization::{AuthorizedKey, RequestMetadata};
+use crate::jsonrpc::JsonRpcForwardedResponse;
 use anyhow::Context;
 use chrono::{TimeZone, Utc};
 use derive_more::From;
@@ -19,7 +20,14 @@ use tracing::{error, info, trace};
 pub struct ProxyResponseStat {
     user_key_id: u64,
     method: String,
-    metadata: AsyncMutex<RequestMetadata>,
+    period_seconds: u64,
+    period_timestamp: u64,
+    request_bytes: u64,
+    /// if this is 0, there was a cache_hit
+    backend_requests: u32,
+    error_response: bool,
+    response_bytes: u64,
+    response_millis: u64,
 }

 pub type TimeBucketTimestamp = u64;
@@ -98,13 +106,38 @@ pub enum Web3ProxyStat {

 impl ProxyResponseStat {
     // TODO: should RequestMetadata be in an arc? or can we handle refs here?
-    pub fn new(method: String, authorized_key: AuthorizedKey, metadata: RequestMetadata) -> Self {
-        let metadata = AsyncMutex::new(metadata);
+    pub fn new(
+        method: String,
+        authorized_key: AuthorizedKey,
+        metadata: Arc<RequestMetadata>,
+        response: &JsonRpcForwardedResponse,
+    ) -> Self {
+        // TODO: do this without serializing to a string. this is going to slow us down!
+        let response_bytes = serde_json::to_string(response)
+            .expect("serializing here should always work")
+            .len() as u64;
+
+        let backend_requests = metadata.backend_requests.load(Ordering::Acquire);
+        let period_seconds = metadata.period_seconds;
+        let period_timestamp =
+            (metadata.datetime.timestamp() as u64) / period_seconds * period_seconds;
+        let request_bytes = metadata.request_bytes;
+        let response_millis = Utc::now()
+            .signed_duration_since(metadata.datetime)
+            .num_milliseconds() as u64;
+        let error_response = metadata.error_response.load(Ordering::Acquire);

         Self {
             user_key_id: authorized_key.user_key_id,
             method,
-            metadata,
+            backend_requests,
+            period_seconds,
+            period_timestamp,
+            request_bytes,
+            error_response,
+            response_bytes,
+            response_millis,
         }
     }
 }
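
The period_timestamp computation floors the metadata's timestamp to the start of its aggregation period via integer division. A worked example using this commit's own date and the 60-second period passed to RequestMetadata::new:

    fn period_timestamp(timestamp: u64, period_seconds: u64) -> u64 {
        // integer division floors, so this rounds down to a period boundary
        timestamp / period_seconds * period_seconds
    }

    fn main() {
        let ts = 1665518305; // 2022-10-11 19:58:25 UTC
        assert_eq!(period_timestamp(ts, 60), 1665518280); // 19:58:00
    }
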
@@ -188,8 +221,7 @@ impl StatEmitter {
         while let Ok(x) = self.save_rx.recv_async().await {
             // TODO: batch these
             for (k, v) in x.into_iter() {
-                info!(?k, "saving");
                 // TODO: this is a lot of variables
                 let period_datetime = Utc.timestamp(v.period_timestamp as i64, 0);
                 let frontend_requests = v.frontend_requests.load(Ordering::Acquire);
                 let backend_requests = v.backend_requests.load(Ordering::Acquire);
@@ -289,31 +321,28 @@
     pub async fn aggregate_stat(&self, stat: Web3ProxyStat) -> anyhow::Result<()> {
         trace!(?stat, "aggregating");
         match stat {
-            Web3ProxyStat::ProxyResponse(x) => {
+            Web3ProxyStat::ProxyResponse(stat) => {
                 // TODO: move this whole closure to another function?
-                let metadata = x.metadata.lock().await;
-
-                // TODO: move period calculation into another function?
-                let period_timestamp =
-                    metadata.timestamp / self.period_seconds * self.period_seconds;
+
+                debug_assert_eq!(stat.period_seconds, self.period_seconds);

                 // get the user cache for the current period
                 let user_cache = self
                     .aggregated_proxy_responses
-                    .get_with(period_timestamp, async move {
+                    .get_with(stat.period_timestamp, async move {
                         CacheBuilder::default()
                             .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new())
                     })
                     .await;

-                let key = (x.user_key_id, x.method, metadata.error_response).into();
+                let key = (stat.user_key_id, stat.method, stat.error_response).into();

                 let user_aggregate = user_cache
                     .get_with(key, async move {
                         let histograms = ProxyResponseHistograms::default();

                         let aggregate = ProxyResponseAggregate {
-                            period_timestamp,
+                            period_timestamp: stat.period_timestamp,
                             // start most things at 0 because we add outside this getter
                             frontend_requests: 0.into(),
                             backend_requests: 0.into(),
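
The aggregation is keyed at two levels: an outer cache per period_timestamp and an inner cache per (user_key_id, method, error_response). A simplified sketch of that shape using std HashMaps in place of the moka caches (hypothetical types):

    use std::collections::HashMap;

    #[derive(Default)]
    struct Aggregate {
        frontend_requests: u64,
        backend_requests: u64,
        sum_request_bytes: u64,
    }

    // (user_key_id, method, error_response)
    type Key = (u64, String, bool);

    fn main() {
        let mut periods: HashMap<u64, HashMap<Key, Aggregate>> = HashMap::new();

        // a stat lands in its period bucket, then in its user/method bucket
        let agg = periods
            .entry(1665518280) // period_timestamp
            .or_default()
            .entry((1, "eth_call".to_string(), false))
            .or_default();

        agg.frontend_requests += 1;
        agg.backend_requests += 1;
        agg.sum_request_bytes += 231;
    }
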
@@ -338,31 +367,27 @@
                 // a stat might have multiple backend requests
                 user_aggregate
                     .backend_requests
-                    .fetch_add(metadata.backend_requests, Ordering::Acquire);
+                    .fetch_add(stat.backend_requests, Ordering::Acquire);

                 user_aggregate
                     .sum_request_bytes
-                    .fetch_add(metadata.request_bytes, Ordering::Release);
+                    .fetch_add(stat.request_bytes, Ordering::Release);

                 user_aggregate
                     .sum_response_bytes
-                    .fetch_add(metadata.response_bytes, Ordering::Release);
+                    .fetch_add(stat.response_bytes, Ordering::Release);

                 user_aggregate
                     .sum_response_millis
-                    .fetch_add(metadata.response_millis, Ordering::Release);
+                    .fetch_add(stat.response_millis, Ordering::Release);

                 {
                     let mut histograms = user_aggregate.histograms.lock().await;

-                    // TODO: record_correct?
-                    histograms.request_bytes.record(metadata.request_bytes)?;
-                    histograms.response_bytes.record(metadata.response_bytes)?;
-                    histograms
-                        .response_millis
-                        .record(metadata.response_millis)?;
+                    // TODO: use `record_correct`?
+                    histograms.request_bytes.record(stat.request_bytes)?;
+                    histograms.response_bytes.record(stat.response_bytes)?;
+                    histograms.response_millis.record(stat.response_millis)?;
                 }
             }
         }