From 0c990b07553b16aa776670bc0c55d1e3cf4e5ae7 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Fri, 12 May 2023 16:02:43 -0700 Subject: [PATCH] use a cancel-safe channel for stats --- web3_proxy/src/app/mod.rs | 4 ++-- web3_proxy/src/frontend/authorization.rs | 12 +++++------- web3_proxy/src/stats/stat_buffer.rs | 14 +++++++------- 3 files changed, 14 insertions(+), 16 deletions(-) diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 09dfb2ed..a7cfc851 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -54,7 +54,7 @@ use std::num::NonZeroU64; use std::str::FromStr; use std::sync::{atomic, Arc}; use std::time::Duration; -use tokio::sync::{broadcast, watch, Semaphore}; +use tokio::sync::{broadcast, mpsc, watch, Semaphore}; use tokio::task::JoinHandle; use tokio::time::{sleep, timeout}; use ulid::Ulid; @@ -265,7 +265,7 @@ pub struct Web3ProxyApp { Cache, hashbrown::hash_map::DefaultHashBuilder>, pub kafka_producer: Option, /// channel for sending stats in a background task - pub stat_sender: Option>, + pub stat_sender: Option>, } /// flatten a JoinError into an anyhow error diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index bb1d6ecb..15982954 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -33,7 +33,7 @@ use std::mem; use std::sync::atomic::{self, AtomicBool, AtomicI64, AtomicU64, AtomicUsize}; use std::time::Duration; use std::{net::IpAddr, str::FromStr, sync::Arc}; -use tokio::sync::{OwnedSemaphorePermit, Semaphore}; +use tokio::sync::{mpsc, OwnedSemaphorePermit, Semaphore}; use tokio::task::JoinHandle; use tokio::time::Instant; use ulid::Ulid; @@ -266,8 +266,8 @@ pub struct RequestMetadata { /// TODO: maybe this shouldn't be determined by ProxyMode. A request param should probably enable this pub kafka_debug_logger: Option>, - /// Channel to send stats to - pub stat_sender: Option>, + /// Cancel-safe channel to send stats to + pub stat_sender: Option>, } impl Default for RequestMetadata { @@ -457,12 +457,10 @@ impl RequestMetadata { let stat: AppStat = stat.into(); - // can't use async because a Drop can call this - let stat_sender = stat_sender.to_sync(); - if let Err(err) = stat_sender.send(stat) { - error!("failed sending stat: {:?}", err); + error!("failed sending stat {:?}: {:?}", err.0, err); // TODO: return it? that seems like it might cause an infinite loop + // TODO: but dropping stats is bad... hmm... i guess better to undercharge customers than overcharge }; Ok(None) diff --git a/web3_proxy/src/stats/stat_buffer.rs b/web3_proxy/src/stats/stat_buffer.rs index e71f90af..a9d14329 100644 --- a/web3_proxy/src/stats/stat_buffer.rs +++ b/web3_proxy/src/stats/stat_buffer.rs @@ -8,7 +8,7 @@ use log::{error, info, trace}; use migration::sea_orm::prelude::Decimal; use migration::sea_orm::DatabaseConnection; use std::time::Duration; -use tokio::sync::broadcast; +use tokio::sync::{broadcast, mpsc}; use tokio::task::JoinHandle; use tokio::time::interval; @@ -30,7 +30,7 @@ pub struct BufferedRpcQueryStats { #[derive(From)] pub struct SpawnedStatBuffer { - pub stat_sender: kanal::AsyncSender, + pub stat_sender: mpsc::UnboundedSender, /// these handles are important and must be allowed to finish pub background_handle: JoinHandle>, } @@ -65,7 +65,7 @@ impl StatBuffer { return Ok(None); } - let (stat_sender, stat_receiver) = kanal::unbounded_async(); + let (stat_sender, stat_receiver) = mpsc::unbounded_channel(); let timestamp_precision = TimestampPrecision::Seconds; let mut new = Self { @@ -94,7 +94,7 @@ impl StatBuffer { async fn aggregate_and_save_loop( &mut self, bucket: String, - stat_receiver: kanal::AsyncReceiver, + mut stat_receiver: mpsc::UnboundedReceiver, mut shutdown_receiver: broadcast::Receiver<()>, ) -> anyhow::Result<()> { let mut tsdb_save_interval = @@ -111,7 +111,7 @@ impl StatBuffer { // info!("Received stat"); // save the stat to a buffer match stat { - Ok(AppStat::RpcQuery(stat)) => { + Some(AppStat::RpcQuery(stat)) => { if self.influxdb_client.is_some() { // TODO: round the timestamp at all? @@ -128,8 +128,8 @@ impl StatBuffer { self.accounting_db_buffer.entry(stat.accounting_key(self.billing_period_seconds)).or_default().add(stat); } } - Err(err) => { - error!("error receiving stat: {:?}", err); + None => { + info!("done receiving stats"); break; } }