From f4cebde53f31c0b1ae128a263b8c2b1a71d1f51a Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Mon, 15 May 2023 10:48:59 -0700
Subject: [PATCH] use ttl quick cache

---
 Cargo.lock                        |   2 +-
 quick_cache_ttl/src/cache.rs      |  78 ++++++++++++++++
 quick_cache_ttl/src/kq_cache.rs   | 143 ++++++++++++++++++++++++++++++
 quick_cache_ttl/src/lib.rs        |   1 +
 web3_proxy/Cargo.toml             |   2 +-
 web3_proxy/src/app/mod.rs         |  11 ++-
 web3_proxy/src/frontend/mod.rs    |  19 ++--
 web3_proxy/src/frontend/status.rs |  81 +++++------------
 web3_proxy/src/response_cache.rs  |  11 ++-
 9 files changed, 278 insertions(+), 70 deletions(-)
 create mode 100644 quick_cache_ttl/src/cache.rs
 create mode 100644 quick_cache_ttl/src/kq_cache.rs

diff --git a/Cargo.lock b/Cargo.lock
index 0cdee203..4292411b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6708,7 +6708,7 @@ dependencies = [
  "parking_lot 0.12.1",
  "prettytable",
  "proctitle",
- "quick_cache",
+ "quick_cache_ttl",
  "rdkafka",
  "redis-rate-limiter",
  "regex",
diff --git a/quick_cache_ttl/src/cache.rs b/quick_cache_ttl/src/cache.rs
new file mode 100644
index 00000000..6afceb23
--- /dev/null
+++ b/quick_cache_ttl/src/cache.rs
@@ -0,0 +1,78 @@
+use quick_cache::{DefaultHashBuilder, UnitWeighter, Weighter};
+use std::{
+    future::Future,
+    hash::{BuildHasher, Hash},
+    time::Duration,
+};
+
+use crate::{KQCacheWithTTL, PlaceholderGuardWithTTL};
+
+pub struct CacheWithTTL<Key, Val, We, B>(KQCacheWithTTL<Key, (), Val, We, B>);
+
+impl<Key: Eq + Hash + Clone + Send + Sync + 'static, Val: Clone + Send + Sync + 'static>
+    CacheWithTTL<Key, Val, UnitWeighter, DefaultHashBuilder>
+{
+    pub async fn new_with_unit_weights(estimated_items_capacity: usize, ttl: Duration) -> Self {
+        Self::new(
+            estimated_items_capacity,
+            estimated_items_capacity as u64,
+            UnitWeighter,
+            DefaultHashBuilder::default(),
+            ttl,
+        )
+        .await
+    }
+}
+
+impl<
+        Key: Eq + Hash + Clone + Send + Sync + 'static,
+        Val: Clone + Send + Sync + 'static,
+        We: Weighter<Key, (), Val> + Clone + Send + Sync + 'static,
+        B: BuildHasher + Clone + Send + Sync + 'static,
+    > CacheWithTTL<Key, Val, We, B>
+{
+    pub async fn new(
+        estimated_items_capacity: usize,
+        weight_capacity: u64,
+        weighter: We,
+        hash_builder: B,
+        ttl: Duration,
+    ) -> Self {
+        let inner = KQCacheWithTTL::new(
+            estimated_items_capacity,
+            weight_capacity,
+            weighter,
+            hash_builder,
+            ttl,
+        )
+        .await;
+
+        Self(inner)
+    }
+
+    #[inline]
+    pub async fn get_or_insert_async<Fut, E>(&self, key: &Key, f: Fut) -> Result<Val, E>
+    where
+        Fut: Future<Output = Result<Val, E>>,
+    {
+        self.0.get_or_insert_async(key, &(), f).await
+    }
+
+    #[inline]
+    pub async fn get_value_or_guard_async(
+        &self,
+        key: Key,
+    ) -> Result<Val, PlaceholderGuardWithTTL<'_, Key, (), Val, We, B>> {
+        self.0.get_value_or_guard_async(key, ()).await
+    }
+
+    #[inline]
+    pub fn insert(&self, key: Key, val: Val) {
+        self.0.insert(key, (), val)
+    }
+
+    #[inline]
+    pub fn remove(&self, key: &Key) -> bool {
+        self.0.remove(key, &())
+    }
+}
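
A minimal usage sketch of the CacheWithTTL API added above (not part of the patch): the key and value types, the capacity, and the one-second TTL are made-up examples, and it assumes the quick_cache_ttl crate plus a tokio runtime, since constructing the cache spawns the expiry task.

    // Illustrative sketch only; types, capacity, and TTL are arbitrary examples.
    use quick_cache_ttl::CacheWithTTL;
    use std::convert::Infallible;
    use std::time::Duration;

    #[tokio::main]
    async fn main() {
        // unit weights: every entry counts as 1 toward the weight capacity
        let cache = CacheWithTTL::new_with_unit_weights(100, Duration::from_secs(1)).await;

        // compute-on-miss; the cached value is removed by the background task after the TTL
        let chain_id: Result<String, Infallible> = cache
            .get_or_insert_async(&"chain_id", async { Ok("0x1".to_string()) })
            .await;
        assert_eq!(chain_id.unwrap(), "0x1");

        // direct inserts are scheduled for expiry too; remove() evicts early
        cache.insert("gas_price", "0x77359400".to_string());
        cache.remove(&"gas_price");
    }

Because the wrapper delegates to quick_cache's placeholder machinery, concurrent callers that miss on the same key should end up sharing a single fill future rather than racing.
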
diff --git a/quick_cache_ttl/src/kq_cache.rs b/quick_cache_ttl/src/kq_cache.rs
new file mode 100644
index 00000000..5859cb8d
--- /dev/null
+++ b/quick_cache_ttl/src/kq_cache.rs
@@ -0,0 +1,143 @@
+use quick_cache::sync::KQCache;
+use quick_cache::{PlaceholderGuard, Weighter};
+use std::future::Future;
+use std::hash::{BuildHasher, Hash};
+use std::sync::Arc;
+use std::time::Duration;
+use tokio::task::JoinHandle;
+use tokio::time::{sleep_until, Instant};
+
+pub struct KQCacheWithTTL<Key, Qey, Val, We, B> {
+    pub(crate) cache: Arc<KQCache<Key, Qey, Val, We, B>>,
+    pub task_handle: JoinHandle<()>,
+    ttl: Duration,
+    pub(crate) tx: flume::Sender<(Instant, Key, Qey)>,
+}
+
+struct KQCacheWithTTLTask<Key, Qey, Val, We, B> {
+    cache: Arc<KQCache<Key, Qey, Val, We, B>>,
+    rx: flume::Receiver<(Instant, Key, Qey)>,
+}
+
+pub struct PlaceholderGuardWithTTL<'a, Key, Qey, Val, We, B> {
+    inner: PlaceholderGuard<'a, Key, Qey, Val, We, B>,
+    key: Key,
+    qey: Qey,
+    ttl: Duration,
+    tx: &'a flume::Sender<(Instant, Key, Qey)>,
+}
+
+impl<
+        Key: Eq + Hash + Clone + Send + Sync + 'static,
+        Qey: Eq + Hash + Clone + Send + Sync + 'static,
+        Val: Clone + Send + Sync + 'static,
+        We: Weighter<Key, Qey, Val> + Clone + Send + Sync + 'static,
+        B: BuildHasher + Clone + Send + Sync + 'static,
+    > KQCacheWithTTL<Key, Qey, Val, We, B>
+{
+    pub async fn new(
+        estimated_items_capacity: usize,
+        weight_capacity: u64,
+        weighter: We,
+        hash_builder: B,
+        ttl: Duration,
+    ) -> Self {
+        let (tx, rx) = flume::unbounded();
+
+        let cache = KQCache::with(
+            estimated_items_capacity,
+            weight_capacity,
+            weighter,
+            hash_builder,
+        );
+
+        let cache = Arc::new(cache);
+
+        let task = KQCacheWithTTLTask {
+            cache: cache.clone(),
+            rx,
+        };
+
+        let task_handle = tokio::spawn(task.run());
+
+        Self {
+            cache,
+            task_handle,
+            ttl,
+            tx,
+        }
+    }
+
+    #[inline]
+    pub async fn get_or_insert_async<Fut, E>(&self, key: &Key, qey: &Qey, f: Fut) -> Result<Val, E>
+    where
+        Fut: Future<Output = Result<Val, E>>,
+    {
+        self.cache.get_or_insert_async(key, qey, f).await
+    }
+
+    #[inline]
+    pub async fn get_value_or_guard_async(
+        &self,
+        key: Key,
+        qey: Qey,
+    ) -> Result<Val, PlaceholderGuardWithTTL<'_, Key, Qey, Val, We, B>> {
+        match self.cache.get_value_or_guard_async(&key, &qey).await {
+            Ok(x) => Ok(x),
+            Err(inner) => Err(PlaceholderGuardWithTTL {
+                inner,
+                key,
+                qey,
+                ttl: self.ttl,
+                tx: &self.tx,
+            }),
+        }
+    }
+
+    pub fn insert(&self, key: Key, qey: Qey, val: Val) {
+        let expire_at = Instant::now() + self.ttl;
+
+        self.cache.insert(key.clone(), qey.clone(), val);
+
+        self.tx.send((expire_at, key, qey)).unwrap();
+    }
+
+    pub fn remove(&self, key: &Key, qey: &Qey) -> bool {
+        self.cache.remove(key, qey)
+    }
+}
+
+impl<
+        Key: Eq + Hash,
+        Qey: Eq + Hash,
+        Val: Clone,
+        We: Weighter<Key, Qey, Val> + Clone,
+        B: BuildHasher + Clone,
+    > KQCacheWithTTLTask<Key, Qey, Val, We, B>
+{
+    async fn run(self) {
+        while let Ok((expire_at, key, qey)) = self.rx.recv_async().await {
+            sleep_until(expire_at).await;
+
+            self.cache.remove(&key, &qey);
+        }
+    }
+}
+
+impl<
+        'a,
+        Key: Clone + Hash + Eq,
+        Qey: Clone + Hash + Eq,
+        Val: Clone,
+        We: Weighter<Key, Qey, Val>,
+        B: BuildHasher,
+    > PlaceholderGuardWithTTL<'a, Key, Qey, Val, We, B>
+{
+    pub fn insert(self, val: Val) {
+        let expire_at = Instant::now() + self.ttl;
+
+        self.inner.insert(val);
+
+        self.tx.send((expire_at, self.key, self.qey)).unwrap();
+    }
+}
diff --git a/quick_cache_ttl/src/lib.rs b/quick_cache_ttl/src/lib.rs
index a538704e..b2a05332 100644
--- a/quick_cache_ttl/src/lib.rs
+++ b/quick_cache_ttl/src/lib.rs
@@ -3,3 +3,4 @@ mod kq_cache;
 
 pub use cache::CacheWithTTL;
 pub use kq_cache::{KQCacheWithTTL, PlaceholderGuardWithTTL};
+pub use quick_cache::{DefaultHashBuilder, UnitWeighter, Weighter};
diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml
index 91c2b26a..bca468c6 100644
--- a/web3_proxy/Cargo.toml
+++ b/web3_proxy/Cargo.toml
@@ -19,6 +19,7 @@ deferred-rate-limiter = { path = "../deferred-rate-limiter" }
 entities = { path = "../entities" }
 latency = { path = "../latency" }
 migration = { path = "../migration" }
+quick_cache_ttl = { path = "../quick_cache_ttl" }
 redis-rate-limiter = { path = "../redis-rate-limiter" }
 thread-fast-rng = { path = "../thread-fast-rng" }
 
@@ -70,7 +71,6 @@ pagerduty-rs = { version = "0.1.6", default-features = false, features = ["async
 parking_lot = { version = "0.12.1", features = ["arc_lock"] }
 prettytable = "*"
 proctitle = "0.1.1"
-quick_cache = "0.3.0"
 rdkafka = { version = "0.30.0" }
 regex = "1.8.1"
 reqwest = { version = "0.11.17", default-features = false, features = ["json", "tokio-rustls"] }
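
KQCacheWithTTL above is the two-key core that CacheWithTTL wraps with Qey = (): both insert() and the placeholder guard push (expire_at, key, qey) onto a flume channel, and the spawned task sleeps until each deadline and then removes the entry. A sketch of the guard path (not part of the patch); the key, qualifier, and value types here are arbitrary stand-ins.

    // Illustrative sketch only; the key/qualifier/value types are stand-ins.
    use quick_cache_ttl::{DefaultHashBuilder, KQCacheWithTTL, UnitWeighter};
    use std::time::Duration;

    #[tokio::main]
    async fn main() {
        let cache: KQCacheWithTTL<String, u64, String, _, _> = KQCacheWithTTL::new(
            1_000, // estimated items
            1_000, // weight capacity (unit weights)
            UnitWeighter,
            DefaultHashBuilder::default(),
            Duration::from_secs(60 * 60),
        )
        .await;

        // the "qey" is quick_cache's secondary key; CacheWithTTL fixes it to ()
        let key = "eth_getBlockByNumber".to_string();
        match cache.get_value_or_guard_async(key, 123).await {
            Ok(cached) => println!("hit: {cached}"),
            Err(guard) => {
                // stand-in for the real upstream call
                let fresh = "0xabc".to_string();
                // inserting through the guard also queues (key, qey) for TTL removal
                guard.insert(fresh);
            }
        }
    }
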
diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs
index a8e8ad01..0a1c188a 100644
--- a/web3_proxy/src/app/mod.rs
+++ b/web3_proxy/src/app/mod.rs
@@ -36,6 +36,7 @@ use ethers::types::U256;
 use ethers::utils::rlp::{Decodable, Rlp};
 use futures::future::join_all;
 use futures::stream::{FuturesUnordered, StreamExt};
+use hashbrown::hash_map::DefaultHashBuilder;
 use hashbrown::{HashMap, HashSet};
 use ipnet::IpNet;
 use log::{debug, error, info, trace, warn, Level};
@@ -617,11 +618,15 @@ impl Web3ProxyApp {
         // responses can be very different in sizes, so this is a cache with a max capacity and a weigher
         // TODO: don't allow any response to be bigger than X% of the cache
         // TODO: we should emit stats to calculate a more accurate expected cache size
-        let response_cache = JsonRpcQueryCache::with_weighter(
+        // TODO: do we actually want a TTL on this?
+        let response_cache = JsonRpcQueryCache::new(
             (top_config.app.response_cache_max_bytes / 2048) as usize,
             top_config.app.response_cache_max_bytes,
             JsonRpcQueryWeigher,
-        );
+            DefaultHashBuilder::default(),
+            Duration::from_secs(3600),
+        )
+        .await;
 
         // create semaphores for concurrent connection limits
         // TODO: what should tti be for semaphores?
@@ -1756,7 +1761,7 @@ impl Web3ProxyApp {
 
                 match self
                     .jsonrpc_query_cache
-                    .get_value_or_guard_async(&cache_key).await
+                    .get_value_or_guard_async(cache_key).await
                 {
                     Ok(x) => x,
                     Err(x) => {
diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs
index 0aca1b02..b5ebbbe5 100644
--- a/web3_proxy/src/frontend/mod.rs
+++ b/web3_proxy/src/frontend/mod.rs
@@ -19,11 +19,12 @@ use axum::{
 use http::{header::AUTHORIZATION, StatusCode};
 use listenfd::ListenFd;
 use log::info;
-use std::iter::once;
+use quick_cache_ttl::UnitWeighter;
 use std::net::SocketAddr;
 use std::sync::Arc;
+use std::{iter::once, time::Duration};
 use strum::{EnumCount, EnumIter};
-use tokio::{sync::broadcast, time::Instant};
+use tokio::sync::broadcast;
 use tower_http::cors::CorsLayer;
 use tower_http::sensitive_headers::SetSensitiveRequestHeadersLayer;
 
@@ -35,8 +36,12 @@ pub enum FrontendResponseCacheKey {
     Status,
 }
 
-pub type FrontendJsonResponseCache =
-    quick_cache::sync::Cache<FrontendResponseCacheKey, ((StatusCode, axum::body::Bytes), Instant)>;
+pub type FrontendJsonResponseCache = quick_cache_ttl::CacheWithTTL<
+    FrontendResponseCacheKey,
+    (StatusCode, axum::body::Bytes),
+    UnitWeighter,
+    quick_cache_ttl::DefaultHashBuilder,
+>;
 
 /// Start the frontend server.
 pub async fn serve(
@@ -50,7 +55,11 @@ pub async fn serve(
     // TODO: latest moka allows for different ttls for different
     let response_cache_size = FrontendResponseCacheKey::COUNT;
 
-    let json_response_cache = FrontendJsonResponseCache::new(response_cache_size);
+    let json_response_cache = FrontendJsonResponseCache::new_with_unit_weights(
+        response_cache_size,
+        Duration::from_secs(1),
+    )
+    .await;
 
     // TODO: read config for if fastest/versus should be available publicly. default off
 
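
The FrontendJsonResponseCache alias above now carries its one-second TTL inside the cache instead of storing an expires-at Instant next to each response. A self-contained sketch of the new call pattern (not part of the patch): the enum below is a stand-in for the real FrontendResponseCacheKey (which also derives the strum traits), and Infallible is used because the fill future cannot fail.

    // Illustrative sketch only; the enum is a stand-in for the real cache key.
    use axum::body::Bytes;
    use http::StatusCode;
    use quick_cache_ttl::{CacheWithTTL, DefaultHashBuilder, UnitWeighter};
    use std::convert::Infallible;
    use std::time::Duration;

    #[derive(Clone, Copy, PartialEq, Eq, Hash)]
    enum FrontendResponseCacheKey {
        Health,
    }

    type FrontendJsonResponseCache =
        CacheWithTTL<FrontendResponseCacheKey, (StatusCode, Bytes), UnitWeighter, DefaultHashBuilder>;

    #[tokio::main]
    async fn main() {
        // capacity of 3 mirrors FrontendResponseCacheKey::COUNT in the real serve(); TTL matches its one second
        let cache = FrontendJsonResponseCache::new_with_unit_weights(3, Duration::from_secs(1)).await;

        // on a miss the future runs once; for the next second callers get the cached tuple
        let (code, body) = cache
            .get_or_insert_async(&FrontendResponseCacheKey::Health, async {
                Ok::<_, Infallible>((StatusCode::OK, Bytes::from("OK\n")))
            })
            .await
            .unwrap();

        assert_eq!(code, StatusCode::OK);
        assert_eq!(body, Bytes::from("OK\n"));
    }
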
diff --git a/web3_proxy/src/frontend/status.rs b/web3_proxy/src/frontend/status.rs
index 4bc30aa3..a23b0a70 100644
--- a/web3_proxy/src/frontend/status.rs
+++ b/web3_proxy/src/frontend/status.rs
@@ -4,17 +4,12 @@
 //! They will eventually move to another port.
 
 use super::{FrontendJsonResponseCache, FrontendResponseCacheKey};
-use crate::{
-    app::{Web3ProxyApp, APP_USER_AGENT},
-    frontend::errors::Web3ProxyError,
-};
+use crate::app::{Web3ProxyApp, APP_USER_AGENT};
 use axum::{body::Bytes, http::StatusCode, response::IntoResponse, Extension};
 use axum_macros::debug_handler;
-use futures::Future;
 use once_cell::sync::Lazy;
 use serde_json::json;
-use std::{sync::Arc, time::Duration};
-use tokio::time::Instant;
+use std::{convert::Infallible, sync::Arc};
 
 static HEALTH_OK: Lazy<Bytes> = Lazy::new(|| Bytes::from("OK\n"));
 static HEALTH_NOT_OK: Lazy<Bytes> = Lazy::new(|| Bytes::from(":(\n"));
@@ -22,54 +17,21 @@ static HEALTH_NOT_OK: Lazy<Bytes> = Lazy::new(|| Bytes::from(":(\n"));
 static BACKUPS_NEEDED_TRUE: Lazy<Bytes> = Lazy::new(|| Bytes::from("true\n"));
 static BACKUPS_NEEDED_FALSE: Lazy<Bytes> = Lazy::new(|| Bytes::from("false\n"));
 
-/// simple ttl for
-// TODO: make this generic for any cache/key
-async fn _quick_cache_ttl<Fut>(
-    app: Arc<Web3ProxyApp>,
-    cache: Arc<FrontendJsonResponseCache>,
-    key: FrontendResponseCacheKey,
-    f: impl Fn(Arc<Web3ProxyApp>) -> Fut,
-) -> (StatusCode, Bytes)
-where
-    Fut: Future<Output = (StatusCode, Bytes)>,
-{
-    let mut response;
-    let expire_at;
-
-    (response, expire_at) = cache
-        .get_or_insert_async::<Web3ProxyError>(&key, async {
-            let expire_at = Instant::now() + Duration::from_millis(1000);
-
-            let response = f(app.clone()).await;
-
-            Ok((response, expire_at))
-        })
-        .await
-        .unwrap();
-
-    if Instant::now() >= expire_at {
-        // TODO: this expiration isn't perfect
-        // parallel requests could overwrite eachother
-        // its good enough for now
-        let expire_at = Instant::now() + Duration::from_millis(1000);
-
-        response = f(app).await;
-
-        cache.insert(key, (response.clone(), expire_at));
-    }
-
-    response
-}
-
 /// Health check page for load balancers to use.
 #[debug_handler]
 pub async fn health(
     Extension(app): Extension<Arc<Web3ProxyApp>>,
     Extension(cache): Extension<Arc<FrontendJsonResponseCache>>,
 ) -> impl IntoResponse {
-    _quick_cache_ttl(app, cache, FrontendResponseCacheKey::Health, _health).await
+    cache
+        .get_or_insert_async::<_, Infallible>(&FrontendResponseCacheKey::Health, async move {
+            Ok(_health(app).await)
+        })
+        .await
 }
 
+// TODO: _health doesn't need to be async, but _quick_cache_ttl needs an async function
+#[inline]
 async fn _health(app: Arc<Web3ProxyApp>) -> (StatusCode, Bytes) {
     if app.balanced_rpcs.synced() {
         (StatusCode::OK, HEALTH_OK.clone())
@@ -84,15 +46,15 @@ pub async fn backups_needed(
     Extension(app): Extension<Arc<Web3ProxyApp>>,
     Extension(cache): Extension<Arc<FrontendJsonResponseCache>>,
 ) -> impl IntoResponse {
-    _quick_cache_ttl(
-        app,
-        cache,
-        FrontendResponseCacheKey::BackupsNeeded,
-        _backups_needed,
-    )
-    .await
+    cache
+        .get_or_insert_async::<_, Infallible>(
+            &FrontendResponseCacheKey::BackupsNeeded,
+            async move { Ok(_backups_needed(app).await) },
+        )
+        .await
 }
 
+#[inline]
 async fn _backups_needed(app: Arc<Web3ProxyApp>) -> (StatusCode, Bytes) {
     let code = {
         let consensus_rpcs = app
@@ -122,16 +84,21 @@ async fn _backups_needed(app: Arc<Web3ProxyApp>) -> (StatusCode, Bytes) {
 
 /// Very basic status page.
 ///
-/// TODO: replace this with proper stats and monitoring
+/// TODO: replace this with proper stats and monitoring. frontend uses it for their public dashboards though
 #[debug_handler]
 pub async fn status(
     Extension(app): Extension<Arc<Web3ProxyApp>>,
     Extension(cache): Extension<Arc<FrontendJsonResponseCache>>,
 ) -> impl IntoResponse {
-    _quick_cache_ttl(app, cache, FrontendResponseCacheKey::Status, _status).await
+    cache
+        .get_or_insert_async::<_, Infallible>(&FrontendResponseCacheKey::Status, async move {
+            Ok(_status(app).await)
+        })
+        .await
 }
 
-// TODO: this doesn't need to be async, but _quick_cache_ttl needs an async function
+// TODO: _status doesn't need to be async, but _quick_cache_ttl needs an async function
+#[inline]
 async fn _status(app: Arc<Web3ProxyApp>) -> (StatusCode, Bytes) {
     // TODO: what else should we include? uptime, cache hit rates, cpu load, memory used
     // TODO: the hostname is probably not going to change. only get once at the start?
diff --git a/web3_proxy/src/response_cache.rs b/web3_proxy/src/response_cache.rs
index c7e710bd..7b269847 100644
--- a/web3_proxy/src/response_cache.rs
+++ b/web3_proxy/src/response_cache.rs
@@ -3,7 +3,8 @@ use crate::{
 };
 use derive_more::From;
 use ethers::providers::ProviderError;
-use quick_cache::{sync::Cache as QuickCache, Weighter};
+use hashbrown::hash_map::DefaultHashBuilder;
+use quick_cache_ttl::{CacheWithTTL, Weighter};
 use serde_json::value::RawValue;
 use std::{
     borrow::Cow,
@@ -33,8 +34,12 @@ impl Hash for JsonRpcQueryCacheKey {
     }
 }
 
-pub type JsonRpcQueryCache =
-    QuickCache<JsonRpcQueryCacheKey, JsonRpcResponseData, JsonRpcQueryWeigher>;
+pub type JsonRpcQueryCache = CacheWithTTL<
+    JsonRpcQueryCacheKey,
+    JsonRpcResponseData,
+    JsonRpcQueryWeigher,
+    DefaultHashBuilder,
+>;
 
 #[derive(Clone)]
 pub struct JsonRpcQueryWeigher;
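
Since expiry now happens in the spawned task rather than being checked at read time, the simplest way to observe it end to end is a timing-based test. A test sketch (not part of the patch), assuming tokio's test macro and that a generous sleep is acceptable; the millisecond values are arbitrary and such a test can be flaky on a loaded machine.

    // Illustrative sketch only; timing values are arbitrary and the test is timing-sensitive.
    use quick_cache_ttl::CacheWithTTL;
    use std::convert::Infallible;
    use std::time::Duration;

    #[tokio::test]
    async fn entries_expire_after_ttl() {
        let cache = CacheWithTTL::new_with_unit_weights(16, Duration::from_millis(50)).await;

        cache.insert("key", 42_u64);

        // before the TTL fires, the cached value should win and the fill future is dropped unused
        let hit = cache
            .get_or_insert_async(&"key", async { Ok::<_, Infallible>(0) })
            .await
            .unwrap();
        assert_eq!(hit, 42);

        // give the background task time to sleep_until(expire_at) and remove the entry
        tokio::time::sleep(Duration::from_millis(200)).await;

        let refilled = cache
            .get_or_insert_async(&"key", async { Ok::<_, Infallible>(0) })
            .await
            .unwrap();
        assert_eq!(refilled, 0);
    }
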