From 8278fe006ba7a7dd800b1fb609450e4128748a1a Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Wed, 19 Oct 2022 21:34:05 +0000
Subject: [PATCH] cleanup

---
 TODO.md                                   |  1 +
 web3_proxy/src/frontend/authorization.rs  | 27 ++++----
 web3_proxy/src/frontend/rpc_proxy_http.rs |  2 +-
 web3_proxy/src/frontend/users.rs          | 75 ++++++++++++++++++++---
 web3_proxy/src/user_stats.rs              | 29 ++++++++-
 5 files changed, 108 insertions(+), 26 deletions(-)

diff --git a/TODO.md b/TODO.md
index f8a8d324..e61a9390 100644
--- a/TODO.md
+++ b/TODO.md
@@ -243,6 +243,7 @@ These are roughly in order of completion
 
 These are not yet ordered.
 
+- [ ] admin-only page for viewing user stat pages
 - [ ] geth sometimes gives an empty response instead of an error response. figure out a good way to catch this and not serve it
 - [ ] GET balance endpoint
 - [ ] POST balance endpoint
diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs
index f00de909..5a969bbf 100644
--- a/web3_proxy/src/frontend/authorization.rs
+++ b/web3_proxy/src/frontend/authorization.rs
@@ -7,7 +7,7 @@ use anyhow::Context;
 use axum::headers::{authorization::Bearer, Origin, Referer, UserAgent};
 use chrono::Utc;
 use deferred_rate_limiter::DeferredRateLimitResult;
-use entities::user_keys;
+use entities::{user, user_keys};
 use ipnet::IpNet;
 use redis_rate_limiter::redis::AsyncCommands;
 use redis_rate_limiter::RedisRateLimitResult;
@@ -282,7 +282,7 @@ pub async fn bearer_is_authorized(
     let bearer_cache_key = format!("bearer:{}", bearer.token());
 
-    // turn bearer into a user key id
-    let user_key_id: u64 = redis_conn
+    // turn the bearer token into a user id
+    let user_id: u64 = redis_conn
         .get(bearer_cache_key)
         .await
         .context("unknown bearer token")?;
@@ -290,21 +290,22 @@ pub async fn bearer_is_authorized(
     let db_conn = app.db_conn().context("Getting database connection")?;
 
-    // turn user key id into a user key
-    let user_key_data = user_keys::Entity::find_by_id(user_key_id)
+    // turn the user id into a user record
+    let user_data = user::Entity::find_by_id(user_id)
         .one(&db_conn)
         .await
-        .context("fetching user key by id")?
+        .context("fetching user by id")?
         .context("unknown user id")?;
 
-    key_is_authorized(
-        app,
-        user_key_data.api_key.into(),
-        ip,
-        origin,
-        referer,
-        user_agent,
-    )
-    .await
+    todo!("api_key is wrong. we should check user ids instead")
+    // key_is_authorized(
+    //     app,
+    //     user_data.api_key.into(),
+    //     ip,
+    //     origin,
+    //     referer,
+    //     user_agent,
+    // )
+    // .await
 }
 
 pub async fn ip_is_authorized(
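
bearer_is_authorized now ends in a todo!(), and the same "bearer:{token}"
redis lookup is repeated in users.rs under a "move this to a helper function"
TODO. A minimal sketch of what that shared helper could look like, assuming it
lives in authorization.rs where anyhow::Context, Bearer, and the redis
AsyncCommands trait are already imported; the name user_id_from_bearer is
illustrative, not part of this patch:

    /// Hypothetical helper: turn a bearer token into the user id cached in redis.
    async fn user_id_from_bearer(app: &Web3ProxyApp, bearer: &Bearer) -> anyhow::Result<u64> {
        let mut redis_conn = app.redis_conn().await?;

        // the cache key format matches the one used in bearer_is_authorized
        let bearer_cache_key = format!("bearer:{}", bearer.token());

        // a missing key means the token was never issued or has expired
        // TODO: this should probably map to a 403 instead of a 500
        let user_id: u64 = redis_conn
            .get(bearer_cache_key)
            .await
            .context("unknown bearer token")?;

        Ok(user_id)
    }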
diff --git a/web3_proxy/src/frontend/rpc_proxy_http.rs b/web3_proxy/src/frontend/rpc_proxy_http.rs
index e8b322ba..d3a6b76f 100644
--- a/web3_proxy/src/frontend/rpc_proxy_http.rs
+++ b/web3_proxy/src/frontend/rpc_proxy_http.rs
@@ -1,6 +1,6 @@
 //! Take a user's HTTP JSON-RPC requests and either respond from local data or proxy the request to a backend rpc server.
 
-use super::authorization::{bearer_is_authorized, ip_is_authorized, key_is_authorized};
+use super::authorization::{ip_is_authorized, key_is_authorized};
 use super::errors::FrontendResult;
 use crate::{app::Web3ProxyApp, jsonrpc::JsonRpcRequestEnum};
 use axum::extract::Path;
diff --git a/web3_proxy/src/frontend/users.rs b/web3_proxy/src/frontend/users.rs
index 22b5e0d5..d65179af 100644
--- a/web3_proxy/src/frontend/users.rs
+++ b/web3_proxy/src/frontend/users.rs
@@ -324,8 +324,8 @@ pub async fn user_profile_post(
     // give these named variables so that we drop them at the very end of this function
     let (_, _semaphore) = login_is_authorized(&app, ip).await?;
 
-    let user = ProtectedAction::PostUser
-        .verify(app.as_ref(), bearer_token, &payload.primary_address)
+    let user = ProtectedAction::PostUser(payload.primary_address)
+        .verify(app.as_ref(), bearer_token)
         .await?;
 
     let mut user: user::ActiveModel = user.into();
@@ -424,10 +424,65 @@ pub async fn user_profile_get(
 /// TODO: this will change as we add better support for secondary users.
 #[debug_handler]
 pub async fn user_stats_get(
-    TypedHeader(Authorization(bearer_token)): TypedHeader<Authorization<Bearer>>,
+    // TODO: turn this back on when done debugging. maybe add a path field for this
+    // TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
     Extension(app): Extension<Arc<Web3ProxyApp>>,
+    Query(params): Query<HashMap<String, String>>,
 ) -> FrontendResult {
-    todo!("user_stats_get");
+    /*
+    // get the attached address from redis for the given auth_token.
+    let mut redis_conn = app.redis_conn().await?;
+
+    // check for the bearer cache key
+    // TODO: move this to a helper function
+    let bearer_cache_key = format!("bearer:{}", bearer.token());
+
+    let user_id = redis_conn
+        .get::<_, u64>(bearer_cache_key)
+        .await
+        // TODO: this should be a 403
+        .context("fetching user_key_id from redis with bearer_cache_key")?;
+    */
+
+    // TODO: remove this and get the user id that matches the bearer
+    let user_id = params.get("user_id").unwrap().parse().unwrap();
+
+    let db = app.db_conn.clone().context("no db")?;
+
+    let chain_id = params
+        .get("chain_id")
+        .map_or_else::<anyhow::Result<u64>, _, _>(
+            || Ok(app.config.chain_id),
+            |c| {
+                let c = c.parse()?;
+
+                Ok(c)
+            },
+        )?;
+
+    let query_start = params
+        .get("timestamp")
+        .map_or_else::<anyhow::Result<NaiveDateTime>, _, _>(
+            || {
+                // no timestamp in params. set default
+                let x = chrono::Utc::now() - chrono::Duration::days(30);
+
+                Ok(x.naive_utc())
+            },
+            |x: &String| {
+                // parse the given timestamp
+                let x = x.parse::<i64>().context("parsing timestamp query param")?;
+
+                // TODO: error code 401
+                let x = NaiveDateTime::from_timestamp_opt(x, 0)
+                    .context("parsing timestamp query param")?;
+
+                Ok(x)
+            },
+        )?;
+
+    let x = get_aggregate_stats(chain_id, &db, query_start, Some(user_id)).await?;
+
+    Ok(Json(x).into_response())
 }
 
 /// `GET /user/stats/aggregate` -- Public endpoint for aggregate stats such as bandwidth used and methods requested.
@@ -451,7 +506,7 @@ pub async fn user_stats_aggregate_get(
         )?;
 
     let query_start = params
-        .get("start_timestamp")
+        .get("timestamp")
         .map_or_else::<anyhow::Result<NaiveDateTime>, _, _>(
             || {
                 // no timestamp in params. set default
@@ -472,7 +527,7 @@
         )?;
 
     // TODO: optionally no chain id?
-    let x = get_aggregate_stats(chain_id, &db, query_start).await?;
+    let x = get_aggregate_stats(chain_id, &db, query_start, None).await?;
 
     Ok(Json(x).into_response())
 }
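
After this change, user_stats_get and user_stats_aggregate_get parse the
`timestamp` query param with identical map_or_else blocks. A sketch of a shared
fallible helper for that parsing; the name query_start_from_params is
illustrative, and it assumes the imports the handlers already use
(std::collections::HashMap, chrono::NaiveDateTime, anyhow::Context):

    /// Hypothetical helper: parse an optional `timestamp` query param,
    /// defaulting to 30 days ago to match the handlers above.
    fn query_start_from_params(
        params: &HashMap<String, String>,
    ) -> anyhow::Result<NaiveDateTime> {
        params.get("timestamp").map_or_else(
            || {
                // no timestamp in params. default to the last 30 days
                let x = chrono::Utc::now() - chrono::Duration::days(30);

                Ok(x.naive_utc())
            },
            |x| {
                // parse the given timestamp as seconds since the unix epoch
                let x = x.parse::<i64>().context("parsing timestamp query param")?;

                // reject timestamps that don't fit in a NaiveDateTime
                NaiveDateTime::from_timestamp_opt(x, 0).context("timestamp out of range")
            },
        )
    }

With that in place, each handler's query_start block would collapse to
`let query_start = query_start_from_params(&params)?;`.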
@@ -481,7 +536,7 @@
 /// Handle authorization for a given address and bearer token.
 // TODO: what roles should exist?
 enum ProtectedAction {
-    PostUser,
+    PostUser(Address),
 }
 
 impl ProtectedAction {
@@ -491,15 +546,15 @@ impl ProtectedAction {
         app: &Web3ProxyApp,
         // TODO: i don't think we want Bearer here. we want user_key and a helper for bearer -> user_key
         bearer: Bearer,
-        // TODO: what about secondary addresses? maybe an enum for primary or secondary?
-        primary_address: &Address,
     ) -> anyhow::Result<user::Model> {
         // get the attached address from redis for the given auth_token.
         let mut redis_conn = app.redis_conn().await?;
 
+        // TODO: move this to a helper function
         let bearer_cache_key = format!("bearer:{}", bearer.token());
 
-        let user_key_id: Option<u64> = redis_conn
+        // TODO: move this to a helper function
+        let user_id: Option<u64> = redis_conn
             .get(bearer_cache_key)
             .await
             .context("fetching bearer cache key from redis")?;
diff --git a/web3_proxy/src/user_stats.rs b/web3_proxy/src/user_stats.rs
index 6579a801..2529557a 100644
--- a/web3_proxy/src/user_stats.rs
+++ b/web3_proxy/src/user_stats.rs
@@ -1,12 +1,19 @@
-use entities::rpc_accounting;
+use entities::{rpc_accounting, user, user_keys};
 use num::Zero;
-use sea_orm::{ColumnTrait, Condition, DatabaseConnection, EntityTrait, QueryFilter, QuerySelect};
+use sea_orm::{
+    ColumnTrait, Condition, DatabaseConnection, EntityTrait, JoinType, QueryFilter, QuerySelect,
+    RelationTrait,
+};
+use tracing::debug;
 
 pub async fn get_aggregate_stats(
     chain_id: u64,
     db: &DatabaseConnection,
     query_start: chrono::NaiveDateTime,
+    user_id: Option<u64>,
 ) -> anyhow::Result<Vec<serde_json::Value>> {
+    debug!(?chain_id, %query_start, ?user_id, "get_aggregate_stats");
+
     // TODO: how do we count reverts compared to other errors? does it matter? what about http errors to our users?
     // TODO: how do we count uptime?
     let q = rpc_accounting::Entity::find()
@@ -40,8 +47,10 @@ pub async fn get_aggregate_stats(
 
     let condition = Condition::all().add(rpc_accounting::Column::PeriodDatetime.gte(query_start));
 
+    /*
     let (q, condition) = if chain_id.is_zero() {
         // fetch all the chains. don't filter
+        // TODO: wait. do we want chain id on the logs? we can get that by joining key
         let q = q
             .column(rpc_accounting::Column::ChainId)
             .group_by(rpc_accounting::Column::ChainId);
@@ -52,9 +61,25 @@ pub async fn get_aggregate_stats(
         (q, condition)
     };
+    */
 
     let q = q.filter(condition);
 
+    // // TODO: also check secondary users
+    // let q = if let Some(user_id) = user_id {
+    //     // TODO: authentication here? or should that be higher in the stack? here seems safest
+    //     // TODO: only join some columns
+    //     // TODO: are these joins correct?
+    //     q.join(
+    //         JoinType::InnerJoin,
+    //         rpc_accounting::Relation::UserKeys.def(),
+    //     )
+    //     .join(JoinType::InnerJoin, user_keys::Relation::User.def())
+    //     .filter(user::Column::Id.eq(user_id))
+    // } else {
+    //     q
+    // };
+
     // TODO: if user key id is set, use that
     // TODO: if user id is set, use that
     // TODO: handle secondary users, too
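
If the commented-out per-user filter at the end of get_aggregate_stats is ever
enabled, it would read roughly as below. This is a sketch that assumes the
UserKeys and User relations named in the comments actually exist on the
rpc_accounting and user_keys entities:

    // uncommented sketch of the per-user filter, applied only when a user id was passed
    let q = if let Some(user_id) = user_id {
        // inner joins: rpc_accounting -> user_keys -> user, then filter on the user's id
        q.join(
            JoinType::InnerJoin,
            rpc_accounting::Relation::UserKeys.def(),
        )
        .join(JoinType::InnerJoin, user_keys::Relation::User.def())
        .filter(user::Column::Id.eq(user_id))
    } else {
        q
    };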