From 8f3d31869f4af40230143ff0fa24bc05091c6bf1 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Tue, 11 Oct 2022 17:34:25 +0000 Subject: [PATCH] less locks and fix some stats --- migration/src/m20221007_213828_accounting.rs | 6 +- web3_proxy/src/frontend/authorization.rs | 13 +- web3_proxy/src/stats.rs | 130 +++++++++---------- 3 files changed, 74 insertions(+), 75 deletions(-) diff --git a/migration/src/m20221007_213828_accounting.rs b/migration/src/m20221007_213828_accounting.rs index df90811f..379e78e8 100644 --- a/migration/src/m20221007_213828_accounting.rs +++ b/migration/src/m20221007_213828_accounting.rs @@ -76,7 +76,7 @@ impl MigrationTrait for Migration { ) .col( ColumnDef::new(RpcAccounting::MeanRequestBytes) - .float_len(64) + .double() .not_null(), ) .col( @@ -111,7 +111,7 @@ impl MigrationTrait for Migration { ) .col( ColumnDef::new(RpcAccounting::MeanResponseMillis) - .float_len(64) + .double() .not_null(), ) .col( @@ -146,7 +146,7 @@ impl MigrationTrait for Migration { ) .col( ColumnDef::new(RpcAccounting::MeanResponseBytes) - .float_len(64) + .double() .not_null(), ) .col( diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index aab929d3..3b43cb92 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -13,7 +13,6 @@ use sea_orm::{prelude::Decimal, ColumnTrait, DatabaseConnection, EntityTrait, Qu use serde::Serialize; use std::fmt::Display; use std::mem::size_of_val; -use std::sync::atomic::{AtomicBool, AtomicU16, AtomicU64}; use std::{net::IpAddr, str::FromStr, sync::Arc}; use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use tokio::time::Instant; @@ -55,11 +54,11 @@ pub struct AuthorizedKey { #[derive(Debug, Default, Serialize)] pub struct RequestMetadata { pub timestamp: u64, - pub request_bytes: AtomicU64, - pub backend_requests: AtomicU16, - pub error_response: AtomicBool, - pub response_bytes: AtomicU64, - pub response_millis: AtomicU64, + pub 
request_bytes: u64, + pub backend_requests: u32, + pub error_response: bool, + pub response_bytes: u64, + pub response_millis: u64, } #[derive(Clone, Debug, Serialize)] @@ -77,7 +76,7 @@ impl RequestMetadata { let request_bytes = size_of_val(request) as u64; Self { - request_bytes: request_bytes.into(), + request_bytes, timestamp: Utc::now().timestamp() as u64, ..Default::default() } diff --git a/web3_proxy/src/stats.rs b/web3_proxy/src/stats.rs index 1ad70c68..0255391b 100644 --- a/web3_proxy/src/stats.rs +++ b/web3_proxy/src/stats.rs @@ -19,11 +19,32 @@ use tracing::{error, info, trace}; pub struct ProxyResponseStat { user_key_id: u64, method: String, - metadata: RequestMetadata, + metadata: AsyncMutex<RequestMetadata>, } pub type TimeBucketTimestamp = u64; +pub struct ProxyResponseHistograms { + request_bytes: Histogram<u64>, + response_bytes: Histogram<u64>, + response_millis: Histogram<u64>, +} + +impl Default for ProxyResponseHistograms { + fn default() -> Self { + // TODO: how many significant figures? + let request_bytes = Histogram::new(5).expect("creating request_bytes histogram"); + let response_bytes = Histogram::new(5).expect("creating response_bytes histogram"); + let response_millis = Histogram::new(5).expect("creating response_millis histogram"); + + Self { + request_bytes, + response_bytes, + response_millis, + } + } +} + // TODO: impl From for our database model pub struct ProxyResponseAggregate { // these are the key @@ -37,12 +58,10 @@ pub struct ProxyResponseAggregate { backend_retries: AtomicU32, cache_misses: AtomicU32, cache_hits: AtomicU32, - request_bytes: AsyncMutex<Histogram<u64>>, sum_request_bytes: AtomicU64, - response_bytes: AsyncMutex<Histogram<u64>>, sum_response_bytes: AtomicU64, - response_millis: AsyncMutex<Histogram<u64>>, sum_response_millis: AtomicU64, + histograms: AsyncMutex<ProxyResponseHistograms>, } #[derive(Clone, Debug, From, Hash, PartialEq, Eq, PartialOrd, Ord)] @@ -80,6 +99,8 @@ pub enum Web3ProxyStat { impl ProxyResponseStat { // TODO: should RequestMetadata be in an arc? or can we handle refs here?
pub fn new(method: String, authorized_key: AuthorizedKey, metadata: RequestMetadata) -> Self { + let metadata = AsyncMutex::new(metadata); + Self { user_key_id: authorized_key.user_key_id, method, @@ -175,10 +196,14 @@ impl StatEmitter { let backend_retries = v.backend_retries.load(Ordering::Acquire); let cache_misses = v.cache_misses.load(Ordering::Acquire); let cache_hits = v.cache_hits.load(Ordering::Acquire); - - let request_bytes = v.request_bytes.lock().await; - let sum_request_bytes = v.sum_request_bytes.load(Ordering::Acquire); + let sum_response_millis = v.sum_response_millis.load(Ordering::Acquire); + let sum_response_bytes = v.sum_response_bytes.load(Ordering::Acquire); + + let histograms = v.histograms.lock().await; + + let request_bytes = &histograms.request_bytes; + let min_request_bytes = request_bytes.min(); let mean_request_bytes = request_bytes.mean(); let p50_request_bytes = request_bytes.value_at_quantile(0.50); @@ -186,11 +211,8 @@ impl StatEmitter { let p99_request_bytes = request_bytes.value_at_quantile(0.99); let max_request_bytes = request_bytes.max(); - drop(request_bytes); + let response_millis = &histograms.response_millis; - let response_millis = v.response_millis.lock().await; - - let sum_response_millis = v.sum_response_millis.load(Ordering::Acquire); let min_response_millis = response_millis.min(); let mean_response_millis = response_millis.mean(); let p50_response_millis = response_millis.value_at_quantile(0.50); @@ -198,11 +220,8 @@ impl StatEmitter { let p99_response_millis = response_millis.value_at_quantile(0.99); let max_response_millis = response_millis.max(); - drop(response_millis); + let response_bytes = &histograms.response_bytes; - let response_bytes = v.response_bytes.lock().await; - - let sum_response_bytes = v.sum_response_bytes.load(Ordering::Acquire); let min_response_bytes = response_bytes.min(); let mean_response_bytes = response_bytes.mean(); let p50_response_bytes = response_bytes.value_at_quantile(0.50); @@ 
-210,9 +229,11 @@ impl StatEmitter { let p99_response_bytes = response_bytes.value_at_quantile(0.99); let max_response_bytes = response_bytes.max(); - drop(response_bytes); + drop(histograms); let stat = rpc_accounting::ActiveModel { + id: sea_orm::NotSet, + user_key_id: sea_orm::Set(k.user_key_id), chain_id: sea_orm::Set(self.chain_id), method: sea_orm::Set(k.method.clone()), @@ -247,7 +268,6 @@ impl StatEmitter { p90_response_bytes: sea_orm::Set(p90_response_bytes), p99_response_bytes: sea_orm::Set(p99_response_bytes), max_response_bytes: sea_orm::Set(max_response_bytes), - ..Default::default() }; // TODO: if this fails, rever adding the user, too @@ -271,9 +291,11 @@ impl StatEmitter { match stat { Web3ProxyStat::ProxyResponse(x) => { // TODO: move this whole closure to another function? + let metadata = x.metadata.lock().await; + // TODO: move period calculation into another function? let period_timestamp = - x.metadata.timestamp / self.period_seconds * self.period_seconds; + metadata.timestamp / self.period_seconds * self.period_seconds; // get the user cache for the current period let user_cache = self @@ -284,12 +306,12 @@ impl StatEmitter { }) .await; - let error_response = x.metadata.error_response.load(Ordering::Acquire); - - let key = (x.user_key_id, x.method, error_response).into(); + let key = (x.user_key_id, x.method, metadata.error_response).into(); let user_aggregate = user_cache .get_with(key, async move { + let histograms = ProxyResponseHistograms::default(); + let aggregate = ProxyResponseAggregate { period_timestamp, // start most things at 0 because we add outside this getter @@ -298,72 +320,50 @@ impl StatEmitter { backend_retries: 0.into(), cache_misses: 0.into(), cache_hits: 0.into(), - // TODO: how many significant figures? 
- request_bytes: AsyncMutex::new( - Histogram::new(5).expect("creating request_bytes histogram"), - ), sum_request_bytes: 0.into(), - response_bytes: AsyncMutex::new( - Histogram::new(5).expect("creating response_bytes histogram"), - ), sum_response_bytes: 0.into(), - // TODO: new_with_max here? - response_millis: AsyncMutex::new( - Histogram::new(5).expect("creating response_millis histogram"), - ), sum_response_millis: 0.into(), + histograms: AsyncMutex::new(histograms), }; Arc::new(aggregate) }) .await; - user_aggregate - .backend_requests - .fetch_add(1, Ordering::Acquire); - + // a stat always comes from just 1 frontend request user_aggregate .frontend_requests .fetch_add(1, Ordering::Acquire); - let request_bytes = x.metadata.request_bytes.load(Ordering::Acquire); - - let mut request_bytes_histogram = user_aggregate.request_bytes.lock().await; - - // TODO: record_correct? - request_bytes_histogram.record(request_bytes)?; - - drop(request_bytes_histogram); + // a stat might have multiple backend requests + user_aggregate + .backend_requests + .fetch_add(metadata.backend_requests, Ordering::Acquire); user_aggregate .sum_request_bytes - .fetch_add(request_bytes, Ordering::Release); - - let response_bytes = x.metadata.response_bytes.load(Ordering::Acquire); - - let mut response_bytes_histogram = user_aggregate.response_bytes.lock().await; - - // TODO: record_correct? - response_bytes_histogram.record(response_bytes)?; - - drop(response_bytes_histogram); + .fetch_add(metadata.request_bytes, Ordering::Release); user_aggregate .sum_response_bytes - .fetch_add(response_bytes, Ordering::Release); - - let response_millis = x.metadata.response_millis.load(Ordering::Acquire); - - let mut response_millis_histogram = user_aggregate.response_millis.lock().await; - - // TODO: record_correct?
- response_millis_histogram.record(response_millis)?; - - drop(response_millis_histogram); + .fetch_add(metadata.response_bytes, Ordering::Release); user_aggregate .sum_response_millis - .fetch_add(response_millis, Ordering::Release); + .fetch_add(metadata.response_millis, Ordering::Release); + + { + let mut histograms = user_aggregate.histograms.lock().await; + + // TODO: record_correct? + histograms.request_bytes.record(metadata.request_bytes)?; + + histograms.response_bytes.record(metadata.response_bytes)?; + + histograms + .response_millis + .record(metadata.response_millis)?; + } } }