From 53c7541fed9ac63acbb27631d3e3d97293f8e6f3 Mon Sep 17 00:00:00 2001 From: yenicelik Date: Sat, 25 Mar 2023 17:56:45 +0100 Subject: [PATCH] seems to wait until everything is flushed. I will have to debug some stuff that isnt saved properly in the mysql (and then check influx) --- web3_proxy/src/app/mod.rs | 38 ++- web3_proxy/src/bin/web3_proxy_cli/main.rs | 7 +- .../bin/web3_proxy_cli/migrate_stats_to_v2.rs | 306 +++++++++++++----- web3_proxy/src/rpcs/many.rs | 116 +++---- web3_proxy/src/stats/mod.rs | 17 + 5 files changed, 336 insertions(+), 148 deletions(-) diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 3834eb2a..424bee93 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -67,7 +67,7 @@ pub static APP_USER_AGENT: &str = concat!( ); // aggregate across 1 week -const BILLING_PERIOD_SECONDS: i64 = 60 * 60 * 24 * 7; +pub const BILLING_PERIOD_SECONDS: i64 = 60 * 60 * 24 * 7; #[derive(Debug, From)] struct ResponseCacheKey { @@ -575,7 +575,12 @@ impl Web3ProxyApp { // stats can be saved in mysql, influxdb, both, or none let stat_sender = if let Some(emitter_spawn) = StatBuffer::try_spawn( top_config.app.chain_id, - top_config.app.influxdb_bucket.clone().context("No influxdb bucket was provided")?.to_owned(), + top_config + .app + .influxdb_bucket + .clone() + .context("No influxdb bucket was provided")? + .to_owned(), db_conn.clone(), influxdb_client.clone(), 60, @@ -812,26 +817,27 @@ impl Web3ProxyApp { app_handles.push(config_handle); } -// ======= -// if important_background_handles.is_empty() { -// info!("no important background handles"); -// -// let f = tokio::spawn(async move { -// let _ = background_shutdown_receiver.recv().await; -// -// Ok(()) -// }); -// -// important_background_handles.push(f); -// >>>>>>> 77df3fa (stats v2) + // ======= + // if important_background_handles.is_empty() { + // info!("no important background handles"); + // + // let f = tokio::spawn(async move { + // let _ = background_shutdown_receiver.recv().await; + // + // Ok(()) + // }); + // + // important_background_handles.push(f); + // >>>>>>> 77df3fa (stats v2) Ok(( app, app_handles, important_background_handles, new_top_config_sender, - consensus_connections_watcher - ).into()) + consensus_connections_watcher, + ) + .into()) } pub async fn apply_top_config(&self, new_top_config: TopConfig) -> anyhow::Result<()> { diff --git a/web3_proxy/src/bin/web3_proxy_cli/main.rs b/web3_proxy/src/bin/web3_proxy_cli/main.rs index 88b64a0e..7cbeef83 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/main.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/main.rs @@ -374,12 +374,17 @@ fn main() -> anyhow::Result<()> { x.main(&db_conn).await } SubCommand::MigrateStatsToV2(x) => { + + let top_config = top_config.expect("--config is required to run the migration from stats-mysql to stats-influx"); + // let top_config_path = + // top_config_path.expect("path must be set if top_config exists"); + let db_url = cli_config .db_url .expect("'--config' (with a db) or '--db-url' is required to run the migration from stats-mysql to stats-influx"); let db_conn = get_db(db_url, 1, 1).await?; - x.main(&db_conn).await + x.main(top_config, &db_conn).await } SubCommand::Pagerduty(x) => { if cli_config.sentry_url.is_none() { diff --git a/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs b/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs index 280387c9..850182e9 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs @@ -1,18 +1,49 @@ -use std::net::{IpAddr, Ipv4Addr}; -use std::num::NonZeroU64; use anyhow::Context; use argh::FromArgs; -use entities::{rpc_key, rpc_accounting, rpc_accounting_v2, user}; +use chrono::{DateTime, Utc}; +use entities::{rpc_accounting, rpc_accounting_v2, rpc_key, user}; use ethers::types::Address; +use futures::stream::FuturesUnordered; +use futures::StreamExt; use hashbrown::HashMap; -use log::{debug, info, warn}; +use log::{debug, error, info, trace, warn}; use migration::sea_orm::{ self, ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, IntoActiveModel, - QueryFilter, QuerySelect + QueryFilter, QuerySelect, UpdateResult, }; -use web3_proxy::app::AuthorizationChecks; -use web3_proxy::frontend::authorization::{Authorization, AuthorizationType, RequestMetadata, RpcSecretKey}; -use web3_proxy::stats::{BufferedRpcQueryStats, RpcQueryKey}; +use migration::{Expr, Value}; +use std::mem::swap; +use std::net::{IpAddr, Ipv4Addr}; +use std::num::NonZeroU64; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use tokio::sync::broadcast; +use tokio::time::{sleep, Instant}; +use web3_proxy::app::{AuthorizationChecks, Web3ProxyApp, BILLING_PERIOD_SECONDS}; +use web3_proxy::config::TopConfig; +use web3_proxy::frontend::authorization::{ + Authorization, AuthorizationType, RequestMetadata, RpcSecretKey, +}; +use web3_proxy::stats::{BufferedRpcQueryStats, RpcQueryKey, RpcQueryStats, StatBuffer}; + +// Helper function to go from DateTime to Instant +fn datetime_utc_to_instant(datetime: DateTime) -> anyhow::Result { + let epoch = datetime.timestamp(); // Get the Unix timestamp + let nanos = datetime.timestamp_subsec_nanos(); + + let duration_since_epoch = Duration::new(epoch as u64, nanos); + // let duration_since_datetime = Duration::new(, nanos); + let instant_new = Instant::now(); + warn!("Instant new is: {:?}", instant_new); + let unix_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); + warn!("Instant since unix epoch is: {:?}", unix_epoch); + + instant_new + .checked_sub(unix_epoch) + .context("Could not subtract unix epoch from instant now")? + .checked_add(duration_since_epoch) + .context("Could not add duration since epoch for updated time") +} /// change a user's address. #[derive(FromArgs, PartialEq, Eq, Debug)] @@ -20,14 +51,80 @@ use web3_proxy::stats::{BufferedRpcQueryStats, RpcQueryKey}; pub struct MigrateStatsToV2 {} impl MigrateStatsToV2 { - pub async fn main(self, db_conn: &DatabaseConnection) -> anyhow::Result<()> { + pub async fn main( + self, + top_config: TopConfig, + db_conn: &DatabaseConnection, + ) -> anyhow::Result<()> { + // Also add influxdb container ... + // let mut spawned_app = + // Web3ProxyApp::spawn(top_config.clone(), 2, app_shutdown_sender.clone()).await?; + + // we wouldn't really need this, but let's spawn this anyways + // easier than debugging the rest I suppose + let (app_shutdown_sender, _app_shutdown_receiver) = broadcast::channel(1); + let rpc_account_shutdown_recevier = app_shutdown_sender.subscribe(); + + // we must wait for these to end on their own (and they need to subscribe to shutdown_sender) + let mut important_background_handles = FuturesUnordered::new(); + + // Spawn the influxdb + let influxdb_client = match top_config.app.influxdb_host.as_ref() { + Some(influxdb_host) => { + let influxdb_org = top_config + .app + .influxdb_org + .clone() + .expect("influxdb_org needed when influxdb_host is set"); + let influxdb_token = top_config + .app + .influxdb_token + .clone() + .expect("influxdb_token needed when influxdb_host is set"); + + let influxdb_client = + influxdb2::Client::new(influxdb_host, influxdb_org, influxdb_token); + + // TODO: test the client now. having a stat for "started" can be useful on graphs to mark deploys + + Some(influxdb_client) + } + None => None, + }; + + // Spawn the stat-sender + let stat_sender = if let Some(emitter_spawn) = StatBuffer::try_spawn( + top_config.app.chain_id, + top_config + .app + .influxdb_bucket + .clone() + .context("No influxdb bucket was provided")? + .to_owned(), + Some(db_conn.clone()), + influxdb_client.clone(), + 30, + 1, + BILLING_PERIOD_SECONDS, + rpc_account_shutdown_recevier, + )? { + // since the database entries are used for accounting, we want to be sure everything is saved before exiting + important_background_handles.push(emitter_spawn.background_handle); + + Some(emitter_spawn.stat_sender) + } else { + None + }; + + info!("Background handles are: {:?}", important_background_handles); + + // Basically spawn the full app, look at web3_proxy CLI while true { - // (1) Load a batch of rows out of the old table until no more rows are left let old_records = rpc_accounting::Entity::find() .filter(rpc_accounting::Column::Migrated.is_null()) - .limit(10000) + .limit(2) .all(db_conn) .await?; if old_records.len() == 0 { @@ -42,8 +139,7 @@ impl MigrateStatsToV2 { let mut accounting_db_buffer = HashMap::::new(); // Iterate through all old rows, and put them into the above objects. - for x in old_records { - + for x in old_records.iter() { info!("Preparing for migration: {:?}", x); // TODO: Split up a single request into multiple requests ... @@ -57,103 +153,163 @@ impl MigrateStatsToV2 { .filter(rpc_key::Column::Id.eq(rpc_key_id)) .one(db_conn) .await? - .unwrap(); + .context("Could not find rpc_key_obj for the given rpc_key_id")?; // TODO: Create authrization // We can probably also randomly generate this, as we don't care about the user (?) AuthorizationChecks { user_id: rpc_key_obj.user_id, rpc_secret_key: Some(RpcSecretKey::Uuid(rpc_key_obj.secret_key)), - rpc_secret_key_id: Some(NonZeroU64::new(rpc_key_id).unwrap()), - ..Default::default() - } - }, - None => { - AuthorizationChecks { + rpc_secret_key_id: Some( + NonZeroU64::new(rpc_key_id) + .context("Could not use rpc_key_id to create a u64")?, + ), ..Default::default() } } + None => AuthorizationChecks { + ..Default::default() + }, }; // Then overwrite rpc_key_id and user_id (?) let authorization_type = AuthorizationType::Frontend; - let authorization = Authorization::try_new( - authorization_checks, - None, - IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), - None, - None, - None, - authorization_type + let authorization = Arc::new( + Authorization::try_new( + authorization_checks, + None, + IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), + None, + None, + None, + authorization_type, + ) + .context("Initializing Authorization Struct was not successful")?, ); // It will be like a fork basically (to simulate getting multiple single requests ...) // Iterate through all frontend requests // For each frontend request, create one object that will be emitted (make sure the timestamp is new) + let n = x.frontend_requests; - for _ in 1..x.frontend_requests { - info!("Creating a new frontend request"); + for _ in 1..n { + // info!("Creating a new frontend request"); // TODO: Create RequestMetadata let request_metadata = RequestMetadata { - start_instant: x.period_datetime.timestamp(), - request_bytes: x.request_bytes, - archive_request: x.archive_request, - backend_requests: todo!(), - no_servers: 0, // This is not relevant in the new version - error_response: x.error_response, - response_bytes: todo!(), - response_millis: todo!(), - response_from_backup_rpc: todo!() // I think we did not record this back then // Default::default() + start_instant: Instant::now(), // This is overwritten later on + request_bytes: (x.sum_request_bytes / n).into(), // Get the mean of all the request bytes + archive_request: x.archive_request.into(), + backend_requests: Default::default(), // This is not used, instead we modify the field later + no_servers: 0.into(), // This is not relevant in the new version + error_response: x.error_response.into(), + response_bytes: (x.sum_response_bytes / n).into(), + response_millis: (x.sum_response_millis / n).into(), + // We just don't have this data + response_from_backup_rpc: false.into(), // I think we did not record this back then // Default::default() }; + // (3) Send through a channel to a stat emitter + // Send it to the stats sender + if let Some(stat_sender_ref) = stat_sender.as_ref() { + let mut response_stat = RpcQueryStats::new( + x.clone().method.unwrap(), + authorization.clone(), + Arc::new(request_metadata), + (x.sum_response_bytes / n) + .try_into() + .context("sum bytes average is not calculated properly")?, + ); + // Modify the timestamps .. + response_stat.modify_struct( + (x.sum_response_millis / n), + x.period_datetime.timestamp(), // I suppose timestamp is millis as well ... should check this in the database + (x.backend_requests / n), + ); + info!("Sending stats: {:?}", response_stat); + stat_sender_ref + // .send(response_stat.into()) + .send_async(response_stat.into()) + .await + .context("stat_sender sending response_stat")?; + info!("Send! {:?}", stat_sender); + } else { + panic!("Stat sender was not spawned!"); + } + + // Create a new stats object + // Add it to the StatBuffer } - + // Let the stat_sender spawn / flush ... + // spawned_app.app.stat_sender.aggregate_and_save_loop() + // Send a signal to save ... } - // (N-1) Mark the batch as migrated - break; + // (3) Await that all items are properly processed + // TODO: Await all the background handles + info!("Waiting for a second until all is flushed"); + // Only after this mark all the items as processed / completed + + // If the items are in rpc_v2, delete the initial items from the database + + // (4) Update the batch in the old table with the current timestamp (Mark the batch as migrated) + let old_record_ids = old_records.iter().map(|x| x.id); + let update_result: UpdateResult = rpc_accounting::Entity::update_many() + .col_expr( + rpc_accounting::Column::Migrated, + Expr::value(Value::ChronoDateTimeUtc(Some(Box::new( + chrono::offset::Utc::now(), + )))), + ) + .filter(rpc_accounting::Column::Id.is_in(old_record_ids)) + // .set(pear) + .exec(db_conn) + .await?; + + info!("Update result is: {:?}", update_result); + + // (N-1) Mark the batch as migrated } + info!( + "Background handles (2) are: {:?}", + important_background_handles + ); + // Drop the handle + // Send the shutdown signal here (?) + // important_background_handles.clear(); + // Finally also send a shutdown signal + match app_shutdown_sender.send(()) { + Err(x) => { + panic!("Could not send shutdown signal! {:?}", x); + } + _ => {} + }; + // Drop the background handle + while let Some(x) = important_background_handles.next().await { + info!("Returned item is: {:?}", x); + match x { + Err(e) => { + error!("{:?}", e); + } + Ok(Err(e)) => { + error!("{:?}", e); + } + Ok(Ok(_)) => { + // TODO: how can we know which handle exited? + info!("a background handle exited"); + // Pop it in this case? + continue; + } + } + } - // (3) Update the batch in the old table with the current timestamp - - // (4) Send through a channel to a stat emitter - - - - // let old_address: Address = self.old_address.parse()?; - // let new_address: Address = self.new_address.parse()?; - // - // let old_address: Vec = old_address.to_fixed_bytes().into(); - // let new_address: Vec = new_address.to_fixed_bytes().into(); - // - // let u = user::Entity::find() - // .filter(user::Column::Address.eq(old_address)) - // .one(db_conn) - // .await? - // .context("No user found with that address")?; - // - // debug!("initial user: {:#?}", u); - // - // if u.address == new_address { - // info!("user already has this address"); - // } else { - // let mut u = u.into_active_model(); - // - // u.address = sea_orm::Set(new_address); - // - // let u = u.save(db_conn).await?; - // - // info!("changed user address"); - // - // debug!("updated user: {:#?}", u); - // } + info!("Here (?)"); Ok(()) } diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 1a1b8354..0c2528a8 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -34,6 +34,7 @@ use std::sync::atomic::{self, Ordering}; use std::sync::Arc; use std::{cmp, fmt}; use thread_fast_rng::rand::seq::SliceRandom; +use tokio; use tokio::sync::{broadcast, watch}; use tokio::task; use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior}; @@ -167,7 +168,8 @@ impl Web3Rpcs { .max_capacity(10_000) .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); - let (watch_consensus_rpcs_sender, consensus_connections_watcher) = watch::channel(Default::default()); + let (watch_consensus_rpcs_sender, consensus_connections_watcher) = + watch::channel(Default::default()); // by_name starts empty. self.apply_server_configs will add to it let by_name = Default::default(); @@ -318,43 +320,43 @@ impl Web3Rpcs { } } -// <<<<<<< HEAD + // <<<<<<< HEAD Ok(()) } -// ======= -// // TODO: max_capacity and time_to_idle from config -// // all block hashes are the same size, so no need for weigher -// let block_hashes = Cache::builder() -// .time_to_idle(Duration::from_secs(600)) -// .max_capacity(10_000) -// .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); -// // all block numbers are the same size, so no need for weigher -// let block_numbers = Cache::builder() -// .time_to_idle(Duration::from_secs(600)) -// .max_capacity(10_000) -// .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); -// -// let (watch_consensus_connections_sender, consensus_connections_watcher) = -// watch::channel(Default::default()); -// -// let watch_consensus_head_receiver = -// watch_consensus_head_sender.as_ref().map(|x| x.subscribe()); -// -// let connections = Arc::new(Self { -// by_name: connections, -// watch_consensus_rpcs_sender: watch_consensus_connections_sender, -// watch_consensus_head_receiver, -// pending_transactions, -// block_hashes, -// block_numbers, -// min_sum_soft_limit, -// min_head_rpcs, -// max_block_age, -// max_block_lag, -// }); -// -// let authorization = Arc::new(Authorization::internal(db_conn.clone())?); -// >>>>>>> 77df3fa (stats v2) + // ======= + // // TODO: max_capacity and time_to_idle from config + // // all block hashes are the same size, so no need for weigher + // let block_hashes = Cache::builder() + // .time_to_idle(Duration::from_secs(600)) + // .max_capacity(10_000) + // .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); + // // all block numbers are the same size, so no need for weigher + // let block_numbers = Cache::builder() + // .time_to_idle(Duration::from_secs(600)) + // .max_capacity(10_000) + // .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); + // + // let (watch_consensus_connections_sender, consensus_connections_watcher) = + // watch::channel(Default::default()); + // + // let watch_consensus_head_receiver = + // watch_consensus_head_sender.as_ref().map(|x| x.subscribe()); + // + // let connections = Arc::new(Self { + // by_name: connections, + // watch_consensus_rpcs_sender: watch_consensus_connections_sender, + // watch_consensus_head_receiver, + // pending_transactions, + // block_hashes, + // block_numbers, + // min_sum_soft_limit, + // min_head_rpcs, + // max_block_age, + // max_block_lag, + // }); + // + // let authorization = Arc::new(Authorization::internal(db_conn.clone())?); + // >>>>>>> 77df3fa (stats v2) pub fn get(&self, conn_name: &str) -> Option> { self.by_name.read().get(conn_name).cloned() @@ -364,12 +366,12 @@ impl Web3Rpcs { self.by_name.read().len() } -// <<<<<<< HEAD + // <<<<<<< HEAD pub fn is_empty(&self) -> bool { self.by_name.read().is_empty() -// ======= -// Ok((connections, handle, consensus_connections_watcher)) -// >>>>>>> 77df3fa (stats v2) + // ======= + // Ok((connections, handle, consensus_connections_watcher)) + // >>>>>>> 77df3fa (stats v2) } pub fn min_head_rpcs(&self) -> usize { @@ -395,7 +397,7 @@ impl Web3Rpcs { let clone = self.clone(); let authorization = authorization.clone(); let pending_tx_id_receiver = self.pending_tx_id_receiver.clone(); - let handle = task::spawn(async move { + let handle = tokio::task::spawn(async move { // TODO: set up this future the same as the block funnel while let Ok((pending_tx_id, rpc)) = pending_tx_id_receiver.recv_async().await { let f = clone.clone().process_incoming_tx_id( @@ -418,7 +420,7 @@ impl Web3Rpcs { let connections = Arc::clone(&self); let pending_tx_sender = pending_tx_sender.clone(); - let handle = task::Builder::default() + let handle = tokio::task::Builder::default() .name("process_incoming_blocks") .spawn(async move { connections @@ -432,12 +434,14 @@ impl Web3Rpcs { if futures.is_empty() { // no transaction or block subscriptions. - let handle = task::Builder::default().name("noop").spawn(async move { - loop { - sleep(Duration::from_secs(600)).await; - // TODO: "every interval, check that the provider is still connected" - } - })?; + let handle = tokio::task::Builder::default() + .name("noop") + .spawn(async move { + loop { + sleep(Duration::from_secs(600)).await; + // TODO: "every interval, check that the provider is still connected" + } + })?; futures.push(flatten_handle(handle)); } @@ -884,11 +888,11 @@ impl Web3Rpcs { // TODO: maximum retries? right now its the total number of servers loop { -// <<<<<<< HEAD + // <<<<<<< HEAD if skip_rpcs.len() >= self.by_name.read().len() { -// ======= -// if skip_rpcs.len() == self.by_name.len() { -// >>>>>>> 77df3fa (stats v2) + // ======= + // if skip_rpcs.len() == self.by_name.len() { + // >>>>>>> 77df3fa (stats v2) break; } @@ -1159,18 +1163,18 @@ impl Web3Rpcs { request_metadata.no_servers.fetch_add(1, Ordering::Release); } -// <<<<<<< HEAD + // <<<<<<< HEAD watch_consensus_rpcs.changed().await?; watch_consensus_rpcs.borrow_and_update(); -// ======= + // ======= // TODO: i don't think this will ever happen // TODO: return a 502? if it does? // return Err(anyhow::anyhow!("no available rpcs!")); // TODO: sleep how long? // TODO: subscribe to something in ConsensusWeb3Rpcs instead sleep(Duration::from_millis(200)).await; -// >>>>>>> 77df3fa (stats v2) + // >>>>>>> 77df3fa (stats v2) continue; } @@ -1299,13 +1303,13 @@ mod tests { // TODO: why is this allow needed? does tokio::test get in the way somehow? #![allow(unused_imports)] - use std::time::{SystemTime, UNIX_EPOCH}; use super::*; use crate::rpcs::consensus::ConsensusFinder; use crate::rpcs::{blockchain::Web3ProxyBlock, provider::Web3Provider}; use ethers::types::{Block, U256}; use log::{trace, LevelFilter}; use parking_lot::RwLock; + use std::time::{SystemTime, UNIX_EPOCH}; use tokio::sync::RwLock as AsyncRwLock; #[tokio::test] diff --git a/web3_proxy/src/stats/mod.rs b/web3_proxy/src/stats/mod.rs index 99cb77df..ccdd8939 100644 --- a/web3_proxy/src/stats/mod.rs +++ b/web3_proxy/src/stats/mod.rs @@ -391,6 +391,18 @@ impl RpcQueryStats { } } + /// Only used for migration from stats_v1 to stats_v2/v3 + pub fn modify_struct( + &mut self, + response_millis: u64, + response_timestamp: i64, + backend_requests: u64 + ) { + self.response_millis = response_millis; + self.response_timestamp = response_timestamp; + self.backend_requests = backend_requests; + } + } impl StatBuffer { @@ -434,6 +446,7 @@ impl StatBuffer { stat_receiver: flume::Receiver, mut shutdown_receiver: broadcast::Receiver<()>, ) -> anyhow::Result<()> { + info!("Aggregate and save loop is running"); let mut tsdb_save_interval = interval(Duration::from_secs(self.tsdb_save_interval_seconds as u64)); let mut db_save_interval = @@ -450,6 +463,7 @@ impl StatBuffer { loop { tokio::select! { stat = stat_receiver.recv_async() => { + info!("Received stat"); // save the stat to a buffer match stat { Ok(AppStat::RpcQuery(stat)) => { @@ -476,6 +490,7 @@ impl StatBuffer { } } _ = db_save_interval.tick() => { + info!("DB save internal tick"); let db_conn = self.db_conn.as_ref().expect("db connection should always exist if there are buffered stats"); // TODO: batch saves @@ -487,6 +502,7 @@ impl StatBuffer { } } _ = tsdb_save_interval.tick() => { + info!("TSDB save internal tick"); // TODO: batch saves // TODO: better bucket names let influxdb_client = self.influxdb_client.as_ref().expect("influxdb client should always exist if there are buffered stats"); @@ -506,6 +522,7 @@ impl StatBuffer { } } x = shutdown_receiver.recv() => { + info!("shutdown signal ---"); match x { Ok(_) => { info!("stat_loop shutting down");