Flush stats (#161)

* create buffer if mysql OR influx is set up

* this one flush should error

* it helps to set up the db in the db migration test

* comment
This commit is contained in:
Bryan Stitt 2023-07-05 19:24:21 -07:00 committed by GitHub
parent eb7b98fdbe
commit 3bbbdd5596
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 96 additions and 69 deletions

@ -51,7 +51,7 @@ use std::str::FromStr;
use std::sync::atomic::{AtomicU16, Ordering};
use std::sync::{atomic, Arc};
use std::time::Duration;
use tokio::sync::{broadcast, watch, Semaphore};
use tokio::sync::{broadcast, watch, Semaphore, oneshot};
use tokio::task::JoinHandle;
use tokio::time::{sleep, timeout};
use tracing::{error, info, trace, warn, Level};
@ -185,7 +185,7 @@ impl Web3ProxyApp {
top_config: TopConfig,
num_workers: usize,
shutdown_sender: broadcast::Sender<()>,
flush_stat_buffer_receiver: broadcast::Receiver<()>
flush_stat_buffer_receiver: flume::Receiver<oneshot::Sender<(usize, usize)>>,
) -> anyhow::Result<Web3ProxyAppSpawn> {
let stat_buffer_shutdown_receiver = shutdown_sender.subscribe();
let mut background_shutdown_receiver = shutdown_sender.subscribe();
@ -385,31 +385,27 @@ impl Web3ProxyApp {
// create a channel for receiving stats
// we do this in a channel so we don't slow down our response to the users
// stats can be saved in mysql, influxdb, both, or none
let mut stat_sender = None;
if let Some(influxdb_bucket) = top_config.app.influxdb_bucket.clone() {
if let Some(spawned_stat_buffer) = StatBuffer::try_spawn(
BILLING_PERIOD_SECONDS,
influxdb_bucket,
top_config.app.chain_id,
db_conn.clone(),
60,
influxdb_client.clone(),
Some(rpc_secret_key_cache.clone()),
Some(user_balance_cache.clone()),
stat_buffer_shutdown_receiver,
1,
flush_stat_buffer_receiver,
)? {
// since the database entries are used for accounting, we want to be sure everything is saved before exiting
important_background_handles.push(spawned_stat_buffer.background_handle);
let stat_sender = if let Some(spawned_stat_buffer) = StatBuffer::try_spawn(
BILLING_PERIOD_SECONDS,
top_config.app.chain_id,
db_conn.clone(),
60,
top_config.app.influxdb_bucket.clone(),
influxdb_client.clone(),
Some(rpc_secret_key_cache.clone()),
Some(user_balance_cache.clone()),
stat_buffer_shutdown_receiver,
1,
flush_stat_buffer_receiver,
)? {
// since the database entries are used for accounting, we want to be sure everything is saved before exiting
important_background_handles.push(spawned_stat_buffer.background_handle);
stat_sender = Some(spawned_stat_buffer.stat_sender);
}
}
if stat_sender.is_none() {
Some(spawned_stat_buffer.stat_sender)
} else {
info!("stats will not be collected");
}
None
};
// make a http shared client
// TODO: can we configure the connection pool? should we?

@ -9,9 +9,9 @@ use influxdb2::api::write::TimestampPrecision;
use migration::sea_orm::prelude::Decimal;
use migration::sea_orm::DatabaseConnection;
use std::time::Duration;
use tokio::sync::broadcast;
use tokio::sync::{broadcast, oneshot};
use tokio::time::{interval, sleep};
use tracing::{error, info, trace};
use tracing::{error, info, trace, warn};
#[derive(Debug, Default)]
pub struct BufferedRpcQueryStats {
@ -43,6 +43,7 @@ pub struct StatBuffer {
db_conn: Option<DatabaseConnection>,
db_save_interval_seconds: u32,
global_timeseries_buffer: HashMap<RpcQueryKey, BufferedRpcQueryStats>,
influxdb_bucket: Option<String>,
influxdb_client: Option<influxdb2::Client>,
opt_in_timeseries_buffer: HashMap<RpcQueryKey, BufferedRpcQueryStats>,
rpc_secret_key_cache: RpcSecretKeyCache,
@ -55,17 +56,21 @@ impl StatBuffer {
#[allow(clippy::too_many_arguments)]
pub fn try_spawn(
billing_period_seconds: i64,
bucket: String,
chain_id: u64,
db_conn: Option<DatabaseConnection>,
db_save_interval_seconds: u32,
influxdb_client: Option<influxdb2::Client>,
influxdb_bucket: Option<String>,
mut influxdb_client: Option<influxdb2::Client>,
rpc_secret_key_cache: Option<RpcSecretKeyCache>,
user_balance_cache: Option<UserBalanceCache>,
shutdown_receiver: broadcast::Receiver<()>,
tsdb_save_interval_seconds: u32,
flush_receiver: broadcast::Receiver<()>,
flush_receiver: flume::Receiver<oneshot::Sender<(usize, usize)>>,
) -> anyhow::Result<Option<SpawnedStatBuffer>> {
if influxdb_bucket.is_none() {
influxdb_client = None;
}
if db_conn.is_none() && influxdb_client.is_none() {
return Ok(None);
}
@ -73,6 +78,7 @@ impl StatBuffer {
let (stat_sender, stat_receiver) = flume::unbounded();
let timestamp_precision = TimestampPrecision::Seconds;
let mut new = Self {
accounting_db_buffer: Default::default(),
billing_period_seconds,
@ -80,6 +86,7 @@ impl StatBuffer {
db_conn,
db_save_interval_seconds,
global_timeseries_buffer: Default::default(),
influxdb_bucket,
influxdb_client,
opt_in_timeseries_buffer: Default::default(),
rpc_secret_key_cache: rpc_secret_key_cache.unwrap(),
@ -89,8 +96,9 @@ impl StatBuffer {
};
// any errors inside this task will cause the application to exit
// TODO? change this to the X and XTask pattern like the latency crate uses
let handle = tokio::spawn(async move {
new.aggregate_and_save_loop(bucket, stat_receiver, shutdown_receiver, flush_receiver)
new.aggregate_and_save_loop(stat_receiver, shutdown_receiver, flush_receiver)
.await
});
@ -99,10 +107,9 @@ impl StatBuffer {
async fn aggregate_and_save_loop(
&mut self,
bucket: String,
stat_receiver: flume::Receiver<AppStat>,
mut shutdown_receiver: broadcast::Receiver<()>,
mut flush_receiver: broadcast::Receiver<()>,
flush_receiver: flume::Receiver<oneshot::Sender<(usize, usize)>>,
) -> Web3ProxyResult<()> {
let mut tsdb_save_interval =
interval(Duration::from_secs(self.tsdb_save_interval_seconds as u64));
@ -147,22 +154,30 @@ impl StatBuffer {
}
_ = tsdb_save_interval.tick() => {
trace!("TSDB save internal tick");
let count = self.save_tsdb_stats(&bucket).await;
let count = self.save_tsdb_stats().await;
if count > 0 {
trace!("Saved {} stats to the tsdb", count);
}
}
_ = flush_receiver.recv() => {
trace!("flush");
x = flush_receiver.recv_async() => {
if let Ok(x) = x {
trace!("flush");
let count = self.save_tsdb_stats(&bucket).await;
if count > 0 {
trace!("Flushed {} stats to the tsdb", count);
}
let tsdb_count = self.save_tsdb_stats().await;
if tsdb_count > 0 {
trace!("Flushed {} stats to the tsdb", tsdb_count);
}
let count = self.save_relational_stats().await;
if count > 0 {
trace!("Flushed {} stats to the relational db", count);
let relational_count = self.save_relational_stats().await;
if relational_count > 0 {
trace!("Flushed {} stats to the relational db", relational_count);
}
if let Err(err) = x.send((tsdb_count, relational_count)) {
warn!(%tsdb_count, %relational_count, ?err, "unable to notify about flushed stats");
}
} else {
unimplemented!()
}
}
x = shutdown_receiver.recv() => {
@ -197,7 +212,7 @@ impl StatBuffer {
info!("saved {} pending relational stat(s)", saved_relational);
let saved_tsdb = self.save_tsdb_stats(&bucket).await;
let saved_tsdb = self.save_tsdb_stats().await;
info!("saved {} pending tsdb stat(s)", saved_tsdb);
@ -233,10 +248,15 @@ impl StatBuffer {
}
// TODO: bucket should be an enum so that we don't risk typos
async fn save_tsdb_stats(&mut self, bucket: &str) -> usize {
async fn save_tsdb_stats(&mut self) -> usize {
let mut count = 0;
if let Some(influxdb_client) = self.influxdb_client.as_ref() {
let influxdb_bucket = self
.influxdb_bucket
.as_ref()
.expect("if client is set, bucket must be set");
// TODO: use stream::iter properly to avoid allocating this Vec
let mut points = vec![];
@ -290,7 +310,7 @@ impl StatBuffer {
if let Err(err) = influxdb_client
.write_with_precision(
bucket,
influxdb_bucket,
stream::iter(points),
self.timestamp_precision,
)

@ -72,19 +72,15 @@ impl MigrateStatsToV2SubCommand {
None => None,
};
let (_flush_sender, flush_receiver) = broadcast::channel(1);
let (_flush_sender, flush_receiver) = flume::bounded(1);
// Spawn the stat-sender
let emitter_spawn = StatBuffer::try_spawn(
BILLING_PERIOD_SECONDS,
top_config
.app
.influxdb_bucket
.clone()
.context("No influxdb bucket was provided")?,
top_config.app.chain_id,
Some(db_conn.clone()),
30,
top_config.app.influxdb_bucket.clone(),
influxdb_client.clone(),
None,
None,

@ -12,7 +12,7 @@ use std::sync::Arc;
use std::time::Duration;
use std::{fs, thread};
use tokio::select;
use tokio::sync::broadcast;
use tokio::sync::{broadcast, oneshot};
use tokio::time::{sleep_until, Instant};
use tracing::{error, info, trace, warn};
@ -37,12 +37,12 @@ impl ProxydSubCommand {
top_config_path: PathBuf,
num_workers: usize,
) -> anyhow::Result<()> {
let (shutdown_sender, _) = broadcast::channel(1);
let (frontend_shutdown_sender, _) = broadcast::channel(1);
// TODO: i think there is a small race. if config_path changes
let frontend_port = Arc::new(self.port.into());
let prometheus_port = Arc::new(self.prometheus_port.into());
let (flush_stat_buffer_sender, _) = broadcast::channel(1);
let (_flush_stat_buffer_sender, flush_stat_buffer_receiver) = flume::bounded(1);
Self::_main(
top_config,
@ -50,8 +50,8 @@ impl ProxydSubCommand {
frontend_port,
prometheus_port,
num_workers,
shutdown_sender,
flush_stat_buffer_sender,
frontend_shutdown_sender,
flush_stat_buffer_receiver,
)
.await
}
@ -64,7 +64,7 @@ impl ProxydSubCommand {
prometheus_port: Arc<AtomicU16>,
num_workers: usize,
frontend_shutdown_sender: broadcast::Sender<()>,
flush_stat_buffer_sender: broadcast::Sender<()>,
flush_stat_buffer_receiver: flume::Receiver<oneshot::Sender<(usize, usize)>>,
) -> anyhow::Result<()> {
// tokio has code for catching ctrl+c so we use that to shut down in most cases
// frontend_shutdown_sender is currently only used in tests, but we might make a /shutdown endpoint or something
@ -85,7 +85,7 @@ impl ProxydSubCommand {
top_config.clone(),
num_workers,
app_shutdown_sender.clone(),
flush_stat_buffer_sender.subscribe(),
flush_stat_buffer_receiver,
)
.await?;

@ -20,7 +20,10 @@ use std::{sync::Arc, time::Duration};
use tokio::{
net::TcpStream,
process::Command as AsyncCommand,
sync::broadcast::{self, error::SendError},
sync::{
broadcast::{self, error::SendError},
oneshot,
},
task::JoinHandle,
time::{sleep, Instant},
};
@ -55,7 +58,7 @@ pub struct TestApp {
pub proxy_provider: Provider<Http>,
/// tell the app to flush stats to the database
flush_stat_buffer_sender: broadcast::Sender<()>,
flush_stat_buffer_sender: flume::Sender<oneshot::Sender<(usize, usize)>>,
/// tell the app to shut down (use `self.stop()`).
shutdown_sender: broadcast::Sender<()>,
@ -272,7 +275,7 @@ impl TestApp {
let frontend_port_arc = Arc::new(AtomicU16::new(0));
let prometheus_port_arc = Arc::new(AtomicU16::new(0));
let (flush_stat_buffer_sender, _flush_stat_buffer_receiver) = broadcast::channel(1);
let (flush_stat_buffer_sender, flush_stat_buffer_receiver) = flume::bounded(1);
// spawn the app
// TODO: spawn in a thread so we can run from non-async tests and so the Drop impl can wait for it to stop
@ -284,7 +287,7 @@ impl TestApp {
prometheus_port_arc,
num_workers,
shutdown_sender.clone(),
flush_stat_buffer_sender.clone(),
flush_stat_buffer_receiver,
))
};
@ -320,8 +323,15 @@ impl TestApp {
self.db.as_ref().unwrap().conn.as_ref().unwrap()
}
pub fn flush_stats(&self) {
self.flush_stat_buffer_sender.send(()).unwrap();
#[allow(unused)]
pub async fn flush_stats(&self) -> anyhow::Result<(usize, usize)> {
let (tx, rx) = oneshot::channel();
self.flush_stat_buffer_sender.send(tx)?;
let x = rx.await?;
Ok(x)
}
pub fn stop(&self) -> Result<usize, SendError<()>> {

@ -1,3 +1,3 @@
mod app;
pub mod app;
pub use self::app::TestApp;

@ -10,11 +10,13 @@ use tokio::{
};
use web3_proxy::rpcs::blockchain::ArcBlock;
// #[cfg_attr(not(feature = "tests-needing-docker"), ignore)]
#[ignore = "under construction"]
#[cfg_attr(not(feature = "tests-needing-docker"), ignore)]
#[test_log::test(tokio::test)]
async fn it_migrates_the_db() {
TestApp::spawn(true).await;
let x = TestApp::spawn(true).await;
// we call flush stats more to be sure it works than because we expect it to save any stats
x.flush_stats().await.unwrap();
}
#[test_log::test(tokio::test)]
@ -89,6 +91,9 @@ async fn it_starts_and_stops() {
assert_eq!(anvil_result, proxy_result.unwrap());
// this won't do anything since stats aren't tracked when there isn't a db
x.flush_stats().await.unwrap_err();
// most tests won't need to wait, but we should wait here to be sure all the shutdown logic works properly
x.wait().await;
}