flush all stats

Bryan Stitt 2023-07-19 18:41:40 -07:00
parent e6dc64f369
commit 90ffb5254a
3 changed files with 143 additions and 96 deletions

@@ -92,7 +92,7 @@ tokio = { version = "1.29.1", features = ["full", "tracing"] }
tokio-console = { version = "0.1.9", optional = true }
tokio-stream = { version = "0.1.14", features = ["sync"] }
toml = "0.7.6"
tower = { version = "0.4.13", features = ["tracing"] }
tower = { version = "0.4.13", features = ["timeout", "tracing"] }
tower-http = { version = "0.4.1", features = ["cors", "sensitive-headers", "trace"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
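
The functional change in this hunk is the new `timeout` feature on tower, which enables `tower::timeout::TimeoutLayer` and `ServiceBuilder::timeout`. A rough sketch of what that feature unlocks (the stand-in service and the 10-second budget are illustrative, not taken from this commit, and the sketch additionally needs tower's `util` feature for `service_fn`/`ServiceExt`):

use std::time::Duration;
use tower::{Service, ServiceBuilder, ServiceExt, service_fn};

#[tokio::main]
async fn main() {
    // wrap a stand-in handler so slow calls error out instead of hanging forever
    let mut svc = ServiceBuilder::new()
        // requires tower's "timeout" feature, enabled in the Cargo.toml change above
        .timeout(Duration::from_secs(10))
        .service(service_fn(|req: &'static str| async move {
            Ok::<_, std::convert::Infallible>(format!("handled {req}"))
        }));

    // inner errors and timeouts both surface as tower::BoxError
    let resp = svc.ready().await.unwrap().call("eth_blockNumber").await;
    println!("{resp:?}");
}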

@@ -2,6 +2,7 @@ use super::{AppStat, FlushedStats, RpcQueryKey};
use crate::app::Web3ProxyJoinHandle;
use crate::caches::{RpcSecretKeyCache, UserBalanceCache};
use crate::errors::Web3ProxyResult;
use crate::frontend::authorization::RequestMetadata;
use crate::globals::global_db_conn;
use crate::stats::RpcQueryStats;
use derive_more::From;
@@ -124,85 +125,10 @@ impl StatBuffer {
loop {
tokio::select! {
stat = stat_receiver.recv() => {
// trace!("Received stat");
// save the stat to a buffer
// TODO: tokio spawn this!
match stat {
Some(AppStat::RpcQuery(request_metadata)) => {
// we convert on this side of the channel so that we don't slow down the request
let stat = RpcQueryStats::try_from_metadata(request_metadata)?;
// update the latest balance
// do this BEFORE emitting any stats
let mut approximate_balance_remaining = 0.into();
// TODO: re-enable this once I know it's not the cause of Polygon W3P crashing all the time
// TODO: we want to do this even if the db is down. we need to buffer if there is an outage!
if let Ok(db_conn) = global_db_conn().await {
let user_id = stat.authorization.checks.user_id;
// update the user's balance
if user_id != 0 {
// update the user's cached balance
let mut user_balance = stat.authorization.checks.latest_balance.write().await;
// TODO: move this to a helper function
user_balance.total_frontend_requests += 1;
user_balance.total_spent += stat.compute_unit_cost;
if !stat.backend_rpcs_used.is_empty() {
user_balance.total_cache_misses += 1;
}
// if paid_credits_used is true, then they were premium at the start of the request
if stat.authorization.checks.paid_credits_used {
// TODO: this lets them get a negative remaining balance. we should clear if close to 0
user_balance.total_spent_paid_credits += stat.compute_unit_cost;
// check if they still have premium
if user_balance.active_premium() {
// TODO: referral credits here? I think the save_db section still makes sense for those
} else if let Err(err) = self.user_balance_cache.invalidate(&user_balance.user_id, &db_conn, &self.rpc_secret_key_cache).await {
// was premium, but isn't anymore due to paying for this query. clear the cache
// TODO: stop at <$0.000001 instead of negative?
warn!(?err, "unable to clear caches");
}
} else if user_balance.active_premium() {
// paid credits were not used, but now we have active premium. invalidate the caches
// TODO: this seems unlikely. should we warn if this happens so we can investigate?
if let Err(err) = self.user_balance_cache.invalidate(&user_balance.user_id, &db_conn, &self.rpc_secret_key_cache).await {
// invalidation failed; the caches may keep serving the stale balance
// TODO: stop at <$0.000001 instead of negative?
warn!(?err, "unable to clear caches");
}
}
approximate_balance_remaining = user_balance.remaining();
}
let accounting_key = stat.accounting_key(self.billing_period_seconds);
if accounting_key.is_registered() {
self.accounting_db_buffer.entry(accounting_key).or_default().add(stat.clone(), approximate_balance_remaining).await;
}
}
if self.influxdb_client.is_some() {
// TODO: round the timestamp at all?
if let Some(opt_in_timeseries_key) = stat.owned_timeseries_key() {
self.opt_in_timeseries_buffer.entry(opt_in_timeseries_key).or_default().add(stat.clone(), approximate_balance_remaining).await;
}
let global_timeseries_key = stat.global_timeseries_key();
self.global_timeseries_buffer.entry(global_timeseries_key).or_default().add(stat, approximate_balance_remaining).await;
}
}
None => {
info!("error receiving stat");
break;
}
if let Some(stat) = stat {
self._buffer_app_stat(stat).await?
} else {
break;
}
}
_ = db_save_interval.tick() => {
@@ -225,14 +151,18 @@ impl StatBuffer {
Some(x) => {
trace!("flush");
let tsdb_count = self.save_tsdb_stats().await;
// fill the buffer
while let Ok(stat) = stat_receiver.try_recv() {
self._buffer_app_stat(stat).await?;
}
// flush the buffers
let tsdb_count = self.save_tsdb_stats().await;
let relational_count = self.save_relational_stats().await;
// notify
let flushed_stats = FlushedStats{ timeseries: tsdb_count, relational: relational_count};
trace!(?flushed_stats);
if let Err(err) = x.send(flushed_stats) {
error!(?flushed_stats, ?err, "unable to notify about flushed stats");
}
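
This select arm is the heart of the commit: a flush request now drains everything already queued on the stat channel before saving, so the reported counts cover every stat sent prior to the flush. The same drain-then-flush pattern in isolation, with a plain Vec standing in for the real buffers:

use tokio::sync::mpsc::UnboundedReceiver;

// illustrative stand-in for AppStat
type Stat = u64;

// drain whatever is already queued (without awaiting new sends), then flush
fn drain_then_flush(stat_receiver: &mut UnboundedReceiver<Stat>, buffer: &mut Vec<Stat>) -> usize {
    // try_recv returns Err the moment the queue is empty, so this never blocks
    while let Ok(stat) = stat_receiver.try_recv() {
        buffer.push(stat);
    }
    // "flush" here is just count-and-clear; the real code writes to mysql/influx
    let flushed = buffer.len();
    buffer.clear();
    flushed
}

Anything sent after `try_recv` starts returning `Err` simply lands in the next flush, which is exactly what the second round of assertions in the test below verifies.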
@@ -284,6 +214,112 @@ impl StatBuffer {
Ok(())
}
async fn _buffer_app_stat(&mut self, stat: AppStat) -> Web3ProxyResult<()> {
match stat {
AppStat::RpcQuery(request_metadata) => {
self._buffer_request_metadata(request_metadata).await?;
}
}
Ok(())
}
async fn _buffer_request_metadata(
&mut self,
request_metadata: RequestMetadata,
) -> Web3ProxyResult<()> {
// we convert on this side of the channel so that we don't slow down the request
let stat = RpcQueryStats::try_from_metadata(request_metadata)?;
// update the latest balance
// do this BEFORE emitting any stats
let mut approximate_balance_remaining = 0.into();
// TODO: re-enable this once I know it's not the cause of Polygon W3P crashing all the time
// TODO: we want to do this even if the db is down. we need to buffer if there is an outage!
if let Ok(db_conn) = global_db_conn().await {
let user_id = stat.authorization.checks.user_id;
// update the user's balance
if user_id != 0 {
// update the user's cached balance
let mut user_balance = stat.authorization.checks.latest_balance.write().await;
// TODO: move this to a helper function
user_balance.total_frontend_requests += 1;
user_balance.total_spent += stat.compute_unit_cost;
if !stat.backend_rpcs_used.is_empty() {
user_balance.total_cache_misses += 1;
}
// if paid_credits_used is true, then they were premium at the start of the request
if stat.authorization.checks.paid_credits_used {
// TODO: this lets them get a negative remaining balance. we should clear if close to 0
user_balance.total_spent_paid_credits += stat.compute_unit_cost;
// check if they still have premium
if user_balance.active_premium() {
// TODO: referral credits here? I think the save_db section still makes sense for those
} else if let Err(err) = self
.user_balance_cache
.invalidate(&user_balance.user_id, &db_conn, &self.rpc_secret_key_cache)
.await
{
// was premium, but isn't anymore due to paying for this query. clear the cache
// TODO: stop at <$0.000001 instead of negative?
warn!(?err, "unable to clear caches");
}
} else if user_balance.active_premium() {
// paid credits were not used, but now we have active premium. invalidate the caches
// TODO: this seems unlikely. should we warn if this happens so we can investigate?
if let Err(err) = self
.user_balance_cache
.invalidate(&user_balance.user_id, &db_conn, &self.rpc_secret_key_cache)
.await
{
// invalidation failed; the caches may keep serving the stale balance
// TODO: stop at <$0.000001 instead of negative?
warn!(?err, "unable to clear caches");
}
}
approximate_balance_remaining = user_balance.remaining();
}
let accounting_key = stat.accounting_key(self.billing_period_seconds);
if accounting_key.is_registered() {
self.accounting_db_buffer
.entry(accounting_key)
.or_default()
.add(stat.clone(), approximate_balance_remaining)
.await;
}
}
if self.influxdb_client.is_some() {
// TODO: round the timestamp at all?
if let Some(opt_in_timeseries_key) = stat.owned_timeseries_key() {
self.opt_in_timeseries_buffer
.entry(opt_in_timeseries_key)
.or_default()
.add(stat.clone(), approximate_balance_remaining)
.await;
}
let global_timeseries_key = stat.global_timeseries_key();
self.global_timeseries_buffer
.entry(global_timeseries_key)
.or_default()
.add(stat, approximate_balance_remaining)
.await;
}
Ok(())
}
async fn save_relational_stats(&mut self) -> usize {
let mut count = 0;

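For reference on the method added above: `_buffer_request_metadata` takes the write half of a shared, cached balance, does the bookkeeping, and snapshots `remaining()` while still holding the lock so a concurrent request can't race between the update and the read. A pared-down sketch of that flow; the `Balance` fields and the `f64` math are simplifications for illustration (the real type lives elsewhere in the codebase and uses decimal arithmetic):

use std::sync::Arc;
use tokio::sync::RwLock;

// hypothetical, simplified stand-in for the cached user balance
#[derive(Default)]
struct Balance {
    total_frontend_requests: u64,
    total_spent_paid_credits: f64,
    deposits: f64,
}

impl Balance {
    fn remaining(&self) -> f64 {
        self.deposits - self.total_spent_paid_credits
    }
    fn active_premium(&self) -> bool {
        self.remaining() > 0.0
    }
}

// charge one request against the cached balance and return the new remainder
async fn charge(balance: &Arc<RwLock<Balance>>, cost: f64, paid_credits_used: bool) -> f64 {
    // hold the write lock only for the duration of the bookkeeping
    let mut b = balance.write().await;
    b.total_frontend_requests += 1;
    if paid_credits_used {
        b.total_spent_paid_credits += cost;
        if !b.active_premium() {
            // the real code invalidates user_balance_cache / rpc_secret_key_cache here
        }
    }
    b.remaining()
}
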
@@ -118,17 +118,28 @@ async fn test_multiple_proxies_stats_add_up() {
// Flush all stats here
// TODO: the test should maybe pause time so that stats definitely flush from our queries.
let flush_0_count = x_0.flush_stats().await.unwrap();
let flush_1_count = x_1.flush_stats().await.unwrap();
let flush_0_count_0 = x_0.flush_stats().await.unwrap();
let flush_1_count_0 = x_1.flush_stats().await.unwrap();
// Wait a bit
// TODO: instead of waiting a bit, make flush_stats wait until all stats are handled before returning
sleep(Duration::from_secs(5)).await;
info!("Counts 0 are: {:?}", flush_0_count);
assert_eq!(flush_0_count.relational, 1);
assert_eq!(flush_0_count.timeseries, 2);
info!("Counts 1 are: {:?}", flush_1_count);
assert_eq!(flush_1_count.relational, 1);
assert_eq!(flush_1_count.timeseries, 2);
info!("Counts 0 are: {:?}", flush_0_count_0);
assert_eq!(flush_0_count_0.relational, 1);
assert_eq!(flush_0_count_0.timeseries, 2);
info!("Counts 1 are: {:?}", flush_1_count_0);
assert_eq!(flush_1_count_0.relational, 1);
assert_eq!(flush_1_count_0.timeseries, 2);
// no more stats should arrive
let flush_0_count_1 = x_0.flush_stats().await.unwrap();
let flush_1_count_1 = x_1.flush_stats().await.unwrap();
info!("Counts 0 are: {:?}", flush_0_count_1);
assert_eq!(flush_0_count_1.relational, 0);
assert_eq!(flush_0_count_1.timeseries, 0);
info!("Counts 1 are: {:?}", flush_1_count_0);
assert_eq!(flush_1_count_1.relational, 0);
assert_eq!(flush_1_count_1.timeseries, 0);
// get stats now
// todo!("Need to validate all the stat accounting now");
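
The `flush_stats` calls above resolve to the `Some(x) => ...` select arm shown earlier: the caller hands the stat buffer task a oneshot sender and awaits the `FlushedStats` reply. A minimal sketch of that request/acknowledge handshake (the channel wiring here is illustrative):

use tokio::sync::{mpsc, oneshot};

#[derive(Debug)]
struct FlushedStats {
    timeseries: usize,
    relational: usize,
}

// caller side: request a flush, then wait until the buffer task reports back
async fn flush_stats(flush_tx: &mpsc::Sender<oneshot::Sender<FlushedStats>>) -> FlushedStats {
    let (tx, rx) = oneshot::channel();
    flush_tx.send(tx).await.expect("stat buffer task has exited");
    rx.await.expect("stat buffer task dropped the reply")
}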
@@ -145,18 +156,18 @@ async fn test_multiple_proxies_stats_add_up() {
// Get the balance
let user_0_balance_post = user_get_balance(&x_0, &r, &user_0_login).await;
let influx_stats = influx_aggregate_stats["result"].get(0).unwrap();
let mysql_stats = mysql_stats["stats"].get(0).unwrap();
assert_eq!(
user_0_balance_post.total_frontend_requests,
number_requests * 3
);
info!("Influx and mysql stats are");
info!(?influx_stats);
info!(?mysql_stats);
assert_eq!(
user_0_balance_post.total_frontend_requests,
number_requests * 3
);
assert_eq!(
mysql_stats["error_response"],
influx_stats["error_response"]