use ttl quick cache

This commit is contained in:
Bryan Stitt 2023-05-15 10:48:59 -07:00
parent b204a1f8ca
commit f4cebde53f
9 changed files with 278 additions and 70 deletions

2
Cargo.lock generated

@ -6708,7 +6708,7 @@ dependencies = [
"parking_lot 0.12.1",
"prettytable",
"proctitle",
"quick_cache",
"quick_cache_ttl",
"rdkafka",
"redis-rate-limiter",
"regex",

@ -0,0 +1,78 @@
use quick_cache::{DefaultHashBuilder, UnitWeighter, Weighter};
use std::{
future::Future,
hash::{BuildHasher, Hash},
time::Duration,
};
use crate::{KQCacheWithTTL, PlaceholderGuardWithTTL};
pub struct CacheWithTTL<Key, Val, We, B>(KQCacheWithTTL<Key, (), Val, We, B>);
impl<Key: Eq + Hash + Clone + Send + Sync + 'static, Val: Clone + Send + Sync + 'static>
CacheWithTTL<Key, Val, UnitWeighter, DefaultHashBuilder>
{
pub async fn new_with_unit_weights(estimated_items_capacity: usize, ttl: Duration) -> Self {
Self::new(
estimated_items_capacity,
estimated_items_capacity as u64,
UnitWeighter,
DefaultHashBuilder::default(),
ttl,
)
.await
}
}
impl<
Key: Eq + Hash + Clone + Send + Sync + 'static,
Val: Clone + Send + Sync + 'static,
We: Weighter<Key, (), Val> + Clone + Send + Sync + 'static,
B: BuildHasher + Clone + Send + Sync + 'static,
> CacheWithTTL<Key, Val, We, B>
{
pub async fn new(
estimated_items_capacity: usize,
weight_capacity: u64,
weighter: We,
hash_builder: B,
ttl: Duration,
) -> Self {
let inner = KQCacheWithTTL::new(
estimated_items_capacity,
weight_capacity,
weighter,
hash_builder,
ttl,
)
.await;
Self(inner)
}
#[inline]
pub async fn get_or_insert_async<E, Fut>(&self, key: &Key, f: Fut) -> Result<Val, E>
where
Fut: Future<Output = Result<Val, E>>,
{
self.0.get_or_insert_async(key, &(), f).await
}
#[inline]
pub async fn get_value_or_guard_async(
&self,
key: Key,
) -> Result<Val, PlaceholderGuardWithTTL<'_, Key, (), Val, We, B>> {
self.0.get_value_or_guard_async(key, ()).await
}
#[inline]
pub fn insert(&self, key: Key, val: Val) {
self.0.insert(key, (), val)
}
#[inline]
pub fn remove(&self, key: &Key) -> bool {
self.0.remove(key, &())
}
}

@ -0,0 +1,143 @@
use quick_cache::sync::KQCache;
use quick_cache::{PlaceholderGuard, Weighter};
use std::future::Future;
use std::hash::{BuildHasher, Hash};
use std::sync::Arc;
use std::time::Duration;
use tokio::task::JoinHandle;
use tokio::time::{sleep_until, Instant};
pub struct KQCacheWithTTL<Key, Qey, Val, We, B> {
pub(crate) cache: Arc<KQCache<Key, Qey, Val, We, B>>,
pub task_handle: JoinHandle<()>,
ttl: Duration,
pub(crate) tx: flume::Sender<(Instant, Key, Qey)>,
}
struct KQCacheWithTTLTask<Key, Qey, Val, We, B> {
cache: Arc<KQCache<Key, Qey, Val, We, B>>,
rx: flume::Receiver<(Instant, Key, Qey)>,
}
pub struct PlaceholderGuardWithTTL<'a, Key, Qey, Val, We, B> {
inner: PlaceholderGuard<'a, Key, Qey, Val, We, B>,
key: Key,
qey: Qey,
ttl: Duration,
tx: &'a flume::Sender<(Instant, Key, Qey)>,
}
impl<
Key: Eq + Hash + Clone + Send + Sync + 'static,
Qey: Eq + Hash + Clone + Send + Sync + 'static,
Val: Clone + Send + Sync + 'static,
We: Weighter<Key, Qey, Val> + Clone + Send + Sync + 'static,
B: BuildHasher + Clone + Send + Sync + 'static,
> KQCacheWithTTL<Key, Qey, Val, We, B>
{
pub async fn new(
estimated_items_capacity: usize,
weight_capacity: u64,
weighter: We,
hash_builder: B,
ttl: Duration,
) -> Self {
let (tx, rx) = flume::unbounded();
let cache = KQCache::with(
estimated_items_capacity,
weight_capacity,
weighter,
hash_builder,
);
let cache = Arc::new(cache);
let task = KQCacheWithTTLTask {
cache: cache.clone(),
rx,
};
let task_handle = tokio::spawn(task.run());
Self {
cache,
task_handle,
ttl,
tx,
}
}
#[inline]
pub async fn get_or_insert_async<E, Fut>(&self, key: &Key, qey: &Qey, f: Fut) -> Result<Val, E>
where
Fut: Future<Output = Result<Val, E>>,
{
self.cache.get_or_insert_async(key, qey, f).await
}
#[inline]
pub async fn get_value_or_guard_async(
&self,
key: Key,
qey: Qey,
) -> Result<Val, PlaceholderGuardWithTTL<'_, Key, Qey, Val, We, B>> {
match self.cache.get_value_or_guard_async(&key, &qey).await {
Ok(x) => Ok(x),
Err(inner) => Err(PlaceholderGuardWithTTL {
inner,
key,
qey,
ttl: self.ttl,
tx: &self.tx,
}),
}
}
pub fn insert(&self, key: Key, qey: Qey, val: Val) {
let expire_at = Instant::now() + self.ttl;
self.cache.insert(key.clone(), qey.clone(), val);
self.tx.send((expire_at, key, qey)).unwrap();
}
pub fn remove(&self, key: &Key, qey: &Qey) -> bool {
self.cache.remove(key, qey)
}
}
impl<
Key: Eq + Hash,
Qey: Eq + Hash,
Val: Clone,
We: Weighter<Key, Qey, Val> + Clone,
B: BuildHasher + Clone,
> KQCacheWithTTLTask<Key, Qey, Val, We, B>
{
async fn run(self) {
while let Ok((expire_at, key, qey)) = self.rx.recv_async().await {
sleep_until(expire_at).await;
self.cache.remove(&key, &qey);
}
}
}
impl<
'a,
Key: Clone + Hash + Eq,
Qey: Clone + Hash + Eq,
Val: Clone,
We: Weighter<Key, Qey, Val>,
B: BuildHasher,
> PlaceholderGuardWithTTL<'a, Key, Qey, Val, We, B>
{
pub fn insert(self, val: Val) {
let expire_at = Instant::now() + self.ttl;
self.inner.insert(val);
self.tx.send((expire_at, self.key, self.qey)).unwrap();
}
}

@ -3,3 +3,4 @@ mod kq_cache;
pub use cache::CacheWithTTL;
pub use kq_cache::{KQCacheWithTTL, PlaceholderGuardWithTTL};
pub use quick_cache::{DefaultHashBuilder, UnitWeighter, Weighter};

@ -19,6 +19,7 @@ deferred-rate-limiter = { path = "../deferred-rate-limiter" }
entities = { path = "../entities" }
latency = { path = "../latency" }
migration = { path = "../migration" }
quick_cache_ttl = { path = "../quick_cache_ttl" }
redis-rate-limiter = { path = "../redis-rate-limiter" }
thread-fast-rng = { path = "../thread-fast-rng" }
@ -70,7 +71,6 @@ pagerduty-rs = { version = "0.1.6", default-features = false, features = ["async
parking_lot = { version = "0.12.1", features = ["arc_lock"] }
prettytable = "*"
proctitle = "0.1.1"
quick_cache = "0.3.0"
rdkafka = { version = "0.30.0" }
regex = "1.8.1"
reqwest = { version = "0.11.17", default-features = false, features = ["json", "tokio-rustls"] }

@ -36,6 +36,7 @@ use ethers::types::U256;
use ethers::utils::rlp::{Decodable, Rlp};
use futures::future::join_all;
use futures::stream::{FuturesUnordered, StreamExt};
use hashbrown::hash_map::DefaultHashBuilder;
use hashbrown::{HashMap, HashSet};
use ipnet::IpNet;
use log::{debug, error, info, trace, warn, Level};
@ -617,11 +618,15 @@ impl Web3ProxyApp {
// responses can be very different in sizes, so this is a cache with a max capacity and a weigher
// TODO: don't allow any response to be bigger than X% of the cache
// TODO: we should emit stats to calculate a more accurate expected cache size
let response_cache = JsonRpcQueryCache::with_weighter(
// TODO: do we actually want a TTL on this?
let response_cache = JsonRpcQueryCache::new(
(top_config.app.response_cache_max_bytes / 2048) as usize,
top_config.app.response_cache_max_bytes,
JsonRpcQueryWeigher,
);
DefaultHashBuilder::default(),
Duration::from_secs(3600),
)
.await;
// create semaphores for concurrent connection limits
// TODO: what should tti be for semaphores?
@ -1756,7 +1761,7 @@ impl Web3ProxyApp {
match self
.jsonrpc_query_cache
.get_value_or_guard_async(&cache_key).await
.get_value_or_guard_async(cache_key).await
{
Ok(x) => x,
Err(x) => {

@ -19,11 +19,12 @@ use axum::{
use http::{header::AUTHORIZATION, StatusCode};
use listenfd::ListenFd;
use log::info;
use std::iter::once;
use quick_cache_ttl::UnitWeighter;
use std::net::SocketAddr;
use std::sync::Arc;
use std::{iter::once, time::Duration};
use strum::{EnumCount, EnumIter};
use tokio::{sync::broadcast, time::Instant};
use tokio::sync::broadcast;
use tower_http::cors::CorsLayer;
use tower_http::sensitive_headers::SetSensitiveRequestHeadersLayer;
@ -35,8 +36,12 @@ pub enum FrontendResponseCacheKey {
Status,
}
pub type FrontendJsonResponseCache =
quick_cache::sync::Cache<FrontendResponseCacheKey, ((StatusCode, axum::body::Bytes), Instant)>;
pub type FrontendJsonResponseCache = quick_cache_ttl::CacheWithTTL<
FrontendResponseCacheKey,
(StatusCode, axum::body::Bytes),
UnitWeighter,
quick_cache_ttl::DefaultHashBuilder,
>;
/// Start the frontend server.
pub async fn serve(
@ -50,7 +55,11 @@ pub async fn serve(
// TODO: latest moka allows for different ttls for different
let response_cache_size = FrontendResponseCacheKey::COUNT;
let json_response_cache = FrontendJsonResponseCache::new(response_cache_size);
let json_response_cache = FrontendJsonResponseCache::new_with_unit_weights(
response_cache_size,
Duration::from_secs(1),
)
.await;
// TODO: read config for if fastest/versus should be available publicly. default off

@ -4,17 +4,12 @@
//! They will eventually move to another port.
use super::{FrontendJsonResponseCache, FrontendResponseCacheKey};
use crate::{
app::{Web3ProxyApp, APP_USER_AGENT},
frontend::errors::Web3ProxyError,
};
use crate::app::{Web3ProxyApp, APP_USER_AGENT};
use axum::{body::Bytes, http::StatusCode, response::IntoResponse, Extension};
use axum_macros::debug_handler;
use futures::Future;
use once_cell::sync::Lazy;
use serde_json::json;
use std::{sync::Arc, time::Duration};
use tokio::time::Instant;
use std::{convert::Infallible, sync::Arc};
static HEALTH_OK: Lazy<Bytes> = Lazy::new(|| Bytes::from("OK\n"));
static HEALTH_NOT_OK: Lazy<Bytes> = Lazy::new(|| Bytes::from(":(\n"));
@ -22,54 +17,21 @@ static HEALTH_NOT_OK: Lazy<Bytes> = Lazy::new(|| Bytes::from(":(\n"));
static BACKUPS_NEEDED_TRUE: Lazy<Bytes> = Lazy::new(|| Bytes::from("true\n"));
static BACKUPS_NEEDED_FALSE: Lazy<Bytes> = Lazy::new(|| Bytes::from("false\n"));
/// simple ttl for
// TODO: make this generic for any cache/key
async fn _quick_cache_ttl<Fut>(
app: Arc<Web3ProxyApp>,
cache: Arc<FrontendJsonResponseCache>,
key: FrontendResponseCacheKey,
f: impl Fn(Arc<Web3ProxyApp>) -> Fut,
) -> (StatusCode, Bytes)
where
Fut: Future<Output = (StatusCode, Bytes)>,
{
let mut response;
let expire_at;
(response, expire_at) = cache
.get_or_insert_async::<Web3ProxyError>(&key, async {
let expire_at = Instant::now() + Duration::from_millis(1000);
let response = f(app.clone()).await;
Ok((response, expire_at))
})
.await
.unwrap();
if Instant::now() >= expire_at {
// TODO: this expiration isn't perfect
// parallel requests could overwrite eachother
// its good enough for now
let expire_at = Instant::now() + Duration::from_millis(1000);
response = f(app).await;
cache.insert(key, (response.clone(), expire_at));
}
response
}
/// Health check page for load balancers to use.
#[debug_handler]
pub async fn health(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(cache): Extension<Arc<FrontendJsonResponseCache>>,
) -> impl IntoResponse {
_quick_cache_ttl(app, cache, FrontendResponseCacheKey::Health, _health).await
cache
.get_or_insert_async::<Infallible, _>(&FrontendResponseCacheKey::Health, async move {
Ok(_health(app).await)
})
.await
}
// TODO: _health doesn't need to be async, but _quick_cache_ttl needs an async function
#[inline]
async fn _health(app: Arc<Web3ProxyApp>) -> (StatusCode, Bytes) {
if app.balanced_rpcs.synced() {
(StatusCode::OK, HEALTH_OK.clone())
@ -84,15 +46,15 @@ pub async fn backups_needed(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(cache): Extension<Arc<FrontendJsonResponseCache>>,
) -> impl IntoResponse {
_quick_cache_ttl(
app,
cache,
FrontendResponseCacheKey::BackupsNeeded,
_backups_needed,
)
.await
cache
.get_or_insert_async::<Infallible, _>(
&FrontendResponseCacheKey::BackupsNeeded,
async move { Ok(_backups_needed(app).await) },
)
.await
}
#[inline]
async fn _backups_needed(app: Arc<Web3ProxyApp>) -> (StatusCode, Bytes) {
let code = {
let consensus_rpcs = app
@ -122,16 +84,21 @@ async fn _backups_needed(app: Arc<Web3ProxyApp>) -> (StatusCode, Bytes) {
/// Very basic status page.
///
/// TODO: replace this with proper stats and monitoring
/// TODO: replace this with proper stats and monitoring. frontend uses it for their public dashboards though
#[debug_handler]
pub async fn status(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(cache): Extension<Arc<FrontendJsonResponseCache>>,
) -> impl IntoResponse {
_quick_cache_ttl(app, cache, FrontendResponseCacheKey::Status, _status).await
cache
.get_or_insert_async::<Infallible, _>(&FrontendResponseCacheKey::Status, async move {
Ok(_status(app).await)
})
.await
}
// TODO: this doesn't need to be async, but _quick_cache_ttl needs an async function
// TODO: _status doesn't need to be async, but _quick_cache_ttl needs an async function
#[inline]
async fn _status(app: Arc<Web3ProxyApp>) -> (StatusCode, Bytes) {
// TODO: what else should we include? uptime, cache hit rates, cpu load, memory used
// TODO: the hostname is probably not going to change. only get once at the start?

@ -3,7 +3,8 @@ use crate::{
};
use derive_more::From;
use ethers::providers::ProviderError;
use quick_cache::{sync::Cache as QuickCache, Weighter};
use hashbrown::hash_map::DefaultHashBuilder;
use quick_cache_ttl::{CacheWithTTL, Weighter};
use serde_json::value::RawValue;
use std::{
borrow::Cow,
@ -33,8 +34,12 @@ impl Hash for JsonRpcQueryCacheKey {
}
}
pub type JsonRpcQueryCache =
QuickCache<JsonRpcQueryCacheKey, JsonRpcResponseData, JsonRpcQueryWeigher>;
pub type JsonRpcQueryCache = CacheWithTTL<
JsonRpcQueryCacheKey,
JsonRpcResponseData,
JsonRpcQueryWeigher,
DefaultHashBuilder,
>;
#[derive(Clone)]
pub struct JsonRpcQueryWeigher;