use super::StatType;
use crate::{
    app::Web3ProxyApp,
    frontend::errors::FrontendErrorResponse,
    http_params::{
        get_chain_id_from_params, get_query_start_from_params, get_query_stop_from_params,
        get_query_window_seconds_from_params, get_user_id_from_params,
    },
};
use anyhow::Context;
use axum::{
    headers::{authorization::Bearer, Authorization},
    response::{IntoResponse, Response},
    Json, TypedHeader,
};
use chrono::{DateTime, FixedOffset};
use fstrings::{f, format_args_f};
use hashbrown::HashMap;
use influxdb2::models::Query;
use influxdb2::FromDataPoint;
use itertools::Itertools;
use log::info;
use serde::Serialize;
use serde_json::json;

// TODO: include chain_id, method, and some other things in this struct
/// One row of the aggregated InfluxDB result: a single field name, its
/// aggregated value, and the timestamp of the query window it falls into.
#[derive(Debug, Default, FromDataPoint, Serialize)]
pub struct AggregatedRpcAccounting {
    field: String,
    value: f64,
    time: DateTime<FixedOffset>,
}

pub async fn query_user_stats<'a>(
    app: &'a Web3ProxyApp,
    bearer: Option<TypedHeader<Authorization<Bearer>>>,
    params: &'a HashMap<String, String>,
    stat_response_type: StatType,
) -> Result<Response, FrontendErrorResponse> {
    info!("query_user_stats: getting database connections");

    let db_conn = app.db_conn().context("query_user_stats needs a db")?;
    let db_replica = app
        .db_replica()
        .context("query_user_stats needs a db replica")?;

    info!("query_user_stats: got relational db handles");

    let mut redis_conn = app
        .redis_conn()
        .await
        .context("query_user_stats had a redis connection error")?
        .context("query_user_stats needs a redis")?;

    info!("query_user_stats: got redis connection");

    // TODO: have a getter for this. do we need a connection pool on it?
    let influxdb_client = app
        .influxdb_client
        .as_ref()
        .context("query_user_stats needs an influxdb client")?;

    info!("query_user_stats: got influxdb client");

    // get the user id first. if it is 0, we should use a cache on the app
    let user_id =
        get_user_id_from_params(&mut redis_conn, &db_conn, &db_replica, bearer, params).await?;

    info!("query_user_stats: user_id is {}", user_id);

    let query_window_seconds = get_query_window_seconds_from_params(params)?;
    let query_start = get_query_start_from_params(params)?.timestamp();
    let query_stop = get_query_stop_from_params(params)?.timestamp();
    let chain_id = get_chain_id_from_params(app, params)?;

    // query_window_seconds must be provided, and probably should not default to 1s
    // Return a bad request if query_start == query_stop, because then the query is empty
    if query_start == query_stop {
        return Err(FrontendErrorResponse::BadRequest(
            "Start and stop dates cannot be equal. Please specify a (different) start date."
                .to_owned(),
        ));
    }
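
    // A hypothetical failing request (the query-string param names here are an
    // assumption based on the get_query_*_from_params helper names):
    //   GET /user/stats/aggregate?query_start=1678406400&query_stop=1678406400
    //   -> 400 Bad Request, because the range would be empty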

    info!("query_user_stats: parsed query params");

    // anonymous users (user_id == 0) get global stats; logged-in users get their own opt-in stats
    let measurement = if user_id == 0 {
        "global_proxy"
    } else {
        "opt_in_proxy"
    };

    // The query below follows this rough shape (from the Influx UI):
    //   from(bucket: "dev_web3_proxy")
    //     |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    //     |> filter(fn: (r) => r["_measurement"] == "opt_in_proxy" or r["_measurement"] == "global_proxy")
    //     |> filter(fn: (r) => r["_field"] == "frontend_requests" or r["_field"] == "backend_requests" or r["_field"] == "sum_request_bytes")
    //     |> group(columns: ["_field", "_measurement"])
    //     |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
    //     |> yield(name: "mean")
    // TODO: should be taken from the config, not hardcoded
    // TODO: turn a missing bucket into a 500 error instead of a generic anyhow error (or just unwrap)
    let bucket = &app
        .config
        .influxdb_bucket
        .clone()
        .context("No influxdb bucket was provided")?; // "web3_proxy";

    info!("query_user_stats: bucket is {:?}", bucket);

    let mut group_columns = vec!["_measurement", "_field"];
    let mut filter_chain_id = "".to_string();
    if chain_id == 0 {
        // no chain filter; group by chain_id instead so every chain stays visible
        group_columns.push("chain_id");
    } else {
        filter_chain_id = f!(r#"|> filter(fn: (r) => r["chain_id"] == "{chain_id}")"#);
    }
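
    // For example, chain_id == 1 renders the filter:
    //   |> filter(fn: (r) => r["chain_id"] == "1")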

    info!("query_user_stats: built chain filter");

    // serialize the group columns into a flux-compatible json array, e.g. ["_measurement","_field","chain_id"]
    let group_columns = serde_json::to_string(&json!(group_columns)).unwrap();

    info!("query_user_stats: serialized group columns");

    // aggregated stats are grouped by the columns above; detailed stats keep influx's default grouping
    let group = match stat_response_type {
        StatType::Aggregated => f!(r#"|> group(columns: {group_columns})"#),
        StatType::Detailed => "".to_string(),
    };
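
    // For example, with chain_id == 0 the aggregated branch renders:
    //   |> group(columns: ["_measurement","_field","chain_id"])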

    info!("query_user_stats: built group clause");

    let filter_field = match stat_response_type {
        StatType::Aggregated => f!(r#"|> filter(fn: (r) => r["_field"] == "frontend_requests" or r["_field"] == "backend_requests" or r["_field"] == "cache_hits" or r["_field"] == "cache_misses" or r["_field"] == "no_servers" or r["_field"] == "sum_request_bytes" or r["_field"] == "sum_response_bytes" or r["_field"] == "sum_response_millis")"#),
        // detailed stats show all fields, so they don't get a field filter
        StatType::Detailed => "".to_string(),
    };

    info!("Query start and stop are: {:?} {:?}", query_start, query_stop);
    info!("Query measurement is: {:?}", measurement);
    info!("Filters are: {:?} {:?}", filter_field, filter_chain_id);
    info!("Group is: {:?}", group);
    info!("Window seconds are: {:?}", query_window_seconds);

    // These fields are probably already taken care of by the query above:
    //   backend_requests, frontend_requests, cache_hits, cache_misses,
    //   sum_request_bytes, sum_response_bytes, sum_response_millis
    // e.g. "total_frontend_requests": "6", "total_response_bytes": "235", "total_response_millis": "0",
    //      "total_cache_hits": "6", "total_cache_misses": "0"
    // We may have to run a second query to get all error responses ("total_error_responses": "0").
    // Same with archive requests ("archive_request": 0).
    // Group by method if detailed, else keep all methods as "null" ("method": null);
    // i think influxdb takes care of that.
    // "total_backend_retries": "0" is still unhandled.
    info!("query_user_stats: building influx query");

    let query = f!(r#"
        from(bucket: "{bucket}")
            |> range(start: {query_start}, stop: {query_stop})
            |> filter(fn: (r) => r["_measurement"] == "{measurement}")
            {filter_field}
            {filter_chain_id}
            {group}
            |> aggregateWindow(every: {query_window_seconds}s, fn: sum, createEmpty: false)
            |> yield(name: "sum")
    "#);
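
    // Rendered with illustrative values (bucket "dev_web3_proxy", chain_id 1,
    // 300s windows, aggregated stats), the Flux would look roughly like:
    //   from(bucket: "dev_web3_proxy")
    //     |> range(start: 1678406400, stop: 1678492800)
    //     |> filter(fn: (r) => r["_measurement"] == "opt_in_proxy")
    //     |> filter(fn: (r) => r["_field"] == "frontend_requests" or ...)
    //     |> filter(fn: (r) => r["chain_id"] == "1")
    //     |> group(columns: ["_measurement","_field"])
    //     |> aggregateWindow(every: 300s, fn: sum, createEmpty: false)
    //     |> yield(name: "sum")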

    info!("Raw query to db is: {:?}", query);

    let query = Query::new(query.to_string());

    info!("Query to db is: {:?}", query);

    // TODO: do not unwrap. add this error to FrontendErrorResponse
    // TODO: StatType::Aggregated and StatType::Detailed might need different types
    // let unparsed: serde_json::Value = serde_json::Value::Array(influxdb_client.query(Some(query.clone())).await?);
    // info!("Direct response is: {:?}", unparsed);
    info!("query_user_stats: querying influxdb");
    let influx_responses: Vec<AggregatedRpcAccounting> =
        influxdb_client.query(Some(query)).await?;
    info!("Influx responses are {:?}", &influx_responses);
    for res in &influx_responses {
        info!("Resp is: {:?}", res);
    }
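
    // With the FromDataPoint derive above, a parsed row should look something
    // like this (values illustrative):
    //   AggregatedRpcAccounting { field: "frontend_requests", value: 6.0, time: 2023-03-10T00:05:00+00:00 }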

    // Group all fields that share a timestamp into a single output object
    let datapoints = influx_responses
        .into_iter()
        .group_by(|x| x.time)
        .into_iter()
        .map(|(time, grouped_items)| {
            // Put all the fields next to each other
            // (there is exactly one entry per field per timestamp, but we want a single object per window)
            let mut out = HashMap::new();

            out.insert("query_window_timestamp".to_owned(), json!(time.timestamp()));

            for x in grouped_items {
                out.insert(f!(r#"total_{x.field}"#), json!(x.value));
            }

            json!(out)
        })
        .collect::<Vec<_>>();
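
    // Each datapoint then serializes to roughly (values illustrative):
    //   {"query_window_timestamp": 1678406400, "total_frontend_requests": 6.0, "total_cache_hits": 6.0, ...}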

    // I suppose archive requests could either be gathered by default (then summed up) or retrieved in a second query.
    // Same with error responses.
    let mut out = HashMap::new();
    out.insert("num_items", serde_json::Value::Number(datapoints.len().into()));
    out.insert("result", serde_json::Value::Array(datapoints));
    out.insert(
        "query_window_seconds",
        serde_json::Value::Number(query_window_seconds.into()),
    );
    out.insert("query_start", serde_json::Value::Number(query_start.into()));
    out.insert("chain_id", serde_json::Value::Number(chain_id.into()));
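
    // The assembled response body then looks roughly like (values illustrative):
    //   {"num_items": 2, "query_window_seconds": 300, "query_start": 1678406400,
    //    "chain_id": 1, "result": [ ...datapoints... ]}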

    info!("query_user_stats: assembled output {:?}", out);

    let out = Json(json!(out)).into_response();

    // TODO: add the requests back into out
    info!("query_user_stats: built response {:?}", out);

    // TODO: now implement the proper response type
    Ok(out)
}