diff --git a/web3_proxy/src/app_stats.rs b/web3_proxy/src/app_stats.rs index 7cd286dd..a3b0d3de 100644 --- a/web3_proxy/src/app_stats.rs +++ b/web3_proxy/src/app_stats.rs @@ -354,7 +354,7 @@ impl StatEmitter { } } - info!("saving {} stats", response_aggregate_map.len()); + info!("saving {} pending stats", response_aggregate_map.len()); for (key, aggregate) in response_aggregate_map.drain() { if let Err(err) = aggregate diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs index 3a6ecd1e..a636e1ba 100644 --- a/web3_proxy/src/frontend/mod.rs +++ b/web3_proxy/src/frontend/mod.rs @@ -1,7 +1,7 @@ //! `frontend` contains HTTP and websocket endpoints for use by users and admins. pub mod authorization; -mod errors; +pub mod errors; // TODO: these are only public so docs are generated. What's a better way to do this? pub mod rpc_proxy_http; pub mod rpc_proxy_ws; diff --git a/web3_proxy/src/frontend/users.rs b/web3_proxy/src/frontend/users.rs index d7e5294f..ba8711b6 100644 --- a/web3_proxy/src/frontend/users.rs +++ b/web3_proxy/src/frontend/users.rs @@ -3,12 +3,10 @@ use super::authorization::{login_is_authorized, RpcSecretKey}; use super::errors::FrontendResult; use crate::app::Web3ProxyApp; -use crate::frontend::errors::FrontendErrorResponse; +use crate::user_queries::get_page_from_params; use crate::user_queries::{ - get_chain_id_from_params, get_query_start_from_params, get_query_window_seconds_from_params, - get_user_id_from_params, + get_chain_id_from_params, get_query_start_from_params, query_user_stats, StatResponse, }; -use crate::user_queries::{get_detailed_rpc_stats_for_params, get_page_from_params}; use crate::user_token::UserBearerToken; use anyhow::Context; use axum::headers::{Header, Origin, Referer, UserAgent}; @@ -20,17 +18,16 @@ use axum::{ }; use axum_client_ip::ClientIp; use axum_macros::debug_handler; -use entities::{revert_log, rpc_accounting, rpc_key, user}; +use entities::{revert_log, rpc_key, user}; use ethers::{prelude::Address, types::Bytes}; use hashbrown::HashMap; use http::{HeaderValue, StatusCode}; use ipnet::IpNet; use itertools::Itertools; -use migration::{Condition, Expr, SimpleExpr}; use redis_rate_limiter::redis::AsyncCommands; use sea_orm::{ ActiveModelTrait, ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder, - QuerySelect, TransactionTrait, + TransactionTrait, }; use serde::Deserialize; use serde_json::json; @@ -734,6 +731,19 @@ pub async fn user_revert_logs_get( Ok(Json(response).into_response()) } +/// `GET /user/stats/aggregate` -- Public endpoint for aggregate stats such as bandwidth used and methods requested. +#[debug_handler] +#[instrument(level = "trace")] +pub async fn user_stats_aggregate_get( + Extension(app): Extension>, + bearer: Option>>, + Query(params): Query>, +) -> FrontendResult { + let response = query_user_stats(&app, bearer, ¶ms, StatResponse::Aggregate).await?; + + Ok(Json(response).into_response()) +} + /// `GET /user/stats/detailed` -- Use a bearer token to get the user's key stats such as bandwidth used and methods requested. /// /// If no bearer is provided, detailed stats for all users will be shown. @@ -750,172 +760,7 @@ pub async fn user_stats_detailed_get( bearer: Option>>, Query(params): Query>, ) -> FrontendResult { - let x = get_detailed_rpc_stats_for_params(&app, bearer, params).await?; - - Ok(Json(x).into_response()) -} - -/// `GET /user/stats/aggregate` -- Public endpoint for aggregate stats such as bandwidth used and methods requested. -#[debug_handler] -#[instrument(level = "trace")] -pub async fn user_stats_aggregate_get( - bearer: Option>>, - Extension(app): Extension>, - Query(params): Query>, -) -> FrontendResult { - let db_conn = app.db_conn().context("connecting to db")?; - let redis_conn = app.redis_conn().await.context("connecting to redis")?; - - let mut response = HashMap::new(); - - let q = rpc_accounting::Entity::find() - .select_only() - .column_as( - rpc_accounting::Column::FrontendRequests.sum(), - "total_requests", - ) - .column_as( - rpc_accounting::Column::BackendRequests.sum(), - "total_backend_retries", - ) - .column_as( - rpc_accounting::Column::CacheMisses.sum(), - "total_cache_misses", - ) - .column_as(rpc_accounting::Column::CacheHits.sum(), "total_cache_hits") - .column_as( - rpc_accounting::Column::SumResponseBytes.sum(), - "total_response_bytes", - ) - .column_as( - // TODO: can we sum bools like this? - rpc_accounting::Column::ErrorResponse.sum(), - "total_error_responses", - ) - .column_as( - rpc_accounting::Column::SumResponseMillis.sum(), - "total_response_millis", - ); - - let condition = Condition::all(); - - // TODO: DRYer! move this onto query_window_seconds_from_params? - let query_window_seconds = get_query_window_seconds_from_params(¶ms)?; - let q = if query_window_seconds == 0 { - // TODO: order by more than this? - // query_window_seconds is not set so we aggregate all records - // TODO: i am pretty sure we need to filter by something - q - } else { - // TODO: is there a better way to do this? how can we get "period_datetime" into this with types? - // TODO: how can we get the first window to start at query_start_timestamp - let expr = Expr::cust_with_values( - "FLOOR(UNIX_TIMESTAMP(rpc_accounting.period_datetime) / ?) * ?", - [query_window_seconds, query_window_seconds], - ); - - response.insert( - "query_window_seconds", - serde_json::Value::Number(query_window_seconds.into()), - ); - - q.column_as(expr, "query_window") - .group_by(Expr::cust("query_window")) - // TODO: is there a simpler way to order_by? - .order_by_asc(SimpleExpr::Custom("query_window".to_string())) - }; - - // aggregate stats after query_start - // TODO: minimum query_start of 90 days? - let query_start = get_query_start_from_params(¶ms)?; - // TODO: if no query_start, don't add to response or condition - response.insert( - "query_start", - serde_json::Value::Number(query_start.timestamp().into()), - ); - let condition = condition.add(rpc_accounting::Column::PeriodDatetime.gte(query_start)); - - // filter on chain_id - let chain_id = get_chain_id_from_params(&app, ¶ms)?; - let (condition, q) = if chain_id == 0 { - // fetch all the chains. don't filter or aggregate - (condition, q) - } else { - let condition = condition.add(rpc_accounting::Column::ChainId.eq(chain_id)); - - response.insert("chain_id", serde_json::Value::Number(chain_id.into())); - - (condition, q) - }; - - // filter on rpc_key_id - // TODO: move getting the param and checking the bearer token into a helper function - let (condition, q) = if let Some(rpc_key_id) = params.get("rpc_key_id") { - let rpc_key_id = rpc_key_id.parse::().map_err(|e| { - FrontendErrorResponse::StatusCode( - StatusCode::BAD_REQUEST, - "Unable to parse rpc_key_id".to_string(), - e.into(), - ) - })?; - - if rpc_key_id == 0 { - (condition, q) - } else { - // TODO: make sure that the bearer token is allowed to view this rpc_key_id - let q = q.group_by(rpc_accounting::Column::RpcKeyId); - - let condition = condition.add(rpc_accounting::Column::RpcKeyId.eq(rpc_key_id)); - - response.insert("rpc_key_id", serde_json::Value::Number(rpc_key_id.into())); - - (condition, q) - } - } else { - (condition, q) - }; - - // get_user_id_from_params checks that the bearer is connected to this user_id - let user_id = get_user_id_from_params(redis_conn, bearer, ¶ms).await?; - let (condition, q) = if user_id == 0 { - // 0 means everyone. don't filter on user - (condition, q) - } else { - let q = q.left_join(rpc_key::Entity); - - let condition = condition.add(rpc_key::Column::UserId.eq(user_id)); - - response.insert("user_id", serde_json::Value::Number(user_id.into())); - - (condition, q) - }; - - // now that all the conditions are set up. add them to the query - let q = q.filter(condition); - - // TODO: trace log query here? i think sea orm has a useful log level for this - - // set up pagination - let page = get_page_from_params(¶ms)?; - response.insert("page", serde_json::to_value(page).expect("can't fail")); - - // TODO: page size from param with a max from the config - let page_size = 200; - response.insert( - "page_size", - serde_json::to_value(page_size).expect("can't fail"), - ); - - // query the database - let aggregate = q - .into_json() - .paginate(&db_conn, page_size) - .fetch_page(page) - // TODO: timeouts here? or are they already set up on the connection - .await?; - - // add the query response to the response - response.insert("aggregate", serde_json::Value::Array(aggregate)); + let response = query_user_stats(&app, bearer, ¶ms, StatResponse::Detailed).await?; Ok(Json(response).into_response()) } diff --git a/web3_proxy/src/user_queries.rs b/web3_proxy/src/user_queries.rs index 72c1a849..9b92c472 100644 --- a/web3_proxy/src/user_queries.rs +++ b/web3_proxy/src/user_queries.rs @@ -1,3 +1,5 @@ +use crate::frontend::errors::FrontendErrorResponse; +use crate::{app::Web3ProxyApp, user_token::UserBearerToken}; use anyhow::Context; use axum::{ headers::{authorization::Bearer, Authorization}, @@ -6,17 +8,14 @@ use axum::{ use chrono::NaiveDateTime; use entities::{rpc_accounting, rpc_key}; use hashbrown::HashMap; -use migration::Expr; -use num::Zero; +use http::StatusCode; +use migration::{Condition, Expr, SimpleExpr}; use redis_rate_limiter::{redis::AsyncCommands, RedisConnection}; use sea_orm::{ - ColumnTrait, Condition, EntityTrait, JoinType, PaginatorTrait, QueryFilter, QueryOrder, - QuerySelect, RelationTrait, + ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder, QuerySelect, Select, }; use tracing::{instrument, warn}; -use crate::{app::Web3ProxyApp, user_token::UserBearerToken}; - /// get the attached address from redis for the given auth_token. /// 0 means all users #[instrument(level = "trace", skip(redis_conn))] @@ -147,77 +146,86 @@ pub fn get_page_from_params(params: &HashMap) -> anyhow::Result< #[instrument(level = "trace")] pub fn get_query_window_seconds_from_params( params: &HashMap, -) -> anyhow::Result { +) -> Result { params.get("query_window_seconds").map_or_else( || { // no page in params. set default Ok(0) }, - |x: &String| { + |query_window_seconds: &String| { // parse the given timestamp // TODO: error code 401 - let x = x - .parse() - .context("parsing query window seconds from params")?; - - Ok(x) + query_window_seconds.parse::().map_err(|e| { + FrontendErrorResponse::StatusCode( + StatusCode::BAD_REQUEST, + "Unable to parse rpc_key_id".to_string(), + e.into(), + ) + }) }, ) } -/// stats grouped by key_id and error_repsponse and method and key -#[instrument(level = "trace")] -pub async fn get_detailed_rpc_stats_for_params( - app: &Web3ProxyApp, +pub fn filter_query_window_seconds( + params: &HashMap, + response: &mut HashMap<&str, serde_json::Value>, + q: Select, +) -> Result, FrontendErrorResponse> { + let query_window_seconds = get_query_window_seconds_from_params(¶ms)?; + + if query_window_seconds == 0 { + // TODO: order by more than this? + // query_window_seconds is not set so we aggregate all records + // TODO: i am pretty sure we need to filter by something + return Ok(q); + } + + // TODO: is there a better way to do this? how can we get "period_datetime" into this with types? + // TODO: how can we get the first window to start at query_start_timestamp + let expr = Expr::cust_with_values( + "FLOOR(UNIX_TIMESTAMP(rpc_accounting.period_datetime) / ?) * ?", + [query_window_seconds, query_window_seconds], + ); + + response.insert( + "query_window_seconds", + serde_json::Value::Number(query_window_seconds.into()), + ); + + let q = q + .column_as(expr, "query_window") + .group_by(Expr::cust("query_window")) + // TODO: is there a simpler way to order_by? + .order_by_asc(SimpleExpr::Custom("query_window".to_string())); + + Ok(q) +} + +pub enum StatResponse { + Aggregate, + Detailed, +} + +pub async fn query_user_stats<'a>( + app: &'a Web3ProxyApp, bearer: Option>>, - params: HashMap, -) -> anyhow::Result> { + params: &'a HashMap, + stat_response_type: StatResponse, +) -> Result, FrontendErrorResponse> { let db_conn = app.db_conn().context("connecting to db")?; let redis_conn = app.redis_conn().await.context("connecting to redis")?; - let user_id = get_user_id_from_params(redis_conn, bearer, ¶ms).await?; - let rpc_key_id = get_rpc_key_id_from_params(user_id, ¶ms)?; - let chain_id = get_chain_id_from_params(app, ¶ms)?; - let query_start = get_query_start_from_params(¶ms)?; - let query_window_seconds = get_query_window_seconds_from_params(¶ms)?; - let page = get_page_from_params(¶ms)?; - // TODO: handle secondary users, too - - // TODO: page size from config? from params with a max in the config? - let page_size = 200; - - // TODO: minimum query_start of 90 days? - let mut response = HashMap::new(); - response.insert("page", serde_json::to_value(page)?); - response.insert("page_size", serde_json::to_value(page_size)?); - response.insert("chain_id", serde_json::to_value(chain_id)?); - response.insert( - "query_start", - serde_json::to_value(query_start.timestamp() as u64)?, - ); - - // TODO: how do we get count reverts compared to other errors? does it matter? what about http errors to our users? - // TODO: how do we count uptime? let q = rpc_accounting::Entity::find() .select_only() - // groups - .column(rpc_accounting::Column::ErrorResponse) - .group_by(rpc_accounting::Column::ErrorResponse) - .column(rpc_accounting::Column::Method) - .group_by(rpc_accounting::Column::Method) - .column(rpc_accounting::Column::ArchiveRequest) - .group_by(rpc_accounting::Column::ArchiveRequest) - // chain id is added later - // aggregate columns .column_as( rpc_accounting::Column::FrontendRequests.sum(), "total_requests", ) .column_as( rpc_accounting::Column::BackendRequests.sum(), - "total_backend_requests", + "total_backend_retries", ) .column_as( rpc_accounting::Column::CacheMisses.sum(), @@ -236,111 +244,115 @@ pub async fn get_detailed_rpc_stats_for_params( .column_as( rpc_accounting::Column::SumResponseMillis.sum(), "total_response_millis", - ) - // TODO: order on method next? - .order_by_asc(rpc_accounting::Column::PeriodDatetime.min()); + ); - let condition = Condition::all().add(rpc_accounting::Column::PeriodDatetime.gte(query_start)); + let condition = Condition::all(); - let (condition, q) = if chain_id.is_zero() { - // fetch all the chains. don't filter - // TODO: wait. do we want chain id on the logs? we can get that by joining key - let q = q - .column(rpc_accounting::Column::ChainId) - .group_by(rpc_accounting::Column::ChainId); + let q = if let StatResponse::Detailed = stat_response_type { + // group by the columns that we use as keys in other places of the code + q.column(rpc_accounting::Column::ErrorResponse) + .group_by(rpc_accounting::Column::ErrorResponse) + .column(rpc_accounting::Column::Method) + .group_by(rpc_accounting::Column::Method) + .column(rpc_accounting::Column::ArchiveRequest) + .group_by(rpc_accounting::Column::ArchiveRequest) + } else { + q + }; + let q = filter_query_window_seconds(params, &mut response, q)?; + + // aggregate stats after query_start + // TODO: minimum query_start of 90 days? + let query_start = get_query_start_from_params(params)?; + // TODO: if no query_start, don't add to response or condition + response.insert( + "query_start", + serde_json::Value::Number(query_start.timestamp().into()), + ); + let condition = condition.add(rpc_accounting::Column::PeriodDatetime.gte(query_start)); + + // filter on chain_id + let chain_id = get_chain_id_from_params(app, params)?; + let (condition, q) = if chain_id == 0 { + // fetch all the chains. don't filter or aggregate (condition, q) } else { let condition = condition.add(rpc_accounting::Column::ChainId.eq(chain_id)); + response.insert("chain_id", serde_json::Value::Number(chain_id.into())); + (condition, q) }; - let (condition, q) = if user_id != 0 || rpc_key_id != 0 { - // if user id or rpc key id is specified, we need to join on at least rpc_key_id - let q = q - .join(JoinType::InnerJoin, rpc_accounting::Relation::RpcKey.def()) - .column(rpc_key::Column::Id); + // filter on rpc_key_id + // TODO: move getting the param and checking the bearer token into a helper function + let (condition, q) = if let Some(rpc_key_id) = params.get("rpc_key_id") { + let rpc_key_id = rpc_key_id.parse::().map_err(|e| { + FrontendErrorResponse::StatusCode( + StatusCode::BAD_REQUEST, + "Unable to parse rpc_key_id".to_string(), + e.into(), + ) + })?; - // .group_by(rpc_key::Column::Id); - - let condition = condition.add(rpc_key::Column::UserId.eq(user_id)); - - (condition, q) - } else { - // both user_id and rpc_key_id are 0, show aggregate stats - (condition, q) - }; - - let (condition, q) = if user_id == 0 { - // 0 means everyone. don't filter on user_key_id - (condition, q) - } else { - // TODO: add authentication here! make sure this user_id is owned by the authenticated user - // TODO: what about keys where this user is a secondary user? - let q = q - .join(JoinType::InnerJoin, rpc_accounting::Relation::RpcKey.def()) - .column(rpc_key::Column::Id) - .group_by(rpc_key::Column::Id); - - let condition = condition.add(rpc_key::Column::UserId.eq(user_id)); - - let q = if rpc_key_id == 0 { - q.column(rpc_key::Column::UserId) - .group_by(rpc_key::Column::UserId) + if rpc_key_id == 0 { + (condition, q) } else { - response.insert("rpc_key_id", serde_json::to_value(rpc_key_id)?); + // TODO: make sure that the bearer token is allowed to view this rpc_key_id + let q = q.group_by(rpc_accounting::Column::RpcKeyId); - // no need to group_by user_id when we are grouping by key_id - q.column(rpc_key::Column::Id).group_by(rpc_key::Column::Id) - }; + let condition = condition.add(rpc_accounting::Column::RpcKeyId.eq(rpc_key_id)); + + response.insert("rpc_key_id", serde_json::Value::Number(rpc_key_id.into())); + + (condition, q) + } + } else { + (condition, q) + }; + + // get_user_id_from_params checks that the bearer is connected to this user_id + let user_id = get_user_id_from_params(redis_conn, bearer, ¶ms).await?; + let (condition, q) = if user_id == 0 { + // 0 means everyone. don't filter on user + (condition, q) + } else { + let q = q.left_join(rpc_key::Entity); + + let condition = condition.add(rpc_key::Column::UserId.eq(user_id)); + + response.insert("user_id", serde_json::Value::Number(user_id.into())); (condition, q) }; - let q = if query_window_seconds != 0 { - /* - let query_start_timestamp: u64 = query_start - .timestamp() - .try_into() - .context("query_start to timestamp")?; - */ - // TODO: is there a better way to do this? how can we get "period_datetime" into this with types? - // TODO: how can we get the first window to start at query_start_timestamp - let expr = Expr::cust_with_values( - "FLOOR(UNIX_TIMESTAMP(rpc_accounting.period_datetime) / ?) * ?", - [query_window_seconds, query_window_seconds], - ); - - response.insert( - "query_window_seconds", - serde_json::to_value(query_window_seconds)?, - ); - - q.column_as(expr, "query_window_seconds") - .group_by(Expr::cust("query_window_seconds")) - } else { - // TODO: order by more than this? - // query_window_seconds is not set so we aggregate all records - q - }; - + // now that all the conditions are set up. add them to the query let q = q.filter(condition); - // log query here. i think sea orm has a useful log level for this + // TODO: trace log query here? i think sea orm has a useful log level for this - // TODO: transform this into a nested hashmap instead of a giant table? - let r = q + // set up pagination + let page = get_page_from_params(¶ms)?; + response.insert("page", serde_json::to_value(page).expect("can't fail")); + + // TODO: page size from param with a max from the config + let page_size = 200; + response.insert( + "page_size", + serde_json::to_value(page_size).expect("can't fail"), + ); + + // query the database + let query_response = q .into_json() .paginate(&db_conn, page_size) .fetch_page(page) + // TODO: timeouts here? or are they already set up on the connection .await?; - response.insert("detailed_aggregate", serde_json::Value::Array(r)); - - // number of keys - // number of secondary keys - // avg and max concurrent requests per second per api key + // add the query_response to the json response + response.insert("result", serde_json::Value::Array(query_response)); Ok(response) }