diff --git a/web3_proxy/src/admin_queries.rs b/web3_proxy/src/admin_queries.rs index 8538a691..06500213 100644 --- a/web3_proxy/src/admin_queries.rs +++ b/web3_proxy/src/admin_queries.rs @@ -1,8 +1,8 @@ use crate::app::Web3ProxyApp; -use crate::frontend::errors::FrontendErrorResponse; +use crate::frontend::errors::{Web3ProxyError, Web3ProxyResponse}; use crate::http_params::get_user_id_from_params; use anyhow::Context; -use axum::response::{IntoResponse, Response}; +use axum::response::IntoResponse; use axum::{ headers::{authorization::Bearer, Authorization}, Json, TypedHeader, @@ -24,25 +24,21 @@ pub async fn query_admin_modify_usertier<'a>( app: &'a Web3ProxyApp, bearer: Option>>, params: &'a HashMap, -) -> Result { +) -> Web3ProxyResponse { // Quickly return if any of the input tokens are bad let user_address: Vec = params .get("user_address") .ok_or_else(|| { - FrontendErrorResponse::BadRequest( - "Unable to find user_address key in request".to_string(), - ) + Web3ProxyError::BadRequest("Unable to find user_address key in request".to_string()) })? .parse::
() .map_err(|_| { - FrontendErrorResponse::BadRequest( - "Unable to parse user_address as an Address".to_string(), - ) + Web3ProxyError::BadRequest("Unable to parse user_address as an Address".to_string()) })? .to_fixed_bytes() .into(); let user_tier_title = params.get("user_tier_title").ok_or_else(|| { - FrontendErrorResponse::BadRequest( + Web3ProxyError::BadRequest( "Unable to get the user_tier_title key from the request".to_string(), ) })?; @@ -78,7 +74,7 @@ pub async fn query_admin_modify_usertier<'a>( .filter(admin::Column::UserId.eq(caller_id)) .one(&db_conn) .await? - .ok_or(FrontendErrorResponse::AccessDenied)?; + .ok_or(Web3ProxyError::AccessDenied)?; // If we are here, that means an admin was found, and we can safely proceed @@ -87,7 +83,7 @@ pub async fn query_admin_modify_usertier<'a>( .filter(user::Column::Address.eq(user_address)) .one(&db_conn) .await? - .ok_or(FrontendErrorResponse::BadRequest( + .ok_or(Web3ProxyError::BadRequest( "No user with this id found".to_string(), ))?; // Return early if the target user_tier_id is the same as the original user_tier_id @@ -101,7 +97,7 @@ pub async fn query_admin_modify_usertier<'a>( .filter(user_tier::Column::Title.eq(user_tier_title.clone())) .one(&db_conn) .await? - .ok_or(FrontendErrorResponse::BadRequest( + .ok_or(Web3ProxyError::BadRequest( "User Tier name was not found".to_string(), ))?; diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index ae8445b5..2ff67bee 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -4,12 +4,12 @@ mod ws; use crate::block_number::{block_needed, BlockNeeded}; use crate::config::{AppConfig, TopConfig}; use crate::frontend::authorization::{Authorization, RequestMetadata, RpcSecretKey}; -use crate::frontend::errors::FrontendErrorResponse; +use crate::frontend::errors::{Web3ProxyError, Web3ProxyResult}; use crate::frontend::rpc_proxy_ws::ProxyMode; use crate::jsonrpc::{ JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest, JsonRpcRequestEnum, }; -use crate::rpcs::blockchain::{BlocksByHashCache, Web3ProxyBlock}; +use crate::rpcs::blockchain::Web3ProxyBlock; use crate::rpcs::consensus::ConsensusWeb3Rpcs; use crate::rpcs::many::Web3Rpcs; use crate::rpcs::one::Web3Rpc; @@ -407,7 +407,7 @@ impl Web3ProxyApp { shutdown_sender: broadcast::Sender<()>, ) -> anyhow::Result { let rpc_account_shutdown_recevier = shutdown_sender.subscribe(); - let mut background_shutdown_receiver = shutdown_sender.subscribe(); + let _background_shutdown_receiver = shutdown_sender.subscribe(); // safety checks on the config // while i would prefer this to be in a "apply_top_config" function, that is a larger refactor @@ -811,26 +811,27 @@ impl Web3ProxyApp { app_handles.push(config_handle); } -// ======= -// if important_background_handles.is_empty() { -// info!("no important background handles"); -// -// let f = tokio::spawn(async move { -// let _ = background_shutdown_receiver.recv().await; -// -// Ok(()) -// }); -// -// important_background_handles.push(f); -// >>>>>>> 77df3fa (stats v2) + // ======= + // if important_background_handles.is_empty() { + // info!("no important background handles"); + // + // let f = tokio::spawn(async move { + // let _ = background_shutdown_receiver.recv().await; + // + // Ok(()) + // }); + // + // important_background_handles.push(f); + // >>>>>>> 77df3fa (stats v2) Ok(( app, app_handles, important_background_handles, new_top_config_sender, - consensus_connections_watcher - ).into()) + consensus_connections_watcher, + ) + .into()) } pub async fn apply_top_config(&self, new_top_config: TopConfig) -> anyhow::Result<()> { @@ -1041,7 +1042,7 @@ impl Web3ProxyApp { self: &Arc, authorization: Arc, request: JsonRpcRequestEnum, - ) -> Result<(JsonRpcForwardedResponseEnum, Vec>), FrontendErrorResponse> { + ) -> Web3ProxyResult<(JsonRpcForwardedResponseEnum, Vec>)> { // trace!(?request, "proxy_web3_rpc"); // even though we have timeouts on the requests to our backend providers, @@ -1079,7 +1080,7 @@ impl Web3ProxyApp { self: &Arc, authorization: &Arc, requests: Vec, - ) -> Result<(Vec, Vec>), FrontendErrorResponse> { + ) -> Web3ProxyResult<(Vec, Vec>)> { // TODO: we should probably change ethers-rs to support this directly. they pushed this off to v2 though let num_requests = requests.len(); @@ -1087,7 +1088,7 @@ impl Web3ProxyApp { // TODO: improve flattening // get the head block now so that any requests that need it all use the same block - // TODO: FrontendErrorResponse that handles "no servers synced" in a consistent way + // TODO: Web3ProxyError that handles "no servers synced" in a consistent way // TODO: this still has an edge condition if there is a reorg in the middle of the request!!! let head_block_num = self .balanced_rpcs @@ -1153,7 +1154,7 @@ impl Web3ProxyApp { authorization: &Arc, mut request: JsonRpcRequest, head_block_num: Option, - ) -> Result<(JsonRpcForwardedResponse, Vec>), FrontendErrorResponse> { + ) -> Web3ProxyResult<(JsonRpcForwardedResponse, Vec>)> { // trace!("Received request: {:?}", request); let request_metadata = Arc::new(RequestMetadata::new(request.num_bytes())?); @@ -1591,7 +1592,7 @@ impl Web3ProxyApp { ) .map_err(|x| { trace!("bad request: {:?}", x); - FrontendErrorResponse::BadRequest( + Web3ProxyError::BadRequest( "param 0 could not be read as H256".to_string(), ) })?; @@ -1628,7 +1629,7 @@ impl Web3ProxyApp { method => { if method.starts_with("admin_") { // TODO: emit a stat? will probably just be noise - return Err(FrontendErrorResponse::AccessDenied); + return Err(Web3ProxyError::AccessDenied); } // emit stats diff --git a/web3_proxy/src/frontend/admin.rs b/web3_proxy/src/frontend/admin.rs index 4d55af2a..8df584e6 100644 --- a/web3_proxy/src/frontend/admin.rs +++ b/web3_proxy/src/frontend/admin.rs @@ -1,10 +1,10 @@ //! Handle admin helper logic use super::authorization::login_is_authorized; -use super::errors::FrontendResult; +use super::errors::Web3ProxyResponse; use crate::admin_queries::query_admin_modify_usertier; use crate::app::Web3ProxyApp; -use crate::frontend::errors::FrontendErrorResponse; +use crate::frontend::errors::Web3ProxyError; use crate::user_token::UserBearerToken; use crate::PostLogin; use anyhow::Context; @@ -43,7 +43,7 @@ pub async fn admin_change_user_roles( Extension(app): Extension>, bearer: Option>>, Query(params): Query>, -) -> FrontendResult { +) -> Web3ProxyResponse { let response = query_admin_modify_usertier(&app, bearer, ¶ms).await?; Ok(response) @@ -58,7 +58,7 @@ pub async fn admin_login_get( Extension(app): Extension>, InsecureClientIp(ip): InsecureClientIp, Path(mut params): Path>, -) -> FrontendResult { +) -> Web3ProxyResponse { // First check if the login is authorized login_is_authorized(&app, ip).await?; @@ -85,30 +85,22 @@ pub async fn admin_login_get( let admin_address: Address = params .get("admin_address") .ok_or_else(|| { - FrontendErrorResponse::BadRequest( - "Unable to find admin_address key in request".to_string(), - ) + Web3ProxyError::BadRequest("Unable to find admin_address key in request".to_string()) })? .parse::
() .map_err(|_err| { - FrontendErrorResponse::BadRequest( - "Unable to parse admin_address as an Address".to_string(), - ) + Web3ProxyError::BadRequest("Unable to parse admin_address as an Address".to_string()) })?; // Fetch the user_address parameter from the login string ... (as who we want to be logging in ...) let user_address: Vec = params .get("user_address") .ok_or_else(|| { - FrontendErrorResponse::BadRequest( - "Unable to find user_address key in request".to_string(), - ) + Web3ProxyError::BadRequest("Unable to find user_address key in request".to_string()) })? .parse::
() .map_err(|_err| { - FrontendErrorResponse::BadRequest( - "Unable to parse user_address as an Address".to_string(), - ) + Web3ProxyError::BadRequest("Unable to parse user_address as an Address".to_string()) })? .to_fixed_bytes() .into(); @@ -156,7 +148,7 @@ pub async fn admin_login_get( .filter(user::Column::Address.eq(user_address)) .one(db_replica.conn()) .await? - .ok_or(FrontendErrorResponse::BadRequest( + .ok_or(Web3ProxyError::BadRequest( "Could not find user in db".to_string(), ))?; @@ -164,7 +156,7 @@ pub async fn admin_login_get( .filter(user::Column::Address.eq(admin_address.encode())) .one(db_replica.conn()) .await? - .ok_or(FrontendErrorResponse::BadRequest( + .ok_or(Web3ProxyError::BadRequest( "Could not find admin in db".to_string(), ))?; @@ -233,7 +225,7 @@ pub async fn admin_login_post( Extension(app): Extension>, InsecureClientIp(ip): InsecureClientIp, Json(payload): Json, -) -> FrontendResult { +) -> Web3ProxyResponse { login_is_authorized(&app, ip).await?; // Check for the signed bytes .. @@ -422,7 +414,7 @@ pub async fn admin_login_post( pub async fn admin_logout_post( Extension(app): Extension>, TypedHeader(Authorization(bearer)): TypedHeader>, -) -> FrontendResult { +) -> Web3ProxyResponse { let user_bearer = UserBearerToken::try_from(bearer)?; let db_conn = app.db_conn().context("database needed for user logout")?; diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index 4ab6d66f..fb0ad581 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -1,6 +1,6 @@ //! Utilities for authorization of logged in and anonymous users. -use super::errors::FrontendErrorResponse; +use super::errors::{Web3ProxyError, Web3ProxyResult}; use super::rpc_proxy_ws::ProxyMode; use crate::app::{AuthorizationChecks, Web3ProxyApp, APP_USER_AGENT}; use crate::rpcs::one::Web3Rpc; @@ -308,14 +308,11 @@ impl Authorization { /// rate limit logins only by ip. /// we want all origins and referers and user agents to count together -pub async fn login_is_authorized( - app: &Web3ProxyApp, - ip: IpAddr, -) -> Result { +pub async fn login_is_authorized(app: &Web3ProxyApp, ip: IpAddr) -> Web3ProxyResult { let authorization = match app.rate_limit_login(ip, ProxyMode::Best).await? { RateLimitResult::Allowed(authorization, None) => authorization, RateLimitResult::RateLimited(authorization, retry_at) => { - return Err(FrontendErrorResponse::RateLimited(authorization, retry_at)); + return Err(Web3ProxyError::RateLimited(authorization, retry_at)); } // TODO: don't panic. give the user an error x => unimplemented!("rate_limit_login shouldn't ever see these: {:?}", x), @@ -330,7 +327,7 @@ pub async fn ip_is_authorized( ip: IpAddr, origin: Option, proxy_mode: ProxyMode, -) -> Result<(Authorization, Option), FrontendErrorResponse> { +) -> Web3ProxyResult<(Authorization, Option)> { // TODO: i think we could write an `impl From` for this // TODO: move this to an AuthorizedUser extrator let (authorization, semaphore) = match app @@ -345,7 +342,7 @@ pub async fn ip_is_authorized( RateLimitResult::Allowed(authorization, semaphore) => (authorization, semaphore), RateLimitResult::RateLimited(authorization, retry_at) => { // TODO: in the background, emit a stat (maybe simplest to use a channel?) - return Err(FrontendErrorResponse::RateLimited(authorization, retry_at)); + return Err(Web3ProxyError::RateLimited(authorization, retry_at)); } // TODO: don't panic. give the user an error x => unimplemented!("rate_limit_by_ip shouldn't ever see these: {:?}", x), @@ -389,7 +386,7 @@ pub async fn ip_is_authorized( Ok((authorization, semaphore)) } -/// like app.rate_limit_by_rpc_key but converts to a FrontendErrorResponse; +/// like app.rate_limit_by_rpc_key but converts to a Web3ProxyError; pub async fn key_is_authorized( app: &Arc, rpc_key: RpcSecretKey, @@ -398,7 +395,7 @@ pub async fn key_is_authorized( proxy_mode: ProxyMode, referer: Option, user_agent: Option, -) -> Result<(Authorization, Option), FrontendErrorResponse> { +) -> Web3ProxyResult<(Authorization, Option)> { // check the rate limits. error if over the limit // TODO: i think this should be in an "impl From" or "impl Into" let (authorization, semaphore) = match app @@ -407,9 +404,9 @@ pub async fn key_is_authorized( { RateLimitResult::Allowed(authorization, semaphore) => (authorization, semaphore), RateLimitResult::RateLimited(authorization, retry_at) => { - return Err(FrontendErrorResponse::RateLimited(authorization, retry_at)); + return Err(Web3ProxyError::RateLimited(authorization, retry_at)); } - RateLimitResult::UnknownKey => return Err(FrontendErrorResponse::UnknownKey), + RateLimitResult::UnknownKey => return Err(Web3ProxyError::UnknownKey), }; // TODO: DRY and maybe optimize the hashing @@ -517,7 +514,7 @@ impl Web3ProxyApp { pub async fn bearer_is_authorized( &self, bearer: Bearer, - ) -> Result<(user::Model, OwnedSemaphorePermit), FrontendErrorResponse> { + ) -> Web3ProxyResult<(user::Model, OwnedSemaphorePermit)> { // get the user id for this bearer token let user_bearer_token = UserBearerToken::try_from(bearer)?; @@ -869,7 +866,7 @@ impl Authorization { pub async fn check_again( &self, app: &Arc, - ) -> Result<(Arc, Option), FrontendErrorResponse> { + ) -> Web3ProxyResult<(Arc, Option)> { // TODO: we could probably do this without clones. but this is easy let (a, s) = if let Some(rpc_secret_key) = self.checks.rpc_secret_key { key_is_authorized( diff --git a/web3_proxy/src/frontend/errors.rs b/web3_proxy/src/frontend/errors.rs index cf791e88..515f5c86 100644 --- a/web3_proxy/src/frontend/errors.rs +++ b/web3_proxy/src/frontend/errors.rs @@ -17,12 +17,13 @@ use redis_rate_limiter::redis::RedisError; use reqwest::header::ToStrError; use tokio::{sync::AcquireError, task::JoinError, time::Instant}; +pub type Web3ProxyResult = Result; // TODO: take "IntoResponse" instead of Response? -pub type FrontendResult = Result; +pub type Web3ProxyResponse = Web3ProxyResult; // TODO: #[derive(Debug, From)] -pub enum FrontendErrorResponse { +pub enum Web3ProxyError { AccessDenied, Anyhow(anyhow::Error), BadRequest(String), @@ -46,7 +47,7 @@ pub enum FrontendErrorResponse { UnknownKey, } -impl FrontendErrorResponse { +impl Web3ProxyError { pub fn into_response_parts(self) -> (StatusCode, JsonRpcForwardedResponse) { match self { Self::AccessDenied => { @@ -296,7 +297,7 @@ impl FrontendErrorResponse { } } -impl IntoResponse for FrontendErrorResponse { +impl IntoResponse for Web3ProxyError { fn into_response(self) -> Response { // TODO: include the request id in these so that users can give us something that will point to logs // TODO: status code is in the jsonrpc response and is also the first item in the tuple. DRY @@ -307,5 +308,5 @@ impl IntoResponse for FrontendErrorResponse { } pub async fn handler_404() -> Response { - FrontendErrorResponse::NotFound.into_response() + Web3ProxyError::NotFound.into_response() } diff --git a/web3_proxy/src/frontend/rpc_proxy_http.rs b/web3_proxy/src/frontend/rpc_proxy_http.rs index 1ddd93f5..158f5215 100644 --- a/web3_proxy/src/frontend/rpc_proxy_http.rs +++ b/web3_proxy/src/frontend/rpc_proxy_http.rs @@ -1,7 +1,7 @@ //! Take a user's HTTP JSON-RPC requests and either respond from local data or proxy the request to a backend rpc server. use super::authorization::{ip_is_authorized, key_is_authorized}; -use super::errors::FrontendResult; +use super::errors::Web3ProxyResponse; use super::rpc_proxy_ws::ProxyMode; use crate::{app::Web3ProxyApp, jsonrpc::JsonRpcRequestEnum}; use axum::extract::Path; @@ -22,7 +22,7 @@ pub async fn proxy_web3_rpc( ip: InsecureClientIp, origin: Option>, Json(payload): Json, -) -> FrontendResult { +) -> Web3ProxyResponse { _proxy_web3_rpc(app, ip, origin, payload, ProxyMode::Best).await } @@ -32,7 +32,7 @@ pub async fn fastest_proxy_web3_rpc( ip: InsecureClientIp, origin: Option>, Json(payload): Json, -) -> FrontendResult { +) -> Web3ProxyResponse { // TODO: read the fastest number from params // TODO: check that the app allows this without authentication _proxy_web3_rpc(app, ip, origin, payload, ProxyMode::Fastest(0)).await @@ -44,7 +44,7 @@ pub async fn versus_proxy_web3_rpc( ip: InsecureClientIp, origin: Option>, Json(payload): Json, -) -> FrontendResult { +) -> Web3ProxyResponse { _proxy_web3_rpc(app, ip, origin, payload, ProxyMode::Versus).await } @@ -54,7 +54,7 @@ async fn _proxy_web3_rpc( origin: Option>, payload: JsonRpcRequestEnum, proxy_mode: ProxyMode, -) -> FrontendResult { +) -> Web3ProxyResponse { // TODO: benchmark spawning this // TODO: do we care about keeping the TypedHeader wrapper? let origin = origin.map(|x| x.0); @@ -124,7 +124,7 @@ pub async fn proxy_web3_rpc_with_key( user_agent: Option>, Path(rpc_key): Path, Json(payload): Json, -) -> FrontendResult { +) -> Web3ProxyResponse { _proxy_web3_rpc_with_key( app, ip, @@ -147,7 +147,7 @@ pub async fn debug_proxy_web3_rpc_with_key( user_agent: Option>, Path(rpc_key): Path, Json(payload): Json, -) -> FrontendResult { +) -> Web3ProxyResponse { _proxy_web3_rpc_with_key( app, ip, @@ -170,7 +170,7 @@ pub async fn fastest_proxy_web3_rpc_with_key( user_agent: Option>, Path(rpc_key): Path, Json(payload): Json, -) -> FrontendResult { +) -> Web3ProxyResponse { _proxy_web3_rpc_with_key( app, ip, @@ -193,7 +193,7 @@ pub async fn versus_proxy_web3_rpc_with_key( user_agent: Option>, Path(rpc_key): Path, Json(payload): Json, -) -> FrontendResult { +) -> Web3ProxyResponse { _proxy_web3_rpc_with_key( app, ip, @@ -217,7 +217,7 @@ async fn _proxy_web3_rpc_with_key( rpc_key: String, payload: JsonRpcRequestEnum, proxy_mode: ProxyMode, -) -> FrontendResult { +) -> Web3ProxyResponse { // TODO: DRY w/ proxy_web3_rpc // the request can take a while, so we spawn so that we can start serving another request let rpc_key = rpc_key.parse()?; diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs index 072ad854..3e3a6957 100644 --- a/web3_proxy/src/frontend/rpc_proxy_ws.rs +++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs @@ -3,7 +3,7 @@ //! WebSockets are the preferred method of receiving requests, but not all clients have good support. use super::authorization::{ip_is_authorized, key_is_authorized, Authorization, RequestMetadata}; -use super::errors::{FrontendErrorResponse, FrontendResult}; +use super::errors::{Web3ProxyError, Web3ProxyResponse}; use crate::stats::RpcQueryStats; use crate::{ app::Web3ProxyApp, @@ -59,7 +59,7 @@ pub async fn websocket_handler( ip: InsecureClientIp, origin: Option>, ws_upgrade: Option, -) -> FrontendResult { +) -> Web3ProxyResponse { _websocket_handler(ProxyMode::Best, app, ip, origin, ws_upgrade).await } @@ -71,7 +71,7 @@ pub async fn fastest_websocket_handler( ip: InsecureClientIp, origin: Option>, ws_upgrade: Option, -) -> FrontendResult { +) -> Web3ProxyResponse { // TODO: get the fastest number from the url params (default to 0/all) // TODO: config to disable this _websocket_handler(ProxyMode::Fastest(0), app, ip, origin, ws_upgrade).await @@ -85,7 +85,7 @@ pub async fn versus_websocket_handler( ip: InsecureClientIp, origin: Option>, ws_upgrade: Option, -) -> FrontendResult { +) -> Web3ProxyResponse { // TODO: config to disable this _websocket_handler(ProxyMode::Versus, app, ip, origin, ws_upgrade).await } @@ -96,7 +96,7 @@ async fn _websocket_handler( InsecureClientIp(ip): InsecureClientIp, origin: Option>, ws_upgrade: Option, -) -> FrontendResult { +) -> Web3ProxyResponse { let origin = origin.map(|x| x.0); let (authorization, _semaphore) = ip_is_authorized(&app, ip, origin, proxy_mode).await?; @@ -134,7 +134,7 @@ pub async fn websocket_handler_with_key( referer: Option>, user_agent: Option>, ws_upgrade: Option, -) -> FrontendResult { +) -> Web3ProxyResponse { _websocket_handler_with_key( ProxyMode::Best, app, @@ -157,7 +157,7 @@ pub async fn debug_websocket_handler_with_key( referer: Option>, user_agent: Option>, ws_upgrade: Option, -) -> FrontendResult { +) -> Web3ProxyResponse { _websocket_handler_with_key( ProxyMode::Debug, app, @@ -180,7 +180,7 @@ pub async fn fastest_websocket_handler_with_key( referer: Option>, user_agent: Option>, ws_upgrade: Option, -) -> FrontendResult { +) -> Web3ProxyResponse { // TODO: get the fastest number from the url params (default to 0/all) _websocket_handler_with_key( ProxyMode::Fastest(0), @@ -204,7 +204,7 @@ pub async fn versus_websocket_handler_with_key( referer: Option>, user_agent: Option>, ws_upgrade: Option, -) -> FrontendResult { +) -> Web3ProxyResponse { _websocket_handler_with_key( ProxyMode::Versus, app, @@ -228,7 +228,7 @@ async fn _websocket_handler_with_key( referer: Option>, user_agent: Option>, ws_upgrade: Option, -) -> FrontendResult { +) -> Web3ProxyResponse { let rpc_key = rpc_key.parse()?; let (authorization, _semaphore) = key_is_authorized( @@ -260,7 +260,7 @@ async fn _websocket_handler_with_key( &app.config.redirect_rpc_key_url, authorization.checks.rpc_secret_key_id, ) { - (None, None, _) => Err(FrontendErrorResponse::StatusCode( + (None, None, _) => Err(Web3ProxyError::StatusCode( StatusCode::BAD_REQUEST, "this page is for rpcs".to_string(), None, @@ -273,7 +273,7 @@ async fn _websocket_handler_with_key( if authorization.checks.rpc_secret_key_id.is_none() { // i don't think this is possible - Err(FrontendErrorResponse::StatusCode( + Err(Web3ProxyError::StatusCode( StatusCode::UNAUTHORIZED, "AUTHORIZATION header required".to_string(), None, @@ -291,7 +291,7 @@ async fn _websocket_handler_with_key( } } // any other combinations get a simple error - _ => Err(FrontendErrorResponse::StatusCode( + _ => Err(Web3ProxyError::StatusCode( StatusCode::BAD_REQUEST, "this page is for rpcs".to_string(), None, @@ -419,7 +419,7 @@ async fn handle_socket_payload( .await .map_or_else( |err| match err { - FrontendErrorResponse::Anyhow(err) => Err(err), + Web3ProxyError::Anyhow(err) => Err(err), _ => { error!("handle this better! {:?}", err); Err(anyhow::anyhow!("unexpected error! {:?}", err)) diff --git a/web3_proxy/src/frontend/users.rs b/web3_proxy/src/frontend/users.rs index 29210ae4..e8a43ba3 100644 --- a/web3_proxy/src/frontend/users.rs +++ b/web3_proxy/src/frontend/users.rs @@ -1,6 +1,6 @@ //! Handle registration, logins, and managing account data. use super::authorization::{login_is_authorized, RpcSecretKey}; -use super::errors::FrontendResult; +use super::errors::Web3ProxyResponse; use crate::app::Web3ProxyApp; use crate::http_params::{ get_chain_id_from_params, get_page_from_params, get_query_start_from_params, @@ -65,7 +65,7 @@ pub async fn user_login_get( InsecureClientIp(ip): InsecureClientIp, // TODO: what does axum's error handling look like if the path fails to parse? Path(mut params): Path>, -) -> FrontendResult { +) -> Web3ProxyResponse { login_is_authorized(&app, ip).await?; // create a message and save it in redis @@ -165,7 +165,7 @@ pub async fn user_login_post( InsecureClientIp(ip): InsecureClientIp, Query(query): Query, Json(payload): Json, -) -> FrontendResult { +) -> Web3ProxyResponse { login_is_authorized(&app, ip).await?; // TODO: this seems too verbose. how can we simply convert a String into a [u8; 65] @@ -373,7 +373,7 @@ pub async fn user_login_post( pub async fn user_logout_post( Extension(app): Extension>, TypedHeader(Authorization(bearer)): TypedHeader>, -) -> FrontendResult { +) -> Web3ProxyResponse { let user_bearer = UserBearerToken::try_from(bearer)?; let db_conn = app.db_conn().context("database needed for user logout")?; @@ -417,7 +417,7 @@ pub async fn user_logout_post( pub async fn user_get( Extension(app): Extension>, TypedHeader(Authorization(bearer_token)): TypedHeader>, -) -> FrontendResult { +) -> Web3ProxyResponse { let (user, _semaphore) = app.bearer_is_authorized(bearer_token).await?; Ok(Json(user).into_response()) @@ -435,7 +435,7 @@ pub async fn user_post( Extension(app): Extension>, TypedHeader(Authorization(bearer_token)): TypedHeader>, Json(payload): Json, -) -> FrontendResult { +) -> Web3ProxyResponse { let (user, _semaphore) = app.bearer_is_authorized(bearer_token).await?; let mut user: user::ActiveModel = user.into(); @@ -480,8 +480,8 @@ pub async fn user_post( pub async fn user_balance_get( Extension(app): Extension>, TypedHeader(Authorization(bearer)): TypedHeader>, -) -> FrontendResult { - let (user, _semaphore) = app.bearer_is_authorized(bearer).await?; +) -> Web3ProxyResponse { + let (_user, _semaphore) = app.bearer_is_authorized(bearer).await?; todo!("user_balance_get"); } @@ -495,8 +495,8 @@ pub async fn user_balance_get( pub async fn user_balance_post( Extension(app): Extension>, TypedHeader(Authorization(bearer)): TypedHeader>, -) -> FrontendResult { - let (user, _semaphore) = app.bearer_is_authorized(bearer).await?; +) -> Web3ProxyResponse { + let (_user, _semaphore) = app.bearer_is_authorized(bearer).await?; todo!("user_balance_post"); } @@ -506,7 +506,7 @@ pub async fn user_balance_post( pub async fn rpc_keys_get( Extension(app): Extension>, TypedHeader(Authorization(bearer)): TypedHeader>, -) -> FrontendResult { +) -> Web3ProxyResponse { let (user, _semaphore) = app.bearer_is_authorized(bearer).await?; let db_replica = app @@ -535,8 +535,8 @@ pub async fn rpc_keys_get( pub async fn rpc_keys_delete( Extension(app): Extension>, TypedHeader(Authorization(bearer)): TypedHeader>, -) -> FrontendResult { - let (user, _semaphore) = app.bearer_is_authorized(bearer).await?; +) -> Web3ProxyResponse { + let (_user, _semaphore) = app.bearer_is_authorized(bearer).await?; // TODO: think about how cascading deletes and billing should work Err(anyhow::anyhow!("work in progress").into()) @@ -567,7 +567,7 @@ pub async fn rpc_keys_management( Extension(app): Extension>, TypedHeader(Authorization(bearer)): TypedHeader>, Json(payload): Json, -) -> FrontendResult { +) -> Web3ProxyResponse { // TODO: is there a way we can know if this is a PUT or POST? right now we can modify or create keys with either. though that probably doesn't matter let (user, _semaphore) = app.bearer_is_authorized(bearer).await?; @@ -738,7 +738,7 @@ pub async fn user_revert_logs_get( Extension(app): Extension>, TypedHeader(Authorization(bearer)): TypedHeader>, Query(params): Query>, -) -> FrontendResult { +) -> Web3ProxyResponse { let (user, _semaphore) = app.bearer_is_authorized(bearer).await?; let chain_id = get_chain_id_from_params(app.as_ref(), ¶ms)?; @@ -808,7 +808,7 @@ pub async fn user_stats_aggregated_get( Extension(app): Extension>, bearer: Option>>, Query(params): Query>, -) -> FrontendResult { +) -> Web3ProxyResponse { let response = query_user_stats(&app, bearer, ¶ms, StatType::Aggregated).await?; Ok(response) @@ -828,7 +828,7 @@ pub async fn user_stats_detailed_get( Extension(app): Extension>, bearer: Option>>, Query(params): Query>, -) -> FrontendResult { +) -> Web3ProxyResponse { let response = query_user_stats(&app, bearer, ¶ms, StatType::Detailed).await?; Ok(response) diff --git a/web3_proxy/src/http_params.rs b/web3_proxy/src/http_params.rs index 8b30efa0..1ea7afee 100644 --- a/web3_proxy/src/http_params.rs +++ b/web3_proxy/src/http_params.rs @@ -1,5 +1,5 @@ use crate::app::DatabaseReplica; -use crate::frontend::errors::FrontendErrorResponse; +use crate::frontend::errors::{Web3ProxyError, Web3ProxyResult}; use crate::{app::Web3ProxyApp, user_token::UserBearerToken}; use anyhow::Context; use axum::{ @@ -24,7 +24,7 @@ pub async fn get_user_id_from_params( // this is a long type. should we strip it down? bearer: Option>>, params: &HashMap, -) -> Result { +) -> Web3ProxyResult { match (bearer, params.get("user_id")) { (Some(TypedHeader(Authorization(bearer))), Some(user_id)) => { // check for the bearer cache key @@ -45,7 +45,7 @@ pub async fn get_user_id_from_params( .one(db_replica.conn()) .await .context("database error while querying for user")? - .ok_or(FrontendErrorResponse::AccessDenied)?; + .ok_or(Web3ProxyError::AccessDenied)?; // if expired, delete ALL expired logins let now = Utc::now(); @@ -60,7 +60,7 @@ pub async fn get_user_id_from_params( // TODO: emit a stat? if this is high something weird might be happening debug!("cleared expired logins: {:?}", delete_result); - return Err(FrontendErrorResponse::AccessDenied); + return Err(Web3ProxyError::AccessDenied); } save_to_redis = true; @@ -76,7 +76,7 @@ pub async fn get_user_id_from_params( let user_id: u64 = user_id.parse().context("Parsing user_id param")?; if bearer_user_id != user_id { - return Err(FrontendErrorResponse::AccessDenied); + return Err(Web3ProxyError::AccessDenied); } if save_to_redis { @@ -103,7 +103,7 @@ pub async fn get_user_id_from_params( // TODO: proper error code from a useful error code // TODO: maybe instead of this sharp edged warn, we have a config value? // TODO: check config for if we should deny or allow this - Err(FrontendErrorResponse::AccessDenied) + Err(Web3ProxyError::AccessDenied) // // TODO: make this a flag // warn!("allowing without auth during development!"); // Ok(x.parse()?) @@ -215,7 +215,7 @@ pub fn get_query_stop_from_params( pub fn get_query_window_seconds_from_params( params: &HashMap, -) -> Result { +) -> Web3ProxyResult { params.get("query_window_seconds").map_or_else( || { // no page in params. set default @@ -225,7 +225,7 @@ pub fn get_query_window_seconds_from_params( // parse the given timestamp query_window_seconds.parse::().map_err(|err| { trace!("Unable to parse rpc_key_id: {:#?}", err); - FrontendErrorResponse::BadRequest("Unable to parse rpc_key_id".to_string()) + Web3ProxyError::BadRequest("Unable to parse rpc_key_id".to_string()) }) }, ) diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index bda39389..480f02be 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -398,7 +398,7 @@ impl Web3Rpcs { consensus_finder: &mut ConsensusFinder, new_block: Option, rpc: Arc, - pending_tx_sender: &Option>, + _pending_tx_sender: &Option>, ) -> anyhow::Result<()> { // TODO: how should we handle an error here? if !consensus_finder diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 1a1b8354..ef704498 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -167,7 +167,8 @@ impl Web3Rpcs { .max_capacity(10_000) .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); - let (watch_consensus_rpcs_sender, consensus_connections_watcher) = watch::channel(Default::default()); + let (watch_consensus_rpcs_sender, consensus_connections_watcher) = + watch::channel(Default::default()); // by_name starts empty. self.apply_server_configs will add to it let by_name = Default::default(); @@ -318,43 +319,43 @@ impl Web3Rpcs { } } -// <<<<<<< HEAD + // <<<<<<< HEAD Ok(()) } -// ======= -// // TODO: max_capacity and time_to_idle from config -// // all block hashes are the same size, so no need for weigher -// let block_hashes = Cache::builder() -// .time_to_idle(Duration::from_secs(600)) -// .max_capacity(10_000) -// .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); -// // all block numbers are the same size, so no need for weigher -// let block_numbers = Cache::builder() -// .time_to_idle(Duration::from_secs(600)) -// .max_capacity(10_000) -// .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); -// -// let (watch_consensus_connections_sender, consensus_connections_watcher) = -// watch::channel(Default::default()); -// -// let watch_consensus_head_receiver = -// watch_consensus_head_sender.as_ref().map(|x| x.subscribe()); -// -// let connections = Arc::new(Self { -// by_name: connections, -// watch_consensus_rpcs_sender: watch_consensus_connections_sender, -// watch_consensus_head_receiver, -// pending_transactions, -// block_hashes, -// block_numbers, -// min_sum_soft_limit, -// min_head_rpcs, -// max_block_age, -// max_block_lag, -// }); -// -// let authorization = Arc::new(Authorization::internal(db_conn.clone())?); -// >>>>>>> 77df3fa (stats v2) + // ======= + // // TODO: max_capacity and time_to_idle from config + // // all block hashes are the same size, so no need for weigher + // let block_hashes = Cache::builder() + // .time_to_idle(Duration::from_secs(600)) + // .max_capacity(10_000) + // .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); + // // all block numbers are the same size, so no need for weigher + // let block_numbers = Cache::builder() + // .time_to_idle(Duration::from_secs(600)) + // .max_capacity(10_000) + // .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); + // + // let (watch_consensus_connections_sender, consensus_connections_watcher) = + // watch::channel(Default::default()); + // + // let watch_consensus_head_receiver = + // watch_consensus_head_sender.as_ref().map(|x| x.subscribe()); + // + // let connections = Arc::new(Self { + // by_name: connections, + // watch_consensus_rpcs_sender: watch_consensus_connections_sender, + // watch_consensus_head_receiver, + // pending_transactions, + // block_hashes, + // block_numbers, + // min_sum_soft_limit, + // min_head_rpcs, + // max_block_age, + // max_block_lag, + // }); + // + // let authorization = Arc::new(Authorization::internal(db_conn.clone())?); + // >>>>>>> 77df3fa (stats v2) pub fn get(&self, conn_name: &str) -> Option> { self.by_name.read().get(conn_name).cloned() @@ -364,12 +365,12 @@ impl Web3Rpcs { self.by_name.read().len() } -// <<<<<<< HEAD + // <<<<<<< HEAD pub fn is_empty(&self) -> bool { self.by_name.read().is_empty() -// ======= -// Ok((connections, handle, consensus_connections_watcher)) -// >>>>>>> 77df3fa (stats v2) + // ======= + // Ok((connections, handle, consensus_connections_watcher)) + // >>>>>>> 77df3fa (stats v2) } pub fn min_head_rpcs(&self) -> usize { @@ -884,11 +885,11 @@ impl Web3Rpcs { // TODO: maximum retries? right now its the total number of servers loop { -// <<<<<<< HEAD + // <<<<<<< HEAD if skip_rpcs.len() >= self.by_name.read().len() { -// ======= -// if skip_rpcs.len() == self.by_name.len() { -// >>>>>>> 77df3fa (stats v2) + // ======= + // if skip_rpcs.len() == self.by_name.len() { + // >>>>>>> 77df3fa (stats v2) break; } @@ -1159,18 +1160,18 @@ impl Web3Rpcs { request_metadata.no_servers.fetch_add(1, Ordering::Release); } -// <<<<<<< HEAD + // <<<<<<< HEAD watch_consensus_rpcs.changed().await?; watch_consensus_rpcs.borrow_and_update(); -// ======= + // ======= // TODO: i don't think this will ever happen // TODO: return a 502? if it does? // return Err(anyhow::anyhow!("no available rpcs!")); // TODO: sleep how long? // TODO: subscribe to something in ConsensusWeb3Rpcs instead sleep(Duration::from_millis(200)).await; -// >>>>>>> 77df3fa (stats v2) + // >>>>>>> 77df3fa (stats v2) continue; } @@ -1216,7 +1217,7 @@ impl Web3Rpcs { ) .await } - ProxyMode::Fastest(x) => todo!("Fastest"), + ProxyMode::Fastest(_x) => todo!("Fastest"), ProxyMode::Versus => todo!("Versus"), } } @@ -1299,13 +1300,13 @@ mod tests { // TODO: why is this allow needed? does tokio::test get in the way somehow? #![allow(unused_imports)] - use std::time::{SystemTime, UNIX_EPOCH}; use super::*; use crate::rpcs::consensus::ConsensusFinder; use crate::rpcs::{blockchain::Web3ProxyBlock, provider::Web3Provider}; use ethers::types::{Block, U256}; use log::{trace, LevelFilter}; use parking_lot::RwLock; + use std::time::{SystemTime, UNIX_EPOCH}; use tokio::sync::RwLock as AsyncRwLock; #[tokio::test] diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index b908ecf1..93263a54 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -242,12 +242,12 @@ impl Web3Rpc { block_data_limit, reconnect, tier: config.tier, -// <<<<<<< HEAD + // <<<<<<< HEAD disconnect_watch: Some(disconnect_sender), created_at: Some(created_at), -// ======= + // ======= head_block: RwLock::new(Default::default()), -// >>>>>>> 77df3fa (stats v2) + // >>>>>>> 77df3fa (stats v2) ..Default::default() }; @@ -1110,7 +1110,7 @@ impl Web3Rpc { trace!("watching pending transactions on {}", self); // TODO: does this keep the lock open for too long? match provider.as_ref() { - Web3Provider::Http(provider) => { + Web3Provider::Http(_provider) => { // there is a "watch_pending_transactions" function, but a lot of public nodes do not support the necessary rpc endpoints self.wait_for_disconnect().await?; } @@ -1224,11 +1224,11 @@ impl Web3Rpc { } if let Some(hard_limit_until) = self.hard_limit_until.as_ref() { -// <<<<<<< HEAD + // <<<<<<< HEAD let hard_limit_ready = *hard_limit_until.borrow(); -// ======= -// let hard_limit_ready = hard_limit_until.borrow().to_owned(); -// >>>>>>> 77df3fa (stats v2) + // ======= + // let hard_limit_ready = hard_limit_until.borrow().to_owned(); + // >>>>>>> 77df3fa (stats v2) let now = Instant::now(); diff --git a/web3_proxy/src/stats/db_queries.rs b/web3_proxy/src/stats/db_queries.rs index 599b3cff..15a8808f 100644 --- a/web3_proxy/src/stats/db_queries.rs +++ b/web3_proxy/src/stats/db_queries.rs @@ -1,11 +1,11 @@ use crate::app::Web3ProxyApp; -use crate::frontend::errors::FrontendErrorResponse; +use crate::frontend::errors::{Web3ProxyError, Web3ProxyResponse, Web3ProxyResult}; use crate::http_params::{ get_chain_id_from_params, get_page_from_params, get_query_start_from_params, get_query_window_seconds_from_params, get_user_id_from_params, }; use anyhow::Context; -use axum::response::{IntoResponse, Response}; +use axum::response::IntoResponse; use axum::Json; use axum::{ headers::{authorization::Bearer, Authorization}, @@ -227,7 +227,7 @@ pub fn filter_query_window_seconds( query_window_seconds: u64, response: &mut HashMap<&str, serde_json::Value>, q: Select, -) -> Result, FrontendErrorResponse> { +) -> Web3ProxyResult> { if query_window_seconds == 0 { // TODO: order by more than this? // query_window_seconds is not set so we aggregate all records @@ -261,7 +261,7 @@ pub async fn query_user_stats<'a>( bearer: Option>>, params: &'a HashMap, stat_response_type: StatType, -) -> Result { +) -> Web3ProxyResponse { let db_conn = app.db_conn().context("query_user_stats needs a db")?; let db_replica = app .db_replica() @@ -408,7 +408,7 @@ pub async fn query_user_stats<'a>( // TODO: move getting the param and checking the bearer token into a helper function if let Some(rpc_key_id) = params.get("rpc_key_id") { let rpc_key_id = rpc_key_id.parse::().map_err(|e| { - FrontendErrorResponse::StatusCode( + Web3ProxyError::StatusCode( StatusCode::BAD_REQUEST, "Unable to parse rpc_key_id".to_string(), Some(e.into()), diff --git a/web3_proxy/src/stats/influxdb_queries.rs b/web3_proxy/src/stats/influxdb_queries.rs index d38f5865..dc36807b 100644 --- a/web3_proxy/src/stats/influxdb_queries.rs +++ b/web3_proxy/src/stats/influxdb_queries.rs @@ -1,7 +1,7 @@ use super::StatType; use crate::{ app::Web3ProxyApp, - frontend::errors::FrontendErrorResponse, + frontend::errors::Web3ProxyResponse, http_params::{ get_chain_id_from_params, get_query_start_from_params, get_query_stop_from_params, get_query_window_seconds_from_params, get_user_id_from_params, @@ -10,7 +10,7 @@ use crate::{ use anyhow::Context; use axum::{ headers::{authorization::Bearer, Authorization}, - response::{IntoResponse, Response}, + response::IntoResponse, Json, TypedHeader, }; use chrono::{DateTime, FixedOffset}; @@ -34,7 +34,7 @@ pub async fn query_user_stats<'a>( bearer: Option>>, params: &'a HashMap, stat_response_type: StatType, -) -> Result { +) -> Web3ProxyResponse { let db_conn = app.db_conn().context("query_user_stats needs a db")?; let db_replica = app .db_replica()