diff --git a/entities/src/rpc_accounting.rs b/entities/src/rpc_accounting.rs
index e39a8ac1..d236378c 100644
--- a/entities/src/rpc_accounting.rs
+++ b/entities/src/rpc_accounting.rs
@@ -13,7 +13,7 @@ pub struct Model {
     pub timestamp: DateTimeUtc,
     pub method: String,
     pub backend_requests: u32,
-    pub error_response: i8,
+    pub error_response: bool,
     pub query_millis: u32,
     pub request_bytes: u32,
     pub response_bytes: u32,
diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs
index ea172370..72a7071d 100644
--- a/web3_proxy/src/app.rs
+++ b/web3_proxy/src/app.rs
@@ -267,14 +267,15 @@ impl Web3ProxyApp {
         // we do this in a channel so we don't slow down our response to the users
         let stat_sender = if let Some(db_conn) = db_conn.clone() {
            // TODO: sender and receiver here are a little confusing. because the thing that reads the receiver is what actually submits the stats
-            let (stat_sender, stat_handle) = {
-                // TODO: period from
+            let (stat_sender, stat_handle, save_handle) = {
+                // TODO: period from config instead of always being 60 seconds
                 let emitter = StatEmitter::new(top_config.app.chain_id, db_conn, 60);

                 emitter.spawn().await?
             };

             handles.push(stat_handle);
+            handles.push(save_handle);

             Some(stat_sender)
         } else {
diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs
index 207a0bb4..c9f25177 100644
--- a/web3_proxy/src/frontend/authorization.rs
+++ b/web3_proxy/src/frontend/authorization.rs
@@ -3,7 +3,7 @@ use crate::app::{UserKeyData, Web3ProxyApp};
 use crate::jsonrpc::JsonRpcRequest;
 use anyhow::Context;
 use axum::headers::{authorization::Bearer, Origin, Referer, UserAgent};
-use chrono::{DateTime, Utc};
+use chrono::Utc;
 use deferred_rate_limiter::DeferredRateLimitResult;
 use entities::user_keys;
 use ipnet::IpNet;
@@ -54,7 +54,7 @@ pub struct AuthorizedKey {

 #[derive(Debug, Default, Serialize)]
 pub struct RequestMetadata {
-    pub datetime: DateTime<Utc>,
+    pub timestamp: u64,
     pub request_bytes: AtomicUsize,
     pub backend_requests: AtomicU16,
     pub error_response: AtomicBool,
@@ -78,7 +78,7 @@ impl RequestMetadata {

         Self {
             request_bytes: request_bytes.into(),
-            datetime: Utc::now(),
+            timestamp: Utc::now().timestamp() as u64,
             ..Default::default()
         }
     }
diff --git a/web3_proxy/src/stats.rs b/web3_proxy/src/stats.rs
index e0f7fe11..ec2ae9c3 100644
--- a/web3_proxy/src/stats.rs
+++ b/web3_proxy/src/stats.rs
@@ -1,11 +1,11 @@
 use crate::frontend::authorization::{AuthorizedKey, RequestMetadata};
-use chrono::{DateTime, Utc};
+use anyhow::Context;
+use chrono::{TimeZone, Utc};
 use derive_more::From;
 use entities::rpc_accounting;
 use moka::future::{Cache, CacheBuilder};
-use parking_lot::{Mutex, RwLock};
-use sea_orm::DatabaseConnection;
-use std::sync::atomic::Ordering;
+use sea_orm::{ActiveModelTrait, DatabaseConnection};
+use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
 use std::{
     sync::atomic::{AtomicU32, AtomicU64},
@@ -29,15 +29,14 @@ pub struct ProxyResponseAggregate {
     // user_key_id: u64,
     // method: String,
     // error_response: bool,
+    first_timestamp: u64,
     frontend_requests: AtomicU32,
     backend_requests: AtomicU32,
-    first_datetime: DateTime<Utc>,
-    // TODO: would like to not need a mutex. see how it performs before caring too much
-    last_timestamp: Mutex<DateTime<Utc>>,
+    last_timestamp: AtomicU64,
     first_response_millis: u32,
     sum_response_millis: AtomicU32,
-    sum_request_bytes: AtomicU32,
-    sum_response_bytes: AtomicU32,
+    sum_request_bytes: AtomicUsize,
+    sum_response_bytes: AtomicUsize,
 }

 /// key is the (user_key_id, method, error_response)
@@ -56,6 +55,7 @@ pub struct StatEmitter {
     period_seconds: u64,
     /// the outer cache has a TTL and a handler for expiration
     aggregated_proxy_responses: TimeProxyResponseCache,
+    save_rx: flume::Receiver<UserProxyResponseCache>,
 }

 /// A stat that we aggregate and then store in a database.
@@ -77,8 +77,15 @@ impl StatEmitter {
     pub fn new(chain_id: u64, db_conn: DatabaseConnection, period_seconds: u64) -> Arc<Self> {
+        let (save_tx, save_rx) = flume::unbounded();
+
         let aggregated_proxy_responses = CacheBuilder::default()
             .time_to_live(Duration::from_secs(period_seconds * 3 / 2))
+            .eviction_listener_with_queued_delivery_mode(move |_k, v, _r| {
+                if let Err(err) = save_tx.send(v) {
+                    error!(?err, "unable to save. sender closed!");
+                }
+            })
             .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new());

         let s = Self {
@@ -86,6 +93,7 @@ impl StatEmitter {
             db_conn,
             period_seconds,
             aggregated_proxy_responses,
+            save_rx,
         };

         Arc::new(s)
@@ -93,46 +101,110 @@ impl StatEmitter {
     pub async fn spawn(
         self: Arc<Self>,
-    ) -> anyhow::Result<(flume::Sender<Web3ProxyStat>, JoinHandle<anyhow::Result<()>>)> {
-        let (tx, rx) = flume::unbounded::<Web3ProxyStat>();
+    ) -> anyhow::Result<(
+        flume::Sender<Web3ProxyStat>,
+        JoinHandle<anyhow::Result<()>>,
+        JoinHandle<anyhow::Result<()>>,
+    )> {
+        let (aggregate_tx, aggregate_rx) = flume::unbounded::<Web3ProxyStat>();

         // simple future that reads the channel and emits stats
-        let f = async move {
-            // TODO: select on shutdown handle so we can be sure to save every aggregate!
-            while let Ok(x) = rx.recv_async().await {
-                trace!(?x, "emitting stat");
+        let aggregate_f = {
+            let aggregated_proxy_responses = self.aggregated_proxy_responses.clone();
+            let clone = self.clone();
+            async move {
+                // TODO: select on shutdown handle so we can be sure to save every aggregate!
+                while let Ok(x) = aggregate_rx.recv_async().await {
+                    trace!(?x, "aggregating stat");

-                // TODO: increment global stats (in redis? in local cache for prometheus?)
+                    // TODO: increment global stats (in redis? in local cache for prometheus?)

-                let clone = self.clone();
+                    // TODO: batch stats? spawn this?
+                    // TODO: where can we wait on this handle?
+                    let clone = clone.clone();
+                    tokio::spawn(async move { clone.aggregate_stat(x).await });

-                // TODO: batch stats? spawn this?
-                // TODO: where can we wait on this handle?
-                tokio::spawn(async move { clone.queue_user_stat(x).await });
+                    // no need to save manually. they save on expire
+                }

-                // no need to save manually. they save on expire
+                // shutting down. force a save
+                // TODO: this is handled by a background thread! we need to make sure the thread survives long enough to do its work!
+                aggregated_proxy_responses.invalidate_all();
+
+                info!("stat aggregator exited");
+
+                Ok(())
             }
-
-            // shutting down. force a save
-            self.save_user_stats().await?;
-
-            info!("stat emitter exited");
-
-            Ok(())
         };

-        let handle = tokio::spawn(f);
+        let save_f = {
+            let db_conn = self.db_conn.clone();
+            let save_rx = self.save_rx.clone();
+            let chain_id = self.chain_id;
+            async move {
+                while let Ok(x) = save_rx.recv_async().await {
+                    // TODO: batch these
+                    for (k, v) in x.into_iter() {
+                        // TODO: try_unwrap()?
+                        let (user_key_id, method, error_response) = k.as_ref();

-        Ok((tx, handle))
+                        info!(?user_key_id, ?method, ?error_response, "saving");
+
+                        let first_timestamp = Utc.timestamp(v.first_timestamp as i64, 0);
+                        let frontend_requests = v.frontend_requests.load(Ordering::Acquire);
+                        let backend_requests = v.backend_requests.load(Ordering::Acquire);
+                        let first_response_millis = v.first_response_millis;
+                        let sum_request_bytes = v.sum_request_bytes.load(Ordering::Acquire);
+                        let sum_response_millis = v.sum_response_millis.load(Ordering::Acquire);
+                        let sum_response_bytes = v.sum_response_bytes.load(Ordering::Acquire);
+
+                        let stat = rpc_accounting::ActiveModel {
+                            user_key_id: sea_orm::Set(*user_key_id),
+                            chain_id: sea_orm::Set(chain_id),
+                            method: sea_orm::Set(method.clone()),
+                            error_response: sea_orm::Set(*error_response),
+                            first_timestamp: sea_orm::Set(first_timestamp),
+                            frontend_requests: sea_orm::Set(frontend_requests),
+                            backend_requests: sea_orm::Set(backend_requests),
+                            first_query_millis: sea_orm::Set(first_response_millis),
+                            sum_request_bytes: sea_orm::Set(sum_request_bytes),
+                            sum_response_millis: sea_orm::Set(sum_response_millis),
+                            sum_response_bytes: sea_orm::Set(sum_response_bytes),
+                            ..Default::default()
+                        };
+
+                        // TODO: if this fails, revert adding the user, too
+                        if let Err(err) = stat
+                            .save(&db_conn)
+                            .await
+                            .context("Saving rpc_accounting stat")
+                        {
+                            error!(?err, "unable to save aggregated stats");
+                        }
+                    }
+                }
+
+                info!("stat saver exited");
+
+                Ok(())
+            }
+        };
+
+        // TODO: join and flatten these handles
+        let aggregate_handle = tokio::spawn(aggregate_f);
+        let save_handle = tokio::spawn(save_f);
+
+        Ok((aggregate_tx, aggregate_handle, save_handle))
     }

-    pub async fn queue_user_stat(&self, stat: Web3ProxyStat) -> anyhow::Result<()> {
+    pub async fn aggregate_stat(&self, stat: Web3ProxyStat) -> anyhow::Result<()> {
+        trace!(?stat, "aggregating");
         match stat {
             Web3ProxyStat::ProxyResponse(x) => {
                 // TODO: move this into another function?
                // get the user cache for the current time bucket
-                let time_bucket = (x.metadata.datetime.timestamp() as u64) / self.period_seconds;
+                let time_bucket = x.metadata.timestamp / self.period_seconds;
                 let user_cache = self
                     .aggregated_proxy_responses
                     .get_with(time_bucket, async move {
@@ -143,18 +215,18 @@ impl StatEmitter {

                 let error_response = x.metadata.error_response.load(Ordering::Acquire);

-                let key = (x.user_key_id, x.method.clone(), error_response);
+                let key = (x.user_key_id, x.method, error_response);
+
+                let timestamp = x.metadata.timestamp;
+                let response_millis = x.metadata.response_millis.load(Ordering::Acquire);

                 let user_aggregate = user_cache
                     .get_with(key, async move {
-                        let last_timestamp = Mutex::new(x.metadata.datetime);
+                        let last_timestamp = timestamp.into();

                         let aggregate = ProxyResponseAggregate {
-                            first_datetime: x.metadata.datetime,
-                            first_response_millis: x
-                                .metadata
-                                .response_millis
-                                .load(Ordering::Acquire),
+                            first_timestamp: timestamp,
+                            first_response_millis: response_millis,
                             last_timestamp,
                             ..Default::default()
                         };
@@ -163,14 +235,34 @@ impl StatEmitter {
                     })
                     .await;

-                todo!();
+                user_aggregate
+                    .backend_requests
+                    .fetch_add(1, Ordering::Acquire);
+
+                user_aggregate
+                    .frontend_requests
+                    .fetch_add(1, Ordering::Acquire);
+
+                let request_bytes = x.metadata.request_bytes.load(Ordering::Acquire);
+                user_aggregate
+                    .sum_request_bytes
+                    .fetch_add(request_bytes, Ordering::Release);
+
+                let response_bytes = x.metadata.response_bytes.load(Ordering::Acquire);
+                user_aggregate
+                    .sum_response_bytes
+                    .fetch_add(response_bytes, Ordering::Release);
+
+                user_aggregate
+                    .sum_response_millis
+                    .fetch_add(response_millis, Ordering::Release);
+
+                user_aggregate
+                    .last_timestamp
+                    .fetch_max(x.metadata.timestamp, Ordering::Release);
             }
         }

         Ok(())
     }
-
-    pub async fn save_user_stats(&self) -> anyhow::Result<()> {
-        todo!();
-    }
 }
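The `last_timestamp` change in `web3_proxy/src/stats.rs` above replaces a `Mutex<DateTime<Utc>>` with an `AtomicU64` unix timestamp that is advanced with `fetch_max`, so recording a request no longer takes a lock. A minimal standalone sketch of that pattern, using only the standard library (the type and function names below are illustrative and not from this repository):

```rust
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};

// Illustrative aggregate: counters are plain atomics and the newest-seen
// timestamp is tracked with fetch_max, so no lock is needed to record a request.
#[derive(Default)]
struct ExampleAggregate {
    frontend_requests: AtomicU64,
    sum_request_bytes: AtomicUsize,
    last_timestamp: AtomicU64,
}

impl ExampleAggregate {
    fn record(&self, request_bytes: usize, timestamp: u64) {
        self.frontend_requests.fetch_add(1, Ordering::AcqRel);
        self.sum_request_bytes.fetch_add(request_bytes, Ordering::AcqRel);
        // fetch_max atomically keeps the larger of the stored and new timestamp
        self.last_timestamp.fetch_max(timestamp, Ordering::AcqRel);
    }
}

fn main() {
    let agg = ExampleAggregate::default();
    agg.record(120, 1_666_000_000);
    agg.record(80, 1_666_000_042);
    assert_eq!(agg.last_timestamp.load(Ordering::Acquire), 1_666_000_042);
    println!("requests = {}", agg.frontend_requests.load(Ordering::Acquire));
}
```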