2022-10-07 05:15:53 +03:00
|
|
|
use anyhow::Context;
|
2022-10-03 23:02:05 +03:00
|
|
|
use derive_more::From;
|
2022-10-07 05:15:53 +03:00
|
|
|
use redis_rate_limiter::{redis, RedisConnection};
|
|
|
|
use std::fmt::Display;
|
2022-10-03 21:08:01 +03:00
|
|
|
use tokio::task::JoinHandle;
|
2022-10-07 05:15:53 +03:00
|
|
|
use tracing::{debug, error, info};
|
2022-10-03 21:08:01 +03:00
|
|
|
|
2022-10-03 23:02:05 +03:00
|
|
|
use crate::frontend::authorization::AuthorizedRequest;
|
|
|
|
|
|
|
|
#[derive(Debug)]
|
|
|
|
pub enum ProxyResponseType {
|
|
|
|
CacheHit,
|
|
|
|
CacheMiss,
|
|
|
|
Error,
|
|
|
|
}
|
|
|
|
|
2022-10-07 05:15:53 +03:00
|
|
|
impl Display for ProxyResponseType {
|
|
|
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
|
|
match self {
|
2022-10-07 05:21:24 +03:00
|
|
|
ProxyResponseType::CacheHit => f.write_str("ch"),
|
|
|
|
ProxyResponseType::CacheMiss => f.write_str("cm"),
|
|
|
|
ProxyResponseType::Error => f.write_str("err"),
|
2022-10-03 23:02:05 +03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// TODO: where should this be defined?
|
2022-10-07 05:15:53 +03:00
|
|
|
#[derive(Debug)]
|
|
|
|
pub struct ProxyResponseStat(String);
|
2022-10-03 21:08:01 +03:00
|
|
|
|
2022-10-07 05:15:53 +03:00
|
|
|
/// A very basic stat that we store in redis.
|
|
|
|
/// This probably belongs in a true time series database like influxdb, but client
|
2022-10-03 23:02:05 +03:00
|
|
|
impl ProxyResponseStat {
|
|
|
|
pub fn new(method: String, response_type: ProxyResponseType, who: &AuthorizedRequest) -> Self {
|
2022-10-07 05:15:53 +03:00
|
|
|
// TODO: what order?
|
|
|
|
// TODO: app specific prefix. need at least the chain id
|
|
|
|
let redis_key = format!("proxy_response:{}:{}:{}", method, response_type, who);
|
|
|
|
|
|
|
|
Self(redis_key)
|
2022-10-03 23:02:05 +03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Debug, From)]
|
2022-10-03 21:08:01 +03:00
|
|
|
pub enum Web3ProxyStat {
|
2022-10-03 23:02:05 +03:00
|
|
|
ProxyResponse(ProxyResponseStat),
|
2022-10-03 21:08:01 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
impl Web3ProxyStat {
|
2022-10-07 05:15:53 +03:00
|
|
|
fn into_redis_key(self, chain_id: u64) -> String {
|
2022-10-03 21:08:01 +03:00
|
|
|
match self {
|
2022-10-07 05:15:53 +03:00
|
|
|
Self::ProxyResponse(x) => format!("{}:{}", x.0, chain_id),
|
2022-10-03 21:08:01 +03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub struct StatEmitter;
|
|
|
|
|
|
|
|
impl StatEmitter {
|
2022-10-07 05:15:53 +03:00
|
|
|
pub async fn spawn(
|
|
|
|
chain_id: u64,
|
|
|
|
mut redis_conn: RedisConnection,
|
|
|
|
) -> anyhow::Result<(flume::Sender<Web3ProxyStat>, JoinHandle<anyhow::Result<()>>)> {
|
2022-10-03 21:08:01 +03:00
|
|
|
let (tx, rx) = flume::unbounded::<Web3ProxyStat>();
|
|
|
|
|
2022-10-07 05:15:53 +03:00
|
|
|
// simple future that reads the channel and emits stats
|
2022-10-03 21:08:01 +03:00
|
|
|
let f = async move {
|
|
|
|
while let Ok(x) = rx.recv_async().await {
|
2022-10-07 05:15:53 +03:00
|
|
|
// TODO: batch stats? spawn this?
|
|
|
|
|
|
|
|
let x = x.into_redis_key(chain_id);
|
2022-10-03 23:02:05 +03:00
|
|
|
|
2022-10-07 05:15:53 +03:00
|
|
|
// TODO: this is too loud. just doing it for dev
|
|
|
|
debug!(?x, "emitting stat");
|
2022-10-03 23:02:05 +03:00
|
|
|
|
2022-10-07 05:21:24 +03:00
|
|
|
if let Err(err) = redis::Cmd::incr(&x, 1)
|
2022-10-07 05:15:53 +03:00
|
|
|
.query_async::<_, ()>(&mut redis_conn)
|
|
|
|
.await
|
|
|
|
.context("incrementing stat")
|
|
|
|
{
|
|
|
|
error!(?err, "emitting stat")
|
2022-10-03 21:08:01 +03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
info!("stat emitter exited");
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
};
|
|
|
|
|
|
|
|
let handle = tokio::spawn(f);
|
|
|
|
|
2022-10-07 05:15:53 +03:00
|
|
|
Ok((tx, handle))
|
2022-10-03 21:08:01 +03:00
|
|
|
}
|
|
|
|
}
|