From cfd26940a9d2aaaa1162f28017dea20c29c472e7 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Tue, 11 Oct 2022 05:13:00 +0000 Subject: [PATCH] this might be too many columns --- Cargo.lock | 5 +- entities/src/rpc_accounting.rs | 35 +- migration/src/m20221007_213828_accounting.rs | 186 ++++++++-- web3_proxy/Cargo.toml | 1 + web3_proxy/src/frontend/authorization.rs | 12 +- web3_proxy/src/stats.rs | 344 ++++++++++++------- 6 files changed, 416 insertions(+), 167 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 338a17bb..e63e8869 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2208,9 +2208,9 @@ dependencies = [ [[package]] name = "hdrhistogram" -version = "7.5.1" +version = "7.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ea9fe3952d32674a14e0975009a3547af9ea364995b5ec1add2e23c2ae523ab" +checksum = "7f19b9f54f7c7f55e31401bb647626ce0cf0f67b0004982ce815b3ee72a02aa8" dependencies = [ "base64 0.13.0", "byteorder", @@ -5513,6 +5513,7 @@ dependencies = [ "futures", "handlebars", "hashbrown", + "hdrhistogram", "http", "ipnet", "metered", diff --git a/entities/src/rpc_accounting.rs b/entities/src/rpc_accounting.rs index d236378c..1fddc92b 100644 --- a/entities/src/rpc_accounting.rs +++ b/entities/src/rpc_accounting.rs @@ -3,20 +3,43 @@ use sea_orm::entity::prelude::*; use serde::{Deserialize, Serialize}; -#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Serialize, Deserialize)] #[sea_orm(table_name = "rpc_accounting")] +/// TODO: make migrations match these items pub struct Model { #[sea_orm(primary_key)] pub id: u64, pub user_key_id: u64, pub chain_id: u64, - pub timestamp: DateTimeUtc, pub method: String, - pub backend_requests: u32, pub error_response: bool, - pub query_millis: u32, - pub request_bytes: u32, - pub response_bytes: u32, + pub period_datetime: DateTimeUtc, + pub frontend_requests: u32, + pub backend_requests: u32, + pub backend_retries: u32, + pub cache_misses: u32, + pub cache_hits: u32, + pub sum_request_bytes: u64, + pub min_request_bytes: u64, + pub mean_request_bytes: f64, + pub p50_request_bytes: u64, + pub p90_request_bytes: u64, + pub p99_request_bytes: u64, + pub max_request_bytes: u64, + pub sum_response_millis: u64, + pub min_response_millis: u64, + pub mean_response_millis: f64, + pub p50_response_millis: u64, + pub p90_response_millis: u64, + pub p99_response_millis: u64, + pub max_response_millis: u64, + pub sum_response_bytes: u64, + pub min_response_bytes: u64, + pub mean_response_bytes: f64, + pub p50_response_bytes: u64, + pub p90_response_bytes: u64, + pub p99_response_bytes: u64, + pub max_response_bytes: u64, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git a/migration/src/m20221007_213828_accounting.rs b/migration/src/m20221007_213828_accounting.rs index 72ff2912..df90811f 100644 --- a/migration/src/m20221007_213828_accounting.rs +++ b/migration/src/m20221007_213828_accounting.rs @@ -28,53 +28,154 @@ impl MigrationTrait for Migration { .big_unsigned() .not_null(), ) - .col( - ColumnDef::new(RpcAccounting::Timestamp) - .timestamp() - .not_null(), - ) .col(ColumnDef::new(RpcAccounting::Method).string().not_null()) - .col( - ColumnDef::new(RpcAccounting::FrontendRequests) - .unsigned() - .not_null(), - ) - .col( - // 0 means cache hit - // 1 is hopefully what most require - // but there might be more if retries were necessary - ColumnDef::new(RpcAccounting::BackendRequests) - .unsigned() - .not_null(), - ) .col( 
ColumnDef::new(RpcAccounting::ErrorResponse) .boolean() .not_null(), ) .col( - ColumnDef::new(RpcAccounting::QueryMillis) - .unsigned() + ColumnDef::new(RpcAccounting::PeriodDatetime) + .timestamp() .not_null(), ) .col( - ColumnDef::new(RpcAccounting::RequestBytes) - .unsigned() + ColumnDef::new(RpcAccounting::FrontendRequests) + .big_unsigned() .not_null(), ) .col( - ColumnDef::new(RpcAccounting::ResponseBytes) - .unsigned() + ColumnDef::new(RpcAccounting::BackendRequests) + .big_unsigned() + .not_null(), + ) + .col( + ColumnDef::new(RpcAccounting::BackendRetries) + .big_unsigned() + .not_null(), + ) + .col( + ColumnDef::new(RpcAccounting::CacheMisses) + .big_unsigned() + .not_null(), + ) + .col( + ColumnDef::new(RpcAccounting::CacheHits) + .big_unsigned() + .not_null(), + ) + .col( + ColumnDef::new(RpcAccounting::SumRequestBytes) + .big_unsigned() + .not_null(), + ) + .col( + ColumnDef::new(RpcAccounting::MinRequestBytes) + .big_unsigned() + .not_null(), + ) + .col( + ColumnDef::new(RpcAccounting::MeanRequestBytes) + .float_len(64) + .not_null(), + ) + .col( + ColumnDef::new(RpcAccounting::P50RequestBytes) + .big_unsigned() + .not_null(), + ) + .col( + ColumnDef::new(RpcAccounting::P90RequestBytes) + .big_unsigned() + .not_null(), + ) + .col( + ColumnDef::new(RpcAccounting::P99RequestBytes) + .big_unsigned() + .not_null(), + ) + .col( + ColumnDef::new(RpcAccounting::MaxRequestBytes) + .big_unsigned() + .not_null(), + ) + .col( + ColumnDef::new(RpcAccounting::SumResponseMillis) + .big_unsigned() + .not_null(), + ) + .col( + ColumnDef::new(RpcAccounting::MinResponseMillis) + .big_unsigned() + .not_null(), + ) + .col( + ColumnDef::new(RpcAccounting::MeanResponseMillis) + .float_len(64) + .not_null(), + ) + .col( + ColumnDef::new(RpcAccounting::P50ResponseMillis) + .big_unsigned() + .not_null(), + ) + .col( + ColumnDef::new(RpcAccounting::P90ResponseMillis) + .big_unsigned() + .not_null(), + ) + .col( + ColumnDef::new(RpcAccounting::P99ResponseMillis) + .big_unsigned() + .not_null(), + ) + .col( + ColumnDef::new(RpcAccounting::MaxResponseMillis) + .big_unsigned() + .not_null(), + ) + .col( + ColumnDef::new(RpcAccounting::SumResponseBytes) + .big_unsigned() + .not_null(), + ) + .col( + ColumnDef::new(RpcAccounting::MinResponseBytes) + .big_unsigned() + .not_null(), + ) + .col( + ColumnDef::new(RpcAccounting::MeanResponseBytes) + .float_len(64) + .not_null(), + ) + .col( + ColumnDef::new(RpcAccounting::P50ResponseBytes) + .big_unsigned() + .not_null(), + ) + .col( + ColumnDef::new(RpcAccounting::P90ResponseBytes) + .big_unsigned() + .not_null(), + ) + .col( + ColumnDef::new(RpcAccounting::P99ResponseBytes) + .big_unsigned() + .not_null(), + ) + .col( + ColumnDef::new(RpcAccounting::MaxResponseBytes) + .big_unsigned() .not_null(), ) - .index(sea_query::Index::create().col(RpcAccounting::Timestamp)) - .index(sea_query::Index::create().col(RpcAccounting::Method)) - .index(sea_query::Index::create().col(RpcAccounting::BackendRequests)) .foreign_key( sea_query::ForeignKey::create() .from(RpcAccounting::Table, RpcAccounting::UserKeyId) .to(UserKeys::Table, UserKeys::Id), ) + .index(sea_query::Index::create().col(RpcAccounting::PeriodDatetime)) + .index(sea_query::Index::create().col(RpcAccounting::Method)) .to_owned(), ) .await @@ -98,14 +199,35 @@ pub enum UserKeys { enum RpcAccounting { Table, Id, - Timestamp, UserKeyId, ChainId, Method, + ErrorResponse, + PeriodDatetime, FrontendRequests, BackendRequests, - ErrorResponse, - QueryMillis, - RequestBytes, - ResponseBytes, + BackendRetries, + 
CacheMisses, + CacheHits, + SumRequestBytes, + MinRequestBytes, + MeanRequestBytes, + P50RequestBytes, + P90RequestBytes, + P99RequestBytes, + MaxRequestBytes, + SumResponseMillis, + MinResponseMillis, + MeanResponseMillis, + P50ResponseMillis, + P90ResponseMillis, + P99ResponseMillis, + MaxResponseMillis, + SumResponseBytes, + MinResponseBytes, + MeanResponseBytes, + P50ResponseBytes, + P90ResponseBytes, + P99ResponseBytes, + MaxResponseBytes, } diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index b13c17e7..1d2a89a5 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -35,6 +35,7 @@ fdlimit = "0.2.1" flume = "0.10.14" futures = { version = "0.3.24", features = ["thread-pool"] } hashbrown = { version = "0.12.3", features = ["serde"] } +hdrhistogram = "7.5.2" http = "0.2.8" ipnet = "2.5.0" metered = { version = "0.9.0", features = ["serialize"] } diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index c9f25177..aab929d3 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -3,7 +3,7 @@ use crate::app::{UserKeyData, Web3ProxyApp}; use crate::jsonrpc::JsonRpcRequest; use anyhow::Context; use axum::headers::{authorization::Bearer, Origin, Referer, UserAgent}; -use chrono::{Utc}; +use chrono::Utc; use deferred_rate_limiter::DeferredRateLimitResult; use entities::user_keys; use ipnet::IpNet; @@ -13,7 +13,7 @@ use sea_orm::{prelude::Decimal, ColumnTrait, DatabaseConnection, EntityTrait, Qu use serde::Serialize; use std::fmt::Display; use std::mem::size_of_val; -use std::sync::atomic::{AtomicBool, AtomicU16, AtomicU32, AtomicUsize}; +use std::sync::atomic::{AtomicBool, AtomicU16, AtomicU64}; use std::{net::IpAddr, str::FromStr, sync::Arc}; use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use tokio::time::Instant; @@ -55,11 +55,11 @@ pub struct AuthorizedKey { #[derive(Debug, Default, Serialize)] pub struct RequestMetadata { pub timestamp: u64, - pub request_bytes: AtomicUsize, + pub request_bytes: AtomicU64, pub backend_requests: AtomicU16, pub error_response: AtomicBool, - pub response_bytes: AtomicUsize, - pub response_millis: AtomicU32, + pub response_bytes: AtomicU64, + pub response_millis: AtomicU64, } #[derive(Clone, Debug, Serialize)] @@ -74,7 +74,7 @@ pub enum AuthorizedRequest { impl RequestMetadata { pub fn new(request: &JsonRpcRequest) -> Self { - let request_bytes = size_of_val(request); + let request_bytes = size_of_val(request) as u64; Self { request_bytes: request_bytes.into(), diff --git a/web3_proxy/src/stats.rs b/web3_proxy/src/stats.rs index 8ae7bac5..1ad70c68 100644 --- a/web3_proxy/src/stats.rs +++ b/web3_proxy/src/stats.rs @@ -3,14 +3,13 @@ use anyhow::Context; use chrono::{TimeZone, Utc}; use derive_more::From; use entities::rpc_accounting; +use hdrhistogram::Histogram; use moka::future::{Cache, CacheBuilder}; use sea_orm::{ActiveModelTrait, DatabaseConnection}; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; -use std::{ - sync::atomic::{AtomicU32, AtomicU64}, - time::Duration, -}; +use std::{sync::atomic::AtomicU32, time::Duration}; +use tokio::sync::Mutex as AsyncMutex; use tokio::task::JoinHandle; use tracing::{error, info, trace}; @@ -23,31 +22,45 @@ pub struct ProxyResponseStat { metadata: RequestMetadata, } +pub type TimeBucketTimestamp = u64; + // TODO: impl From for our database model -#[derive(Default)] pub struct ProxyResponseAggregate { + // these are the key // user_key_id: 
u64,
+    // method: String,
+    // error_response: bool,
-    first_timestamp: u64,
+    // TODO: this is the grandparent key. get it from there somehow
+    period_timestamp: u64,
     frontend_requests: AtomicU32,
     backend_requests: AtomicU32,
-    last_timestamp: AtomicU64,
-    first_response_millis: u32,
-    sum_response_millis: AtomicU32,
-    sum_request_bytes: AtomicUsize,
-    sum_response_bytes: AtomicUsize,
+    backend_retries: AtomicU32,
+    cache_misses: AtomicU32,
+    cache_hits: AtomicU32,
+    request_bytes: AsyncMutex<Histogram<u64>>,
+    sum_request_bytes: AtomicU64,
+    response_bytes: AsyncMutex<Histogram<u64>>,
+    sum_response_bytes: AtomicU64,
+    response_millis: AsyncMutex<Histogram<u64>>,
+    sum_response_millis: AtomicU64,
+}
+
+#[derive(Clone, Debug, From, Hash, PartialEq, Eq, PartialOrd, Ord)]
+pub struct UserProxyResponseKey {
+    user_key_id: u64,
+    method: String,
+    error_response: bool,
 }
 
 /// key is the (user_key_id, method, error_response)
 pub type UserProxyResponseCache = Cache<
-    (u64, String, bool),
+    UserProxyResponseKey,
     Arc<ProxyResponseAggregate>,
     hashbrown::hash_map::DefaultHashBuilder,
 >;
 
-/// key is the "time bucket" (timestamp / period)
+/// key is the "time bucket's timestamp" (timestamp / period * period)
 pub type TimeProxyResponseCache =
-    Cache<u64, UserProxyResponseCache, hashbrown::hash_map::DefaultHashBuilder>;
+    Cache<TimeBucketTimestamp, UserProxyResponseCache, hashbrown::hash_map::DefaultHashBuilder>;
 
 pub struct StatEmitter {
     chain_id: u64,
@@ -79,9 +92,14 @@ impl StatEmitter {
     pub fn new(chain_id: u64, db_conn: DatabaseConnection, period_seconds: u64) -> Arc<Self> {
         let (save_tx, save_rx) = flume::unbounded();
 
+        // this needs to be long enough that there are definitely no outstanding queries
+        // TODO: what should the "safe" multiplier be? what if something is late?
+        let ttl_seconds = period_seconds * 3;
+
         let aggregated_proxy_responses = CacheBuilder::default()
-            .time_to_live(Duration::from_secs(period_seconds * 3 / 2))
-            .eviction_listener_with_queued_delivery_mode(move |k, v, r| {
+            .time_to_live(Duration::from_secs(ttl_seconds))
+            .eviction_listener_with_queued_delivery_mode(move |_, v, _| {
+                // this function must not panic!
                 if let Err(err) = save_tx.send(v) {
                     error!(?err, "unable to save. sender closed!");
                 }
@@ -108,108 +126,159 @@ impl StatEmitter {
     )> {
         let (aggregate_tx, aggregate_rx) = flume::unbounded::<Web3ProxyStat>();
 
-        // simple future that reads the channel and emits stats
-        let aggregate_f = {
-            let aggregated_proxy_responses = self.aggregated_proxy_responses.clone();
-            let clone = self.clone();
-            async move {
-                // TODO: select on shutdown handle so we can be sure to save every aggregate!
-                while let Ok(x) = aggregate_rx.recv_async().await {
-                    trace!(?x, "aggregating stat");
-
-                    // TODO: increment global stats (in redis? in local cache for prometheus?)
-
-                    // TODO: batch stats? spawn this?
-                    // TODO: where can we wait on this handle?
-                    let clone = clone.clone();
-                    tokio::spawn(async move { clone.aggregate_stat(x).await });
-
-                    // no need to save manually. they save on expire
-                }
-
-                // shutting down. force a save
-                // we do not use invalidate_all because that is done on a background thread
-                for (key, _) in aggregated_proxy_responses.into_iter() {
-                    aggregated_proxy_responses.invalidate(&key).await;
-                }
-
-                info!("stat aggregator exited");
-
-                Ok(())
-            }
-        };
-
-        let save_f = {
-            let db_conn = self.db_conn.clone();
-            let save_rx = self.save_rx.clone();
-            let chain_id = self.chain_id;
-            async move {
-                while let Ok(x) = save_rx.recv_async().await {
-                    // TODO: batch these
-                    for (k, v) in x.into_iter() {
-                        // TODO: try_unwrap()?
-                        let (user_key_id, method, error_response) = k.as_ref();
-
-                        info!(?user_key_id, ?method, ?error_response, "saving");
-
-                        let first_timestamp = Utc.timestamp(v.first_timestamp as i64, 0);
-                        let frontend_requests = v.frontend_requests.load(Ordering::Acquire);
-                        let backend_requests = v.backend_requests.load(Ordering::Acquire);
-                        let first_response_millis = v.first_response_millis;
-                        let sum_request_bytes = v.sum_request_bytes.load(Ordering::Acquire);
-                        let sum_response_millis = v.sum_response_millis.load(Ordering::Acquire);
-                        let sum_response_bytes = v.sum_response_bytes.load(Ordering::Acquire);
-
-                        let stat = rpc_accounting::ActiveModel {
-                            user_key_id: sea_orm::Set(*user_key_id),
-                            chain_id: sea_orm::Set(chain_id),
-                            method: sea_orm::Set(method.clone()),
-                            error_response: sea_orm::Set(*error_response),
-                            first_timestamp: sea_orm::Set(first_timestamp),
-                            frontend_requests: sea_orm::Set(frontend_requests)
-                            backend_requests: sea_orm::Set(backend_requests),
-                            first_query_millis: sea_orm::Set(first_query_millis),
-                            sum_request_bytes: sea_orm::Set(sum_request_bytes),
-                            sum_response_millis: sea_orm::Set(sum_response_millis),
-                            sum_response_bytes: sea_orm::Set(sum_response_bytes),
-                            ..Default::default()
-                        };
-
-                        // TODO: if this fails, rever adding the user, too
-                        if let Err(err) = stat
-                            .save(&db_conn)
-                            .await
-                            .context("Saving rpc_accounting stat")
-                        {
-                            error!(?err, "unable to save aggregated stats");
-                        }
-                    }
-                }
-
-                info!("stat saver exited");
-
-                Ok(())
-            }
-        };
-
         // TODO: join and flatten these handles
-        let aggregate_handle = tokio::spawn(aggregate_f);
-        let save_handle = tokio::spawn(save_f);
+        let aggregate_handle = tokio::spawn(self.clone().aggregate_stats_loop(aggregate_rx));
+        let save_handle = { tokio::spawn(self.save_stats_loop()) };
 
         Ok((aggregate_tx, aggregate_handle, save_handle))
     }
 
+    /// simple future that reads the channel and aggregates stats in a local cache.
+    async fn aggregate_stats_loop(
+        self: Arc<Self>,
+        aggregate_rx: flume::Receiver<Web3ProxyStat>,
+    ) -> anyhow::Result<()> {
+        // TODO: select on shutdown handle so we can be sure to save every aggregate!
+        while let Ok(x) = aggregate_rx.recv_async().await {
+            trace!(?x, "aggregating stat");
+
+            // TODO: increment global stats (in redis? in local cache for prometheus?)
+
+            // TODO: batch stats? spawn this?
+            // TODO: where can we wait on this handle?
+            let clone = self.clone();
+            tokio::spawn(async move { clone.aggregate_stat(x).await });
+
+            // no need to save manually. they save on expire
+        }
+
+        // shutting down. force a save
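+        // each invalidate() delivers the entry to the eviction listener that
+        // new() registered, and that listener forwards the aggregate to save_tx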
+        // we do not use invalidate_all because that is done on a background thread
+        for (key, _) in self.aggregated_proxy_responses.into_iter() {
+            self.aggregated_proxy_responses.invalidate(&key).await;
+        }
+
+        info!("stat aggregator exited");
+
+        Ok(())
+    }
+
+    async fn save_stats_loop(self: Arc<Self>) -> anyhow::Result<()> {
+        while let Ok(x) = self.save_rx.recv_async().await {
+            // TODO: batch these
+            for (k, v) in x.into_iter() {
+                info!(?k, "saving");
+
+                let period_datetime = Utc.timestamp(v.period_timestamp as i64, 0);
+                let frontend_requests = v.frontend_requests.load(Ordering::Acquire);
+                let backend_requests = v.backend_requests.load(Ordering::Acquire);
+                let backend_retries = v.backend_retries.load(Ordering::Acquire);
+                let cache_misses = v.cache_misses.load(Ordering::Acquire);
+                let cache_hits = v.cache_hits.load(Ordering::Acquire);
+
+                let request_bytes = v.request_bytes.lock().await;
+
+                let sum_request_bytes = v.sum_request_bytes.load(Ordering::Acquire);
+                let min_request_bytes = request_bytes.min();
+                let mean_request_bytes = request_bytes.mean();
+                let p50_request_bytes = request_bytes.value_at_quantile(0.50);
+                let p90_request_bytes = request_bytes.value_at_quantile(0.90);
+                let p99_request_bytes = request_bytes.value_at_quantile(0.99);
+                let max_request_bytes = request_bytes.max();
+
+                drop(request_bytes);
+
+                let response_millis = v.response_millis.lock().await;
+
+                let sum_response_millis = v.sum_response_millis.load(Ordering::Acquire);
+                let min_response_millis = response_millis.min();
+                let mean_response_millis = response_millis.mean();
+                let p50_response_millis = response_millis.value_at_quantile(0.50);
+                let p90_response_millis = response_millis.value_at_quantile(0.90);
+                let p99_response_millis = response_millis.value_at_quantile(0.99);
+                let max_response_millis = response_millis.max();
+
+                drop(response_millis);
+
+                let response_bytes = v.response_bytes.lock().await;
+
+                let sum_response_bytes = v.sum_response_bytes.load(Ordering::Acquire);
+                let min_response_bytes = response_bytes.min();
+                let mean_response_bytes = response_bytes.mean();
+                let p50_response_bytes = response_bytes.value_at_quantile(0.50);
+                let p90_response_bytes = response_bytes.value_at_quantile(0.90);
+                let p99_response_bytes = response_bytes.value_at_quantile(0.99);
+                let max_response_bytes = response_bytes.max();
+
+                drop(response_bytes);
+
+                let stat = rpc_accounting::ActiveModel {
+                    user_key_id: sea_orm::Set(k.user_key_id),
+                    chain_id: sea_orm::Set(self.chain_id),
+                    method: sea_orm::Set(k.method.clone()),
+                    error_response: sea_orm::Set(k.error_response),
+                    period_datetime: sea_orm::Set(period_datetime),
+                    frontend_requests: sea_orm::Set(frontend_requests),
+                    backend_requests: sea_orm::Set(backend_requests),
+                    backend_retries: sea_orm::Set(backend_retries),
+                    cache_misses: sea_orm::Set(cache_misses),
+                    cache_hits: sea_orm::Set(cache_hits),
+
+                    sum_request_bytes: sea_orm::Set(sum_request_bytes),
+                    min_request_bytes: sea_orm::Set(min_request_bytes),
+                    mean_request_bytes: sea_orm::Set(mean_request_bytes),
+                    p50_request_bytes: sea_orm::Set(p50_request_bytes),
+                    p90_request_bytes: sea_orm::Set(p90_request_bytes),
+                    p99_request_bytes: sea_orm::Set(p99_request_bytes),
+                    max_request_bytes: sea_orm::Set(max_request_bytes),
+
+                    sum_response_millis: sea_orm::Set(sum_response_millis),
+                    min_response_millis: sea_orm::Set(min_response_millis),
+                    mean_response_millis: sea_orm::Set(mean_response_millis),
+                    p50_response_millis: sea_orm::Set(p50_response_millis),
+                    p90_response_millis: sea_orm::Set(p90_response_millis),
+                    p99_response_millis: sea_orm::Set(p99_response_millis),
+                    max_response_millis: sea_orm::Set(max_response_millis),
+
+                    sum_response_bytes: sea_orm::Set(sum_response_bytes),
+                    min_response_bytes: sea_orm::Set(min_response_bytes),
+                    mean_response_bytes: sea_orm::Set(mean_response_bytes),
+                    p50_response_bytes: sea_orm::Set(p50_response_bytes),
+                    p90_response_bytes: sea_orm::Set(p90_response_bytes),
+                    p99_response_bytes: sea_orm::Set(p99_response_bytes),
+                    max_response_bytes: sea_orm::Set(max_response_bytes),
+                    ..Default::default()
+                };
+
+                // TODO: if this fails, revert adding the user, too
+                if let Err(err) = stat
+                    .save(&self.db_conn)
+                    .await
+                    .context("Saving rpc_accounting stat")
+                {
+                    error!(?err, "unable to save aggregated stats");
+                }
+            }
+        }
+
+        info!("stat saver exited");
+
+        Ok(())
+    }
+
     pub async fn aggregate_stat(&self, stat: Web3ProxyStat) -> anyhow::Result<()> {
         trace!(?stat, "aggregating");
 
         match stat {
             Web3ProxyStat::ProxyResponse(x) => {
-                // TODO: move this into another function?
+                // TODO: move this whole closure to another function?
+
+                // TODO: move period calculation into another function?
+                let period_timestamp =
+                    x.metadata.timestamp / self.period_seconds * self.period_seconds;
 
-                // get the user cache for the current time bucket
-                let time_bucket = x.metadata.timestamp / self.period_seconds;
+                // get the user cache for the current period
                 let user_cache = self
                     .aggregated_proxy_responses
-                    .get_with(time_bucket, async move {
+                    .get_with(period_timestamp, async move {
                         CacheBuilder::default()
                             .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::new())
                     })
                     .await;
 
                 let error_response = x.metadata.error_response.load(Ordering::Acquire);
 
-                let key = (x.user_key_id, x.method, error_response);
-
-                let timestamp = x.metadata.timestamp;
-                let response_millis = x.metadata.response_millis.load(Ordering::Acquire);
+                let key = (x.user_key_id, x.method, error_response).into();
 
                 let user_aggregate = user_cache
                     .get_with(key, async move {
-                        let last_timestamp = timestamp.into();
-
                         let aggregate = ProxyResponseAggregate {
-                            first_timestamp: timestamp,
-                            first_response_millis: response_millis,
-                            last_timestamp,
-                            ..Default::default()
+                            period_timestamp,
+                            // start most things at 0 because we add outside this getter
+                            frontend_requests: 0.into(),
+                            backend_requests: 0.into(),
+                            backend_retries: 0.into(),
+                            cache_misses: 0.into(),
+                            cache_hits: 0.into(),
+                            // TODO: how many significant figures?
+                            request_bytes: AsyncMutex::new(
+                                Histogram::new(5).expect("creating request_bytes histogram"),
+                            ),
+                            sum_request_bytes: 0.into(),
+                            response_bytes: AsyncMutex::new(
+                                Histogram::new(5).expect("creating response_bytes histogram"),
+                            ),
+                            sum_response_bytes: 0.into(),
+                            // TODO: new_with_max here?
+                            response_millis: AsyncMutex::new(
+                                Histogram::new(5).expect("creating response_millis histogram"),
+                            ),
+                            sum_response_millis: 0.into(),
                        };
 
                         Arc::new(aggregate)
@@ -246,22 +327,43 @@ impl StatEmitter {
                     .fetch_add(1, Ordering::Acquire);
 
                 let request_bytes = x.metadata.request_bytes.load(Ordering::Acquire);
+
+                let mut request_bytes_histogram = user_aggregate.request_bytes.lock().await;
+
+                // TODO: record_correct?
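+                // note: `Histogram::new` builds an auto-resizing histogram, so this
+                // `record` should only fail in rare edge cases. `record_correct`
+                // additionally back-fills samples to compensate for coordinated
+                // omission, which may be worth considering for the latency histogram.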
+ request_bytes_histogram.record(request_bytes)?; + + drop(request_bytes_histogram); + user_aggregate .sum_request_bytes .fetch_add(request_bytes, Ordering::Release); let response_bytes = x.metadata.response_bytes.load(Ordering::Acquire); + + let mut response_bytes_histogram = user_aggregate.response_bytes.lock().await; + + // TODO: record_correct? + response_bytes_histogram.record(response_bytes)?; + + drop(response_bytes_histogram); + user_aggregate .sum_response_bytes .fetch_add(response_bytes, Ordering::Release); + let response_millis = x.metadata.response_millis.load(Ordering::Acquire); + + let mut response_millis_histogram = user_aggregate.response_millis.lock().await; + + // TODO: record_correct? + response_millis_histogram.record(response_millis)?; + + drop(response_millis_histogram); + user_aggregate .sum_response_millis .fetch_add(response_millis, Ordering::Release); - - user_aggregate - .last_timestamp - .fetch_max(x.metadata.timestamp, Ordering::Release); } }
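
Note on the approach, for review (a sketch, not part of the patch): the new stats path rests on
two small pieces. Timestamps are truncated into fixed periods with integer math
("timestamp / period * period"), and each period's samples go into an hdrhistogram that
save_stats_loop later flattens into the min/mean/p50/p90/p99/max columns the migration adds.
A minimal, self-contained example of both, assuming only the hdrhistogram 7.5 crate pinned
above (the sample values are made up):

    use hdrhistogram::Histogram;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        // integer division truncates, so every timestamp inside the same
        // period maps to the same bucket key
        let period_seconds = 60_u64;
        let timestamp = 1_665_464_025_u64;
        let period_timestamp = timestamp / period_seconds * period_seconds;
        assert_eq!(period_timestamp, 1_665_463_980);

        // 5 significant figures and auto-resize, mirroring Histogram::new(5) above
        let mut response_millis: Histogram<u64> = Histogram::new(5)?;
        for millis in [3, 7, 12, 12, 15, 40, 40, 41, 98, 2_100] {
            response_millis.record(millis)?;
        }

        // the same summary statistics that save_stats_loop writes to rpc_accounting
        println!("min  = {}", response_millis.min());
        println!("mean = {:.1}", response_millis.mean());
        println!("p50  = {}", response_millis.value_at_quantile(0.50));
        println!("p90  = {}", response_millis.value_at_quantile(0.90));
        println!("p99  = {}", response_millis.value_at_quantile(0.99));
        println!("max  = {}", response_millis.max());

        Ok(())
    }

One detail this makes visible: the percentiles are computed per (user_key_id, method,
error_response, period) bucket and then discarded, so percentiles of different rows cannot be
re-aggregated later; only the sum and count columns compose across rows.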