use a cancel-safe channel for stats

This commit is contained in:
Bryan Stitt 2023-05-12 16:02:43 -07:00
parent 510612d343
commit 0c990b0755
3 changed files with 14 additions and 16 deletions

@ -54,7 +54,7 @@ use std::num::NonZeroU64;
use std::str::FromStr; use std::str::FromStr;
use std::sync::{atomic, Arc}; use std::sync::{atomic, Arc};
use std::time::Duration; use std::time::Duration;
use tokio::sync::{broadcast, watch, Semaphore}; use tokio::sync::{broadcast, mpsc, watch, Semaphore};
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tokio::time::{sleep, timeout}; use tokio::time::{sleep, timeout};
use ulid::Ulid; use ulid::Ulid;
@ -265,7 +265,7 @@ pub struct Web3ProxyApp {
Cache<UserBearerToken, Arc<Semaphore>, hashbrown::hash_map::DefaultHashBuilder>, Cache<UserBearerToken, Arc<Semaphore>, hashbrown::hash_map::DefaultHashBuilder>,
pub kafka_producer: Option<rdkafka::producer::FutureProducer>, pub kafka_producer: Option<rdkafka::producer::FutureProducer>,
/// channel for sending stats in a background task /// channel for sending stats in a background task
pub stat_sender: Option<kanal::AsyncSender<AppStat>>, pub stat_sender: Option<mpsc::UnboundedSender<AppStat>>,
} }
/// flatten a JoinError into an anyhow error /// flatten a JoinError into an anyhow error

@ -33,7 +33,7 @@ use std::mem;
use std::sync::atomic::{self, AtomicBool, AtomicI64, AtomicU64, AtomicUsize}; use std::sync::atomic::{self, AtomicBool, AtomicI64, AtomicU64, AtomicUsize};
use std::time::Duration; use std::time::Duration;
use std::{net::IpAddr, str::FromStr, sync::Arc}; use std::{net::IpAddr, str::FromStr, sync::Arc};
use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use tokio::sync::{mpsc, OwnedSemaphorePermit, Semaphore};
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tokio::time::Instant; use tokio::time::Instant;
use ulid::Ulid; use ulid::Ulid;
@ -266,8 +266,8 @@ pub struct RequestMetadata {
/// TODO: maybe this shouldn't be determined by ProxyMode. A request param should probably enable this /// TODO: maybe this shouldn't be determined by ProxyMode. A request param should probably enable this
pub kafka_debug_logger: Option<Arc<KafkaDebugLogger>>, pub kafka_debug_logger: Option<Arc<KafkaDebugLogger>>,
/// Channel to send stats to /// Cancel-safe channel to send stats to
pub stat_sender: Option<kanal::AsyncSender<AppStat>>, pub stat_sender: Option<mpsc::UnboundedSender<AppStat>>,
} }
impl Default for RequestMetadata { impl Default for RequestMetadata {
@ -457,12 +457,10 @@ impl RequestMetadata {
let stat: AppStat = stat.into(); let stat: AppStat = stat.into();
// can't use async because a Drop can call this
let stat_sender = stat_sender.to_sync();
if let Err(err) = stat_sender.send(stat) { if let Err(err) = stat_sender.send(stat) {
error!("failed sending stat: {:?}", err); error!("failed sending stat {:?}: {:?}", err.0, err);
// TODO: return it? that seems like it might cause an infinite loop // TODO: return it? that seems like it might cause an infinite loop
// TODO: but dropping stats is bad... hmm... i guess better to undercharge customers than overcharge
}; };
Ok(None) Ok(None)

@ -8,7 +8,7 @@ use log::{error, info, trace};
use migration::sea_orm::prelude::Decimal; use migration::sea_orm::prelude::Decimal;
use migration::sea_orm::DatabaseConnection; use migration::sea_orm::DatabaseConnection;
use std::time::Duration; use std::time::Duration;
use tokio::sync::broadcast; use tokio::sync::{broadcast, mpsc};
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tokio::time::interval; use tokio::time::interval;
@ -30,7 +30,7 @@ pub struct BufferedRpcQueryStats {
#[derive(From)] #[derive(From)]
pub struct SpawnedStatBuffer { pub struct SpawnedStatBuffer {
pub stat_sender: kanal::AsyncSender<AppStat>, pub stat_sender: mpsc::UnboundedSender<AppStat>,
/// these handles are important and must be allowed to finish /// these handles are important and must be allowed to finish
pub background_handle: JoinHandle<anyhow::Result<()>>, pub background_handle: JoinHandle<anyhow::Result<()>>,
} }
@ -65,7 +65,7 @@ impl StatBuffer {
return Ok(None); return Ok(None);
} }
let (stat_sender, stat_receiver) = kanal::unbounded_async(); let (stat_sender, stat_receiver) = mpsc::unbounded_channel();
let timestamp_precision = TimestampPrecision::Seconds; let timestamp_precision = TimestampPrecision::Seconds;
let mut new = Self { let mut new = Self {
@ -94,7 +94,7 @@ impl StatBuffer {
async fn aggregate_and_save_loop( async fn aggregate_and_save_loop(
&mut self, &mut self,
bucket: String, bucket: String,
stat_receiver: kanal::AsyncReceiver<AppStat>, mut stat_receiver: mpsc::UnboundedReceiver<AppStat>,
mut shutdown_receiver: broadcast::Receiver<()>, mut shutdown_receiver: broadcast::Receiver<()>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let mut tsdb_save_interval = let mut tsdb_save_interval =
@ -111,7 +111,7 @@ impl StatBuffer {
// info!("Received stat"); // info!("Received stat");
// save the stat to a buffer // save the stat to a buffer
match stat { match stat {
Ok(AppStat::RpcQuery(stat)) => { Some(AppStat::RpcQuery(stat)) => {
if self.influxdb_client.is_some() { if self.influxdb_client.is_some() {
// TODO: round the timestamp at all? // TODO: round the timestamp at all?
@ -128,8 +128,8 @@ impl StatBuffer {
self.accounting_db_buffer.entry(stat.accounting_key(self.billing_period_seconds)).or_default().add(stat); self.accounting_db_buffer.entry(stat.accounting_key(self.billing_period_seconds)).or_default().add(stat);
} }
} }
Err(err) => { None => {
error!("error receiving stat: {:?}", err); info!("done receiving stats");
break; break;
} }
} }