From 90ffb5254a8fbe90192cfb5e1733c588246eca34 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 19 Jul 2023 18:41:40 -0700 Subject: [PATCH] flush all stats --- web3_proxy/Cargo.toml | 2 +- web3_proxy/src/stats/stat_buffer.rs | 200 ++++++++++++++---------- web3_proxy/tests/test_multiple_proxy.rs | 37 +++-- 3 files changed, 143 insertions(+), 96 deletions(-) diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index 1cfacb43..e781c2aa 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -92,7 +92,7 @@ tokio = { version = "1.29.1", features = ["full", "tracing"] } tokio-console = { version = "0.1.9", optional = true } tokio-stream = { version = "0.1.14", features = ["sync"] } toml = "0.7.6" -tower = { version = "0.4.13", features = ["tracing"] } +tower = { version = "0.4.13", features = ["timeout", "tracing"] } tower-http = { version = "0.4.1", features = ["cors", "sensitive-headers", "trace"] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/web3_proxy/src/stats/stat_buffer.rs b/web3_proxy/src/stats/stat_buffer.rs index 5af9473b..d106e196 100644 --- a/web3_proxy/src/stats/stat_buffer.rs +++ b/web3_proxy/src/stats/stat_buffer.rs @@ -2,6 +2,7 @@ use super::{AppStat, FlushedStats, RpcQueryKey}; use crate::app::Web3ProxyJoinHandle; use crate::caches::{RpcSecretKeyCache, UserBalanceCache}; use crate::errors::Web3ProxyResult; +use crate::frontend::authorization::RequestMetadata; use crate::globals::global_db_conn; use crate::stats::RpcQueryStats; use derive_more::From; @@ -124,85 +125,10 @@ impl StatBuffer { loop { tokio::select! { stat = stat_receiver.recv() => { - // trace!("Received stat"); - // save the stat to a buffer - - // TODO: tokio spawn this! 
- match stat { - Some(AppStat::RpcQuery(request_metadata)) => { - // we convert on this side of the channel so that we don't slow down the request - let stat = RpcQueryStats::try_from_metadata(request_metadata)?; - - // update the latest balance - // do this BEFORE emitting any stats - let mut approximate_balance_remaining = 0.into(); - - // TODO: re-enable this once I know its not the cause of Polygon W3P crashing all the time - // TODO: we want to do this even if the db is down. we need to buffer if there is an outage! - if let Ok(db_conn) = global_db_conn().await { - let user_id = stat.authorization.checks.user_id; - - // update the user's balance - if user_id != 0 { - // update the user's cached balance - let mut user_balance = stat.authorization.checks.latest_balance.write().await; - - // TODO: move this to a helper function - user_balance.total_frontend_requests += 1; - user_balance.total_spent += stat.compute_unit_cost; - - if !stat.backend_rpcs_used.is_empty() { - user_balance.total_cache_misses += 1; - } - - // if paid_credits_used is true, then they were premium at the start of the request - if stat.authorization.checks.paid_credits_used { - // TODO: this lets them get a negative remaining balance. we should clear if close to 0 - user_balance.total_spent_paid_credits += stat.compute_unit_cost; - - // check if they still have premium - if user_balance.active_premium() { - // TODO: referall credits here? i think in the save_db section still makes sense for those - } else if let Err(err) = self.user_balance_cache.invalidate(&user_balance.user_id, &db_conn, &self.rpc_secret_key_cache).await { - // was premium, but isn't anymore due to paying for this query. clear the cache - // TODO: stop at <$0.000001 instead of negative? - warn!(?err, "unable to clear caches"); - } - } else if user_balance.active_premium() { - // paid credits were not used, but now we have active premium. invalidate the caches - // TODO: this seems unliekly. 
should we warn if this happens so we can investigate? - if let Err(err) = self.user_balance_cache.invalidate(&user_balance.user_id, &db_conn, &self.rpc_secret_key_cache).await { - // was premium, but isn't anymore due to paying for this query. clear the cache - // TODO: stop at <$0.000001 instead of negative? - warn!(?err, "unable to clear caches"); - } - } - - approximate_balance_remaining = user_balance.remaining(); - } - - let accounting_key = stat.accounting_key(self.billing_period_seconds); - if accounting_key.is_registered() { - self.accounting_db_buffer.entry(accounting_key).or_default().add(stat.clone(), approximate_balance_remaining).await; - } - } - - if self.influxdb_client.is_some() { - // TODO: round the timestamp at all? - - if let Some(opt_in_timeseries_key) = stat.owned_timeseries_key() { - self.opt_in_timeseries_buffer.entry(opt_in_timeseries_key).or_default().add(stat.clone(), approximate_balance_remaining).await; - } - - let global_timeseries_key = stat.global_timeseries_key(); - - self.global_timeseries_buffer.entry(global_timeseries_key).or_default().add(stat, approximate_balance_remaining).await; - } - } - None => { - info!("error receiving stat"); - break; - } + if let Some(stat) = stat { + self._buffer_app_stat(stat).await? 
+ } else { + break; } } _ = db_save_interval.tick() => { @@ -225,14 +151,18 @@ impl StatBuffer { Some(x) => { trace!("flush"); - let tsdb_count = self.save_tsdb_stats().await; + // fill the buffer + while let Ok(stat) = stat_receiver.try_recv() { + self._buffer_app_stat(stat).await?; + } + // flush the buffers + let tsdb_count = self.save_tsdb_stats().await; let relational_count = self.save_relational_stats().await; + // notify let flushed_stats = FlushedStats{ timeseries: tsdb_count, relational: relational_count}; - trace!(?flushed_stats); - if let Err(err) = x.send(flushed_stats) { error!(?flushed_stats, ?err, "unable to notify about flushed stats"); } @@ -284,6 +214,112 @@ impl StatBuffer { Ok(()) } + async fn _buffer_app_stat(&mut self, stat: AppStat) -> Web3ProxyResult<()> { + match stat { + AppStat::RpcQuery(request_metadata) => { + self._buffer_request_metadata(request_metadata).await?; + } + } + + Ok(()) + } + + async fn _buffer_request_metadata( + &mut self, + request_metadata: RequestMetadata, + ) -> Web3ProxyResult<()> { + // we convert on this side of the channel so that we don't slow down the request + let stat = RpcQueryStats::try_from_metadata(request_metadata)?; + + // update the latest balance + // do this BEFORE emitting any stats + let mut approximate_balance_remaining = 0.into(); + + // TODO: re-enable this once I know its not the cause of Polygon W3P crashing all the time + // TODO: we want to do this even if the db is down. we need to buffer if there is an outage! 
+ if let Ok(db_conn) = global_db_conn().await { + let user_id = stat.authorization.checks.user_id; + + // update the user's balance + if user_id != 0 { + // update the user's cached balance + let mut user_balance = stat.authorization.checks.latest_balance.write().await; + + // TODO: move this to a helper function + user_balance.total_frontend_requests += 1; + user_balance.total_spent += stat.compute_unit_cost; + + if !stat.backend_rpcs_used.is_empty() { + user_balance.total_cache_misses += 1; + } + + // if paid_credits_used is true, then they were premium at the start of the request + if stat.authorization.checks.paid_credits_used { + // TODO: this lets them get a negative remaining balance. we should clear if close to 0 + user_balance.total_spent_paid_credits += stat.compute_unit_cost; + + // check if they still have premium + if user_balance.active_premium() { + // TODO: referall credits here? i think in the save_db section still makes sense for those + } else if let Err(err) = self + .user_balance_cache + .invalidate(&user_balance.user_id, &db_conn, &self.rpc_secret_key_cache) + .await + { + // was premium, but isn't anymore due to paying for this query. clear the cache + // TODO: stop at <$0.000001 instead of negative? + warn!(?err, "unable to clear caches"); + } + } else if user_balance.active_premium() { + // paid credits were not used, but now we have active premium. invalidate the caches + // TODO: this seems unliekly. should we warn if this happens so we can investigate? + if let Err(err) = self + .user_balance_cache + .invalidate(&user_balance.user_id, &db_conn, &self.rpc_secret_key_cache) + .await + { + // was premium, but isn't anymore due to paying for this query. clear the cache + // TODO: stop at <$0.000001 instead of negative? 
+ warn!(?err, "unable to clear caches"); + } + } + + approximate_balance_remaining = user_balance.remaining(); + } + + let accounting_key = stat.accounting_key(self.billing_period_seconds); + if accounting_key.is_registered() { + self.accounting_db_buffer + .entry(accounting_key) + .or_default() + .add(stat.clone(), approximate_balance_remaining) + .await; + } + } + + if self.influxdb_client.is_some() { + // TODO: round the timestamp at all? + + if let Some(opt_in_timeseries_key) = stat.owned_timeseries_key() { + self.opt_in_timeseries_buffer + .entry(opt_in_timeseries_key) + .or_default() + .add(stat.clone(), approximate_balance_remaining) + .await; + } + + let global_timeseries_key = stat.global_timeseries_key(); + + self.global_timeseries_buffer + .entry(global_timeseries_key) + .or_default() + .add(stat, approximate_balance_remaining) + .await; + } + + Ok(()) + } + async fn save_relational_stats(&mut self) -> usize { let mut count = 0; diff --git a/web3_proxy/tests/test_multiple_proxy.rs b/web3_proxy/tests/test_multiple_proxy.rs index d8106ce9..a628770a 100644 --- a/web3_proxy/tests/test_multiple_proxy.rs +++ b/web3_proxy/tests/test_multiple_proxy.rs @@ -118,17 +118,28 @@ async fn test_multiple_proxies_stats_add_up() { // Flush all stats here // TODO: the test should maybe pause time so that stats definitely flush from our queries. 
- let flush_0_count = x_0.flush_stats().await.unwrap(); - let flush_1_count = x_1.flush_stats().await.unwrap(); + let flush_0_count_0 = x_0.flush_stats().await.unwrap(); + let flush_1_count_0 = x_1.flush_stats().await.unwrap(); // Wait a bit + // TODO: instead of waiting a bit, make flush_stats wait until all stats are handled before returning sleep(Duration::from_secs(5)).await; - info!("Counts 0 are: {:?}", flush_0_count); - assert_eq!(flush_0_count.relational, 1); - assert_eq!(flush_0_count.timeseries, 2); - info!("Counts 1 are: {:?}", flush_1_count); - assert_eq!(flush_1_count.relational, 1); - assert_eq!(flush_1_count.timeseries, 2); + info!("Counts 0 are: {:?}", flush_0_count_0); + assert_eq!(flush_0_count_0.relational, 1); + assert_eq!(flush_0_count_0.timeseries, 2); + info!("Counts 1 are: {:?}", flush_1_count_0); + assert_eq!(flush_1_count_0.relational, 1); + assert_eq!(flush_1_count_0.timeseries, 2); + + // no more stats should arrive + let flush_0_count_1 = x_0.flush_stats().await.unwrap(); + let flush_1_count_1 = x_1.flush_stats().await.unwrap(); + info!("Counts 0 are: {:?}", flush_0_count_1); + assert_eq!(flush_0_count_1.relational, 0); + assert_eq!(flush_0_count_1.timeseries, 0); + info!("Counts 1 are: {:?}", flush_1_count_1); + assert_eq!(flush_1_count_1.relational, 0); + assert_eq!(flush_1_count_1.timeseries, 0); // get stats now // todo!("Need to validate all the stat accounting now"); @@ -145,18 +156,18 @@ async fn test_multiple_proxies_stats_add_up() { // Get the balance let user_0_balance_post = user_get_balance(&x_0, &r, &user_0_login).await; + let influx_stats = influx_aggregate_stats["result"].get(0).unwrap(); let mysql_stats = mysql_stats["stats"].get(0).unwrap(); - assert_eq!( - user_0_balance_post.total_frontend_requests, - number_requests * 3 - ); - info!("Influx and mysql stats are"); info!(?influx_stats); info!(?mysql_stats); + assert_eq!( + user_0_balance_post.total_frontend_requests, + number_requests * 3 + ); assert_eq!( 
mysql_stats["error_response"], influx_stats["error_response"]