From 3793bdff678528834ae9d9ad064838aff6c04902 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 19 Oct 2022 00:56:57 +0000 Subject: [PATCH] aggregate stats endpoint --- TODO.md | 2 + web3_proxy/src/app.rs | 5 +- web3_proxy/src/frontend/authorization.rs | 4 +- web3_proxy/src/frontend/mod.rs | 4 ++ web3_proxy/src/frontend/users.rs | 76 +++++++++++++++++++++--- 5 files changed, 80 insertions(+), 11 deletions(-) diff --git a/TODO.md b/TODO.md index 7d082f06..4058786d 100644 --- a/TODO.md +++ b/TODO.md @@ -197,6 +197,7 @@ These are roughly in order of completition - [ ] display requests per second per api key (only with authentication!) - [ ] display concurrent requests per api key (only with authentication!) - [ ] display distribution of methods per api key (eth_call, eth_getLogs, etc.) (only with authentication!) + - [x] get aggregate stats endpoint - [ ] POST key endpoint - allow setting things such as private relay, revert logging, ip/origin/etc checks - [ ] GET logged reverts on an endpoint that requires authentication. @@ -445,3 +446,4 @@ in another repo: event subscriber - [ ] document "backend requests is cache_misses + backend_retries" - [ ] having tons of worker threads can actually make us slower if they keep waking to steal work from eachother. need benchmarks - [ ] change the wrk data to log requests and errors to a file +- [ ] if redis is not set and login page is visited, users get a 502. should be 501 diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs index e4f43491..2874fbe9 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app.rs @@ -764,8 +764,9 @@ impl Web3ProxyApp { Ok(collected) } - pub fn db_conn(&self) -> Option<&DatabaseConnection> { - self.db_conn.as_ref() + /// TODO: i don't think we want or need this. just use app.db_conn, or maybe app.db_conn.clone() or app.db_conn.as_ref() + pub fn db_conn(&self) -> Option { + self.db_conn.clone() } pub async fn redis_conn(&self) -> anyhow::Result { diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index be0d38b0..f00de909 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -291,7 +291,7 @@ pub async fn bearer_is_authorized( // turn user key id into a user key let user_key_data = user_keys::Entity::find_by_id(user_key_id) - .one(db_conn) + .one(&db_conn) .await .context("fetching user key by id")? .context("unknown user id")?; @@ -486,7 +486,7 @@ impl Web3ProxyApp { match user_keys::Entity::find() .filter(user_keys::Column::ApiKey.eq(user_uuid)) .filter(user_keys::Column::Active.eq(true)) - .one(db) + .one(&db) .await? { Some(user_key_model) => { diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs index 17c75a44..ebdd6f01 100644 --- a/web3_proxy/src/frontend/mod.rs +++ b/web3_proxy/src/frontend/mod.rs @@ -74,6 +74,10 @@ pub async fn serve(port: u16, proxy_app: Arc) -> anyhow::Result<() .route("/user/profile", get(users::user_profile_get)) .route("/user/profile", post(users::user_profile_post)) .route("/user/stats", get(users::user_stats_get)) + .route( + "/user/stats/aggregate", + get(users::user_stats_aggregate_get), + ) .route("/user/logout", post(users::user_logout_post)) .route("/status", get(status::status)) // TODO: make this optional or remove it since it is available on another port diff --git a/web3_proxy/src/frontend/users.rs b/web3_proxy/src/frontend/users.rs index fe6709eb..33febcbd 100644 --- a/web3_proxy/src/frontend/users.rs +++ b/web3_proxy/src/frontend/users.rs @@ -12,12 +12,15 @@ use axum::{ }; use axum_client_ip::ClientIp; use axum_macros::debug_handler; -use entities::{user, user_keys}; +use entities::{rpc_accounting, user, user_keys}; use ethers::{prelude::Address, types::Bytes}; use hashbrown::HashMap; use http::StatusCode; use redis_rate_limiter::redis::AsyncCommands; -use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, TransactionTrait}; +use sea_orm::{ + ActiveModelTrait, ColumnTrait, Condition, EntityTrait, QueryFilter, QuerySelect, + TransactionTrait, +}; use serde::{Deserialize, Serialize}; use siwe::{Message, VerificationOpts}; use std::ops::Add; @@ -67,8 +70,10 @@ pub async fn user_login_get( let user_address: Address = params .remove("user_address") + // TODO: map_err so this becomes a 500. routing must be bad .context("impossible")? .parse() + // TODO: map_err so this becomes a 401 .context("bad input")?; // TODO: get most of these from the app config @@ -90,7 +95,7 @@ pub async fn user_login_get( let session_key = format!("pending:{}", nonce); - // TODO: if no redis server, store in local cache? + // TODO: if no redis server, store in local cache? at least give a better error. right now this seems to be a 502 // the address isn't enough. we need to save the actual message so we can read the nonce // TODO: what message format is the most efficient to store in redis? probably eip191_bytes // we add 1 to expire_seconds just to be sure redis has the key for the full expiration_time @@ -110,7 +115,7 @@ pub async fn user_login_get( "eip191_hash" => Bytes::from(&message.eip191_hash().unwrap()).to_string(), "eip4361" => message.to_string(), _ => { - // TODO: this needs the correct error code in the response + // TODO: custom error that is handled a 401 return Err(anyhow::anyhow!("invalid message eip given").into()); } }; @@ -129,6 +134,7 @@ pub struct PostLoginQuery { /// JSON body to our `post_login` handler. /// Currently only siwe logins that send an address, msg, and sig are allowed. +/// Email/password and other login methods are planned. #[derive(Deserialize)] pub struct PostLogin { pub address: Address, @@ -146,6 +152,7 @@ pub struct PostLoginResponse { bearer_token: Ulid, /// Used for authenticating with the RPC endpoints. api_keys: Vec, + // TODO: what else? } /// `POST /user/login` - Register or login by posting a signed "siwe" message. @@ -199,7 +206,7 @@ pub async fn user_login_post( // TODO: limit columns or load whole user? let u = user::Entity::find() .filter(user::Column::Address.eq(our_msg.address.as_ref())) - .one(db) + .one(&db) .await .unwrap(); @@ -248,7 +255,7 @@ pub async fn user_login_post( // the user is already registered let uks = user_keys::Entity::find() .filter(user_keys::Column::UserId.eq(u.id)) - .all(db) + .all(&db) .await .context("failed loading user's key")?; @@ -337,7 +344,7 @@ pub async fn user_profile_post( let db = app.db_conn().context("Getting database connection")?; - user.save(db).await?; + user.save(&db).await?; todo!("finish post_user"); } @@ -422,6 +429,61 @@ pub async fn user_stats_get( todo!("user_stats_get"); } +/// `GET /user/stats/aggregate` -- Public endpoint for aggregate stats such as bandwidth used and methods requested. +#[debug_handler] +pub async fn user_stats_aggregate_get( + Extension(app): Extension>, + Path(mut params): Path>, +) -> FrontendResult { + // TODO: how is this supposed to be used? + let db = app.db_conn.clone().context("no db")?; + + // TODO: read this from params + let timeframe = chrono::Utc::now() - chrono::Duration::days(30); + + // TODO: how do we get count reverts compared to other errors? does it matter? what about http errors to our users? + // TODO: how do we count uptime? + let x = rpc_accounting::Entity::find() + .select_only() + .column_as( + rpc_accounting::Column::FrontendRequests.sum(), + "total_requests", + ) + .column_as( + rpc_accounting::Column::CacheMisses.sum(), + "total_cache_misses", + ) + .column_as(rpc_accounting::Column::CacheHits.sum(), "total_cache_hits") + .column_as( + rpc_accounting::Column::BackendRetries.sum(), + "total_backend_retries", + ) + .column_as( + rpc_accounting::Column::SumResponseBytes.sum(), + "total_response_bytes", + ) + .column_as( + // TODO: can we sum bools like this? + rpc_accounting::Column::ErrorResponse.sum(), + "total_error_responses", + ) + .column_as( + rpc_accounting::Column::SumResponseMillis.sum(), + "total_response_millis", + ) + .filter( + Condition::all() + .add(rpc_accounting::Column::ChainId.eq(app.config.chain_id)) + .add(rpc_accounting::Column::PeriodDatetime.gte(timeframe)), + ) + .into_json() + .one(&db) + .await? + .unwrap(); + + Ok(Json(x).into_response()) +} + /// `GET /user/profile` -- Use a bearer token to get the user's profile such as their optional email address. /// Handle authorization for a given address and bearer token. // TODO: what roles should exist?