From c1eba556a57e38e2e2990d875dda8ac82a7169c0 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Mon, 29 May 2023 17:19:05 -0700 Subject: [PATCH] fix missing ttl send and add name to cache --- Cargo.lock | 1 + deferred-rate-limiter/src/lib.rs | 8 +- quick_cache_ttl/Cargo.toml | 1 + quick_cache_ttl/src/cache.rs | 29 +++++-- quick_cache_ttl/src/kq_cache.rs | 96 ++++++++++++++++++++--- quick_cache_ttl/src/lib.rs | 4 +- web3_proxy/src/app/mod.rs | 17 +++- web3_proxy/src/bin/web3_proxy_cli/main.rs | 2 + web3_proxy/src/frontend/mod.rs | 13 ++- web3_proxy/src/frontend/status.rs | 3 + web3_proxy/src/response_cache.rs | 1 + web3_proxy/src/rpcs/blockchain.rs | 14 ++-- web3_proxy/src/rpcs/many.rs | 56 ++++++++++--- 13 files changed, 197 insertions(+), 48 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 35cf7ee4..9e67990b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4031,6 +4031,7 @@ name = "quick_cache_ttl" version = "0.1.0" dependencies = [ "flume", + "log", "quick_cache", "tokio", ] diff --git a/deferred-rate-limiter/src/lib.rs b/deferred-rate-limiter/src/lib.rs index 37a943fa..b15920ea 100644 --- a/deferred-rate-limiter/src/lib.rs +++ b/deferred-rate-limiter/src/lib.rs @@ -45,7 +45,13 @@ where // TODO: time to live is not exactly right. we want this ttl counter to start only after redis is down. this works for now // TODO: what do these weigh? // TODO: allow skipping max_capacity - let local_cache = CacheWithTTL::new(cache_size, Duration::from_secs(ttl)).await; + // TODO: prefix instead of a static str + let local_cache = CacheWithTTL::new( + "deferred rate limiter", + cache_size, + Duration::from_secs(ttl), + ) + .await; Self { local_cache, diff --git a/quick_cache_ttl/Cargo.toml b/quick_cache_ttl/Cargo.toml index d632bfa6..8df44b3e 100644 --- a/quick_cache_ttl/Cargo.toml +++ b/quick_cache_ttl/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" [dependencies] flume = "0.10.14" +log = "0.4.18" quick_cache = "0.3.0" tokio = { version = "1.28.2", features = ["full"] } diff --git a/quick_cache_ttl/src/cache.rs b/quick_cache_ttl/src/cache.rs index 941c3148..418d6082 100644 --- a/quick_cache_ttl/src/cache.rs +++ b/quick_cache_ttl/src/cache.rs @@ -1,5 +1,6 @@ use quick_cache::{DefaultHashBuilder, UnitWeighter, Weighter}; use std::{ + fmt::Debug, future::Future, hash::{BuildHasher, Hash}, num::NonZeroU32, @@ -13,11 +14,14 @@ pub struct CacheWithTTL( KQCacheWithTTL, ); -impl - CacheWithTTL +impl< + Key: Clone + Debug + Eq + Hash + Send + Sync + 'static, + Val: Clone + Send + Sync + 'static, + > CacheWithTTL { - pub async fn new(capacity: usize, ttl: Duration) -> Self { + pub async fn new(name: &'static str, capacity: usize, ttl: Duration) -> Self { Self::new_with_options( + name, capacity, 1.try_into().unwrap(), capacity as u64, @@ -28,32 +32,38 @@ impl Arc { - let x = Self::new(capacity, ttl).await; + pub async fn arc_with_capacity( + name: &'static str, + capacity: usize, + ttl: Duration, + ) -> Arc { + let x = Self::new(name, capacity, ttl).await; Arc::new(x) } } impl< - Key: Eq + Hash + Clone + Send + Sync + 'static, + Key: Clone + Debug + Eq + Hash + Send + Sync + 'static, Val: Clone + Send + Sync + 'static, We: Weighter + Clone + Send + Sync + 'static, B: BuildHasher + Clone + Default + Send + Sync + 'static, > CacheWithTTL { pub async fn new_with_weights( + name: &'static str, estimated_items_capacity: usize, max_item_weigth: NonZeroU32, weight_capacity: u64, weighter: We, ttl: Duration, ) -> Self { - let max_item_weigth = max_item_weigth.min((weight_capacity as u32).try_into().unwrap()); + let max_item_weight = max_item_weigth.min((weight_capacity as u32).try_into().unwrap()); let inner = KQCacheWithTTL::new_with_options( + name, estimated_items_capacity, - max_item_weigth, + max_item_weight, weight_capacity, weighter, B::default(), @@ -65,6 +75,7 @@ impl< } pub async fn new_with_options( + name: &'static str, estimated_items_capacity: usize, max_item_weight: NonZeroU32, weight_capacity: u64, @@ -73,6 +84,7 @@ impl< ttl: Duration, ) -> Self { let inner = KQCacheWithTTL::new_with_options( + name, estimated_items_capacity, max_item_weight, weight_capacity, @@ -115,6 +127,7 @@ impl< } /// if the item was too large to insert, it is returned with the error + /// IMPORTANT! Inserting the same key multiple times does NOT reset the TTL! #[inline] pub fn try_insert(&self, key: Key, val: Val) -> Result<(), (Key, Val)> { self.0.try_insert(key, (), val).map_err(|(k, _q, v)| (k, v)) diff --git a/quick_cache_ttl/src/kq_cache.rs b/quick_cache_ttl/src/kq_cache.rs index d658305b..14b68064 100644 --- a/quick_cache_ttl/src/kq_cache.rs +++ b/quick_cache_ttl/src/kq_cache.rs @@ -1,6 +1,8 @@ +use log::{log_enabled, trace}; use quick_cache::sync::KQCache; use quick_cache::{PlaceholderGuard, Weighter}; use std::convert::Infallible; +use std::fmt::Debug; use std::future::Future; use std::hash::{BuildHasher, Hash}; use std::num::NonZeroU32; @@ -12,6 +14,7 @@ use tokio::time::{sleep_until, Instant}; pub struct KQCacheWithTTL { cache: Arc>, max_item_weight: NonZeroU32, + name: &'static str, ttl: Duration, tx: flume::Sender<(Instant, Key, Qey)>, weighter: We, @@ -21,10 +24,12 @@ pub struct KQCacheWithTTL { struct KQCacheWithTTLTask { cache: Arc>, + name: &'static str, rx: flume::Receiver<(Instant, Key, Qey)>, } pub struct PlaceholderGuardWithTTL<'a, Key, Qey, Val, We, B> { + name: &'a str, inner: PlaceholderGuard<'a, Key, Qey, Val, We, B>, key: Key, qey: Qey, @@ -33,14 +38,15 @@ pub struct PlaceholderGuardWithTTL<'a, Key, Qey, Val, We, B> { } impl< - Key: Eq + Hash + Clone + Send + Sync + 'static, - Qey: Eq + Hash + Clone + Send + Sync + 'static, + Key: Clone + Debug + Eq + Hash + Send + Sync + 'static, + Qey: Clone + Debug + Eq + Hash + Send + Sync + 'static, Val: Clone + Send + Sync + 'static, We: Weighter + Clone + Send + Sync + 'static, B: BuildHasher + Clone + Send + Sync + 'static, > KQCacheWithTTL { pub async fn new_with_options( + name: &'static str, estimated_items_capacity: usize, max_item_weight: NonZeroU32, weight_capacity: u64, @@ -61,6 +67,7 @@ impl< let task = KQCacheWithTTLTask { cache: cache.clone(), + name, rx, }; @@ -69,6 +76,7 @@ impl< Self { cache, max_item_weight, + name, task_handle, ttl, tx, @@ -86,8 +94,7 @@ impl< where Fut: Future, { - self.cache - .get_or_insert_async::(key, qey, async move { Ok(f.await) }) + self.try_get_or_insert_async::(key, qey, async move { Ok(f.await) }) .await .expect("infallible") } @@ -102,7 +109,27 @@ impl< where Fut: Future>, { - self.cache.get_or_insert_async(key, qey, f).await + self.cache + .get_or_insert_async(key, qey, async move { + let x = f.await; + + if x.is_ok() { + let expire_at = Instant::now() + self.ttl; + + trace!( + "{}, {:?}, {:?} expiring in {}s", + self.name, + &key, + &qey, + expire_at.duration_since(Instant::now()).as_secs_f32() + ); + + self.tx.send((expire_at, key.clone(), qey.clone())).unwrap(); + } + + x + }) + .await } #[inline] @@ -114,6 +141,7 @@ impl< match self.cache.get_value_or_guard_async(&key, &qey).await { Ok(x) => Ok(x), Err(inner) => Err(PlaceholderGuardWithTTL { + name: self.name, inner, key, qey, @@ -129,6 +157,7 @@ impl< } /// if the item was too large to insert, it is returned with the error + /// IMPORTANT! Inserting the same key multiple times does NOT reset the TTL! #[inline] pub fn try_insert(&self, key: Key, qey: Qey, val: Val) -> Result<(), (Key, Qey, Val)> { let expire_at = Instant::now() + self.ttl; @@ -138,6 +167,14 @@ impl< if weight <= self.max_item_weight { self.cache.insert(key.clone(), qey.clone(), val); + trace!( + "{}, {:?}, {:?} expiring in {}s", + self.name, + &key, + &qey, + expire_at.duration_since(Instant::now()).as_secs_f32() + ); + self.tx.send((expire_at, key, qey)).unwrap(); Ok(()) @@ -163,26 +200,51 @@ impl< } impl< - Key: Eq + Hash, - Qey: Eq + Hash, + Key: Debug + Eq + Hash, + Qey: Debug + Eq + Hash, Val: Clone, We: Weighter + Clone, B: BuildHasher + Clone, > KQCacheWithTTLTask { async fn run(self) { - while let Ok((expire_at, key, qey)) = self.rx.recv_async().await { - sleep_until(expire_at).await; + trace!("watching for expirations on {}", self.name); - self.cache.remove(&key, &qey); + while let Ok((expire_at, key, qey)) = self.rx.recv_async().await { + let now = Instant::now(); + if expire_at > now { + if log_enabled!(log::Level::Trace) { + trace!( + "{}, {:?}, {:?} sleeping for {}ms.", + self.name, + key, + qey, + expire_at.duration_since(now).as_millis(), + ); + } + + sleep_until(expire_at).await; + + trace!("{}, {:?}, {:?} done sleeping", self.name, key, qey); + } else { + trace!("no need to sleep!"); + } + + if self.cache.remove(&key, &qey) { + trace!("removed {}, {:?}, {:?}", self.name, key, qey); + } else { + trace!("empty {}, {:?}, {:?}", self.name, key, qey); + }; } + + trace!("watching for expirations on {}", self.name) } } impl< 'a, - Key: Clone + Hash + Eq, - Qey: Clone + Hash + Eq, + Key: Clone + Debug + Hash + Eq, + Qey: Clone + Debug + Hash + Eq, Val: Clone, We: Weighter, B: BuildHasher, @@ -193,6 +255,16 @@ impl< self.inner.insert(val); + if log_enabled!(log::Level::Trace) { + trace!( + "{}, {:?}, {:?} expiring in {}s", + self.name, + self.key, + self.qey, + expire_at.duration_since(Instant::now()).as_secs_f32() + ); + } + self.tx.send((expire_at, self.key, self.qey)).unwrap(); } } diff --git a/quick_cache_ttl/src/lib.rs b/quick_cache_ttl/src/lib.rs index cbc3bac7..6fddd833 100644 --- a/quick_cache_ttl/src/lib.rs +++ b/quick_cache_ttl/src/lib.rs @@ -16,7 +16,7 @@ mod tests { #[tokio::test(start_paused = true)] async fn test_time_passing() { - let x = CacheWithTTL::::new(2, Duration::from_secs(2)).await; + let x = CacheWithTTL::::new("test", 2, Duration::from_secs(2)).await; assert!(x.get(&0).is_none()); @@ -38,7 +38,7 @@ mod tests { #[tokio::test(start_paused = true)] async fn test_capacity_based_eviction() { - let x = CacheWithTTL::::new(1, Duration::from_secs(2)).await; + let x = CacheWithTTL::::new("test", 1, Duration::from_secs(2)).await; assert!(x.get(&0).is_none()); diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index b7fe7e0c..0c346b15 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -507,8 +507,12 @@ impl Web3ProxyApp { // if there is no database of users, there will be no keys and so this will be empty // TODO: max_capacity from config // TODO: ttl from config - let rpc_secret_key_cache = - CacheWithTTL::arc_with_capacity(10_000, Duration::from_secs(600)).await; + let rpc_secret_key_cache = CacheWithTTL::arc_with_capacity( + "rpc_secret_key_cache", + 10_000, + Duration::from_secs(600), + ) + .await; // create a channel for receiving stats // we do this in a channel so we don't slow down our response to the users @@ -601,8 +605,12 @@ impl Web3ProxyApp { // TODO: different chains might handle this differently // TODO: what should we set? 5 minutes is arbitrary. the nodes themselves hold onto transactions for much longer // TODO: this used to be time_to_update, but - let pending_transactions = - CacheWithTTL::arc_with_capacity(10_000, Duration::from_secs(300)).await; + let pending_transactions = CacheWithTTL::arc_with_capacity( + "pending_transactions", + 10_000, + Duration::from_secs(300), + ) + .await; // responses can be very different in sizes, so this is a cache with a max capacity and a weigher // TODO: we should emit stats to calculate a more accurate expected cache size @@ -610,6 +618,7 @@ impl Web3ProxyApp { // TODO: configurable max item weight instead of using ~0.1% // TODO: resize the cache automatically let response_cache = JsonRpcResponseCache::new_with_weights( + "response_cache", (top_config.app.response_cache_max_bytes / 16_384) as usize, NonZeroU32::try_from((top_config.app.response_cache_max_bytes / 1024) as u32).unwrap(), top_config.app.response_cache_max_bytes, diff --git a/web3_proxy/src/bin/web3_proxy_cli/main.rs b/web3_proxy/src/bin/web3_proxy_cli/main.rs index 53ba221d..74a2bb5c 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/main.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/main.rs @@ -140,6 +140,7 @@ fn main() -> anyhow::Result<()> { "ethers=debug", "ethers_providers::rpc=off", "ethers_providers=debug", + "quick_cache_ttl=debug", "redis_rate_limit=debug", "web3_proxy::rpcs::blockchain=info", "web3_proxy::rpcs::request=debug", @@ -154,6 +155,7 @@ fn main() -> anyhow::Result<()> { "ethers=debug", "ethers_providers::rpc=off", "ethers_providers=error", + "quick_cache_ttl=info", "redis_rate_limit=debug", // "web3_proxy::stats::influxdb_queries=trace", "web3_proxy=debug", diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs index 436ad3de..b3ef44e6 100644 --- a/web3_proxy/src/frontend/mod.rs +++ b/web3_proxy/src/frontend/mod.rs @@ -18,7 +18,7 @@ use axum::{ }; use http::{header::AUTHORIZATION, StatusCode}; use listenfd::ListenFd; -use log::info; +use log::{debug, info}; use quick_cache_ttl::UnitWeighter; use std::net::SocketAddr; use std::sync::Arc; @@ -31,7 +31,7 @@ use tower_http::sensitive_headers::SetSensitiveRequestHeadersLayer; use self::errors::Web3ProxyResult; /// simple keys for caching responses -#[derive(Copy, Clone, Hash, PartialEq, Eq, EnumCount, EnumIter)] +#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq, EnumCount, EnumIter)] pub enum ResponseCacheKey { BackupsNeeded, Health, @@ -57,7 +57,14 @@ pub async fn serve( // TODO: latest moka allows for different ttls for different let response_cache_size = ResponseCacheKey::COUNT; - let response_cache = ResponseCache::new(response_cache_size, Duration::from_secs(1)).await; + debug!("response_cache size: {}", response_cache_size); + + let response_cache = ResponseCache::new( + "response_cache", + response_cache_size, + Duration::from_secs(1), + ) + .await; // TODO: read config for if fastest/versus should be available publicly. default off diff --git a/web3_proxy/src/frontend/status.rs b/web3_proxy/src/frontend/status.rs index 6b65a04f..4ab2637b 100644 --- a/web3_proxy/src/frontend/status.rs +++ b/web3_proxy/src/frontend/status.rs @@ -12,6 +12,7 @@ use axum::{ Extension, }; use axum_macros::debug_handler; +use log::debug; use once_cell::sync::Lazy; use serde_json::json; use std::sync::Arc; @@ -125,6 +126,8 @@ pub async fn status( // TODO: _status doesn't need to be async, but _quick_cache_ttl needs an async function #[inline] async fn _status(app: Arc) -> (StatusCode, &'static str, Bytes) { + debug!("status is not cached"); + // TODO: what else should we include? uptime, cache hit rates, cpu load, memory used // TODO: the hostname is probably not going to change. only get once at the start? let body = json!({ diff --git a/web3_proxy/src/response_cache.rs b/web3_proxy/src/response_cache.rs index f1a31126..4c488676 100644 --- a/web3_proxy/src/response_cache.rs +++ b/web3_proxy/src/response_cache.rs @@ -145,6 +145,7 @@ mod tests { let response_cache: CacheWithTTL = CacheWithTTL::new_with_weights( + "test", 5, max_item_weight.try_into().unwrap(), weight_capacity, diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index b7c4d7fc..20d0d417 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -175,20 +175,20 @@ impl Web3Rpcs { let block_num = block.number(); + // this block is very likely already in block_hashes + // TODO: use their get_with + let block_hash = *block.hash(); + // TODO: think more about heaviest_chain. would be better to do the check inside this function if heaviest_chain { // this is the only place that writes to block_numbers // multiple inserts should be okay though // TODO: info that there was a fork? - if let Err((k, v)) = self.blocks_by_number.try_insert(*block_num, *block.hash()) { - warn!("unable to cache {} as {}", k, v); - } + self.blocks_by_number + .get_or_insert_async(block_num, async move { block_hash }) + .await; } - // this block is very likely already in block_hashes - // TODO: use their get_with - let block_hash = *block.hash(); - let block = self .blocks_by_hash .get_or_insert_async(&block_hash, async move { block }) diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 4c0f70ba..4ee3aadc 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -97,14 +97,16 @@ impl Web3Rpcs { // these blocks don't have full transactions, but they do have rather variable amounts of transaction hashes // TODO: actual weighter on this // TODO: time_to_idle instead? - let blocks_by_hash: BlocksByHashCache = - Arc::new(CacheWithTTL::new(10_000, Duration::from_secs(30 * 60)).await); + let blocks_by_hash: BlocksByHashCache = Arc::new( + CacheWithTTL::new("blocks_by_hash", 10_000, Duration::from_secs(30 * 60)).await, + ); // all block numbers are the same size, so no need for weigher // TODO: limits from config // TODO: time_to_idle instead? - let blocks_by_number = - Arc::new(CacheWithTTL::new(10_000, Duration::from_secs(30 * 60)).await); + let blocks_by_number = Arc::new( + CacheWithTTL::new("blocks_by_number", 10_000, Duration::from_secs(30 * 60)).await, + ); let (watch_consensus_rpcs_sender, consensus_connections_watcher) = watch::channel(Default::default()); @@ -1444,14 +1446,25 @@ mod tests { watch_consensus_head_sender: Some(watch_consensus_head_sender), watch_consensus_rpcs_sender, pending_transaction_cache: CacheWithTTL::arc_with_capacity( + "pending_transaction_cache", 100, Duration::from_secs(60), ) .await, pending_tx_id_receiver, pending_tx_id_sender, - blocks_by_hash: CacheWithTTL::arc_with_capacity(100, Duration::from_secs(60)).await, - blocks_by_number: CacheWithTTL::arc_with_capacity(100, Duration::from_secs(60)).await, + blocks_by_hash: CacheWithTTL::arc_with_capacity( + "blocks_by_hash", + 100, + Duration::from_secs(60), + ) + .await, + blocks_by_number: CacheWithTTL::arc_with_capacity( + "blocks_by_number", + 100, + Duration::from_secs(60), + ) + .await, // TODO: test max_block_age? max_block_age: None, // TODO: test max_block_lag? @@ -1722,14 +1735,25 @@ mod tests { watch_consensus_head_sender: Some(watch_consensus_head_sender), watch_consensus_rpcs_sender, pending_transaction_cache: CacheWithTTL::arc_with_capacity( + "pending_transaction_cache", 100, Duration::from_secs(120), ) .await, pending_tx_id_receiver, pending_tx_id_sender, - blocks_by_hash: CacheWithTTL::arc_with_capacity(100, Duration::from_secs(120)).await, - blocks_by_number: CacheWithTTL::arc_with_capacity(100, Duration::from_secs(120)).await, + blocks_by_hash: CacheWithTTL::arc_with_capacity( + "blocks_by_hash", + 100, + Duration::from_secs(120), + ) + .await, + blocks_by_number: CacheWithTTL::arc_with_capacity( + "blocks_by_number", + 100, + Duration::from_secs(120), + ) + .await, min_head_rpcs: 1, min_sum_soft_limit: 4_000, max_block_age: None, @@ -1906,15 +1930,25 @@ mod tests { watch_consensus_head_sender: Some(watch_consensus_head_sender), watch_consensus_rpcs_sender, pending_transaction_cache: CacheWithTTL::arc_with_capacity( + "pending_transaction_cache", 10_000, Duration::from_secs(120), ) .await, pending_tx_id_receiver, pending_tx_id_sender, - blocks_by_hash: CacheWithTTL::arc_with_capacity(10_000, Duration::from_secs(120)).await, - blocks_by_number: CacheWithTTL::arc_with_capacity(10_000, Duration::from_secs(120)) - .await, + blocks_by_hash: CacheWithTTL::arc_with_capacity( + "blocks_by_hash", + 10_000, + Duration::from_secs(120), + ) + .await, + blocks_by_number: CacheWithTTL::arc_with_capacity( + "blocks_by_number", + 10_000, + Duration::from_secs(120), + ) + .await, min_head_rpcs: 1, min_sum_soft_limit: 1_000, max_block_age: None,