From f3e9f6c38792543927dea5e33641650ed5085852 Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Thu, 3 Nov 2022 22:16:27 +0000
Subject: [PATCH] bug fixes

---
 TODO.md                            |   3 +
 config/example.toml                |   2 +
 web3_proxy/src/rpcs/connection.rs  |  15 ++-
 web3_proxy/src/rpcs/connections.rs |   4 +-
 web3_proxy/src/user_queries.rs     | 150 ++++++++++++++++-------------
 5 files changed, 101 insertions(+), 73 deletions(-)

diff --git a/TODO.md b/TODO.md
index 2a253476..138d9d88 100644
--- a/TODO.md
+++ b/TODO.md
@@ -217,6 +217,9 @@ These are roughly in order of completition
 - [x] instead of requests_per_minute on every key, have a "user_tier" that gets joined
 - [x] document url params with examples
 - [x] improve "docs/http routes.txt"
+- [x] remove request per minute and concurrency limits from the keys. those are on the user tiers now.
+- [x] revertLogs db table should have rpc_key_id on it
+- [x] the relation in Relation is wrong now. it is called user_key_id, but points to the rpc key table
 - [ ] include if archive query or not in the stats
     - this is already partially done, but we need to double check it works. preferrably with tests
 - [-] add configurable size limits to all the Caches
diff --git a/config/example.toml b/config/example.toml
index fa19b9ff..71d8c2a6 100644
--- a/config/example.toml
+++ b/config/example.toml
@@ -15,6 +15,8 @@ volatile_redis_url = "redis://dev-vredis:6379/"
 redirect_public_url = "https://llamanodes.com/free-rpc-stats"
 redirect_user_url = "https://llamanodes.com/user-rpc-stats/{{user_id}}"
 
+sentry_url = "https://YOURKEYA.ingest.sentry.io/YOURKEYB"
+
 # 0 = block all public requests
 public_max_concurrent_requests = 5
 # 0 = block all public requests
diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs
index 5761aba0..3d806a6d 100644
--- a/web3_proxy/src/rpcs/connection.rs
+++ b/web3_proxy/src/rpcs/connection.rs
@@ -236,14 +236,12 @@ impl Web3Connection {
         Ok(limit)
     }
 
-    /// TODO: this might be too simple. different nodes can prune differently
+    /// TODO: this might be too simple. different nodes can prune differently. it's possible we will have a block range
     pub fn block_data_limit(&self) -> U64 {
-        self.block_data_limit.load(atomic::Ordering::Acquire).into()
+        self.block_data_limit.load(atomic::Ordering::Relaxed).into()
     }
 
     pub fn has_block_data(&self, needed_block_num: &U64) -> bool {
-        let block_data_limit: U64 = self.block_data_limit();
-
         let head_block_id = self.head_block_id.read().clone();
 
         let newest_block_num = match head_block_id {
@@ -251,11 +249,18 @@ impl Web3Connection {
             Some(x) => x.num,
         };
 
+        if needed_block_num > &newest_block_num {
+            return false;
+        }
+
+        // if this is a pruning node, we might not actually have the block
+        let block_data_limit: U64 = self.block_data_limit();
+
         let oldest_block_num = newest_block_num
             .saturating_sub(block_data_limit)
             .max(U64::one());
 
-        needed_block_num >= &oldest_block_num && needed_block_num <= &newest_block_num
+        needed_block_num >= &oldest_block_num
     }
 
     /// reconnect to the provider. errors are retried forever with exponential backoff with jitter.
diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs
index 72667fab..bc200c6d 100644
--- a/web3_proxy/src/rpcs/connections.rs
+++ b/web3_proxy/src/rpcs/connections.rs
@@ -380,7 +380,7 @@ impl Web3Connections {
         let mut earliest_retry_at = None;
 
         // filter the synced rpcs
-        // TODO: we are going to be checking "has_block_data" a lot now. i think we pretty much always have min_block_needed now that we override "latest"
+        // TODO: we are going to be checking "has_block_data" a lot now
         let mut synced_rpcs: Vec<Arc<Web3Connection>> =
             if let Some(min_block_needed) = min_block_needed {
                 self.conns
@@ -527,6 +527,7 @@ impl Web3Connections {
         // TODO: maximum retries? right now its the total number of servers
         loop {
             if skip_rpcs.len() == self.conns.len() {
+                // no servers to try
                 break;
             }
             match self
@@ -646,6 +647,7 @@ impl Web3Connections {
                 .store(true, Ordering::Release);
         }
 
+        // TODO: what error code? 502?
        Err(anyhow::anyhow!("all {} tries exhausted", skip_rpcs.len()))
     }
 
diff --git a/web3_proxy/src/user_queries.rs b/web3_proxy/src/user_queries.rs
index 16770f3c..3ac22d2e 100644
--- a/web3_proxy/src/user_queries.rs
+++ b/web3_proxy/src/user_queries.rs
@@ -6,14 +6,14 @@ use axum::{
 use chrono::NaiveDateTime;
 use entities::{rpc_accounting, rpc_key};
 use hashbrown::HashMap;
-use migration::Expr;
+use migration::{Expr, SimpleExpr};
 use num::Zero;
 use redis_rate_limiter::{redis::AsyncCommands, RedisConnection};
 use sea_orm::{
     ColumnTrait, Condition, EntityTrait, JoinType, PaginatorTrait, QueryFilter, QueryOrder,
     QuerySelect, RelationTrait,
 };
-use tracing::{instrument, trace};
+use tracing::{instrument, warn};
 
 use crate::{app::Web3ProxyApp, user_token::UserBearerToken};
 
@@ -52,12 +52,16 @@ async fn get_user_id_from_params(
             // 0 means all
             Ok(0)
         }
-        (None, Some(_)) => {
+        (None, Some(x)) => {
             // they do not have a bearer token, but requested a specific id. block
             // TODO: proper error code
             // TODO: maybe instead of this sharp edged warn, we have a config value?
             // TODO: check config for if we should deny or allow this
-            Err(anyhow::anyhow!("permission denied"))
+            // Err(anyhow::anyhow!("permission denied"))
+
+            // TODO: make this a flag
+            warn!("allowing without auth during development!");
+            Ok(x.parse()?)
         }
     }
 }
@@ -161,7 +165,8 @@ pub fn get_query_window_seconds_from_params(
     )
 }
 
-/// stats aggregated across a large time period
+/// stats aggregated across a time period
+/// TODO: aggregate on everything, or let the caller decide?
 #[instrument(level = "trace")]
 pub async fn get_aggregate_rpc_stats_from_params(
     app: &Web3ProxyApp,
@@ -171,33 +176,15 @@ pub async fn get_aggregate_rpc_stats_from_params(
     let db_conn = app.db_conn().context("connecting to db")?;
     let redis_conn = app.redis_conn().await.context("connecting to redis")?;
 
-    let user_id = get_user_id_from_params(redis_conn, bearer, &params).await?;
-    let chain_id = get_chain_id_from_params(app, &params)?;
-    let query_start = get_query_start_from_params(&params)?;
-    let query_window_seconds = get_query_window_seconds_from_params(&params)?;
-    let page = get_page_from_params(&params)?;
-
-    // TODO: warn if unknown fields in params
-
-    // TODO: page size from config
-    let page_size = 200;
-
-    trace!(?chain_id, %query_start, ?user_id, "get_aggregate_stats");
-
-    // TODO: minimum query_start of 90 days?
-
     let mut response = HashMap::new();
 
+    let page = get_page_from_params(&params)?;
     response.insert("page", serde_json::to_value(page)?);
-    response.insert("page_size", serde_json::to_value(page_size)?);
-    response.insert("chain_id", serde_json::to_value(chain_id)?);
-    response.insert(
-        "query_start",
-        serde_json::to_value(query_start.timestamp() as u64)?,
-    );
 
-    // TODO: how do we get count reverts compared to other errors? does it matter? what about http errors to our users?
-    // TODO: how do we count uptime?
+    // TODO: page size from param with a max from the config
+    let page_size = 200;
+    response.insert("page_size", serde_json::to_value(page_size)?);
+
     let q = rpc_accounting::Entity::find()
         .select_only()
         .column_as(
@@ -225,17 +212,18 @@ pub async fn get_aggregate_rpc_stats_from_params(
         .column_as(
             rpc_accounting::Column::SumResponseMillis.sum(),
             "total_response_millis",
-        )
-        .order_by_asc(rpc_accounting::Column::PeriodDatetime.min());
+        );
 
-    // TODO: DRYer
-    let q = if query_window_seconds != 0 {
-        /*
-        let query_start_timestamp: u64 = query_start
-            .timestamp()
-            .try_into()
-            .context("query_start to timestamp")?;
-        */
+    let condition = Condition::all();
+
+    // TODO: DRYer! move this onto query_window_seconds_from_params?
+    let query_window_seconds = get_query_window_seconds_from_params(&params)?;
+    let q = if query_window_seconds.is_zero() {
+        // TODO: order by more than this?
+        // query_window_seconds is not set so we aggregate all records
+        // TODO: i am pretty sure we need to filter by something
+        q
+    } else {
         // TODO: is there a better way to do this? how can we get "period_datetime" into this with types?
         // TODO: how can we get the first window to start at query_start_timestamp
         let expr = Expr::cust_with_values(
@@ -248,61 +236,70 @@ pub async fn get_aggregate_rpc_stats_from_params(
             serde_json::to_value(query_window_seconds)?,
         );
 
-        q.column_as(expr, "query_window_seconds")
-            .group_by(Expr::cust("query_window_seconds"))
+        q.column_as(expr, "query_window")
+            .group_by(Expr::cust("query_window"))
+            // TODO: is there a simpler way to order_by?
+            .order_by_asc(SimpleExpr::Custom("query_window".to_string()))
     };
 
-    let condition = Condition::all().add(rpc_accounting::Column::PeriodDatetime.gte(query_start));
+    // aggregate stats after query_start
+    // TODO: minimum query_start of 90 days?
+    let query_start = get_query_start_from_params(&params)?;
+    // TODO: if no query_start, don't add to response or condition
+    response.insert(
+        "query_start",
+        serde_json::to_value(query_start.timestamp() as u64)?,
+    );
+    let condition = condition.add(rpc_accounting::Column::PeriodDatetime.gte(query_start));
 
+    // filter on chain_id
+    let chain_id = get_chain_id_from_params(app, &params)?;
     let (condition, q) = if chain_id.is_zero() {
-        // fetch all the chains. don't filter
-        // TODO: wait. do we want chain id on the logs? we can get that by joining key
-        let q = q
-            .column(rpc_accounting::Column::ChainId)
-            .group_by(rpc_accounting::Column::ChainId);
-
+        // fetch all the chains. don't filter or aggregate
         (condition, q)
     } else {
         let condition = condition.add(rpc_accounting::Column::ChainId.eq(chain_id));
 
+        response.insert("chain_id", serde_json::to_value(chain_id)?);
+
         (condition, q)
     };
 
+    // filter on user_id
+    // TODO: what about filter on rpc_key_id?
+    // get_user_id_from_params checks that the bearer is connected to this user_id
+    let user_id = get_user_id_from_params(redis_conn, bearer, &params).await?;
     let (condition, q) = if user_id.is_zero() {
         // 0 means everyone. don't filter on user
         (condition, q)
     } else {
-        // TODO: authentication here? or should that be higher in the stack? here sems safest
-        // TODO: only join some columns
-        // TODO: are these joins correct?
-        // TODO: what about keys where they are the secondary users?
+        // TODO: are these joins correct? do we need these columns?
+        // TODO: also join on keys where user is a secondary user?
         let q = q
             .join(JoinType::InnerJoin, rpc_accounting::Relation::RpcKey.def())
-            .column(rpc_key::Column::UserId)
-            .group_by(rpc_key::Column::UserId);
+            .column(rpc_accounting::Column::Id)
+            .column(rpc_key::Column::Id)
+            .join(JoinType::InnerJoin, rpc_key::Relation::User.def())
+            .column(rpc_key::Column::UserId);
 
         let condition = condition.add(rpc_key::Column::UserId.eq(user_id));
 
         (condition, q)
     };
 
+    // now that all the conditions are set up. add them to the query
     let q = q.filter(condition);
 
-    // TODO: enum between searching on rpc_key_id on user_id
-    // TODO: handle secondary users, too
-
-    // log query here. i think sea orm has a useful log level for this
+    // TODO: trace log query here? i think sea orm has a useful log level for this
 
+    // query the database
     let aggregate = q
         .into_json()
         .paginate(&db_conn, page_size)
         .fetch_page(page)
         .await?;
 
+    // add the query response to the response
     response.insert("aggregate", serde_json::Value::Array(aggregate));
 
     Ok(response)
@@ -326,7 +323,7 @@ pub async fn get_detailed_stats(
     let page = get_page_from_params(&params)?;
 
     // TODO: handle secondary users, too
-    // TODO: page size from config
+    // TODO: page size from config? from params with a max in the config?
     let page_size = 200;
 
     // TODO: minimum query_start of 90 days?
@@ -350,6 +347,9 @@ pub async fn get_detailed_stats(
         .group_by(rpc_accounting::Column::ErrorResponse)
         .column(rpc_accounting::Column::Method)
         .group_by(rpc_accounting::Column::Method)
+        .column(rpc_accounting::Column::ArchiveRequest)
+        .group_by(rpc_accounting::Column::ArchiveRequest)
+        // chain id is added later
         // aggregate columns
         .column_as(
             rpc_accounting::Column::FrontendRequests.sum(),
             "total_requests",
@@ -396,16 +396,32 @@ pub async fn get_detailed_stats(
         (condition, q)
     };
 
-    let (condition, q) = if user_id == 0 {
-        // 0 means everyone. don't filter on user
+    let (condition, q) = if user_id != 0 || rpc_key_id != 0 {
+        // if user id or rpc key id is specified, we need to join on at least rpc_key_id
+        let q = q
+            .join(JoinType::InnerJoin, rpc_accounting::Relation::RpcKey.def())
+            .column(rpc_key::Column::Id);
+
+        // .group_by(rpc_key::Column::Id);
+
+        let condition = condition.add(rpc_key::Column::UserId.eq(user_id));
+
        (condition, q)
     } else {
-        // TODO: move authentication here?
+        // both user_id and rpc_key_id are 0, show aggregate stats
+        (condition, q)
+    };
+
+    let (condition, q) = if user_id == 0 {
+        // 0 means everyone. don't filter on user_key_id
+        (condition, q)
+    } else {
+        // TODO: add authentication here! make sure this user_id is owned by the authenticated user
         // TODO: what about keys where this user is a secondary user?
         let q = q
             .join(JoinType::InnerJoin, rpc_accounting::Relation::RpcKey.def())
-            .column(rpc_key::Column::UserId)
-            .group_by(rpc_key::Column::UserId);
+            .column(rpc_key::Column::Id)
+            .group_by(rpc_key::Column::Id);
 
         let condition = condition.add(rpc_key::Column::UserId.eq(user_id));
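
Below is a standalone sketch, not part of the diff above, of the pruning-window check that the reworked has_block_data in connection.rs performs. It uses plain u64 values in place of the U64/AtomicU64 fields on Web3Connection, and the free function and its arguments are made-up names for illustration only:

// Illustrative only: mirrors the new has_block_data logic with plain integers.
// A pruning node keeps roughly the newest `block_data_limit` blocks, so a
// request is serviceable only if it falls inside [oldest, newest].
fn has_block_data(newest_block_num: u64, block_data_limit: u64, needed_block_num: u64) -> bool {
    // blocks newer than the head can never be served
    if needed_block_num > newest_block_num {
        return false;
    }

    // the oldest block a pruning node is assumed to still have
    let oldest_block_num = newest_block_num.saturating_sub(block_data_limit).max(1);

    needed_block_num >= oldest_block_num
}

fn main() {
    // archive node: a u64::MAX limit keeps everything back to block 1
    assert!(has_block_data(15_000_000, u64::MAX, 1));
    // pruning node with ~128 blocks of recent state
    assert!(has_block_data(15_000_000, 128, 14_999_900));
    assert!(!has_block_data(15_000_000, 128, 14_000_000));
    // nothing newer than the head
    assert!(!has_block_data(15_000_000, 128, 15_000_001));
}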
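
The connections.rs hunks only show the break and the final error, so here is a rough sketch of the shape of that retry loop, with strings standing in for Web3Connection handles; the real code ranks candidates by sync state and rate limits, while this toy version just takes the first server not yet skipped:

// Illustrative only: keep trying servers, remember the ones already tried,
// and give up once every server has been skipped.
fn try_servers(servers: &[&str]) -> Result<String, String> {
    let mut skip: Vec<usize> = Vec::new();

    loop {
        if skip.len() == servers.len() {
            // no servers to try
            return Err(format!("all {} tries exhausted", skip.len()));
        }

        // find the next candidate that has not been skipped yet
        let (i, candidate) = servers
            .iter()
            .enumerate()
            .find(|(i, _)| !skip.contains(i))
            .expect("at least one server is left, or we would have returned above");

        if candidate.starts_with("healthy") {
            return Ok(candidate.to_string());
        }

        // remember the failure and loop around for the next server
        skip.push(i);
    }
}

fn main() {
    assert_eq!(
        try_servers(&["lagging-1", "healthy-1"]),
        Ok("healthy-1".to_string())
    );
    assert!(try_servers(&["lagging-1", "lagging-2"]).is_err());
}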
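
Finally, a sketch of the "query_window" bucketing that get_aggregate_rpc_stats_from_params groups by when query_window_seconds is non-zero. The diff only shows the tail of the Expr::cust_with_values(...) call, so the SQL itself is not reproduced here; the arithmetic that expression needs to encode is simply "round each row's unix timestamp down to the start of its window", for example:

// Illustrative only: every timestamp in the same window maps to the same
// bucket start, so grouping on this value aggregates one row per window.
fn query_window_bucket(unix_timestamp: u64, query_window_seconds: u64) -> u64 {
    assert!(
        query_window_seconds != 0,
        "0 means no windowing; skip the GROUP BY instead"
    );
    (unix_timestamp / query_window_seconds) * query_window_seconds
}

fn main() {
    let hour = 3600;
    // two timestamps ten minutes apart land in the same hourly bucket...
    assert_eq!(
        query_window_bucket(1_667_520_000, hour),
        query_window_bucket(1_667_520_600, hour)
    );
    // ...and a timestamp in the next hour lands in the next bucket
    assert_eq!(
        query_window_bucket(1_667_523_700, hour),
        query_window_bucket(1_667_520_000, hour) + hour
    );
}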