quick cache ttl max item size, better function names, and tests (#97)
* Revert "remove cache on /status and /health for now" This reverts commit166b0d810c
. * Revert "remove cache on backups needed, too" This reverts commit4597967def
. * add tests * add max_item_weight * comment * add some helper functions, max weights, and tests
This commit is contained in:
parent
5f0c99530c
commit
bb50efb7f9
|
@ -45,8 +45,7 @@ where
|
||||||
// TODO: time to live is not exactly right. we want this ttl counter to start only after redis is down. this works for now
|
// TODO: time to live is not exactly right. we want this ttl counter to start only after redis is down. this works for now
|
||||||
// TODO: what do these weigh?
|
// TODO: what do these weigh?
|
||||||
// TODO: allow skipping max_capacity
|
// TODO: allow skipping max_capacity
|
||||||
let local_cache =
|
let local_cache = CacheWithTTL::new(cache_size, Duration::from_secs(ttl)).await;
|
||||||
CacheWithTTL::new_with_capacity(cache_size, Duration::from_secs(ttl)).await;
|
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
local_cache,
|
local_cache,
|
||||||
|
@ -86,7 +85,7 @@ where
|
||||||
|
|
||||||
// set arc_deferred_rate_limit_result and return the count
|
// set arc_deferred_rate_limit_result and return the count
|
||||||
self.local_cache
|
self.local_cache
|
||||||
.get_or_insert_async::<anyhow::Error, _>(&key, async move {
|
.try_get_or_insert_async::<anyhow::Error, _>(&key, async move {
|
||||||
// we do not use the try operator here because we want to be okay with redis errors
|
// we do not use the try operator here because we want to be okay with redis errors
|
||||||
let redis_count = match rrl
|
let redis_count = match rrl
|
||||||
.throttle_label(&redis_key, Some(max_requests_per_period), count)
|
.throttle_label(&redis_key, Some(max_requests_per_period), count)
|
||||||
|
|
|
@ -2,6 +2,7 @@ use quick_cache::{DefaultHashBuilder, UnitWeighter, Weighter};
|
||||||
use std::{
|
use std::{
|
||||||
future::Future,
|
future::Future,
|
||||||
hash::{BuildHasher, Hash},
|
hash::{BuildHasher, Hash},
|
||||||
|
num::NonZeroU32,
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
time::Duration,
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
@ -15,9 +16,10 @@ pub struct CacheWithTTL<Key, Val, We = UnitWeighter, B = DefaultHashBuilder>(
|
||||||
impl<Key: Eq + Hash + Clone + Send + Sync + 'static, Val: Clone + Send + Sync + 'static>
|
impl<Key: Eq + Hash + Clone + Send + Sync + 'static, Val: Clone + Send + Sync + 'static>
|
||||||
CacheWithTTL<Key, Val, UnitWeighter, DefaultHashBuilder>
|
CacheWithTTL<Key, Val, UnitWeighter, DefaultHashBuilder>
|
||||||
{
|
{
|
||||||
pub async fn new_with_capacity(capacity: usize, ttl: Duration) -> Self {
|
pub async fn new(capacity: usize, ttl: Duration) -> Self {
|
||||||
Self::new(
|
Self::new_with_options(
|
||||||
capacity,
|
capacity,
|
||||||
|
1.try_into().unwrap(),
|
||||||
capacity as u64,
|
capacity as u64,
|
||||||
UnitWeighter,
|
UnitWeighter,
|
||||||
DefaultHashBuilder::default(),
|
DefaultHashBuilder::default(),
|
||||||
|
@ -27,7 +29,7 @@ impl<Key: Eq + Hash + Clone + Send + Sync + 'static, Val: Clone + Send + Sync +
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn arc_with_capacity(capacity: usize, ttl: Duration) -> Arc<Self> {
|
pub async fn arc_with_capacity(capacity: usize, ttl: Duration) -> Arc<Self> {
|
||||||
let x = Self::new_with_capacity(capacity, ttl).await;
|
let x = Self::new(capacity, ttl).await;
|
||||||
|
|
||||||
Arc::new(x)
|
Arc::new(x)
|
||||||
}
|
}
|
||||||
|
@ -42,12 +44,16 @@ impl<
|
||||||
{
|
{
|
||||||
pub async fn new_with_weights(
|
pub async fn new_with_weights(
|
||||||
estimated_items_capacity: usize,
|
estimated_items_capacity: usize,
|
||||||
|
max_item_weigth: NonZeroU32,
|
||||||
weight_capacity: u64,
|
weight_capacity: u64,
|
||||||
weighter: We,
|
weighter: We,
|
||||||
ttl: Duration,
|
ttl: Duration,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let inner = KQCacheWithTTL::new(
|
let max_item_weigth = max_item_weigth.min((weight_capacity as u32).try_into().unwrap());
|
||||||
|
|
||||||
|
let inner = KQCacheWithTTL::new_with_options(
|
||||||
estimated_items_capacity,
|
estimated_items_capacity,
|
||||||
|
max_item_weigth,
|
||||||
weight_capacity,
|
weight_capacity,
|
||||||
weighter,
|
weighter,
|
||||||
B::default(),
|
B::default(),
|
||||||
|
@ -57,24 +63,18 @@ impl<
|
||||||
|
|
||||||
Self(inner)
|
Self(inner)
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
impl<
|
pub async fn new_with_options(
|
||||||
Key: Eq + Hash + Clone + Send + Sync + 'static,
|
|
||||||
Val: Clone + Send + Sync + 'static,
|
|
||||||
We: Weighter<Key, (), Val> + Clone + Send + Sync + 'static,
|
|
||||||
B: BuildHasher + Clone + Send + Sync + 'static,
|
|
||||||
> CacheWithTTL<Key, Val, We, B>
|
|
||||||
{
|
|
||||||
pub async fn new(
|
|
||||||
estimated_items_capacity: usize,
|
estimated_items_capacity: usize,
|
||||||
|
max_item_weight: NonZeroU32,
|
||||||
weight_capacity: u64,
|
weight_capacity: u64,
|
||||||
weighter: We,
|
weighter: We,
|
||||||
hash_builder: B,
|
hash_builder: B,
|
||||||
ttl: Duration,
|
ttl: Duration,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let inner = KQCacheWithTTL::new(
|
let inner = KQCacheWithTTL::new_with_options(
|
||||||
estimated_items_capacity,
|
estimated_items_capacity,
|
||||||
|
max_item_weight,
|
||||||
weight_capacity,
|
weight_capacity,
|
||||||
weighter,
|
weighter,
|
||||||
hash_builder,
|
hash_builder,
|
||||||
|
@ -91,11 +91,19 @@ impl<
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
pub async fn get_or_insert_async<E, Fut>(&self, key: &Key, f: Fut) -> Result<Val, E>
|
pub async fn get_or_insert_async<Fut>(&self, key: &Key, f: Fut) -> Val
|
||||||
|
where
|
||||||
|
Fut: Future<Output = Val>,
|
||||||
|
{
|
||||||
|
self.0.get_or_insert_async(key, &(), f).await
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
pub async fn try_get_or_insert_async<E, Fut>(&self, key: &Key, f: Fut) -> Result<Val, E>
|
||||||
where
|
where
|
||||||
Fut: Future<Output = Result<Val, E>>,
|
Fut: Future<Output = Result<Val, E>>,
|
||||||
{
|
{
|
||||||
self.0.get_or_insert_async(key, &(), f).await
|
self.0.try_get_or_insert_async(key, &(), f).await
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
|
@ -106,9 +114,10 @@ impl<
|
||||||
self.0.get_value_or_guard_async(key, ()).await
|
self.0.get_value_or_guard_async(key, ()).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// if the item was too large to insert, it is returned with the error
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn insert(&self, key: Key, val: Val) {
|
pub fn try_insert(&self, key: Key, val: Val) -> Result<(), (Key, Val)> {
|
||||||
self.0.insert(key, (), val)
|
self.0.try_insert(key, (), val).map_err(|(k, _q, v)| (k, v))
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
|
|
|
@ -1,7 +1,9 @@
|
||||||
use quick_cache::sync::KQCache;
|
use quick_cache::sync::KQCache;
|
||||||
use quick_cache::{PlaceholderGuard, Weighter};
|
use quick_cache::{PlaceholderGuard, Weighter};
|
||||||
|
use std::convert::Infallible;
|
||||||
use std::future::Future;
|
use std::future::Future;
|
||||||
use std::hash::{BuildHasher, Hash};
|
use std::hash::{BuildHasher, Hash};
|
||||||
|
use std::num::NonZeroU32;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use tokio::task::JoinHandle;
|
use tokio::task::JoinHandle;
|
||||||
|
@ -9,9 +11,12 @@ use tokio::time::{sleep_until, Instant};
|
||||||
|
|
||||||
pub struct KQCacheWithTTL<Key, Qey, Val, We, B> {
|
pub struct KQCacheWithTTL<Key, Qey, Val, We, B> {
|
||||||
cache: Arc<KQCache<Key, Qey, Val, We, B>>,
|
cache: Arc<KQCache<Key, Qey, Val, We, B>>,
|
||||||
pub task_handle: JoinHandle<()>,
|
max_item_weight: NonZeroU32,
|
||||||
ttl: Duration,
|
ttl: Duration,
|
||||||
tx: flume::Sender<(Instant, Key, Qey)>,
|
tx: flume::Sender<(Instant, Key, Qey)>,
|
||||||
|
weighter: We,
|
||||||
|
|
||||||
|
pub task_handle: JoinHandle<()>,
|
||||||
}
|
}
|
||||||
|
|
||||||
struct KQCacheWithTTLTask<Key, Qey, Val, We, B> {
|
struct KQCacheWithTTLTask<Key, Qey, Val, We, B> {
|
||||||
|
@ -35,8 +40,9 @@ impl<
|
||||||
B: BuildHasher + Clone + Send + Sync + 'static,
|
B: BuildHasher + Clone + Send + Sync + 'static,
|
||||||
> KQCacheWithTTL<Key, Qey, Val, We, B>
|
> KQCacheWithTTL<Key, Qey, Val, We, B>
|
||||||
{
|
{
|
||||||
pub async fn new(
|
pub async fn new_with_options(
|
||||||
estimated_items_capacity: usize,
|
estimated_items_capacity: usize,
|
||||||
|
max_item_weight: NonZeroU32,
|
||||||
weight_capacity: u64,
|
weight_capacity: u64,
|
||||||
weighter: We,
|
weighter: We,
|
||||||
hash_builder: B,
|
hash_builder: B,
|
||||||
|
@ -47,7 +53,7 @@ impl<
|
||||||
let cache = KQCache::with(
|
let cache = KQCache::with(
|
||||||
estimated_items_capacity,
|
estimated_items_capacity,
|
||||||
weight_capacity,
|
weight_capacity,
|
||||||
weighter,
|
weighter.clone(),
|
||||||
hash_builder,
|
hash_builder,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -62,9 +68,11 @@ impl<
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
cache,
|
cache,
|
||||||
|
max_item_weight,
|
||||||
task_handle,
|
task_handle,
|
||||||
ttl,
|
ttl,
|
||||||
tx,
|
tx,
|
||||||
|
weighter,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -74,7 +82,23 @@ impl<
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
pub async fn get_or_insert_async<E, Fut>(&self, key: &Key, qey: &Qey, f: Fut) -> Result<Val, E>
|
pub async fn get_or_insert_async<Fut>(&self, key: &Key, qey: &Qey, f: Fut) -> Val
|
||||||
|
where
|
||||||
|
Fut: Future<Output = Val>,
|
||||||
|
{
|
||||||
|
self.cache
|
||||||
|
.get_or_insert_async::<Infallible>(key, qey, async move { Ok(f.await) })
|
||||||
|
.await
|
||||||
|
.expect("infallible")
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
pub async fn try_get_or_insert_async<E, Fut>(
|
||||||
|
&self,
|
||||||
|
key: &Key,
|
||||||
|
qey: &Qey,
|
||||||
|
f: Fut,
|
||||||
|
) -> Result<Val, E>
|
||||||
where
|
where
|
||||||
Fut: Future<Output = Result<Val, E>>,
|
Fut: Future<Output = Result<Val, E>>,
|
||||||
{
|
{
|
||||||
|
@ -99,14 +123,40 @@ impl<
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn insert(&self, key: Key, qey: Qey, val: Val) {
|
#[inline]
|
||||||
let expire_at = Instant::now() + self.ttl;
|
pub fn hits(&self) -> u64 {
|
||||||
|
self.cache.hits()
|
||||||
self.cache.insert(key.clone(), qey.clone(), val);
|
|
||||||
|
|
||||||
self.tx.send((expire_at, key, qey)).unwrap();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// if the item was too large to insert, it is returned with the error
|
||||||
|
#[inline]
|
||||||
|
pub fn try_insert(&self, key: Key, qey: Qey, val: Val) -> Result<(), (Key, Qey, Val)> {
|
||||||
|
let expire_at = Instant::now() + self.ttl;
|
||||||
|
|
||||||
|
let weight = self.weighter.weight(&key, &qey, &val);
|
||||||
|
|
||||||
|
if weight <= self.max_item_weight {
|
||||||
|
self.cache.insert(key.clone(), qey.clone(), val);
|
||||||
|
|
||||||
|
self.tx.send((expire_at, key, qey)).unwrap();
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
Err((key, qey, val))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
pub fn misses(&self) -> u64 {
|
||||||
|
self.cache.misses()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
pub fn peek(&self, key: &Key, qey: &Qey) -> Option<Val> {
|
||||||
|
self.cache.peek(key, qey)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
pub fn remove(&self, key: &Key, qey: &Qey) -> bool {
|
pub fn remove(&self, key: &Key, qey: &Qey) -> bool {
|
||||||
self.cache.remove(key, qey)
|
self.cache.remove(key, qey)
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,3 +5,55 @@ pub use cache::CacheWithTTL;
|
||||||
pub use kq_cache::{KQCacheWithTTL, PlaceholderGuardWithTTL};
|
pub use kq_cache::{KQCacheWithTTL, PlaceholderGuardWithTTL};
|
||||||
pub use quick_cache::sync::{Cache, KQCache};
|
pub use quick_cache::sync::{Cache, KQCache};
|
||||||
pub use quick_cache::{DefaultHashBuilder, UnitWeighter, Weighter};
|
pub use quick_cache::{DefaultHashBuilder, UnitWeighter, Weighter};
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use std::time::Duration;
|
||||||
|
use tokio::task::yield_now;
|
||||||
|
use tokio::time;
|
||||||
|
|
||||||
|
use crate::CacheWithTTL;
|
||||||
|
|
||||||
|
#[tokio::test(start_paused = true)]
|
||||||
|
async fn test_time_passing() {
|
||||||
|
let x = CacheWithTTL::<u32, u32>::new(2, Duration::from_secs(2)).await;
|
||||||
|
|
||||||
|
assert!(x.get(&0).is_none());
|
||||||
|
|
||||||
|
x.try_insert(0, 0).unwrap();
|
||||||
|
|
||||||
|
assert!(x.get(&0).is_some());
|
||||||
|
|
||||||
|
time::advance(Duration::from_secs(1)).await;
|
||||||
|
|
||||||
|
assert!(x.get(&0).is_some());
|
||||||
|
|
||||||
|
time::advance(Duration::from_secs(1)).await;
|
||||||
|
|
||||||
|
// yield so that the expiration code gets a chance to run
|
||||||
|
yield_now().await;
|
||||||
|
|
||||||
|
assert!(x.get(&0).is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test(start_paused = true)]
|
||||||
|
async fn test_capacity_based_eviction() {
|
||||||
|
let x = CacheWithTTL::<u32, ()>::new(1, Duration::from_secs(2)).await;
|
||||||
|
|
||||||
|
assert!(x.get(&0).is_none());
|
||||||
|
|
||||||
|
x.try_insert(0, ()).unwrap();
|
||||||
|
|
||||||
|
assert!(x.get(&0).is_some());
|
||||||
|
|
||||||
|
x.try_insert(1, ()).unwrap();
|
||||||
|
|
||||||
|
assert!(x.get(&1).is_some());
|
||||||
|
assert!(x.get(&0).is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
// #[tokio::test(start_paused = true)]
|
||||||
|
// async fn test_overweight() {
|
||||||
|
// todo!("wip");
|
||||||
|
// }
|
||||||
|
}
|
||||||
|
|
|
@ -13,7 +13,7 @@ use crate::jsonrpc::{
|
||||||
JsonRpcRequestEnum,
|
JsonRpcRequestEnum,
|
||||||
};
|
};
|
||||||
use crate::response_cache::{
|
use crate::response_cache::{
|
||||||
JsonRpcQueryCache, JsonRpcQueryCacheKey, JsonRpcQueryWeigher, JsonRpcResponseData,
|
JsonRpcResponseCache, JsonRpcResponseCacheKey, JsonRpcResponseData, JsonRpcResponseWeigher,
|
||||||
};
|
};
|
||||||
use crate::rpcs::blockchain::Web3ProxyBlock;
|
use crate::rpcs::blockchain::Web3ProxyBlock;
|
||||||
use crate::rpcs::consensus::ConsensusWeb3Rpcs;
|
use crate::rpcs::consensus::ConsensusWeb3Rpcs;
|
||||||
|
@ -36,7 +36,6 @@ use ethers::types::U256;
|
||||||
use ethers::utils::rlp::{Decodable, Rlp};
|
use ethers::utils::rlp::{Decodable, Rlp};
|
||||||
use futures::future::join_all;
|
use futures::future::join_all;
|
||||||
use futures::stream::{FuturesUnordered, StreamExt};
|
use futures::stream::{FuturesUnordered, StreamExt};
|
||||||
use hashbrown::hash_map::DefaultHashBuilder;
|
|
||||||
use hashbrown::{HashMap, HashSet};
|
use hashbrown::{HashMap, HashSet};
|
||||||
use ipnet::IpNet;
|
use ipnet::IpNet;
|
||||||
use log::{debug, error, info, trace, warn, Level};
|
use log::{debug, error, info, trace, warn, Level};
|
||||||
|
@ -54,7 +53,7 @@ use serde_json::json;
|
||||||
use std::borrow::Cow;
|
use std::borrow::Cow;
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::net::IpAddr;
|
use std::net::IpAddr;
|
||||||
use std::num::NonZeroU64;
|
use std::num::{NonZeroU32, NonZeroU64};
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use std::sync::{atomic, Arc};
|
use std::sync::{atomic, Arc};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
@ -144,7 +143,7 @@ pub struct Web3ProxyApp {
|
||||||
/// TODO: include another type so that we can use private miner relays that do not use JSONRPC requests
|
/// TODO: include another type so that we can use private miner relays that do not use JSONRPC requests
|
||||||
pub private_rpcs: Option<Arc<Web3Rpcs>>,
|
pub private_rpcs: Option<Arc<Web3Rpcs>>,
|
||||||
/// track JSONRPC responses
|
/// track JSONRPC responses
|
||||||
pub jsonrpc_query_cache: JsonRpcQueryCache,
|
pub jsonrpc_response_cache: JsonRpcResponseCache,
|
||||||
/// rpc clients that subscribe to newHeads use this channel
|
/// rpc clients that subscribe to newHeads use this channel
|
||||||
/// don't drop this or the sender will stop working
|
/// don't drop this or the sender will stop working
|
||||||
/// TODO: broadcast channel instead?
|
/// TODO: broadcast channel instead?
|
||||||
|
@ -606,14 +605,15 @@ impl Web3ProxyApp {
|
||||||
CacheWithTTL::arc_with_capacity(10_000, Duration::from_secs(300)).await;
|
CacheWithTTL::arc_with_capacity(10_000, Duration::from_secs(300)).await;
|
||||||
|
|
||||||
// responses can be very different in sizes, so this is a cache with a max capacity and a weigher
|
// responses can be very different in sizes, so this is a cache with a max capacity and a weigher
|
||||||
// TODO: don't allow any response to be bigger than X% of the cache
|
|
||||||
// TODO: we should emit stats to calculate a more accurate expected cache size
|
// TODO: we should emit stats to calculate a more accurate expected cache size
|
||||||
// TODO: do we actually want a TTL on this?
|
// TODO: do we actually want a TTL on this?
|
||||||
let response_cache = JsonRpcQueryCache::new(
|
// TODO: configurable max item weight instead of using ~0.1%
|
||||||
(top_config.app.response_cache_max_bytes / 2048) as usize,
|
// TODO: resize the cache automatically
|
||||||
|
let response_cache = JsonRpcResponseCache::new_with_weights(
|
||||||
|
(top_config.app.response_cache_max_bytes / 16_384) as usize,
|
||||||
|
NonZeroU32::try_from((top_config.app.response_cache_max_bytes / 1024) as u32).unwrap(),
|
||||||
top_config.app.response_cache_max_bytes,
|
top_config.app.response_cache_max_bytes,
|
||||||
JsonRpcQueryWeigher,
|
JsonRpcResponseWeigher,
|
||||||
DefaultHashBuilder::default(),
|
|
||||||
Duration::from_secs(3600),
|
Duration::from_secs(3600),
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
@ -716,7 +716,7 @@ impl Web3ProxyApp {
|
||||||
http_client,
|
http_client,
|
||||||
kafka_producer,
|
kafka_producer,
|
||||||
private_rpcs,
|
private_rpcs,
|
||||||
jsonrpc_query_cache: response_cache,
|
jsonrpc_response_cache: response_cache,
|
||||||
watch_consensus_head_receiver,
|
watch_consensus_head_receiver,
|
||||||
pending_tx_sender,
|
pending_tx_sender,
|
||||||
pending_transactions,
|
pending_transactions,
|
||||||
|
@ -1637,7 +1637,7 @@ impl Web3ProxyApp {
|
||||||
// we do this check before checking caches because it might modify the request params
|
// we do this check before checking caches because it might modify the request params
|
||||||
// TODO: add a stat for archive vs full since they should probably cost different
|
// TODO: add a stat for archive vs full since they should probably cost different
|
||||||
// TODO: this cache key can be rather large. is that okay?
|
// TODO: this cache key can be rather large. is that okay?
|
||||||
let cache_key: Option<JsonRpcQueryCacheKey> = match block_needed(
|
let cache_key: Option<JsonRpcResponseCacheKey> = match block_needed(
|
||||||
authorization,
|
authorization,
|
||||||
method,
|
method,
|
||||||
request.params.as_mut(),
|
request.params.as_mut(),
|
||||||
|
@ -1646,7 +1646,7 @@ impl Web3ProxyApp {
|
||||||
)
|
)
|
||||||
.await?
|
.await?
|
||||||
{
|
{
|
||||||
BlockNeeded::CacheSuccessForever => Some(JsonRpcQueryCacheKey {
|
BlockNeeded::CacheSuccessForever => Some(JsonRpcResponseCacheKey {
|
||||||
from_block: None,
|
from_block: None,
|
||||||
to_block: None,
|
to_block: None,
|
||||||
method: method.to_string(),
|
method: method.to_string(),
|
||||||
|
@ -1675,7 +1675,7 @@ impl Web3ProxyApp {
|
||||||
.await?
|
.await?
|
||||||
.block;
|
.block;
|
||||||
|
|
||||||
Some(JsonRpcQueryCacheKey {
|
Some(JsonRpcResponseCacheKey {
|
||||||
from_block: Some(request_block),
|
from_block: Some(request_block),
|
||||||
to_block: None,
|
to_block: None,
|
||||||
method: method.to_string(),
|
method: method.to_string(),
|
||||||
|
@ -1717,7 +1717,7 @@ impl Web3ProxyApp {
|
||||||
.await?
|
.await?
|
||||||
.block;
|
.block;
|
||||||
|
|
||||||
Some(JsonRpcQueryCacheKey {
|
Some(JsonRpcResponseCacheKey {
|
||||||
from_block: Some(from_block),
|
from_block: Some(from_block),
|
||||||
to_block: Some(to_block),
|
to_block: Some(to_block),
|
||||||
method: method.to_string(),
|
method: method.to_string(),
|
||||||
|
@ -1737,7 +1737,7 @@ impl Web3ProxyApp {
|
||||||
let to_block_num = cache_key.to_block.as_ref().map(|x| x.number.unwrap());
|
let to_block_num = cache_key.to_block.as_ref().map(|x| x.number.unwrap());
|
||||||
|
|
||||||
match self
|
match self
|
||||||
.jsonrpc_query_cache
|
.jsonrpc_response_cache
|
||||||
.get_value_or_guard_async(cache_key).await
|
.get_value_or_guard_async(cache_key).await
|
||||||
{
|
{
|
||||||
Ok(x) => x,
|
Ok(x) => x,
|
||||||
|
|
|
@ -1091,7 +1091,7 @@ impl Web3ProxyApp {
|
||||||
rpc_secret_key: RpcSecretKey,
|
rpc_secret_key: RpcSecretKey,
|
||||||
) -> Web3ProxyResult<AuthorizationChecks> {
|
) -> Web3ProxyResult<AuthorizationChecks> {
|
||||||
self.rpc_secret_key_cache
|
self.rpc_secret_key_cache
|
||||||
.get_or_insert_async(&rpc_secret_key, async move {
|
.try_get_or_insert_async(&rpc_secret_key, async move {
|
||||||
// trace!(?rpc_secret_key, "user cache miss");
|
// trace!(?rpc_secret_key, "user cache miss");
|
||||||
|
|
||||||
let db_replica = self
|
let db_replica = self
|
||||||
|
|
|
@ -57,8 +57,7 @@ pub async fn serve(
|
||||||
// TODO: latest moka allows for different ttls for different
|
// TODO: latest moka allows for different ttls for different
|
||||||
let response_cache_size = ResponseCacheKey::COUNT;
|
let response_cache_size = ResponseCacheKey::COUNT;
|
||||||
|
|
||||||
let response_cache =
|
let response_cache = ResponseCache::new(response_cache_size, Duration::from_secs(1)).await;
|
||||||
ResponseCache::new_with_capacity(response_cache_size, Duration::from_secs(1)).await;
|
|
||||||
|
|
||||||
// TODO: read config for if fastest/versus should be available publicly. default off
|
// TODO: read config for if fastest/versus should be available publicly. default off
|
||||||
|
|
||||||
|
|
|
@ -14,7 +14,7 @@ use axum::{
|
||||||
use axum_macros::debug_handler;
|
use axum_macros::debug_handler;
|
||||||
use once_cell::sync::Lazy;
|
use once_cell::sync::Lazy;
|
||||||
use serde_json::json;
|
use serde_json::json;
|
||||||
use std::{convert::Infallible, sync::Arc};
|
use std::sync::Arc;
|
||||||
|
|
||||||
static HEALTH_OK: Lazy<Bytes> = Lazy::new(|| Bytes::from("OK\n"));
|
static HEALTH_OK: Lazy<Bytes> = Lazy::new(|| Bytes::from("OK\n"));
|
||||||
static HEALTH_NOT_OK: Lazy<Bytes> = Lazy::new(|| Bytes::from(":(\n"));
|
static HEALTH_NOT_OK: Lazy<Bytes> = Lazy::new(|| Bytes::from(":(\n"));
|
||||||
|
@ -31,15 +31,9 @@ pub async fn health(
|
||||||
Extension(app): Extension<Arc<Web3ProxyApp>>,
|
Extension(app): Extension<Arc<Web3ProxyApp>>,
|
||||||
Extension(cache): Extension<Arc<ResponseCache>>,
|
Extension(cache): Extension<Arc<ResponseCache>>,
|
||||||
) -> impl IntoResponse {
|
) -> impl IntoResponse {
|
||||||
// let (code, content_type, body) = cache
|
let (code, content_type, body) = cache
|
||||||
// .get_or_insert_async::<Infallible, _>(&ResponseCacheKey::Health, async move {
|
.get_or_insert_async(&ResponseCacheKey::Health, async move { _health(app).await })
|
||||||
// Ok(_health(app).await)
|
.await;
|
||||||
// })
|
|
||||||
// .await
|
|
||||||
// .expect("this cache get is infallible");
|
|
||||||
|
|
||||||
// TODO: cache this once new TTLs work
|
|
||||||
let (code, content_type, body) = _health(app).await;
|
|
||||||
|
|
||||||
Response::builder()
|
Response::builder()
|
||||||
.status(code)
|
.status(code)
|
||||||
|
@ -68,15 +62,11 @@ pub async fn backups_needed(
|
||||||
Extension(app): Extension<Arc<Web3ProxyApp>>,
|
Extension(app): Extension<Arc<Web3ProxyApp>>,
|
||||||
Extension(cache): Extension<Arc<ResponseCache>>,
|
Extension(cache): Extension<Arc<ResponseCache>>,
|
||||||
) -> impl IntoResponse {
|
) -> impl IntoResponse {
|
||||||
// let (code, content_type, body) = cache
|
let (code, content_type, body) = cache
|
||||||
// .get_or_insert_async::<Infallible, _>(&ResponseCacheKey::BackupsNeeded, async move {
|
.get_or_insert_async(&ResponseCacheKey::BackupsNeeded, async move {
|
||||||
// Ok(_backups_needed(app).await)
|
_backups_needed(app).await
|
||||||
// })
|
})
|
||||||
// .await
|
.await;
|
||||||
// .expect("this cache get is infallible");
|
|
||||||
|
|
||||||
// TODO: cache this once new TTLs work
|
|
||||||
let (code, content_type, body) = _backups_needed(app).await;
|
|
||||||
|
|
||||||
Response::builder()
|
Response::builder()
|
||||||
.status(code)
|
.status(code)
|
||||||
|
@ -121,15 +111,9 @@ pub async fn status(
|
||||||
Extension(app): Extension<Arc<Web3ProxyApp>>,
|
Extension(app): Extension<Arc<Web3ProxyApp>>,
|
||||||
Extension(cache): Extension<Arc<ResponseCache>>,
|
Extension(cache): Extension<Arc<ResponseCache>>,
|
||||||
) -> impl IntoResponse {
|
) -> impl IntoResponse {
|
||||||
// let (code, content_type, body) = cache
|
let (code, content_type, body) = cache
|
||||||
// .get_or_insert_async::<Infallible, _>(&ResponseCacheKey::Status, async move {
|
.get_or_insert_async(&ResponseCacheKey::Status, async move { _status(app).await })
|
||||||
// Ok(_status(app).await)
|
.await;
|
||||||
// })
|
|
||||||
// .await
|
|
||||||
// .expect("this cache get is infallible");
|
|
||||||
|
|
||||||
// TODO: cache this once new TTLs work
|
|
||||||
let (code, content_type, body) = _status(app).await;
|
|
||||||
|
|
||||||
Response::builder()
|
Response::builder()
|
||||||
.status(code)
|
.status(code)
|
||||||
|
|
|
@ -3,7 +3,6 @@ use crate::{
|
||||||
};
|
};
|
||||||
use derive_more::From;
|
use derive_more::From;
|
||||||
use ethers::providers::ProviderError;
|
use ethers::providers::ProviderError;
|
||||||
use hashbrown::hash_map::DefaultHashBuilder;
|
|
||||||
use quick_cache_ttl::{CacheWithTTL, Weighter};
|
use quick_cache_ttl::{CacheWithTTL, Weighter};
|
||||||
use serde_json::value::RawValue;
|
use serde_json::value::RawValue;
|
||||||
use std::{
|
use std::{
|
||||||
|
@ -13,7 +12,7 @@ use std::{
|
||||||
};
|
};
|
||||||
|
|
||||||
#[derive(Clone, Debug, From, PartialEq, Eq)]
|
#[derive(Clone, Debug, From, PartialEq, Eq)]
|
||||||
pub struct JsonRpcQueryCacheKey {
|
pub struct JsonRpcResponseCacheKey {
|
||||||
pub from_block: Option<ArcBlock>,
|
pub from_block: Option<ArcBlock>,
|
||||||
pub to_block: Option<ArcBlock>,
|
pub to_block: Option<ArcBlock>,
|
||||||
pub method: String,
|
pub method: String,
|
||||||
|
@ -21,7 +20,7 @@ pub struct JsonRpcQueryCacheKey {
|
||||||
pub cache_errors: bool,
|
pub cache_errors: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Hash for JsonRpcQueryCacheKey {
|
impl Hash for JsonRpcResponseCacheKey {
|
||||||
fn hash<H: Hasher>(&self, state: &mut H) {
|
fn hash<H: Hasher>(&self, state: &mut H) {
|
||||||
self.from_block.as_ref().map(|x| x.hash).hash(state);
|
self.from_block.as_ref().map(|x| x.hash).hash(state);
|
||||||
self.to_block.as_ref().map(|x| x.hash).hash(state);
|
self.to_block.as_ref().map(|x| x.hash).hash(state);
|
||||||
|
@ -34,25 +33,21 @@ impl Hash for JsonRpcQueryCacheKey {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type JsonRpcQueryCache = CacheWithTTL<
|
pub type JsonRpcResponseCache =
|
||||||
JsonRpcQueryCacheKey,
|
CacheWithTTL<JsonRpcResponseCacheKey, JsonRpcResponseData, JsonRpcResponseWeigher>;
|
||||||
JsonRpcResponseData,
|
|
||||||
JsonRpcQueryWeigher,
|
|
||||||
DefaultHashBuilder,
|
|
||||||
>;
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct JsonRpcQueryWeigher;
|
pub struct JsonRpcResponseWeigher;
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone, Debug)]
|
||||||
pub enum JsonRpcResponseData {
|
pub enum JsonRpcResponseData {
|
||||||
Result {
|
Result {
|
||||||
value: Box<RawValue>,
|
value: Box<RawValue>,
|
||||||
size: Option<NonZeroU32>,
|
num_bytes: NonZeroU32,
|
||||||
},
|
},
|
||||||
Error {
|
Error {
|
||||||
value: JsonRpcErrorData,
|
value: JsonRpcErrorData,
|
||||||
size: Option<NonZeroU32>,
|
num_bytes: NonZeroU32,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -60,16 +55,8 @@ impl JsonRpcResponseData {
|
||||||
pub fn num_bytes(&self) -> NonZeroU32 {
|
pub fn num_bytes(&self) -> NonZeroU32 {
|
||||||
// TODO: dry this somehow
|
// TODO: dry this somehow
|
||||||
match self {
|
match self {
|
||||||
JsonRpcResponseData::Result { value, size } => size.unwrap_or_else(|| {
|
JsonRpcResponseData::Result { num_bytes, .. } => *num_bytes,
|
||||||
let size = value.get().len();
|
JsonRpcResponseData::Error { num_bytes, .. } => *num_bytes,
|
||||||
|
|
||||||
NonZeroU32::new(size.clamp(1, u32::MAX as usize) as u32).unwrap()
|
|
||||||
}),
|
|
||||||
JsonRpcResponseData::Error { value, size } => size.unwrap_or_else(|| {
|
|
||||||
let size = serde_json::to_string(value).unwrap().len();
|
|
||||||
|
|
||||||
NonZeroU32::new(size.clamp(1, u32::MAX as usize) as u32).unwrap()
|
|
||||||
}),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -78,19 +65,28 @@ impl From<serde_json::Value> for JsonRpcResponseData {
|
||||||
fn from(value: serde_json::Value) -> Self {
|
fn from(value: serde_json::Value) -> Self {
|
||||||
let value = RawValue::from_string(value.to_string()).unwrap();
|
let value = RawValue::from_string(value.to_string()).unwrap();
|
||||||
|
|
||||||
Self::Result { value, size: None }
|
value.into()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<Box<RawValue>> for JsonRpcResponseData {
|
impl From<Box<RawValue>> for JsonRpcResponseData {
|
||||||
fn from(value: Box<RawValue>) -> Self {
|
fn from(value: Box<RawValue>) -> Self {
|
||||||
Self::Result { value, size: None }
|
let num_bytes = value.get().len();
|
||||||
|
|
||||||
|
let num_bytes = NonZeroU32::try_from(num_bytes as u32).unwrap();
|
||||||
|
|
||||||
|
Self::Result { value, num_bytes }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<JsonRpcErrorData> for JsonRpcResponseData {
|
impl From<JsonRpcErrorData> for JsonRpcResponseData {
|
||||||
fn from(value: JsonRpcErrorData) -> Self {
|
fn from(value: JsonRpcErrorData) -> Self {
|
||||||
Self::Error { value, size: None }
|
// TODO: wrap the error in a complete response?
|
||||||
|
let num_bytes = serde_json::to_string(&value).unwrap().len();
|
||||||
|
|
||||||
|
let num_bytes = NonZeroU32::try_from(num_bytes as u32).unwrap();
|
||||||
|
|
||||||
|
Self::Error { value, num_bytes }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -130,13 +126,61 @@ impl TryFrom<ProviderError> for JsonRpcErrorData {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Weighter<JsonRpcQueryCacheKey, (), JsonRpcResponseData> for JsonRpcQueryWeigher {
|
impl<K, Q> Weighter<K, Q, JsonRpcResponseData> for JsonRpcResponseWeigher {
|
||||||
fn weight(
|
fn weight(&self, _key: &K, _qey: &Q, value: &JsonRpcResponseData) -> NonZeroU32 {
|
||||||
&self,
|
|
||||||
_key: &JsonRpcQueryCacheKey,
|
|
||||||
_qey: &(),
|
|
||||||
value: &JsonRpcResponseData,
|
|
||||||
) -> NonZeroU32 {
|
|
||||||
value.num_bytes()
|
value.num_bytes()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::{JsonRpcResponseData, JsonRpcResponseWeigher};
|
||||||
|
use quick_cache_ttl::CacheWithTTL;
|
||||||
|
use std::{num::NonZeroU32, time::Duration};
|
||||||
|
|
||||||
|
#[tokio::test(start_paused = true)]
|
||||||
|
async fn test_json_rpc_query_weigher() {
|
||||||
|
let max_item_weight = 200;
|
||||||
|
let weight_capacity = 1_000;
|
||||||
|
|
||||||
|
let response_cache: CacheWithTTL<u32, JsonRpcResponseData, JsonRpcResponseWeigher> =
|
||||||
|
CacheWithTTL::new_with_weights(
|
||||||
|
5,
|
||||||
|
max_item_weight.try_into().unwrap(),
|
||||||
|
weight_capacity,
|
||||||
|
JsonRpcResponseWeigher,
|
||||||
|
Duration::from_secs(2),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let small_data: JsonRpcResponseData = JsonRpcResponseData::Result {
|
||||||
|
value: Default::default(),
|
||||||
|
num_bytes: NonZeroU32::try_from(max_item_weight / 2).unwrap(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let max_sized_data = JsonRpcResponseData::Result {
|
||||||
|
value: Default::default(),
|
||||||
|
num_bytes: NonZeroU32::try_from(max_item_weight).unwrap(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let oversized_data = JsonRpcResponseData::Result {
|
||||||
|
value: Default::default(),
|
||||||
|
num_bytes: NonZeroU32::try_from(max_item_weight * 2).unwrap(),
|
||||||
|
};
|
||||||
|
|
||||||
|
response_cache.try_insert(0, small_data).unwrap();
|
||||||
|
|
||||||
|
response_cache.get(&0).unwrap();
|
||||||
|
|
||||||
|
response_cache.try_insert(1, max_sized_data).unwrap();
|
||||||
|
|
||||||
|
response_cache.get(&0).unwrap();
|
||||||
|
response_cache.get(&1).unwrap();
|
||||||
|
|
||||||
|
response_cache.try_insert(2, oversized_data).unwrap_err();
|
||||||
|
|
||||||
|
response_cache.get(&0).unwrap();
|
||||||
|
response_cache.get(&1).unwrap();
|
||||||
|
assert!(response_cache.get(&2).is_none());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -14,7 +14,6 @@ use quick_cache_ttl::CacheWithTTL;
|
||||||
use serde::ser::SerializeStruct;
|
use serde::ser::SerializeStruct;
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use serde_json::json;
|
use serde_json::json;
|
||||||
use std::convert::Infallible;
|
|
||||||
use std::hash::Hash;
|
use std::hash::Hash;
|
||||||
use std::{cmp::Ordering, fmt::Display, sync::Arc};
|
use std::{cmp::Ordering, fmt::Display, sync::Arc};
|
||||||
use tokio::sync::broadcast;
|
use tokio::sync::broadcast;
|
||||||
|
@ -181,7 +180,9 @@ impl Web3Rpcs {
|
||||||
// this is the only place that writes to block_numbers
|
// this is the only place that writes to block_numbers
|
||||||
// multiple inserts should be okay though
|
// multiple inserts should be okay though
|
||||||
// TODO: info that there was a fork?
|
// TODO: info that there was a fork?
|
||||||
self.blocks_by_number.insert(*block_num, *block.hash());
|
if let Err((k, v)) = self.blocks_by_number.try_insert(*block_num, *block.hash()) {
|
||||||
|
warn!("unable to cache {} as {}", k, v);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// this block is very likely already in block_hashes
|
// this block is very likely already in block_hashes
|
||||||
|
@ -190,9 +191,8 @@ impl Web3Rpcs {
|
||||||
|
|
||||||
let block = self
|
let block = self
|
||||||
.blocks_by_hash
|
.blocks_by_hash
|
||||||
.get_or_insert_async::<Infallible, _>(&block_hash, async move { Ok(block) })
|
.get_or_insert_async(&block_hash, async move { block })
|
||||||
.await
|
.await;
|
||||||
.expect("this cache get is infallible");
|
|
||||||
|
|
||||||
Ok(block)
|
Ok(block)
|
||||||
}
|
}
|
||||||
|
|
|
@ -98,13 +98,13 @@ impl Web3Rpcs {
|
||||||
// TODO: actual weighter on this
|
// TODO: actual weighter on this
|
||||||
// TODO: time_to_idle instead?
|
// TODO: time_to_idle instead?
|
||||||
let blocks_by_hash: BlocksByHashCache =
|
let blocks_by_hash: BlocksByHashCache =
|
||||||
Arc::new(CacheWithTTL::new_with_capacity(10_000, Duration::from_secs(30 * 60)).await);
|
Arc::new(CacheWithTTL::new(10_000, Duration::from_secs(30 * 60)).await);
|
||||||
|
|
||||||
// all block numbers are the same size, so no need for weigher
|
// all block numbers are the same size, so no need for weigher
|
||||||
// TODO: limits from config
|
// TODO: limits from config
|
||||||
// TODO: time_to_idle instead?
|
// TODO: time_to_idle instead?
|
||||||
let blocks_by_number =
|
let blocks_by_number =
|
||||||
Arc::new(CacheWithTTL::new_with_capacity(10_000, Duration::from_secs(30 * 60)).await);
|
Arc::new(CacheWithTTL::new(10_000, Duration::from_secs(30 * 60)).await);
|
||||||
|
|
||||||
let (watch_consensus_rpcs_sender, consensus_connections_watcher) =
|
let (watch_consensus_rpcs_sender, consensus_connections_watcher) =
|
||||||
watch::channel(Default::default());
|
watch::channel(Default::default());
|
||||||
|
@ -339,7 +339,7 @@ impl Web3Rpcs {
|
||||||
.spawn(async move {
|
.spawn(async move {
|
||||||
loop {
|
loop {
|
||||||
sleep(Duration::from_secs(600)).await;
|
sleep(Duration::from_secs(600)).await;
|
||||||
// TODO: "every interval, check that the provider is still connected"
|
// TODO: "every interval, do a health check or disconnect the rpc"
|
||||||
}
|
}
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
|
|
|
@ -21,7 +21,6 @@ use redis_rate_limiter::{RedisPool, RedisRateLimitResult, RedisRateLimiter};
|
||||||
use serde::ser::{SerializeStruct, Serializer};
|
use serde::ser::{SerializeStruct, Serializer};
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use serde_json::json;
|
use serde_json::json;
|
||||||
use std::convert::Infallible;
|
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::hash::{Hash, Hasher};
|
use std::hash::{Hash, Hasher};
|
||||||
use std::sync::atomic::{self, AtomicU64, AtomicUsize};
|
use std::sync::atomic::{self, AtomicU64, AtomicUsize};
|
||||||
|
@ -444,12 +443,8 @@ impl Web3Rpc {
|
||||||
|
|
||||||
// if we already have this block saved, set new_head_block to that arc. otherwise store this copy
|
// if we already have this block saved, set new_head_block to that arc. otherwise store this copy
|
||||||
let new_head_block = block_map
|
let new_head_block = block_map
|
||||||
.get_or_insert_async::<Infallible, _>(
|
.get_or_insert_async(&new_hash, async move { new_head_block })
|
||||||
&new_hash,
|
.await;
|
||||||
async move { Ok(new_head_block) },
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.expect("this cache get is infallible");
|
|
||||||
|
|
||||||
// save the block so we don't send the same one multiple times
|
// save the block so we don't send the same one multiple times
|
||||||
// also save so that archive checks can know how far back to query
|
// also save so that archive checks can know how far back to query
|
||||||
|
|
Loading…
Reference in New Issue