stat buffer count total requests and print on exit

This commit is contained in:
Bryan Stitt 2023-07-21 15:25:02 -07:00
parent f7cfd39d93
commit f15f1027cf
4 changed files with 43 additions and 27 deletions

@ -125,11 +125,15 @@ impl StatBuffer {
let mut db_save_interval = let mut db_save_interval =
interval(Duration::from_secs(self.db_save_interval_seconds as u64)); interval(Duration::from_secs(self.db_save_interval_seconds as u64));
let mut total_frontend_requests = 0;
let mut tsdb_frontend_requests = 0;
let mut db_frontend_requests = 0;
loop { loop {
tokio::select! { tokio::select! {
stat = stat_receiver.recv() => { stat = stat_receiver.recv() => {
if let Some(stat) = stat { if let Some(stat) = stat {
self._buffer_app_stat(stat).await? total_frontend_requests += self._buffer_app_stat(stat).await?
} else { } else {
break; break;
} }
@ -137,16 +141,18 @@ impl StatBuffer {
_ = db_save_interval.tick() => { _ = db_save_interval.tick() => {
// TODO: tokio spawn this! (but with a semaphore on db_save_interval) // TODO: tokio spawn this! (but with a semaphore on db_save_interval)
trace!("DB save internal tick"); trace!("DB save internal tick");
let count = self.save_relational_stats().await; let (count, new_frontend_requests) = self.save_relational_stats().await;
if count > 0 { if count > 0 {
trace!("Saved {} stats to the relational db", count); db_frontend_requests += new_frontend_requests;
trace!("Saved {} stats for {} requests to the relational db", count, new_frontend_requests);
} }
} }
_ = tsdb_save_interval.tick() => { _ = tsdb_save_interval.tick() => {
trace!("TSDB save internal tick"); trace!("TSDB save internal tick");
let count = self.save_tsdb_stats().await; let (count, new_frontend_requests) = self.save_tsdb_stats().await;
if count > 0 { if count > 0 {
trace!("Saved {} stats to the tsdb", count); tsdb_frontend_requests += new_frontend_requests;
trace!("Saved {} stats for {} requests to the tsdb", count, new_frontend_requests);
} }
} }
x = flush_receiver.recv() => { x = flush_receiver.recv() => {
@ -194,25 +200,24 @@ impl StatBuffer {
self._flush(&mut stat_receiver).await?; self._flush(&mut stat_receiver).await?;
info!("accounting and stat save loop complete"); // TODO: if these totals don't match, something is wrong!
info!(%total_frontend_requests, %tsdb_frontend_requests, %db_frontend_requests, "accounting and stat save loop complete");
Ok(()) Ok(())
} }
async fn _buffer_app_stat(&mut self, stat: AppStat) -> Web3ProxyResult<()> { async fn _buffer_app_stat(&mut self, stat: AppStat) -> Web3ProxyResult<u64> {
match stat { match stat {
AppStat::RpcQuery(request_metadata) => { AppStat::RpcQuery(request_metadata) => {
self._buffer_request_metadata(request_metadata).await?; self._buffer_request_metadata(request_metadata).await
} }
} }
Ok(())
} }
async fn _buffer_request_metadata( async fn _buffer_request_metadata(
&mut self, &mut self,
request_metadata: RequestMetadata, request_metadata: RequestMetadata,
) -> Web3ProxyResult<()> { ) -> Web3ProxyResult<u64> {
// we convert on this side of the channel so that we don't slow down the request // we convert on this side of the channel so that we don't slow down the request
let stat = RpcQueryStats::try_from_metadata(request_metadata)?; let stat = RpcQueryStats::try_from_metadata(request_metadata)?;
@ -320,7 +325,7 @@ impl StatBuffer {
.await; .await;
} }
Ok(()) Ok(1)
} }
async fn _flush( async fn _flush(
@ -335,8 +340,9 @@ impl StatBuffer {
} }
// flush the buffers // flush the buffers
let tsdb_count = self.save_tsdb_stats().await; // TODO: include frontend counts here
let relational_count = self.save_relational_stats().await; let (tsdb_count, _) = self.save_tsdb_stats().await;
let (relational_count, _) = self.save_relational_stats().await;
// notify // notify
let flushed_stats = FlushedStats { let flushed_stats = FlushedStats {
@ -349,12 +355,15 @@ impl StatBuffer {
Ok(flushed_stats) Ok(flushed_stats)
} }
async fn save_relational_stats(&mut self) -> usize { async fn save_relational_stats(&mut self) -> (usize, u64) {
let mut count = 0; let mut count = 0;
let mut frontend_requests = 0;
if let Ok(db_conn) = global_db_conn().await { if let Ok(db_conn) = global_db_conn().await {
count = self.accounting_db_buffer.len(); count = self.accounting_db_buffer.len();
for (key, stat) in self.accounting_db_buffer.drain() { for (key, stat) in self.accounting_db_buffer.drain() {
let new_frontend_requests = stat.frontend_requests;
// TODO: batch saves // TODO: batch saves
// TODO: i don't like passing key (which came from the stat) to the function on the stat. but it works for now // TODO: i don't like passing key (which came from the stat) to the function on the stat. but it works for now
if let Err(err) = stat if let Err(err) = stat
@ -368,17 +377,20 @@ impl StatBuffer {
.await .await
{ {
// TODO: save the stat and retry later! // TODO: save the stat and retry later!
error!(?err, "unable to save accounting entry!"); error!(?err, %count, %new_frontend_requests, "unable to save accounting entry!");
} else {
frontend_requests += new_frontend_requests;
}; };
} }
} }
count (count, frontend_requests)
} }
// TODO: bucket should be an enum so that we don't risk typos // TODO: bucket should be an enum so that we don't risk typos
async fn save_tsdb_stats(&mut self) -> usize { async fn save_tsdb_stats(&mut self) -> (usize, u64) {
let mut count = 0; let mut count = 0;
let mut frontend_requests = 0;
if let Some(influxdb_client) = self.influxdb_client.as_ref() { if let Some(influxdb_client) = self.influxdb_client.as_ref() {
let influxdb_bucket = self let influxdb_bucket = self
@ -391,12 +403,15 @@ impl StatBuffer {
for (key, stat) in self.global_timeseries_buffer.drain() { for (key, stat) in self.global_timeseries_buffer.drain() {
// TODO: i don't like passing key (which came from the stat) to the function on the stat. but it works for now // TODO: i don't like passing key (which came from the stat) to the function on the stat. but it works for now
let new_frontend_requests = stat.frontend_requests;
match stat match stat
.build_timeseries_point("global_proxy", self.chain_id, key, &self.instance_hash) .build_timeseries_point("global_proxy", self.chain_id, key, &self.instance_hash)
.await .await
{ {
Ok(point) => { Ok(point) => {
points.push(point); points.push(point);
frontend_requests += new_frontend_requests;
} }
Err(err) => { Err(err) => {
// TODO: what can cause this? // TODO: what can cause this?
@ -456,6 +471,6 @@ impl StatBuffer {
} }
} }
count (count, frontend_requests)
} }
} }

@ -164,7 +164,7 @@ impl TestMysql {
} }
pub async fn conn(&self) -> DatabaseConnection { pub async fn conn(&self) -> DatabaseConnection {
connect_db(self.url.clone().unwrap(), 1, 5).await.unwrap() connect_db(self.url.clone().unwrap(), 1, 99).await.unwrap()
} }
} }

@ -121,12 +121,12 @@ async fn test_multiple_proxies_stats_add_up() {
let flush_0_count_0 = x_0.flush_stats().await.unwrap(); let flush_0_count_0 = x_0.flush_stats().await.unwrap();
let flush_1_count_0 = x_1.flush_stats().await.unwrap(); let flush_1_count_0 = x_1.flush_stats().await.unwrap();
// Wait a bit
// TODO: instead of waiting a bit, make flush_stats wait until all stats are handled before returning
sleep(Duration::from_secs(5)).await;
info!("Counts 0 are: {:?}", flush_0_count_0); info!("Counts 0 are: {:?}", flush_0_count_0);
assert_eq!(flush_0_count_0.relational, 1); assert_eq!(flush_0_count_0.relational, 1);
assert_eq!(flush_0_count_0.timeseries, 2); assert_eq!(flush_0_count_0.timeseries, 2);
// Wait a bit. TODO: instead of waiting. make flush stats more robust
sleep(Duration::from_secs(5)).await;
info!("Counts 1 are: {:?}", flush_1_count_0); info!("Counts 1 are: {:?}", flush_1_count_0);
assert_eq!(flush_1_count_0.relational, 1); assert_eq!(flush_1_count_0.relational, 1);
assert_eq!(flush_1_count_0.timeseries, 2); assert_eq!(flush_1_count_0.timeseries, 2);

@ -100,14 +100,12 @@ async fn test_single_proxy_stats_add_up() {
// TODO: the test should maybe pause time so that stats definitely flush from our queries. // TODO: the test should maybe pause time so that stats definitely flush from our queries.
let flush_0_count_0 = x.flush_stats().await.unwrap(); let flush_0_count_0 = x.flush_stats().await.unwrap();
// Wait a bit
// TODO: instead of waiting a bit, make flush_stats wait until all stats are handled before returning
sleep(Duration::from_secs(5)).await;
warn!("Counts 0 are: {:?}", flush_0_count_0); warn!("Counts 0 are: {:?}", flush_0_count_0);
assert_eq!(flush_0_count_0.relational, 1); assert_eq!(flush_0_count_0.relational, 1);
assert_eq!(flush_0_count_0.timeseries, 2); assert_eq!(flush_0_count_0.timeseries, 2);
// no more stats should arrive // Wait a bit. TODO: instead of waiting. make flush stats more robust
sleep(Duration::from_secs(5)).await;
let flush_0_count_1 = x.flush_stats().await.unwrap(); let flush_0_count_1 = x.flush_stats().await.unwrap();
warn!("Counts 0 are: {:?}", flush_0_count_1); warn!("Counts 0 are: {:?}", flush_0_count_1);
assert_eq!(flush_0_count_1.relational, 0); assert_eq!(flush_0_count_1.relational, 0);
@ -233,6 +231,9 @@ async fn test_single_proxy_stats_add_up() {
// "user_get_influx_stats_detailed stats are: {:?}", // "user_get_influx_stats_detailed stats are: {:?}",
// user_get_influx_stats_detailed // user_get_influx_stats_detailed
// ); // );
// drop x before the other things so that we don't get spurious errors
drop(x);
} }
// Gotta compare stats with influx: // Gotta compare stats with influx: