//! Store "stats" in a database for billing and a different database for graphing
//!
//! TODO: move some of these structs/functions into their own file?
pub mod db_queries;
pub mod influxdb_queries;

use crate::frontend::authorization::{Authorization, RequestMetadata};
use axum::headers::Origin;
use chrono::{TimeZone, Utc};
use derive_more::From;
use entities::rpc_accounting_v2;
use entities::sea_orm_active_enums::TrackingLevel;
use futures::stream;
use hashbrown::HashMap;
use influxdb2::api::write::TimestampPrecision;
use influxdb2::models::DataPoint;
use log::{error, info};
use migration::sea_orm::{self, DatabaseConnection, EntityTrait};
use migration::{Expr, OnConflict};
use std::num::NonZeroU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast;
use tokio::task::JoinHandle;
use tokio::time::interval;
pub enum StatType {
    Aggregated,
    Detailed,
}
// Pub is needed for migration... I could also write a second constructor for this if needed
/// TODO: better name?
#[derive(Clone, Debug)]
pub struct RpcQueryStats {
    pub authorization: Arc<Authorization>,
    pub method: Option<String>,
    pub archive_request: bool,
    pub error_response: bool,
    pub request_bytes: u64,
    /// if backend_requests is 0, there was a cache_hit
    // pub frontend_request: u64,
    pub backend_requests: u64,
    pub response_bytes: u64,
    pub response_millis: u64,
    pub response_timestamp: i64,
}
#[derive(Clone, From, Hash, PartialEq, Eq)]
pub struct RpcQueryKey {
    /// unix epoch time
    /// for the time series db, this is (close to) the time that the response was sent
    /// for the account database, this is rounded to the week
    response_timestamp: i64,
    /// true if an archive server was needed to serve the request
    archive_needed: bool,
    /// true if the response was some sort of JSONRPC error
    error_response: bool,
    /// method tracking is opt-in
    method: Option<String>,
    /// origin tracking is opt-in
    origin: Option<Origin>,
    /// None if the public url was used
    rpc_secret_key_id: Option<NonZeroU64>,
}
/// round the unix epoch time to the start of a period
fn round_timestamp(timestamp: i64, period_seconds: i64) -> i64 {
    timestamp / period_seconds * period_seconds
}
impl RpcQueryStats {
    /// rpc keys can opt into multiple levels of tracking.
    /// we always need enough to handle billing, so even the "none" level still has some minimal tracking.
    /// This "accounting_key" is used in the relational database.
    /// anonymous users are also saved in the relational database so that the host can do their own cost accounting.
    fn accounting_key(&self, period_seconds: i64) -> RpcQueryKey {
        let response_timestamp = round_timestamp(self.response_timestamp, period_seconds);

        let rpc_secret_key_id = self.authorization.checks.rpc_secret_key_id;

        let (method, origin) = match self.authorization.checks.tracking_level {
            TrackingLevel::None => {
                // this RPC key requested no tracking. this is the default
                // do not store the method or the origin
                (None, None)
            }
            TrackingLevel::Aggregated => {
                // this RPC key requested tracking aggregated across all methods and origins
                // TODO: think about this more. do we want the origin or not? grouping free cost per site might be useful. i'd rather not collect things if we don't have a planned purpose though
                let method = None;
                let origin = None;

                (method, origin)
            }
            TrackingLevel::Detailed => {
                // detailed tracking keeps track of the method and origin
                // depending on the request, the origin might still be None
                let method = self.method.clone();
                let origin = self.authorization.origin.clone();

                (method, origin)
            }
        };

        RpcQueryKey {
            response_timestamp,
            archive_needed: self.archive_request,
            error_response: self.error_response,
            method,
            rpc_secret_key_id,
            origin,
        }
    }
    /// all queries are aggregated
    /// TODO: should we store "anon" or "registered" as a key just to be able to split graphs?
    fn global_timeseries_key(&self) -> RpcQueryKey {
        let method = self.method.clone();

        // we don't store origin in the timeseries db. it's only used for optional accounting
        let origin = None;

        // everyone gets grouped together
        let rpc_secret_key_id = None;

        RpcQueryKey {
            response_timestamp: self.response_timestamp,
            archive_needed: self.archive_request,
            error_response: self.error_response,
            method,
            rpc_secret_key_id,
            origin,
        }
    }
    fn opt_in_timeseries_key(&self) -> RpcQueryKey {
        // we don't store origin in the timeseries db. it's only optionally used for accounting
        let origin = None;

        let (method, rpc_secret_key_id) = match self.authorization.checks.tracking_level {
            TrackingLevel::None => {
                // this RPC key requested no tracking. this is the default.
                // we still want graphs though, so we just use None as the rpc_secret_key_id
                (self.method.clone(), None)
            }
            TrackingLevel::Aggregated => {
                // this RPC key requested tracking aggregated across all methods
                (None, self.authorization.checks.rpc_secret_key_id)
            }
            TrackingLevel::Detailed => {
                // detailed tracking keeps track of the method
                (
                    self.method.clone(),
                    self.authorization.checks.rpc_secret_key_id,
                )
            }
        };

        RpcQueryKey {
            response_timestamp: self.response_timestamp,
            archive_needed: self.archive_request,
            error_response: self.error_response,
            method,
            rpc_secret_key_id,
            origin,
        }
    }
}
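
// the three key flavors above feed three separate buffers in
// aggregate_and_save_loop: accounting_key goes to the relational billing
// table, while the two timeseries keys go to influxdb measurements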
#[derive(Default)]
pub struct BufferedRpcQueryStats {
    pub frontend_requests: u64,
    pub backend_requests: u64,
    pub backend_retries: u64,
    pub no_servers: u64,
    pub cache_misses: u64,
    pub cache_hits: u64,
    pub sum_request_bytes: u64,
    pub sum_response_bytes: u64,
    pub sum_response_millis: u64,
}
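
// one BufferedRpcQueryStats accumulates every RpcQueryStats that hashed to the
// same RpcQueryKey, so a burst of identical requests becomes a single
// accounting row and a single influxdb point instead of one write each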
/// A stat that we aggregate and then store in a database.
/// For now there is just one, but I think there might be others later
#[derive(Debug, From)]
pub enum AppStat {
    RpcQuery(RpcQueryStats),
}

#[derive(From)]
pub struct SpawnedStatBuffer {
    pub stat_sender: flume::Sender<AppStat>,
    /// these handles are important and must be allowed to finish
    pub background_handle: JoinHandle<anyhow::Result<()>>,
}
pub struct StatBuffer {
    chain_id: u64,
    db_conn: Option<DatabaseConnection>,
    influxdb_client: Option<influxdb2::Client>,
    tsdb_save_interval_seconds: u32,
    db_save_interval_seconds: u32,
    billing_period_seconds: i64,
}
impl BufferedRpcQueryStats {
    fn add(&mut self, stat: RpcQueryStats) {
        // a stat always comes from just 1 frontend request
        self.frontend_requests += 1;

        if stat.backend_requests == 0 {
            // no backend request. cache hit!
            self.cache_hits += 1;
        } else {
            // backend requests! cache miss!
            self.cache_misses += 1;

            // a single frontend request might have multiple backend requests
            self.backend_requests += stat.backend_requests;
        }

        self.sum_request_bytes += stat.request_bytes;
        self.sum_response_bytes += stat.response_bytes;
        self.sum_response_millis += stat.response_millis;
    }
    // TODO: take a db transaction instead so that we can batch?
    async fn save_db(
        self,
        chain_id: u64,
        db_conn: &DatabaseConnection,
        key: RpcQueryKey,
    ) -> anyhow::Result<()> {
        let period_datetime = Utc.timestamp_opt(key.response_timestamp, 0).unwrap();

        // this is a lot of variables
        let accounting_entry = rpc_accounting_v2::ActiveModel {
            id: sea_orm::NotSet,
            rpc_key_id: sea_orm::Set(key.rpc_secret_key_id.map(Into::into)),
            origin: sea_orm::Set(key.origin.map(|x| x.to_string())),
            chain_id: sea_orm::Set(chain_id),
            period_datetime: sea_orm::Set(period_datetime),
            method: sea_orm::Set(key.method),
            archive_needed: sea_orm::Set(key.archive_needed),
            error_response: sea_orm::Set(key.error_response),
            frontend_requests: sea_orm::Set(self.frontend_requests),
            backend_requests: sea_orm::Set(self.backend_requests),
            backend_retries: sea_orm::Set(self.backend_retries),
            no_servers: sea_orm::Set(self.no_servers),
            cache_misses: sea_orm::Set(self.cache_misses),
            cache_hits: sea_orm::Set(self.cache_hits),
            sum_request_bytes: sea_orm::Set(self.sum_request_bytes),
            sum_response_millis: sea_orm::Set(self.sum_response_millis),
            sum_response_bytes: sea_orm::Set(self.sum_response_bytes),
        };
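
        // upsert: insert a new row for this key, or, if a row with the same
        // unique key already exists, add our buffered counters to its totals
        // (this assumes the table has a unique index covering the key columns)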
        rpc_accounting_v2::Entity::insert(accounting_entry)
            .on_conflict(
                OnConflict::new()
                    .values([
                        (
                            rpc_accounting_v2::Column::FrontendRequests,
                            Expr::col(rpc_accounting_v2::Column::FrontendRequests)
                                .add(self.frontend_requests),
                        ),
                        (
                            rpc_accounting_v2::Column::BackendRequests,
                            Expr::col(rpc_accounting_v2::Column::BackendRequests)
                                .add(self.backend_requests),
                        ),
                        (
                            rpc_accounting_v2::Column::BackendRetries,
                            Expr::col(rpc_accounting_v2::Column::BackendRetries)
                                .add(self.backend_retries),
                        ),
                        (
                            rpc_accounting_v2::Column::NoServers,
                            Expr::col(rpc_accounting_v2::Column::NoServers).add(self.no_servers),
                        ),
                        (
                            rpc_accounting_v2::Column::CacheMisses,
                            Expr::col(rpc_accounting_v2::Column::CacheMisses)
                                .add(self.cache_misses),
                        ),
                        (
                            rpc_accounting_v2::Column::CacheHits,
                            Expr::col(rpc_accounting_v2::Column::CacheHits).add(self.cache_hits),
                        ),
                        (
                            rpc_accounting_v2::Column::SumRequestBytes,
                            Expr::col(rpc_accounting_v2::Column::SumRequestBytes)
                                .add(self.sum_request_bytes),
                        ),
                        (
                            rpc_accounting_v2::Column::SumResponseMillis,
                            Expr::col(rpc_accounting_v2::Column::SumResponseMillis)
                                .add(self.sum_response_millis),
                        ),
                        (
                            rpc_accounting_v2::Column::SumResponseBytes,
                            Expr::col(rpc_accounting_v2::Column::SumResponseBytes)
                                .add(self.sum_response_bytes),
                        ),
                    ])
                    .to_owned(),
            )
            .exec(db_conn)
            .await?;

        Ok(())
    }
    // TODO: change this to return a DataPoint?
    async fn save_timeseries(
        self,
        bucket: &str,
        measurement: &str,
        chain_id: u64,
        influxdb2_client: &influxdb2::Client,
        key: RpcQueryKey,
    ) -> anyhow::Result<()> {
        // TODO: error if key.origin is set?

        // TODO: what name?
        let mut builder = DataPoint::builder(measurement);
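
        // influxdb tags are indexed and meant for grouping/filtering, while
        // fields hold the measured values, so the low-cardinality identifiers
        // below become tags and the counters become fields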
        builder = builder.tag("chain_id", chain_id.to_string());

        if let Some(rpc_secret_key_id) = key.rpc_secret_key_id {
            builder = builder.tag("rpc_secret_key_id", rpc_secret_key_id.to_string());
        }

        if let Some(method) = key.method {
            builder = builder.tag("method", method);
        }

        builder = builder
            .tag("archive_needed", key.archive_needed.to_string())
            .tag("error_response", key.error_response.to_string())
            .field("frontend_requests", self.frontend_requests as i64)
            .field("backend_requests", self.backend_requests as i64)
            .field("no_servers", self.no_servers as i64)
            .field("cache_misses", self.cache_misses as i64)
            .field("cache_hits", self.cache_hits as i64)
            .field("sum_request_bytes", self.sum_request_bytes as i64)
            .field("sum_response_millis", self.sum_response_millis as i64)
            .field("sum_response_bytes", self.sum_response_bytes as i64);

        builder = builder.timestamp(key.response_timestamp);

        let timestamp_precision = TimestampPrecision::Seconds;

        let points = [builder.build()?];

        // TODO: bucket should be an enum so that we don't risk typos
        influxdb2_client
            .write_with_precision(bucket, stream::iter(points), timestamp_precision)
            .await?;

        Ok(())
    }
}
impl RpcQueryStats {
    pub fn new(
        method: Option<String>,
        authorization: Arc<Authorization>,
        metadata: Arc<RequestMetadata>,
        response_bytes: usize,
    ) -> Self {
        // TODO: try_unwrap the metadata to be sure that all the stats for this request have been collected
        // TODO: otherwise, i think the whole thing should be in a single lock that we can "reset" when a stat is created
        let archive_request = metadata.archive_request.load(Ordering::Acquire);
        let backend_requests = metadata.backend_requests.lock().len() as u64;
        let request_bytes = metadata.request_bytes;
        let error_response = metadata.error_response.load(Ordering::Acquire);
        let response_millis = metadata.start_instant.elapsed().as_millis() as u64;
        let response_bytes = response_bytes as u64;
        let response_timestamp = Utc::now().timestamp();

        Self {
            authorization,
            archive_request,
            method,
            backend_requests,
            request_bytes,
            error_response,
            response_bytes,
            response_millis,
            response_timestamp,
        }
    }

    /// Only used for migration from stats_v1 to stats_v2/v3
    pub fn modify_struct(
        &mut self,
        response_millis: u64,
        response_timestamp: i64,
        backend_requests: u64,
    ) {
        self.response_millis = response_millis;
        self.response_timestamp = response_timestamp;
        self.backend_requests = backend_requests;
    }
}
impl StatBuffer {
    pub fn try_spawn(
        chain_id: u64,
        bucket: String,
        db_conn: Option<DatabaseConnection>,
        influxdb_client: Option<influxdb2::Client>,
        db_save_interval_seconds: u32,
        tsdb_save_interval_seconds: u32,
        billing_period_seconds: i64,
        shutdown_receiver: broadcast::Receiver<()>,
    ) -> anyhow::Result<Option<SpawnedStatBuffer>> {
        if db_conn.is_none() && influxdb_client.is_none() {
            return Ok(None);
        }

        let (stat_sender, stat_receiver) = flume::unbounded();

        let mut new = Self {
            chain_id,
            db_conn,
            influxdb_client,
            db_save_interval_seconds,
            tsdb_save_interval_seconds,
            billing_period_seconds,
        };

        // any errors inside this task will cause the application to exit
        let handle = tokio::spawn(async move {
            new.aggregate_and_save_loop(bucket, stat_receiver, shutdown_receiver)
                .await
        });

        Ok(Some((stat_sender, handle).into()))
    }
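
    // A minimal usage sketch (hypothetical values; a real caller wires in the
    // database and influxdb clients from its config):
    //
    //   let (_shutdown_tx, shutdown_rx) = broadcast::channel(1);
    //   if let Some(spawned) = StatBuffer::try_spawn(
    //       1, "dev_bucket".to_string(), None, Some(influxdb_client),
    //       60, 10, 60 * 60 * 24 * 7, shutdown_rx,
    //   )? {
    //       spawned.stat_sender.send(stat.into())?;
    //   }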
    async fn aggregate_and_save_loop(
        &mut self,
        bucket: String,
        stat_receiver: flume::Receiver<AppStat>,
        mut shutdown_receiver: broadcast::Receiver<()>,
    ) -> anyhow::Result<()> {
        let mut tsdb_save_interval =
            interval(Duration::from_secs(self.tsdb_save_interval_seconds as u64));
        let mut db_save_interval =
            interval(Duration::from_secs(self.db_save_interval_seconds as u64));

        // TODO: this is used for rpc_accounting_v2 and influxdb. give it a name to match that? "stat" of some kind?
        let mut global_timeseries_buffer = HashMap::<RpcQueryKey, BufferedRpcQueryStats>::new();
        let mut opt_in_timeseries_buffer = HashMap::<RpcQueryKey, BufferedRpcQueryStats>::new();
        let mut accounting_db_buffer = HashMap::<RpcQueryKey, BufferedRpcQueryStats>::new();

        // TODO: Somewhere here we should probably be updating the balance of the user
        // And also update the credits used etc. for the referred user
        loop {
            tokio::select! {
                stat = stat_receiver.recv_async() => {
                    // info!("Received stat");
                    // save the stat to a buffer
                    match stat {
                        Ok(AppStat::RpcQuery(stat)) => {
                            if self.influxdb_client.is_some() {
                                // TODO: round the timestamp at all?
                                let global_timeseries_key = stat.global_timeseries_key();
                                global_timeseries_buffer.entry(global_timeseries_key).or_default().add(stat.clone());

                                let opt_in_timeseries_key = stat.opt_in_timeseries_key();
                                opt_in_timeseries_buffer.entry(opt_in_timeseries_key).or_default().add(stat.clone());
                            }

                            if self.db_conn.is_some() {
                                accounting_db_buffer.entry(stat.accounting_key(self.billing_period_seconds)).or_default().add(stat);
                            }
                        }
                        Err(err) => {
                            error!("error receiving stat: {:?}", err);
                            break;
                        }
                    }
                }
                _ = db_save_interval.tick() => {
                    // info!("DB save interval tick");
                    let db_conn = self.db_conn.as_ref().expect("db connection should always exist if there are buffered stats");

                    // TODO: batch saves
                    for (key, stat) in accounting_db_buffer.drain() {
                        // TODO: i don't like passing key (which came from the stat) to the function on the stat. but it works for now
                        if let Err(err) = stat.save_db(self.chain_id, db_conn, key).await {
                            error!("unable to save accounting entry! err={:?}", err);
                        };
                    }
                }
                _ = tsdb_save_interval.tick() => {
                    // info!("TSDB save interval tick");
                    // TODO: batch saves
                    // TODO: better bucket names
                    let influxdb_client = self.influxdb_client.as_ref().expect("influxdb client should always exist if there are buffered stats");

                    for (key, stat) in global_timeseries_buffer.drain() {
                        // TODO: i don't like passing key (which came from the stat) to the function on the stat. but it works for now
                        if let Err(err) = stat.save_timeseries(&bucket, "global_proxy", self.chain_id, influxdb_client, key).await {
                            error!("unable to save global stat! err={:?}", err);
                        };
                    }

                    for (key, stat) in opt_in_timeseries_buffer.drain() {
                        // TODO: i don't like passing key (which came from the stat) to the function on the stat. but it works for now
                        if let Err(err) = stat.save_timeseries(&bucket, "opt_in_proxy", self.chain_id, influxdb_client, key).await {
                            error!("unable to save opt-in stat! err={:?}", err);
                        };
                    }
                }
                x = shutdown_receiver.recv() => {
                    info!("shutdown signal received");
                    match x {
                        Ok(_) => {
                            info!("stat_loop shutting down");
                        },
                        Err(err) => error!("stat_loop shutdown receiver err={:?}", err),
                    }
                    break;
                }
            }
        }
        // TODO: dry
        if let Some(db_conn) = self.db_conn.as_ref() {
            info!(
                "saving {} buffered accounting entries",
                accounting_db_buffer.len(),
            );

            for (key, stat) in accounting_db_buffer.drain() {
                if let Err(err) = stat.save_db(self.chain_id, db_conn, key).await {
                    error!(
                        "unable to save accounting entry while shutting down! err={:?}",
                        err
                    );
                };
            }
        }

        // TODO: dry
        if let Some(influxdb_client) = self.influxdb_client.as_ref() {
            info!(
                "saving {} buffered global stats",
                global_timeseries_buffer.len(),
            );

            for (key, stat) in global_timeseries_buffer.drain() {
                if let Err(err) = stat
                    .save_timeseries(&bucket, "global_proxy", self.chain_id, influxdb_client, key)
                    .await
                {
                    error!(
                        "unable to save global stat while shutting down! err={:?}",
                        err
                    );
                };
            }

            info!(
                "saving {} buffered opt-in stats",
                opt_in_timeseries_buffer.len(),
            );

            for (key, stat) in opt_in_timeseries_buffer.drain() {
                if let Err(err) = stat
                    .save_timeseries(&bucket, "opt_in_proxy", self.chain_id, influxdb_client, key)
                    .await
                {
                    error!(
                        "unable to save opt-in stat while shutting down! err={:?}",
                        err
                    );
                };
            }
        }

        info!("accounting and stat save loop complete");

        Ok(())
    }
}