diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs
index 113c40d5..befc1d98 100644
--- a/web3_proxy/src/app/mod.rs
+++ b/web3_proxy/src/app/mod.rs
@@ -51,7 +51,7 @@ use std::str::FromStr;
 use std::sync::atomic::{AtomicU16, Ordering};
 use std::sync::{atomic, Arc};
 use std::time::Duration;
-use tokio::sync::{broadcast, watch, Semaphore};
+use tokio::sync::{broadcast, watch, Semaphore, oneshot};
 use tokio::task::JoinHandle;
 use tokio::time::{sleep, timeout};
 use tracing::{error, info, trace, warn, Level};
@@ -185,7 +185,7 @@ impl Web3ProxyApp {
         top_config: TopConfig,
         num_workers: usize,
         shutdown_sender: broadcast::Sender<()>,
-        flush_stat_buffer_receiver: broadcast::Receiver<()>
+        flush_stat_buffer_receiver: flume::Receiver<oneshot::Sender<(usize, usize)>>,
     ) -> anyhow::Result<Web3ProxyAppSpawn> {
         let stat_buffer_shutdown_receiver = shutdown_sender.subscribe();
         let mut background_shutdown_receiver = shutdown_sender.subscribe();
@@ -385,31 +385,27 @@ impl Web3ProxyApp {
         // create a channel for receiving stats
         // we do this in a channel so we don't slow down our response to the users
         // stats can be saved in mysql, influxdb, both, or none
-        let mut stat_sender = None;
-        if let Some(influxdb_bucket) = top_config.app.influxdb_bucket.clone() {
-            if let Some(spawned_stat_buffer) = StatBuffer::try_spawn(
-                BILLING_PERIOD_SECONDS,
-                influxdb_bucket,
-                top_config.app.chain_id,
-                db_conn.clone(),
-                60,
-                influxdb_client.clone(),
-                Some(rpc_secret_key_cache.clone()),
-                Some(user_balance_cache.clone()),
-                stat_buffer_shutdown_receiver,
-                1,
-                flush_stat_buffer_receiver,
-            )? {
-                // since the database entries are used for accounting, we want to be sure everything is saved before exiting
-                important_background_handles.push(spawned_stat_buffer.background_handle);
+        let stat_sender = if let Some(spawned_stat_buffer) = StatBuffer::try_spawn(
+            BILLING_PERIOD_SECONDS,
+            top_config.app.chain_id,
+            db_conn.clone(),
+            60,
+            top_config.app.influxdb_bucket.clone(),
+            influxdb_client.clone(),
+            Some(rpc_secret_key_cache.clone()),
+            Some(user_balance_cache.clone()),
+            stat_buffer_shutdown_receiver,
+            1,
+            flush_stat_buffer_receiver,
+        )? {
+            // since the database entries are used for accounting, we want to be sure everything is saved before exiting
+            important_background_handles.push(spawned_stat_buffer.background_handle);

-                stat_sender = Some(spawned_stat_buffer.stat_sender);
-            }
-        }
-
-        if stat_sender.is_none() {
+            Some(spawned_stat_buffer.stat_sender)
+        } else {
             info!("stats will not be collected");
-        }
+            None
+        };

         // make a http shared client
         // TODO: can we configure the connection pool? should we?
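The channel swap above is the heart of the patch: the old `broadcast::Receiver<()>` could only say "please flush", while the new `flume::Receiver<oneshot::Sender<(usize, usize)>>` carries a reply channel, so the caller can wait for the flush to finish and learn how many stats went to the time-series db and the relational db. A minimal, self-contained sketch of that request/reply shape (illustrative only, not code from this repo; the `FlushRequest` alias and the counts are made up, and it assumes `tokio`, `flume`, and `anyhow` as dependencies):

    use tokio::sync::oneshot;

    // a flush request is just the oneshot that the worker should answer
    type FlushRequest = oneshot::Sender<(usize, usize)>;

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        let (flush_tx, flush_rx) = flume::bounded::<FlushRequest>(1);

        // worker side: wait for a request, "flush", then reply with the counts
        tokio::spawn(async move {
            while let Ok(reply) = flush_rx.recv_async().await {
                let (tsdb_count, relational_count) = (0, 0); // pretend we saved something
                let _ = reply.send((tsdb_count, relational_count));
            }
        });

        // caller side: send a oneshot and await the answer
        let (tx, rx) = oneshot::channel();
        flush_tx.send(tx)?;

        let (tsdb_count, relational_count) = rx.await?;
        println!("flushed {tsdb_count} tsdb and {relational_count} relational stats");

        Ok(())
    }

Because the request itself is the reply channel, no extra correlation id is needed: whoever sent the oneshot is the only one who can receive the answer.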
diff --git a/web3_proxy/src/stats/stat_buffer.rs b/web3_proxy/src/stats/stat_buffer.rs
index d454bb2e..cde0f1a5 100644
--- a/web3_proxy/src/stats/stat_buffer.rs
+++ b/web3_proxy/src/stats/stat_buffer.rs
@@ -9,9 +9,9 @@ use influxdb2::api::write::TimestampPrecision;
 use migration::sea_orm::prelude::Decimal;
 use migration::sea_orm::DatabaseConnection;
 use std::time::Duration;
-use tokio::sync::broadcast;
+use tokio::sync::{broadcast, oneshot};
 use tokio::time::{interval, sleep};
-use tracing::{error, info, trace};
+use tracing::{error, info, trace, warn};

 #[derive(Debug, Default)]
 pub struct BufferedRpcQueryStats {
@@ -43,6 +43,7 @@ pub struct StatBuffer {
     db_conn: Option<DatabaseConnection>,
     db_save_interval_seconds: u32,
     global_timeseries_buffer: HashMap<RpcQueryKey, BufferedRpcQueryStats>,
+    influxdb_bucket: Option<String>,
     influxdb_client: Option<influxdb2::Client>,
     opt_in_timeseries_buffer: HashMap<RpcQueryKey, BufferedRpcQueryStats>,
     rpc_secret_key_cache: RpcSecretKeyCache,
@@ -55,17 +56,21 @@ impl StatBuffer {
     #[allow(clippy::too_many_arguments)]
     pub fn try_spawn(
         billing_period_seconds: i64,
-        bucket: String,
         chain_id: u64,
         db_conn: Option<DatabaseConnection>,
         db_save_interval_seconds: u32,
-        influxdb_client: Option<influxdb2::Client>,
+        influxdb_bucket: Option<String>,
+        mut influxdb_client: Option<influxdb2::Client>,
         rpc_secret_key_cache: Option<RpcSecretKeyCache>,
         user_balance_cache: Option<UserBalanceCache>,
         shutdown_receiver: broadcast::Receiver<()>,
         tsdb_save_interval_seconds: u32,
-        flush_receiver: broadcast::Receiver<()>,
+        flush_receiver: flume::Receiver<oneshot::Sender<(usize, usize)>>,
     ) -> anyhow::Result<Option<SpawnedStatBuffer>> {
+        if influxdb_bucket.is_none() {
+            influxdb_client = None;
+        }
+
         if db_conn.is_none() && influxdb_client.is_none() {
             return Ok(None);
         }
@@ -73,6 +78,7 @@ impl StatBuffer {
         let (stat_sender, stat_receiver) = flume::unbounded();

         let timestamp_precision = TimestampPrecision::Seconds;
+
         let mut new = Self {
             accounting_db_buffer: Default::default(),
             billing_period_seconds,
@@ -80,6 +86,7 @@ impl StatBuffer {
             db_conn,
             db_save_interval_seconds,
             global_timeseries_buffer: Default::default(),
+            influxdb_bucket,
             influxdb_client,
             opt_in_timeseries_buffer: Default::default(),
             rpc_secret_key_cache: rpc_secret_key_cache.unwrap(),
@@ -89,8 +96,9 @@ impl StatBuffer {
         };

         // any errors inside this task will cause the application to exit
+        // TODO? change this to the X and XTask pattern like the latency crate uses
         let handle = tokio::spawn(async move {
-            new.aggregate_and_save_loop(bucket, stat_receiver, shutdown_receiver, flush_receiver)
+            new.aggregate_and_save_loop(stat_receiver, shutdown_receiver, flush_receiver)
                 .await
         });

@@ -99,10 +107,9 @@ impl StatBuffer {
     async fn aggregate_and_save_loop(
         &mut self,
-        bucket: String,
         stat_receiver: flume::Receiver<AppStat>,
         mut shutdown_receiver: broadcast::Receiver<()>,
-        mut flush_receiver: broadcast::Receiver<()>,
+        flush_receiver: flume::Receiver<oneshot::Sender<(usize, usize)>>,
     ) -> Web3ProxyResult<()> {
         let mut tsdb_save_interval =
             interval(Duration::from_secs(self.tsdb_save_interval_seconds as u64));
@@ -147,22 +154,30 @@ impl StatBuffer {
                 }
                 _ = tsdb_save_interval.tick() => {
                     trace!("TSDB save internal tick");
-                    let count = self.save_tsdb_stats(&bucket).await;
+                    let count = self.save_tsdb_stats().await;
                     if count > 0 {
                         trace!("Saved {} stats to the tsdb", count);
                     }
                 }
-                _ = flush_receiver.recv() => {
-                    trace!("flush");
+                x = flush_receiver.recv_async() => {
+                    if let Ok(x) = x {
+                        trace!("flush");

-                    let count = self.save_tsdb_stats(&bucket).await;
-                    if count > 0 {
-                        trace!("Flushed {} stats to the tsdb", count);
-                    }
+                        let tsdb_count = self.save_tsdb_stats().await;
+                        if tsdb_count > 0 {
+                            trace!("Flushed {} stats to the tsdb", tsdb_count);
+                        }

-                    let count = self.save_relational_stats().await;
-                    if count > 0 {
-                        trace!("Flushed {} stats to the relational db", count);
+                        let relational_count = self.save_relational_stats().await;
+                        if relational_count > 0 {
+                            trace!("Flushed {} stats to the relational db", relational_count);
+                        }
+
+                        if let Err(err) = x.send((tsdb_count, relational_count)) {
+                            warn!(%tsdb_count, %relational_count, ?err, "unable to notify about flushed stats");
+                        }
+                    } else {
+                        unimplemented!()
                     }
                 }
                 x = shutdown_receiver.recv() => {
@@ -197,7 +212,7 @@ impl StatBuffer {
         info!("saved {} pending relational stat(s)", saved_relational);

-        let saved_tsdb = self.save_tsdb_stats(&bucket).await;
+        let saved_tsdb = self.save_tsdb_stats().await;

         info!("saved {} pending tsdb stat(s)", saved_tsdb);

@@ -233,10 +248,15 @@ impl StatBuffer {
     }

     // TODO: bucket should be an enum so that we don't risk typos
-    async fn save_tsdb_stats(&mut self, bucket: &str) -> usize {
+    async fn save_tsdb_stats(&mut self) -> usize {
         let mut count = 0;

         if let Some(influxdb_client) = self.influxdb_client.as_ref() {
+            let influxdb_bucket = self
+                .influxdb_bucket
+                .as_ref()
+                .expect("if client is set, bucket must be set");
+
             // TODO: use stream::iter properly to avoid allocating this Vec
             let mut points = vec![];

@@ -290,7 +310,7 @@ impl StatBuffer {
             if let Err(err) = influxdb_client
                 .write_with_precision(
-                    bucket,
+                    influxdb_bucket,
                     stream::iter(points),
                     self.timestamp_precision,
                 )
                 .await
diff --git a/web3_proxy/src/sub_commands/migrate_stats_to_v2.rs b/web3_proxy/src/sub_commands/migrate_stats_to_v2.rs
index 9e83e17c..ce0ae3fe 100644
--- a/web3_proxy/src/sub_commands/migrate_stats_to_v2.rs
+++ b/web3_proxy/src/sub_commands/migrate_stats_to_v2.rs
@@ -72,19 +72,15 @@ impl MigrateStatsToV2SubCommand {
             None => None,
         };

-        let (_flush_sender, flush_receiver) = broadcast::channel(1);
+        let (_flush_sender, flush_receiver) = flume::bounded(1);

         // Spawn the stat-sender
         let emitter_spawn = StatBuffer::try_spawn(
             BILLING_PERIOD_SECONDS,
-            top_config
-                .app
-                .influxdb_bucket
-                .clone()
-                .context("No influxdb bucket was provided")?,
             top_config.app.chain_id,
             Some(db_conn.clone()),
             30,
+            top_config.app.influxdb_bucket.clone(),
             influxdb_client.clone(),
             None,
             None,
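In stat_buffer.rs the bucket becomes an Option stored on the struct, and the "client set implies bucket set" invariant is enforced in two places: try_spawn clears `influxdb_client` when `influxdb_bucket` is None, and save_tsdb_stats re-asserts it with an `expect`. A hypothetical alternative sketch (not what this patch does, and only assuming the same `influxdb2` dependency) that encodes the invariant in the type so neither check is needed:

    // hypothetical alternative, not from this patch: pair the client with its
    // bucket so "client is set implies bucket is set" holds by construction
    struct TsdbSink {
        client: influxdb2::Client,
        bucket: String,
    }

    struct StatBufferSketch {
        tsdb: Option<TsdbSink>,
        // ...the other StatBuffer fields would stay the same
    }

    impl StatBufferSketch {
        async fn save_tsdb_stats(&mut self) -> usize {
            // no expect() needed: either both pieces are present or we skip the save
            let Some(tsdb) = self.tsdb.as_ref() else {
                return 0;
            };

            // ...build the points and write them with tsdb.client into tsdb.bucket
            let _ = (&tsdb.client, &tsdb.bucket);

            0
        }
    }

The patch's two-Option layout keeps the try_spawn signature flat, which is probably why it was chosen; the paired form trades that for a compile-time guarantee.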
diff --git a/web3_proxy/src/sub_commands/proxyd.rs b/web3_proxy/src/sub_commands/proxyd.rs
index 45e5d9e6..272c7d9c 100644
--- a/web3_proxy/src/sub_commands/proxyd.rs
+++ b/web3_proxy/src/sub_commands/proxyd.rs
@@ -12,7 +12,7 @@ use std::sync::Arc;
 use std::time::Duration;
 use std::{fs, thread};
 use tokio::select;
-use tokio::sync::broadcast;
+use tokio::sync::{broadcast, oneshot};
 use tokio::time::{sleep_until, Instant};
 use tracing::{error, info, trace, warn};

@@ -37,12 +37,12 @@ impl ProxydSubCommand {
         top_config_path: PathBuf,
         num_workers: usize,
     ) -> anyhow::Result<()> {
-        let (shutdown_sender, _) = broadcast::channel(1);
+        let (frontend_shutdown_sender, _) = broadcast::channel(1);
         // TODO: i think there is a small race. if config_path changes

         let frontend_port = Arc::new(self.port.into());
         let prometheus_port = Arc::new(self.prometheus_port.into());
-        let (flush_stat_buffer_sender, _) = broadcast::channel(1);
+        let (_flush_stat_buffer_sender, flush_stat_buffer_receiver) = flume::bounded(1);

         Self::_main(
             top_config,
@@ -50,8 +50,8 @@ impl ProxydSubCommand {
             frontend_port,
             prometheus_port,
             num_workers,
-            shutdown_sender,
-            flush_stat_buffer_sender,
+            frontend_shutdown_sender,
+            flush_stat_buffer_receiver,
         )
         .await
     }
@@ -64,7 +64,7 @@ impl ProxydSubCommand {
         prometheus_port: Arc<AtomicU16>,
         num_workers: usize,
         frontend_shutdown_sender: broadcast::Sender<()>,
-        flush_stat_buffer_sender: broadcast::Sender<()>,
+        flush_stat_buffer_receiver: flume::Receiver<oneshot::Sender<(usize, usize)>>,
     ) -> anyhow::Result<()> {
         // tokio has code for catching ctrl+c so we use that to shut down in most cases
         // frontend_shutdown_sender is currently only used in tests, but we might make a /shutdown endpoint or something
@@ -85,7 +85,7 @@ impl ProxydSubCommand {
             top_config.clone(),
             num_workers,
             app_shutdown_sender.clone(),
-            flush_stat_buffer_sender.subscribe(),
+            flush_stat_buffer_receiver,
         )
         .await?;

diff --git a/web3_proxy/tests/common/app.rs b/web3_proxy/tests/common/app.rs
index 4425e0f9..06192ed6 100644
--- a/web3_proxy/tests/common/app.rs
+++ b/web3_proxy/tests/common/app.rs
@@ -20,7 +20,10 @@ use std::{sync::Arc, time::Duration};
 use tokio::{
     net::TcpStream,
     process::Command as AsyncCommand,
-    sync::broadcast::{self, error::SendError},
+    sync::{
+        broadcast::{self, error::SendError},
+        oneshot,
+    },
     task::JoinHandle,
     time::{sleep, Instant},
 };
@@ -55,7 +58,7 @@ pub struct TestApp {
     pub proxy_provider: Provider<Http>,

     /// tell the app to flush stats to the database
-    flush_stat_buffer_sender: broadcast::Sender<()>,
+    flush_stat_buffer_sender: flume::Sender<oneshot::Sender<(usize, usize)>>,

     /// tell the app to shut down (use `self.stop()`).
     shutdown_sender: broadcast::Sender<()>,
@@ -272,7 +275,7 @@ impl TestApp {
         let frontend_port_arc = Arc::new(AtomicU16::new(0));
         let prometheus_port_arc = Arc::new(AtomicU16::new(0));

-        let (flush_stat_buffer_sender, _flush_stat_buffer_receiver) = broadcast::channel(1);
+        let (flush_stat_buffer_sender, flush_stat_buffer_receiver) = flume::bounded(1);

         // spawn the app
         // TODO: spawn in a thread so we can run from non-async tests and so the Drop impl can wait for it to stop
@@ -284,7 +287,7 @@ impl TestApp {
                 prometheus_port_arc,
                 num_workers,
                 shutdown_sender.clone(),
-                flush_stat_buffer_sender.clone(),
+                flush_stat_buffer_receiver,
             ))
         };

@@ -320,8 +323,15 @@ impl TestApp {
         self.db.as_ref().unwrap().conn.as_ref().unwrap()
     }

-    pub fn flush_stats(&self) {
-        self.flush_stat_buffer_sender.send(()).unwrap();
+    #[allow(unused)]
+    pub async fn flush_stats(&self) -> anyhow::Result<(usize, usize)> {
+        let (tx, rx) = oneshot::channel();
+
+        self.flush_stat_buffer_sender.send(tx)?;
+
+        let x = rx.await?;
+
+        Ok(x)
     }

     pub fn stop(&self) -> Result<usize, SendError<()>> {
diff --git a/web3_proxy/tests/common/mod.rs b/web3_proxy/tests/common/mod.rs
index b948b7fc..8c9bc1e4 100644
--- a/web3_proxy/tests/common/mod.rs
+++ b/web3_proxy/tests/common/mod.rs
@@ -1,3 +1,3 @@
-mod app;
+pub mod app;

 pub use self::app::TestApp;
diff --git a/web3_proxy/tests/test_proxy.rs b/web3_proxy/tests/test_proxy.rs
index e58134ef..83106df2 100644
--- a/web3_proxy/tests/test_proxy.rs
+++ b/web3_proxy/tests/test_proxy.rs
@@ -10,11 +10,13 @@ use tokio::{
 };
 use web3_proxy::rpcs::blockchain::ArcBlock;

-// #[cfg_attr(not(feature = "tests-needing-docker"), ignore)]
-#[ignore = "under construction"]
+#[cfg_attr(not(feature = "tests-needing-docker"), ignore)]
 #[test_log::test(tokio::test)]
 async fn it_migrates_the_db() {
-    TestApp::spawn(true).await;
+    let x = TestApp::spawn(true).await;
+
+    // we call flush stats more to be sure it works than because we expect it to save any stats
+    x.flush_stats().await.unwrap();
 }

 #[test_log::test(tokio::test)]
@@ -89,6 +91,9 @@ async fn it_starts_and_stops() {

     assert_eq!(anvil_result, proxy_result.unwrap());

+    // this won't do anything since stats aren't tracked when there isn't a db
+    x.flush_stats().await.unwrap_err();
+
     // most tests won't need to wait, but we should wait here to be sure all the shutdown logic works properly
     x.wait().await;
 }
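With this change, `flush_stats()` returns the `(tsdb, relational)` counts that the StatBuffer reports back over the oneshot instead of firing blindly. A hypothetical follow-up test (not part of this patch; it assumes the same `TestApp` helper and docker feature gate used above, and makes no claim about what the counts will be) showing how those counts could be inspected:

    #[cfg_attr(not(feature = "tests-needing-docker"), ignore)]
    #[test_log::test(tokio::test)]
    async fn it_reports_flush_counts() {
        let x = TestApp::spawn(true).await;

        // ask the stat buffer to flush and wait for its reply
        let (tsdb_count, relational_count) = x.flush_stats().await.unwrap();

        // nothing has been proxied with an rpc key yet, so don't assert exact numbers;
        // the point is that the reply round-trips with both counts
        println!("flushed {tsdb_count} tsdb and {relational_count} relational stats");
    }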