use super::{AppStat, RpcQueryKey};
use crate::app::{RpcSecretKeyCache, UserBalanceCache, Web3ProxyJoinHandle};
use crate::errors::Web3ProxyResult;
use crate::frontend::authorization::Balance;
use derive_more::From;
use futures::stream;
use hashbrown::HashMap;
use influxdb2::api::write::TimestampPrecision;
use migration::sea_orm::prelude::Decimal;
use migration::sea_orm::DatabaseConnection;
use parking_lot::RwLock;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast;
use tokio::time::interval;
use tracing::{error, info, trace};

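/// Aggregated counters for one group of RPC queries, built up in memory before
/// being flushed to the relational database and/or the time-series database.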
#[derive(Debug, Default)]
pub struct BufferedRpcQueryStats {
    pub frontend_requests: u64,
    pub backend_requests: u64,
    pub backend_retries: u64,
    pub no_servers: u64,
    pub cache_misses: u64,
    pub cache_hits: u64,
    pub sum_request_bytes: u64,
    pub sum_response_bytes: u64,
    pub sum_response_millis: u64,
    pub sum_credits_used: Decimal,
    pub sum_cu_used: Decimal,
    /// The user's balance at this point in time. Multiple queries might be modifying it at once.
    pub latest_balance: Arc<RwLock<Balance>>,
}

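/// Handles returned by [`StatBuffer::try_spawn`]: a sender for submitting stats
/// and the join handle of the background flush task.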
#[derive(From)]
pub struct SpawnedStatBuffer {
    pub stat_sender: flume::Sender<AppStat>,
    /// these handles are important and must be allowed to finish
    pub background_handle: Web3ProxyJoinHandle<()>,
}

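/// Collects [`AppStat`]s in memory and periodically saves them to the relational
/// database (via sea_orm) and/or to InfluxDB.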
pub struct StatBuffer {
    accounting_db_buffer: HashMap<RpcQueryKey, BufferedRpcQueryStats>,
    billing_period_seconds: i64,
    chain_id: u64,
    db_conn: Option<DatabaseConnection>,
    db_save_interval_seconds: u32,
    global_timeseries_buffer: HashMap<RpcQueryKey, BufferedRpcQueryStats>,
    influxdb_client: Option<influxdb2::Client>,
    opt_in_timeseries_buffer: HashMap<RpcQueryKey, BufferedRpcQueryStats>,
    rpc_secret_key_cache: RpcSecretKeyCache,
    user_balance_cache: UserBalanceCache,
    timestamp_precision: TimestampPrecision,
    tsdb_save_interval_seconds: u32,
}

impl StatBuffer {
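    /// Spawn a background task that buffers incoming stats and periodically
    /// flushes them to the configured stores.
    ///
    /// Returns `Ok(None)` when neither a database connection nor an InfluxDB
    /// client is configured, since there would be nothing to save. Note that
    /// `rpc_secret_key_cache` and `user_balance_cache` are `unwrap`ped when a
    /// buffer is actually spawned, so callers must pass `Some` values.
    ///
    /// A rough usage sketch (the argument values below are hypothetical):
    ///
    /// ```ignore
    /// let spawned = StatBuffer::try_spawn(
    ///     60 * 60 * 24 * 7,               // billing_period_seconds
    ///     "web3_proxy_stats".to_string(), // bucket
    ///     1,                              // chain_id
    ///     db_conn,
    ///     10,                             // db_save_interval_seconds
    ///     influxdb_client,
    ///     Some(rpc_secret_key_cache),
    ///     Some(user_balance_cache),
    ///     shutdown_receiver,
    ///     10,                             // tsdb_save_interval_seconds
    /// )?
    /// .expect("at least one of db_conn or influxdb_client is set");
    /// ```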
    #[allow(clippy::too_many_arguments)]
    pub fn try_spawn(
        billing_period_seconds: i64,
        bucket: String,
        chain_id: u64,
        db_conn: Option<DatabaseConnection>,
        db_save_interval_seconds: u32,
        influxdb_client: Option<influxdb2::Client>,
        rpc_secret_key_cache: Option<RpcSecretKeyCache>,
        user_balance_cache: Option<UserBalanceCache>,
        shutdown_receiver: broadcast::Receiver<()>,
        tsdb_save_interval_seconds: u32,
    ) -> anyhow::Result<Option<SpawnedStatBuffer>> {
        if db_conn.is_none() && influxdb_client.is_none() {
            return Ok(None);
        }

        let (stat_sender, stat_receiver) = flume::unbounded();

        let timestamp_precision = TimestampPrecision::Seconds;

        let mut new = Self {
            accounting_db_buffer: Default::default(),
            billing_period_seconds,
            chain_id,
            db_conn,
            db_save_interval_seconds,
            global_timeseries_buffer: Default::default(),
            influxdb_client,
            opt_in_timeseries_buffer: Default::default(),
            rpc_secret_key_cache: rpc_secret_key_cache.unwrap(),
            user_balance_cache: user_balance_cache.unwrap(),
            timestamp_precision,
            tsdb_save_interval_seconds,
        };

        // any errors inside this task will cause the application to exit
        let handle = tokio::spawn(async move {
            new.aggregate_and_save_loop(bucket, stat_receiver, shutdown_receiver)
                .await
        });

        Ok(Some((stat_sender, handle).into()))
    }

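    /// The main loop: receive stats, add them to the in-memory buffers, and flush
    /// the buffers on their save intervals. Drains everything one last time on
    /// shutdown before returning.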
    async fn aggregate_and_save_loop(
        &mut self,
        bucket: String,
        stat_receiver: flume::Receiver<AppStat>,
        mut shutdown_receiver: broadcast::Receiver<()>,
    ) -> Web3ProxyResult<()> {
        let mut tsdb_save_interval =
            interval(Duration::from_secs(self.tsdb_save_interval_seconds as u64));
        let mut db_save_interval =
            interval(Duration::from_secs(self.db_save_interval_seconds as u64));

        loop {
            tokio::select! {
                stat = stat_receiver.recv_async() => {
                    // trace!("Received stat");
                    // save the stat to a buffer
                    match stat {
                        Ok(AppStat::RpcQuery(stat)) => {
                            if self.influxdb_client.is_some() {
                                // TODO: round the timestamp at all?

                                let global_timeseries_key = stat.global_timeseries_key();

                                self.global_timeseries_buffer.entry(global_timeseries_key).or_default().add(stat.clone());

                                if let Some(opt_in_timeseries_key) = stat.owned_timeseries_key() {
                                    self.opt_in_timeseries_buffer.entry(opt_in_timeseries_key).or_default().add(stat.clone());
                                }
                            }

                            if self.db_conn.is_some() {
                                self.accounting_db_buffer.entry(stat.accounting_key(self.billing_period_seconds)).or_default().add(stat);
                            }
                        }
                        Err(err) => {
                            info!("error receiving stat: {}", err);
                            break;
                        }
                    }
                }
                _ = db_save_interval.tick() => {
                    trace!("DB save interval tick");
                    let count = self.save_relational_stats().await;
                    if count > 0 {
                        trace!("Saved {} stats to the relational db", count);
                    }
                }
                _ = tsdb_save_interval.tick() => {
                    trace!("TSDB save interval tick");
                    let count = self.save_tsdb_stats(&bucket).await;
                    if count > 0 {
                        trace!("Saved {} stats to the tsdb", count);
                    }
                }
                x = shutdown_receiver.recv() => {
                    match x {
                        Ok(_) => {
                            info!("stat_loop shutting down");
                        },
                        Err(err) => error!("stat_loop shutdown receiver err={:?}", err),
                    }
                    break;
                }
            }
        }

        let saved_relational = self.save_relational_stats().await;

        info!("saved {} pending relational stat(s)", saved_relational);

        let saved_tsdb = self.save_tsdb_stats(&bucket).await;

        info!("saved {} pending tsdb stat(s)", saved_tsdb);

        info!("accounting and stat save loop complete");

        Ok(())
    }

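    /// Drain the accounting buffer and save each entry to the relational database.
    /// Returns the number of entries drained, even if some of the saves failed.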
    async fn save_relational_stats(&mut self) -> usize {
        let mut count = 0;

        if let Some(db_conn) = self.db_conn.as_ref() {
            count = self.accounting_db_buffer.len();
            for (key, stat) in self.accounting_db_buffer.drain() {
                // TODO: batch saves
                // TODO: i don't like passing key (which came from the stat) to the function on the stat. but it works for now
                if let Err(err) = stat
                    .save_db(
                        self.chain_id,
                        db_conn,
                        key,
                        &self.rpc_secret_key_cache,
                        &self.user_balance_cache,
                    )
                    .await
                {
                    error!("unable to save accounting entry! err={:?}", err);
                };
            }
        }

        count
    }

    // TODO: bucket should be an enum so that we don't risk typos
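    /// Drain both time-series buffers, build InfluxDB points from them, and write
    /// the points to the given bucket in batches. Returns the number of points built.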
    async fn save_tsdb_stats(&mut self, bucket: &str) -> usize {
        let mut count = 0;

        if let Some(influxdb_client) = self.influxdb_client.as_ref() {
            // TODO: use stream::iter properly to avoid allocating this Vec
            let mut points = vec![];

            for (key, stat) in self.global_timeseries_buffer.drain() {
                // TODO: i don't like passing key (which came from the stat) to the function on the stat. but it works for now
                match stat
                    .build_timeseries_point("global_proxy", self.chain_id, key)
                    .await
                {
                    Ok(point) => {
                        points.push(point);
                    }
                    Err(err) => {
                        error!("unable to build global stat! err={:?}", err);
                    }
                };
            }

            for (key, stat) in self.opt_in_timeseries_buffer.drain() {
                // TODO: i don't like passing key (which came from the stat) to the function on the stat. but it works for now
                match stat
                    .build_timeseries_point("opt_in_proxy", self.chain_id, key)
                    .await
                {
                    Ok(point) => {
                        points.push(point);
                    }
                    Err(err) => {
                        // TODO: if this errors, we throw away some of the pending stats! we should probably buffer them somewhere to be tried again
                        error!("unable to build opt-in stat! err={:?}", err);
                    }
                };
            }

            count = points.len();

            if count > 0 {
                // TODO: put max_batch_size in config?
                // TODO: i think the real limit is the byte size of the http request. so, a simple line count won't work very well
                let max_batch_size = 100;

                let mut num_left = count;

                while num_left > 0 {
                    let batch_size = num_left.min(max_batch_size);

                    // TODO: there has to be a better way to chunk this up. chunk on the stream with the stream being an iter?
                    let p = points.split_off(batch_size);

                    num_left -= batch_size;

                    if let Err(err) = influxdb_client
                        .write_with_precision(
                            bucket,
                            stream::iter(points),
                            self.timestamp_precision,
                        )
                        .await
                    {
                        // TODO: if this errors, we throw away some of the pending stats! we should probably buffer them somewhere to be tried again
                        error!("unable to save {} tsdb stats! err={:?}", batch_size, err);
                    }

                    points = p;
                }
            }
        }

        count
    }
}