From e9abcf883f9a643e877fbf2bd2b3851aadbbfcb4 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 19 Oct 2022 18:38:00 +0000 Subject: [PATCH] query params and grouping for user stats --- TODO.md | 5 +- web3_proxy/src/app.rs | 2 +- web3_proxy/src/{stats.rs => app_stats.rs} | 0 web3_proxy/src/frontend/users.rs | 104 +++++++++------------- web3_proxy/src/lib.rs | 3 +- web3_proxy/src/user_stats.rs | 65 ++++++++++++++ 6 files changed, 115 insertions(+), 64 deletions(-) rename web3_proxy/src/{stats.rs => app_stats.rs} (100%) create mode 100644 web3_proxy/src/user_stats.rs diff --git a/TODO.md b/TODO.md index 4058786d..f8a8d324 100644 --- a/TODO.md +++ b/TODO.md @@ -202,6 +202,9 @@ These are roughly in order of completition - allow setting things such as private relay, revert logging, ip/origin/etc checks - [ ] GET logged reverts on an endpoint that requires authentication. - [ ] endpoint for creating/modifying api keys and their advanced security features +- [ ] graceful shutdown. stop taking new requests and don't stop until all outstanding queries are handled + - https://github.com/tokio-rs/mini-redis/blob/master/src/shutdown.rs + - we need this because we need to be sure all the queries are saved in the db. maybe put stuff in Drop - [ ] include if archive query or not in the stats - this is already partially done, but we need to double check it works. preferrably with tests - [ ] WARN http_request:request: web3_proxy::block_number: could not get block from params err=unexpected params length id=01GF4HTRKM4JV6NX52XSF9AYMW method=POST authorized_request=User(Some(SqlxMySqlPoolConnection), AuthorizedKey { ip: 10.11.12.15, origin: None, user_key_id: 4, log_revert_chance: 0.0000 }) @@ -347,8 +350,6 @@ in another repo: event subscriber - [ ] have a low-latency option that always tries at least two servers in parallel and then returns the first success? - this doubles our request load though. maybe only if the first one doesn't respond very quickly? - [ ] zero downtime deploys -- [ ] graceful shutdown. stop taking new requests and don't stop until all outstanding queries are handled - - https://github.com/tokio-rs/mini-redis/blob/master/src/shutdown.rs - [ ] are we using Acquire/Release/AcqRel properly? or do we need other modes? - [ ] use https://github.com/ledgerwatch/interfaces to talk to erigon directly instead of through erigon's rpcdaemon (possible example code which uses ledgerwatch/interfaces: https://github.com/akula-bft/akula/tree/master) - [ ] subscribe to pending transactions and build an intelligent gas estimator diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs index 2874fbe9..6aefc808 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app.rs @@ -1,5 +1,6 @@ // TODO: this file is way too big now. move things into other modules +use crate::app_stats::{ProxyResponseStat, StatEmitter, Web3ProxyStat}; use crate::block_number::block_needed; use crate::config::{AppConfig, TopConfig}; use crate::frontend::authorization::{AuthorizedRequest, RequestMetadata}; @@ -11,7 +12,6 @@ use crate::rpcs::blockchain::{ArcBlock, BlockId}; use crate::rpcs::connections::Web3Connections; use crate::rpcs::request::OpenRequestHandleMetrics; use crate::rpcs::transactions::TxStatus; -use crate::stats::{ProxyResponseStat, StatEmitter, Web3ProxyStat}; use anyhow::Context; use axum::extract::ws::Message; use axum::headers::{Referer, UserAgent}; diff --git a/web3_proxy/src/stats.rs b/web3_proxy/src/app_stats.rs similarity index 100% rename from web3_proxy/src/stats.rs rename to web3_proxy/src/app_stats.rs diff --git a/web3_proxy/src/frontend/users.rs b/web3_proxy/src/frontend/users.rs index 33febcbd..df814911 100644 --- a/web3_proxy/src/frontend/users.rs +++ b/web3_proxy/src/frontend/users.rs @@ -3,6 +3,7 @@ use super::authorization::{login_is_authorized, UserKey}; use super::errors::FrontendResult; use crate::app::Web3ProxyApp; +use crate::user_stats::get_aggregate_stats; use anyhow::Context; use axum::{ extract::{Path, Query}, @@ -12,20 +13,19 @@ use axum::{ }; use axum_client_ip::ClientIp; use axum_macros::debug_handler; -use entities::{rpc_accounting, user, user_keys}; +use chrono::NaiveDateTime; +use entities::{user, user_keys}; use ethers::{prelude::Address, types::Bytes}; use hashbrown::HashMap; use http::StatusCode; use redis_rate_limiter::redis::AsyncCommands; -use sea_orm::{ - ActiveModelTrait, ColumnTrait, Condition, EntityTrait, QueryFilter, QuerySelect, - TransactionTrait, -}; +use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, TransactionTrait}; use serde::{Deserialize, Serialize}; use siwe::{Message, VerificationOpts}; use std::ops::Add; use std::sync::Arc; use time::{Duration, OffsetDateTime}; +use tracing::debug; use ulid::Ulid; /// `GET /user/login/:user_address` or `GET /user/login/:user_address/:message_eip` -- Start the "Sign In with Ethereum" (siwe) login flow. @@ -46,7 +46,8 @@ pub async fn user_login_get( ClientIp(ip): ClientIp, // TODO: what does axum's error handling look like if the path fails to parse? // TODO: allow ENS names here? - Path(mut params): Path>, + Path((user_address, message_eip)): Path<(Address, Option)>, + Query(query): Query>, ) -> FrontendResult { // give these named variables so that we drop them at the very end of this function let (_, _semaphore) = login_is_authorized(&app, ip).await?; @@ -68,17 +69,9 @@ pub async fn user_login_get( let expiration_time = issued_at.add(Duration::new(expire_seconds as i64, 0)); - let user_address: Address = params - .remove("user_address") - // TODO: map_err so this becomes a 500. routing must be bad - .context("impossible")? - .parse() - // TODO: map_err so this becomes a 401 - .context("bad input")?; - // TODO: get most of these from the app config let message = Message { - // TODO: should domain be llamanodes, or llamarpc? + // TODO: should domain be llamanodes, or llamarpc, or the subdomain of llamarpc? domain: "staging.llamanodes.com".parse().unwrap(), address: user_address.to_fixed_bytes(), statement: Some("🦙🦙🦙🦙🦙".to_string()), @@ -99,6 +92,7 @@ pub async fn user_login_get( // the address isn't enough. we need to save the actual message so we can read the nonce // TODO: what message format is the most efficient to store in redis? probably eip191_bytes // we add 1 to expire_seconds just to be sure redis has the key for the full expiration_time + // TODO: store a maximum number of attempted logins? anyone can request so we don't want to allow DOS attacks app.redis_conn() .await? .set_ex(session_key, message.to_string(), expire_seconds + 1) @@ -106,9 +100,7 @@ pub async fn user_login_get( // there are multiple ways to sign messages and not all wallets support them // TODO: default message eip from config? - let message_eip = params - .remove("message_eip") - .unwrap_or_else(|| "eip4361".to_string()); + let message_eip = message_eip.unwrap_or_else(|| "eip4361".to_string()); let message: String = match message_eip.as_str() { "eip191_bytes" => Bytes::from(message.eip191_bytes().unwrap()).to_string(), @@ -433,53 +425,45 @@ pub async fn user_stats_get( #[debug_handler] pub async fn user_stats_aggregate_get( Extension(app): Extension>, - Path(mut params): Path>, + Query(params): Query>, ) -> FrontendResult { // TODO: how is this supposed to be used? let db = app.db_conn.clone().context("no db")?; - // TODO: read this from params - let timeframe = chrono::Utc::now() - chrono::Duration::days(30); + let chain_id = params + .get("chain_id") + .map_or_else::, _, _>( + || Ok(app.config.chain_id), + |c| { + let c = c.parse()?; - // TODO: how do we get count reverts compared to other errors? does it matter? what about http errors to our users? - // TODO: how do we count uptime? - let x = rpc_accounting::Entity::find() - .select_only() - .column_as( - rpc_accounting::Column::FrontendRequests.sum(), - "total_requests", - ) - .column_as( - rpc_accounting::Column::CacheMisses.sum(), - "total_cache_misses", - ) - .column_as(rpc_accounting::Column::CacheHits.sum(), "total_cache_hits") - .column_as( - rpc_accounting::Column::BackendRetries.sum(), - "total_backend_retries", - ) - .column_as( - rpc_accounting::Column::SumResponseBytes.sum(), - "total_response_bytes", - ) - .column_as( - // TODO: can we sum bools like this? - rpc_accounting::Column::ErrorResponse.sum(), - "total_error_responses", - ) - .column_as( - rpc_accounting::Column::SumResponseMillis.sum(), - "total_response_millis", - ) - .filter( - Condition::all() - .add(rpc_accounting::Column::ChainId.eq(app.config.chain_id)) - .add(rpc_accounting::Column::PeriodDatetime.gte(timeframe)), - ) - .into_json() - .one(&db) - .await? - .unwrap(); + Ok(c) + }, + )?; + + let query_start = params + .get("start_timestamp") + .map_or_else::, _, _>( + || { + // no timestamp in params. set default + let x = chrono::Utc::now() - chrono::Duration::days(30); + + Ok(x.naive_utc()) + }, + |x: &String| { + // parse the given timestamp + let x = x.parse::().context("parsing timestamp query param")?; + + // TODO: error code 401 + let x = NaiveDateTime::from_timestamp_opt(x, 0) + .context("parsing timestamp query param")?; + + Ok(x) + }, + )?; + + // TODO: optionally no chain id? + let x = get_aggregate_stats(chain_id, &db, query_start).await?; Ok(Json(x).into_response()) } diff --git a/web3_proxy/src/lib.rs b/web3_proxy/src/lib.rs index fedf565c..c9ca7910 100644 --- a/web3_proxy/src/lib.rs +++ b/web3_proxy/src/lib.rs @@ -1,4 +1,5 @@ pub mod app; +pub mod app_stats; pub mod block_number; pub mod config; pub mod frontend; @@ -6,4 +7,4 @@ pub mod jsonrpc; pub mod metered; pub mod metrics_frontend; pub mod rpcs; -pub mod stats; +pub mod user_stats; diff --git a/web3_proxy/src/user_stats.rs b/web3_proxy/src/user_stats.rs new file mode 100644 index 00000000..6579a801 --- /dev/null +++ b/web3_proxy/src/user_stats.rs @@ -0,0 +1,65 @@ +use entities::rpc_accounting; +use num::Zero; +use sea_orm::{ColumnTrait, Condition, DatabaseConnection, EntityTrait, QueryFilter, QuerySelect}; + +pub async fn get_aggregate_stats( + chain_id: u64, + db: &DatabaseConnection, + query_start: chrono::NaiveDateTime, +) -> anyhow::Result> { + // TODO: how do we get count reverts compared to other errors? does it matter? what about http errors to our users? + // TODO: how do we count uptime? + let q = rpc_accounting::Entity::find() + .select_only() + .column_as( + rpc_accounting::Column::FrontendRequests.sum(), + "total_requests", + ) + .column_as( + rpc_accounting::Column::CacheMisses.sum(), + "total_cache_misses", + ) + .column_as(rpc_accounting::Column::CacheHits.sum(), "total_cache_hits") + .column_as( + rpc_accounting::Column::BackendRetries.sum(), + "total_backend_retries", + ) + .column_as( + rpc_accounting::Column::SumResponseBytes.sum(), + "total_response_bytes", + ) + .column_as( + // TODO: can we sum bools like this? + rpc_accounting::Column::ErrorResponse.sum(), + "total_error_responses", + ) + .column_as( + rpc_accounting::Column::SumResponseMillis.sum(), + "total_response_millis", + ); + + let condition = Condition::all().add(rpc_accounting::Column::PeriodDatetime.gte(query_start)); + + let (q, condition) = if chain_id.is_zero() { + // fetch all the chains. don't filter + let q = q + .column(rpc_accounting::Column::ChainId) + .group_by(rpc_accounting::Column::ChainId); + + (q, condition) + } else { + let condition = condition.add(rpc_accounting::Column::ChainId.eq(chain_id)); + + (q, condition) + }; + + let q = q.filter(condition); + + // TODO: if user key id is set, use that + // TODO: if user id is set, use that + // TODO: handle secondary users, too + + let r = q.into_json().all(db).await?; + + Ok(r) +}