diff --git a/TODO.md b/TODO.md index 1686f86a..788a8096 100644 --- a/TODO.md +++ b/TODO.md @@ -194,8 +194,8 @@ These are roughly in order of completition - [-] GET profile endpoint - [-] POST profile endpoint - [-] GET stats endpoint - - [ ] display requests per second per api key (only with authentication!) - - [ ] display concurrent requests per api key (only with authentication!) + - [-] display requests per second per api key (only with authentication!) + - [-] display concurrent requests per api key (only with authentication!) - [ ] display distribution of methods per api key (eth_call, eth_getLogs, etc.) (only with authentication!) - [x] get aggregate stats endpoint - [ ] POST key endpoint @@ -208,6 +208,7 @@ These are roughly in order of completition - need an flume::watch on unflushed stats that we can subscribe to. wait for it to flip to true - [ ] include if archive query or not in the stats - this is already partially done, but we need to double check it works. preferrably with tests +- [ ] [paginate responses](https://www.sea-ql.org/SeaORM/docs/basic-crud/select/#paginate-result) - [ ] WARN http_request:request: web3_proxy::block_number: could not get block from params err=unexpected params length id=01GF4HTRKM4JV6NX52XSF9AYMW method=POST authorized_request=User(Some(SqlxMySqlPoolConnection), AuthorizedKey { ip: 10.11.12.15, origin: None, user_key_id: 4, log_revert_chance: 0.0000 }) - ERROR http_request:request:try_send_all_upstream_servers: web3_proxy::rpcs::request: bad response! err=JsonRpcClientError(JsonRpcError(JsonRpcError { code: -32000, message: "INTERNAL_ERROR: existing tx with same hash", data: None })) method=eth_sendRawTransaction rpc=local_erigon_alpha_archive id=01GF4HV03Y4ZNKQV8DW5NDQ5CG method=POST authorized_request=User(Some(SqlxMySqlPoolConnection), AuthorizedKey { ip: 10.11.12.15, origin: None, user_key_id: 4, log_revert_chance: 0.0000 }) self=Web3Connections { conns: {"local_erigon_alpha_archive_ws": Web3Connection { name: "local_erigon_alpha_archive_ws", blocks: "all", .. }, "local_geth_ws": Web3Connection { name: "local_geth_ws", blocks: 64, .. }, "local_erigon_alpha_archive": Web3Connection { name: "local_erigon_alpha_archive", blocks: "all", .. }}, .. } authorized_request=Some(User(Some(SqlxMySqlPoolConnection), AuthorizedKey { ip: 10.11.12.15, origin: None, user_key_id: 4, log_revert_chance: 0.0000 })) request=JsonRpcRequest { id: RawValue(39), method: "eth_sendRawTransaction", .. } request_metadata=Some(RequestMetadata { datetime: 2022-10-11T22:14:57.406829095Z, period_seconds: 60, request_bytes: 633, backend_requests: 0, no_servers: 0, error_response: false, response_bytes: 0, response_millis: 0 }) block_needed=None - why is it failing to get the block from params when its set to None? That should be the simple case diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs index ebdd6f01..3a1ead80 100644 --- a/web3_proxy/src/frontend/mod.rs +++ b/web3_proxy/src/frontend/mod.rs @@ -73,11 +73,11 @@ pub async fn serve(port: u16, proxy_app: Arc) -> anyhow::Result<() .route("/user/balance/:txid", post(users::user_balance_post)) .route("/user/profile", get(users::user_profile_get)) .route("/user/profile", post(users::user_profile_post)) - .route("/user/stats", get(users::user_stats_get)) .route( "/user/stats/aggregate", get(users::user_stats_aggregate_get), ) + .route("/user/stats/detailed", get(users::user_stats_detailed_get)) .route("/user/logout", post(users::user_logout_post)) .route("/status", get(status::status)) // TODO: make this optional or remove it since it is available on another port diff --git a/web3_proxy/src/frontend/users.rs b/web3_proxy/src/frontend/users.rs index 90b7b6b9..0265df0d 100644 --- a/web3_proxy/src/frontend/users.rs +++ b/web3_proxy/src/frontend/users.rs @@ -3,7 +3,7 @@ use super::authorization::{login_is_authorized, UserKey}; use super::errors::FrontendResult; use crate::app::Web3ProxyApp; -use crate::user_stats::get_aggregate_stats; +use crate::user_stats::{get_aggregate_rpc_stats, get_detailed_stats}; use anyhow::Context; use axum::{ extract::{Path, Query}, @@ -416,38 +416,57 @@ pub async fn user_profile_get( todo!("user_profile_get"); } -/// `GET /user/stats` -- Use a bearer token to get the user's key stats such as bandwidth used and methods requested. +/// `GET /user/stats/detailed` -- Use a bearer token to get the user's key stats such as bandwidth used and methods requested. +/// +/// If no bearer is provided, detailed stats for all users will be shown /// /// - show number of requests used (so we can calculate average spending over a month, burn rate for a user etc, something like "Your balance will be depleted in xx days) /// /// TODO: one key per request? maybe /user/stats/:api_key? /// TODO: this will change as we add better support for secondary users. #[debug_handler] -pub async fn user_stats_get( +pub async fn user_stats_detailed_get( // TODO: turn this back on when done debugging. maybe add a path field for this // TypedHeader(Authorization(bearer)): TypedHeader>, + bearer: Option>>, Extension(app): Extension>, Query(params): Query>, ) -> FrontendResult { - /* + // TODO: how is db_conn supposed to be used? + let db = app.db_conn.clone().context("connecting to db")?; + // get the attached address from redis for the given auth_token. - let mut redis_conn = app.redis_conn().await?; + let mut redis_conn = app.redis_conn().await.context("connecting to redis")?; - // check for the bearer cache key - // TODO: move this to a helper function - let bearer_cache_key = format!("bearer:{}", bearer.token()); - let user_id = redis_conn - .get::<_, u64>(bearer_cache_key) - .await - // TODO: this should be a 403 - .context("fetching user_key_id from redis with bearer_cache_key")?; + // TODO: DRY + let user_id = match (bearer, params.get("user_id")) { + (Some(bearer), Some(params)) => { + // check for the bearer cache key + // TODO: move this to a helper function + let bearer_cache_key = format!("bearer:{}", bearer.token()); - */ - // TODO: remove this and get the user id that matches the bearer - let user_id = params.get("user_id").unwrap().parse().unwrap(); - - let db = app.db_conn.clone().context("no db")?; + // get the user id that is attached to this bearer token + redis_conn + .get::<_, u64>(bearer_cache_key) + .await + // TODO: this should be a 403 + .context("fetching user_key_id from redis with bearer_cache_key")? + } + (_, None) => { + // they have a bearer token. we don't care about it on public pages + // 0 means all + 0 + } + (None, Some(x)) => { + // they do not have a bearer token, but requested a specific id. block + // TODO: proper error code + // TODO: maybe instead of this sharp edged warn, we have a config value? + warn!("this should maybe be an access denied"); + x.parse().context("Parsing user_id param")? + } + }; + // TODO: DRY let chain_id = params .get("chain_id") .map_or_else::, _, _>( @@ -459,6 +478,7 @@ pub async fn user_stats_get( }, )?; + // TODO: DRY let query_start = params .get("timestamp") .map_or_else::, _, _>( @@ -480,7 +500,7 @@ pub async fn user_stats_get( }, )?; - let x = get_aggregate_stats(chain_id, &db, query_start, user_id).await?; + let x = get_detailed_stats(chain_id, &db, query_start, user_id).await?; Ok(Json(x).into_response()) } @@ -560,7 +580,7 @@ pub async fn user_stats_aggregate_get( )?; // TODO: optionally no chain id? - let x = get_aggregate_stats(chain_id, &db, query_start, user_id).await?; + let x = get_aggregate_rpc_stats(chain_id, &db, query_start, user_id).await?; Ok(Json(x).into_response()) } diff --git a/web3_proxy/src/user_stats.rs b/web3_proxy/src/user_stats.rs index 419c310f..a64a2589 100644 --- a/web3_proxy/src/user_stats.rs +++ b/web3_proxy/src/user_stats.rs @@ -1,4 +1,5 @@ use entities::{rpc_accounting, user, user_keys}; +use hashbrown::HashMap; use num::Zero; use sea_orm::{ ColumnTrait, Condition, DatabaseConnection, EntityTrait, JoinType, QueryFilter, QuerySelect, @@ -6,15 +7,7 @@ use sea_orm::{ }; use tracing::{debug, info, trace}; -pub async fn get_rpc_stats(chain_id: u64) -> u64 { - todo!(); -} - -pub async fn get_user_stats() -> u64 { - todo!(); -} - -pub async fn get_aggregate_stats( +pub async fn get_aggregate_rpc_stats( chain_id: u64, db: &DatabaseConnection, query_start: chrono::NaiveDateTime, @@ -101,3 +94,116 @@ pub async fn get_aggregate_stats( Ok(r) } + +pub async fn get_user_stats(chain_id: u64) -> u64 { + todo!(); +} + +/// stats grouped by key_id and error_repsponse and method and key +pub async fn get_detailed_stats( + chain_id: u64, + db_conn: &DatabaseConnection, + query_start: chrono::NaiveDateTime, + user_id: u64, +) -> anyhow::Result> { + // aggregate stats, but grouped by method and error + trace!(?chain_id, %query_start, ?user_id, "get_aggregate_stats"); + + let mut response = HashMap::new(); + + response.insert("chain_id", serde_json::to_value(chain_id)?); + response.insert("query_start", serde_json::to_value(query_start)?); + + // TODO: how do we get count reverts compared to other errors? does it matter? what about http errors to our users? + // TODO: how do we count uptime? + let q = rpc_accounting::Entity::find() + .select_only() + // groups + .column(rpc_accounting::Column::ErrorResponse) + .group_by(rpc_accounting::Column::ErrorResponse) + .column(rpc_accounting::Column::Method) + .group_by(rpc_accounting::Column::Method) + // aggregate columns + .column_as( + rpc_accounting::Column::FrontendRequests.sum(), + "total_requests", + ) + .column_as( + rpc_accounting::Column::CacheMisses.sum(), + "total_cache_misses", + ) + .column_as(rpc_accounting::Column::CacheHits.sum(), "total_cache_hits") + .column_as( + rpc_accounting::Column::BackendRetries.sum(), + "total_backend_retries", + ) + .column_as( + rpc_accounting::Column::SumResponseBytes.sum(), + "total_response_bytes", + ) + .column_as( + // TODO: can we sum bools like this? + rpc_accounting::Column::ErrorResponse.sum(), + "total_error_responses", + ) + .column_as( + rpc_accounting::Column::SumResponseMillis.sum(), + "total_response_millis", + ); + + let condition = Condition::all().add(rpc_accounting::Column::PeriodDatetime.gte(query_start)); + + let (condition, q) = if chain_id.is_zero() { + // fetch all the chains. don't filter + // TODO: wait. do we want chain id on the logs? we can get that by joining key + let q = q + .column(rpc_accounting::Column::ChainId) + .group_by(rpc_accounting::Column::ChainId); + + (condition, q) + } else { + let condition = condition.add(rpc_accounting::Column::ChainId.eq(chain_id)); + + (condition, q) + }; + + let (condition, q) = if user_id.is_zero() { + // 0 means everyone. don't filter on user + (condition, q) + } else { + // TODO: move authentication here? + // TODO: what about keys where this user is a secondary user? + let q = q + .join( + JoinType::InnerJoin, + rpc_accounting::Relation::UserKeys.def(), + ) + .column(user_keys::Column::UserId) + // no need to group_by user_id when we are grouping by key_id + // .group_by(user_keys::Column::UserId) + .column(user_keys::Column::Id) + .group_by(user_keys::Column::Id); + + let condition = condition.add(user_keys::Column::UserId.eq(user_id)); + + (condition, q) + }; + + let q = q.filter(condition); + + // TODO: enum between searching on user_key_id on user_id + // TODO: handle secondary users, too + + // log query here. i think sea orm has a useful log level for this + + // TODO: transform this into a nested hashmap instead of a giant table? + let r = q.into_json().all(db_conn).await?; + + response.insert("detailed_aggregate", serde_json::Value::Array(r)); + + // number of keys + // number of secondary keys + // avg and max concurrent requests per second per api key + + Ok(response) +}