diff --git a/entities/src/sea_orm_active_enums.rs b/entities/src/sea_orm_active_enums.rs index 1dd920c1..879a218d 100644 --- a/entities/src/sea_orm_active_enums.rs +++ b/entities/src/sea_orm_active_enums.rs @@ -1,7 +1,7 @@ //! SeaORM Entity. Generated by sea-orm-codegen 0.10.0 use sea_orm::entity::prelude::*; -use sea_orm::{sea_query, ActiveEnum, EnumIter, Iden}; +use sea_orm::EnumIter; use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs index 0cc3abf6..a3522bf3 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app.rs @@ -172,6 +172,16 @@ pub async fn get_migrated_db( Ok(db_conn) } +#[derive(From)] +pub struct Web3ProxyAppSpawn { + /// the app. probably clone this to use in other groups of handles + pub app: Arc, + // cancellable handles + pub app_handles: FuturesUnordered>, + /// these are important and must be allowed to finish + pub background_handles: FuturesUnordered>, +} + #[metered(registry = Web3ProxyAppMetrics, registry_expr = self.app_metrics, visibility = pub)] impl Web3ProxyApp { /// The main entrypoint. @@ -180,13 +190,7 @@ impl Web3ProxyApp { top_config: TopConfig, num_workers: usize, shutdown_receiver: broadcast::Receiver<()>, - ) -> anyhow::Result<( - Arc, - // this handle is the main loops that we can cancel. select on this - FuturesUnordered>, - // this handle is the state saving background loops that we must let finish. join_all on this - FuturesUnordered>, - )> { + ) -> anyhow::Result { // safety checks on the config if let Some(redirect) = &top_config.app.redirect_user_url { assert!( @@ -493,7 +497,7 @@ impl Web3ProxyApp { let app = Arc::new(app); - Ok((app, cancellable_handles, important_background_handles)) + Ok((app, cancellable_handles, important_background_handles).into()) } #[instrument(level = "trace")] diff --git a/web3_proxy/src/bin/web3_proxy.rs b/web3_proxy/src/bin/web3_proxy.rs index bf51a672..7de292d3 100644 --- a/web3_proxy/src/bin/web3_proxy.rs +++ b/web3_proxy/src/bin/web3_proxy.rs @@ -74,17 +74,21 @@ fn run( let app_frontend_port = cli_config.port; let app_prometheus_port = cli_config.prometheus_port; - let (app, app_handles, mut important_background_handles) = + let mut spawned_app = Web3ProxyApp::spawn(top_config, num_workers, shutdown_sender.subscribe()).await?; - let frontend_handle = tokio::spawn(frontend::serve(app_frontend_port, app.clone())); + let frontend_handle = + tokio::spawn(frontend::serve(app_frontend_port, spawned_app.app.clone())); - let prometheus_handle = tokio::spawn(metrics_frontend::serve(app, app_prometheus_port)); + let prometheus_handle = tokio::spawn(metrics_frontend::serve( + spawned_app.app, + app_prometheus_port, + )); // if everything is working, these should both run forever // TODO: join these instead and use shutdown handler properly. probably use tokio's ctrl+c helper tokio::select! { - x = flatten_handles(app_handles) => { + x = flatten_handles(spawned_app.app_handles) => { match x { Ok(_) => info!("app_handle exited"), Err(e) => { @@ -124,7 +128,7 @@ fn run( }; // wait on all the important background tasks (like saving stats to the database) to complete - while let Some(x) = important_background_handles.next().await { + while let Some(x) = spawned_app.background_handles.next().await { match x { Err(e) => return Err(e.into()), Ok(Err(e)) => return Err(e), diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index 088a85b8..1b13b665 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -3,6 +3,7 @@ use super::errors::FrontendErrorResponse; use crate::app::{UserKeyData, Web3ProxyApp}; use crate::jsonrpc::JsonRpcRequest; +use crate::user_token::UserBearerToken; use anyhow::Context; use axum::headers::authorization::Bearer; use axum::headers::{Header, Origin, Referer, UserAgent}; @@ -395,7 +396,7 @@ impl Web3ProxyApp { // get the user id for this bearer token // TODO: move redis key building to a helper function - let bearer_cache_key = format!("bearer:{}", bearer.token()); + let bearer_cache_key = UserBearerToken::try_from(bearer)?.to_string(); // get the attached address from redis for the given auth_token. let mut redis_conn = self.redis_conn().await?; diff --git a/web3_proxy/src/frontend/errors.rs b/web3_proxy/src/frontend/errors.rs index 42f24062..75ac889b 100644 --- a/web3_proxy/src/frontend/errors.rs +++ b/web3_proxy/src/frontend/errors.rs @@ -15,7 +15,7 @@ use reqwest::header::ToStrError; use sea_orm::DbErr; use std::{error::Error, net::IpAddr}; use tokio::time::Instant; -use tracing::{instrument, warn}; +use tracing::{instrument, trace, warn}; // TODO: take "IntoResponse" instead of Response? pub type FrontendResult = Result; @@ -37,6 +37,7 @@ pub enum FrontendErrorResponse { Response(Response), /// simple way to return an error message to the user and an anyhow to our logs StatusCode(StatusCode, String, anyhow::Error), + UlidDecodeError(ulid::DecodeError), UnknownKey, } @@ -168,7 +169,8 @@ impl IntoResponse for FrontendErrorResponse { return r; } Self::StatusCode(status_code, err_msg, err) => { - warn!(?status_code, ?err_msg, ?err); + // TODO: warn is way too loud. different status codes should get different error levels. 500s should warn. 400s should stat + trace!(?status_code, ?err_msg, ?err); ( status_code, JsonRpcForwardedResponse::from_str( @@ -179,7 +181,7 @@ impl IntoResponse for FrontendErrorResponse { ) } Self::HeaderToString(err) => { - warn!(?err, "HeaderToString"); + trace!(?err, "HeaderToString"); ( StatusCode::BAD_REQUEST, JsonRpcForwardedResponse::from_str( @@ -189,6 +191,18 @@ impl IntoResponse for FrontendErrorResponse { ), ) } + Self::UlidDecodeError(err) => { + trace!(?err, "UlidDecodeError"); + ( + StatusCode::BAD_REQUEST, + JsonRpcForwardedResponse::from_str( + &format!("{}", err), + Some(StatusCode::BAD_REQUEST.as_u16().into()), + None, + ), + ) + } + // TODO: stat? Self::UnknownKey => ( StatusCode::UNAUTHORIZED, JsonRpcForwardedResponse::from_str( diff --git a/web3_proxy/src/frontend/users.rs b/web3_proxy/src/frontend/users.rs index 3bbe9454..4a5c70b5 100644 --- a/web3_proxy/src/frontend/users.rs +++ b/web3_proxy/src/frontend/users.rs @@ -7,6 +7,7 @@ use crate::user_queries::{ get_aggregate_rpc_stats_from_params, get_detailed_stats, get_page_from_params, }; use crate::user_queries::{get_chain_id_from_params, get_query_start_from_params}; +use crate::user_token::UserBearerToken; use anyhow::Context; use axum::headers::{Header, Origin, Referer, UserAgent}; use axum::{ @@ -312,7 +313,7 @@ pub async fn user_login_post( // add bearer to redis // TODO: use a helper function/struct for this - let bearer_redis_key = format!("bearer:{}", bearer_token); + let bearer_redis_key = UserBearerToken(bearer_token).to_string(); // expire in 4 weeks // TODO: get expiration time from app config @@ -340,7 +341,7 @@ pub async fn user_logout_post( let mut redis_conn = app.redis_conn().await?; // TODO: i don't like this. move this to a helper function so it is less fragile - let bearer_cache_key = format!("bearer:{}", bearer.token()); + let bearer_cache_key = UserBearerToken::try_from(bearer)?.to_string(); redis_conn.del(bearer_cache_key).await?; diff --git a/web3_proxy/src/lib.rs b/web3_proxy/src/lib.rs index 28bfd96c..0ae97055 100644 --- a/web3_proxy/src/lib.rs +++ b/web3_proxy/src/lib.rs @@ -8,3 +8,4 @@ pub mod metered; pub mod metrics_frontend; pub mod rpcs; pub mod user_queries; +pub mod user_token; diff --git a/web3_proxy/src/user_queries.rs b/web3_proxy/src/user_queries.rs index 68729cb0..4f7807aa 100644 --- a/web3_proxy/src/user_queries.rs +++ b/web3_proxy/src/user_queries.rs @@ -15,7 +15,7 @@ use sea_orm::{ }; use tracing::{instrument, trace}; -use crate::app::Web3ProxyApp; +use crate::{app::Web3ProxyApp, user_token::UserBearerToken}; /// get the attached address from redis for the given auth_token. /// 0 means all users @@ -27,10 +27,9 @@ async fn get_user_id_from_params( params: &HashMap, ) -> anyhow::Result { match (bearer, params.get("user_id")) { - (Some(bearer), Some(user_id)) => { + (Some(TypedHeader(Authorization(bearer))), Some(user_id)) => { // check for the bearer cache key - // TODO: move this to a helper function - let bearer_cache_key = format!("bearer:{}", bearer.token()); + let bearer_cache_key = UserBearerToken::try_from(bearer)?.to_string(); // get the user id that is attached to this bearer token redis_conn