cargo fmt

This commit is contained in:
Bryan Stitt 2023-04-05 12:19:03 -07:00
parent 0069e76040
commit 05db94177d
9 changed files with 160 additions and 175 deletions

@ -40,7 +40,7 @@ pub struct Model {
pub max_response_bytes: u64, pub max_response_bytes: u64,
pub archive_request: bool, pub archive_request: bool,
pub origin: Option<String>, pub origin: Option<String>,
pub migrated: Option<DateTime> pub migrated: Option<DateTime>,
} }
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]

@ -14,10 +14,10 @@ mod m20221211_124002_request_method_privacy;
mod m20221213_134158_move_login_into_database; mod m20221213_134158_move_login_into_database;
mod m20230117_191358_admin_table; mod m20230117_191358_admin_table;
mod m20230119_204135_better_free_tier; mod m20230119_204135_better_free_tier;
mod m20230125_204810_stats_v2;
mod m20230130_124740_read_only_login_logic; mod m20230130_124740_read_only_login_logic;
mod m20230130_165144_prepare_admin_imitation_pre_login; mod m20230130_165144_prepare_admin_imitation_pre_login;
mod m20230215_152254_admin_trail; mod m20230215_152254_admin_trail;
mod m20230125_204810_stats_v2;
mod m20230307_002623_migrate_rpc_accounting_to_rpc_accounting_v2; mod m20230307_002623_migrate_rpc_accounting_to_rpc_accounting_v2;
pub struct Migrator; pub struct Migrator;

@ -19,10 +19,10 @@ impl MigrationTrait for Migration {
.primary_key(), .primary_key(),
) )
.col( .col(
ColumnDef::new(Admin::UserId) ColumnDef::new(Admin::UserId)
.big_unsigned() .big_unsigned()
.unique_key() .unique_key()
.not_null() .not_null(),
) )
.foreign_key( .foreign_key(
ForeignKey::create() ForeignKey::create()
@ -47,7 +47,7 @@ impl MigrationTrait for Migration {
#[derive(Iden)] #[derive(Iden)]
enum User { enum User {
Table, Table,
Id Id,
} }
#[derive(Iden)] #[derive(Iden)]

@ -19,10 +19,7 @@ impl MigrationTrait for Migration {
.primary_key(), .primary_key(),
) )
.col( .col(
ColumnDef::new(AdminTrail::Caller) ColumnDef::new(AdminTrail::Caller).big_unsigned().not_null(), // TODO: Add Foreign Key
.big_unsigned()
.not_null()
// TODO: Add Foreign Key
) )
.foreign_key( .foreign_key(
sea_query::ForeignKey::create() sea_query::ForeignKey::create()
@ -30,31 +27,21 @@ impl MigrationTrait for Migration {
.to(User::Table, User::Id), .to(User::Table, User::Id),
) )
.col( .col(
ColumnDef::new(AdminTrail::ImitatingUser) ColumnDef::new(AdminTrail::ImitatingUser).big_unsigned(), // Can be null bcs maybe we're just logging in / using endpoints that don't imitate a user
.big_unsigned() // TODO: Add Foreign Key
// Can be null bcs maybe we're just logging in / using endpoints that don't imitate a user
// TODO: Add Foreign Key
) )
.foreign_key( .foreign_key(
sea_query::ForeignKey::create() sea_query::ForeignKey::create()
.from(AdminTrail::Table, AdminTrail::ImitatingUser) .from(AdminTrail::Table, AdminTrail::ImitatingUser)
.to(User::Table, User::Id), .to(User::Table, User::Id),
) )
.col( .col(ColumnDef::new(AdminTrail::Endpoint).string().not_null())
ColumnDef::new(AdminTrail::Endpoint) .col(ColumnDef::new(AdminTrail::Payload).string().not_null())
.string()
.not_null()
)
.col(
ColumnDef::new(AdminTrail::Payload)
.string()
.not_null()
)
.col( .col(
ColumnDef::new(AdminTrail::Timestamp) ColumnDef::new(AdminTrail::Timestamp)
.timestamp() .timestamp()
.not_null() .not_null()
.extra("DEFAULT CURRENT_TIMESTAMP".to_string()) .extra("DEFAULT CURRENT_TIMESTAMP".to_string()),
) )
.to_owned(), .to_owned(),
) )
@ -78,10 +65,9 @@ enum AdminTrail {
ImitatingUser, ImitatingUser,
Endpoint, Endpoint,
Payload, Payload,
Timestamp Timestamp,
} }
/// Learn more at https://docs.rs/sea-query#iden /// Learn more at https://docs.rs/sea-query#iden
#[derive(Iden)] #[derive(Iden)]
enum User { enum User {
@ -90,4 +76,4 @@ enum User {
// Address, // Address,
// Description, // Description,
// Email, // Email,
} }

@ -12,11 +12,8 @@ impl MigrationTrait for Migration {
Table::alter() Table::alter()
.table(RpcAccounting::Table) .table(RpcAccounting::Table)
.to_owned() .to_owned()
.add_column( .add_column(ColumnDef::new(RpcAccounting::Migrated).timestamp())
ColumnDef::new(RpcAccounting::Migrated) .to_owned(),
.timestamp()
)
.to_owned()
) )
.await .await
} }
@ -27,7 +24,7 @@ impl MigrationTrait for Migration {
Table::alter() Table::alter()
.table(RpcAccounting::Table) .table(RpcAccounting::Table)
.drop_column(RpcAccounting::Migrated) .drop_column(RpcAccounting::Migrated)
.to_owned() .to_owned(),
) )
.await .await
} }
@ -76,6 +73,5 @@ enum RpcAccounting {
P90ResponseBytes, P90ResponseBytes,
P99ResponseBytes, P99ResponseBytes,
MaxResponseBytes, MaxResponseBytes,
Migrated Migrated,
} }

@ -232,23 +232,21 @@ pub fn get_query_window_seconds_from_params(
} }
pub fn get_stats_column_from_params( pub fn get_stats_column_from_params(
params: &HashMap<String, String> params: &HashMap<String, String>,
) -> Result<&str, FrontendErrorResponse> { ) -> Result<&str, FrontendErrorResponse> {
params.get("query_stats_column").map_or_else( params.get("query_stats_column").map_or_else(
|| { || Ok("frontend_requests"),
Ok("frontend_requests")
},
|query_stats_column: &String| { |query_stats_column: &String| {
// Must be one of: Otherwise respond with an error ... // Must be one of: Otherwise respond with an error ...
match query_stats_column.as_str() { match query_stats_column.as_str() {
"frontend_requests" | "frontend_requests"
"backend_requests" | | "backend_requests"
"cache_hits" | | "cache_hits"
"cache_misses" | | "cache_misses"
"no_servers" | | "no_servers"
"sum_request_bytes" | | "sum_request_bytes"
"sum_response_bytes" | | "sum_response_bytes"
"sum_response_millis" => Ok(query_stats_column), | "sum_response_millis" => Ok(query_stats_column),
_ => Err(FrontendErrorResponse::BadRequest( _ => Err(FrontendErrorResponse::BadRequest(
"Unable to parse query_stats_column. It must be one of: \ "Unable to parse query_stats_column. It must be one of: \
frontend_requests, \ frontend_requests, \
@ -258,9 +256,10 @@ pub fn get_stats_column_from_params(
no_servers, \ no_servers, \
sum_request_bytes, \ sum_request_bytes, \
sum_response_bytes, \ sum_response_bytes, \
sum_response_millis".to_string() sum_response_millis"
)) .to_string(),
)),
} }
} },
) )
} }

@ -1,5 +1,5 @@
pub mod app;
pub mod admin_queries; pub mod admin_queries;
pub mod app;
pub mod atomics; pub mod atomics;
pub mod block_number; pub mod block_number;
pub mod config; pub mod config;

@ -1,3 +1,4 @@
use super::StatType;
use crate::app::Web3ProxyApp; use crate::app::Web3ProxyApp;
use crate::frontend::errors::FrontendErrorResponse; use crate::frontend::errors::FrontendErrorResponse;
use crate::http_params::{ use crate::http_params::{
@ -22,7 +23,6 @@ use migration::{Condition, Expr, SimpleExpr};
use redis_rate_limiter::redis; use redis_rate_limiter::redis;
use redis_rate_limiter::redis::AsyncCommands; use redis_rate_limiter::redis::AsyncCommands;
use serde_json::json; use serde_json::json;
use super::StatType;
pub fn filter_query_window_seconds( pub fn filter_query_window_seconds(
query_window_seconds: u64, query_window_seconds: u64,

@ -1,4 +1,5 @@
use super::StatType; use super::StatType;
use crate::http_params::get_stats_column_from_params;
use crate::{ use crate::{
app::Web3ProxyApp, app::Web3ProxyApp,
frontend::errors::FrontendErrorResponse, frontend::errors::FrontendErrorResponse,
@ -14,6 +15,7 @@ use axum::{
Json, TypedHeader, Json, TypedHeader,
}; };
use chrono::{DateTime, FixedOffset}; use chrono::{DateTime, FixedOffset};
use entities::{rpc_accounting, rpc_key};
use fstrings::{f, format_args_f}; use fstrings::{f, format_args_f};
use hashbrown::HashMap; use hashbrown::HashMap;
use influxdb2::models::Query; use influxdb2::models::Query;
@ -22,8 +24,6 @@ use itertools::Itertools;
use log::{info, warn}; use log::{info, warn};
use serde::Serialize; use serde::Serialize;
use serde_json::{json, Number, Value}; use serde_json::{json, Number, Value};
use entities::{rpc_accounting, rpc_key};
use crate::http_params::get_stats_column_from_params;
// This type-API is extremely brittle! Make sure that the types conform 1-to-1 as defined here // This type-API is extremely brittle! Make sure that the types conform 1-to-1 as defined here
// https://docs.rs/influxdb2-structmap/0.2.0/src/influxdb2_structmap/value.rs.html#1-98 // https://docs.rs/influxdb2-structmap/0.2.0/src/influxdb2_structmap/value.rs.html#1-98
@ -96,7 +96,10 @@ pub async fn query_user_stats<'a>(
// Return a bad request if query_start == query_stop, because then the query is empty basically // Return a bad request if query_start == query_stop, because then the query is empty basically
if query_start == query_stop { if query_start == query_stop {
return Err(FrontendErrorResponse::BadRequest("Start and Stop date cannot be equal. Please specify a (different) start date.".to_owned())); return Err(FrontendErrorResponse::BadRequest(
"Start and Stop date cannot be equal. Please specify a (different) start date."
.to_owned(),
));
} }
info!("Got this far 6"); info!("Got this far 6");
@ -117,19 +120,30 @@ pub async fn query_user_stats<'a>(
// TODO: Should be taken from the config, not hardcoded ... // TODO: Should be taken from the config, not hardcoded ...
// TODO: Turn into a 500 error if bucket is not found .. // TODO: Turn into a 500 error if bucket is not found ..
// Or just unwrap or so // Or just unwrap or so
let bucket = &app.config.influxdb_bucket.clone().context("No influxdb bucket was provided")?; // "web3_proxy"; let bucket = &app
.config
.influxdb_bucket
.clone()
.context("No influxdb bucket was provided")?; // "web3_proxy";
info!("Bucket is {:?}", bucket); info!("Bucket is {:?}", bucket);
info!("Got this far 7"); info!("Got this far 7");
// , "archive_needed", "error_response" // , "archive_needed", "error_response"
let mut group_columns = vec!["chain_id", "_measurement", "_field", "_measurement", "error_response", "archive_needed"]; let mut group_columns = vec![
"chain_id",
"_measurement",
"_field",
"_measurement",
"error_response",
"archive_needed",
];
let mut filter_chain_id = "".to_string(); let mut filter_chain_id = "".to_string();
// Add to group columns the method, if we want the detailed view as well // Add to group columns the method, if we want the detailed view as well
match stat_response_type { match stat_response_type {
StatType::Detailed => { StatType::Detailed => {
group_columns.push("method"); group_columns.push("method");
}, }
_ => {} _ => {}
} }
@ -152,15 +166,16 @@ pub async fn query_user_stats<'a>(
let filter_field = match stat_response_type { let filter_field = match stat_response_type {
StatType::Aggregated => { StatType::Aggregated => {
f!(r#"|> filter(fn: (r) => r["_field"] == "{stats_column}")"#) f!(r#"|> filter(fn: (r) => r["_field"] == "{stats_column}")"#)
}, }
// TODO: Detailed should still filter it, but just "group-by" method (call it once per each method ... // TODO: Detailed should still filter it, but just "group-by" method (call it once per each method ...
// Or maybe it shouldn't filter it ... // Or maybe it shouldn't filter it ...
StatType::Detailed => { StatType::Detailed => "".to_string(),
"".to_string()
},
}; };
info!("Query start and stop are: {:?} {:?}", query_start, query_stop); info!(
"Query start and stop are: {:?} {:?}",
query_start, query_stop
);
info!("Query column parameters are: {:?}", stats_column); info!("Query column parameters are: {:?}", stats_column);
info!("Query measurement is: {:?}", measurement); info!("Query measurement is: {:?}", measurement);
info!("Filters are: {:?} {:?}", filter_field, filter_chain_id); info!("Filters are: {:?} {:?}", filter_field, filter_chain_id);
@ -209,7 +224,9 @@ pub async fn query_user_stats<'a>(
// Return a different result based on the query // Return a different result based on the query
let datapoints = match stat_response_type { let datapoints = match stat_response_type {
StatType::Aggregated => { StatType::Aggregated => {
let influx_responses: Vec<AggregatedRpcAccounting> = influxdb_client.query::<AggregatedRpcAccounting>(Some(query)).await?; let influx_responses: Vec<AggregatedRpcAccounting> = influxdb_client
.query::<AggregatedRpcAccounting>(Some(query))
.await?;
info!("Influx responses are {:?}", &influx_responses); info!("Influx responses are {:?}", &influx_responses);
for res in &influx_responses { for res in &influx_responses {
info!("Resp is: {:?}", res); info!("Resp is: {:?}", res);
@ -232,107 +249,97 @@ pub async fn query_user_stats<'a>(
influx_responses influx_responses
.into_iter() .into_iter()
.map(|x| { .map(|x| (x._time.clone(), x))
(x._time.clone(), x)
})
.into_group_map() .into_group_map()
.into_iter() .into_iter()
.map(|(group, grouped_items)| { .map(|(group, grouped_items)| {
info!("Group is: {:?}", group);
info!("Group is: {:?}", group); // Now put all the fields next to each other
// (there will be exactly one field per timestamp, but we want to arrive at a new object)
let mut out = HashMap::new();
// Could also add a timestamp
// Now put all the fields next to each other let mut archive_requests = 0;
// (there will be exactly one field per timestamp, but we want to arrive at a new object) let mut error_responses = 0;
let mut out = HashMap::new();
// Could also add a timestamp
let mut archive_requests = 0; out.insert("method".to_owned(), json!("null"));
let mut error_responses = 0;
out.insert("method".to_owned(), json!("null")); for x in grouped_items {
info!("Iterating over grouped item {:?}", x);
for x in grouped_items { let key = format!("total_{}", x._field).to_string();
info!("Iterating over grouped item {:?}", x); info!("Looking at: {:?}", key);
let key = format!("total_{}", x._field).to_string(); // Insert it once, and then fix it
info!("Looking at: {:?}", key); match out.get_mut(&key) {
Some(existing) => {
match existing {
Value::Number(old_value) => {
// unwrap will error when someone has too many credits ..
let old_value = old_value.as_i64().unwrap();
warn!("Old value is {:?}", old_value);
*existing = serde_json::Value::Number(Number::from(
old_value + x._value,
));
warn!("New value is {:?}", old_value);
}
_ => {
panic!("Should be nothing but a number")
}
};
}
None => {
warn!("Does not exist yet! Insert new!");
out.insert(key, serde_json::Value::Number(Number::from(x._value)));
}
};
// Insert it once, and then fix it if !out.contains_key("query_window_timestamp") {
match out.get_mut(&key) {
Some (existing) => {
match existing {
Value::Number(old_value) => {
// unwrap will error when someone has too many credits ..
let old_value = old_value.as_i64().unwrap();
warn!("Old value is {:?}", old_value);
*existing = serde_json::Value::Number(Number::from(old_value + x._value));
warn!("New value is {:?}", old_value);
},
_ => {panic!("Should be nothing but a number")}
};
}
None => {
warn!("Does not exist yet! Insert new!");
out.insert( out.insert(
key, "query_window_timestamp".to_owned(),
serde_json::Value::Number(Number::from(x._value)) // serde_json::Value::Number(x.time.timestamp().into())
json!(x._time.timestamp()),
); );
} }
};
if !out.contains_key("query_window_timestamp") { // Interpret archive needed as a boolean
out.insert( let archive_needed = match x.archive_needed.as_str() {
"query_window_timestamp".to_owned(), "true" => true,
// serde_json::Value::Number(x.time.timestamp().into()) "false" => false,
json!(x._time.timestamp()) _ => {
); panic!("This should never be!")
} }
};
let error_response = match x.error_response.as_str() {
"true" => true,
"false" => false,
_ => {
panic!("This should never be!")
}
};
// Interpret archive needed as a boolean // Add up to archive requests and error responses
let archive_needed = match x.archive_needed.as_str() { // TODO: Gotta double check if errors & archive is based on frontend requests, or other metrics
"true" => { if x._field == "frontend_requests" && archive_needed {
true archive_requests += x._value as u64 // This is the number of requests
},
"false" => {
false
},
_ => {
panic!("This should never be!")
} }
}; if x._field == "frontend_requests" && error_response {
let error_response = match x.error_response.as_str() { error_responses += x._value as u64
"true" => {
true
},
"false" => {
false
},
_ => {
panic!("This should never be!")
} }
};
// Add up to archive requests and error responses
// TODO: Gotta double check if errors & archive is based on frontend requests, or other metrics
if x._field == "frontend_requests" && archive_needed {
archive_requests += x._value as u64 // This is the number of requests
}
if x._field == "frontend_requests" && error_response {
error_responses += x._value as u64
} }
} out.insert("archive_request".to_owned(), json!(archive_requests));
out.insert("error_response".to_owned(), json!(error_responses));
out.insert("archive_request".to_owned(), json!(archive_requests)); json!(out)
out.insert("error_response".to_owned(), json!(error_responses)); })
.collect::<Vec<_>>()
json!(out) }
}).collect::<Vec<_>>()
},
StatType::Detailed => { StatType::Detailed => {
let influx_responses: Vec<DetailedRpcAccounting> = influxdb_client.query::<DetailedRpcAccounting>(Some(query)).await?; let influx_responses: Vec<DetailedRpcAccounting> = influxdb_client
.query::<DetailedRpcAccounting>(Some(query))
.await?;
info!("Influx responses are {:?}", &influx_responses); info!("Influx responses are {:?}", &influx_responses);
for res in &influx_responses { for res in &influx_responses {
info!("Resp is: {:?}", res); info!("Resp is: {:?}", res);
@ -341,9 +348,7 @@ pub async fn query_user_stats<'a>(
// Group by all fields together .. // Group by all fields together ..
influx_responses influx_responses
.into_iter() .into_iter()
.map(|x| { .map(|x| ((x._time.clone(), x.method.clone()), x))
((x._time.clone(), x.method.clone()), x)
})
.into_group_map() .into_group_map()
.into_iter() .into_iter()
.map(|(group, grouped_items)| { .map(|(group, grouped_items)| {
@ -367,24 +372,25 @@ pub async fn query_user_stats<'a>(
// Insert it once, and then fix it // Insert it once, and then fix it
match out.get_mut(&key) { match out.get_mut(&key) {
Some (existing) => { Some(existing) => {
match existing { match existing {
Value::Number(old_value) => { Value::Number(old_value) => {
// unwrap will error when someone has too many credits .. // unwrap will error when someone has too many credits ..
let old_value = old_value.as_i64().unwrap(); let old_value = old_value.as_i64().unwrap();
warn!("Old value is {:?}", old_value); warn!("Old value is {:?}", old_value);
*existing = serde_json::Value::Number(Number::from(old_value + x._value)); *existing = serde_json::Value::Number(Number::from(
old_value + x._value,
));
warn!("New value is {:?}", old_value); warn!("New value is {:?}", old_value);
}, }
_ => {panic!("Should be nothing but a number")} _ => {
panic!("Should be nothing but a number")
}
}; };
} }
None => { None => {
warn!("Does not exist yet! Insert new!"); warn!("Does not exist yet! Insert new!");
out.insert( out.insert(key, serde_json::Value::Number(Number::from(x._value)));
key,
serde_json::Value::Number(Number::from(x._value))
);
} }
}; };
@ -392,29 +398,21 @@ pub async fn query_user_stats<'a>(
out.insert( out.insert(
"query_window_timestamp".to_owned(), "query_window_timestamp".to_owned(),
// serde_json::Value::Number(x.time.timestamp().into()) // serde_json::Value::Number(x.time.timestamp().into())
json!(x._time.timestamp()) json!(x._time.timestamp()),
); );
} }
// Interpret archive needed as a boolean // Interpret archive needed as a boolean
let archive_needed = match x.archive_needed.as_str() { let archive_needed = match x.archive_needed.as_str() {
"true" => { "true" => true,
true "false" => false,
},
"false" => {
false
},
_ => { _ => {
panic!("This should never be!") panic!("This should never be!")
} }
}; };
let error_response = match x.error_response.as_str() { let error_response = match x.error_response.as_str() {
"true" => { "true" => true,
true "false" => false,
},
"false" => {
false
},
_ => { _ => {
panic!("This should never be!") panic!("This should never be!")
} }
@ -428,23 +426,29 @@ pub async fn query_user_stats<'a>(
if x._field == "frontend_requests" && error_response { if x._field == "frontend_requests" && error_response {
error_responses += x._value as i32 error_responses += x._value as i32
} }
} }
out.insert("archive_request".to_owned(), json!(archive_requests)); out.insert("archive_request".to_owned(), json!(archive_requests));
out.insert("error_response".to_owned(), json!(error_responses)); out.insert("error_response".to_owned(), json!(error_responses));
json!(out) json!(out)
}).collect::<Vec<_>>() })
.collect::<Vec<_>>()
} }
}; };
// I suppose archive requests could be either gathered by default (then summed up), or retrieved on a second go. // I suppose archive requests could be either gathered by default (then summed up), or retrieved on a second go.
// Same with error responses .. // Same with error responses ..
let mut response_body = HashMap::new(); let mut response_body = HashMap::new();
response_body.insert("num_items", serde_json::Value::Number(datapoints.len().into())); response_body.insert(
"num_items",
serde_json::Value::Number(datapoints.len().into()),
);
response_body.insert("result", serde_json::Value::Array(datapoints)); response_body.insert("result", serde_json::Value::Array(datapoints));
response_body.insert("query_window_seconds", serde_json::Value::Number(query_window_seconds.into())); response_body.insert(
"query_window_seconds",
serde_json::Value::Number(query_window_seconds.into()),
);
response_body.insert("query_start", serde_json::Value::Number(query_start.into())); response_body.insert("query_start", serde_json::Value::Number(query_start.into()));
response_body.insert("chain_id", serde_json::Value::Number(chain_id.into())); response_body.insert("chain_id", serde_json::Value::Number(chain_id.into()));