fix missing ttl send and add name to cache

This commit is contained in:
Bryan Stitt 2023-05-29 17:19:05 -07:00
parent bb50efb7f9
commit c1eba556a5
13 changed files with 197 additions and 48 deletions

1
Cargo.lock generated
View File

@ -4031,6 +4031,7 @@ name = "quick_cache_ttl"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"flume", "flume",
"log",
"quick_cache", "quick_cache",
"tokio", "tokio",
] ]

View File

@ -45,7 +45,13 @@ where
// TODO: time to live is not exactly right. we want this ttl counter to start only after redis is down. this works for now // TODO: time to live is not exactly right. we want this ttl counter to start only after redis is down. this works for now
// TODO: what do these weigh? // TODO: what do these weigh?
// TODO: allow skipping max_capacity // TODO: allow skipping max_capacity
let local_cache = CacheWithTTL::new(cache_size, Duration::from_secs(ttl)).await; // TODO: prefix instead of a static str
let local_cache = CacheWithTTL::new(
"deferred rate limiter",
cache_size,
Duration::from_secs(ttl),
)
.await;
Self { Self {
local_cache, local_cache,

View File

@ -7,6 +7,7 @@ edition = "2021"
[dependencies] [dependencies]
flume = "0.10.14" flume = "0.10.14"
log = "0.4.18"
quick_cache = "0.3.0" quick_cache = "0.3.0"
tokio = { version = "1.28.2", features = ["full"] } tokio = { version = "1.28.2", features = ["full"] }

View File

@ -1,5 +1,6 @@
use quick_cache::{DefaultHashBuilder, UnitWeighter, Weighter}; use quick_cache::{DefaultHashBuilder, UnitWeighter, Weighter};
use std::{ use std::{
fmt::Debug,
future::Future, future::Future,
hash::{BuildHasher, Hash}, hash::{BuildHasher, Hash},
num::NonZeroU32, num::NonZeroU32,
@ -13,11 +14,14 @@ pub struct CacheWithTTL<Key, Val, We = UnitWeighter, B = DefaultHashBuilder>(
KQCacheWithTTL<Key, (), Val, We, B>, KQCacheWithTTL<Key, (), Val, We, B>,
); );
impl<Key: Eq + Hash + Clone + Send + Sync + 'static, Val: Clone + Send + Sync + 'static> impl<
CacheWithTTL<Key, Val, UnitWeighter, DefaultHashBuilder> Key: Clone + Debug + Eq + Hash + Send + Sync + 'static,
Val: Clone + Send + Sync + 'static,
> CacheWithTTL<Key, Val, UnitWeighter, DefaultHashBuilder>
{ {
pub async fn new(capacity: usize, ttl: Duration) -> Self { pub async fn new(name: &'static str, capacity: usize, ttl: Duration) -> Self {
Self::new_with_options( Self::new_with_options(
name,
capacity, capacity,
1.try_into().unwrap(), 1.try_into().unwrap(),
capacity as u64, capacity as u64,
@ -28,32 +32,38 @@ impl<Key: Eq + Hash + Clone + Send + Sync + 'static, Val: Clone + Send + Sync +
.await .await
} }
pub async fn arc_with_capacity(capacity: usize, ttl: Duration) -> Arc<Self> { pub async fn arc_with_capacity(
let x = Self::new(capacity, ttl).await; name: &'static str,
capacity: usize,
ttl: Duration,
) -> Arc<Self> {
let x = Self::new(name, capacity, ttl).await;
Arc::new(x) Arc::new(x)
} }
} }
impl< impl<
Key: Eq + Hash + Clone + Send + Sync + 'static, Key: Clone + Debug + Eq + Hash + Send + Sync + 'static,
Val: Clone + Send + Sync + 'static, Val: Clone + Send + Sync + 'static,
We: Weighter<Key, (), Val> + Clone + Send + Sync + 'static, We: Weighter<Key, (), Val> + Clone + Send + Sync + 'static,
B: BuildHasher + Clone + Default + Send + Sync + 'static, B: BuildHasher + Clone + Default + Send + Sync + 'static,
> CacheWithTTL<Key, Val, We, B> > CacheWithTTL<Key, Val, We, B>
{ {
pub async fn new_with_weights( pub async fn new_with_weights(
name: &'static str,
estimated_items_capacity: usize, estimated_items_capacity: usize,
max_item_weigth: NonZeroU32, max_item_weigth: NonZeroU32,
weight_capacity: u64, weight_capacity: u64,
weighter: We, weighter: We,
ttl: Duration, ttl: Duration,
) -> Self { ) -> Self {
let max_item_weigth = max_item_weigth.min((weight_capacity as u32).try_into().unwrap()); let max_item_weight = max_item_weigth.min((weight_capacity as u32).try_into().unwrap());
let inner = KQCacheWithTTL::new_with_options( let inner = KQCacheWithTTL::new_with_options(
name,
estimated_items_capacity, estimated_items_capacity,
max_item_weigth, max_item_weight,
weight_capacity, weight_capacity,
weighter, weighter,
B::default(), B::default(),
@ -65,6 +75,7 @@ impl<
} }
pub async fn new_with_options( pub async fn new_with_options(
name: &'static str,
estimated_items_capacity: usize, estimated_items_capacity: usize,
max_item_weight: NonZeroU32, max_item_weight: NonZeroU32,
weight_capacity: u64, weight_capacity: u64,
@ -73,6 +84,7 @@ impl<
ttl: Duration, ttl: Duration,
) -> Self { ) -> Self {
let inner = KQCacheWithTTL::new_with_options( let inner = KQCacheWithTTL::new_with_options(
name,
estimated_items_capacity, estimated_items_capacity,
max_item_weight, max_item_weight,
weight_capacity, weight_capacity,
@ -115,6 +127,7 @@ impl<
} }
/// if the item was too large to insert, it is returned with the error /// if the item was too large to insert, it is returned with the error
/// IMPORTANT! Inserting the same key multiple times does NOT reset the TTL!
#[inline] #[inline]
pub fn try_insert(&self, key: Key, val: Val) -> Result<(), (Key, Val)> { pub fn try_insert(&self, key: Key, val: Val) -> Result<(), (Key, Val)> {
self.0.try_insert(key, (), val).map_err(|(k, _q, v)| (k, v)) self.0.try_insert(key, (), val).map_err(|(k, _q, v)| (k, v))

View File

@ -1,6 +1,8 @@
use log::{log_enabled, trace};
use quick_cache::sync::KQCache; use quick_cache::sync::KQCache;
use quick_cache::{PlaceholderGuard, Weighter}; use quick_cache::{PlaceholderGuard, Weighter};
use std::convert::Infallible; use std::convert::Infallible;
use std::fmt::Debug;
use std::future::Future; use std::future::Future;
use std::hash::{BuildHasher, Hash}; use std::hash::{BuildHasher, Hash};
use std::num::NonZeroU32; use std::num::NonZeroU32;
@ -12,6 +14,7 @@ use tokio::time::{sleep_until, Instant};
pub struct KQCacheWithTTL<Key, Qey, Val, We, B> { pub struct KQCacheWithTTL<Key, Qey, Val, We, B> {
cache: Arc<KQCache<Key, Qey, Val, We, B>>, cache: Arc<KQCache<Key, Qey, Val, We, B>>,
max_item_weight: NonZeroU32, max_item_weight: NonZeroU32,
name: &'static str,
ttl: Duration, ttl: Duration,
tx: flume::Sender<(Instant, Key, Qey)>, tx: flume::Sender<(Instant, Key, Qey)>,
weighter: We, weighter: We,
@ -21,10 +24,12 @@ pub struct KQCacheWithTTL<Key, Qey, Val, We, B> {
struct KQCacheWithTTLTask<Key, Qey, Val, We, B> { struct KQCacheWithTTLTask<Key, Qey, Val, We, B> {
cache: Arc<KQCache<Key, Qey, Val, We, B>>, cache: Arc<KQCache<Key, Qey, Val, We, B>>,
name: &'static str,
rx: flume::Receiver<(Instant, Key, Qey)>, rx: flume::Receiver<(Instant, Key, Qey)>,
} }
pub struct PlaceholderGuardWithTTL<'a, Key, Qey, Val, We, B> { pub struct PlaceholderGuardWithTTL<'a, Key, Qey, Val, We, B> {
name: &'a str,
inner: PlaceholderGuard<'a, Key, Qey, Val, We, B>, inner: PlaceholderGuard<'a, Key, Qey, Val, We, B>,
key: Key, key: Key,
qey: Qey, qey: Qey,
@ -33,14 +38,15 @@ pub struct PlaceholderGuardWithTTL<'a, Key, Qey, Val, We, B> {
} }
impl< impl<
Key: Eq + Hash + Clone + Send + Sync + 'static, Key: Clone + Debug + Eq + Hash + Send + Sync + 'static,
Qey: Eq + Hash + Clone + Send + Sync + 'static, Qey: Clone + Debug + Eq + Hash + Send + Sync + 'static,
Val: Clone + Send + Sync + 'static, Val: Clone + Send + Sync + 'static,
We: Weighter<Key, Qey, Val> + Clone + Send + Sync + 'static, We: Weighter<Key, Qey, Val> + Clone + Send + Sync + 'static,
B: BuildHasher + Clone + Send + Sync + 'static, B: BuildHasher + Clone + Send + Sync + 'static,
> KQCacheWithTTL<Key, Qey, Val, We, B> > KQCacheWithTTL<Key, Qey, Val, We, B>
{ {
pub async fn new_with_options( pub async fn new_with_options(
name: &'static str,
estimated_items_capacity: usize, estimated_items_capacity: usize,
max_item_weight: NonZeroU32, max_item_weight: NonZeroU32,
weight_capacity: u64, weight_capacity: u64,
@ -61,6 +67,7 @@ impl<
let task = KQCacheWithTTLTask { let task = KQCacheWithTTLTask {
cache: cache.clone(), cache: cache.clone(),
name,
rx, rx,
}; };
@ -69,6 +76,7 @@ impl<
Self { Self {
cache, cache,
max_item_weight, max_item_weight,
name,
task_handle, task_handle,
ttl, ttl,
tx, tx,
@ -86,8 +94,7 @@ impl<
where where
Fut: Future<Output = Val>, Fut: Future<Output = Val>,
{ {
self.cache self.try_get_or_insert_async::<Infallible, _>(key, qey, async move { Ok(f.await) })
.get_or_insert_async::<Infallible>(key, qey, async move { Ok(f.await) })
.await .await
.expect("infallible") .expect("infallible")
} }
@ -102,7 +109,27 @@ impl<
where where
Fut: Future<Output = Result<Val, E>>, Fut: Future<Output = Result<Val, E>>,
{ {
self.cache.get_or_insert_async(key, qey, f).await self.cache
.get_or_insert_async(key, qey, async move {
let x = f.await;
if x.is_ok() {
let expire_at = Instant::now() + self.ttl;
trace!(
"{}, {:?}, {:?} expiring in {}s",
self.name,
&key,
&qey,
expire_at.duration_since(Instant::now()).as_secs_f32()
);
self.tx.send((expire_at, key.clone(), qey.clone())).unwrap();
}
x
})
.await
} }
#[inline] #[inline]
@ -114,6 +141,7 @@ impl<
match self.cache.get_value_or_guard_async(&key, &qey).await { match self.cache.get_value_or_guard_async(&key, &qey).await {
Ok(x) => Ok(x), Ok(x) => Ok(x),
Err(inner) => Err(PlaceholderGuardWithTTL { Err(inner) => Err(PlaceholderGuardWithTTL {
name: self.name,
inner, inner,
key, key,
qey, qey,
@ -129,6 +157,7 @@ impl<
} }
/// if the item was too large to insert, it is returned with the error /// if the item was too large to insert, it is returned with the error
/// IMPORTANT! Inserting the same key multiple times does NOT reset the TTL!
#[inline] #[inline]
pub fn try_insert(&self, key: Key, qey: Qey, val: Val) -> Result<(), (Key, Qey, Val)> { pub fn try_insert(&self, key: Key, qey: Qey, val: Val) -> Result<(), (Key, Qey, Val)> {
let expire_at = Instant::now() + self.ttl; let expire_at = Instant::now() + self.ttl;
@ -138,6 +167,14 @@ impl<
if weight <= self.max_item_weight { if weight <= self.max_item_weight {
self.cache.insert(key.clone(), qey.clone(), val); self.cache.insert(key.clone(), qey.clone(), val);
trace!(
"{}, {:?}, {:?} expiring in {}s",
self.name,
&key,
&qey,
expire_at.duration_since(Instant::now()).as_secs_f32()
);
self.tx.send((expire_at, key, qey)).unwrap(); self.tx.send((expire_at, key, qey)).unwrap();
Ok(()) Ok(())
@ -163,26 +200,51 @@ impl<
} }
impl< impl<
Key: Eq + Hash, Key: Debug + Eq + Hash,
Qey: Eq + Hash, Qey: Debug + Eq + Hash,
Val: Clone, Val: Clone,
We: Weighter<Key, Qey, Val> + Clone, We: Weighter<Key, Qey, Val> + Clone,
B: BuildHasher + Clone, B: BuildHasher + Clone,
> KQCacheWithTTLTask<Key, Qey, Val, We, B> > KQCacheWithTTLTask<Key, Qey, Val, We, B>
{ {
async fn run(self) { async fn run(self) {
while let Ok((expire_at, key, qey)) = self.rx.recv_async().await { trace!("watching for expirations on {}", self.name);
sleep_until(expire_at).await;
self.cache.remove(&key, &qey); while let Ok((expire_at, key, qey)) = self.rx.recv_async().await {
let now = Instant::now();
if expire_at > now {
if log_enabled!(log::Level::Trace) {
trace!(
"{}, {:?}, {:?} sleeping for {}ms.",
self.name,
key,
qey,
expire_at.duration_since(now).as_millis(),
);
}
sleep_until(expire_at).await;
trace!("{}, {:?}, {:?} done sleeping", self.name, key, qey);
} else {
trace!("no need to sleep!");
}
if self.cache.remove(&key, &qey) {
trace!("removed {}, {:?}, {:?}", self.name, key, qey);
} else {
trace!("empty {}, {:?}, {:?}", self.name, key, qey);
};
} }
trace!("watching for expirations on {}", self.name)
} }
} }
impl< impl<
'a, 'a,
Key: Clone + Hash + Eq, Key: Clone + Debug + Hash + Eq,
Qey: Clone + Hash + Eq, Qey: Clone + Debug + Hash + Eq,
Val: Clone, Val: Clone,
We: Weighter<Key, Qey, Val>, We: Weighter<Key, Qey, Val>,
B: BuildHasher, B: BuildHasher,
@ -193,6 +255,16 @@ impl<
self.inner.insert(val); self.inner.insert(val);
if log_enabled!(log::Level::Trace) {
trace!(
"{}, {:?}, {:?} expiring in {}s",
self.name,
self.key,
self.qey,
expire_at.duration_since(Instant::now()).as_secs_f32()
);
}
self.tx.send((expire_at, self.key, self.qey)).unwrap(); self.tx.send((expire_at, self.key, self.qey)).unwrap();
} }
} }

View File

@ -16,7 +16,7 @@ mod tests {
#[tokio::test(start_paused = true)] #[tokio::test(start_paused = true)]
async fn test_time_passing() { async fn test_time_passing() {
let x = CacheWithTTL::<u32, u32>::new(2, Duration::from_secs(2)).await; let x = CacheWithTTL::<u32, u32>::new("test", 2, Duration::from_secs(2)).await;
assert!(x.get(&0).is_none()); assert!(x.get(&0).is_none());
@ -38,7 +38,7 @@ mod tests {
#[tokio::test(start_paused = true)] #[tokio::test(start_paused = true)]
async fn test_capacity_based_eviction() { async fn test_capacity_based_eviction() {
let x = CacheWithTTL::<u32, ()>::new(1, Duration::from_secs(2)).await; let x = CacheWithTTL::<u32, ()>::new("test", 1, Duration::from_secs(2)).await;
assert!(x.get(&0).is_none()); assert!(x.get(&0).is_none());

View File

@ -507,8 +507,12 @@ impl Web3ProxyApp {
// if there is no database of users, there will be no keys and so this will be empty // if there is no database of users, there will be no keys and so this will be empty
// TODO: max_capacity from config // TODO: max_capacity from config
// TODO: ttl from config // TODO: ttl from config
let rpc_secret_key_cache = let rpc_secret_key_cache = CacheWithTTL::arc_with_capacity(
CacheWithTTL::arc_with_capacity(10_000, Duration::from_secs(600)).await; "rpc_secret_key_cache",
10_000,
Duration::from_secs(600),
)
.await;
// create a channel for receiving stats // create a channel for receiving stats
// we do this in a channel so we don't slow down our response to the users // we do this in a channel so we don't slow down our response to the users
@ -601,8 +605,12 @@ impl Web3ProxyApp {
// TODO: different chains might handle this differently // TODO: different chains might handle this differently
// TODO: what should we set? 5 minutes is arbitrary. the nodes themselves hold onto transactions for much longer // TODO: what should we set? 5 minutes is arbitrary. the nodes themselves hold onto transactions for much longer
// TODO: this used to be time_to_update, but // TODO: this used to be time_to_update, but
let pending_transactions = let pending_transactions = CacheWithTTL::arc_with_capacity(
CacheWithTTL::arc_with_capacity(10_000, Duration::from_secs(300)).await; "pending_transactions",
10_000,
Duration::from_secs(300),
)
.await;
// responses can be very different in sizes, so this is a cache with a max capacity and a weigher // responses can be very different in sizes, so this is a cache with a max capacity and a weigher
// TODO: we should emit stats to calculate a more accurate expected cache size // TODO: we should emit stats to calculate a more accurate expected cache size
@ -610,6 +618,7 @@ impl Web3ProxyApp {
// TODO: configurable max item weight instead of using ~0.1% // TODO: configurable max item weight instead of using ~0.1%
// TODO: resize the cache automatically // TODO: resize the cache automatically
let response_cache = JsonRpcResponseCache::new_with_weights( let response_cache = JsonRpcResponseCache::new_with_weights(
"response_cache",
(top_config.app.response_cache_max_bytes / 16_384) as usize, (top_config.app.response_cache_max_bytes / 16_384) as usize,
NonZeroU32::try_from((top_config.app.response_cache_max_bytes / 1024) as u32).unwrap(), NonZeroU32::try_from((top_config.app.response_cache_max_bytes / 1024) as u32).unwrap(),
top_config.app.response_cache_max_bytes, top_config.app.response_cache_max_bytes,

View File

@ -140,6 +140,7 @@ fn main() -> anyhow::Result<()> {
"ethers=debug", "ethers=debug",
"ethers_providers::rpc=off", "ethers_providers::rpc=off",
"ethers_providers=debug", "ethers_providers=debug",
"quick_cache_ttl=debug",
"redis_rate_limit=debug", "redis_rate_limit=debug",
"web3_proxy::rpcs::blockchain=info", "web3_proxy::rpcs::blockchain=info",
"web3_proxy::rpcs::request=debug", "web3_proxy::rpcs::request=debug",
@ -154,6 +155,7 @@ fn main() -> anyhow::Result<()> {
"ethers=debug", "ethers=debug",
"ethers_providers::rpc=off", "ethers_providers::rpc=off",
"ethers_providers=error", "ethers_providers=error",
"quick_cache_ttl=info",
"redis_rate_limit=debug", "redis_rate_limit=debug",
// "web3_proxy::stats::influxdb_queries=trace", // "web3_proxy::stats::influxdb_queries=trace",
"web3_proxy=debug", "web3_proxy=debug",

View File

@ -18,7 +18,7 @@ use axum::{
}; };
use http::{header::AUTHORIZATION, StatusCode}; use http::{header::AUTHORIZATION, StatusCode};
use listenfd::ListenFd; use listenfd::ListenFd;
use log::info; use log::{debug, info};
use quick_cache_ttl::UnitWeighter; use quick_cache_ttl::UnitWeighter;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
@ -31,7 +31,7 @@ use tower_http::sensitive_headers::SetSensitiveRequestHeadersLayer;
use self::errors::Web3ProxyResult; use self::errors::Web3ProxyResult;
/// simple keys for caching responses /// simple keys for caching responses
#[derive(Copy, Clone, Hash, PartialEq, Eq, EnumCount, EnumIter)] #[derive(Copy, Clone, Debug, Hash, PartialEq, Eq, EnumCount, EnumIter)]
pub enum ResponseCacheKey { pub enum ResponseCacheKey {
BackupsNeeded, BackupsNeeded,
Health, Health,
@ -57,7 +57,14 @@ pub async fn serve(
// TODO: latest moka allows for different ttls for different // TODO: latest moka allows for different ttls for different
let response_cache_size = ResponseCacheKey::COUNT; let response_cache_size = ResponseCacheKey::COUNT;
let response_cache = ResponseCache::new(response_cache_size, Duration::from_secs(1)).await; debug!("response_cache size: {}", response_cache_size);
let response_cache = ResponseCache::new(
"response_cache",
response_cache_size,
Duration::from_secs(1),
)
.await;
// TODO: read config for if fastest/versus should be available publicly. default off // TODO: read config for if fastest/versus should be available publicly. default off

View File

@ -12,6 +12,7 @@ use axum::{
Extension, Extension,
}; };
use axum_macros::debug_handler; use axum_macros::debug_handler;
use log::debug;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use serde_json::json; use serde_json::json;
use std::sync::Arc; use std::sync::Arc;
@ -125,6 +126,8 @@ pub async fn status(
// TODO: _status doesn't need to be async, but _quick_cache_ttl needs an async function // TODO: _status doesn't need to be async, but _quick_cache_ttl needs an async function
#[inline] #[inline]
async fn _status(app: Arc<Web3ProxyApp>) -> (StatusCode, &'static str, Bytes) { async fn _status(app: Arc<Web3ProxyApp>) -> (StatusCode, &'static str, Bytes) {
debug!("status is not cached");
// TODO: what else should we include? uptime, cache hit rates, cpu load, memory used // TODO: what else should we include? uptime, cache hit rates, cpu load, memory used
// TODO: the hostname is probably not going to change. only get once at the start? // TODO: the hostname is probably not going to change. only get once at the start?
let body = json!({ let body = json!({

View File

@ -145,6 +145,7 @@ mod tests {
let response_cache: CacheWithTTL<u32, JsonRpcResponseData, JsonRpcResponseWeigher> = let response_cache: CacheWithTTL<u32, JsonRpcResponseData, JsonRpcResponseWeigher> =
CacheWithTTL::new_with_weights( CacheWithTTL::new_with_weights(
"test",
5, 5,
max_item_weight.try_into().unwrap(), max_item_weight.try_into().unwrap(),
weight_capacity, weight_capacity,

View File

@ -175,20 +175,20 @@ impl Web3Rpcs {
let block_num = block.number(); let block_num = block.number();
// this block is very likely already in block_hashes
// TODO: use their get_with
let block_hash = *block.hash();
// TODO: think more about heaviest_chain. would be better to do the check inside this function // TODO: think more about heaviest_chain. would be better to do the check inside this function
if heaviest_chain { if heaviest_chain {
// this is the only place that writes to block_numbers // this is the only place that writes to block_numbers
// multiple inserts should be okay though // multiple inserts should be okay though
// TODO: info that there was a fork? // TODO: info that there was a fork?
if let Err((k, v)) = self.blocks_by_number.try_insert(*block_num, *block.hash()) { self.blocks_by_number
warn!("unable to cache {} as {}", k, v); .get_or_insert_async(block_num, async move { block_hash })
} .await;
} }
// this block is very likely already in block_hashes
// TODO: use their get_with
let block_hash = *block.hash();
let block = self let block = self
.blocks_by_hash .blocks_by_hash
.get_or_insert_async(&block_hash, async move { block }) .get_or_insert_async(&block_hash, async move { block })

View File

@ -97,14 +97,16 @@ impl Web3Rpcs {
// these blocks don't have full transactions, but they do have rather variable amounts of transaction hashes // these blocks don't have full transactions, but they do have rather variable amounts of transaction hashes
// TODO: actual weighter on this // TODO: actual weighter on this
// TODO: time_to_idle instead? // TODO: time_to_idle instead?
let blocks_by_hash: BlocksByHashCache = let blocks_by_hash: BlocksByHashCache = Arc::new(
Arc::new(CacheWithTTL::new(10_000, Duration::from_secs(30 * 60)).await); CacheWithTTL::new("blocks_by_hash", 10_000, Duration::from_secs(30 * 60)).await,
);
// all block numbers are the same size, so no need for weigher // all block numbers are the same size, so no need for weigher
// TODO: limits from config // TODO: limits from config
// TODO: time_to_idle instead? // TODO: time_to_idle instead?
let blocks_by_number = let blocks_by_number = Arc::new(
Arc::new(CacheWithTTL::new(10_000, Duration::from_secs(30 * 60)).await); CacheWithTTL::new("blocks_by_number", 10_000, Duration::from_secs(30 * 60)).await,
);
let (watch_consensus_rpcs_sender, consensus_connections_watcher) = let (watch_consensus_rpcs_sender, consensus_connections_watcher) =
watch::channel(Default::default()); watch::channel(Default::default());
@ -1444,14 +1446,25 @@ mod tests {
watch_consensus_head_sender: Some(watch_consensus_head_sender), watch_consensus_head_sender: Some(watch_consensus_head_sender),
watch_consensus_rpcs_sender, watch_consensus_rpcs_sender,
pending_transaction_cache: CacheWithTTL::arc_with_capacity( pending_transaction_cache: CacheWithTTL::arc_with_capacity(
"pending_transaction_cache",
100, 100,
Duration::from_secs(60), Duration::from_secs(60),
) )
.await, .await,
pending_tx_id_receiver, pending_tx_id_receiver,
pending_tx_id_sender, pending_tx_id_sender,
blocks_by_hash: CacheWithTTL::arc_with_capacity(100, Duration::from_secs(60)).await, blocks_by_hash: CacheWithTTL::arc_with_capacity(
blocks_by_number: CacheWithTTL::arc_with_capacity(100, Duration::from_secs(60)).await, "blocks_by_hash",
100,
Duration::from_secs(60),
)
.await,
blocks_by_number: CacheWithTTL::arc_with_capacity(
"blocks_by_number",
100,
Duration::from_secs(60),
)
.await,
// TODO: test max_block_age? // TODO: test max_block_age?
max_block_age: None, max_block_age: None,
// TODO: test max_block_lag? // TODO: test max_block_lag?
@ -1722,14 +1735,25 @@ mod tests {
watch_consensus_head_sender: Some(watch_consensus_head_sender), watch_consensus_head_sender: Some(watch_consensus_head_sender),
watch_consensus_rpcs_sender, watch_consensus_rpcs_sender,
pending_transaction_cache: CacheWithTTL::arc_with_capacity( pending_transaction_cache: CacheWithTTL::arc_with_capacity(
"pending_transaction_cache",
100, 100,
Duration::from_secs(120), Duration::from_secs(120),
) )
.await, .await,
pending_tx_id_receiver, pending_tx_id_receiver,
pending_tx_id_sender, pending_tx_id_sender,
blocks_by_hash: CacheWithTTL::arc_with_capacity(100, Duration::from_secs(120)).await, blocks_by_hash: CacheWithTTL::arc_with_capacity(
blocks_by_number: CacheWithTTL::arc_with_capacity(100, Duration::from_secs(120)).await, "blocks_by_hash",
100,
Duration::from_secs(120),
)
.await,
blocks_by_number: CacheWithTTL::arc_with_capacity(
"blocks_by_number",
100,
Duration::from_secs(120),
)
.await,
min_head_rpcs: 1, min_head_rpcs: 1,
min_sum_soft_limit: 4_000, min_sum_soft_limit: 4_000,
max_block_age: None, max_block_age: None,
@ -1906,15 +1930,25 @@ mod tests {
watch_consensus_head_sender: Some(watch_consensus_head_sender), watch_consensus_head_sender: Some(watch_consensus_head_sender),
watch_consensus_rpcs_sender, watch_consensus_rpcs_sender,
pending_transaction_cache: CacheWithTTL::arc_with_capacity( pending_transaction_cache: CacheWithTTL::arc_with_capacity(
"pending_transaction_cache",
10_000, 10_000,
Duration::from_secs(120), Duration::from_secs(120),
) )
.await, .await,
pending_tx_id_receiver, pending_tx_id_receiver,
pending_tx_id_sender, pending_tx_id_sender,
blocks_by_hash: CacheWithTTL::arc_with_capacity(10_000, Duration::from_secs(120)).await, blocks_by_hash: CacheWithTTL::arc_with_capacity(
blocks_by_number: CacheWithTTL::arc_with_capacity(10_000, Duration::from_secs(120)) "blocks_by_hash",
.await, 10_000,
Duration::from_secs(120),
)
.await,
blocks_by_number: CacheWithTTL::arc_with_capacity(
"blocks_by_number",
10_000,
Duration::from_secs(120),
)
.await,
min_head_rpcs: 1, min_head_rpcs: 1,
min_sum_soft_limit: 1_000, min_sum_soft_limit: 1_000,
max_block_age: None, max_block_age: None,