From e520d85c43a6d136ce17972888d1ba6f073f5495 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 19 Oct 2022 22:20:34 +0000 Subject: [PATCH] aggregate users or everybody on the same endpoint --- web3_proxy/src/frontend/rpc_proxy_http.rs | 2 +- web3_proxy/src/frontend/users.rs | 43 +++++++++++++++--- web3_proxy/src/user_stats.rs | 55 ++++++++++++----------- 3 files changed, 69 insertions(+), 31 deletions(-) diff --git a/web3_proxy/src/frontend/rpc_proxy_http.rs b/web3_proxy/src/frontend/rpc_proxy_http.rs index d3a6b76f..e8b322ba 100644 --- a/web3_proxy/src/frontend/rpc_proxy_http.rs +++ b/web3_proxy/src/frontend/rpc_proxy_http.rs @@ -1,6 +1,6 @@ //! Take a user's HTTP JSON-RPC requests and either respond from local data or proxy the request to a backend rpc server. -use super::authorization::{ip_is_authorized, key_is_authorized}; +use super::authorization::{bearer_is_authorized, ip_is_authorized, key_is_authorized}; use super::errors::FrontendResult; use crate::{app::Web3ProxyApp, jsonrpc::JsonRpcRequestEnum}; use axum::extract::Path; diff --git a/web3_proxy/src/frontend/users.rs b/web3_proxy/src/frontend/users.rs index d65179af..90b7b6b9 100644 --- a/web3_proxy/src/frontend/users.rs +++ b/web3_proxy/src/frontend/users.rs @@ -25,7 +25,7 @@ use siwe::{Message, VerificationOpts}; use std::ops::Add; use std::sync::Arc; use time::{Duration, OffsetDateTime}; -use tracing::debug; +use tracing::{debug, info, warn}; use ulid::Ulid; /// `GET /user/login/:user_address` or `GET /user/login/:user_address/:message_eip` -- Start the "Sign In with Ethereum" (siwe) login flow. @@ -480,7 +480,7 @@ pub async fn user_stats_get( }, )?; - let x = get_aggregate_stats(chain_id, &db, query_start, Some(user_id)).await?; + let x = get_aggregate_stats(chain_id, &db, query_start, user_id).await?; Ok(Json(x).into_response()) } @@ -488,11 +488,42 @@ pub async fn user_stats_get( /// `GET /user/stats/aggregate` -- Public endpoint for aggregate stats such as bandwidth used and methods requested. #[debug_handler] pub async fn user_stats_aggregate_get( + bearer: Option>>, Extension(app): Extension>, Query(params): Query>, ) -> FrontendResult { - // TODO: how is this supposed to be used? - let db = app.db_conn.clone().context("no db")?; + // TODO: how is db_conn supposed to be used? + let db = app.db_conn.clone().context("connecting to db")?; + + // get the attached address from redis for the given auth_token. + let mut redis_conn = app.redis_conn().await.context("connecting to redis")?; + + let user_id = match (bearer, params.get("user_id")) { + (Some(bearer), Some(params)) => { + // check for the bearer cache key + // TODO: move this to a helper function + let bearer_cache_key = format!("bearer:{}", bearer.token()); + + // get the user id that is attached to this bearer token + redis_conn + .get::<_, u64>(bearer_cache_key) + .await + // TODO: this should be a 403 + .context("fetching user_key_id from redis with bearer_cache_key")? + } + (_, None) => { + // they have a bearer token. we don't care about it on public pages + // 0 means all + 0 + } + (None, Some(x)) => { + // they do not have a bearer token, but requested a specific id. block + // TODO: proper error code + // TODO: maybe instead of this sharp edged warn, we have a config value? + warn!("this should maybe be an access denied"); + x.parse().context("Parsing user_id param")? + } + }; let chain_id = params .get("chain_id") @@ -501,6 +532,8 @@ pub async fn user_stats_aggregate_get( |c| { let c = c.parse()?; + info!("user supplied chain_id"); + Ok(c) }, )?; @@ -527,7 +560,7 @@ pub async fn user_stats_aggregate_get( )?; // TODO: optionally no chain id? - let x = get_aggregate_stats(chain_id, &db, query_start, None).await?; + let x = get_aggregate_stats(chain_id, &db, query_start, user_id).await?; Ok(Json(x).into_response()) } diff --git a/web3_proxy/src/user_stats.rs b/web3_proxy/src/user_stats.rs index 2529557a..38b510b8 100644 --- a/web3_proxy/src/user_stats.rs +++ b/web3_proxy/src/user_stats.rs @@ -4,15 +4,15 @@ use sea_orm::{ ColumnTrait, Condition, DatabaseConnection, EntityTrait, JoinType, QueryFilter, QuerySelect, RelationTrait, }; -use tracing::debug; +use tracing::{debug, info, trace}; pub async fn get_aggregate_stats( chain_id: u64, db: &DatabaseConnection, query_start: chrono::NaiveDateTime, - user_id: Option, + user_id: u64, ) -> anyhow::Result> { - debug!(?chain_id, %query_start, ?user_id, "get_aggregate_stats"); + trace!(?chain_id, %query_start, ?user_id, "get_aggregate_stats"); // TODO: how do we get count reverts compared to other errors? does it matter? what about http errors to our users? // TODO: how do we count uptime? @@ -47,43 +47,48 @@ pub async fn get_aggregate_stats( let condition = Condition::all().add(rpc_accounting::Column::PeriodDatetime.gte(query_start)); - /* - let (q, condition) = if chain_id.is_zero() { + let (condition, q) = if chain_id.is_zero() { // fetch all the chains. don't filter // TODO: wait. do we want chain id on the logs? we can get that by joining key let q = q .column(rpc_accounting::Column::ChainId) .group_by(rpc_accounting::Column::ChainId); - (q, condition) + (condition, q) } else { let condition = condition.add(rpc_accounting::Column::ChainId.eq(chain_id)); - (q, condition) + (condition, q) + }; + + let (condition, q) = if user_id.is_zero() { + // 0 means everyone. don't filter on user + (condition, q) + } else { + // TODO: authentication here? or should that be higher in the stack? here sems safest + // TODO: only join some columns + // TODO: are these joins correct? + // TODO: what about keys where they are the secondary users? + let q = q + .join( + JoinType::InnerJoin, + rpc_accounting::Relation::UserKeys.def(), + ) + .column(user_keys::Column::UserId) + .group_by(user_keys::Column::UserId); + + let condition = condition.add(user_keys::Column::UserId.eq(user_id)); + + (condition, q) }; - */ let q = q.filter(condition); - // // TODO: also check secondary users - // let q = if let Some(user_id) = user_id { - // // TODO: authentication here? or should that be higher in the stack? here sems safest - // // TODO: only join some columns - // // TODO: are these joins correct? - // q.join( - // JoinType::InnerJoin, - // rpc_accounting::Relation::UserKeys.def(), - // ) - // .join(JoinType::InnerJoin, user_keys::Relation::User.def()) - // .filter(user::Column::Id.eq(user_id)) - // } else { - // q - // }; - - // TODO: if user key id is set, use that - // TODO: if user id is set, use that + // TODO: enum between searching on user_key_id on user_id // TODO: handle secondary users, too + // log query here. i think sea orm has a useful log level for this + let r = q.into_json().all(db).await?; Ok(r)