diff --git a/entities/src/rpc_accounting.rs b/entities/src/rpc_accounting.rs index 70c79ab3..df3d3e88 100644 --- a/entities/src/rpc_accounting.rs +++ b/entities/src/rpc_accounting.rs @@ -40,7 +40,7 @@ pub struct Model { pub max_response_bytes: u64, pub archive_request: bool, pub origin: Option, - pub migrated: Option + pub migrated: Option, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git a/migration/src/lib.rs b/migration/src/lib.rs index 0ada6ac7..12606a2f 100644 --- a/migration/src/lib.rs +++ b/migration/src/lib.rs @@ -14,10 +14,10 @@ mod m20221211_124002_request_method_privacy; mod m20221213_134158_move_login_into_database; mod m20230117_191358_admin_table; mod m20230119_204135_better_free_tier; +mod m20230125_204810_stats_v2; mod m20230130_124740_read_only_login_logic; mod m20230130_165144_prepare_admin_imitation_pre_login; mod m20230215_152254_admin_trail; -mod m20230125_204810_stats_v2; mod m20230307_002623_migrate_rpc_accounting_to_rpc_accounting_v2; pub struct Migrator; diff --git a/migration/src/m20230117_191358_admin_table.rs b/migration/src/m20230117_191358_admin_table.rs index 2505d314..1d11ebf3 100644 --- a/migration/src/m20230117_191358_admin_table.rs +++ b/migration/src/m20230117_191358_admin_table.rs @@ -19,10 +19,10 @@ impl MigrationTrait for Migration { .primary_key(), ) .col( - ColumnDef::new(Admin::UserId) + ColumnDef::new(Admin::UserId) .big_unsigned() .unique_key() - .not_null() + .not_null(), ) .foreign_key( ForeignKey::create() @@ -47,7 +47,7 @@ impl MigrationTrait for Migration { #[derive(Iden)] enum User { Table, - Id + Id, } #[derive(Iden)] diff --git a/migration/src/m20230215_152254_admin_trail.rs b/migration/src/m20230215_152254_admin_trail.rs index 67e386a8..c4c4d21b 100644 --- a/migration/src/m20230215_152254_admin_trail.rs +++ b/migration/src/m20230215_152254_admin_trail.rs @@ -19,10 +19,7 @@ impl MigrationTrait for Migration { .primary_key(), ) .col( - ColumnDef::new(AdminTrail::Caller) - .big_unsigned() - .not_null() - // TODO: Add Foreign Key + ColumnDef::new(AdminTrail::Caller).big_unsigned().not_null(), // TODO: Add Foreign Key ) .foreign_key( sea_query::ForeignKey::create() @@ -30,31 +27,21 @@ impl MigrationTrait for Migration { .to(User::Table, User::Id), ) .col( - ColumnDef::new(AdminTrail::ImitatingUser) - .big_unsigned() - // Can be null bcs maybe we're just logging in / using endpoints that don't imitate a user - // TODO: Add Foreign Key + ColumnDef::new(AdminTrail::ImitatingUser).big_unsigned(), // Can be null bcs maybe we're just logging in / using endpoints that don't imitate a user + // TODO: Add Foreign Key ) .foreign_key( sea_query::ForeignKey::create() .from(AdminTrail::Table, AdminTrail::ImitatingUser) .to(User::Table, User::Id), ) - .col( - ColumnDef::new(AdminTrail::Endpoint) - .string() - .not_null() - ) - .col( - ColumnDef::new(AdminTrail::Payload) - .string() - .not_null() - ) + .col(ColumnDef::new(AdminTrail::Endpoint).string().not_null()) + .col(ColumnDef::new(AdminTrail::Payload).string().not_null()) .col( ColumnDef::new(AdminTrail::Timestamp) .timestamp() .not_null() - .extra("DEFAULT CURRENT_TIMESTAMP".to_string()) + .extra("DEFAULT CURRENT_TIMESTAMP".to_string()), ) .to_owned(), ) @@ -78,10 +65,9 @@ enum AdminTrail { ImitatingUser, Endpoint, Payload, - Timestamp + Timestamp, } - /// Learn more at https://docs.rs/sea-query#iden #[derive(Iden)] enum User { @@ -90,4 +76,4 @@ enum User { // Address, // Description, // Email, -} \ No newline at end of file +} diff --git a/migration/src/m20230307_002623_migrate_rpc_accounting_to_rpc_accounting_v2.rs b/migration/src/m20230307_002623_migrate_rpc_accounting_to_rpc_accounting_v2.rs index 129fa4b3..ab468e44 100644 --- a/migration/src/m20230307_002623_migrate_rpc_accounting_to_rpc_accounting_v2.rs +++ b/migration/src/m20230307_002623_migrate_rpc_accounting_to_rpc_accounting_v2.rs @@ -12,11 +12,8 @@ impl MigrationTrait for Migration { Table::alter() .table(RpcAccounting::Table) .to_owned() - .add_column( - ColumnDef::new(RpcAccounting::Migrated) - .timestamp() - ) - .to_owned() + .add_column(ColumnDef::new(RpcAccounting::Migrated).timestamp()) + .to_owned(), ) .await } @@ -27,7 +24,7 @@ impl MigrationTrait for Migration { Table::alter() .table(RpcAccounting::Table) .drop_column(RpcAccounting::Migrated) - .to_owned() + .to_owned(), ) .await } @@ -76,6 +73,5 @@ enum RpcAccounting { P90ResponseBytes, P99ResponseBytes, MaxResponseBytes, - Migrated + Migrated, } - diff --git a/web3_proxy/src/http_params.rs b/web3_proxy/src/http_params.rs index 4c4d671a..35236465 100644 --- a/web3_proxy/src/http_params.rs +++ b/web3_proxy/src/http_params.rs @@ -232,23 +232,21 @@ pub fn get_query_window_seconds_from_params( } pub fn get_stats_column_from_params( - params: &HashMap + params: &HashMap, ) -> Result<&str, FrontendErrorResponse> { params.get("query_stats_column").map_or_else( - || { - Ok("frontend_requests") - }, + || Ok("frontend_requests"), |query_stats_column: &String| { // Must be one of: Otherwise respond with an error ... match query_stats_column.as_str() { - "frontend_requests" | - "backend_requests" | - "cache_hits" | - "cache_misses" | - "no_servers" | - "sum_request_bytes" | - "sum_response_bytes" | - "sum_response_millis" => Ok(query_stats_column), + "frontend_requests" + | "backend_requests" + | "cache_hits" + | "cache_misses" + | "no_servers" + | "sum_request_bytes" + | "sum_response_bytes" + | "sum_response_millis" => Ok(query_stats_column), _ => Err(FrontendErrorResponse::BadRequest( "Unable to parse query_stats_column. It must be one of: \ frontend_requests, \ @@ -258,9 +256,10 @@ pub fn get_stats_column_from_params( no_servers, \ sum_request_bytes, \ sum_response_bytes, \ - sum_response_millis".to_string() - )) + sum_response_millis" + .to_string(), + )), } - } + }, ) -} \ No newline at end of file +} diff --git a/web3_proxy/src/lib.rs b/web3_proxy/src/lib.rs index 83fa397b..229d5cb0 100644 --- a/web3_proxy/src/lib.rs +++ b/web3_proxy/src/lib.rs @@ -1,5 +1,5 @@ -pub mod app; pub mod admin_queries; +pub mod app; pub mod atomics; pub mod block_number; pub mod config; diff --git a/web3_proxy/src/stats/db_queries.rs b/web3_proxy/src/stats/db_queries.rs index bd6655de..247bc360 100644 --- a/web3_proxy/src/stats/db_queries.rs +++ b/web3_proxy/src/stats/db_queries.rs @@ -1,3 +1,4 @@ +use super::StatType; use crate::app::Web3ProxyApp; use crate::frontend::errors::FrontendErrorResponse; use crate::http_params::{ @@ -22,7 +23,6 @@ use migration::{Condition, Expr, SimpleExpr}; use redis_rate_limiter::redis; use redis_rate_limiter::redis::AsyncCommands; use serde_json::json; -use super::StatType; pub fn filter_query_window_seconds( query_window_seconds: u64, diff --git a/web3_proxy/src/stats/influxdb_queries.rs b/web3_proxy/src/stats/influxdb_queries.rs index 208cd0fb..4bb6d294 100644 --- a/web3_proxy/src/stats/influxdb_queries.rs +++ b/web3_proxy/src/stats/influxdb_queries.rs @@ -1,4 +1,5 @@ use super::StatType; +use crate::http_params::get_stats_column_from_params; use crate::{ app::Web3ProxyApp, frontend::errors::FrontendErrorResponse, @@ -14,6 +15,7 @@ use axum::{ Json, TypedHeader, }; use chrono::{DateTime, FixedOffset}; +use entities::{rpc_accounting, rpc_key}; use fstrings::{f, format_args_f}; use hashbrown::HashMap; use influxdb2::models::Query; @@ -22,8 +24,6 @@ use itertools::Itertools; use log::{info, warn}; use serde::Serialize; use serde_json::{json, Number, Value}; -use entities::{rpc_accounting, rpc_key}; -use crate::http_params::get_stats_column_from_params; // This type-API is extremely brittle! Make sure that the types conform 1-to-1 as defined here // https://docs.rs/influxdb2-structmap/0.2.0/src/influxdb2_structmap/value.rs.html#1-98 @@ -96,7 +96,10 @@ pub async fn query_user_stats<'a>( // Return a bad request if query_start == query_stop, because then the query is empty basically if query_start == query_stop { - return Err(FrontendErrorResponse::BadRequest("Start and Stop date cannot be equal. Please specify a (different) start date.".to_owned())); + return Err(FrontendErrorResponse::BadRequest( + "Start and Stop date cannot be equal. Please specify a (different) start date." + .to_owned(), + )); } info!("Got this far 6"); @@ -117,19 +120,30 @@ pub async fn query_user_stats<'a>( // TODO: Should be taken from the config, not hardcoded ... // TODO: Turn into a 500 error if bucket is not found .. // Or just unwrap or so - let bucket = &app.config.influxdb_bucket.clone().context("No influxdb bucket was provided")?; // "web3_proxy"; + let bucket = &app + .config + .influxdb_bucket + .clone() + .context("No influxdb bucket was provided")?; // "web3_proxy"; info!("Bucket is {:?}", bucket); info!("Got this far 7"); // , "archive_needed", "error_response" - let mut group_columns = vec!["chain_id", "_measurement", "_field", "_measurement", "error_response", "archive_needed"]; + let mut group_columns = vec![ + "chain_id", + "_measurement", + "_field", + "_measurement", + "error_response", + "archive_needed", + ]; let mut filter_chain_id = "".to_string(); // Add to group columns the method, if we want the detailed view as well match stat_response_type { StatType::Detailed => { group_columns.push("method"); - }, + } _ => {} } @@ -152,15 +166,16 @@ pub async fn query_user_stats<'a>( let filter_field = match stat_response_type { StatType::Aggregated => { f!(r#"|> filter(fn: (r) => r["_field"] == "{stats_column}")"#) - }, + } // TODO: Detailed should still filter it, but just "group-by" method (call it once per each method ... // Or maybe it shouldn't filter it ... - StatType::Detailed => { - "".to_string() - }, + StatType::Detailed => "".to_string(), }; - info!("Query start and stop are: {:?} {:?}", query_start, query_stop); + info!( + "Query start and stop are: {:?} {:?}", + query_start, query_stop + ); info!("Query column parameters are: {:?}", stats_column); info!("Query measurement is: {:?}", measurement); info!("Filters are: {:?} {:?}", filter_field, filter_chain_id); @@ -209,7 +224,9 @@ pub async fn query_user_stats<'a>( // Return a different result based on the query let datapoints = match stat_response_type { StatType::Aggregated => { - let influx_responses: Vec = influxdb_client.query::(Some(query)).await?; + let influx_responses: Vec = influxdb_client + .query::(Some(query)) + .await?; info!("Influx responses are {:?}", &influx_responses); for res in &influx_responses { info!("Resp is: {:?}", res); @@ -232,107 +249,97 @@ pub async fn query_user_stats<'a>( influx_responses .into_iter() - .map(|x| { - (x._time.clone(), x) - }) + .map(|x| (x._time.clone(), x)) .into_group_map() - .into_iter() - .map(|(group, grouped_items)| { + .into_iter() + .map(|(group, grouped_items)| { + info!("Group is: {:?}", group); - info!("Group is: {:?}", group); + // Now put all the fields next to each other + // (there will be exactly one field per timestamp, but we want to arrive at a new object) + let mut out = HashMap::new(); + // Could also add a timestamp - // Now put all the fields next to each other - // (there will be exactly one field per timestamp, but we want to arrive at a new object) - let mut out = HashMap::new(); - // Could also add a timestamp + let mut archive_requests = 0; + let mut error_responses = 0; - let mut archive_requests = 0; - let mut error_responses = 0; + out.insert("method".to_owned(), json!("null")); - out.insert("method".to_owned(), json!("null")); + for x in grouped_items { + info!("Iterating over grouped item {:?}", x); - for x in grouped_items { - info!("Iterating over grouped item {:?}", x); + let key = format!("total_{}", x._field).to_string(); + info!("Looking at: {:?}", key); - let key = format!("total_{}", x._field).to_string(); - info!("Looking at: {:?}", key); + // Insert it once, and then fix it + match out.get_mut(&key) { + Some(existing) => { + match existing { + Value::Number(old_value) => { + // unwrap will error when someone has too many credits .. + let old_value = old_value.as_i64().unwrap(); + warn!("Old value is {:?}", old_value); + *existing = serde_json::Value::Number(Number::from( + old_value + x._value, + )); + warn!("New value is {:?}", old_value); + } + _ => { + panic!("Should be nothing but a number") + } + }; + } + None => { + warn!("Does not exist yet! Insert new!"); + out.insert(key, serde_json::Value::Number(Number::from(x._value))); + } + }; - // Insert it once, and then fix it - match out.get_mut(&key) { - Some (existing) => { - match existing { - Value::Number(old_value) => { - // unwrap will error when someone has too many credits .. - let old_value = old_value.as_i64().unwrap(); - warn!("Old value is {:?}", old_value); - *existing = serde_json::Value::Number(Number::from(old_value + x._value)); - warn!("New value is {:?}", old_value); - }, - _ => {panic!("Should be nothing but a number")} - }; - } - None => { - warn!("Does not exist yet! Insert new!"); + if !out.contains_key("query_window_timestamp") { out.insert( - key, - serde_json::Value::Number(Number::from(x._value)) + "query_window_timestamp".to_owned(), + // serde_json::Value::Number(x.time.timestamp().into()) + json!(x._time.timestamp()), ); } - }; - if !out.contains_key("query_window_timestamp") { - out.insert( - "query_window_timestamp".to_owned(), - // serde_json::Value::Number(x.time.timestamp().into()) - json!(x._time.timestamp()) - ); - } + // Interpret archive needed as a boolean + let archive_needed = match x.archive_needed.as_str() { + "true" => true, + "false" => false, + _ => { + panic!("This should never be!") + } + }; + let error_response = match x.error_response.as_str() { + "true" => true, + "false" => false, + _ => { + panic!("This should never be!") + } + }; - // Interpret archive needed as a boolean - let archive_needed = match x.archive_needed.as_str() { - "true" => { - true - }, - "false" => { - false - }, - _ => { - panic!("This should never be!") + // Add up to archive requests and error responses + // TODO: Gotta double check if errors & archive is based on frontend requests, or other metrics + if x._field == "frontend_requests" && archive_needed { + archive_requests += x._value as u64 // This is the number of requests } - }; - let error_response = match x.error_response.as_str() { - "true" => { - true - }, - "false" => { - false - }, - _ => { - panic!("This should never be!") + if x._field == "frontend_requests" && error_response { + error_responses += x._value as u64 } - }; - - // Add up to archive requests and error responses - // TODO: Gotta double check if errors & archive is based on frontend requests, or other metrics - if x._field == "frontend_requests" && archive_needed { - archive_requests += x._value as u64 // This is the number of requests - } - if x._field == "frontend_requests" && error_response { - error_responses += x._value as u64 } - } + out.insert("archive_request".to_owned(), json!(archive_requests)); + out.insert("error_response".to_owned(), json!(error_responses)); - out.insert("archive_request".to_owned(), json!(archive_requests)); - out.insert("error_response".to_owned(), json!(error_responses)); - - json!(out) - }).collect::>() - - - }, + json!(out) + }) + .collect::>() + } StatType::Detailed => { - let influx_responses: Vec = influxdb_client.query::(Some(query)).await?; + let influx_responses: Vec = influxdb_client + .query::(Some(query)) + .await?; info!("Influx responses are {:?}", &influx_responses); for res in &influx_responses { info!("Resp is: {:?}", res); @@ -341,9 +348,7 @@ pub async fn query_user_stats<'a>( // Group by all fields together .. influx_responses .into_iter() - .map(|x| { - ((x._time.clone(), x.method.clone()), x) - }) + .map(|x| ((x._time.clone(), x.method.clone()), x)) .into_group_map() .into_iter() .map(|(group, grouped_items)| { @@ -367,24 +372,25 @@ pub async fn query_user_stats<'a>( // Insert it once, and then fix it match out.get_mut(&key) { - Some (existing) => { + Some(existing) => { match existing { Value::Number(old_value) => { // unwrap will error when someone has too many credits .. let old_value = old_value.as_i64().unwrap(); warn!("Old value is {:?}", old_value); - *existing = serde_json::Value::Number(Number::from(old_value + x._value)); + *existing = serde_json::Value::Number(Number::from( + old_value + x._value, + )); warn!("New value is {:?}", old_value); - }, - _ => {panic!("Should be nothing but a number")} + } + _ => { + panic!("Should be nothing but a number") + } }; } None => { warn!("Does not exist yet! Insert new!"); - out.insert( - key, - serde_json::Value::Number(Number::from(x._value)) - ); + out.insert(key, serde_json::Value::Number(Number::from(x._value))); } }; @@ -392,29 +398,21 @@ pub async fn query_user_stats<'a>( out.insert( "query_window_timestamp".to_owned(), // serde_json::Value::Number(x.time.timestamp().into()) - json!(x._time.timestamp()) + json!(x._time.timestamp()), ); } // Interpret archive needed as a boolean let archive_needed = match x.archive_needed.as_str() { - "true" => { - true - }, - "false" => { - false - }, + "true" => true, + "false" => false, _ => { panic!("This should never be!") } }; let error_response = match x.error_response.as_str() { - "true" => { - true - }, - "false" => { - false - }, + "true" => true, + "false" => false, _ => { panic!("This should never be!") } @@ -428,23 +426,29 @@ pub async fn query_user_stats<'a>( if x._field == "frontend_requests" && error_response { error_responses += x._value as i32 } - } out.insert("archive_request".to_owned(), json!(archive_requests)); out.insert("error_response".to_owned(), json!(error_responses)); json!(out) - }).collect::>() + }) + .collect::>() } }; // I suppose archive requests could be either gathered by default (then summed up), or retrieved on a second go. // Same with error responses .. let mut response_body = HashMap::new(); - response_body.insert("num_items", serde_json::Value::Number(datapoints.len().into())); + response_body.insert( + "num_items", + serde_json::Value::Number(datapoints.len().into()), + ); response_body.insert("result", serde_json::Value::Array(datapoints)); - response_body.insert("query_window_seconds", serde_json::Value::Number(query_window_seconds.into())); + response_body.insert( + "query_window_seconds", + serde_json::Value::Number(query_window_seconds.into()), + ); response_body.insert("query_start", serde_json::Value::Number(query_start.into())); response_body.insert("chain_id", serde_json::Value::Number(chain_id.into()));