stats almost work

just need to update the migration to match our new ideas for columns.

should also make the shutdown smarter so that nothing gets lost
Bryan Stitt 2022-10-10 05:35:25 +00:00
parent 25aa68a5bf
commit 825370b5d9
4 changed files with 143 additions and 50 deletions


@@ -13,7 +13,7 @@ pub struct Model {
pub timestamp: DateTimeUtc,
pub method: String,
pub backend_requests: u32,
-pub error_response: i8,
+pub error_response: bool,
pub query_millis: u32,
pub request_bytes: u32,
pub response_bytes: u32,

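The commit message's first point, updating the migration to match the new column ideas, shows up here: the save code further down sets columns like first_timestamp, frontend_requests and sum_request_bytes that this Model does not have yet. As a rough sketch only (field names taken from that ActiveModel usage, integer widths and primary key are guesses), the entity could eventually look something like:

    use sea_orm::entity::prelude::*;

    // sketch of a possible rpc_accounting entity once the migration catches up;
    // the field names come from the ActiveModel usage in the stats diff below,
    // but the types are assumptions
    #[derive(Clone, Debug, PartialEq, DeriveEntityModel)]
    #[sea_orm(table_name = "rpc_accounting")]
    pub struct Model {
        #[sea_orm(primary_key)]
        pub id: u64,
        pub user_key_id: u64,
        pub chain_id: u64,
        pub method: String,
        pub error_response: bool,
        pub first_timestamp: DateTimeUtc,
        pub frontend_requests: u32,
        pub backend_requests: u32,
        pub first_query_millis: u32,
        pub sum_request_bytes: u64,
        pub sum_response_millis: u32,
        pub sum_response_bytes: u64,
    }

    #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
    pub enum Relation {}

    impl ActiveModelBehavior for ActiveModel {}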

@@ -267,14 +267,15 @@ impl Web3ProxyApp {
// we do this in a channel so we don't slow down our response to the users
let stat_sender = if let Some(db_conn) = db_conn.clone() {
// TODO: sender and receiver here are a little confusing. because the thing that reads the receiver is what actually submits the stats
-let (stat_sender, stat_handle) = {
-// TODO: period from
+let (stat_sender, stat_handle, save_handle) = {
+// TODO: period from config instead of always being 60 seconds
let emitter = StatEmitter::new(top_config.app.chain_id, db_conn, 60);
emitter.spawn().await?
};
handles.push(stat_handle);
+handles.push(save_handle);
Some(stat_sender)
} else {

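With the extra save handle pushed onto handles, there are now two tasks whose results matter at shutdown. The "join and flatten these handles" TODO in the stats module below could be resolved along these lines (a sketch, assuming both are JoinHandle<anyhow::Result<()>> as in this commit):

    use tokio::task::JoinHandle;

    // sketch: fail fast if either stats task panics (JoinError) or returns an error
    async fn wait_for_stats(
        stat_handle: JoinHandle<anyhow::Result<()>>,
        save_handle: JoinHandle<anyhow::Result<()>>,
    ) -> anyhow::Result<()> {
        let (aggregate_result, save_result) = tokio::try_join!(stat_handle, save_handle)?;
        aggregate_result?;
        save_result?;
        Ok(())
    }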

@@ -3,7 +3,7 @@ use crate::app::{UserKeyData, Web3ProxyApp};
use crate::jsonrpc::JsonRpcRequest;
use anyhow::Context;
use axum::headers::{authorization::Bearer, Origin, Referer, UserAgent};
-use chrono::{DateTime, Utc};
+use chrono::{Utc};
use deferred_rate_limiter::DeferredRateLimitResult;
use entities::user_keys;
use ipnet::IpNet;
@@ -54,7 +54,7 @@ pub struct AuthorizedKey {
#[derive(Debug, Default, Serialize)]
pub struct RequestMetadata {
-pub datetime: DateTime<Utc>,
+pub timestamp: u64,
pub request_bytes: AtomicUsize,
pub backend_requests: AtomicU16,
pub error_response: AtomicBool,
@@ -78,7 +78,7 @@ impl RequestMetadata {
Self {
request_bytes: request_bytes.into(),
-datetime: Utc::now(),
+timestamp: Utc::now().timestamp() as u64,
..Default::default()
}
}

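RequestMetadata now carries a plain unix timestamp in seconds instead of a chrono DateTime, which is cheap to bucket and to keep next to the other atomics; the stats code below turns it back into a chrono value when it builds the database row. A tiny sketch of that round trip (the 60 second period is the hard-coded default from app.rs):

    use chrono::{TimeZone, Utc};

    fn main() {
        // what RequestMetadata::new() now stores
        let timestamp = Utc::now().timestamp() as u64;

        // how aggregate_stat picks the cache bucket for a 60 second period
        let time_bucket = timestamp / 60;

        // how the save task turns the seconds back into a chrono value for the row
        let first_timestamp = Utc.timestamp(timestamp as i64, 0);

        println!("{timestamp} {time_bucket} {first_timestamp}");
    }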

@@ -1,11 +1,11 @@
use crate::frontend::authorization::{AuthorizedKey, RequestMetadata};
-use chrono::{DateTime, Utc};
+use anyhow::Context;
+use chrono::{TimeZone, Utc};
use derive_more::From;
use entities::rpc_accounting;
use moka::future::{Cache, CacheBuilder};
-use parking_lot::{Mutex, RwLock};
-use sea_orm::DatabaseConnection;
-use std::sync::atomic::Ordering;
+use sea_orm::{ActiveModelTrait, DatabaseConnection};
+use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::{
sync::atomic::{AtomicU32, AtomicU64},
@@ -29,15 +29,14 @@ pub struct ProxyResponseAggregate {
// user_key_id: u64,
// method: String,
// error_response: bool,
+first_timestamp: u64,
frontend_requests: AtomicU32,
backend_requests: AtomicU32,
-first_datetime: DateTime<Utc>,
-// TODO: would like to not need a mutex. see how it performs before caring too much
-last_timestamp: Mutex<DateTime<Utc>>,
+last_timestamp: AtomicU64,
first_response_millis: u32,
sum_response_millis: AtomicU32,
-sum_request_bytes: AtomicU32,
-sum_response_bytes: AtomicU32,
+sum_request_bytes: AtomicUsize,
+sum_response_bytes: AtomicUsize,
}
/// key is the (user_key_id, method, error_response)
@@ -56,6 +55,7 @@ pub struct StatEmitter {
period_seconds: u64,
/// the outer cache has a TTL and a handler for expiration
aggregated_proxy_responses: TimeProxyResponseCache,
+save_rx: flume::Receiver<UserProxyResponseCache>,
}
/// A stat that we aggregate and then store in a database.
@@ -77,8 +77,15 @@ impl ProxyResponseStat {
impl StatEmitter {
pub fn new(chain_id: u64, db_conn: DatabaseConnection, period_seconds: u64) -> Arc<Self> {
+let (save_tx, save_rx) = flume::unbounded();
let aggregated_proxy_responses = CacheBuilder::default()
.time_to_live(Duration::from_secs(period_seconds * 3 / 2))
+.eviction_listener_with_queued_delivery_mode(move |_k, v, _r| {
+if let Err(err) = save_tx.send(v) {
+error!(?err, "unable to save. the receiver is closed!");
+}
+})
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new());
let s = Self {
@@ -86,6 +93,7 @@ impl StatEmitter {
db_conn,
period_seconds,
aggregated_proxy_responses,
+save_rx,
};
Arc::new(s)
@@ -93,46 +101,110 @@ impl StatEmitter {
pub async fn spawn(
self: Arc<Self>,
-) -> anyhow::Result<(flume::Sender<Web3ProxyStat>, JoinHandle<anyhow::Result<()>>)> {
-let (tx, rx) = flume::unbounded::<Web3ProxyStat>();
+) -> anyhow::Result<(
+flume::Sender<Web3ProxyStat>,
+JoinHandle<anyhow::Result<()>>,
+JoinHandle<anyhow::Result<()>>,
+)> {
+let (aggregate_tx, aggregate_rx) = flume::unbounded::<Web3ProxyStat>();
-// simple future that reads the channel and emits stats
-let f = async move {
-// TODO: select on shutdown handle so we can be sure to save every aggregate!
-while let Ok(x) = rx.recv_async().await {
-trace!(?x, "emitting stat");
+let aggregate_f = {
+let aggregated_proxy_responses = self.aggregated_proxy_responses.clone();
+let clone = self.clone();
+async move {
+// TODO: select on shutdown handle so we can be sure to save every aggregate!
+while let Ok(x) = aggregate_rx.recv_async().await {
+trace!(?x, "aggregating stat");
-// TODO: increment global stats (in redis? in local cache for prometheus?)
+// TODO: increment global stats (in redis? in local cache for prometheus?)
-let clone = self.clone();
+// TODO: batch stats? spawn this?
+// TODO: where can we wait on this handle?
+let clone = clone.clone();
+tokio::spawn(async move { clone.aggregate_stat(x).await });
-// TODO: batch stats? spawn this?
-// TODO: where can we wait on this handle?
-tokio::spawn(async move { clone.queue_user_stat(x).await });
+// no need to save manually. they save on expire
+}
-// no need to save manually. they save on expire
+// shutting down. force a save
+// TODO: this is handled by a background thread! we need to make sure the thread survives long enough to do its work!
+aggregated_proxy_responses.invalidate_all();
+info!("stat aggregator exited");
+Ok(())
}
-// shutting down. force a save
-self.save_user_stats().await?;
-info!("stat emitter exited");
-Ok(())
};
-let handle = tokio::spawn(f);
+let save_f = {
+let db_conn = self.db_conn.clone();
+let save_rx = self.save_rx.clone();
+let chain_id = self.chain_id;
+async move {
+while let Ok(x) = save_rx.recv_async().await {
+// TODO: batch these
+for (k, v) in x.into_iter() {
+// TODO: try_unwrap()?
+let (user_key_id, method, error_response) = k.as_ref();
-Ok((tx, handle))
+info!(?user_key_id, ?method, ?error_response, "saving");
+let first_timestamp = Utc.timestamp(v.first_timestamp as i64, 0);
+let frontend_requests = v.frontend_requests.load(Ordering::Acquire);
+let backend_requests = v.backend_requests.load(Ordering::Acquire);
+let first_response_millis = v.first_response_millis;
+let sum_request_bytes = v.sum_request_bytes.load(Ordering::Acquire);
+let sum_response_millis = v.sum_response_millis.load(Ordering::Acquire);
+let sum_response_bytes = v.sum_response_bytes.load(Ordering::Acquire);
+let stat = rpc_accounting::ActiveModel {
+user_key_id: sea_orm::Set(*user_key_id),
+chain_id: sea_orm::Set(chain_id),
+method: sea_orm::Set(method.clone()),
+error_response: sea_orm::Set(*error_response),
+first_timestamp: sea_orm::Set(first_timestamp),
+frontend_requests: sea_orm::Set(frontend_requests),
+backend_requests: sea_orm::Set(backend_requests),
+first_query_millis: sea_orm::Set(first_response_millis),
+sum_request_bytes: sea_orm::Set(sum_request_bytes),
+sum_response_millis: sea_orm::Set(sum_response_millis),
+sum_response_bytes: sea_orm::Set(sum_response_bytes),
+..Default::default()
+};
+// TODO: if this fails, revert adding the user, too
+if let Err(err) = stat
+.save(&db_conn)
+.await
+.context("Saving rpc_accounting stat")
+{
+error!(?err, "unable to save aggregated stats");
+}
+}
+}
+info!("stat saver exited");
+Ok(())
+}
+};
+// TODO: join and flatten these handles
+let aggregate_handle = tokio::spawn(aggregate_f);
+let save_handle = tokio::spawn(save_f);
+Ok((aggregate_tx, aggregate_handle, save_handle))
}
-pub async fn queue_user_stat(&self, stat: Web3ProxyStat) -> anyhow::Result<()> {
+pub async fn aggregate_stat(&self, stat: Web3ProxyStat) -> anyhow::Result<()> {
trace!(?stat, "aggregating");
match stat {
Web3ProxyStat::ProxyResponse(x) => {
// TODO: move this into another function?
// get the user cache for the current time bucket
-let time_bucket = (x.metadata.datetime.timestamp() as u64) / self.period_seconds;
+let time_bucket = x.metadata.timestamp / self.period_seconds;
let user_cache = self
.aggregated_proxy_responses
.get_with(time_bucket, async move {
@@ -143,18 +215,18 @@ impl StatEmitter {
let error_response = x.metadata.error_response.load(Ordering::Acquire);
-let key = (x.user_key_id, x.method.clone(), error_response);
+let key = (x.user_key_id, x.method, error_response);
+let timestamp = x.metadata.timestamp;
+let response_millis = x.metadata.response_millis.load(Ordering::Acquire);
let user_aggregate = user_cache
.get_with(key, async move {
-let last_timestamp = Mutex::new(x.metadata.datetime);
+let last_timestamp = timestamp.into();
let aggregate = ProxyResponseAggregate {
-first_datetime: x.metadata.datetime,
-first_response_millis: x
-.metadata
-.response_millis
-.load(Ordering::Acquire),
+first_timestamp: timestamp,
+first_response_millis: response_millis,
last_timestamp,
..Default::default()
};
@@ -163,14 +235,34 @@ impl StatEmitter {
})
.await;
-todo!();
+user_aggregate
+.backend_requests
+.fetch_add(1, Ordering::Acquire);
+user_aggregate
+.frontend_requests
+.fetch_add(1, Ordering::Acquire);
+let request_bytes = x.metadata.request_bytes.load(Ordering::Acquire);
+user_aggregate
+.sum_request_bytes
+.fetch_add(request_bytes, Ordering::Release);
+let response_bytes = x.metadata.response_bytes.load(Ordering::Acquire);
+user_aggregate
+.sum_response_bytes
+.fetch_add(response_bytes, Ordering::Release);
+user_aggregate
+.sum_response_millis
+.fetch_add(response_millis, Ordering::Release);
+user_aggregate
+.last_timestamp
+.fetch_max(x.metadata.timestamp, Ordering::Release);
}
}
Ok(())
}
-pub async fn save_user_stats(&self) -> anyhow::Result<()> {
-todo!();
-}
}
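
That leaves the commit message's other point: making shutdown smarter so nothing gets lost. Right now the aggregate task only calls invalidate_all() once the stat channel closes, and both the eviction listener and the save task still need time to drain afterwards. The "select on shutdown handle" TODO could look roughly like this, with a tokio broadcast channel standing in for the app's shutdown signal and placeholder functions standing in for aggregate_stat and the cache flush (none of this is in the commit; it is only a sketch of the idea):

    use tokio::sync::broadcast;

    // stop pulling new stats once shutdown is signalled, then flush, so the save
    // task can drain everything before the process exits
    async fn aggregate_loop(
        aggregate_rx: flume::Receiver<u64>,
        mut shutdown_rx: broadcast::Receiver<()>,
    ) -> anyhow::Result<()> {
        loop {
            tokio::select! {
                stat = aggregate_rx.recv_async() => match stat {
                    Ok(stat) => aggregate(stat).await,
                    // every sender is gone; nothing more will arrive
                    Err(_) => break,
                },
                _ = shutdown_rx.recv() => break,
            }
        }

        // placeholder for aggregated_proxy_responses.invalidate_all() plus giving
        // the eviction listener and the saver time to finish their writes
        flush().await;

        Ok(())
    }

    async fn aggregate(stat: u64) {
        // placeholder for StatEmitter::aggregate_stat; u64 stands in for Web3ProxyStat
        let _ = stat;
    }

    async fn flush() {}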