web3-proxy/web3_proxy/src/stats.rs

373 lines
14 KiB
Rust
Raw Normal View History

2022-10-10 07:15:07 +03:00
use crate::frontend::authorization::{AuthorizedKey, RequestMetadata};
use anyhow::Context;
use chrono::{TimeZone, Utc};
use derive_more::From;
2022-10-10 07:15:07 +03:00
use entities::rpc_accounting;
2022-10-11 08:13:00 +03:00
use hdrhistogram::Histogram;
2022-10-10 07:15:07 +03:00
use moka::future::{Cache, CacheBuilder};
use sea_orm::{ActiveModelTrait, DatabaseConnection};
2022-10-11 08:13:00 +03:00
use std::sync::atomic::{AtomicU64, Ordering};
2022-10-10 07:15:07 +03:00
use std::sync::Arc;
2022-10-11 08:13:00 +03:00
use std::{sync::atomic::AtomicU32, time::Duration};
use tokio::sync::Mutex as AsyncMutex;
2022-10-03 21:08:01 +03:00
use tokio::task::JoinHandle;
2022-10-10 07:15:07 +03:00
use tracing::{error, info, trace};
2022-10-10 07:15:07 +03:00
/// TODO: where should this be defined?
/// TODO: can we use something inside sea_orm instead?
#[derive(Debug)]
2022-10-10 07:15:07 +03:00
pub struct ProxyResponseStat {
user_key_id: u64,
method: String,
2022-10-11 20:34:25 +03:00
metadata: AsyncMutex<RequestMetadata>,
}
2022-10-11 08:13:00 +03:00
/// Unix timestamp, in seconds, of the start of a time bucket
/// (a request's timestamp rounded down to a multiple of the period).
pub type TimeBucketTimestamp = u64;
2022-10-11 20:34:25 +03:00
/// Distributions kept per aggregate so the saver can compute min/mean/p50/p90/p99/max columns.
pub struct ProxyResponseHistograms {
    /// request sizes, in bytes
    request_bytes: Histogram<u64>,
    /// response sizes, in bytes
    response_bytes: Histogram<u64>,
    /// response times, in milliseconds
    response_millis: Histogram<u64>,
}
impl Default for ProxyResponseHistograms {
    /// Build three empty, auto-resizing histograms.
    ///
    /// # Panics
    ///
    /// Panics only if hdrhistogram rejects the significant-figure setting, which is a constant.
    fn default() -> Self {
        // TODO: how many significant figures? 5 is the most hdrhistogram accepts
        const SIGFIG: u8 = 5;

        Self {
            request_bytes: Histogram::new(SIGFIG).expect("creating request_bytes histogram"),
            response_bytes: Histogram::new(SIGFIG).expect("creating response_bytes histogram"),
            response_millis: Histogram::new(SIGFIG).expect("creating response_millis histogram"),
        }
    }
}
2022-10-10 07:15:07 +03:00
// TODO: impl From for our database model
pub struct ProxyResponseAggregate {
2022-10-11 08:13:00 +03:00
// these are the key
2022-10-10 07:15:07 +03:00
// user_key_id: u64,
// method: String,
// error_response: bool,
2022-10-11 08:13:00 +03:00
// TODO: this is the grandparent key. get it from there somehow
period_timestamp: u64,
2022-10-10 07:15:07 +03:00
frontend_requests: AtomicU32,
backend_requests: AtomicU32,
2022-10-11 08:13:00 +03:00
backend_retries: AtomicU32,
cache_misses: AtomicU32,
cache_hits: AtomicU32,
sum_request_bytes: AtomicU64,
sum_response_bytes: AtomicU64,
sum_response_millis: AtomicU64,
2022-10-11 20:34:25 +03:00
histograms: AsyncMutex<ProxyResponseHistograms>,
2022-10-11 08:13:00 +03:00
}
/// Identifies one aggregate inside a time bucket.
///
/// The derived `From` builds this from a `(user_key_id, method, error_response)` tuple, so the
/// field order below is part of the interface.
#[derive(Clone, Debug, From, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct UserProxyResponseKey {
    /// id of the user key that made the requests
    user_key_id: u64,
    /// requested method name
    method: String,
    /// whether the response was an error
    error_response: bool,
}
2022-10-10 07:15:07 +03:00
/// key is the (user_key_id, method, error_response)
pub type UserProxyResponseCache = Cache<
2022-10-11 08:13:00 +03:00
UserProxyResponseKey,
2022-10-10 07:15:07 +03:00
Arc<ProxyResponseAggregate>,
hashbrown::hash_map::DefaultHashBuilder,
>;
2022-10-11 08:13:00 +03:00
/// key is the "time bucket's timestamp" (timestamp / period * period)
2022-10-10 07:15:07 +03:00
pub type TimeProxyResponseCache =
2022-10-11 08:13:00 +03:00
Cache<TimeBucketTimestamp, UserProxyResponseCache, hashbrown::hash_map::DefaultHashBuilder>;
2022-10-10 07:15:07 +03:00
/// Aggregates proxy stats into time buckets and saves expired buckets to the database.
pub struct StatEmitter {
    /// chain id written into every saved row
    chain_id: u64,
    /// connection used to save `rpc_accounting` rows
    db_conn: DatabaseConnection,
    /// width of a time bucket, in seconds
    period_seconds: u64,
    /// the outer cache has a TTL and a handler for expiration
    aggregated_proxy_responses: TimeProxyResponseCache,
    /// receives the per-user caches of expired or invalidated time buckets
    save_rx: flume::Receiver<UserProxyResponseCache>,
}
2022-10-10 07:15:07 +03:00
/// A stat that we aggregate and then store in a database.
#[derive(Debug, From)]
2022-10-03 21:08:01 +03:00
pub enum Web3ProxyStat {
ProxyResponse(ProxyResponseStat),
2022-10-03 21:08:01 +03:00
}
2022-10-10 07:15:07 +03:00
impl ProxyResponseStat {
// TODO: should RequestMetadata be in an arc? or can we handle refs here?
pub fn new(method: String, authorized_key: AuthorizedKey, metadata: RequestMetadata) -> Self {
2022-10-11 20:34:25 +03:00
let metadata = AsyncMutex::new(metadata);
2022-10-10 07:15:07 +03:00
Self {
user_key_id: authorized_key.user_key_id,
method,
metadata,
2022-10-03 21:08:01 +03:00
}
}
}
impl StatEmitter {
2022-10-10 07:15:07 +03:00
pub fn new(chain_id: u64, db_conn: DatabaseConnection, period_seconds: u64) -> Arc<Self> {
let (save_tx, save_rx) = flume::unbounded();
2022-10-11 08:13:00 +03:00
// this needs to be long enough that there are definitely no outstanding queries
// TODO: what should the "safe" multiplier be? what if something is late?
let ttl_seconds = period_seconds * 3;
2022-10-10 07:15:07 +03:00
let aggregated_proxy_responses = CacheBuilder::default()
2022-10-11 08:13:00 +03:00
.time_to_live(Duration::from_secs(ttl_seconds))
.eviction_listener_with_queued_delivery_mode(move |_, v, _| {
// this function must not panic!
if let Err(err) = save_tx.send(v) {
error!(?err, "unable to save. sender closed!");
}
})
2022-10-10 07:15:07 +03:00
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new());
let s = Self {
chain_id,
db_conn,
period_seconds,
aggregated_proxy_responses,
save_rx,
2022-10-10 07:15:07 +03:00
};
Arc::new(s)
}
pub async fn spawn(
2022-10-10 07:15:07 +03:00
self: Arc<Self>,
) -> anyhow::Result<(
flume::Sender<Web3ProxyStat>,
JoinHandle<anyhow::Result<()>>,
JoinHandle<anyhow::Result<()>>,
)> {
let (aggregate_tx, aggregate_rx) = flume::unbounded::<Web3ProxyStat>();
2022-10-03 21:08:01 +03:00
2022-10-11 08:13:00 +03:00
// TODO: join and flatten these handles
let aggregate_handle = tokio::spawn(self.clone().aggregate_stats_loop(aggregate_rx));
let save_handle = { tokio::spawn(self.save_stats_loop()) };
2022-10-11 08:13:00 +03:00
Ok((aggregate_tx, aggregate_handle, save_handle))
}
2022-10-11 08:13:00 +03:00
/// simple future that reads the channel and aggregates stats in a local cache.
async fn aggregate_stats_loop(
self: Arc<Self>,
aggregate_rx: flume::Receiver<Web3ProxyStat>,
) -> anyhow::Result<()> {
// TODO: select on shutdown handle so we can be sure to save every aggregate!
while let Ok(x) = aggregate_rx.recv_async().await {
trace!(?x, "aggregating stat");
2022-10-11 08:13:00 +03:00
// TODO: increment global stats (in redis? in local cache for prometheus?)
2022-10-11 08:13:00 +03:00
// TODO: batch stats? spawn this?
// TODO: where can we wait on this handle?
let clone = self.clone();
tokio::spawn(async move { clone.aggregate_stat(x).await });
2022-10-10 07:15:07 +03:00
2022-10-11 08:13:00 +03:00
// no need to save manually. they save on expire
}
2022-10-11 08:13:00 +03:00
// shutting down. force a save
// we do not use invalidate_all because that is done on a background thread
for (key, _) in self.aggregated_proxy_responses.into_iter() {
self.aggregated_proxy_responses.invalidate(&key).await;
}
2022-10-03 21:08:01 +03:00
2022-10-11 08:13:00 +03:00
info!("stat aggregator exited");
2022-10-10 07:15:07 +03:00
2022-10-11 08:13:00 +03:00
Ok(())
}
2022-10-03 21:08:01 +03:00
2022-10-11 08:13:00 +03:00
async fn save_stats_loop(self: Arc<Self>) -> anyhow::Result<()> {
while let Ok(x) = self.save_rx.recv_async().await {
// TODO: batch these
for (k, v) in x.into_iter() {
info!(?k, "saving");
let period_datetime = Utc.timestamp(v.period_timestamp as i64, 0);
let frontend_requests = v.frontend_requests.load(Ordering::Acquire);
let backend_requests = v.backend_requests.load(Ordering::Acquire);
let backend_retries = v.backend_retries.load(Ordering::Acquire);
let cache_misses = v.cache_misses.load(Ordering::Acquire);
let cache_hits = v.cache_hits.load(Ordering::Acquire);
2022-10-11 20:34:25 +03:00
let sum_request_bytes = v.sum_request_bytes.load(Ordering::Acquire);
let sum_response_millis = v.sum_response_millis.load(Ordering::Acquire);
let sum_response_bytes = v.sum_response_bytes.load(Ordering::Acquire);
2022-10-11 08:13:00 +03:00
2022-10-11 20:34:25 +03:00
let histograms = v.histograms.lock().await;
let request_bytes = &histograms.request_bytes;
2022-10-11 08:13:00 +03:00
let min_request_bytes = request_bytes.min();
let mean_request_bytes = request_bytes.mean();
let p50_request_bytes = request_bytes.value_at_quantile(0.50);
let p90_request_bytes = request_bytes.value_at_quantile(0.90);
let p99_request_bytes = request_bytes.value_at_quantile(0.99);
let max_request_bytes = request_bytes.max();
2022-10-11 20:34:25 +03:00
let response_millis = &histograms.response_millis;
2022-10-11 08:13:00 +03:00
let min_response_millis = response_millis.min();
let mean_response_millis = response_millis.mean();
let p50_response_millis = response_millis.value_at_quantile(0.50);
let p90_response_millis = response_millis.value_at_quantile(0.90);
let p99_response_millis = response_millis.value_at_quantile(0.99);
let max_response_millis = response_millis.max();
2022-10-11 20:34:25 +03:00
let response_bytes = &histograms.response_bytes;
2022-10-11 08:13:00 +03:00
let min_response_bytes = response_bytes.min();
let mean_response_bytes = response_bytes.mean();
let p50_response_bytes = response_bytes.value_at_quantile(0.50);
let p90_response_bytes = response_bytes.value_at_quantile(0.90);
let p99_response_bytes = response_bytes.value_at_quantile(0.99);
let max_response_bytes = response_bytes.max();
2022-10-11 20:34:25 +03:00
drop(histograms);
2022-10-11 08:13:00 +03:00
let stat = rpc_accounting::ActiveModel {
2022-10-11 20:34:25 +03:00
id: sea_orm::NotSet,
2022-10-11 08:13:00 +03:00
user_key_id: sea_orm::Set(k.user_key_id),
chain_id: sea_orm::Set(self.chain_id),
method: sea_orm::Set(k.method.clone()),
error_response: sea_orm::Set(k.error_response),
period_datetime: sea_orm::Set(period_datetime),
frontend_requests: sea_orm::Set(frontend_requests),
backend_requests: sea_orm::Set(backend_requests),
backend_retries: sea_orm::Set(backend_retries),
cache_misses: sea_orm::Set(cache_misses),
cache_hits: sea_orm::Set(cache_hits),
sum_request_bytes: sea_orm::Set(sum_request_bytes),
min_request_bytes: sea_orm::Set(min_request_bytes),
mean_request_bytes: sea_orm::Set(mean_request_bytes),
p50_request_bytes: sea_orm::Set(p50_request_bytes),
p90_request_bytes: sea_orm::Set(p90_request_bytes),
p99_request_bytes: sea_orm::Set(p99_request_bytes),
max_request_bytes: sea_orm::Set(max_request_bytes),
sum_response_millis: sea_orm::Set(sum_response_millis),
min_response_millis: sea_orm::Set(min_response_millis),
mean_response_millis: sea_orm::Set(mean_response_millis),
p50_response_millis: sea_orm::Set(p50_response_millis),
p90_response_millis: sea_orm::Set(p90_response_millis),
p99_response_millis: sea_orm::Set(p99_response_millis),
max_response_millis: sea_orm::Set(max_response_millis),
sum_response_bytes: sea_orm::Set(sum_response_bytes),
min_response_bytes: sea_orm::Set(min_response_bytes),
mean_response_bytes: sea_orm::Set(mean_response_bytes),
p50_response_bytes: sea_orm::Set(p50_response_bytes),
p90_response_bytes: sea_orm::Set(p90_response_bytes),
p99_response_bytes: sea_orm::Set(p99_response_bytes),
max_response_bytes: sea_orm::Set(max_response_bytes),
};
// TODO: if this fails, rever adding the user, too
if let Err(err) = stat
.save(&self.db_conn)
.await
.context("Saving rpc_accounting stat")
{
error!(?err, "unable to save aggregated stats");
}
}
2022-10-11 08:13:00 +03:00
}
2022-10-03 21:08:01 +03:00
2022-10-11 08:13:00 +03:00
info!("stat saver exited");
2022-10-03 21:08:01 +03:00
2022-10-11 08:13:00 +03:00
Ok(())
2022-10-03 21:08:01 +03:00
}
2022-10-10 07:15:07 +03:00
pub async fn aggregate_stat(&self, stat: Web3ProxyStat) -> anyhow::Result<()> {
trace!(?stat, "aggregating");
2022-10-10 07:15:07 +03:00
match stat {
Web3ProxyStat::ProxyResponse(x) => {
2022-10-11 08:13:00 +03:00
// TODO: move this whole closure to another function?
2022-10-11 20:34:25 +03:00
let metadata = x.metadata.lock().await;
2022-10-11 08:13:00 +03:00
// TODO: move period calculation into another function?
let period_timestamp =
2022-10-11 20:34:25 +03:00
metadata.timestamp / self.period_seconds * self.period_seconds;
2022-10-10 07:15:07 +03:00
2022-10-11 08:13:00 +03:00
// get the user cache for the current period
2022-10-10 07:15:07 +03:00
let user_cache = self
.aggregated_proxy_responses
2022-10-11 08:13:00 +03:00
.get_with(period_timestamp, async move {
2022-10-10 07:15:07 +03:00
CacheBuilder::default()
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new())
})
.await;
2022-10-11 20:34:25 +03:00
let key = (x.user_key_id, x.method, metadata.error_response).into();
2022-10-10 07:15:07 +03:00
let user_aggregate = user_cache
.get_with(key, async move {
2022-10-11 20:34:25 +03:00
let histograms = ProxyResponseHistograms::default();
2022-10-10 07:15:07 +03:00
let aggregate = ProxyResponseAggregate {
2022-10-11 08:13:00 +03:00
period_timestamp,
// start most things at 0 because we add outside this getter
frontend_requests: 0.into(),
backend_requests: 0.into(),
backend_retries: 0.into(),
cache_misses: 0.into(),
cache_hits: 0.into(),
sum_request_bytes: 0.into(),
sum_response_bytes: 0.into(),
sum_response_millis: 0.into(),
2022-10-11 20:34:25 +03:00
histograms: AsyncMutex::new(histograms),
2022-10-10 07:15:07 +03:00
};
Arc::new(aggregate)
})
.await;
2022-10-11 20:34:25 +03:00
// a stat always come from just 1 frontend request
user_aggregate
.frontend_requests
.fetch_add(1, Ordering::Acquire);
2022-10-11 20:34:25 +03:00
// a stat might have multiple backend requests
user_aggregate
.backend_requests
.fetch_add(metadata.backend_requests, Ordering::Acquire);
2022-10-11 08:13:00 +03:00
user_aggregate
.sum_request_bytes
2022-10-11 20:34:25 +03:00
.fetch_add(metadata.request_bytes, Ordering::Release);
2022-10-11 08:13:00 +03:00
user_aggregate
.sum_response_bytes
2022-10-11 20:34:25 +03:00
.fetch_add(metadata.response_bytes, Ordering::Release);
2022-10-11 20:34:25 +03:00
user_aggregate
.sum_response_millis
.fetch_add(metadata.response_millis, Ordering::Release);
2022-10-11 08:13:00 +03:00
2022-10-11 20:34:25 +03:00
{
let mut histograms = user_aggregate.histograms.lock().await;
2022-10-11 08:13:00 +03:00
2022-10-11 20:34:25 +03:00
// TODO: record_correct?
histograms.request_bytes.record(metadata.request_bytes)?;
2022-10-11 08:13:00 +03:00
2022-10-11 20:34:25 +03:00
histograms.response_bytes.record(metadata.response_bytes)?;
2022-10-11 08:13:00 +03:00
2022-10-11 20:34:25 +03:00
histograms
.response_millis
.record(metadata.response_millis)?;
}
2022-10-10 07:15:07 +03:00
}
}
Ok(())
}
2022-10-03 21:08:01 +03:00
}