diff --git a/TODO.md b/TODO.md
index 138d9d88..612545c2 100644
--- a/TODO.md
+++ b/TODO.md
@@ -220,19 +220,45 @@ These are roughly in order of completition
 - [x] remove request per minute and concurrency limits from the keys. those are on the user tiers now.
 - [x] revertLogs db table should have rpc_key_id on it
 - [x] the relation in Relation is wrong now. it is called user_key_id, but point to the rpc key table
-- [ ] include if archive query or not in the stats
+- [x] instruments are missing. maybe that is why sentry had broken traces
+- [x] description should default to an empty string instead of being nullable
+- [-] include if archive query or not in the stats
+  - this is already partially done, but we need to double check it works. preferably with tests
+- [ ] make the archive request column show up in the stat aggregation
+- [ ] i think our server picking is incorrect somehow.
+      we upgraded erigon to a version with a broken websocket,
+      but that made it clear we still route to the lagged server sometimes. this is bad, but retries keep it from giving users bad data.
+- [ ] add indexes to speed up stat queries
 - [-] add configurable size limits to all the Caches
   - [ ] instead of configuring each cache with MB sizes, have one value for total memory footprint and then percentages for each cache
-- [-] let users choose a % to log (or maybe x/second). someone like curve logging all reverts will be a BIG database very quickly
-  - this must be opt-in or spawned since it will slow things down and will make their calls less private
-  - [ ] automatic pruning of old revert logs once too many are collected
-  - [ ] we currently default to 0.0 and don't expose a way to edit it. we have a database row, but we don't use it
+- [ ] when displaying the user's data, they just see an opaque id for their tier. we should join that data
+- [ ] detailed stats should also be filterable by "method"
 
 ## V1
 
 These are not yet ordered.
 
+- [ ] the public rpc is rate limited by ip and the authenticated rpc is rate limited by key
+  - this means if a dapp uses the authenticated RPC on their website, they could get rate limited more easily
+- [ ] add caching to speed up stat queries
+- [ ] take an option to set a non-default role when creating a user
+- [ ] different prune levels for free tiers
+- [ ] have a test that runs ethspam and versus
+- [ ] status page should show the git hash of the running version
+- [ ] Email confirmation
+  - [ ] we'll need a pretty template email that the backend will send.
+  - [ ] That will link them to a page on llamanodes.com
+  - [ ] There, they click "confirm" (or JavaScript does it for them automatically) to POST to this new endpoint
+- [ ] add a test in the migration repo that sets up a sqlite database and runs the migrations up and down
+- [ ] unbounded queues are risky. add limits
+- [-] let users choose a % to log (or maybe x/second). someone like curve logging all reverts will be a BIG database very quickly (a sampling sketch follows this diff)
+  - this must be opt-in or spawned since it will slow things down and will make their calls less private
+  - [ ] automatic pruning of old revert logs once too many are collected
+  - [ ] we currently default to 0.0 and don't expose a way to edit it. we have a database row, but we don't use it
+- [ ] after running for a while, https://eth-ski.llamanodes.com/status is only at 157 blocks and hashes. i thought they would be near 10k by now
+  - adding uptime to the status should help
+  - i think this is already in our todo list
+- [ ] improve private transactions.
+      keep re-broadcasting until they are confirmed
 - [ ] with a test that creates a user and modifies their key
 - [ ] Uuid/Ulid instead of big_unsigned for database ids
   - might have to use Uuid in sea-orm and then convert to Ulid on display
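A note on the "% to log" item above: a minimal sketch of what per-key revert sampling could look like, assuming the unused `log_revert_chance` database row mentioned in the TODO holds a probability between 0.0 and 1.0 (the helper name is hypothetical, not part of this patch):

    // sketch: decide whether to store one revert, given the key's sampling rate.
    // `log_revert_chance` is assumed to be the unused db row named above,
    // where 0.0 = never log and 1.0 = log every revert.
    use rand::Rng;

    fn should_save_revert(log_revert_chance: f64) -> bool {
        // gen_bool panics outside [0.0, 1.0], so clamp defensively
        rand::thread_rng().gen_bool(log_revert_chance.clamp(0.0, 1.0))
    }

Sampling like this keeps the revert table small for a heavy caller like curve while staying opt-in per key.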
diff --git a/web3_proxy/src/app_stats.rs b/web3_proxy/src/app_stats.rs
index 399336a6..7cd286dd 100644
--- a/web3_proxy/src/app_stats.rs
+++ b/web3_proxy/src/app_stats.rs
@@ -354,6 +354,8 @@ impl StatEmitter {
             }
         }
 
+        info!("saving {} stats", response_aggregate_map.len());
+
         for (key, aggregate) in response_aggregate_map.drain() {
             if let Err(err) = aggregate
                 .save(self.chain_id, &self.db_conn, key, period_timestamp)
diff --git a/web3_proxy/src/frontend/users.rs b/web3_proxy/src/frontend/users.rs
index 27f874ad..d7e5294f 100644
--- a/web3_proxy/src/frontend/users.rs
+++ b/web3_proxy/src/frontend/users.rs
@@ -3,10 +3,12 @@
 use super::authorization::{login_is_authorized, RpcSecretKey};
 use super::errors::FrontendResult;
 use crate::app::Web3ProxyApp;
+use crate::frontend::errors::FrontendErrorResponse;
 use crate::user_queries::{
-    get_aggregate_rpc_stats_from_params, get_detailed_stats, get_page_from_params,
+    get_chain_id_from_params, get_query_start_from_params, get_query_window_seconds_from_params,
+    get_user_id_from_params,
 };
-use crate::user_queries::{get_chain_id_from_params, get_query_start_from_params};
+use crate::user_queries::{get_detailed_rpc_stats_for_params, get_page_from_params};
 use crate::user_token::UserBearerToken;
 use anyhow::Context;
 use axum::headers::{Header, Origin, Referer, UserAgent};
@@ -18,16 +20,17 @@
 };
 use axum_client_ip::ClientIp;
 use axum_macros::debug_handler;
-use entities::{revert_log, rpc_key, user};
+use entities::{revert_log, rpc_accounting, rpc_key, user};
 use ethers::{prelude::Address, types::Bytes};
 use hashbrown::HashMap;
 use http::{HeaderValue, StatusCode};
 use ipnet::IpNet;
 use itertools::Itertools;
+use migration::{Condition, Expr, SimpleExpr};
 use redis_rate_limiter::redis::AsyncCommands;
 use sea_orm::{
     ActiveModelTrait, ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder,
-    TransactionTrait,
+    QuerySelect, TransactionTrait,
 };
 use serde::Deserialize;
 use serde_json::json;
@@ -747,7 +750,7 @@ pub async fn user_stats_detailed_get(
     bearer: Option<TypedHeader<Authorization<Bearer>>>,
     Query(params): Query<HashMap<String, String>>,
 ) -> FrontendResult {
-    let x = get_detailed_stats(&app, bearer, params).await?;
+    let x = get_detailed_rpc_stats_for_params(&app, bearer, params).await?;
 
     Ok(Json(x).into_response())
 }
@@ -760,7 +763,159 @@ pub async fn user_stats_aggregate_get(
     Extension(app): Extension<Arc<Web3ProxyApp>>,
     Query(params): Query<HashMap<String, String>>,
 ) -> FrontendResult {
-    let x = get_aggregate_rpc_stats_from_params(&app, bearer, params).await?;
+    let db_conn = app.db_conn().context("connecting to db")?;
+    let redis_conn = app.redis_conn().await.context("connecting to redis")?;
 
-    Ok(Json(x).into_response())
+    let mut response = HashMap::new();
+
+    let q = rpc_accounting::Entity::find()
+        .select_only()
+        .column_as(
+            rpc_accounting::Column::FrontendRequests.sum(),
+            "total_requests",
+        )
+        .column_as(
+            rpc_accounting::Column::BackendRequests.sum(),
+            "total_backend_retries",
+        )
+        .column_as(
+            rpc_accounting::Column::CacheMisses.sum(),
+            "total_cache_misses",
+        )
+        .column_as(rpc_accounting::Column::CacheHits.sum(), "total_cache_hits")
+        .column_as(
+            rpc_accounting::Column::SumResponseBytes.sum(),
+            "total_response_bytes",
+        )
+        .column_as(
+            // TODO: can we sum bools like this?
+            rpc_accounting::Column::ErrorResponse.sum(),
+            "total_error_responses",
+        )
+        .column_as(
+            rpc_accounting::Column::SumResponseMillis.sum(),
+            "total_response_millis",
+        );
+
+    let condition = Condition::all();
+
+    // TODO: DRYer! move this onto query_window_seconds_from_params?
+    let query_window_seconds = get_query_window_seconds_from_params(&params)?;
+    let q = if query_window_seconds == 0 {
+        // TODO: order by more than this?
+        // query_window_seconds is not set so we aggregate all records
+        // TODO: i am pretty sure we need to filter by something
+        q
+    } else {
+        // TODO: is there a better way to do this? how can we get "period_datetime" into this with types?
+        // TODO: how can we get the first window to start at query_start_timestamp
+        let expr = Expr::cust_with_values(
+            "FLOOR(UNIX_TIMESTAMP(rpc_accounting.period_datetime) / ?) * ?",
+            [query_window_seconds, query_window_seconds],
+        );
+
+        response.insert(
+            "query_window_seconds",
+            serde_json::Value::Number(query_window_seconds.into()),
+        );
+
+        q.column_as(expr, "query_window")
+            .group_by(Expr::cust("query_window"))
+            // TODO: is there a simpler way to order_by?
+            .order_by_asc(SimpleExpr::Custom("query_window".to_string()))
+    };
+
+    // aggregate stats after query_start
+    // TODO: minimum query_start of 90 days?
+    let query_start = get_query_start_from_params(&params)?;
+    // TODO: if no query_start, don't add to response or condition
+    response.insert(
+        "query_start",
+        serde_json::Value::Number(query_start.timestamp().into()),
+    );
+    let condition = condition.add(rpc_accounting::Column::PeriodDatetime.gte(query_start));
+
+    // filter on chain_id
+    let chain_id = get_chain_id_from_params(&app, &params)?;
+    let (condition, q) = if chain_id == 0 {
+        // fetch all the chains. don't filter or aggregate
+        (condition, q)
+    } else {
+        let condition = condition.add(rpc_accounting::Column::ChainId.eq(chain_id));
+
+        response.insert("chain_id", serde_json::Value::Number(chain_id.into()));
+
+        (condition, q)
+    };
+
+    // filter on rpc_key_id
+    // TODO: move getting the param and checking the bearer token into a helper function
+    let (condition, q) = if let Some(rpc_key_id) = params.get("rpc_key_id") {
+        let rpc_key_id = rpc_key_id.parse::<u64>().map_err(|e| {
+            FrontendErrorResponse::StatusCode(
+                StatusCode::BAD_REQUEST,
+                "Unable to parse rpc_key_id".to_string(),
+                e.into(),
+            )
+        })?;
+
+        if rpc_key_id == 0 {
+            (condition, q)
+        } else {
+            // TODO: make sure that the bearer token is allowed to view this rpc_key_id
+            let q = q.group_by(rpc_accounting::Column::RpcKeyId);
+
+            let condition = condition.add(rpc_accounting::Column::RpcKeyId.eq(rpc_key_id));
+
+            response.insert("rpc_key_id", serde_json::Value::Number(rpc_key_id.into()));
+
+            (condition, q)
+        }
+    } else {
+        (condition, q)
+    };
+
+    // get_user_id_from_params checks that the bearer is connected to this user_id
+    let user_id = get_user_id_from_params(redis_conn, bearer, &params).await?;
+    let (condition, q) = if user_id == 0 {
+        // 0 means everyone. don't filter on user
+        (condition, q)
+    } else {
+        let q = q.left_join(rpc_key::Entity);
+
+        let condition = condition.add(rpc_key::Column::UserId.eq(user_id));
+
+        response.insert("user_id", serde_json::Value::Number(user_id.into()));
+
+        (condition, q)
+    };
+
+    // now that all the conditions are set up, add them to the query
+    let q = q.filter(condition);
+
+    // TODO: trace log query here? i think sea orm has a useful log level for this
+
+    // set up pagination
+    let page = get_page_from_params(&params)?;
+    response.insert("page", serde_json::to_value(page).expect("can't fail"));
+
+    // TODO: page size from param with a max from the config
+    let page_size = 200;
+    response.insert(
+        "page_size",
+        serde_json::to_value(page_size).expect("can't fail"),
+    );
+
+    // query the database
+    let aggregate = q
+        .into_json()
+        .paginate(&db_conn, page_size)
+        .fetch_page(page)
+        // TODO: timeouts here? or are they already set up on the connection?
+        .await?;
+
+    // add the query response to the response
+    response.insert("aggregate", serde_json::Value::Array(aggregate));
+
+    Ok(Json(response).into_response())
 }
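On the `// TODO: can we sum bools like this?` comment in the hunk above: with MySQL, `BOOLEAN` is an alias for `TINYINT(1)`, so `SUM(error_response)` should indeed return the number of rows where the flag is set. If that ever feels too implicit, one hedged alternative (a sketch, not part of this patch) is the same `Expr::cust` escape hatch the window expression already uses:

    // sketch: count error responses explicitly instead of summing the raw
    // boolean column. IF(expr, 1, 0) is MySQL syntax; names match the query above.
    let q = q.column_as(
        Expr::cust("SUM(IF(error_response, 1, 0))"),
        "total_error_responses",
    );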
diff --git a/web3_proxy/src/user_queries.rs b/web3_proxy/src/user_queries.rs
index 3ac22d2e..72c1a849 100644
--- a/web3_proxy/src/user_queries.rs
+++ b/web3_proxy/src/user_queries.rs
@@ -6,7 +6,7 @@ use axum::{
 };
 use chrono::NaiveDateTime;
 use entities::{rpc_accounting, rpc_key};
 use hashbrown::HashMap;
-use migration::{Expr, SimpleExpr};
+use migration::Expr;
 use num::Zero;
 use redis_rate_limiter::{redis::AsyncCommands, RedisConnection};
 use sea_orm::{
@@ -20,7 +20,7 @@ use crate::{app::Web3ProxyApp, user_token::UserBearerToken};
 /// get the attached address from redis for the given auth_token.
 /// 0 means all users
 #[instrument(level = "trace", skip(redis_conn))]
-async fn get_user_id_from_params(
+pub async fn get_user_id_from_params(
     mut redis_conn: RedisConnection,
     // this is a long type. should we strip it down?
     bearer: Option<TypedHeader<Authorization<Bearer>>>,
@@ -165,149 +165,9 @@ pub fn get_query_window_seconds_from_params(
     )
 }
 
-/// stats aggregated across a time period
-/// TODO: aggregate on everything, or let the caller decide?
-#[instrument(level = "trace")]
-pub async fn get_aggregate_rpc_stats_from_params(
-    app: &Web3ProxyApp,
-    bearer: Option<TypedHeader<Authorization<Bearer>>>,
-    params: HashMap<String, String>,
-) -> anyhow::Result<HashMap<&str, serde_json::Value>> {
-    let db_conn = app.db_conn().context("connecting to db")?;
-    let redis_conn = app.redis_conn().await.context("connecting to redis")?;
-
-    let mut response = HashMap::new();
-
-    let page = get_page_from_params(&params)?;
-    response.insert("page", serde_json::to_value(page)?);
-
-    // TODO: page size from param with a max from the config
-    let page_size = 200;
-    response.insert("page_size", serde_json::to_value(page_size)?);
-
-    let q = rpc_accounting::Entity::find()
-        .select_only()
-        .column_as(
-            rpc_accounting::Column::FrontendRequests.sum(),
-            "total_requests",
-        )
-        .column_as(
-            rpc_accounting::Column::BackendRequests.sum(),
-            "total_backend_retries",
-        )
-        .column_as(
-            rpc_accounting::Column::CacheMisses.sum(),
-            "total_cache_misses",
-        )
-        .column_as(rpc_accounting::Column::CacheHits.sum(), "total_cache_hits")
-        .column_as(
-            rpc_accounting::Column::SumResponseBytes.sum(),
-            "total_response_bytes",
-        )
-        .column_as(
-            // TODO: can we sum bools like this?
-            rpc_accounting::Column::ErrorResponse.sum(),
-            "total_error_responses",
-        )
-        .column_as(
-            rpc_accounting::Column::SumResponseMillis.sum(),
-            "total_response_millis",
-        );
-
-    let condition = Condition::all();
-
-    // TODO: DRYer! move this onto query_window_seconds_from_params?
-    let query_window_seconds = get_query_window_seconds_from_params(&params)?;
-    let q = if query_window_seconds.is_zero() {
-        // TODO: order by more than this?
-        // query_window_seconds is not set so we aggregate all records
-        // TODO: i am pretty sure we need to filter by something
-        q
-    } else {
-        // TODO: is there a better way to do this? how can we get "period_datetime" into this with types?
-        // TODO: how can we get the first window to start at query_start_timestamp
-        let expr = Expr::cust_with_values(
-            "FLOOR(UNIX_TIMESTAMP(rpc_accounting.period_datetime) / ?) * ?",
-            [query_window_seconds, query_window_seconds],
-        );
-
-        response.insert(
-            "query_window_seconds",
-            serde_json::to_value(query_window_seconds)?,
-        );
-
-        q.column_as(expr, "query_window")
-            .group_by(Expr::cust("query_window"))
-            // TODO: is there a simpler way to order_by?
-            .order_by_asc(SimpleExpr::Custom("query_window".to_string()))
-    };
-
-    // aggregate stats after query_start
-    // TODO: minimum query_start of 90 days?
-    let query_start = get_query_start_from_params(&params)?;
-    // TODO: if no query_start, don't add to response or condition
-    response.insert(
-        "query_start",
-        serde_json::to_value(query_start.timestamp() as u64)?,
-    );
-    let condition = condition.add(rpc_accounting::Column::PeriodDatetime.gte(query_start));
-
-    // filter on chain_id
-    let chain_id = get_chain_id_from_params(app, &params)?;
-    let (condition, q) = if chain_id.is_zero() {
-        // fetch all the chains. don't filter or aggregate
-        (condition, q)
-    } else {
-        let condition = condition.add(rpc_accounting::Column::ChainId.eq(chain_id));
-
-        response.insert("chain_id", serde_json::to_value(chain_id)?);
-
-        (condition, q)
-    };
-
-    // filter on user_id
-    // TODO: what about filter on rpc_key_id?
-    // get_user_id_from_params checks that the bearer is connected to this user_id
-    let user_id = get_user_id_from_params(redis_conn, bearer, &params).await?;
-    let (condition, q) = if user_id.is_zero() {
-        // 0 means everyone. don't filter on user
-        (condition, q)
-    } else {
-        // TODO: are these joins correct? do we need these columns?
-        // TODO: also join on on keys where user is a secondary user?
-        let q = q
-            .join(JoinType::InnerJoin, rpc_accounting::Relation::RpcKey.def())
-            .column(rpc_accounting::Column::Id)
-            .column(rpc_key::Column::Id)
-            .join(JoinType::InnerJoin, rpc_key::Relation::User.def())
-            .column(rpc_key::Column::UserId);
-
-        let condition = condition.add(rpc_key::Column::UserId.eq(user_id));
-
-        (condition, q)
-    };
-
-    // now that all the conditions are set up. add them to the query
-    let q = q.filter(condition);
-
-    // TODO: trace log query here? i think sea orm has a useful log level for this
-
-    // query the database
-    let aggregate = q
-        .into_json()
-        .paginate(&db_conn, page_size)
-        .fetch_page(page)
-        .await?;
-
-    // add the query response to the response
-    response.insert("aggregate", serde_json::Value::Array(aggregate));
-
-    Ok(response)
-}
-
 /// stats grouped by key_id and error_repsponse and method and key
 #[instrument(level = "trace")]
-pub async fn get_detailed_stats(
+pub async fn get_detailed_rpc_stats_for_params(
     app: &Web3ProxyApp,
     bearer: Option<TypedHeader<Authorization<Bearer>>>,
     params: HashMap<String, String>,
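For reference, the `FLOOR(UNIX_TIMESTAMP(rpc_accounting.period_datetime) / ?) * ?` expression that both the old and new code bind `query_window_seconds` into buckets each row by rounding its timestamp down to a multiple of the window; the `query_window_seconds == 0` branch exists because this expression would otherwise divide by zero. The same arithmetic as a small Rust sketch (hypothetical helper, integer math only):

    // sketch of the SQL window expression: every timestamp inside a window
    // maps to that window's starting second.
    fn query_window_start(unix_timestamp: u64, window_seconds: u64) -> u64 {
        (unix_timestamp / window_seconds) * window_seconds
    }

    // e.g. with a 3600s window, 1668430859 maps to 1668430800, the top of its hour

This is also why the first window generally does not start exactly at `query_start_timestamp`, as the TODO in that hunk notes: buckets align to multiples of the window, not to the query start.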