From f15f1027cff6ebe7104c21710afe6add31757ee2 Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Fri, 21 Jul 2023 15:25:02 -0700
Subject: [PATCH] stat buffer: count total requests and print on exit

---
 web3_proxy/src/stats/stat_buffer.rs     | 53 ++++++++++++++++---------
 web3_proxy/tests/common/mysql.rs        |  2 +-
 web3_proxy/tests/test_multiple_proxy.rs |  6 +--
 web3_proxy/tests/test_single_proxy.rs   |  9 +++--
 4 files changed, 43 insertions(+), 27 deletions(-)

diff --git a/web3_proxy/src/stats/stat_buffer.rs b/web3_proxy/src/stats/stat_buffer.rs
index 7e384a06..805666cd 100644
--- a/web3_proxy/src/stats/stat_buffer.rs
+++ b/web3_proxy/src/stats/stat_buffer.rs
@@ -125,11 +125,15 @@ impl StatBuffer {
         let mut db_save_interval =
             interval(Duration::from_secs(self.db_save_interval_seconds as u64));
 
+        let mut total_frontend_requests = 0;
+        let mut tsdb_frontend_requests = 0;
+        let mut db_frontend_requests = 0;
+
         loop {
             tokio::select! {
                 stat = stat_receiver.recv() => {
                     if let Some(stat) = stat {
-                        self._buffer_app_stat(stat).await?
+                        total_frontend_requests += self._buffer_app_stat(stat).await?
                     } else {
                         break;
                     }
@@ -137,16 +141,18 @@ impl StatBuffer {
                 _ = db_save_interval.tick() => {
                     // TODO: tokio spawn this! (but with a semaphore on db_save_interval)
                     trace!("DB save internal tick");
-                    let count = self.save_relational_stats().await;
+                    let (count, new_frontend_requests) = self.save_relational_stats().await;
                     if count > 0 {
-                        trace!("Saved {} stats to the relational db", count);
+                        db_frontend_requests += new_frontend_requests;
+                        trace!("Saved {} stats for {} requests to the relational db", count, new_frontend_requests);
                     }
                 }
                 _ = tsdb_save_interval.tick() => {
                     trace!("TSDB save internal tick");
-                    let count = self.save_tsdb_stats().await;
+                    let (count, new_frontend_requests) = self.save_tsdb_stats().await;
                     if count > 0 {
-                        trace!("Saved {} stats to the tsdb", count);
+                        tsdb_frontend_requests += new_frontend_requests;
+                        trace!("Saved {} stats for {} requests to the tsdb", count, new_frontend_requests);
                     }
                 }
                 x = flush_receiver.recv() => {
@@ -194,25 +200,24 @@ impl StatBuffer {
 
         self._flush(&mut stat_receiver).await?;
 
-        info!("accounting and stat save loop complete");
+        // TODO: if these totals don't match, something is wrong!
+        info!(%total_frontend_requests, %tsdb_frontend_requests, %db_frontend_requests, "accounting and stat save loop complete");
 
         Ok(())
     }
 
-    async fn _buffer_app_stat(&mut self, stat: AppStat) -> Web3ProxyResult<()> {
+    async fn _buffer_app_stat(&mut self, stat: AppStat) -> Web3ProxyResult<u64> {
         match stat {
             AppStat::RpcQuery(request_metadata) => {
-                self._buffer_request_metadata(request_metadata).await?;
+                self._buffer_request_metadata(request_metadata).await
             }
         }
-
-        Ok(())
     }
 
     async fn _buffer_request_metadata(
         &mut self,
         request_metadata: RequestMetadata,
-    ) -> Web3ProxyResult<()> {
+    ) -> Web3ProxyResult<u64> {
         // we convert on this side of the channel so that we don't slow down the request
         let stat = RpcQueryStats::try_from_metadata(request_metadata)?;
 
@@ -320,7 +325,7 @@ impl StatBuffer {
                 .await;
         }
 
-        Ok(())
+        Ok(1)
     }
 
     async fn _flush(
@@ -335,8 +340,9 @@ impl StatBuffer {
         }
 
         // flush the buffers
-        let tsdb_count = self.save_tsdb_stats().await;
-        let relational_count = self.save_relational_stats().await;
+        // TODO: include frontend counts here
+        let (tsdb_count, _) = self.save_tsdb_stats().await;
+        let (relational_count, _) = self.save_relational_stats().await;
 
         // notify
         let flushed_stats = FlushedStats {
@@ -349,12 +355,15 @@ impl StatBuffer {
         Ok(flushed_stats)
     }
 
-    async fn save_relational_stats(&mut self) -> usize {
+    async fn save_relational_stats(&mut self) -> (usize, u64) {
         let mut count = 0;
+        let mut frontend_requests = 0;
 
         if let Ok(db_conn) = global_db_conn().await {
             count = self.accounting_db_buffer.len();
             for (key, stat) in self.accounting_db_buffer.drain() {
+                let new_frontend_requests = stat.frontend_requests;
+
                 // TODO: batch saves
                 // TODO: i don't like passing key (which came from the stat) to the function on the stat. but it works for now
                 if let Err(err) = stat
@@ -368,17 +377,20 @@ impl StatBuffer {
                     .await
                 {
                     // TODO: save the stat and retry later!
-                    error!(?err, "unable to save accounting entry!");
+                    error!(?err, %count, %new_frontend_requests, "unable to save accounting entry!");
+                } else {
+                    frontend_requests += new_frontend_requests;
                 };
             }
         }
 
-        count
+        (count, frontend_requests)
     }
 
     // TODO: bucket should be an enum so that we don't risk typos
-    async fn save_tsdb_stats(&mut self) -> usize {
+    async fn save_tsdb_stats(&mut self) -> (usize, u64) {
         let mut count = 0;
+        let mut frontend_requests = 0;
 
         if let Some(influxdb_client) = self.influxdb_client.as_ref() {
             let influxdb_bucket = self
@@ -391,12 +403,15 @@ impl StatBuffer {
 
             for (key, stat) in self.global_timeseries_buffer.drain() {
                 // TODO: i don't like passing key (which came from the stat) to the function on the stat. but it works for now
+                let new_frontend_requests = stat.frontend_requests;
+
                 match stat
                     .build_timeseries_point("global_proxy", self.chain_id, key, &self.instance_hash)
                     .await
                 {
                     Ok(point) => {
                         points.push(point);
+                        frontend_requests += new_frontend_requests;
                     }
                     Err(err) => {
                         // TODO: what can cause this?
@@ -456,6 +471,6 @@ impl StatBuffer {
             }
         }
 
-        count
+        (count, frontend_requests)
     }
 }
diff --git a/web3_proxy/tests/common/mysql.rs b/web3_proxy/tests/common/mysql.rs
index ed3bc84e..fa7c744b 100644
--- a/web3_proxy/tests/common/mysql.rs
+++ b/web3_proxy/tests/common/mysql.rs
@@ -164,7 +164,7 @@ impl TestMysql {
     }
 
     pub async fn conn(&self) -> DatabaseConnection {
-        connect_db(self.url.clone().unwrap(), 1, 5).await.unwrap()
+        connect_db(self.url.clone().unwrap(), 1, 99).await.unwrap()
     }
 }
 
diff --git a/web3_proxy/tests/test_multiple_proxy.rs b/web3_proxy/tests/test_multiple_proxy.rs
index 1033fbee..93175bf4 100644
--- a/web3_proxy/tests/test_multiple_proxy.rs
+++ b/web3_proxy/tests/test_multiple_proxy.rs
@@ -121,12 +121,12 @@ async fn test_multiple_proxies_stats_add_up() {
     let flush_0_count_0 = x_0.flush_stats().await.unwrap();
     let flush_1_count_0 = x_1.flush_stats().await.unwrap();
 
-    // Wait a bit
-    // TODO: instead of waiting a bit, make flush_stats wait until all stats are handled before returning
-    sleep(Duration::from_secs(5)).await;
     info!("Counts 0 are: {:?}", flush_0_count_0);
     assert_eq!(flush_0_count_0.relational, 1);
     assert_eq!(flush_0_count_0.timeseries, 2);
+
+    // Wait a bit. TODO: instead of waiting, make flush_stats more robust
+    sleep(Duration::from_secs(5)).await;
     info!("Counts 1 are: {:?}", flush_1_count_0);
     assert_eq!(flush_1_count_0.relational, 1);
     assert_eq!(flush_1_count_0.timeseries, 2);
diff --git a/web3_proxy/tests/test_single_proxy.rs b/web3_proxy/tests/test_single_proxy.rs
index 33ed4e3b..dcabec11 100644
--- a/web3_proxy/tests/test_single_proxy.rs
+++ b/web3_proxy/tests/test_single_proxy.rs
@@ -100,14 +100,12 @@ async fn test_single_proxy_stats_add_up() {
     // TODO: the test should maybe pause time so that stats definitely flush from our queries.
     let flush_0_count_0 = x.flush_stats().await.unwrap();
 
-    // Wait a bit
-    // TODO: instead of waiting a bit, make flush_stats wait until all stats are handled before returning
-    sleep(Duration::from_secs(5)).await;
     warn!("Counts 0 are: {:?}", flush_0_count_0);
     assert_eq!(flush_0_count_0.relational, 1);
     assert_eq!(flush_0_count_0.timeseries, 2);
 
-    // no more stats should arrive
+    // Wait a bit. TODO: instead of waiting, make flush_stats more robust
+    sleep(Duration::from_secs(5)).await;
     let flush_0_count_1 = x.flush_stats().await.unwrap();
     warn!("Counts 0 are: {:?}", flush_0_count_1);
     assert_eq!(flush_0_count_1.relational, 0);
@@ -233,6 +231,9 @@ async fn test_single_proxy_stats_add_up() {
     //     "user_get_influx_stats_detailed stats are: {:?}",
     //     user_get_influx_stats_detailed
     // );
+
+    // drop x before the other things so that we don't get spurious errors
+    drop(x);
 }
 
 // Gotta compare stats with influx:
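
Reviewer note (not part of the patch, ignored by git am): the change threads a
frontend-request count out of every save call and sums it in the select loop,
so the exit log can compare what was received against what was persisted.
Below is a minimal, self-contained sketch of that pattern. It assumes tokio
with the "macros", "rt-multi-thread", "sync", and "time" features; fake_save
is a hypothetical stand-in for save_relational_stats()/save_tsdb_stats().

use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::interval;

// Stand-in for the save functions: drains the buffer and returns
// (stats saved, frontend requests represented by those stats).
async fn fake_save(buffered: &mut Vec<u64>) -> (usize, u64) {
    let count = buffered.len();
    let frontend_requests = buffered.drain(..).sum();
    (count, frontend_requests)
}

#[tokio::main]
async fn main() {
    let (stat_tx, mut stat_rx) = mpsc::channel::<u64>(8);
    let mut save_interval = interval(Duration::from_millis(10));

    let mut buffered = Vec::new();
    let mut total_frontend_requests: u64 = 0;
    let mut saved_frontend_requests: u64 = 0;

    tokio::spawn(async move {
        for _ in 0..5 {
            stat_tx.send(1).await.unwrap();
        }
        // dropping stat_tx closes the channel, ending the loop below
    });

    loop {
        tokio::select! {
            stat = stat_rx.recv() => {
                if let Some(stat) = stat {
                    buffered.push(stat);
                    total_frontend_requests += stat;
                } else {
                    break;
                }
            }
            _ = save_interval.tick() => {
                let (count, new_frontend_requests) = fake_save(&mut buffered).await;
                if count > 0 {
                    saved_frontend_requests += new_frontend_requests;
                }
            }
        }
    }

    // final flush, then compare totals the way the patch's exit log does
    let (_, new_frontend_requests) = fake_save(&mut buffered).await;
    saved_frontend_requests += new_frontend_requests;
    assert_eq!(total_frontend_requests, saved_frontend_requests);
    println!("total={total_frontend_requests} saved={saved_frontend_requests}");
}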