Merge branch 'quick_cache_ttl' into devel

Bryan Stitt 2023-05-16 15:42:10 -07:00
commit 59e864e70f
12 changed files with 351 additions and 84 deletions

Cargo.lock (generated)

@@ -4225,6 +4225,15 @@ dependencies = [
"parking_lot 0.12.1",
]
[[package]]
name = "quick_cache_ttl"
version = "0.1.0"
dependencies = [
"flume",
"quick_cache",
"tokio",
]
[[package]]
name = "quote"
version = "1.0.27"
@@ -6699,7 +6708,7 @@ dependencies = [
"parking_lot 0.12.1",
"prettytable",
"proctitle",
"quick_cache",
"quick_cache_ttl",
"rdkafka",
"redis-rate-limiter",
"regex",


@@ -4,6 +4,7 @@ members = [
"entities",
"latency",
"migration",
"quick_cache_ttl",
"rate-counter",
"redis-rate-limiter",
"thread-fast-rng",


@@ -63,6 +63,7 @@ impl PeakEwmaLatency {
);
// Update the RTT estimate to account for decay since the last update.
// TODO: having an update here means we don't actually write from just one thread! That's how we get partially written data, I think
estimate.update(0.0, self.decay_ns, now)
}


@@ -0,0 +1,14 @@
[package]
name = "quick_cache_ttl"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
flume = "0.10.14"
quick_cache = "0.3.0"
tokio = { version = "1.28.1", features = ["full"] }
[dev-dependencies]
tokio = { version = "1.28.1", features = ["full", "test-util"] }


@@ -0,0 +1,78 @@
use quick_cache::{DefaultHashBuilder, UnitWeighter, Weighter};
use std::{
future::Future,
hash::{BuildHasher, Hash},
time::Duration,
};
use crate::{KQCacheWithTTL, PlaceholderGuardWithTTL};
pub struct CacheWithTTL<Key, Val, We, B>(KQCacheWithTTL<Key, (), Val, We, B>);
impl<Key: Eq + Hash + Clone + Send + Sync + 'static, Val: Clone + Send + Sync + 'static>
CacheWithTTL<Key, Val, UnitWeighter, DefaultHashBuilder>
{
pub async fn new_with_unit_weights(estimated_items_capacity: usize, ttl: Duration) -> Self {
Self::new(
estimated_items_capacity,
estimated_items_capacity as u64,
UnitWeighter,
DefaultHashBuilder::default(),
ttl,
)
.await
}
}
impl<
Key: Eq + Hash + Clone + Send + Sync + 'static,
Val: Clone + Send + Sync + 'static,
We: Weighter<Key, (), Val> + Clone + Send + Sync + 'static,
B: BuildHasher + Clone + Send + Sync + 'static,
> CacheWithTTL<Key, Val, We, B>
{
pub async fn new(
estimated_items_capacity: usize,
weight_capacity: u64,
weighter: We,
hash_builder: B,
ttl: Duration,
) -> Self {
let inner = KQCacheWithTTL::new(
estimated_items_capacity,
weight_capacity,
weighter,
hash_builder,
ttl,
)
.await;
Self(inner)
}
#[inline]
pub async fn get_or_insert_async<E, Fut>(&self, key: &Key, f: Fut) -> Result<Val, E>
where
Fut: Future<Output = Result<Val, E>>,
{
self.0.get_or_insert_async(key, &(), f).await
}
#[inline]
pub async fn get_value_or_guard_async(
&self,
key: Key,
) -> Result<Val, PlaceholderGuardWithTTL<'_, Key, (), Val, We, B>> {
self.0.get_value_or_guard_async(key, ()).await
}
#[inline]
pub fn insert(&self, key: Key, val: Val) {
self.0.insert(key, (), val)
}
#[inline]
pub fn remove(&self, key: &Key) -> bool {
self.0.remove(key, &())
}
}
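
A minimal usage sketch of the new wrapper (illustrative only, not part of this commit; the String/u64 types, 100-entry capacity, and 60-second TTL are assumptions for the example):

// Illustrative sketch (not from the diff): exercising CacheWithTTL as defined above.
use quick_cache_ttl::CacheWithTTL;
use std::{convert::Infallible, time::Duration};

#[tokio::main]
async fn main() {
    // unit weights: every entry counts as 1 toward the 100-entry capacity
    let cache: CacheWithTTL<String, u64, _, _> =
        CacheWithTTL::new_with_unit_weights(100, Duration::from_secs(60)).await;

    // compute-on-miss: the future only runs when the key is absent
    let latest = cache
        .get_or_insert_async::<Infallible, _>(&"latest".to_string(), async { Ok(17_000_000) })
        .await
        .unwrap();

    // a plain insert also queues an expiration `ttl` from now
    cache.insert("pending".to_string(), latest + 1);
    let removed = cache.remove(&"pending".to_string()); // true while the entry is still live
    assert!(removed);
}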


@@ -0,0 +1,143 @@
use quick_cache::sync::KQCache;
use quick_cache::{PlaceholderGuard, Weighter};
use std::future::Future;
use std::hash::{BuildHasher, Hash};
use std::sync::Arc;
use std::time::Duration;
use tokio::task::JoinHandle;
use tokio::time::{sleep_until, Instant};
pub struct KQCacheWithTTL<Key, Qey, Val, We, B> {
pub(crate) cache: Arc<KQCache<Key, Qey, Val, We, B>>,
pub task_handle: JoinHandle<()>,
ttl: Duration,
pub(crate) tx: flume::Sender<(Instant, Key, Qey)>,
}
struct KQCacheWithTTLTask<Key, Qey, Val, We, B> {
cache: Arc<KQCache<Key, Qey, Val, We, B>>,
rx: flume::Receiver<(Instant, Key, Qey)>,
}
pub struct PlaceholderGuardWithTTL<'a, Key, Qey, Val, We, B> {
inner: PlaceholderGuard<'a, Key, Qey, Val, We, B>,
key: Key,
qey: Qey,
ttl: Duration,
tx: &'a flume::Sender<(Instant, Key, Qey)>,
}
impl<
Key: Eq + Hash + Clone + Send + Sync + 'static,
Qey: Eq + Hash + Clone + Send + Sync + 'static,
Val: Clone + Send + Sync + 'static,
We: Weighter<Key, Qey, Val> + Clone + Send + Sync + 'static,
B: BuildHasher + Clone + Send + Sync + 'static,
> KQCacheWithTTL<Key, Qey, Val, We, B>
{
pub async fn new(
estimated_items_capacity: usize,
weight_capacity: u64,
weighter: We,
hash_builder: B,
ttl: Duration,
) -> Self {
let (tx, rx) = flume::unbounded();
let cache = KQCache::with(
estimated_items_capacity,
weight_capacity,
weighter,
hash_builder,
);
let cache = Arc::new(cache);
let task = KQCacheWithTTLTask {
cache: cache.clone(),
rx,
};
let task_handle = tokio::spawn(task.run());
Self {
cache,
task_handle,
ttl,
tx,
}
}
#[inline]
pub async fn get_or_insert_async<E, Fut>(&self, key: &Key, qey: &Qey, f: Fut) -> Result<Val, E>
where
Fut: Future<Output = Result<Val, E>>,
{
self.cache.get_or_insert_async(key, qey, f).await
}
#[inline]
pub async fn get_value_or_guard_async(
&self,
key: Key,
qey: Qey,
) -> Result<Val, PlaceholderGuardWithTTL<'_, Key, Qey, Val, We, B>> {
match self.cache.get_value_or_guard_async(&key, &qey).await {
Ok(x) => Ok(x),
Err(inner) => Err(PlaceholderGuardWithTTL {
inner,
key,
qey,
ttl: self.ttl,
tx: &self.tx,
}),
}
}
pub fn insert(&self, key: Key, qey: Qey, val: Val) {
let expire_at = Instant::now() + self.ttl;
self.cache.insert(key.clone(), qey.clone(), val);
self.tx.send((expire_at, key, qey)).unwrap();
}
pub fn remove(&self, key: &Key, qey: &Qey) -> bool {
self.cache.remove(key, qey)
}
}
impl<
Key: Eq + Hash,
Qey: Eq + Hash,
Val: Clone,
We: Weighter<Key, Qey, Val> + Clone,
B: BuildHasher + Clone,
> KQCacheWithTTLTask<Key, Qey, Val, We, B>
{
async fn run(self) {
while let Ok((expire_at, key, qey)) = self.rx.recv_async().await {
sleep_until(expire_at).await;
self.cache.remove(&key, &qey);
}
}
}
impl<
'a,
Key: Clone + Hash + Eq,
Qey: Clone + Hash + Eq,
Val: Clone,
We: Weighter<Key, Qey, Val>,
B: BuildHasher,
> PlaceholderGuardWithTTL<'a, Key, Qey, Val, We, B>
{
pub fn insert(self, val: Val) {
let expire_at = Instant::now() + self.ttl;
self.inner.insert(val);
self.tx.send((expire_at, self.key, self.qey)).unwrap();
}
}
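
To illustrate the expiry path this file adds (a sketch under assumed capacities and a one-second TTL, not part of the commit): insert stores the value and sends (expire_at, key, qey) on the flume channel, and the spawned KQCacheWithTTLTask sleeps until expire_at before removing the entry.

// Illustrative sketch (not from the diff): watching an entry expire via the background task.
use quick_cache_ttl::{DefaultHashBuilder, KQCacheWithTTL, UnitWeighter};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let cache = KQCacheWithTTL::<u32, (), String, _, _>::new(
        10,                         // estimated items capacity
        10,                         // weight capacity (unit weights)
        UnitWeighter,
        DefaultHashBuilder::default(),
        Duration::from_secs(1),     // ttl
    )
    .await;

    // insert stores the value and queues (expire_at, key, qey) on the flume channel
    cache.insert(1, (), "hello".to_string());

    // the spawned KQCacheWithTTLTask wakes at expire_at and removes the entry,
    // so after the TTL there is nothing left for remove() to delete
    tokio::time::sleep(Duration::from_millis(1500)).await;
    let was_present = cache.remove(&1, &());
    assert!(!was_present);
}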


@@ -0,0 +1,6 @@
mod cache;
mod kq_cache;
pub use cache::CacheWithTTL;
pub use kq_cache::{KQCacheWithTTL, PlaceholderGuardWithTTL};
pub use quick_cache::{DefaultHashBuilder, UnitWeighter, Weighter};


@@ -19,6 +19,7 @@ deferred-rate-limiter = { path = "../deferred-rate-limiter" }
entities = { path = "../entities" }
latency = { path = "../latency" }
migration = { path = "../migration" }
quick_cache_ttl = { path = "../quick_cache_ttl" }
redis-rate-limiter = { path = "../redis-rate-limiter" }
thread-fast-rng = { path = "../thread-fast-rng" }
@@ -70,7 +71,6 @@ pagerduty-rs = { version = "0.1.6", default-features = false, features = ["async
parking_lot = { version = "0.12.1", features = ["arc_lock"] }
prettytable = "*"
proctitle = "0.1.1"
quick_cache = "0.3.0"
rdkafka = { version = "0.30.0" }
regex = "1.8.1"
reqwest = { version = "0.11.17", default-features = false, features = ["json", "tokio-rustls"] }


@@ -36,6 +36,7 @@ use ethers::types::U256;
use ethers::utils::rlp::{Decodable, Rlp};
use futures::future::join_all;
use futures::stream::{FuturesUnordered, StreamExt};
use hashbrown::hash_map::DefaultHashBuilder;
use hashbrown::{HashMap, HashSet};
use ipnet::IpNet;
use log::{debug, error, info, trace, warn, Level};
@@ -617,11 +618,15 @@ impl Web3ProxyApp {
// responses can be very different in sizes, so this is a cache with a max capacity and a weigher
// TODO: don't allow any response to be bigger than X% of the cache
// TODO: we should emit stats to calculate a more accurate expected cache size
let response_cache = JsonRpcQueryCache::with_weighter(
// TODO: do we actually want a TTL on this?
let response_cache = JsonRpcQueryCache::new(
(top_config.app.response_cache_max_bytes / 2048) as usize,
top_config.app.response_cache_max_bytes,
JsonRpcQueryWeigher,
);
DefaultHashBuilder::default(),
Duration::from_secs(3600),
)
.await;
// create semaphores for concurrent connection limits
// TODO: what should tti be for semaphores?
@@ -1756,7 +1761,7 @@ impl Web3ProxyApp {
match self
.jsonrpc_query_cache
.get_value_or_guard_async(&cache_key).await
.get_value_or_guard_async(cache_key).await
{
Ok(x) => x,
Err(x) => {
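
For context on the capacity arithmetic in the JsonRpcQueryCache::new call above, a small worked example (the 64 MiB figure is an assumed config value, not from this commit):

// Illustrative arithmetic only, assuming response_cache_max_bytes = 64 MiB
fn main() {
    let response_cache_max_bytes: u64 = 64 * 1024 * 1024; // 67_108_864 bytes
    let estimated_items = (response_cache_max_bytes / 2048) as usize; // 32_768 entries of ~2 KiB each
    let weight_capacity = response_cache_max_bytes; // the weigher's byte budget equals the limit
    let ttl = std::time::Duration::from_secs(3600); // cached responses expire after an hour
    println!("{estimated_items} entries, {weight_capacity} byte budget, ttl {ttl:?}");
}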


@@ -19,24 +19,29 @@ use axum::{
use http::{header::AUTHORIZATION, StatusCode};
use listenfd::ListenFd;
use log::info;
use std::iter::once;
use quick_cache_ttl::UnitWeighter;
use std::net::SocketAddr;
use std::sync::Arc;
use std::{iter::once, time::Duration};
use strum::{EnumCount, EnumIter};
use tokio::{sync::broadcast, time::Instant};
use tokio::sync::broadcast;
use tower_http::cors::CorsLayer;
use tower_http::sensitive_headers::SetSensitiveRequestHeadersLayer;
/// simple keys for caching responses
#[derive(Copy, Clone, Hash, PartialEq, Eq, EnumCount, EnumIter)]
pub enum FrontendResponseCacheKey {
pub enum ResponseCacheKey {
BackupsNeeded,
Health,
Status,
}
pub type FrontendJsonResponseCache =
quick_cache::sync::Cache<FrontendResponseCacheKey, ((StatusCode, axum::body::Bytes), Instant)>;
pub type ResponseCache = quick_cache_ttl::CacheWithTTL<
ResponseCacheKey,
(StatusCode, &'static str, axum::body::Bytes),
UnitWeighter,
quick_cache_ttl::DefaultHashBuilder,
>;
/// Start the frontend server.
pub async fn serve(
@@ -48,9 +53,10 @@ pub async fn serve(
// setup caches for whatever the frontend needs
// no need for max items since it is limited by the enum key
// TODO: latest moka allows for different ttls for different
let response_cache_size = FrontendResponseCacheKey::COUNT;
let response_cache_size = ResponseCacheKey::COUNT;
let json_response_cache = FrontendJsonResponseCache::new(response_cache_size);
let response_cache =
ResponseCache::new_with_unit_weights(response_cache_size, Duration::from_secs(1)).await;
// TODO: read config for if fastest/versus should be available publicly. default off
@@ -216,7 +222,7 @@ pub async fn serve(
// application state
.layer(Extension(proxy_app))
// frontend caches
.layer(Extension(Arc::new(json_response_cache)))
.layer(Extension(Arc::new(response_cache)))
// 404 for any unknown routes
.fallback(errors::handler_404);


@@ -3,18 +3,18 @@
//! For ease of development, users can currently access these endpoints.
//! They will eventually move to another port.
use super::{FrontendJsonResponseCache, FrontendResponseCacheKey};
use crate::{
app::{Web3ProxyApp, APP_USER_AGENT},
frontend::errors::Web3ProxyError,
use super::{ResponseCache, ResponseCacheKey};
use crate::app::{Web3ProxyApp, APP_USER_AGENT};
use axum::{
body::{Bytes, Full},
http::StatusCode,
response::{IntoResponse, Response},
Extension,
};
use axum::{body::Bytes, http::StatusCode, response::IntoResponse, Extension};
use axum_macros::debug_handler;
use futures::Future;
use once_cell::sync::Lazy;
use serde_json::json;
use std::{sync::Arc, time::Duration};
use tokio::time::Instant;
use std::{convert::Infallible, sync::Arc};
static HEALTH_OK: Lazy<Bytes> = Lazy::new(|| Bytes::from("OK\n"));
static HEALTH_NOT_OK: Lazy<Bytes> = Lazy::new(|| Bytes::from(":(\n"));
@@ -22,59 +22,40 @@ static HEALTH_NOT_OK: Lazy<Bytes> = Lazy::new(|| Bytes::from(":(\n"));
static BACKUPS_NEEDED_TRUE: Lazy<Bytes> = Lazy::new(|| Bytes::from("true\n"));
static BACKUPS_NEEDED_FALSE: Lazy<Bytes> = Lazy::new(|| Bytes::from("false\n"));
/// simple ttl for
// TODO: make this generic for any cache/key
async fn _quick_cache_ttl<Fut>(
app: Arc<Web3ProxyApp>,
cache: Arc<FrontendJsonResponseCache>,
key: FrontendResponseCacheKey,
f: impl Fn(Arc<Web3ProxyApp>) -> Fut,
) -> (StatusCode, Bytes)
where
Fut: Future<Output = (StatusCode, Bytes)>,
{
let mut response;
let expire_at;
(response, expire_at) = cache
.get_or_insert_async::<Web3ProxyError>(&key, async {
let expire_at = Instant::now() + Duration::from_millis(1000);
let response = f(app.clone()).await;
Ok((response, expire_at))
})
.await
.unwrap();
if Instant::now() >= expire_at {
// TODO: this expiration isn't perfect
// parallel requests could overwrite eachother
// its good enough for now
let expire_at = Instant::now() + Duration::from_millis(1000);
response = f(app).await;
cache.insert(key, (response.clone(), expire_at));
}
response
}
static CONTENT_TYPE_JSON: &str = "application/json";
static CONTENT_TYPE_PLAIN: &str = "text/plain";
/// Health check page for load balancers to use.
#[debug_handler]
pub async fn health(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(cache): Extension<Arc<FrontendJsonResponseCache>>,
Extension(cache): Extension<Arc<ResponseCache>>,
) -> impl IntoResponse {
_quick_cache_ttl(app, cache, FrontendResponseCacheKey::Health, _health).await
let (code, content_type, body) = cache
.get_or_insert_async::<Infallible, _>(&ResponseCacheKey::Health, async move {
Ok(_health(app).await)
})
.await
.unwrap();
Response::builder()
.status(code)
.header("content-type", content_type)
.body(Full::from(body))
.unwrap()
}
async fn _health(app: Arc<Web3ProxyApp>) -> (StatusCode, Bytes) {
// TODO: _health doesn't need to be async, but _quick_cache_ttl needs an async function
#[inline]
async fn _health(app: Arc<Web3ProxyApp>) -> (StatusCode, &'static str, Bytes) {
if app.balanced_rpcs.synced() {
(StatusCode::OK, HEALTH_OK.clone())
(StatusCode::OK, CONTENT_TYPE_PLAIN, HEALTH_OK.clone())
} else {
(StatusCode::SERVICE_UNAVAILABLE, HEALTH_NOT_OK.clone())
(
StatusCode::SERVICE_UNAVAILABLE,
CONTENT_TYPE_PLAIN,
HEALTH_NOT_OK.clone(),
)
}
}
@@ -82,18 +63,24 @@ async fn _health(app: Arc<Web3ProxyApp>) -> (StatusCode, Bytes) {
#[debug_handler]
pub async fn backups_needed(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(cache): Extension<Arc<FrontendJsonResponseCache>>,
Extension(cache): Extension<Arc<ResponseCache>>,
) -> impl IntoResponse {
_quick_cache_ttl(
app,
cache,
FrontendResponseCacheKey::BackupsNeeded,
_backups_needed,
)
.await
let (code, content_type, body) = cache
.get_or_insert_async::<Infallible, _>(&ResponseCacheKey::BackupsNeeded, async move {
Ok(_backups_needed(app).await)
})
.await
.unwrap();
Response::builder()
.status(code)
.header("content-type", content_type)
.body(Full::from(body))
.unwrap()
}
async fn _backups_needed(app: Arc<Web3ProxyApp>) -> (StatusCode, Bytes) {
#[inline]
async fn _backups_needed(app: Arc<Web3ProxyApp>) -> (StatusCode, &'static str, Bytes) {
let code = {
let consensus_rpcs = app
.balanced_rpcs
@@ -114,25 +101,37 @@ async fn _backups_needed(app: Arc<Web3ProxyApp>) -> (StatusCode, Bytes) {
};
if matches!(code, StatusCode::OK) {
(code, BACKUPS_NEEDED_FALSE.clone())
(code, CONTENT_TYPE_PLAIN, BACKUPS_NEEDED_FALSE.clone())
} else {
(code, BACKUPS_NEEDED_TRUE.clone())
(code, CONTENT_TYPE_PLAIN, BACKUPS_NEEDED_TRUE.clone())
}
}
/// Very basic status page.
///
/// TODO: replace this with proper stats and monitoring
/// TODO: replace this with proper stats and monitoring. frontend uses it for their public dashboards though
#[debug_handler]
pub async fn status(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(cache): Extension<Arc<FrontendJsonResponseCache>>,
Extension(cache): Extension<Arc<ResponseCache>>,
) -> impl IntoResponse {
_quick_cache_ttl(app, cache, FrontendResponseCacheKey::Status, _status).await
let (code, content_type, body) = cache
.get_or_insert_async::<Infallible, _>(&ResponseCacheKey::Status, async move {
Ok(_status(app).await)
})
.await
.unwrap();
Response::builder()
.status(code)
.header("content-type", content_type)
.body(Full::from(body))
.unwrap()
}
// TODO: this doesn't need to be async, but _quick_cache_ttl needs an async function
async fn _status(app: Arc<Web3ProxyApp>) -> (StatusCode, Bytes) {
// TODO: _status doesn't need to be async, but _quick_cache_ttl needs an async function
#[inline]
async fn _status(app: Arc<Web3ProxyApp>) -> (StatusCode, &'static str, Bytes) {
// TODO: what else should we include? uptime, cache hit rates, cpu load, memory used
// TODO: the hostname is probably not going to change. only get once at the start?
let body = json!({
@@ -154,5 +153,5 @@ async fn _status(app: Arc<Web3ProxyApp>) -> (StatusCode, Bytes) {
StatusCode::INTERNAL_SERVER_ERROR
};
(code, body)
(code, CONTENT_TYPE_JSON, body)
}


@@ -3,7 +3,8 @@ use crate::{
};
use derive_more::From;
use ethers::providers::ProviderError;
use quick_cache::{sync::Cache as QuickCache, Weighter};
use hashbrown::hash_map::DefaultHashBuilder;
use quick_cache_ttl::{CacheWithTTL, Weighter};
use serde_json::value::RawValue;
use std::{
borrow::Cow,
@@ -33,8 +34,12 @@ impl Hash for JsonRpcQueryCacheKey {
}
}
pub type JsonRpcQueryCache =
QuickCache<JsonRpcQueryCacheKey, JsonRpcResponseData, JsonRpcQueryWeigher>;
pub type JsonRpcQueryCache = CacheWithTTL<
JsonRpcQueryCacheKey,
JsonRpcResponseData,
JsonRpcQueryWeigher,
DefaultHashBuilder,
>;
#[derive(Clone)]
pub struct JsonRpcQueryWeigher;