diff --git a/Cargo.lock b/Cargo.lock
index c98a94fc..cc24f712 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1091,6 +1091,19 @@ dependencies = [
  "cipher 0.4.3",
 ]
 
+[[package]]
+name = "dashmap"
+version = "5.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc"
+dependencies = [
+ "cfg-if",
+ "hashbrown",
+ "lock_api",
+ "once_cell",
+ "parking_lot_core 0.9.3",
+]
+
 [[package]]
 name = "deadpool"
 version = "0.9.5"
@@ -5538,6 +5551,7 @@ dependencies = [
  "axum-macros",
  "chrono",
  "counter",
+ "dashmap",
  "deferred-rate-limiter",
  "derive_more",
  "dotenv",
diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml
index 829207cd..44ef8df3 100644
--- a/web3_proxy/Cargo.toml
+++ b/web3_proxy/Cargo.toml
@@ -28,6 +28,7 @@ axum-macros = "0.2.3"
 # TODO: import chrono from sea-orm so we always have the same version
 chrono = "0.4.22"
 counter = "0.5.7"
+dashmap = "5.4.0"
 derive_more = "0.99.17"
 dotenv = "0.15.0"
 ethers = { version = "1.0.0", features = ["rustls", "ws"] }
diff --git a/web3_proxy/src/app_stats.rs b/web3_proxy/src/app_stats.rs
index e01941a2..9ee3cafa 100644
--- a/web3_proxy/src/app_stats.rs
+++ b/web3_proxy/src/app_stats.rs
@@ -2,6 +2,8 @@ use crate::frontend::authorization::{AuthorizedKey, RequestMetadata};
 use crate::jsonrpc::JsonRpcForwardedResponse;
 use anyhow::Context;
 use chrono::{TimeZone, Utc};
+use dashmap::mapref::entry::Entry;
+use dashmap::DashMap;
 use derive_more::From;
 use entities::rpc_accounting;
 use hdrhistogram::Histogram;
@@ -80,11 +82,8 @@ pub struct UserProxyResponseKey {
     error_response: bool,
 }
 
-pub type UserProxyResponseCache = Cache<
-    UserProxyResponseKey,
-    Arc<ProxyResponseAggregate>,
-    hashbrown::hash_map::DefaultHashBuilder,
->;
+// TODO: think about nested maps more. does this need an arc?
+pub type UserProxyResponseCache = DashMap<UserProxyResponseKey, Arc<ProxyResponseAggregate>>;
 
 /// key is the "time bucket's timestamp" (timestamp / period * period)
 pub type TimeProxyResponseCache = Cache<u64, UserProxyResponseCache, hashbrown::hash_map::DefaultHashBuilder>;
@@ -147,6 +146,7 @@ impl StatEmitter {
 
         // this needs to be long enough that there are definitely no outstanding queries
         // TODO: what should the "safe" multiplier be? what if something is late?
+        // TODO: in most cases this delays more than necessary. think of how to do this without dashmap which might let us proceed
         let ttl_seconds = period_seconds * 3;
 
         let aggregated_proxy_responses = CacheBuilder::default()
@@ -351,18 +351,18 @@ impl StatEmitter {
         // TODO: i don't think this works right. maybe do DashMap entry api as the outer variable
         let user_cache = self
             .aggregated_proxy_responses
-            .get_with_by_ref(&stat.period_timestamp, async move {
-                CacheBuilder::default()
-                    .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new())
-            })
+            .get_with_by_ref(&stat.period_timestamp, async move { Default::default() })
             .await;
 
         let key = (stat.rpc_key_id, stat.method, stat.error_response).into();
 
-        let user_aggregate = user_cache
-            .get_with(key, async move {
+        let user_aggregate = match user_cache.entry(key) {
+            Entry::Occupied(x) => x.get().clone(),
+            Entry::Vacant(y) => {
                 let histograms = ProxyResponseHistograms::default();
 
+                // TODO: create a counter here that we use to tell when it is safe to flush these? faster than waiting 3 periods
+
                 let aggregate = ProxyResponseAggregate {
                     period_timestamp: stat.period_timestamp,
                     // start most things at 0 because we add outside this getter
@@ -378,9 +378,16 @@ impl StatEmitter {
                     histograms: AsyncMutex::new(histograms),
                 };
 
-                Arc::new(aggregate)
-            })
-            .await;
+                // TODO: store this arc in the map
+                // TODO: does this have a race condition?
+
+                let aggregate = Arc::new(aggregate);
+
+                y.insert(aggregate.clone());
+
+                aggregate
+            }
+        };
 
         // a stat always come from just 1 frontend request
         user_aggregate
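The core of the `app_stats.rs` change is swapping moka's async `get_with` for DashMap's entry API when looking up the per-user aggregate. Below is a minimal sketch of that get-or-insert pattern; `Aggregate`, its single counter field, and the `get_or_create` helper are hypothetical stand-ins for illustration, not the real `ProxyResponseAggregate` or code from the repo.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

use dashmap::mapref::entry::Entry;
use dashmap::DashMap;

/// Hypothetical stand-in for ProxyResponseAggregate; the real struct has many more fields.
#[derive(Debug, Default)]
struct Aggregate {
    frontend_requests: AtomicU64,
}

fn get_or_create(cache: &DashMap<u64, Arc<Aggregate>>, key: u64) -> Arc<Aggregate> {
    // entry() takes the write lock on the shard this key hashes to, so the
    // lookup and the insert below happen under one lock: two tasks racing on
    // the same key cannot both create an Aggregate.
    match cache.entry(key) {
        Entry::Occupied(existing) => existing.get().clone(),
        Entry::Vacant(vacant) => {
            let aggregate = Arc::new(Aggregate::default());
            // store one clone of the Arc in the map, hand the other back to the caller
            vacant.insert(aggregate.clone());
            aggregate
        }
    }
}

fn main() {
    let cache: DashMap<u64, Arc<Aggregate>> = DashMap::new();

    // both calls resolve to the same Arc<Aggregate>
    let a = get_or_create(&cache, 42);
    let b = get_or_create(&cache, 42);
    assert!(Arc::ptr_eq(&a, &b));

    a.frontend_requests.fetch_add(1, Ordering::Relaxed);
    println!("count = {}", b.frontend_requests.load(Ordering::Relaxed));
}
```

Assuming dashmap 5.x semantics, where the `Occupied`/`Vacant` entry guards hold the shard's write lock until they are dropped, the check-and-insert above is atomic for a given key, which is what the `// TODO: does this have a race condition?` comment is asking about. The trade-off versus the previous nested moka cache is that DashMap has no TTL or per-key eviction of its own, hence the remaining TODOs about a counter to know when it is safe to flush instead of waiting three periods.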