From 94f205900ae6b9af7adb781c257babfa46f2c2f8 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Thu, 20 Oct 2022 23:50:06 +0000 Subject: [PATCH] missing loop --- web3_proxy/src/app_stats.rs | 46 +++++++++++++++++++------------------ 1 file changed, 24 insertions(+), 22 deletions(-) diff --git a/web3_proxy/src/app_stats.rs b/web3_proxy/src/app_stats.rs index e4570a2d..c1e700c5 100644 --- a/web3_proxy/src/app_stats.rs +++ b/web3_proxy/src/app_stats.rs @@ -121,12 +121,12 @@ impl ProxyResponseStat { let backend_requests = metadata.backend_requests.load(Ordering::Acquire); let period_seconds = metadata.period_seconds; let period_timestamp = - (metadata.datetime.timestamp() as u64) / period_seconds * period_seconds; + (metadata.start_datetime.timestamp() as u64) / period_seconds * period_seconds; let request_bytes = metadata.request_bytes; let error_response = metadata.error_response.load(Ordering::Acquire); - let response_millis = - (Utc::now().timestamp_millis() - metadata.datetime.timestamp_millis()) as u64; + // TODO: timestamps could get confused by leap seconds. need tokio time instead + let response_millis = metadata.start_instant.elapsed().as_millis() as u64; Self { user_key_id: authorized_key.user_key_id, @@ -197,29 +197,31 @@ impl StatEmitter { aggregate_rx: flume::Receiver, mut shutdown_receiver: broadcast::Receiver<()>, ) -> anyhow::Result<()> { - // TODO: select on shutdown handle so we can be sure to save every aggregate! - tokio::select! { - x = aggregate_rx.recv_async() => { - match x { - Ok(x) => { - trace!(?x, "aggregating stat"); + loop { + tokio::select! { + x = aggregate_rx.recv_async() => { + match x { + Ok(x) => { + trace!(?x, "aggregating stat"); - // TODO: increment global stats (in redis? in local cache for prometheus?) + // TODO: increment global stats (in redis? in local cache for prometheus?) - // TODO: batch stats? - // TODO: where can we wait on this handle? - let clone = self.clone(); - tokio::spawn(async move { clone.aggregate_stat(x).await }); - }, - Err(err) => { - error!(?err, "aggregate_rx"); + // TODO: batch stats? + // TODO: where can we wait on this handle? + let clone = self.clone(); + tokio::spawn(async move { clone.aggregate_stat(x).await }); + }, + Err(err) => { + error!(?err, "aggregate_rx"); + } } } - } - x = shutdown_receiver.recv() => { - match x { - Ok(_) => info!("aggregate stats loop shutting down"), - Err(err) => error!(?err, "shutdown receiver"), + x = shutdown_receiver.recv() => { + match x { + Ok(_) => info!("aggregate stats loop shutting down"), + Err(err) => error!(?err, "shutdown receiver"), + } + break; } } }