serialize the caches on /status

This commit is contained in:
Bryan Stitt 2023-06-07 10:48:55 -07:00
parent 63499c1564
commit 2b814e7a4d
7 changed files with 70 additions and 28 deletions

1
Cargo.lock generated

@ -4042,6 +4042,7 @@ dependencies = [
"flume",
"log",
"quick_cache",
"serde",
"tokio",
]

@ -9,6 +9,7 @@ edition = "2021"
flume = "0.10.14"
log = "0.4.18"
quick_cache = "0.3.0"
serde = "1"
tokio = { version = "1.28.2", features = ["full"] }
[dev-dependencies]

@ -1,4 +1,6 @@
use crate::{KQCacheWithTTL, PlaceholderGuardWithTTL};
use quick_cache::{DefaultHashBuilder, UnitWeighter, Weighter};
use serde::{Serialize, Serializer};
use std::{
fmt::Debug,
future::Future,
@ -8,8 +10,6 @@ use std::{
time::Duration,
};
use crate::{KQCacheWithTTL, PlaceholderGuardWithTTL};
pub struct CacheWithTTL<Key, Val, We = UnitWeighter, B = DefaultHashBuilder>(
KQCacheWithTTL<Key, (), Val, We, B>,
);
@ -110,14 +110,6 @@ impl<
self.0.get_or_insert_async(key, &(), f).await
}
#[inline]
pub async fn try_get_or_insert_async<E, Fut>(&self, key: &Key, f: Fut) -> Result<Val, E>
where
Fut: Future<Output = Result<Val, E>>,
{
self.0.try_get_or_insert_async(key, &(), f).await
}
#[inline]
pub async fn get_value_or_guard_async(
&self,
@ -126,6 +118,16 @@ impl<
self.0.get_value_or_guard_async(key, ()).await
}
#[inline]
pub fn peek(&self, key: &Key) -> Option<Val> {
self.0.peek(key, &())
}
#[inline]
pub fn remove(&self, key: &Key) -> bool {
self.0.remove(key, &())
}
/// if the item was too large to insert, it is returned with the error
/// IMPORTANT! Inserting the same key multiple times does NOT reset the TTL!
#[inline]
@ -134,7 +136,25 @@ impl<
}
#[inline]
pub fn remove(&self, key: &Key) -> bool {
self.0.remove(key, &())
pub async fn try_get_or_insert_async<E, Fut>(&self, key: &Key, f: Fut) -> Result<Val, E>
where
Fut: Future<Output = Result<Val, E>>,
{
self.0.try_get_or_insert_async(key, &(), f).await
}
}
impl<
Key: Clone + Debug + Eq + Hash + Send + Sync + 'static,
Val: Clone + Send + Sync + 'static,
We: Weighter<Key, (), Val> + Clone + Send + Sync + 'static,
B: BuildHasher + Clone + Send + Sync + 'static,
> Serialize for CacheWithTTL<Key, Val, We, B>
{
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
self.0.serialize(serializer)
}
}

@ -1,6 +1,8 @@
use log::{log_enabled, trace};
use quick_cache::sync::KQCache;
use quick_cache::{PlaceholderGuard, Weighter};
use serde::ser::SerializeStruct;
use serde::{Serialize, Serializer};
use std::convert::Infallible;
use std::fmt::Debug;
use std::future::Future;
@ -147,11 +149,6 @@ impl<
}
}
#[inline]
pub fn hits(&self) -> u64 {
self.cache.hits()
}
/// if the item was too large to insert, it is returned with the error
/// IMPORTANT! Inserting the same key multiple times does NOT reset the TTL!
#[inline]
@ -179,11 +176,6 @@ impl<
}
}
#[inline]
pub fn misses(&self) -> u64 {
self.cache.misses()
}
#[inline]
pub fn peek(&self, key: &Key, qey: &Qey) -> Option<Val> {
self.cache.peek(key, qey)
@ -268,3 +260,29 @@ impl<
}
}
}
impl<
Key: Clone + Debug + Eq + Hash + Send + Sync + 'static,
Qey: Clone + Debug + Eq + Hash + Send + Sync + 'static,
Val: Clone + Send + Sync + 'static,
We: Weighter<Key, Qey, Val> + Clone + Send + Sync + 'static,
B: BuildHasher + Clone + Send + Sync + 'static,
> Serialize for KQCacheWithTTL<Key, Qey, Val, We, B>
{
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut state = serializer.serialize_struct(self.name, 5)?;
state.serialize_field("len", &self.cache.len())?;
state.serialize_field("weight", &self.cache.weight())?;
state.serialize_field("capacity", &self.cache.capacity())?;
state.serialize_field("hits", &self.cache.hits())?;
state.serialize_field("misses", &self.cache.misses())?;
state.end()
}
}

@ -7,7 +7,7 @@ use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest};
use crate::rpcs::one::Web3Rpc;
use crate::stats::{AppStat, BackendRequests, RpcQueryStats};
use crate::user_token::UserBearerToken;
use anyhow::{Context, Error};
use anyhow::{Context};
use axum::headers::authorization::Bearer;
use axum::headers::{Header, Origin, Referer, UserAgent};
use chrono::Utc;
@ -25,7 +25,6 @@ use ipnet::IpNet;
use log::{error, trace, warn};
use migration::sea_orm::prelude::Decimal;
use migration::sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter};
use num_traits::ToPrimitive;
use rdkafka::message::{Header as KafkaHeader, OwnedHeaders as KafkaOwnedHeaders, OwnedMessage};
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::util::Timeout as KafkaTimeout;

@ -135,12 +135,15 @@ async fn _status(app: Arc<Web3ProxyApp>) -> (StatusCode, &'static str, Bytes) {
// TODO: what else should we include? uptime, cache hit rates, cpu load, memory used
// TODO: the hostname is probably not going to change. only get once at the start?
let body = json!({
"version": APP_USER_AGENT,
"chain_id": app.config.chain_id,
"balanced_rpcs": app.balanced_rpcs,
"private_rpcs": app.private_rpcs,
"bundler_4337_rpcs": app.bundler_4337_rpcs,
"chain_id": app.config.chain_id,
"hostname": app.hostname,
"jsonrpc_response_cache": app.jsonrpc_response_cache,
"private_rpcs": app.private_rpcs,
"rpc_secret_key_cache": app.rpc_secret_key_cache,
"user_balance_cache": app.user_balance_cache,
"version": APP_USER_AGENT,
});
let body = body.to_string().into_bytes();

@ -1234,7 +1234,7 @@ impl Serialize for Web3Rpcs {
where
S: Serializer,
{
let mut state = serializer.serialize_struct("Web3Rpcs", 1)?;
let mut state = serializer.serialize_struct("Web3Rpcs", 2)?;
{
let by_name = self.by_name.load();