From bb50efb7f99b6e1bc7c318bb243cae6399e71958 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Mon, 29 May 2023 15:48:22 -0700 Subject: [PATCH] quick cache ttl max item size, better function names, and tests (#97) * Revert "remove cache on /status and /health for now" This reverts commit 166b0d810cd25b895560d50d00383a57e1269ad4. * Revert "remove cache on backups needed, too" This reverts commit 4597967def2f1daa71a277ec6731db4739ead640. * add tests * add max_item_weight * comment * add some helper functions, max weights, and tests --- deferred-rate-limiter/src/lib.rs | 5 +- quick_cache_ttl/src/cache.rs | 45 ++++++---- quick_cache_ttl/src/kq_cache.rs | 70 ++++++++++++--- quick_cache_ttl/src/lib.rs | 52 +++++++++++ web3_proxy/src/app/mod.rs | 30 +++---- web3_proxy/src/frontend/authorization.rs | 2 +- web3_proxy/src/frontend/mod.rs | 3 +- web3_proxy/src/frontend/status.rs | 40 +++------ web3_proxy/src/response_cache.rs | 110 ++++++++++++++++------- web3_proxy/src/rpcs/blockchain.rs | 10 +-- web3_proxy/src/rpcs/many.rs | 6 +- web3_proxy/src/rpcs/one.rs | 9 +- 12 files changed, 257 insertions(+), 125 deletions(-) diff --git a/deferred-rate-limiter/src/lib.rs b/deferred-rate-limiter/src/lib.rs index 99cf5a2e..37a943fa 100644 --- a/deferred-rate-limiter/src/lib.rs +++ b/deferred-rate-limiter/src/lib.rs @@ -45,8 +45,7 @@ where // TODO: time to live is not exactly right. we want this ttl counter to start only after redis is down. this works for now // TODO: what do these weigh? // TODO: allow skipping max_capacity - let local_cache = - CacheWithTTL::new_with_capacity(cache_size, Duration::from_secs(ttl)).await; + let local_cache = CacheWithTTL::new(cache_size, Duration::from_secs(ttl)).await; Self { local_cache, @@ -86,7 +85,7 @@ where // set arc_deferred_rate_limit_result and return the count self.local_cache - .get_or_insert_async::(&key, async move { + .try_get_or_insert_async::(&key, async move { // we do not use the try operator here because we want to be okay with redis errors let redis_count = match rrl .throttle_label(&redis_key, Some(max_requests_per_period), count) diff --git a/quick_cache_ttl/src/cache.rs b/quick_cache_ttl/src/cache.rs index 6f2ac650..941c3148 100644 --- a/quick_cache_ttl/src/cache.rs +++ b/quick_cache_ttl/src/cache.rs @@ -2,6 +2,7 @@ use quick_cache::{DefaultHashBuilder, UnitWeighter, Weighter}; use std::{ future::Future, hash::{BuildHasher, Hash}, + num::NonZeroU32, sync::Arc, time::Duration, }; @@ -15,9 +16,10 @@ pub struct CacheWithTTL( impl CacheWithTTL { - pub async fn new_with_capacity(capacity: usize, ttl: Duration) -> Self { - Self::new( + pub async fn new(capacity: usize, ttl: Duration) -> Self { + Self::new_with_options( capacity, + 1.try_into().unwrap(), capacity as u64, UnitWeighter, DefaultHashBuilder::default(), @@ -27,7 +29,7 @@ impl Arc { - let x = Self::new_with_capacity(capacity, ttl).await; + let x = Self::new(capacity, ttl).await; Arc::new(x) } @@ -42,12 +44,16 @@ impl< { pub async fn new_with_weights( estimated_items_capacity: usize, + max_item_weigth: NonZeroU32, weight_capacity: u64, weighter: We, ttl: Duration, ) -> Self { - let inner = KQCacheWithTTL::new( + let max_item_weigth = max_item_weigth.min((weight_capacity as u32).try_into().unwrap()); + + let inner = KQCacheWithTTL::new_with_options( estimated_items_capacity, + max_item_weigth, weight_capacity, weighter, B::default(), @@ -57,24 +63,18 @@ impl< Self(inner) } -} -impl< - Key: Eq + Hash + Clone + Send + Sync + 'static, - Val: Clone + Send + Sync + 'static, - We: Weighter + Clone + Send + Sync + 'static, - B: BuildHasher + Clone + Send + Sync + 'static, - > CacheWithTTL -{ - pub async fn new( + pub async fn new_with_options( estimated_items_capacity: usize, + max_item_weight: NonZeroU32, weight_capacity: u64, weighter: We, hash_builder: B, ttl: Duration, ) -> Self { - let inner = KQCacheWithTTL::new( + let inner = KQCacheWithTTL::new_with_options( estimated_items_capacity, + max_item_weight, weight_capacity, weighter, hash_builder, @@ -91,11 +91,19 @@ impl< } #[inline] - pub async fn get_or_insert_async(&self, key: &Key, f: Fut) -> Result + pub async fn get_or_insert_async(&self, key: &Key, f: Fut) -> Val + where + Fut: Future, + { + self.0.get_or_insert_async(key, &(), f).await + } + + #[inline] + pub async fn try_get_or_insert_async(&self, key: &Key, f: Fut) -> Result where Fut: Future>, { - self.0.get_or_insert_async(key, &(), f).await + self.0.try_get_or_insert_async(key, &(), f).await } #[inline] @@ -106,9 +114,10 @@ impl< self.0.get_value_or_guard_async(key, ()).await } + /// if the item was too large to insert, it is returned with the error #[inline] - pub fn insert(&self, key: Key, val: Val) { - self.0.insert(key, (), val) + pub fn try_insert(&self, key: Key, val: Val) -> Result<(), (Key, Val)> { + self.0.try_insert(key, (), val).map_err(|(k, _q, v)| (k, v)) } #[inline] diff --git a/quick_cache_ttl/src/kq_cache.rs b/quick_cache_ttl/src/kq_cache.rs index e566c127..d658305b 100644 --- a/quick_cache_ttl/src/kq_cache.rs +++ b/quick_cache_ttl/src/kq_cache.rs @@ -1,7 +1,9 @@ use quick_cache::sync::KQCache; use quick_cache::{PlaceholderGuard, Weighter}; +use std::convert::Infallible; use std::future::Future; use std::hash::{BuildHasher, Hash}; +use std::num::NonZeroU32; use std::sync::Arc; use std::time::Duration; use tokio::task::JoinHandle; @@ -9,9 +11,12 @@ use tokio::time::{sleep_until, Instant}; pub struct KQCacheWithTTL { cache: Arc>, - pub task_handle: JoinHandle<()>, + max_item_weight: NonZeroU32, ttl: Duration, tx: flume::Sender<(Instant, Key, Qey)>, + weighter: We, + + pub task_handle: JoinHandle<()>, } struct KQCacheWithTTLTask { @@ -35,8 +40,9 @@ impl< B: BuildHasher + Clone + Send + Sync + 'static, > KQCacheWithTTL { - pub async fn new( + pub async fn new_with_options( estimated_items_capacity: usize, + max_item_weight: NonZeroU32, weight_capacity: u64, weighter: We, hash_builder: B, @@ -47,7 +53,7 @@ impl< let cache = KQCache::with( estimated_items_capacity, weight_capacity, - weighter, + weighter.clone(), hash_builder, ); @@ -62,9 +68,11 @@ impl< Self { cache, + max_item_weight, task_handle, ttl, tx, + weighter, } } @@ -74,7 +82,23 @@ impl< } #[inline] - pub async fn get_or_insert_async(&self, key: &Key, qey: &Qey, f: Fut) -> Result + pub async fn get_or_insert_async(&self, key: &Key, qey: &Qey, f: Fut) -> Val + where + Fut: Future, + { + self.cache + .get_or_insert_async::(key, qey, async move { Ok(f.await) }) + .await + .expect("infallible") + } + + #[inline] + pub async fn try_get_or_insert_async( + &self, + key: &Key, + qey: &Qey, + f: Fut, + ) -> Result where Fut: Future>, { @@ -99,14 +123,40 @@ impl< } } - pub fn insert(&self, key: Key, qey: Qey, val: Val) { - let expire_at = Instant::now() + self.ttl; - - self.cache.insert(key.clone(), qey.clone(), val); - - self.tx.send((expire_at, key, qey)).unwrap(); + #[inline] + pub fn hits(&self) -> u64 { + self.cache.hits() } + /// if the item was too large to insert, it is returned with the error + #[inline] + pub fn try_insert(&self, key: Key, qey: Qey, val: Val) -> Result<(), (Key, Qey, Val)> { + let expire_at = Instant::now() + self.ttl; + + let weight = self.weighter.weight(&key, &qey, &val); + + if weight <= self.max_item_weight { + self.cache.insert(key.clone(), qey.clone(), val); + + self.tx.send((expire_at, key, qey)).unwrap(); + + Ok(()) + } else { + Err((key, qey, val)) + } + } + + #[inline] + pub fn misses(&self) -> u64 { + self.cache.misses() + } + + #[inline] + pub fn peek(&self, key: &Key, qey: &Qey) -> Option { + self.cache.peek(key, qey) + } + + #[inline] pub fn remove(&self, key: &Key, qey: &Qey) -> bool { self.cache.remove(key, qey) } diff --git a/quick_cache_ttl/src/lib.rs b/quick_cache_ttl/src/lib.rs index ada0c5b0..cbc3bac7 100644 --- a/quick_cache_ttl/src/lib.rs +++ b/quick_cache_ttl/src/lib.rs @@ -5,3 +5,55 @@ pub use cache::CacheWithTTL; pub use kq_cache::{KQCacheWithTTL, PlaceholderGuardWithTTL}; pub use quick_cache::sync::{Cache, KQCache}; pub use quick_cache::{DefaultHashBuilder, UnitWeighter, Weighter}; + +#[cfg(test)] +mod tests { + use std::time::Duration; + use tokio::task::yield_now; + use tokio::time; + + use crate::CacheWithTTL; + + #[tokio::test(start_paused = true)] + async fn test_time_passing() { + let x = CacheWithTTL::::new(2, Duration::from_secs(2)).await; + + assert!(x.get(&0).is_none()); + + x.try_insert(0, 0).unwrap(); + + assert!(x.get(&0).is_some()); + + time::advance(Duration::from_secs(1)).await; + + assert!(x.get(&0).is_some()); + + time::advance(Duration::from_secs(1)).await; + + // yield so that the expiration code gets a chance to run + yield_now().await; + + assert!(x.get(&0).is_none()); + } + + #[tokio::test(start_paused = true)] + async fn test_capacity_based_eviction() { + let x = CacheWithTTL::::new(1, Duration::from_secs(2)).await; + + assert!(x.get(&0).is_none()); + + x.try_insert(0, ()).unwrap(); + + assert!(x.get(&0).is_some()); + + x.try_insert(1, ()).unwrap(); + + assert!(x.get(&1).is_some()); + assert!(x.get(&0).is_none()); + } + + // #[tokio::test(start_paused = true)] + // async fn test_overweight() { + // todo!("wip"); + // } +} diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index a6ade181..b7fe7e0c 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -13,7 +13,7 @@ use crate::jsonrpc::{ JsonRpcRequestEnum, }; use crate::response_cache::{ - JsonRpcQueryCache, JsonRpcQueryCacheKey, JsonRpcQueryWeigher, JsonRpcResponseData, + JsonRpcResponseCache, JsonRpcResponseCacheKey, JsonRpcResponseData, JsonRpcResponseWeigher, }; use crate::rpcs::blockchain::Web3ProxyBlock; use crate::rpcs::consensus::ConsensusWeb3Rpcs; @@ -36,7 +36,6 @@ use ethers::types::U256; use ethers::utils::rlp::{Decodable, Rlp}; use futures::future::join_all; use futures::stream::{FuturesUnordered, StreamExt}; -use hashbrown::hash_map::DefaultHashBuilder; use hashbrown::{HashMap, HashSet}; use ipnet::IpNet; use log::{debug, error, info, trace, warn, Level}; @@ -54,7 +53,7 @@ use serde_json::json; use std::borrow::Cow; use std::fmt; use std::net::IpAddr; -use std::num::NonZeroU64; +use std::num::{NonZeroU32, NonZeroU64}; use std::str::FromStr; use std::sync::{atomic, Arc}; use std::time::Duration; @@ -144,7 +143,7 @@ pub struct Web3ProxyApp { /// TODO: include another type so that we can use private miner relays that do not use JSONRPC requests pub private_rpcs: Option>, /// track JSONRPC responses - pub jsonrpc_query_cache: JsonRpcQueryCache, + pub jsonrpc_response_cache: JsonRpcResponseCache, /// rpc clients that subscribe to newHeads use this channel /// don't drop this or the sender will stop working /// TODO: broadcast channel instead? @@ -606,14 +605,15 @@ impl Web3ProxyApp { CacheWithTTL::arc_with_capacity(10_000, Duration::from_secs(300)).await; // responses can be very different in sizes, so this is a cache with a max capacity and a weigher - // TODO: don't allow any response to be bigger than X% of the cache // TODO: we should emit stats to calculate a more accurate expected cache size // TODO: do we actually want a TTL on this? - let response_cache = JsonRpcQueryCache::new( - (top_config.app.response_cache_max_bytes / 2048) as usize, + // TODO: configurable max item weight instead of using ~0.1% + // TODO: resize the cache automatically + let response_cache = JsonRpcResponseCache::new_with_weights( + (top_config.app.response_cache_max_bytes / 16_384) as usize, + NonZeroU32::try_from((top_config.app.response_cache_max_bytes / 1024) as u32).unwrap(), top_config.app.response_cache_max_bytes, - JsonRpcQueryWeigher, - DefaultHashBuilder::default(), + JsonRpcResponseWeigher, Duration::from_secs(3600), ) .await; @@ -716,7 +716,7 @@ impl Web3ProxyApp { http_client, kafka_producer, private_rpcs, - jsonrpc_query_cache: response_cache, + jsonrpc_response_cache: response_cache, watch_consensus_head_receiver, pending_tx_sender, pending_transactions, @@ -1637,7 +1637,7 @@ impl Web3ProxyApp { // we do this check before checking caches because it might modify the request params // TODO: add a stat for archive vs full since they should probably cost different // TODO: this cache key can be rather large. is that okay? - let cache_key: Option = match block_needed( + let cache_key: Option = match block_needed( authorization, method, request.params.as_mut(), @@ -1646,7 +1646,7 @@ impl Web3ProxyApp { ) .await? { - BlockNeeded::CacheSuccessForever => Some(JsonRpcQueryCacheKey { + BlockNeeded::CacheSuccessForever => Some(JsonRpcResponseCacheKey { from_block: None, to_block: None, method: method.to_string(), @@ -1675,7 +1675,7 @@ impl Web3ProxyApp { .await? .block; - Some(JsonRpcQueryCacheKey { + Some(JsonRpcResponseCacheKey { from_block: Some(request_block), to_block: None, method: method.to_string(), @@ -1717,7 +1717,7 @@ impl Web3ProxyApp { .await? .block; - Some(JsonRpcQueryCacheKey { + Some(JsonRpcResponseCacheKey { from_block: Some(from_block), to_block: Some(to_block), method: method.to_string(), @@ -1737,7 +1737,7 @@ impl Web3ProxyApp { let to_block_num = cache_key.to_block.as_ref().map(|x| x.number.unwrap()); match self - .jsonrpc_query_cache + .jsonrpc_response_cache .get_value_or_guard_async(cache_key).await { Ok(x) => x, diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index 2f9ae0ee..6873fc3e 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -1091,7 +1091,7 @@ impl Web3ProxyApp { rpc_secret_key: RpcSecretKey, ) -> Web3ProxyResult { self.rpc_secret_key_cache - .get_or_insert_async(&rpc_secret_key, async move { + .try_get_or_insert_async(&rpc_secret_key, async move { // trace!(?rpc_secret_key, "user cache miss"); let db_replica = self diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs index 23cd51d2..436ad3de 100644 --- a/web3_proxy/src/frontend/mod.rs +++ b/web3_proxy/src/frontend/mod.rs @@ -57,8 +57,7 @@ pub async fn serve( // TODO: latest moka allows for different ttls for different let response_cache_size = ResponseCacheKey::COUNT; - let response_cache = - ResponseCache::new_with_capacity(response_cache_size, Duration::from_secs(1)).await; + let response_cache = ResponseCache::new(response_cache_size, Duration::from_secs(1)).await; // TODO: read config for if fastest/versus should be available publicly. default off diff --git a/web3_proxy/src/frontend/status.rs b/web3_proxy/src/frontend/status.rs index b75d7528..6b65a04f 100644 --- a/web3_proxy/src/frontend/status.rs +++ b/web3_proxy/src/frontend/status.rs @@ -14,7 +14,7 @@ use axum::{ use axum_macros::debug_handler; use once_cell::sync::Lazy; use serde_json::json; -use std::{convert::Infallible, sync::Arc}; +use std::sync::Arc; static HEALTH_OK: Lazy = Lazy::new(|| Bytes::from("OK\n")); static HEALTH_NOT_OK: Lazy = Lazy::new(|| Bytes::from(":(\n")); @@ -31,15 +31,9 @@ pub async fn health( Extension(app): Extension>, Extension(cache): Extension>, ) -> impl IntoResponse { - // let (code, content_type, body) = cache - // .get_or_insert_async::(&ResponseCacheKey::Health, async move { - // Ok(_health(app).await) - // }) - // .await - // .expect("this cache get is infallible"); - - // TODO: cache this once new TTLs work - let (code, content_type, body) = _health(app).await; + let (code, content_type, body) = cache + .get_or_insert_async(&ResponseCacheKey::Health, async move { _health(app).await }) + .await; Response::builder() .status(code) @@ -68,15 +62,11 @@ pub async fn backups_needed( Extension(app): Extension>, Extension(cache): Extension>, ) -> impl IntoResponse { - // let (code, content_type, body) = cache - // .get_or_insert_async::(&ResponseCacheKey::BackupsNeeded, async move { - // Ok(_backups_needed(app).await) - // }) - // .await - // .expect("this cache get is infallible"); - - // TODO: cache this once new TTLs work - let (code, content_type, body) = _backups_needed(app).await; + let (code, content_type, body) = cache + .get_or_insert_async(&ResponseCacheKey::BackupsNeeded, async move { + _backups_needed(app).await + }) + .await; Response::builder() .status(code) @@ -121,15 +111,9 @@ pub async fn status( Extension(app): Extension>, Extension(cache): Extension>, ) -> impl IntoResponse { - // let (code, content_type, body) = cache - // .get_or_insert_async::(&ResponseCacheKey::Status, async move { - // Ok(_status(app).await) - // }) - // .await - // .expect("this cache get is infallible"); - - // TODO: cache this once new TTLs work - let (code, content_type, body) = _status(app).await; + let (code, content_type, body) = cache + .get_or_insert_async(&ResponseCacheKey::Status, async move { _status(app).await }) + .await; Response::builder() .status(code) diff --git a/web3_proxy/src/response_cache.rs b/web3_proxy/src/response_cache.rs index 7b269847..f1a31126 100644 --- a/web3_proxy/src/response_cache.rs +++ b/web3_proxy/src/response_cache.rs @@ -3,7 +3,6 @@ use crate::{ }; use derive_more::From; use ethers::providers::ProviderError; -use hashbrown::hash_map::DefaultHashBuilder; use quick_cache_ttl::{CacheWithTTL, Weighter}; use serde_json::value::RawValue; use std::{ @@ -13,7 +12,7 @@ use std::{ }; #[derive(Clone, Debug, From, PartialEq, Eq)] -pub struct JsonRpcQueryCacheKey { +pub struct JsonRpcResponseCacheKey { pub from_block: Option, pub to_block: Option, pub method: String, @@ -21,7 +20,7 @@ pub struct JsonRpcQueryCacheKey { pub cache_errors: bool, } -impl Hash for JsonRpcQueryCacheKey { +impl Hash for JsonRpcResponseCacheKey { fn hash(&self, state: &mut H) { self.from_block.as_ref().map(|x| x.hash).hash(state); self.to_block.as_ref().map(|x| x.hash).hash(state); @@ -34,25 +33,21 @@ impl Hash for JsonRpcQueryCacheKey { } } -pub type JsonRpcQueryCache = CacheWithTTL< - JsonRpcQueryCacheKey, - JsonRpcResponseData, - JsonRpcQueryWeigher, - DefaultHashBuilder, ->; +pub type JsonRpcResponseCache = + CacheWithTTL; #[derive(Clone)] -pub struct JsonRpcQueryWeigher; +pub struct JsonRpcResponseWeigher; -#[derive(Clone)] +#[derive(Clone, Debug)] pub enum JsonRpcResponseData { Result { value: Box, - size: Option, + num_bytes: NonZeroU32, }, Error { value: JsonRpcErrorData, - size: Option, + num_bytes: NonZeroU32, }, } @@ -60,16 +55,8 @@ impl JsonRpcResponseData { pub fn num_bytes(&self) -> NonZeroU32 { // TODO: dry this somehow match self { - JsonRpcResponseData::Result { value, size } => size.unwrap_or_else(|| { - let size = value.get().len(); - - NonZeroU32::new(size.clamp(1, u32::MAX as usize) as u32).unwrap() - }), - JsonRpcResponseData::Error { value, size } => size.unwrap_or_else(|| { - let size = serde_json::to_string(value).unwrap().len(); - - NonZeroU32::new(size.clamp(1, u32::MAX as usize) as u32).unwrap() - }), + JsonRpcResponseData::Result { num_bytes, .. } => *num_bytes, + JsonRpcResponseData::Error { num_bytes, .. } => *num_bytes, } } } @@ -78,19 +65,28 @@ impl From for JsonRpcResponseData { fn from(value: serde_json::Value) -> Self { let value = RawValue::from_string(value.to_string()).unwrap(); - Self::Result { value, size: None } + value.into() } } impl From> for JsonRpcResponseData { fn from(value: Box) -> Self { - Self::Result { value, size: None } + let num_bytes = value.get().len(); + + let num_bytes = NonZeroU32::try_from(num_bytes as u32).unwrap(); + + Self::Result { value, num_bytes } } } impl From for JsonRpcResponseData { fn from(value: JsonRpcErrorData) -> Self { - Self::Error { value, size: None } + // TODO: wrap the error in a complete response? + let num_bytes = serde_json::to_string(&value).unwrap().len(); + + let num_bytes = NonZeroU32::try_from(num_bytes as u32).unwrap(); + + Self::Error { value, num_bytes } } } @@ -130,13 +126,61 @@ impl TryFrom for JsonRpcErrorData { } } -impl Weighter for JsonRpcQueryWeigher { - fn weight( - &self, - _key: &JsonRpcQueryCacheKey, - _qey: &(), - value: &JsonRpcResponseData, - ) -> NonZeroU32 { +impl Weighter for JsonRpcResponseWeigher { + fn weight(&self, _key: &K, _qey: &Q, value: &JsonRpcResponseData) -> NonZeroU32 { value.num_bytes() } } + +#[cfg(test)] +mod tests { + use super::{JsonRpcResponseData, JsonRpcResponseWeigher}; + use quick_cache_ttl::CacheWithTTL; + use std::{num::NonZeroU32, time::Duration}; + + #[tokio::test(start_paused = true)] + async fn test_json_rpc_query_weigher() { + let max_item_weight = 200; + let weight_capacity = 1_000; + + let response_cache: CacheWithTTL = + CacheWithTTL::new_with_weights( + 5, + max_item_weight.try_into().unwrap(), + weight_capacity, + JsonRpcResponseWeigher, + Duration::from_secs(2), + ) + .await; + + let small_data: JsonRpcResponseData = JsonRpcResponseData::Result { + value: Default::default(), + num_bytes: NonZeroU32::try_from(max_item_weight / 2).unwrap(), + }; + + let max_sized_data = JsonRpcResponseData::Result { + value: Default::default(), + num_bytes: NonZeroU32::try_from(max_item_weight).unwrap(), + }; + + let oversized_data = JsonRpcResponseData::Result { + value: Default::default(), + num_bytes: NonZeroU32::try_from(max_item_weight * 2).unwrap(), + }; + + response_cache.try_insert(0, small_data).unwrap(); + + response_cache.get(&0).unwrap(); + + response_cache.try_insert(1, max_sized_data).unwrap(); + + response_cache.get(&0).unwrap(); + response_cache.get(&1).unwrap(); + + response_cache.try_insert(2, oversized_data).unwrap_err(); + + response_cache.get(&0).unwrap(); + response_cache.get(&1).unwrap(); + assert!(response_cache.get(&2).is_none()); + } +} diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index df79ac68..b7c4d7fc 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -14,7 +14,6 @@ use quick_cache_ttl::CacheWithTTL; use serde::ser::SerializeStruct; use serde::Serialize; use serde_json::json; -use std::convert::Infallible; use std::hash::Hash; use std::{cmp::Ordering, fmt::Display, sync::Arc}; use tokio::sync::broadcast; @@ -181,7 +180,9 @@ impl Web3Rpcs { // this is the only place that writes to block_numbers // multiple inserts should be okay though // TODO: info that there was a fork? - self.blocks_by_number.insert(*block_num, *block.hash()); + if let Err((k, v)) = self.blocks_by_number.try_insert(*block_num, *block.hash()) { + warn!("unable to cache {} as {}", k, v); + } } // this block is very likely already in block_hashes @@ -190,9 +191,8 @@ impl Web3Rpcs { let block = self .blocks_by_hash - .get_or_insert_async::(&block_hash, async move { Ok(block) }) - .await - .expect("this cache get is infallible"); + .get_or_insert_async(&block_hash, async move { block }) + .await; Ok(block) } diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 7ad30174..4c0f70ba 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -98,13 +98,13 @@ impl Web3Rpcs { // TODO: actual weighter on this // TODO: time_to_idle instead? let blocks_by_hash: BlocksByHashCache = - Arc::new(CacheWithTTL::new_with_capacity(10_000, Duration::from_secs(30 * 60)).await); + Arc::new(CacheWithTTL::new(10_000, Duration::from_secs(30 * 60)).await); // all block numbers are the same size, so no need for weigher // TODO: limits from config // TODO: time_to_idle instead? let blocks_by_number = - Arc::new(CacheWithTTL::new_with_capacity(10_000, Duration::from_secs(30 * 60)).await); + Arc::new(CacheWithTTL::new(10_000, Duration::from_secs(30 * 60)).await); let (watch_consensus_rpcs_sender, consensus_connections_watcher) = watch::channel(Default::default()); @@ -339,7 +339,7 @@ impl Web3Rpcs { .spawn(async move { loop { sleep(Duration::from_secs(600)).await; - // TODO: "every interval, check that the provider is still connected" + // TODO: "every interval, do a health check or disconnect the rpc" } })?; diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index c74a156e..ec787c34 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -21,7 +21,6 @@ use redis_rate_limiter::{RedisPool, RedisRateLimitResult, RedisRateLimiter}; use serde::ser::{SerializeStruct, Serializer}; use serde::Serialize; use serde_json::json; -use std::convert::Infallible; use std::fmt; use std::hash::{Hash, Hasher}; use std::sync::atomic::{self, AtomicU64, AtomicUsize}; @@ -444,12 +443,8 @@ impl Web3Rpc { // if we already have this block saved, set new_head_block to that arc. otherwise store this copy let new_head_block = block_map - .get_or_insert_async::( - &new_hash, - async move { Ok(new_head_block) }, - ) - .await - .expect("this cache get is infallible"); + .get_or_insert_async(&new_hash, async move { new_head_block }) + .await; // save the block so we don't send the same one multiple times // also save so that archive checks can know how far back to query