diff --git a/Cargo.lock b/Cargo.lock index 15b0d7ab..4292411b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4225,6 +4225,15 @@ dependencies = [ "parking_lot 0.12.1", ] +[[package]] +name = "quick_cache_ttl" +version = "0.1.0" +dependencies = [ + "flume", + "quick_cache", + "tokio", +] + [[package]] name = "quote" version = "1.0.27" @@ -6699,7 +6708,7 @@ dependencies = [ "parking_lot 0.12.1", "prettytable", "proctitle", - "quick_cache", + "quick_cache_ttl", "rdkafka", "redis-rate-limiter", "regex", diff --git a/Cargo.toml b/Cargo.toml index 3c0a9c22..12175b98 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ members = [ "entities", "latency", "migration", + "quick_cache_ttl", "rate-counter", "redis-rate-limiter", "thread-fast-rng", diff --git a/latency/src/peak_ewma/mod.rs b/latency/src/peak_ewma/mod.rs index 24712229..059ba06e 100644 --- a/latency/src/peak_ewma/mod.rs +++ b/latency/src/peak_ewma/mod.rs @@ -63,6 +63,7 @@ impl PeakEwmaLatency { ); // Update the RTT estimate to account for decay since the last update. + // TODO: having an update here means we don't actually write from just one thread!! Thats how we get partially written stuff i think estimate.update(0.0, self.decay_ns, now) } diff --git a/quick_cache_ttl/Cargo.toml b/quick_cache_ttl/Cargo.toml new file mode 100644 index 00000000..d0102b72 --- /dev/null +++ b/quick_cache_ttl/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "quick_cache_ttl" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +flume = "0.10.14" +quick_cache = "0.3.0" +tokio = { version = "1.28.1", features = ["full"] } + +[dev-dependencies] +tokio = { version = "1.28.1", features = ["full", "test-util"] } diff --git a/quick_cache_ttl/src/cache.rs b/quick_cache_ttl/src/cache.rs new file mode 100644 index 00000000..6afceb23 --- /dev/null +++ b/quick_cache_ttl/src/cache.rs @@ -0,0 +1,78 @@ +use quick_cache::{DefaultHashBuilder, UnitWeighter, Weighter}; +use std::{ + future::Future, + hash::{BuildHasher, Hash}, + time::Duration, +}; + +use crate::{KQCacheWithTTL, PlaceholderGuardWithTTL}; + +pub struct CacheWithTTL(KQCacheWithTTL); + +impl + CacheWithTTL +{ + pub async fn new_with_unit_weights(estimated_items_capacity: usize, ttl: Duration) -> Self { + Self::new( + estimated_items_capacity, + estimated_items_capacity as u64, + UnitWeighter, + DefaultHashBuilder::default(), + ttl, + ) + .await + } +} + +impl< + Key: Eq + Hash + Clone + Send + Sync + 'static, + Val: Clone + Send + Sync + 'static, + We: Weighter + Clone + Send + Sync + 'static, + B: BuildHasher + Clone + Send + Sync + 'static, + > CacheWithTTL +{ + pub async fn new( + estimated_items_capacity: usize, + weight_capacity: u64, + weighter: We, + hash_builder: B, + ttl: Duration, + ) -> Self { + let inner = KQCacheWithTTL::new( + estimated_items_capacity, + weight_capacity, + weighter, + hash_builder, + ttl, + ) + .await; + + Self(inner) + } + + #[inline] + pub async fn get_or_insert_async(&self, key: &Key, f: Fut) -> Result + where + Fut: Future>, + { + self.0.get_or_insert_async(key, &(), f).await + } + + #[inline] + pub async fn get_value_or_guard_async( + &self, + key: Key, + ) -> Result> { + self.0.get_value_or_guard_async(key, ()).await + } + + #[inline] + pub fn insert(&self, key: Key, val: Val) { + self.0.insert(key, (), val) + } + + #[inline] + pub fn remove(&self, key: &Key) -> bool { + self.0.remove(key, &()) + } +} diff --git a/quick_cache_ttl/src/kq_cache.rs b/quick_cache_ttl/src/kq_cache.rs new file mode 100644 index 00000000..5859cb8d --- /dev/null +++ b/quick_cache_ttl/src/kq_cache.rs @@ -0,0 +1,143 @@ +use quick_cache::sync::KQCache; +use quick_cache::{PlaceholderGuard, Weighter}; +use std::future::Future; +use std::hash::{BuildHasher, Hash}; +use std::sync::Arc; +use std::time::Duration; +use tokio::task::JoinHandle; +use tokio::time::{sleep_until, Instant}; + +pub struct KQCacheWithTTL { + pub(crate) cache: Arc>, + pub task_handle: JoinHandle<()>, + ttl: Duration, + pub(crate) tx: flume::Sender<(Instant, Key, Qey)>, +} + +struct KQCacheWithTTLTask { + cache: Arc>, + rx: flume::Receiver<(Instant, Key, Qey)>, +} + +pub struct PlaceholderGuardWithTTL<'a, Key, Qey, Val, We, B> { + inner: PlaceholderGuard<'a, Key, Qey, Val, We, B>, + key: Key, + qey: Qey, + ttl: Duration, + tx: &'a flume::Sender<(Instant, Key, Qey)>, +} + +impl< + Key: Eq + Hash + Clone + Send + Sync + 'static, + Qey: Eq + Hash + Clone + Send + Sync + 'static, + Val: Clone + Send + Sync + 'static, + We: Weighter + Clone + Send + Sync + 'static, + B: BuildHasher + Clone + Send + Sync + 'static, + > KQCacheWithTTL +{ + pub async fn new( + estimated_items_capacity: usize, + weight_capacity: u64, + weighter: We, + hash_builder: B, + ttl: Duration, + ) -> Self { + let (tx, rx) = flume::unbounded(); + + let cache = KQCache::with( + estimated_items_capacity, + weight_capacity, + weighter, + hash_builder, + ); + + let cache = Arc::new(cache); + + let task = KQCacheWithTTLTask { + cache: cache.clone(), + rx, + }; + + let task_handle = tokio::spawn(task.run()); + + Self { + cache, + task_handle, + ttl, + tx, + } + } + + #[inline] + pub async fn get_or_insert_async(&self, key: &Key, qey: &Qey, f: Fut) -> Result + where + Fut: Future>, + { + self.cache.get_or_insert_async(key, qey, f).await + } + + #[inline] + pub async fn get_value_or_guard_async( + &self, + key: Key, + qey: Qey, + ) -> Result> { + match self.cache.get_value_or_guard_async(&key, &qey).await { + Ok(x) => Ok(x), + Err(inner) => Err(PlaceholderGuardWithTTL { + inner, + key, + qey, + ttl: self.ttl, + tx: &self.tx, + }), + } + } + + pub fn insert(&self, key: Key, qey: Qey, val: Val) { + let expire_at = Instant::now() + self.ttl; + + self.cache.insert(key.clone(), qey.clone(), val); + + self.tx.send((expire_at, key, qey)).unwrap(); + } + + pub fn remove(&self, key: &Key, qey: &Qey) -> bool { + self.cache.remove(key, qey) + } +} + +impl< + Key: Eq + Hash, + Qey: Eq + Hash, + Val: Clone, + We: Weighter + Clone, + B: BuildHasher + Clone, + > KQCacheWithTTLTask +{ + async fn run(self) { + while let Ok((expire_at, key, qey)) = self.rx.recv_async().await { + sleep_until(expire_at).await; + + self.cache.remove(&key, &qey); + } + } +} + +impl< + 'a, + Key: Clone + Hash + Eq, + Qey: Clone + Hash + Eq, + Val: Clone, + We: Weighter, + B: BuildHasher, + > PlaceholderGuardWithTTL<'a, Key, Qey, Val, We, B> +{ + pub fn insert(self, val: Val) { + let expire_at = Instant::now() + self.ttl; + + self.inner.insert(val); + + self.tx.send((expire_at, self.key, self.qey)).unwrap(); + } +} diff --git a/quick_cache_ttl/src/lib.rs b/quick_cache_ttl/src/lib.rs new file mode 100644 index 00000000..b2a05332 --- /dev/null +++ b/quick_cache_ttl/src/lib.rs @@ -0,0 +1,6 @@ +mod cache; +mod kq_cache; + +pub use cache::CacheWithTTL; +pub use kq_cache::{KQCacheWithTTL, PlaceholderGuardWithTTL}; +pub use quick_cache::{DefaultHashBuilder, UnitWeighter, Weighter}; diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index 91c2b26a..bca468c6 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -19,6 +19,7 @@ deferred-rate-limiter = { path = "../deferred-rate-limiter" } entities = { path = "../entities" } latency = { path = "../latency" } migration = { path = "../migration" } +quick_cache_ttl = { path = "../quick_cache_ttl" } redis-rate-limiter = { path = "../redis-rate-limiter" } thread-fast-rng = { path = "../thread-fast-rng" } @@ -70,7 +71,6 @@ pagerduty-rs = { version = "0.1.6", default-features = false, features = ["async parking_lot = { version = "0.12.1", features = ["arc_lock"] } prettytable = "*" proctitle = "0.1.1" -quick_cache = "0.3.0" rdkafka = { version = "0.30.0" } regex = "1.8.1" reqwest = { version = "0.11.17", default-features = false, features = ["json", "tokio-rustls"] } diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index a8e8ad01..0a1c188a 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -36,6 +36,7 @@ use ethers::types::U256; use ethers::utils::rlp::{Decodable, Rlp}; use futures::future::join_all; use futures::stream::{FuturesUnordered, StreamExt}; +use hashbrown::hash_map::DefaultHashBuilder; use hashbrown::{HashMap, HashSet}; use ipnet::IpNet; use log::{debug, error, info, trace, warn, Level}; @@ -617,11 +618,15 @@ impl Web3ProxyApp { // responses can be very different in sizes, so this is a cache with a max capacity and a weigher // TODO: don't allow any response to be bigger than X% of the cache // TODO: we should emit stats to calculate a more accurate expected cache size - let response_cache = JsonRpcQueryCache::with_weighter( + // TODO: do we actually want a TTL on this? + let response_cache = JsonRpcQueryCache::new( (top_config.app.response_cache_max_bytes / 2048) as usize, top_config.app.response_cache_max_bytes, JsonRpcQueryWeigher, - ); + DefaultHashBuilder::default(), + Duration::from_secs(3600), + ) + .await; // create semaphores for concurrent connection limits // TODO: what should tti be for semaphores? @@ -1756,7 +1761,7 @@ impl Web3ProxyApp { match self .jsonrpc_query_cache - .get_value_or_guard_async(&cache_key).await + .get_value_or_guard_async(cache_key).await { Ok(x) => x, Err(x) => { diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs index 0aca1b02..9eb2f6d9 100644 --- a/web3_proxy/src/frontend/mod.rs +++ b/web3_proxy/src/frontend/mod.rs @@ -19,24 +19,29 @@ use axum::{ use http::{header::AUTHORIZATION, StatusCode}; use listenfd::ListenFd; use log::info; -use std::iter::once; +use quick_cache_ttl::UnitWeighter; use std::net::SocketAddr; use std::sync::Arc; +use std::{iter::once, time::Duration}; use strum::{EnumCount, EnumIter}; -use tokio::{sync::broadcast, time::Instant}; +use tokio::sync::broadcast; use tower_http::cors::CorsLayer; use tower_http::sensitive_headers::SetSensitiveRequestHeadersLayer; /// simple keys for caching responses #[derive(Copy, Clone, Hash, PartialEq, Eq, EnumCount, EnumIter)] -pub enum FrontendResponseCacheKey { +pub enum ResponseCacheKey { BackupsNeeded, Health, Status, } -pub type FrontendJsonResponseCache = - quick_cache::sync::Cache; +pub type ResponseCache = quick_cache_ttl::CacheWithTTL< + ResponseCacheKey, + (StatusCode, &'static str, axum::body::Bytes), + UnitWeighter, + quick_cache_ttl::DefaultHashBuilder, +>; /// Start the frontend server. pub async fn serve( @@ -48,9 +53,10 @@ pub async fn serve( // setup caches for whatever the frontend needs // no need for max items since it is limited by the enum key // TODO: latest moka allows for different ttls for different - let response_cache_size = FrontendResponseCacheKey::COUNT; + let response_cache_size = ResponseCacheKey::COUNT; - let json_response_cache = FrontendJsonResponseCache::new(response_cache_size); + let response_cache = + ResponseCache::new_with_unit_weights(response_cache_size, Duration::from_secs(1)).await; // TODO: read config for if fastest/versus should be available publicly. default off @@ -216,7 +222,7 @@ pub async fn serve( // application state .layer(Extension(proxy_app)) // frontend caches - .layer(Extension(Arc::new(json_response_cache))) + .layer(Extension(Arc::new(response_cache))) // 404 for any unknown routes .fallback(errors::handler_404); diff --git a/web3_proxy/src/frontend/status.rs b/web3_proxy/src/frontend/status.rs index 4bc30aa3..1f1cb94b 100644 --- a/web3_proxy/src/frontend/status.rs +++ b/web3_proxy/src/frontend/status.rs @@ -3,18 +3,18 @@ //! For ease of development, users can currently access these endponts. //! They will eventually move to another port. -use super::{FrontendJsonResponseCache, FrontendResponseCacheKey}; -use crate::{ - app::{Web3ProxyApp, APP_USER_AGENT}, - frontend::errors::Web3ProxyError, +use super::{ResponseCache, ResponseCacheKey}; +use crate::app::{Web3ProxyApp, APP_USER_AGENT}; +use axum::{ + body::{Bytes, Full}, + http::StatusCode, + response::{IntoResponse, Response}, + Extension, }; -use axum::{body::Bytes, http::StatusCode, response::IntoResponse, Extension}; use axum_macros::debug_handler; -use futures::Future; use once_cell::sync::Lazy; use serde_json::json; -use std::{sync::Arc, time::Duration}; -use tokio::time::Instant; +use std::{convert::Infallible, sync::Arc}; static HEALTH_OK: Lazy = Lazy::new(|| Bytes::from("OK\n")); static HEALTH_NOT_OK: Lazy = Lazy::new(|| Bytes::from(":(\n")); @@ -22,59 +22,40 @@ static HEALTH_NOT_OK: Lazy = Lazy::new(|| Bytes::from(":(\n")); static BACKUPS_NEEDED_TRUE: Lazy = Lazy::new(|| Bytes::from("true\n")); static BACKUPS_NEEDED_FALSE: Lazy = Lazy::new(|| Bytes::from("false\n")); -/// simple ttl for -// TODO: make this generic for any cache/key -async fn _quick_cache_ttl( - app: Arc, - cache: Arc, - key: FrontendResponseCacheKey, - f: impl Fn(Arc) -> Fut, -) -> (StatusCode, Bytes) -where - Fut: Future, -{ - let mut response; - let expire_at; - - (response, expire_at) = cache - .get_or_insert_async::(&key, async { - let expire_at = Instant::now() + Duration::from_millis(1000); - - let response = f(app.clone()).await; - - Ok((response, expire_at)) - }) - .await - .unwrap(); - - if Instant::now() >= expire_at { - // TODO: this expiration isn't perfect - // parallel requests could overwrite eachother - // its good enough for now - let expire_at = Instant::now() + Duration::from_millis(1000); - - response = f(app).await; - - cache.insert(key, (response.clone(), expire_at)); - } - - response -} +static CONTENT_TYPE_JSON: &str = "application/json"; +static CONTENT_TYPE_PLAIN: &str = "text/plain"; /// Health check page for load balancers to use. #[debug_handler] pub async fn health( Extension(app): Extension>, - Extension(cache): Extension>, + Extension(cache): Extension>, ) -> impl IntoResponse { - _quick_cache_ttl(app, cache, FrontendResponseCacheKey::Health, _health).await + let (code, content_type, body) = cache + .get_or_insert_async::(&ResponseCacheKey::Health, async move { + Ok(_health(app).await) + }) + .await + .unwrap(); + + Response::builder() + .status(code) + .header("content-type", content_type) + .body(Full::from(body)) + .unwrap() } -async fn _health(app: Arc) -> (StatusCode, Bytes) { +// TODO: _health doesn't need to be async, but _quick_cache_ttl needs an async function +#[inline] +async fn _health(app: Arc) -> (StatusCode, &'static str, Bytes) { if app.balanced_rpcs.synced() { - (StatusCode::OK, HEALTH_OK.clone()) + (StatusCode::OK, CONTENT_TYPE_PLAIN, HEALTH_OK.clone()) } else { - (StatusCode::SERVICE_UNAVAILABLE, HEALTH_NOT_OK.clone()) + ( + StatusCode::SERVICE_UNAVAILABLE, + CONTENT_TYPE_PLAIN, + HEALTH_NOT_OK.clone(), + ) } } @@ -82,18 +63,24 @@ async fn _health(app: Arc) -> (StatusCode, Bytes) { #[debug_handler] pub async fn backups_needed( Extension(app): Extension>, - Extension(cache): Extension>, + Extension(cache): Extension>, ) -> impl IntoResponse { - _quick_cache_ttl( - app, - cache, - FrontendResponseCacheKey::BackupsNeeded, - _backups_needed, - ) - .await + let (code, content_type, body) = cache + .get_or_insert_async::(&ResponseCacheKey::BackupsNeeded, async move { + Ok(_backups_needed(app).await) + }) + .await + .unwrap(); + + Response::builder() + .status(code) + .header("content-type", content_type) + .body(Full::from(body)) + .unwrap() } -async fn _backups_needed(app: Arc) -> (StatusCode, Bytes) { +#[inline] +async fn _backups_needed(app: Arc) -> (StatusCode, &'static str, Bytes) { let code = { let consensus_rpcs = app .balanced_rpcs @@ -114,25 +101,37 @@ async fn _backups_needed(app: Arc) -> (StatusCode, Bytes) { }; if matches!(code, StatusCode::OK) { - (code, BACKUPS_NEEDED_FALSE.clone()) + (code, CONTENT_TYPE_PLAIN, BACKUPS_NEEDED_FALSE.clone()) } else { - (code, BACKUPS_NEEDED_TRUE.clone()) + (code, CONTENT_TYPE_PLAIN, BACKUPS_NEEDED_TRUE.clone()) } } /// Very basic status page. /// -/// TODO: replace this with proper stats and monitoring +/// TODO: replace this with proper stats and monitoring. frontend uses it for their public dashboards though #[debug_handler] pub async fn status( Extension(app): Extension>, - Extension(cache): Extension>, + Extension(cache): Extension>, ) -> impl IntoResponse { - _quick_cache_ttl(app, cache, FrontendResponseCacheKey::Status, _status).await + let (code, content_type, body) = cache + .get_or_insert_async::(&ResponseCacheKey::Status, async move { + Ok(_status(app).await) + }) + .await + .unwrap(); + + Response::builder() + .status(code) + .header("content-type", content_type) + .body(Full::from(body)) + .unwrap() } -// TODO: this doesn't need to be async, but _quick_cache_ttl needs an async function -async fn _status(app: Arc) -> (StatusCode, Bytes) { +// TODO: _status doesn't need to be async, but _quick_cache_ttl needs an async function +#[inline] +async fn _status(app: Arc) -> (StatusCode, &'static str, Bytes) { // TODO: what else should we include? uptime, cache hit rates, cpu load, memory used // TODO: the hostname is probably not going to change. only get once at the start? let body = json!({ @@ -154,5 +153,5 @@ async fn _status(app: Arc) -> (StatusCode, Bytes) { StatusCode::INTERNAL_SERVER_ERROR }; - (code, body) + (code, CONTENT_TYPE_JSON, body) } diff --git a/web3_proxy/src/response_cache.rs b/web3_proxy/src/response_cache.rs index c7e710bd..7b269847 100644 --- a/web3_proxy/src/response_cache.rs +++ b/web3_proxy/src/response_cache.rs @@ -3,7 +3,8 @@ use crate::{ }; use derive_more::From; use ethers::providers::ProviderError; -use quick_cache::{sync::Cache as QuickCache, Weighter}; +use hashbrown::hash_map::DefaultHashBuilder; +use quick_cache_ttl::{CacheWithTTL, Weighter}; use serde_json::value::RawValue; use std::{ borrow::Cow, @@ -33,8 +34,12 @@ impl Hash for JsonRpcQueryCacheKey { } } -pub type JsonRpcQueryCache = - QuickCache; +pub type JsonRpcQueryCache = CacheWithTTL< + JsonRpcQueryCacheKey, + JsonRpcResponseData, + JsonRpcQueryWeigher, + DefaultHashBuilder, +>; #[derive(Clone)] pub struct JsonRpcQueryWeigher;