From f3fc4924dc4b041da9a88683a843a5da4b1b13ad Mon Sep 17 00:00:00 2001 From: Rory Neithinger Date: Thu, 16 Mar 2023 19:38:11 -0700 Subject: [PATCH 1/9] Refactor FrontendErrorResponse into Web3ProxyError Renamed FrontendResponse to Web3ProxyResponse and introduced a new generic type alias Web3ProxyResult. Fixed a few noisy cargo warnings. --- web3_proxy/src/admin_queries.rs | 22 +++-- web3_proxy/src/app/mod.rs | 47 +++++------ web3_proxy/src/frontend/admin.rs | 32 +++----- web3_proxy/src/frontend/authorization.rs | 25 +++--- web3_proxy/src/frontend/errors.rs | 11 +-- web3_proxy/src/frontend/rpc_proxy_http.rs | 20 ++--- web3_proxy/src/frontend/rpc_proxy_ws.rs | 28 +++---- web3_proxy/src/frontend/users.rs | 34 ++++---- web3_proxy/src/http_params.rs | 16 ++-- web3_proxy/src/rpcs/blockchain.rs | 2 +- web3_proxy/src/rpcs/many.rs | 99 ++++++++++++----------- web3_proxy/src/rpcs/one.rs | 16 ++-- web3_proxy/src/stats/db_queries.rs | 10 +-- web3_proxy/src/stats/influxdb_queries.rs | 6 +- 14 files changed, 178 insertions(+), 190 deletions(-) diff --git a/web3_proxy/src/admin_queries.rs b/web3_proxy/src/admin_queries.rs index 8538a691..06500213 100644 --- a/web3_proxy/src/admin_queries.rs +++ b/web3_proxy/src/admin_queries.rs @@ -1,8 +1,8 @@ use crate::app::Web3ProxyApp; -use crate::frontend::errors::FrontendErrorResponse; +use crate::frontend::errors::{Web3ProxyError, Web3ProxyResponse}; use crate::http_params::get_user_id_from_params; use anyhow::Context; -use axum::response::{IntoResponse, Response}; +use axum::response::IntoResponse; use axum::{ headers::{authorization::Bearer, Authorization}, Json, TypedHeader, @@ -24,25 +24,21 @@ pub async fn query_admin_modify_usertier<'a>( app: &'a Web3ProxyApp, bearer: Option>>, params: &'a HashMap, -) -> Result { +) -> Web3ProxyResponse { // Quickly return if any of the input tokens are bad let user_address: Vec = params .get("user_address") .ok_or_else(|| { - FrontendErrorResponse::BadRequest( - "Unable to find user_address key in request".to_string(), - ) + Web3ProxyError::BadRequest("Unable to find user_address key in request".to_string()) })? .parse::
() .map_err(|_| { - FrontendErrorResponse::BadRequest( - "Unable to parse user_address as an Address".to_string(), - ) + Web3ProxyError::BadRequest("Unable to parse user_address as an Address".to_string()) })? .to_fixed_bytes() .into(); let user_tier_title = params.get("user_tier_title").ok_or_else(|| { - FrontendErrorResponse::BadRequest( + Web3ProxyError::BadRequest( "Unable to get the user_tier_title key from the request".to_string(), ) })?; @@ -78,7 +74,7 @@ pub async fn query_admin_modify_usertier<'a>( .filter(admin::Column::UserId.eq(caller_id)) .one(&db_conn) .await? - .ok_or(FrontendErrorResponse::AccessDenied)?; + .ok_or(Web3ProxyError::AccessDenied)?; // If we are here, that means an admin was found, and we can safely proceed @@ -87,7 +83,7 @@ pub async fn query_admin_modify_usertier<'a>( .filter(user::Column::Address.eq(user_address)) .one(&db_conn) .await? - .ok_or(FrontendErrorResponse::BadRequest( + .ok_or(Web3ProxyError::BadRequest( "No user with this id found".to_string(), ))?; // Return early if the target user_tier_id is the same as the original user_tier_id @@ -101,7 +97,7 @@ pub async fn query_admin_modify_usertier<'a>( .filter(user_tier::Column::Title.eq(user_tier_title.clone())) .one(&db_conn) .await? - .ok_or(FrontendErrorResponse::BadRequest( + .ok_or(Web3ProxyError::BadRequest( "User Tier name was not found".to_string(), ))?; diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index ae8445b5..2ff67bee 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -4,12 +4,12 @@ mod ws; use crate::block_number::{block_needed, BlockNeeded}; use crate::config::{AppConfig, TopConfig}; use crate::frontend::authorization::{Authorization, RequestMetadata, RpcSecretKey}; -use crate::frontend::errors::FrontendErrorResponse; +use crate::frontend::errors::{Web3ProxyError, Web3ProxyResult}; use crate::frontend::rpc_proxy_ws::ProxyMode; use crate::jsonrpc::{ JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest, JsonRpcRequestEnum, }; -use crate::rpcs::blockchain::{BlocksByHashCache, Web3ProxyBlock}; +use crate::rpcs::blockchain::Web3ProxyBlock; use crate::rpcs::consensus::ConsensusWeb3Rpcs; use crate::rpcs::many::Web3Rpcs; use crate::rpcs::one::Web3Rpc; @@ -407,7 +407,7 @@ impl Web3ProxyApp { shutdown_sender: broadcast::Sender<()>, ) -> anyhow::Result { let rpc_account_shutdown_recevier = shutdown_sender.subscribe(); - let mut background_shutdown_receiver = shutdown_sender.subscribe(); + let _background_shutdown_receiver = shutdown_sender.subscribe(); // safety checks on the config // while i would prefer this to be in a "apply_top_config" function, that is a larger refactor @@ -811,26 +811,27 @@ impl Web3ProxyApp { app_handles.push(config_handle); } -// ======= -// if important_background_handles.is_empty() { -// info!("no important background handles"); -// -// let f = tokio::spawn(async move { -// let _ = background_shutdown_receiver.recv().await; -// -// Ok(()) -// }); -// -// important_background_handles.push(f); -// >>>>>>> 77df3fa (stats v2) + // ======= + // if important_background_handles.is_empty() { + // info!("no important background handles"); + // + // let f = tokio::spawn(async move { + // let _ = background_shutdown_receiver.recv().await; + // + // Ok(()) + // }); + // + // important_background_handles.push(f); + // >>>>>>> 77df3fa (stats v2) Ok(( app, app_handles, important_background_handles, new_top_config_sender, - consensus_connections_watcher - ).into()) + consensus_connections_watcher, + ) + .into()) } pub async fn apply_top_config(&self, new_top_config: TopConfig) -> anyhow::Result<()> { @@ -1041,7 +1042,7 @@ impl Web3ProxyApp { self: &Arc, authorization: Arc, request: JsonRpcRequestEnum, - ) -> Result<(JsonRpcForwardedResponseEnum, Vec>), FrontendErrorResponse> { + ) -> Web3ProxyResult<(JsonRpcForwardedResponseEnum, Vec>)> { // trace!(?request, "proxy_web3_rpc"); // even though we have timeouts on the requests to our backend providers, @@ -1079,7 +1080,7 @@ impl Web3ProxyApp { self: &Arc, authorization: &Arc, requests: Vec, - ) -> Result<(Vec, Vec>), FrontendErrorResponse> { + ) -> Web3ProxyResult<(Vec, Vec>)> { // TODO: we should probably change ethers-rs to support this directly. they pushed this off to v2 though let num_requests = requests.len(); @@ -1087,7 +1088,7 @@ impl Web3ProxyApp { // TODO: improve flattening // get the head block now so that any requests that need it all use the same block - // TODO: FrontendErrorResponse that handles "no servers synced" in a consistent way + // TODO: Web3ProxyError that handles "no servers synced" in a consistent way // TODO: this still has an edge condition if there is a reorg in the middle of the request!!! let head_block_num = self .balanced_rpcs @@ -1153,7 +1154,7 @@ impl Web3ProxyApp { authorization: &Arc, mut request: JsonRpcRequest, head_block_num: Option, - ) -> Result<(JsonRpcForwardedResponse, Vec>), FrontendErrorResponse> { + ) -> Web3ProxyResult<(JsonRpcForwardedResponse, Vec>)> { // trace!("Received request: {:?}", request); let request_metadata = Arc::new(RequestMetadata::new(request.num_bytes())?); @@ -1591,7 +1592,7 @@ impl Web3ProxyApp { ) .map_err(|x| { trace!("bad request: {:?}", x); - FrontendErrorResponse::BadRequest( + Web3ProxyError::BadRequest( "param 0 could not be read as H256".to_string(), ) })?; @@ -1628,7 +1629,7 @@ impl Web3ProxyApp { method => { if method.starts_with("admin_") { // TODO: emit a stat? will probably just be noise - return Err(FrontendErrorResponse::AccessDenied); + return Err(Web3ProxyError::AccessDenied); } // emit stats diff --git a/web3_proxy/src/frontend/admin.rs b/web3_proxy/src/frontend/admin.rs index 4d55af2a..8df584e6 100644 --- a/web3_proxy/src/frontend/admin.rs +++ b/web3_proxy/src/frontend/admin.rs @@ -1,10 +1,10 @@ //! Handle admin helper logic use super::authorization::login_is_authorized; -use super::errors::FrontendResult; +use super::errors::Web3ProxyResponse; use crate::admin_queries::query_admin_modify_usertier; use crate::app::Web3ProxyApp; -use crate::frontend::errors::FrontendErrorResponse; +use crate::frontend::errors::Web3ProxyError; use crate::user_token::UserBearerToken; use crate::PostLogin; use anyhow::Context; @@ -43,7 +43,7 @@ pub async fn admin_change_user_roles( Extension(app): Extension>, bearer: Option>>, Query(params): Query>, -) -> FrontendResult { +) -> Web3ProxyResponse { let response = query_admin_modify_usertier(&app, bearer, ¶ms).await?; Ok(response) @@ -58,7 +58,7 @@ pub async fn admin_login_get( Extension(app): Extension>, InsecureClientIp(ip): InsecureClientIp, Path(mut params): Path>, -) -> FrontendResult { +) -> Web3ProxyResponse { // First check if the login is authorized login_is_authorized(&app, ip).await?; @@ -85,30 +85,22 @@ pub async fn admin_login_get( let admin_address: Address = params .get("admin_address") .ok_or_else(|| { - FrontendErrorResponse::BadRequest( - "Unable to find admin_address key in request".to_string(), - ) + Web3ProxyError::BadRequest("Unable to find admin_address key in request".to_string()) })? .parse::
() .map_err(|_err| { - FrontendErrorResponse::BadRequest( - "Unable to parse admin_address as an Address".to_string(), - ) + Web3ProxyError::BadRequest("Unable to parse admin_address as an Address".to_string()) })?; // Fetch the user_address parameter from the login string ... (as who we want to be logging in ...) let user_address: Vec = params .get("user_address") .ok_or_else(|| { - FrontendErrorResponse::BadRequest( - "Unable to find user_address key in request".to_string(), - ) + Web3ProxyError::BadRequest("Unable to find user_address key in request".to_string()) })? .parse::
() .map_err(|_err| { - FrontendErrorResponse::BadRequest( - "Unable to parse user_address as an Address".to_string(), - ) + Web3ProxyError::BadRequest("Unable to parse user_address as an Address".to_string()) })? .to_fixed_bytes() .into(); @@ -156,7 +148,7 @@ pub async fn admin_login_get( .filter(user::Column::Address.eq(user_address)) .one(db_replica.conn()) .await? - .ok_or(FrontendErrorResponse::BadRequest( + .ok_or(Web3ProxyError::BadRequest( "Could not find user in db".to_string(), ))?; @@ -164,7 +156,7 @@ pub async fn admin_login_get( .filter(user::Column::Address.eq(admin_address.encode())) .one(db_replica.conn()) .await? - .ok_or(FrontendErrorResponse::BadRequest( + .ok_or(Web3ProxyError::BadRequest( "Could not find admin in db".to_string(), ))?; @@ -233,7 +225,7 @@ pub async fn admin_login_post( Extension(app): Extension>, InsecureClientIp(ip): InsecureClientIp, Json(payload): Json, -) -> FrontendResult { +) -> Web3ProxyResponse { login_is_authorized(&app, ip).await?; // Check for the signed bytes .. @@ -422,7 +414,7 @@ pub async fn admin_login_post( pub async fn admin_logout_post( Extension(app): Extension>, TypedHeader(Authorization(bearer)): TypedHeader>, -) -> FrontendResult { +) -> Web3ProxyResponse { let user_bearer = UserBearerToken::try_from(bearer)?; let db_conn = app.db_conn().context("database needed for user logout")?; diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index 4ab6d66f..fb0ad581 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -1,6 +1,6 @@ //! Utilities for authorization of logged in and anonymous users. -use super::errors::FrontendErrorResponse; +use super::errors::{Web3ProxyError, Web3ProxyResult}; use super::rpc_proxy_ws::ProxyMode; use crate::app::{AuthorizationChecks, Web3ProxyApp, APP_USER_AGENT}; use crate::rpcs::one::Web3Rpc; @@ -308,14 +308,11 @@ impl Authorization { /// rate limit logins only by ip. /// we want all origins and referers and user agents to count together -pub async fn login_is_authorized( - app: &Web3ProxyApp, - ip: IpAddr, -) -> Result { +pub async fn login_is_authorized(app: &Web3ProxyApp, ip: IpAddr) -> Web3ProxyResult { let authorization = match app.rate_limit_login(ip, ProxyMode::Best).await? { RateLimitResult::Allowed(authorization, None) => authorization, RateLimitResult::RateLimited(authorization, retry_at) => { - return Err(FrontendErrorResponse::RateLimited(authorization, retry_at)); + return Err(Web3ProxyError::RateLimited(authorization, retry_at)); } // TODO: don't panic. give the user an error x => unimplemented!("rate_limit_login shouldn't ever see these: {:?}", x), @@ -330,7 +327,7 @@ pub async fn ip_is_authorized( ip: IpAddr, origin: Option, proxy_mode: ProxyMode, -) -> Result<(Authorization, Option), FrontendErrorResponse> { +) -> Web3ProxyResult<(Authorization, Option)> { // TODO: i think we could write an `impl From` for this // TODO: move this to an AuthorizedUser extrator let (authorization, semaphore) = match app @@ -345,7 +342,7 @@ pub async fn ip_is_authorized( RateLimitResult::Allowed(authorization, semaphore) => (authorization, semaphore), RateLimitResult::RateLimited(authorization, retry_at) => { // TODO: in the background, emit a stat (maybe simplest to use a channel?) - return Err(FrontendErrorResponse::RateLimited(authorization, retry_at)); + return Err(Web3ProxyError::RateLimited(authorization, retry_at)); } // TODO: don't panic. give the user an error x => unimplemented!("rate_limit_by_ip shouldn't ever see these: {:?}", x), @@ -389,7 +386,7 @@ pub async fn ip_is_authorized( Ok((authorization, semaphore)) } -/// like app.rate_limit_by_rpc_key but converts to a FrontendErrorResponse; +/// like app.rate_limit_by_rpc_key but converts to a Web3ProxyError; pub async fn key_is_authorized( app: &Arc, rpc_key: RpcSecretKey, @@ -398,7 +395,7 @@ pub async fn key_is_authorized( proxy_mode: ProxyMode, referer: Option, user_agent: Option, -) -> Result<(Authorization, Option), FrontendErrorResponse> { +) -> Web3ProxyResult<(Authorization, Option)> { // check the rate limits. error if over the limit // TODO: i think this should be in an "impl From" or "impl Into" let (authorization, semaphore) = match app @@ -407,9 +404,9 @@ pub async fn key_is_authorized( { RateLimitResult::Allowed(authorization, semaphore) => (authorization, semaphore), RateLimitResult::RateLimited(authorization, retry_at) => { - return Err(FrontendErrorResponse::RateLimited(authorization, retry_at)); + return Err(Web3ProxyError::RateLimited(authorization, retry_at)); } - RateLimitResult::UnknownKey => return Err(FrontendErrorResponse::UnknownKey), + RateLimitResult::UnknownKey => return Err(Web3ProxyError::UnknownKey), }; // TODO: DRY and maybe optimize the hashing @@ -517,7 +514,7 @@ impl Web3ProxyApp { pub async fn bearer_is_authorized( &self, bearer: Bearer, - ) -> Result<(user::Model, OwnedSemaphorePermit), FrontendErrorResponse> { + ) -> Web3ProxyResult<(user::Model, OwnedSemaphorePermit)> { // get the user id for this bearer token let user_bearer_token = UserBearerToken::try_from(bearer)?; @@ -869,7 +866,7 @@ impl Authorization { pub async fn check_again( &self, app: &Arc, - ) -> Result<(Arc, Option), FrontendErrorResponse> { + ) -> Web3ProxyResult<(Arc, Option)> { // TODO: we could probably do this without clones. but this is easy let (a, s) = if let Some(rpc_secret_key) = self.checks.rpc_secret_key { key_is_authorized( diff --git a/web3_proxy/src/frontend/errors.rs b/web3_proxy/src/frontend/errors.rs index cf791e88..515f5c86 100644 --- a/web3_proxy/src/frontend/errors.rs +++ b/web3_proxy/src/frontend/errors.rs @@ -17,12 +17,13 @@ use redis_rate_limiter::redis::RedisError; use reqwest::header::ToStrError; use tokio::{sync::AcquireError, task::JoinError, time::Instant}; +pub type Web3ProxyResult = Result; // TODO: take "IntoResponse" instead of Response? -pub type FrontendResult = Result; +pub type Web3ProxyResponse = Web3ProxyResult; // TODO: #[derive(Debug, From)] -pub enum FrontendErrorResponse { +pub enum Web3ProxyError { AccessDenied, Anyhow(anyhow::Error), BadRequest(String), @@ -46,7 +47,7 @@ pub enum FrontendErrorResponse { UnknownKey, } -impl FrontendErrorResponse { +impl Web3ProxyError { pub fn into_response_parts(self) -> (StatusCode, JsonRpcForwardedResponse) { match self { Self::AccessDenied => { @@ -296,7 +297,7 @@ impl FrontendErrorResponse { } } -impl IntoResponse for FrontendErrorResponse { +impl IntoResponse for Web3ProxyError { fn into_response(self) -> Response { // TODO: include the request id in these so that users can give us something that will point to logs // TODO: status code is in the jsonrpc response and is also the first item in the tuple. DRY @@ -307,5 +308,5 @@ impl IntoResponse for FrontendErrorResponse { } pub async fn handler_404() -> Response { - FrontendErrorResponse::NotFound.into_response() + Web3ProxyError::NotFound.into_response() } diff --git a/web3_proxy/src/frontend/rpc_proxy_http.rs b/web3_proxy/src/frontend/rpc_proxy_http.rs index 1ddd93f5..158f5215 100644 --- a/web3_proxy/src/frontend/rpc_proxy_http.rs +++ b/web3_proxy/src/frontend/rpc_proxy_http.rs @@ -1,7 +1,7 @@ //! Take a user's HTTP JSON-RPC requests and either respond from local data or proxy the request to a backend rpc server. use super::authorization::{ip_is_authorized, key_is_authorized}; -use super::errors::FrontendResult; +use super::errors::Web3ProxyResponse; use super::rpc_proxy_ws::ProxyMode; use crate::{app::Web3ProxyApp, jsonrpc::JsonRpcRequestEnum}; use axum::extract::Path; @@ -22,7 +22,7 @@ pub async fn proxy_web3_rpc( ip: InsecureClientIp, origin: Option>, Json(payload): Json, -) -> FrontendResult { +) -> Web3ProxyResponse { _proxy_web3_rpc(app, ip, origin, payload, ProxyMode::Best).await } @@ -32,7 +32,7 @@ pub async fn fastest_proxy_web3_rpc( ip: InsecureClientIp, origin: Option>, Json(payload): Json, -) -> FrontendResult { +) -> Web3ProxyResponse { // TODO: read the fastest number from params // TODO: check that the app allows this without authentication _proxy_web3_rpc(app, ip, origin, payload, ProxyMode::Fastest(0)).await @@ -44,7 +44,7 @@ pub async fn versus_proxy_web3_rpc( ip: InsecureClientIp, origin: Option>, Json(payload): Json, -) -> FrontendResult { +) -> Web3ProxyResponse { _proxy_web3_rpc(app, ip, origin, payload, ProxyMode::Versus).await } @@ -54,7 +54,7 @@ async fn _proxy_web3_rpc( origin: Option>, payload: JsonRpcRequestEnum, proxy_mode: ProxyMode, -) -> FrontendResult { +) -> Web3ProxyResponse { // TODO: benchmark spawning this // TODO: do we care about keeping the TypedHeader wrapper? let origin = origin.map(|x| x.0); @@ -124,7 +124,7 @@ pub async fn proxy_web3_rpc_with_key( user_agent: Option>, Path(rpc_key): Path, Json(payload): Json, -) -> FrontendResult { +) -> Web3ProxyResponse { _proxy_web3_rpc_with_key( app, ip, @@ -147,7 +147,7 @@ pub async fn debug_proxy_web3_rpc_with_key( user_agent: Option>, Path(rpc_key): Path, Json(payload): Json, -) -> FrontendResult { +) -> Web3ProxyResponse { _proxy_web3_rpc_with_key( app, ip, @@ -170,7 +170,7 @@ pub async fn fastest_proxy_web3_rpc_with_key( user_agent: Option>, Path(rpc_key): Path, Json(payload): Json, -) -> FrontendResult { +) -> Web3ProxyResponse { _proxy_web3_rpc_with_key( app, ip, @@ -193,7 +193,7 @@ pub async fn versus_proxy_web3_rpc_with_key( user_agent: Option>, Path(rpc_key): Path, Json(payload): Json, -) -> FrontendResult { +) -> Web3ProxyResponse { _proxy_web3_rpc_with_key( app, ip, @@ -217,7 +217,7 @@ async fn _proxy_web3_rpc_with_key( rpc_key: String, payload: JsonRpcRequestEnum, proxy_mode: ProxyMode, -) -> FrontendResult { +) -> Web3ProxyResponse { // TODO: DRY w/ proxy_web3_rpc // the request can take a while, so we spawn so that we can start serving another request let rpc_key = rpc_key.parse()?; diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs index 072ad854..3e3a6957 100644 --- a/web3_proxy/src/frontend/rpc_proxy_ws.rs +++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs @@ -3,7 +3,7 @@ //! WebSockets are the preferred method of receiving requests, but not all clients have good support. use super::authorization::{ip_is_authorized, key_is_authorized, Authorization, RequestMetadata}; -use super::errors::{FrontendErrorResponse, FrontendResult}; +use super::errors::{Web3ProxyError, Web3ProxyResponse}; use crate::stats::RpcQueryStats; use crate::{ app::Web3ProxyApp, @@ -59,7 +59,7 @@ pub async fn websocket_handler( ip: InsecureClientIp, origin: Option>, ws_upgrade: Option, -) -> FrontendResult { +) -> Web3ProxyResponse { _websocket_handler(ProxyMode::Best, app, ip, origin, ws_upgrade).await } @@ -71,7 +71,7 @@ pub async fn fastest_websocket_handler( ip: InsecureClientIp, origin: Option>, ws_upgrade: Option, -) -> FrontendResult { +) -> Web3ProxyResponse { // TODO: get the fastest number from the url params (default to 0/all) // TODO: config to disable this _websocket_handler(ProxyMode::Fastest(0), app, ip, origin, ws_upgrade).await @@ -85,7 +85,7 @@ pub async fn versus_websocket_handler( ip: InsecureClientIp, origin: Option>, ws_upgrade: Option, -) -> FrontendResult { +) -> Web3ProxyResponse { // TODO: config to disable this _websocket_handler(ProxyMode::Versus, app, ip, origin, ws_upgrade).await } @@ -96,7 +96,7 @@ async fn _websocket_handler( InsecureClientIp(ip): InsecureClientIp, origin: Option>, ws_upgrade: Option, -) -> FrontendResult { +) -> Web3ProxyResponse { let origin = origin.map(|x| x.0); let (authorization, _semaphore) = ip_is_authorized(&app, ip, origin, proxy_mode).await?; @@ -134,7 +134,7 @@ pub async fn websocket_handler_with_key( referer: Option>, user_agent: Option>, ws_upgrade: Option, -) -> FrontendResult { +) -> Web3ProxyResponse { _websocket_handler_with_key( ProxyMode::Best, app, @@ -157,7 +157,7 @@ pub async fn debug_websocket_handler_with_key( referer: Option>, user_agent: Option>, ws_upgrade: Option, -) -> FrontendResult { +) -> Web3ProxyResponse { _websocket_handler_with_key( ProxyMode::Debug, app, @@ -180,7 +180,7 @@ pub async fn fastest_websocket_handler_with_key( referer: Option>, user_agent: Option>, ws_upgrade: Option, -) -> FrontendResult { +) -> Web3ProxyResponse { // TODO: get the fastest number from the url params (default to 0/all) _websocket_handler_with_key( ProxyMode::Fastest(0), @@ -204,7 +204,7 @@ pub async fn versus_websocket_handler_with_key( referer: Option>, user_agent: Option>, ws_upgrade: Option, -) -> FrontendResult { +) -> Web3ProxyResponse { _websocket_handler_with_key( ProxyMode::Versus, app, @@ -228,7 +228,7 @@ async fn _websocket_handler_with_key( referer: Option>, user_agent: Option>, ws_upgrade: Option, -) -> FrontendResult { +) -> Web3ProxyResponse { let rpc_key = rpc_key.parse()?; let (authorization, _semaphore) = key_is_authorized( @@ -260,7 +260,7 @@ async fn _websocket_handler_with_key( &app.config.redirect_rpc_key_url, authorization.checks.rpc_secret_key_id, ) { - (None, None, _) => Err(FrontendErrorResponse::StatusCode( + (None, None, _) => Err(Web3ProxyError::StatusCode( StatusCode::BAD_REQUEST, "this page is for rpcs".to_string(), None, @@ -273,7 +273,7 @@ async fn _websocket_handler_with_key( if authorization.checks.rpc_secret_key_id.is_none() { // i don't think this is possible - Err(FrontendErrorResponse::StatusCode( + Err(Web3ProxyError::StatusCode( StatusCode::UNAUTHORIZED, "AUTHORIZATION header required".to_string(), None, @@ -291,7 +291,7 @@ async fn _websocket_handler_with_key( } } // any other combinations get a simple error - _ => Err(FrontendErrorResponse::StatusCode( + _ => Err(Web3ProxyError::StatusCode( StatusCode::BAD_REQUEST, "this page is for rpcs".to_string(), None, @@ -419,7 +419,7 @@ async fn handle_socket_payload( .await .map_or_else( |err| match err { - FrontendErrorResponse::Anyhow(err) => Err(err), + Web3ProxyError::Anyhow(err) => Err(err), _ => { error!("handle this better! {:?}", err); Err(anyhow::anyhow!("unexpected error! {:?}", err)) diff --git a/web3_proxy/src/frontend/users.rs b/web3_proxy/src/frontend/users.rs index 29210ae4..e8a43ba3 100644 --- a/web3_proxy/src/frontend/users.rs +++ b/web3_proxy/src/frontend/users.rs @@ -1,6 +1,6 @@ //! Handle registration, logins, and managing account data. use super::authorization::{login_is_authorized, RpcSecretKey}; -use super::errors::FrontendResult; +use super::errors::Web3ProxyResponse; use crate::app::Web3ProxyApp; use crate::http_params::{ get_chain_id_from_params, get_page_from_params, get_query_start_from_params, @@ -65,7 +65,7 @@ pub async fn user_login_get( InsecureClientIp(ip): InsecureClientIp, // TODO: what does axum's error handling look like if the path fails to parse? Path(mut params): Path>, -) -> FrontendResult { +) -> Web3ProxyResponse { login_is_authorized(&app, ip).await?; // create a message and save it in redis @@ -165,7 +165,7 @@ pub async fn user_login_post( InsecureClientIp(ip): InsecureClientIp, Query(query): Query, Json(payload): Json, -) -> FrontendResult { +) -> Web3ProxyResponse { login_is_authorized(&app, ip).await?; // TODO: this seems too verbose. how can we simply convert a String into a [u8; 65] @@ -373,7 +373,7 @@ pub async fn user_login_post( pub async fn user_logout_post( Extension(app): Extension>, TypedHeader(Authorization(bearer)): TypedHeader>, -) -> FrontendResult { +) -> Web3ProxyResponse { let user_bearer = UserBearerToken::try_from(bearer)?; let db_conn = app.db_conn().context("database needed for user logout")?; @@ -417,7 +417,7 @@ pub async fn user_logout_post( pub async fn user_get( Extension(app): Extension>, TypedHeader(Authorization(bearer_token)): TypedHeader>, -) -> FrontendResult { +) -> Web3ProxyResponse { let (user, _semaphore) = app.bearer_is_authorized(bearer_token).await?; Ok(Json(user).into_response()) @@ -435,7 +435,7 @@ pub async fn user_post( Extension(app): Extension>, TypedHeader(Authorization(bearer_token)): TypedHeader>, Json(payload): Json, -) -> FrontendResult { +) -> Web3ProxyResponse { let (user, _semaphore) = app.bearer_is_authorized(bearer_token).await?; let mut user: user::ActiveModel = user.into(); @@ -480,8 +480,8 @@ pub async fn user_post( pub async fn user_balance_get( Extension(app): Extension>, TypedHeader(Authorization(bearer)): TypedHeader>, -) -> FrontendResult { - let (user, _semaphore) = app.bearer_is_authorized(bearer).await?; +) -> Web3ProxyResponse { + let (_user, _semaphore) = app.bearer_is_authorized(bearer).await?; todo!("user_balance_get"); } @@ -495,8 +495,8 @@ pub async fn user_balance_get( pub async fn user_balance_post( Extension(app): Extension>, TypedHeader(Authorization(bearer)): TypedHeader>, -) -> FrontendResult { - let (user, _semaphore) = app.bearer_is_authorized(bearer).await?; +) -> Web3ProxyResponse { + let (_user, _semaphore) = app.bearer_is_authorized(bearer).await?; todo!("user_balance_post"); } @@ -506,7 +506,7 @@ pub async fn user_balance_post( pub async fn rpc_keys_get( Extension(app): Extension>, TypedHeader(Authorization(bearer)): TypedHeader>, -) -> FrontendResult { +) -> Web3ProxyResponse { let (user, _semaphore) = app.bearer_is_authorized(bearer).await?; let db_replica = app @@ -535,8 +535,8 @@ pub async fn rpc_keys_get( pub async fn rpc_keys_delete( Extension(app): Extension>, TypedHeader(Authorization(bearer)): TypedHeader>, -) -> FrontendResult { - let (user, _semaphore) = app.bearer_is_authorized(bearer).await?; +) -> Web3ProxyResponse { + let (_user, _semaphore) = app.bearer_is_authorized(bearer).await?; // TODO: think about how cascading deletes and billing should work Err(anyhow::anyhow!("work in progress").into()) @@ -567,7 +567,7 @@ pub async fn rpc_keys_management( Extension(app): Extension>, TypedHeader(Authorization(bearer)): TypedHeader>, Json(payload): Json, -) -> FrontendResult { +) -> Web3ProxyResponse { // TODO: is there a way we can know if this is a PUT or POST? right now we can modify or create keys with either. though that probably doesn't matter let (user, _semaphore) = app.bearer_is_authorized(bearer).await?; @@ -738,7 +738,7 @@ pub async fn user_revert_logs_get( Extension(app): Extension>, TypedHeader(Authorization(bearer)): TypedHeader>, Query(params): Query>, -) -> FrontendResult { +) -> Web3ProxyResponse { let (user, _semaphore) = app.bearer_is_authorized(bearer).await?; let chain_id = get_chain_id_from_params(app.as_ref(), ¶ms)?; @@ -808,7 +808,7 @@ pub async fn user_stats_aggregated_get( Extension(app): Extension>, bearer: Option>>, Query(params): Query>, -) -> FrontendResult { +) -> Web3ProxyResponse { let response = query_user_stats(&app, bearer, ¶ms, StatType::Aggregated).await?; Ok(response) @@ -828,7 +828,7 @@ pub async fn user_stats_detailed_get( Extension(app): Extension>, bearer: Option>>, Query(params): Query>, -) -> FrontendResult { +) -> Web3ProxyResponse { let response = query_user_stats(&app, bearer, ¶ms, StatType::Detailed).await?; Ok(response) diff --git a/web3_proxy/src/http_params.rs b/web3_proxy/src/http_params.rs index 8b30efa0..1ea7afee 100644 --- a/web3_proxy/src/http_params.rs +++ b/web3_proxy/src/http_params.rs @@ -1,5 +1,5 @@ use crate::app::DatabaseReplica; -use crate::frontend::errors::FrontendErrorResponse; +use crate::frontend::errors::{Web3ProxyError, Web3ProxyResult}; use crate::{app::Web3ProxyApp, user_token::UserBearerToken}; use anyhow::Context; use axum::{ @@ -24,7 +24,7 @@ pub async fn get_user_id_from_params( // this is a long type. should we strip it down? bearer: Option>>, params: &HashMap, -) -> Result { +) -> Web3ProxyResult { match (bearer, params.get("user_id")) { (Some(TypedHeader(Authorization(bearer))), Some(user_id)) => { // check for the bearer cache key @@ -45,7 +45,7 @@ pub async fn get_user_id_from_params( .one(db_replica.conn()) .await .context("database error while querying for user")? - .ok_or(FrontendErrorResponse::AccessDenied)?; + .ok_or(Web3ProxyError::AccessDenied)?; // if expired, delete ALL expired logins let now = Utc::now(); @@ -60,7 +60,7 @@ pub async fn get_user_id_from_params( // TODO: emit a stat? if this is high something weird might be happening debug!("cleared expired logins: {:?}", delete_result); - return Err(FrontendErrorResponse::AccessDenied); + return Err(Web3ProxyError::AccessDenied); } save_to_redis = true; @@ -76,7 +76,7 @@ pub async fn get_user_id_from_params( let user_id: u64 = user_id.parse().context("Parsing user_id param")?; if bearer_user_id != user_id { - return Err(FrontendErrorResponse::AccessDenied); + return Err(Web3ProxyError::AccessDenied); } if save_to_redis { @@ -103,7 +103,7 @@ pub async fn get_user_id_from_params( // TODO: proper error code from a useful error code // TODO: maybe instead of this sharp edged warn, we have a config value? // TODO: check config for if we should deny or allow this - Err(FrontendErrorResponse::AccessDenied) + Err(Web3ProxyError::AccessDenied) // // TODO: make this a flag // warn!("allowing without auth during development!"); // Ok(x.parse()?) @@ -215,7 +215,7 @@ pub fn get_query_stop_from_params( pub fn get_query_window_seconds_from_params( params: &HashMap, -) -> Result { +) -> Web3ProxyResult { params.get("query_window_seconds").map_or_else( || { // no page in params. set default @@ -225,7 +225,7 @@ pub fn get_query_window_seconds_from_params( // parse the given timestamp query_window_seconds.parse::().map_err(|err| { trace!("Unable to parse rpc_key_id: {:#?}", err); - FrontendErrorResponse::BadRequest("Unable to parse rpc_key_id".to_string()) + Web3ProxyError::BadRequest("Unable to parse rpc_key_id".to_string()) }) }, ) diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index bda39389..480f02be 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -398,7 +398,7 @@ impl Web3Rpcs { consensus_finder: &mut ConsensusFinder, new_block: Option, rpc: Arc, - pending_tx_sender: &Option>, + _pending_tx_sender: &Option>, ) -> anyhow::Result<()> { // TODO: how should we handle an error here? if !consensus_finder diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 1a1b8354..ef704498 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -167,7 +167,8 @@ impl Web3Rpcs { .max_capacity(10_000) .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); - let (watch_consensus_rpcs_sender, consensus_connections_watcher) = watch::channel(Default::default()); + let (watch_consensus_rpcs_sender, consensus_connections_watcher) = + watch::channel(Default::default()); // by_name starts empty. self.apply_server_configs will add to it let by_name = Default::default(); @@ -318,43 +319,43 @@ impl Web3Rpcs { } } -// <<<<<<< HEAD + // <<<<<<< HEAD Ok(()) } -// ======= -// // TODO: max_capacity and time_to_idle from config -// // all block hashes are the same size, so no need for weigher -// let block_hashes = Cache::builder() -// .time_to_idle(Duration::from_secs(600)) -// .max_capacity(10_000) -// .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); -// // all block numbers are the same size, so no need for weigher -// let block_numbers = Cache::builder() -// .time_to_idle(Duration::from_secs(600)) -// .max_capacity(10_000) -// .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); -// -// let (watch_consensus_connections_sender, consensus_connections_watcher) = -// watch::channel(Default::default()); -// -// let watch_consensus_head_receiver = -// watch_consensus_head_sender.as_ref().map(|x| x.subscribe()); -// -// let connections = Arc::new(Self { -// by_name: connections, -// watch_consensus_rpcs_sender: watch_consensus_connections_sender, -// watch_consensus_head_receiver, -// pending_transactions, -// block_hashes, -// block_numbers, -// min_sum_soft_limit, -// min_head_rpcs, -// max_block_age, -// max_block_lag, -// }); -// -// let authorization = Arc::new(Authorization::internal(db_conn.clone())?); -// >>>>>>> 77df3fa (stats v2) + // ======= + // // TODO: max_capacity and time_to_idle from config + // // all block hashes are the same size, so no need for weigher + // let block_hashes = Cache::builder() + // .time_to_idle(Duration::from_secs(600)) + // .max_capacity(10_000) + // .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); + // // all block numbers are the same size, so no need for weigher + // let block_numbers = Cache::builder() + // .time_to_idle(Duration::from_secs(600)) + // .max_capacity(10_000) + // .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); + // + // let (watch_consensus_connections_sender, consensus_connections_watcher) = + // watch::channel(Default::default()); + // + // let watch_consensus_head_receiver = + // watch_consensus_head_sender.as_ref().map(|x| x.subscribe()); + // + // let connections = Arc::new(Self { + // by_name: connections, + // watch_consensus_rpcs_sender: watch_consensus_connections_sender, + // watch_consensus_head_receiver, + // pending_transactions, + // block_hashes, + // block_numbers, + // min_sum_soft_limit, + // min_head_rpcs, + // max_block_age, + // max_block_lag, + // }); + // + // let authorization = Arc::new(Authorization::internal(db_conn.clone())?); + // >>>>>>> 77df3fa (stats v2) pub fn get(&self, conn_name: &str) -> Option> { self.by_name.read().get(conn_name).cloned() @@ -364,12 +365,12 @@ impl Web3Rpcs { self.by_name.read().len() } -// <<<<<<< HEAD + // <<<<<<< HEAD pub fn is_empty(&self) -> bool { self.by_name.read().is_empty() -// ======= -// Ok((connections, handle, consensus_connections_watcher)) -// >>>>>>> 77df3fa (stats v2) + // ======= + // Ok((connections, handle, consensus_connections_watcher)) + // >>>>>>> 77df3fa (stats v2) } pub fn min_head_rpcs(&self) -> usize { @@ -884,11 +885,11 @@ impl Web3Rpcs { // TODO: maximum retries? right now its the total number of servers loop { -// <<<<<<< HEAD + // <<<<<<< HEAD if skip_rpcs.len() >= self.by_name.read().len() { -// ======= -// if skip_rpcs.len() == self.by_name.len() { -// >>>>>>> 77df3fa (stats v2) + // ======= + // if skip_rpcs.len() == self.by_name.len() { + // >>>>>>> 77df3fa (stats v2) break; } @@ -1159,18 +1160,18 @@ impl Web3Rpcs { request_metadata.no_servers.fetch_add(1, Ordering::Release); } -// <<<<<<< HEAD + // <<<<<<< HEAD watch_consensus_rpcs.changed().await?; watch_consensus_rpcs.borrow_and_update(); -// ======= + // ======= // TODO: i don't think this will ever happen // TODO: return a 502? if it does? // return Err(anyhow::anyhow!("no available rpcs!")); // TODO: sleep how long? // TODO: subscribe to something in ConsensusWeb3Rpcs instead sleep(Duration::from_millis(200)).await; -// >>>>>>> 77df3fa (stats v2) + // >>>>>>> 77df3fa (stats v2) continue; } @@ -1216,7 +1217,7 @@ impl Web3Rpcs { ) .await } - ProxyMode::Fastest(x) => todo!("Fastest"), + ProxyMode::Fastest(_x) => todo!("Fastest"), ProxyMode::Versus => todo!("Versus"), } } @@ -1299,13 +1300,13 @@ mod tests { // TODO: why is this allow needed? does tokio::test get in the way somehow? #![allow(unused_imports)] - use std::time::{SystemTime, UNIX_EPOCH}; use super::*; use crate::rpcs::consensus::ConsensusFinder; use crate::rpcs::{blockchain::Web3ProxyBlock, provider::Web3Provider}; use ethers::types::{Block, U256}; use log::{trace, LevelFilter}; use parking_lot::RwLock; + use std::time::{SystemTime, UNIX_EPOCH}; use tokio::sync::RwLock as AsyncRwLock; #[tokio::test] diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index b908ecf1..93263a54 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -242,12 +242,12 @@ impl Web3Rpc { block_data_limit, reconnect, tier: config.tier, -// <<<<<<< HEAD + // <<<<<<< HEAD disconnect_watch: Some(disconnect_sender), created_at: Some(created_at), -// ======= + // ======= head_block: RwLock::new(Default::default()), -// >>>>>>> 77df3fa (stats v2) + // >>>>>>> 77df3fa (stats v2) ..Default::default() }; @@ -1110,7 +1110,7 @@ impl Web3Rpc { trace!("watching pending transactions on {}", self); // TODO: does this keep the lock open for too long? match provider.as_ref() { - Web3Provider::Http(provider) => { + Web3Provider::Http(_provider) => { // there is a "watch_pending_transactions" function, but a lot of public nodes do not support the necessary rpc endpoints self.wait_for_disconnect().await?; } @@ -1224,11 +1224,11 @@ impl Web3Rpc { } if let Some(hard_limit_until) = self.hard_limit_until.as_ref() { -// <<<<<<< HEAD + // <<<<<<< HEAD let hard_limit_ready = *hard_limit_until.borrow(); -// ======= -// let hard_limit_ready = hard_limit_until.borrow().to_owned(); -// >>>>>>> 77df3fa (stats v2) + // ======= + // let hard_limit_ready = hard_limit_until.borrow().to_owned(); + // >>>>>>> 77df3fa (stats v2) let now = Instant::now(); diff --git a/web3_proxy/src/stats/db_queries.rs b/web3_proxy/src/stats/db_queries.rs index 599b3cff..15a8808f 100644 --- a/web3_proxy/src/stats/db_queries.rs +++ b/web3_proxy/src/stats/db_queries.rs @@ -1,11 +1,11 @@ use crate::app::Web3ProxyApp; -use crate::frontend::errors::FrontendErrorResponse; +use crate::frontend::errors::{Web3ProxyError, Web3ProxyResponse, Web3ProxyResult}; use crate::http_params::{ get_chain_id_from_params, get_page_from_params, get_query_start_from_params, get_query_window_seconds_from_params, get_user_id_from_params, }; use anyhow::Context; -use axum::response::{IntoResponse, Response}; +use axum::response::IntoResponse; use axum::Json; use axum::{ headers::{authorization::Bearer, Authorization}, @@ -227,7 +227,7 @@ pub fn filter_query_window_seconds( query_window_seconds: u64, response: &mut HashMap<&str, serde_json::Value>, q: Select, -) -> Result, FrontendErrorResponse> { +) -> Web3ProxyResult> { if query_window_seconds == 0 { // TODO: order by more than this? // query_window_seconds is not set so we aggregate all records @@ -261,7 +261,7 @@ pub async fn query_user_stats<'a>( bearer: Option>>, params: &'a HashMap, stat_response_type: StatType, -) -> Result { +) -> Web3ProxyResponse { let db_conn = app.db_conn().context("query_user_stats needs a db")?; let db_replica = app .db_replica() @@ -408,7 +408,7 @@ pub async fn query_user_stats<'a>( // TODO: move getting the param and checking the bearer token into a helper function if let Some(rpc_key_id) = params.get("rpc_key_id") { let rpc_key_id = rpc_key_id.parse::().map_err(|e| { - FrontendErrorResponse::StatusCode( + Web3ProxyError::StatusCode( StatusCode::BAD_REQUEST, "Unable to parse rpc_key_id".to_string(), Some(e.into()), diff --git a/web3_proxy/src/stats/influxdb_queries.rs b/web3_proxy/src/stats/influxdb_queries.rs index d38f5865..dc36807b 100644 --- a/web3_proxy/src/stats/influxdb_queries.rs +++ b/web3_proxy/src/stats/influxdb_queries.rs @@ -1,7 +1,7 @@ use super::StatType; use crate::{ app::Web3ProxyApp, - frontend::errors::FrontendErrorResponse, + frontend::errors::Web3ProxyResponse, http_params::{ get_chain_id_from_params, get_query_start_from_params, get_query_stop_from_params, get_query_window_seconds_from_params, get_user_id_from_params, @@ -10,7 +10,7 @@ use crate::{ use anyhow::Context; use axum::{ headers::{authorization::Bearer, Authorization}, - response::{IntoResponse, Response}, + response::IntoResponse, Json, TypedHeader, }; use chrono::{DateTime, FixedOffset}; @@ -34,7 +34,7 @@ pub async fn query_user_stats<'a>( bearer: Option>>, params: &'a HashMap, stat_response_type: StatType, -) -> Result { +) -> Web3ProxyResponse { let db_conn = app.db_conn().context("query_user_stats needs a db")?; let db_replica = app .db_replica() From c32d12b5e0c28162af0f9929f939c3edcf2a008a Mon Sep 17 00:00:00 2001 From: Rory Neithinger Date: Sun, 19 Mar 2023 15:50:25 -0700 Subject: [PATCH 2/9] better error handling for ip_is_authorized() --- web3_proxy/src/frontend/authorization.rs | 31 +++---- web3_proxy/src/frontend/errors.rs | 104 ++++++++++++++++++++++- web3_proxy/src/rpcs/many.rs | 2 +- 3 files changed, 117 insertions(+), 20 deletions(-) diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index fb0ad581..0920d9d3 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -175,7 +175,7 @@ impl From for Uuid { } impl Authorization { - pub fn internal(db_conn: Option) -> anyhow::Result { + pub fn internal(db_conn: Option) -> Web3ProxyResult { let authorization_checks = AuthorizationChecks { // any error logs on a local (internal) query are likely problems. log them all log_revert_chance: 1.0, @@ -206,7 +206,7 @@ impl Authorization { proxy_mode: ProxyMode, referer: Option, user_agent: Option, - ) -> anyhow::Result { + ) -> Web3ProxyResult { // some origins can override max_requests_per_period for anon users let max_requests_per_period = origin .as_ref() @@ -244,13 +244,13 @@ impl Authorization { referer: Option, user_agent: Option, authorization_type: AuthorizationType, - ) -> anyhow::Result { + ) -> Web3ProxyResult { // check ip match &authorization_checks.allowed_ips { None => {} Some(allowed_ips) => { if !allowed_ips.iter().any(|x| x.contains(&ip)) { - return Err(anyhow::anyhow!("IP ({}) is not allowed!", ip)); + return Err(Web3ProxyError::IpNotAllowed(ip)); } } } @@ -259,10 +259,10 @@ impl Authorization { match (&origin, &authorization_checks.allowed_origins) { (None, None) => {} (Some(_), None) => {} - (None, Some(_)) => return Err(anyhow::anyhow!("Origin required")), + (None, Some(_)) => return Err(Web3ProxyError::OriginRequired), (Some(origin), Some(allowed_origins)) => { if !allowed_origins.contains(origin) { - return Err(anyhow::anyhow!("Origin ({}) is not allowed!", origin)); + return Err(Web3ProxyError::OriginNotAllowed(origin.clone())); } } } @@ -271,10 +271,10 @@ impl Authorization { match (&referer, &authorization_checks.allowed_referers) { (None, None) => {} (Some(_), None) => {} - (None, Some(_)) => return Err(anyhow::anyhow!("Referer required")), + (None, Some(_)) => return Err(Web3ProxyError::RefererRequired), (Some(referer), Some(allowed_referers)) => { if !allowed_referers.contains(referer) { - return Err(anyhow::anyhow!("Referer ({:?}) is not allowed!", referer)); + return Err(Web3ProxyError::RefererNotAllowed(referer.clone())); } } } @@ -283,13 +283,10 @@ impl Authorization { match (&user_agent, &authorization_checks.allowed_user_agents) { (None, None) => {} (Some(_), None) => {} - (None, Some(_)) => return Err(anyhow::anyhow!("User agent required")), + (None, Some(_)) => return Err(Web3ProxyError::UserAgentRequired), (Some(user_agent), Some(allowed_user_agents)) => { if !allowed_user_agents.contains(user_agent) { - return Err(anyhow::anyhow!( - "User agent ({}) is not allowed!", - user_agent - )); + return Err(Web3ProxyError::UserAgentNotAllowed(user_agent.clone())); } } } @@ -451,7 +448,7 @@ pub async fn key_is_authorized( impl Web3ProxyApp { /// Limit the number of concurrent requests from the given ip address. - pub async fn ip_semaphore(&self, ip: IpAddr) -> anyhow::Result> { + pub async fn ip_semaphore(&self, ip: IpAddr) -> Web3ProxyResult> { if let Some(max_concurrent_requests) = self.config.public_max_concurrent_requests { let semaphore = self .ip_semaphores @@ -551,7 +548,7 @@ impl Web3ProxyApp { &self, ip: IpAddr, proxy_mode: ProxyMode, - ) -> anyhow::Result { + ) -> Web3ProxyResult { // TODO: dry this up with rate_limit_by_rpc_key? // we don't care about user agent or origin or referer @@ -608,7 +605,7 @@ impl Web3ProxyApp { ip: IpAddr, origin: Option, proxy_mode: ProxyMode, - ) -> anyhow::Result { + ) -> Web3ProxyResult { // ip rate limits don't check referer or user agent // the do check origin because we can override rate limits for some origins let authorization = Authorization::external( @@ -786,7 +783,7 @@ impl Web3ProxyApp { referer: Option, rpc_key: RpcSecretKey, user_agent: Option, - ) -> anyhow::Result { + ) -> Web3ProxyResult { let authorization_checks = self.authorization_checks(proxy_mode, rpc_key).await?; // if no rpc_key_id matching the given rpc was found, then we can't rate limit by key diff --git a/web3_proxy/src/frontend/errors.rs b/web3_proxy/src/frontend/errors.rs index 515f5c86..14bf0185 100644 --- a/web3_proxy/src/frontend/errors.rs +++ b/web3_proxy/src/frontend/errors.rs @@ -2,13 +2,16 @@ use super::authorization::Authorization; use crate::jsonrpc::JsonRpcForwardedResponse; + +use std::net::IpAddr; + use axum::{ headers, http::StatusCode, response::{IntoResponse, Response}, Json, }; -use derive_more::From; +use derive_more::{Display, Error, From}; use http::header::InvalidHeaderValue; use ipnet::AddrParseError; use log::{debug, error, trace, warn}; @@ -22,10 +25,13 @@ pub type Web3ProxyResult = Result; pub type Web3ProxyResponse = Web3ProxyResult; // TODO: -#[derive(Debug, From)] +#[derive(Debug, Display, Error, From)] pub enum Web3ProxyError { AccessDenied, + #[error(ignore)] Anyhow(anyhow::Error), + #[error(ignore)] + #[from(ignore)] BadRequest(String), SemaphoreAcquireError(AcquireError), Database(DbErr), @@ -34,17 +40,34 @@ pub enum Web3ProxyError { InfluxDb2RequestError(influxdb2::RequestError), InvalidHeaderValue(InvalidHeaderValue), IpAddrParse(AddrParseError), + #[error(ignore)] + #[from(ignore)] + IpNotAllowed(IpAddr), JoinError(JoinError), MsgPackEncode(rmp_serde::encode::Error), NotFound, + OriginRequired, + #[error(ignore)] + #[from(ignore)] + OriginNotAllowed(headers::Origin), + #[display(fmt = "{:?}, {:?}", _0, _1)] RateLimited(Authorization, Option), Redis(RedisError), + RefererRequired, + #[display(fmt = "{:?}", _0)] + #[error(ignore)] + #[from(ignore)] + RefererNotAllowed(headers::Referer), /// simple way to return an error message to the user and an anyhow to our logs + #[display(fmt = "{}, {}, {:?}", _0, _1, _2)] StatusCode(StatusCode, String, Option), /// TODO: what should be attached to the timout? Timeout(tokio::time::error::Elapsed), UlidDecode(ulid::DecodeError), UnknownKey, + UserAgentRequired, + #[error(ignore)] + UserAgentNotAllowed(headers::UserAgent), } impl Web3ProxyError { @@ -131,6 +154,17 @@ impl Web3ProxyError { ), ) } + Self::IpNotAllowed(ip) => { + warn!("IpNotAllowed ip={})", ip); + ( + StatusCode::FORBIDDEN, + JsonRpcForwardedResponse::from_string( + format!("IP ({}) is not allowed!", ip), + Some(StatusCode::FORBIDDEN.as_u16().into()), + None, + ), + ) + } Self::InvalidHeaderValue(err) => { warn!("InvalidHeaderValue err={:?}", err); ( @@ -184,6 +218,28 @@ impl Web3ProxyError { ), ) } + Self::OriginRequired => { + warn!("OriginRequired"); + ( + StatusCode::BAD_REQUEST, + JsonRpcForwardedResponse::from_str( + "Origin required", + Some(StatusCode::BAD_REQUEST.as_u16().into()), + None, + ), + ) + } + Self::OriginNotAllowed(origin) => { + warn!("OriginNotAllowed origin={}", origin); + ( + StatusCode::FORBIDDEN, + JsonRpcForwardedResponse::from_string( + format!("Origin ({}) is not allowed!", origin), + Some(StatusCode::FORBIDDEN.as_u16().into()), + None, + ), + ) + } // TODO: this should actually by the id of the key. multiple users might control one key Self::RateLimited(authorization, retry_at) => { // TODO: emit a stat @@ -227,6 +283,28 @@ impl Web3ProxyError { ), ) } + Self::RefererRequired => { + warn!("referer required"); + ( + StatusCode::BAD_REQUEST, + JsonRpcForwardedResponse::from_str( + "Referer required", + Some(StatusCode::BAD_REQUEST.as_u16().into()), + None, + ), + ) + } + Self::RefererNotAllowed(referer) => { + warn!("referer not allowed referer={:?}", referer); + ( + StatusCode::FORBIDDEN, + JsonRpcForwardedResponse::from_string( + format!("Referer ({:?}) is not allowed", referer), + Some(StatusCode::FORBIDDEN.as_u16().into()), + None, + ), + ) + } Self::SemaphoreAcquireError(err) => { warn!("semaphore acquire err={:?}", err); ( @@ -293,6 +371,28 @@ impl Web3ProxyError { None, ), ), + Self::UserAgentRequired => { + warn!("UserAgentRequired"); + ( + StatusCode::BAD_REQUEST, + JsonRpcForwardedResponse::from_str( + "User agent required", + Some(StatusCode::BAD_REQUEST.as_u16().into()), + None, + ), + ) + } + Self::UserAgentNotAllowed(ua) => { + warn!("UserAgentNotAllowed ua={}", ua); + ( + StatusCode::FORBIDDEN, + JsonRpcForwardedResponse::from_string( + format!("User agent ({}) is not allowed!", ua), + Some(StatusCode::FORBIDDEN.as_u16().into()), + None, + ), + ) + } } } } diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index ef704498..a309b2bd 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -1720,7 +1720,7 @@ mod tests { OpenRequestResult::Handle(_) )); - let best_available_server_from_none = rpcs + let _best_available_server_from_none = rpcs .best_available_rpc(&authorization, None, &[], None, None) .await; From beac7ee017a3f424a0e315cfe93399e031f672a7 Mon Sep 17 00:00:00 2001 From: Rory Neithinger Date: Sun, 19 Mar 2023 18:52:28 -0700 Subject: [PATCH 3/9] better error handling for proxy_web3_rpc() --- web3_proxy/src/app/mod.rs | 25 ++--- web3_proxy/src/app/ws.rs | 10 +- web3_proxy/src/block_number.rs | 5 +- web3_proxy/src/frontend/authorization.rs | 8 +- web3_proxy/src/frontend/errors.rs | 134 ++++++++++++++++++++++- web3_proxy/src/frontend/rpc_proxy_ws.rs | 2 +- web3_proxy/src/jsonrpc.rs | 5 +- web3_proxy/src/rpcs/blockchain.rs | 9 +- web3_proxy/src/rpcs/many.rs | 20 ++-- web3_proxy/src/rpcs/one.rs | 10 +- 10 files changed, 175 insertions(+), 53 deletions(-) diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 2ff67bee..2b8a6a4b 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -18,6 +18,7 @@ use crate::stats::{AppStat, RpcQueryStats, StatBuffer}; use crate::user_token::UserBearerToken; use anyhow::Context; use axum::headers::{Origin, Referer, UserAgent}; +use axum::http::StatusCode; use chrono::Utc; use deferred_rate_limiter::DeferredRateLimiter; use derive_more::From; @@ -1157,7 +1158,7 @@ impl Web3ProxyApp { ) -> Web3ProxyResult<(JsonRpcForwardedResponse, Vec>)> { // trace!("Received request: {:?}", request); - let request_metadata = Arc::new(RequestMetadata::new(request.num_bytes())?); + let request_metadata = Arc::new(RequestMetadata::new(request.num_bytes())); let mut kafka_stuff = None; @@ -1338,10 +1339,7 @@ impl Web3ProxyApp { } None => { // TODO: what does geth do if this happens? - // TODO: i think we want a 502 so that haproxy retries on another server - return Err( - anyhow::anyhow!("no servers synced. unknown eth_blockNumber").into(), - ); + return Err(Web3ProxyError::UnknownBlockNumber); } } } @@ -1429,7 +1427,7 @@ impl Web3ProxyApp { let head_block_num = head_block_num .or(self.balanced_rpcs.head_block_num()) - .ok_or_else(|| anyhow::anyhow!("no servers synced"))?; + .ok_or_else(|| Web3ProxyError::NoServersSynced)?; // TODO: error/wait if no head block! @@ -1607,7 +1605,7 @@ impl Web3ProxyApp { return Ok(( JsonRpcForwardedResponse::from_str( "invalid request", - None, + Some(StatusCode::BAD_REQUEST.as_u16().into()), Some(request_id), ), vec![], @@ -1760,17 +1758,10 @@ impl Web3ProxyApp { // TODO: only cache the inner response // TODO: how are we going to stream this? // TODO: check response size. if its very large, return it in a custom Error type that bypasses caching? or will moka do that for us? - Ok::<_, anyhow::Error>(response) + Ok::<_, Web3ProxyError>(response) }) - .await - // TODO: what is the best way to handle an Arc here? - .map_err(|err| { - // TODO: emit a stat for an error - anyhow::anyhow!( - "error while caching and forwarding response: {}", - err - ) - })? + // TODO: add context (error while caching and forwarding response {}) + .await? } else { self.balanced_rpcs .try_proxy_connection( diff --git a/web3_proxy/src/app/ws.rs b/web3_proxy/src/app/ws.rs index b69cdcc9..79617f7b 100644 --- a/web3_proxy/src/app/ws.rs +++ b/web3_proxy/src/app/ws.rs @@ -33,7 +33,7 @@ impl Web3ProxyApp { .context("finding request size")? .len(); - let request_metadata = Arc::new(RequestMetadata::new(request_bytes).unwrap()); + let request_metadata = Arc::new(RequestMetadata::new(request_bytes)); let (subscription_abort_handle, subscription_registration) = AbortHandle::new_pair(); @@ -67,7 +67,7 @@ impl Web3ProxyApp { }; // TODO: what should the payload for RequestMetadata be? - let request_metadata = Arc::new(RequestMetadata::new(0).unwrap()); + let request_metadata = Arc::new(RequestMetadata::new(0)); // TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id let response_json = json!({ @@ -133,7 +133,7 @@ impl Web3ProxyApp { // TODO: do something with this handle? tokio::spawn(async move { while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await { - let request_metadata = Arc::new(RequestMetadata::new(0).unwrap()); + let request_metadata = Arc::new(RequestMetadata::new(0)); let new_tx = match new_tx_state { TxStatus::Pending(tx) => tx, @@ -208,7 +208,7 @@ impl Web3ProxyApp { // TODO: do something with this handle? tokio::spawn(async move { while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await { - let request_metadata = Arc::new(RequestMetadata::new(0).unwrap()); + let request_metadata = Arc::new(RequestMetadata::new(0)); let new_tx = match new_tx_state { TxStatus::Pending(tx) => tx, @@ -284,7 +284,7 @@ impl Web3ProxyApp { // TODO: do something with this handle? tokio::spawn(async move { while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await { - let request_metadata = Arc::new(RequestMetadata::new(0).unwrap()); + let request_metadata = Arc::new(RequestMetadata::new(0)); let new_tx = match new_tx_state { TxStatus::Pending(tx) => tx, diff --git a/web3_proxy/src/block_number.rs b/web3_proxy/src/block_number.rs index bfb39299..b75375a1 100644 --- a/web3_proxy/src/block_number.rs +++ b/web3_proxy/src/block_number.rs @@ -1,4 +1,5 @@ //! Helper functions for turning ether's BlockNumber into numbers and updating incoming queries to match. +use crate::frontend::errors::{Web3ProxyError, Web3ProxyResult}; use anyhow::Context; use ethers::{ prelude::{BlockNumber, U64}, @@ -126,7 +127,7 @@ pub async fn block_needed( params: Option<&mut serde_json::Value>, head_block_num: U64, rpcs: &Web3Rpcs, -) -> anyhow::Result { +) -> Web3ProxyResult { // some requests have potentially very large responses // TODO: only skip caching if the response actually is large if method.starts_with("trace_") || method == "debug_traceTransaction" { @@ -179,7 +180,7 @@ pub async fn block_needed( // TODO: this shouldn't be a 500. this should be a 400. 500 will make haproxy retry a bunch let obj = params[0] .as_object_mut() - .ok_or_else(|| anyhow::anyhow!("invalid format"))?; + .ok_or_else(|| Web3ProxyError::BadRequest("invalid format".to_string()))?; if obj.contains_key("blockHash") { return Ok(BlockNeeded::CacheSuccessForever); diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index 0920d9d3..9a8723f1 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -88,11 +88,11 @@ pub struct RequestMetadata { } impl RequestMetadata { - pub fn new(request_bytes: usize) -> anyhow::Result { + pub fn new(request_bytes: usize) -> Self { // TODO: how can we do this without turning it into a string first. this is going to slow us down! let request_bytes = request_bytes as u64; - let new = Self { + Self { start_instant: Instant::now(), request_bytes, archive_request: false.into(), @@ -102,9 +102,7 @@ impl RequestMetadata { response_bytes: 0.into(), response_millis: 0.into(), response_from_backup_rpc: false.into(), - }; - - Ok(new) + } } } diff --git a/web3_proxy/src/frontend/errors.rs b/web3_proxy/src/frontend/errors.rs index 14bf0185..8acd662c 100644 --- a/web3_proxy/src/frontend/errors.rs +++ b/web3_proxy/src/frontend/errors.rs @@ -4,6 +4,7 @@ use super::authorization::Authorization; use crate::jsonrpc::JsonRpcForwardedResponse; use std::net::IpAddr; +use std::sync::Arc; use axum::{ headers, @@ -33,11 +34,18 @@ pub enum Web3ProxyError { #[error(ignore)] #[from(ignore)] BadRequest(String), - SemaphoreAcquireError(AcquireError), Database(DbErr), + EthersHttpClientError(ethers::prelude::HttpClientError), + EthersProviderError(ethers::prelude::ProviderError), + EthersWsClientError(ethers::prelude::WsClientError), Headers(headers::Error), HeaderToString(ToStrError), InfluxDb2RequestError(influxdb2::RequestError), + #[display(fmt = "{} > {}", min, max)] + InvalidBlockBounds { + min: u64, + max: u64, + }, InvalidHeaderValue(InvalidHeaderValue), IpAddrParse(AddrParseError), #[error(ignore)] @@ -45,6 +53,8 @@ pub enum Web3ProxyError { IpNotAllowed(IpAddr), JoinError(JoinError), MsgPackEncode(rmp_serde::encode::Error), + NoServersSynced, + NoHandleReady, NotFound, OriginRequired, #[error(ignore)] @@ -58,16 +68,23 @@ pub enum Web3ProxyError { #[error(ignore)] #[from(ignore)] RefererNotAllowed(headers::Referer), + SeaRc(Arc), + SemaphoreAcquireError(AcquireError), + SerdeJson(serde_json::Error), /// simple way to return an error message to the user and an anyhow to our logs #[display(fmt = "{}, {}, {:?}", _0, _1, _2)] StatusCode(StatusCode, String, Option), /// TODO: what should be attached to the timout? - Timeout(tokio::time::error::Elapsed), + #[display(fmt = "{:?}", _0)] + #[error(ignore)] + Timeout(Option), UlidDecode(ulid::DecodeError), + UnknownBlockNumber, UnknownKey, UserAgentRequired, #[error(ignore)] UserAgentNotAllowed(headers::UserAgent), + WatchRecvError(tokio::sync::watch::error::RecvError), } impl Web3ProxyError { @@ -120,6 +137,39 @@ impl Web3ProxyError { ), ) } + Self::EthersHttpClientError(err) => { + warn!("EthersHttpClientError err={:?}", err); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcForwardedResponse::from_str( + "ether http client error", + Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), + None, + ), + ) + } + Self::EthersProviderError(err) => { + warn!("EthersProviderError err={:?}", err); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcForwardedResponse::from_str( + "ether provider error", + Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), + None, + ), + ) + } + Self::EthersWsClientError(err) => { + warn!("EthersWsClientError err={:?}", err); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcForwardedResponse::from_str( + "ether ws client error", + Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), + None, + ), + ) + } Self::Headers(err) => { warn!("HeadersError {:?}", err); ( @@ -143,6 +193,20 @@ impl Web3ProxyError { ), ) } + Self::InvalidBlockBounds { min, max } => { + warn!("InvalidBlockBounds min={} max={}", min, max); + ( + StatusCode::BAD_REQUEST, + JsonRpcForwardedResponse::from_string( + format!( + "Invalid blocks bounds requested. min ({}) > max ({})", + min, max + ), + Some(StatusCode::BAD_REQUEST.as_u16().into()), + None, + ), + ) + } Self::IpAddrParse(err) => { warn!("IpAddrParse err={:?}", err); ( @@ -206,6 +270,28 @@ impl Web3ProxyError { ), ) } + Self::NoServersSynced => { + warn!("NoServersSynced"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcForwardedResponse::from_str( + "no servers synced", + Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), + None, + ), + ) + } + Self::NoHandleReady => { + error!("NoHandleReady"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcForwardedResponse::from_str( + "unable to retry for request handle", + Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), + None, + ), + ) + } Self::NotFound => { // TODO: emit a stat? // TODO: instead of an error, show a normal html page for 404 @@ -305,6 +391,11 @@ impl Web3ProxyError { ), ) } + Self::SeaRc(err) => match migration::SeaRc::try_unwrap(err) { + Ok(err) => err, + Err(err) => Self::Anyhow(anyhow::anyhow!("{}", err)), + } + .into_response_parts(), Self::SemaphoreAcquireError(err) => { warn!("semaphore acquire err={:?}", err); ( @@ -317,6 +408,17 @@ impl Web3ProxyError { ), ) } + Self::SerdeJson(err) => { + warn!("serde json err={:?}", err); + ( + StatusCode::BAD_REQUEST, + JsonRpcForwardedResponse::from_str( + "de/serialization error!", + Some(StatusCode::BAD_REQUEST.as_u16().into()), + None, + ), + ) + } Self::StatusCode(status_code, err_msg, err) => { // different status codes should get different error levels. 500s should warn. 400s should stat let code = status_code.as_u16(); @@ -362,6 +464,17 @@ impl Web3ProxyError { ), ) } + Self::UnknownBlockNumber => { + error!("UnknownBlockNumber"); + ( + StatusCode::BAD_GATEWAY, + JsonRpcForwardedResponse::from_str( + "no servers synced. unknown eth_blockNumber", + Some(StatusCode::BAD_GATEWAY.as_u16().into()), + None, + ), + ) + } // TODO: stat? Self::UnknownKey => ( StatusCode::UNAUTHORIZED, @@ -393,10 +506,27 @@ impl Web3ProxyError { ), ) } + Self::WatchRecvError(err) => { + error!("WatchRecvError err={:?}", err); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcForwardedResponse::from_str( + "watch recv error!", + Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), + None, + ), + ) + } } } } +impl From for Web3ProxyError { + fn from(err: tokio::time::error::Elapsed) -> Self { + Self::Timeout(Some(err)) + } +} + impl IntoResponse for Web3ProxyError { fn into_response(self) -> Response { // TODO: include the request id in these so that users can give us something that will point to logs diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs index 3e3a6957..9f249555 100644 --- a/web3_proxy/src/frontend/rpc_proxy_ws.rs +++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs @@ -378,7 +378,7 @@ async fn handle_socket_payload( // TODO: move this logic into the app? let request_bytes = json_request.num_bytes(); - let request_metadata = Arc::new(RequestMetadata::new(request_bytes).unwrap()); + let request_metadata = Arc::new(RequestMetadata::new(request_bytes)); let subscription_id = json_request.params.unwrap().to_string(); diff --git a/web3_proxy/src/jsonrpc.rs b/web3_proxy/src/jsonrpc.rs index 0a6435c6..f5a2dde6 100644 --- a/web3_proxy/src/jsonrpc.rs +++ b/web3_proxy/src/jsonrpc.rs @@ -1,3 +1,4 @@ +use crate::frontend::errors::Web3ProxyResult; use derive_more::From; use ethers::prelude::{HttpClientError, ProviderError, WsClientError}; use serde::de::{self, Deserializer, MapAccess, SeqAccess, Visitor}; @@ -240,7 +241,7 @@ impl JsonRpcForwardedResponse { } } - pub fn from_ethers_error(e: ProviderError, id: Box) -> anyhow::Result { + pub fn from_ethers_error(e: ProviderError, id: Box) -> Web3ProxyResult { // TODO: move turning ClientError into json to a helper function? let code; let message: String; @@ -302,7 +303,7 @@ impl JsonRpcForwardedResponse { pub fn try_from_response_result( result: Result, ProviderError>, id: Box, - ) -> anyhow::Result { + ) -> Web3ProxyResult { match result { Ok(response) => Ok(Self::from_response(response, id)), Err(e) => Self::from_ethers_error(e, id), diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index 480f02be..b6c60f2b 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -4,6 +4,7 @@ use super::many::Web3Rpcs; use super::one::Web3Rpc; use super::transactions::TxStatus; use crate::frontend::authorization::Authorization; +use crate::frontend::errors::Web3ProxyResult; use crate::{config::BlockAndRpc, jsonrpc::JsonRpcRequest}; use anyhow::{anyhow, Context}; use derive_more::From; @@ -158,7 +159,7 @@ impl Web3Rpcs { &self, block: Web3ProxyBlock, heaviest_chain: bool, - ) -> anyhow::Result { + ) -> Web3ProxyResult { // TODO: i think we can rearrange this function to make it faster on the hot path let block_hash = block.hash(); @@ -196,7 +197,7 @@ impl Web3Rpcs { authorization: &Arc, hash: &H256, rpc: Option<&Arc>, - ) -> anyhow::Result { + ) -> Web3ProxyResult { // first, try to get the hash from our cache // the cache is set last, so if its here, its everywhere // TODO: use try_get_with @@ -267,7 +268,7 @@ impl Web3Rpcs { &self, authorization: &Arc, num: &U64, - ) -> anyhow::Result<(H256, u64)> { + ) -> Web3ProxyResult<(H256, u64)> { let (block, block_depth) = self.cannonical_block(authorization, num).await?; let hash = *block.hash(); @@ -281,7 +282,7 @@ impl Web3Rpcs { &self, authorization: &Arc, num: &U64, - ) -> anyhow::Result<(Web3ProxyBlock, u64)> { + ) -> Web3ProxyResult<(Web3ProxyBlock, u64)> { // we only have blocks by hash now // maybe save them during save_block in a blocks_by_number Cache> // if theres multiple, use petgraph to find the one on the main chain (and remove the others if they have enough confirmations) diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index a309b2bd..412a26dd 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -7,6 +7,7 @@ use crate::app::{flatten_handle, AnyhowJoinHandle, Web3ProxyApp}; ///! Load balanced communication with a group of web3 providers use crate::config::{BlockAndRpc, TxHashAndRpc, Web3RpcConfig}; use crate::frontend::authorization::{Authorization, RequestMetadata}; +use crate::frontend::errors::{Web3ProxyError, Web3ProxyResult}; use crate::frontend::rpc_proxy_ws::ProxyMode; use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest}; use crate::rpcs::transactions::TxStatus; @@ -462,7 +463,7 @@ impl Web3Rpcs { params: Option<&serde_json::Value>, error_level: Level, // TODO: remove this box once i figure out how to do the options - ) -> anyhow::Result { + ) -> Web3ProxyResult { // TODO: if only 1 active_request_handles, do self.try_send_request? let responses = active_request_handles @@ -540,7 +541,7 @@ impl Web3Rpcs { // TODO: if we are checking for the consensus head, i don' think we need min_block_needed/max_block_needed min_block_needed: Option<&U64>, max_block_needed: Option<&U64>, - ) -> anyhow::Result { + ) -> Web3ProxyResult { let usable_rpcs_by_tier_and_head_number: BTreeMap<(u64, Option), Vec>> = { let synced_connections = self.watch_consensus_rpcs_sender.borrow().clone(); @@ -569,11 +570,10 @@ impl Web3Rpcs { cmp::Ordering::Greater => { // TODO: force a debug log of the original request to see if our logic is wrong? // TODO: attach the rpc_key_id so we can find the user to ask if they need help - return Err(anyhow::anyhow!( - "Invalid blocks bounds requested. min ({}) > max ({})", - min_block_needed, - max_block_needed - )); + return Err(Web3ProxyError::InvalidBlockBounds { + min: min_block_needed.as_u64(), + max: max_block_needed.as_u64(), + }); } } } @@ -877,7 +877,7 @@ impl Web3Rpcs { request_metadata: Option<&Arc>, min_block_needed: Option<&U64>, max_block_needed: Option<&U64>, - ) -> anyhow::Result { + ) -> Web3ProxyResult { let mut skip_rpcs = vec![]; let mut method_not_available_response = None; @@ -1099,7 +1099,7 @@ impl Web3Rpcs { error_level: Level, max_count: Option, always_include_backups: bool, - ) -> anyhow::Result { + ) -> Web3ProxyResult { let mut watch_consensus_rpcs = self.watch_consensus_rpcs_sender.subscribe(); loop { @@ -1205,7 +1205,7 @@ impl Web3Rpcs { request_metadata: Option<&Arc>, min_block_needed: Option<&U64>, max_block_needed: Option<&U64>, - ) -> anyhow::Result { + ) -> Web3ProxyResult { match authorization.checks.proxy_mode { ProxyMode::Debug | ProxyMode::Best => { self.try_send_best_consensus_head_connection( diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 93263a54..b7b32fba 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -5,6 +5,7 @@ use super::request::{OpenRequestHandle, OpenRequestResult}; use crate::app::{flatten_handle, AnyhowJoinHandle}; use crate::config::{BlockAndRpc, Web3RpcConfig}; use crate::frontend::authorization::Authorization; +use crate::frontend::errors::{Web3ProxyError, Web3ProxyResult}; use crate::rpcs::request::RequestErrorHandler; use anyhow::{anyhow, Context}; use ethers::prelude::{Bytes, Middleware, ProviderError, TxHash, H256, U64}; @@ -1159,7 +1160,7 @@ impl Web3Rpc { authorization: &'a Arc, max_wait: Option, unlocked_provider: Option>, - ) -> anyhow::Result { + ) -> Web3ProxyResult { let max_wait = max_wait.map(|x| Instant::now() + x); loop { @@ -1181,8 +1182,7 @@ impl Web3Rpc { if let Some(max_wait) = max_wait { if retry_at > max_wait { // break now since we will wait past our maximum wait time - // TODO: don't use anyhow. use specific error type - return Err(anyhow::anyhow!("timeout waiting for request handle")); + return Err(Web3ProxyError::Timeout(None)); } } @@ -1196,7 +1196,7 @@ impl Web3Rpc { let now = Instant::now(); if now > max_wait { - return Err(anyhow::anyhow!("unable to retry for request handle")); + return Err(Web3ProxyError::NoHandleReady); } } @@ -1214,7 +1214,7 @@ impl Web3Rpc { authorization: &Arc, // TODO: borrow on this instead of needing to clone the Arc? unlocked_provider: Option>, - ) -> anyhow::Result { + ) -> Web3ProxyResult { // TODO: think more about this read block // TODO: this should *not* be new_head_client. this should be a separate object if unlocked_provider.is_some() || self.provider.read().await.is_some() { From 1493d73386891b75930b07e1fd0c281ffa4582c1 Mon Sep 17 00:00:00 2001 From: Rory Neithinger Date: Sun, 19 Mar 2023 19:14:46 -0700 Subject: [PATCH 4/9] better error handling for ws --- web3_proxy/src/app/ws.rs | 5 +++-- web3_proxy/src/frontend/errors.rs | 12 +++++++++++ web3_proxy/src/frontend/rpc_proxy_ws.rs | 27 +++++++------------------ 3 files changed, 22 insertions(+), 22 deletions(-) diff --git a/web3_proxy/src/app/ws.rs b/web3_proxy/src/app/ws.rs index 79617f7b..2366ab31 100644 --- a/web3_proxy/src/app/ws.rs +++ b/web3_proxy/src/app/ws.rs @@ -2,6 +2,7 @@ use super::Web3ProxyApp; use crate::frontend::authorization::{Authorization, RequestMetadata}; +use crate::frontend::errors::Web3ProxyResult; use crate::jsonrpc::JsonRpcForwardedResponse; use crate::jsonrpc::JsonRpcRequest; use crate::rpcs::transactions::TxStatus; @@ -27,7 +28,7 @@ impl Web3ProxyApp { subscription_count: &'a AtomicUsize, // TODO: taking a sender for Message instead of the exact json we are planning to send feels wrong, but its easier for now response_sender: flume::Sender, - ) -> anyhow::Result<(AbortHandle, JsonRpcForwardedResponse)> { + ) -> Web3ProxyResult<(AbortHandle, JsonRpcForwardedResponse)> { // TODO: this is not efficient let request_bytes = serde_json::to_string(&request_json) .context("finding request size")? @@ -341,7 +342,7 @@ impl Web3ProxyApp { ); }); } - _ => return Err(anyhow::anyhow!("unimplemented")), + _ => return Err(anyhow::anyhow!("unimplemented").into()), } // TODO: do something with subscription_join_handle? diff --git a/web3_proxy/src/frontend/errors.rs b/web3_proxy/src/frontend/errors.rs index 8acd662c..0e4356a8 100644 --- a/web3_proxy/src/frontend/errors.rs +++ b/web3_proxy/src/frontend/errors.rs @@ -85,6 +85,7 @@ pub enum Web3ProxyError { #[error(ignore)] UserAgentNotAllowed(headers::UserAgent), WatchRecvError(tokio::sync::watch::error::RecvError), + WebsocketOnly, } impl Web3ProxyError { @@ -517,6 +518,17 @@ impl Web3ProxyError { ), ) } + Self::WebsocketOnly => { + warn!("WebsocketOnly"); + ( + StatusCode::BAD_REQUEST, + JsonRpcForwardedResponse::from_str( + "redirect_public_url not set. only websockets work here", + Some(StatusCode::BAD_REQUEST.as_u16().into()), + None, + ), + ) + } } } } diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs index 9f249555..df186414 100644 --- a/web3_proxy/src/frontend/rpc_proxy_ws.rs +++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs @@ -7,6 +7,7 @@ use super::errors::{Web3ProxyError, Web3ProxyResponse}; use crate::stats::RpcQueryStats; use crate::{ app::Web3ProxyApp, + frontend::errors::Web3ProxyResult, jsonrpc::{JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest}, }; use axum::headers::{Origin, Referer, UserAgent}; @@ -26,7 +27,7 @@ use futures::{ use handlebars::Handlebars; use hashbrown::HashMap; use http::StatusCode; -use log::{error, info, trace, warn}; +use log::{info, trace, warn}; use serde_json::json; use serde_json::value::to_raw_value; use std::sync::Arc; @@ -112,11 +113,7 @@ async fn _websocket_handler( // this is not a websocket. redirect to a friendly page Ok(Redirect::permanent(redirect).into_response()) } else { - // TODO: do not use an anyhow error. send the user a 400 - Err( - anyhow::anyhow!("redirect_public_url not set. only websockets work here") - .into(), - ) + Err(Web3ProxyError::WebsocketOnly) } } } @@ -341,7 +338,7 @@ async fn handle_socket_payload( Ok(json_request) => { let id = json_request.id.clone(); - let response: anyhow::Result = match &json_request.method + let response: Web3ProxyResult = match &json_request.method [..] { "eth_subscribe" => { @@ -417,16 +414,7 @@ async fn handle_socket_payload( _ => app .proxy_web3_rpc(authorization.clone(), json_request.into()) .await - .map_or_else( - |err| match err { - Web3ProxyError::Anyhow(err) => Err(err), - _ => { - error!("handle this better! {:?}", err); - Err(anyhow::anyhow!("unexpected error! {:?}", err)) - } - }, - |(response, _)| Ok(response), - ), + .map(|(response, _)| response), }; (id, response) @@ -442,9 +430,8 @@ async fn handle_socket_payload( let response_str = match response { Ok(x) => serde_json::to_string(&x).expect("to_string should always work here"), Err(err) => { - // we have an anyhow error. turn it into a response - let response = JsonRpcForwardedResponse::from_anyhow_error(err, None, Some(id)); - + let (_, mut response) = err.into_response_parts(); + response.id = id; serde_json::to_string(&response).expect("to_string should always work here") } }; From 3479bf9d063a66e88001fcdef652f5a6c3fa803d Mon Sep 17 00:00:00 2001 From: Rory Neithinger Date: Mon, 20 Mar 2023 11:38:54 -0700 Subject: [PATCH 5/9] add context type which preserves status code --- web3_proxy/src/frontend/errors.rs | 38 ++++++++++++++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/web3_proxy/src/frontend/errors.rs b/web3_proxy/src/frontend/errors.rs index 0e4356a8..d9692f7f 100644 --- a/web3_proxy/src/frontend/errors.rs +++ b/web3_proxy/src/frontend/errors.rs @@ -15,7 +15,7 @@ use axum::{ use derive_more::{Display, Error, From}; use http::header::InvalidHeaderValue; use ipnet::AddrParseError; -use log::{debug, error, trace, warn}; +use log::{debug, error, info, trace, warn}; use migration::sea_orm::DbErr; use redis_rate_limiter::redis::RedisError; use reqwest::header::ToStrError; @@ -86,6 +86,9 @@ pub enum Web3ProxyError { UserAgentNotAllowed(headers::UserAgent), WatchRecvError(tokio::sync::watch::error::RecvError), WebsocketOnly, + #[display(fmt = "{}, {}", _0, _1)] + #[error(ignore)] + WithContext(Option>, String), } impl Web3ProxyError { @@ -529,6 +532,20 @@ impl Web3ProxyError { ), ) } + Self::WithContext(err, msg) => { + info!("in context: {}", msg); + match err { + Some(err) => err.into_response_parts(), + None => ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcForwardedResponse::from_string( + msg, + Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), + None, + ), + ), + } + } } } } @@ -552,3 +569,22 @@ impl IntoResponse for Web3ProxyError { pub async fn handler_404() -> Response { Web3ProxyError::NotFound.into_response() } + +pub trait Web3ProxyErrorContext { + fn web3_context>(self, msg: S) -> Result; +} + +impl Web3ProxyErrorContext for Option { + fn web3_context>(self, msg: S) -> Result { + self.ok_or(Web3ProxyError::WithContext(None, msg.into())) + } +} + +impl Web3ProxyErrorContext for Result +where + E: Into, +{ + fn web3_context>(self, msg: S) -> Result { + self.map_err(|err| Web3ProxyError::WithContext(Some(Box::new(err.into())), msg.into())) + } +} From 60c1a6d3828c34bfecd1c69f60f9fe6f8fa77b3e Mon Sep 17 00:00:00 2001 From: Rory Neithinger Date: Mon, 20 Mar 2023 12:47:57 -0700 Subject: [PATCH 6/9] remove anyhow from rest of frontend module --- web3_proxy/src/frontend/admin.rs | 66 +++++---- web3_proxy/src/frontend/authorization.rs | 45 +++--- web3_proxy/src/frontend/errors.rs | 177 ++++++++++++++++++++++- web3_proxy/src/frontend/users.rs | 97 +++++++------ 4 files changed, 287 insertions(+), 98 deletions(-) diff --git a/web3_proxy/src/frontend/admin.rs b/web3_proxy/src/frontend/admin.rs index 8df584e6..8de51fa8 100644 --- a/web3_proxy/src/frontend/admin.rs +++ b/web3_proxy/src/frontend/admin.rs @@ -4,10 +4,9 @@ use super::authorization::login_is_authorized; use super::errors::Web3ProxyResponse; use crate::admin_queries::query_admin_modify_usertier; use crate::app::Web3ProxyApp; -use crate::frontend::errors::Web3ProxyError; +use crate::frontend::errors::{Web3ProxyError, Web3ProxyErrorContext}; use crate::user_token::UserBearerToken; use crate::PostLogin; -use anyhow::Context; use axum::{ extract::{Path, Query}, headers::{authorization::Bearer, Authorization}, @@ -137,10 +136,10 @@ pub async fn admin_login_get( resources: vec![], }; - let db_conn = app.db_conn().context("login requires a database")?; + let db_conn = app.db_conn().web3_context("login requires a database")?; let db_replica = app .db_replica() - .context("login requires a replica database")?; + .web3_context("login requires a replica database")?; // Get the user that we want to imitate from the read-only database (their id ...) // TODO: Only get the id, not the whole user object ... @@ -171,7 +170,7 @@ pub async fn admin_login_get( trail .save(&db_conn) .await - .context("saving user's pending_login")?; + .web3_context("saving user's pending_login")?; // Can there be two login-sessions at the same time? // I supposed if the user logs in, the admin would be logged out and vice versa @@ -196,7 +195,7 @@ pub async fn admin_login_get( user_pending_login .save(&db_conn) .await - .context("saving an admin trail pre login")?; + .web3_context("saving an admin trail pre login")?; // there are multiple ways to sign messages and not all wallets support them // TODO: default message eip from config? @@ -210,7 +209,7 @@ pub async fn admin_login_get( "eip4361" => message.to_string(), _ => { // TODO: custom error that is handled a 401 - return Err(anyhow::anyhow!("invalid message eip given").into()); + return Err(Web3ProxyError::InvalidEip); } }; @@ -230,9 +229,9 @@ pub async fn admin_login_post( // Check for the signed bytes .. // TODO: this seems too verbose. how can we simply convert a String into a [u8; 65] - let their_sig_bytes = Bytes::from_str(&payload.sig).context("parsing sig")?; + let their_sig_bytes = Bytes::from_str(&payload.sig).web3_context("parsing sig")?; if their_sig_bytes.len() != 65 { - return Err(anyhow::anyhow!("checking signature length").into()); + return Err(Web3ProxyError::InvalidSignatureLength); } let mut their_sig: [u8; 65] = [0; 65]; for x in 0..65 { @@ -242,17 +241,18 @@ pub async fn admin_login_post( // we can't trust that they didn't tamper with the message in some way. like some clients return it hex encoded // TODO: checking 0x seems fragile, but I think it will be fine. siwe message text shouldn't ever start with 0x let their_msg: Message = if payload.msg.starts_with("0x") { - let their_msg_bytes = Bytes::from_str(&payload.msg).context("parsing payload message")?; + let their_msg_bytes = + Bytes::from_str(&payload.msg).web3_context("parsing payload message")?; // TODO: lossy or no? String::from_utf8_lossy(their_msg_bytes.as_ref()) .parse::() - .context("parsing hex string message")? + .web3_context("parsing hex string message")? } else { payload .msg .parse::() - .context("parsing string message")? + .web3_context("parsing string message")? }; // the only part of the message we will trust is their nonce @@ -260,7 +260,9 @@ pub async fn admin_login_post( let login_nonce = UserBearerToken::from_str(&their_msg.nonce)?; // fetch the message we gave them from our database - let db_replica = app.db_replica().context("Getting database connection")?; + let db_replica = app + .db_replica() + .web3_context("Getting database connection")?; // massage type for the db let login_nonce_uuid: Uuid = login_nonce.clone().into(); @@ -270,30 +272,30 @@ pub async fn admin_login_post( .filter(pending_login::Column::Nonce.eq(login_nonce_uuid)) .one(db_replica.conn()) .await - .context("database error while finding pending_login")? - .context("login nonce not found")?; + .web3_context("database error while finding pending_login")? + .web3_context("login nonce not found")?; let our_msg: siwe::Message = user_pending_login .message .parse() - .context("parsing siwe message")?; + .web3_context("parsing siwe message")?; // default options are fine. the message includes timestamp and domain and nonce let verify_config = VerificationOpts::default(); let db_conn = app .db_conn() - .context("deleting expired pending logins requires a db")?; + .web3_context("deleting expired pending logins requires a db")?; if let Err(err_1) = our_msg .verify(&their_sig, &verify_config) .await - .context("verifying signature against our local message") + .web3_context("verifying signature against our local message") { // verification method 1 failed. try eip191 if let Err(err_191) = our_msg .verify_eip191(&their_sig) - .context("verifying eip191 signature against our local message") + .web3_context("verifying eip191 signature against our local message") { // delete ALL expired rows. let now = Utc::now(); @@ -305,18 +307,16 @@ pub async fn admin_login_post( // TODO: emit a stat? if this is high something weird might be happening debug!("cleared expired pending_logins: {:?}", delete_result); - return Err(anyhow::anyhow!( - "both the primary and eip191 verification failed: {:#?}; {:#?}", - err_1, - err_191 - ) - .into()); + return Err(Web3ProxyError::EipVerificationFailed( + Box::new(err_1), + Box::new(err_191), + )); } } let imitating_user_id = user_pending_login .imitating_user - .context("getting address of the imitating user")?; + .web3_context("getting address of the imitating user")?; // TODO: limit columns or load whole user? // TODO: Right now this loads the whole admin. I assume we might want to load the user though (?) figure this out as we go along... @@ -324,13 +324,13 @@ pub async fn admin_login_post( .filter(user::Column::Address.eq(our_msg.address.as_ref())) .one(db_replica.conn()) .await? - .context("getting admin address")?; + .web3_context("getting admin address")?; let imitating_user = user::Entity::find() .filter(user::Column::Id.eq(imitating_user_id)) .one(db_replica.conn()) .await? - .context("admin address was not found!")?; + .web3_context("admin address was not found!")?; // Add a message that the admin has logged in // Note that the admin is trying to log in as this user @@ -344,7 +344,7 @@ pub async fn admin_login_post( trail .save(&db_conn) .await - .context("saving an admin trail post login")?; + .web3_context("saving an admin trail post login")?; // I supposed we also get the rpc_key, whatever this is used for (?). // I think the RPC key should still belong to the admin though in this case ... @@ -354,7 +354,7 @@ pub async fn admin_login_post( .filter(rpc_key::Column::UserId.eq(admin.id)) .all(db_replica.conn()) .await - .context("failed loading user's key")?; + .web3_context("failed loading user's key")?; // create a bearer token for the user. let user_bearer_token = UserBearerToken::default(); @@ -395,7 +395,7 @@ pub async fn admin_login_post( user_login .save(&db_conn) .await - .context("saving user login")?; + .web3_context("saving user login")?; if let Err(err) = user_pending_login .into_active_model() @@ -417,7 +417,9 @@ pub async fn admin_logout_post( ) -> Web3ProxyResponse { let user_bearer = UserBearerToken::try_from(bearer)?; - let db_conn = app.db_conn().context("database needed for user logout")?; + let db_conn = app + .db_conn() + .web3_context("database needed for user logout")?; if let Err(err) = login::Entity::delete_many() .filter(login::Column::BearerToken.eq(user_bearer.uuid())) diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index 9a8723f1..03e80ef8 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -1,11 +1,10 @@ //! Utilities for authorization of logged in and anonymous users. -use super::errors::{Web3ProxyError, Web3ProxyResult}; +use super::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult}; use super::rpc_proxy_ws::ProxyMode; use crate::app::{AuthorizationChecks, Web3ProxyApp, APP_USER_AGENT}; use crate::rpcs::one::Web3Rpc; use crate::user_token::UserBearerToken; -use anyhow::Context; use axum::headers::authorization::Bearer; use axum::headers::{Header, Origin, Referer, UserAgent}; use chrono::Utc; @@ -128,7 +127,7 @@ impl Display for RpcSecretKey { } impl FromStr for RpcSecretKey { - type Err = anyhow::Error; + type Err = Web3ProxyError; fn from_str(s: &str) -> Result { if let Ok(ulid) = s.parse::() { @@ -137,7 +136,7 @@ impl FromStr for RpcSecretKey { Ok(uuid.into()) } else { // TODO: custom error type so that this shows as a 400 - Err(anyhow::anyhow!("UserKey was not a ULID or UUID")) + Err(Web3ProxyError::InvalidUserKey) } } } @@ -367,7 +366,7 @@ pub async fn ip_is_authorized( .await?; }; - Ok::<_, anyhow::Error>(()) + Ok::<_, Web3ProxyError>(()) } .map_err(|err| { warn!("background update of recent_users:ip failed: {}", err); @@ -430,7 +429,7 @@ pub async fn key_is_authorized( .await?; } - Ok::<_, anyhow::Error>(()) + Ok::<_, Web3ProxyError>(()) } .map_err(|err| { warn!("background update of recent_users:id failed: {}", err); @@ -474,12 +473,13 @@ impl Web3ProxyApp { pub async fn registered_user_semaphore( &self, authorization_checks: &AuthorizationChecks, - ) -> anyhow::Result> { + ) -> Web3ProxyResult> { if let Some(max_concurrent_requests) = authorization_checks.max_concurrent_requests { let user_id = authorization_checks .user_id .try_into() - .context("user ids should always be non-zero")?; + .or(Err(Web3ProxyError::UserIdZero)) + .web3_context("user ids should always be non-zero")?; let semaphore = self .registered_user_semaphores @@ -527,7 +527,7 @@ impl Web3ProxyApp { // get the attached address from the database for the given auth_token. let db_replica = self .db_replica() - .context("checking if bearer token is authorized")?; + .web3_context("checking if bearer token is authorized")?; let user_bearer_uuid: Uuid = user_bearer_token.into(); @@ -536,8 +536,8 @@ impl Web3ProxyApp { .filter(login::Column::BearerToken.eq(user_bearer_uuid)) .one(db_replica.conn()) .await - .context("fetching user from db by bearer token")? - .context("unknown bearer token")?; + .web3_context("fetching user from db by bearer token")? + .web3_context("unknown bearer token")?; Ok((user, semaphore_permit)) } @@ -662,13 +662,15 @@ impl Web3ProxyApp { &self, proxy_mode: ProxyMode, rpc_secret_key: RpcSecretKey, - ) -> anyhow::Result { - let authorization_checks: Result<_, Arc> = self + ) -> Web3ProxyResult { + let authorization_checks: Result<_, Arc> = self .rpc_secret_key_cache .try_get_with(rpc_secret_key.into(), async move { // trace!(?rpc_secret_key, "user cache miss"); - let db_replica = self.db_replica().context("Getting database connection")?; + let db_replica = self + .db_replica() + .web3_context("Getting database connection")?; // TODO: join the user table to this to return the User? we don't always need it // TODO: join on secondary users @@ -724,7 +726,11 @@ impl Web3ProxyApp { if let Some(allowed_referers) = rpc_key_model.allowed_referers { let x = allowed_referers .split(',') - .map(|x| x.trim().parse::()) + .map(|x| { + x.trim() + .parse::() + .or(Err(Web3ProxyError::InvalidReferer)) + }) .collect::, _>>()?; Some(x) @@ -736,7 +742,11 @@ impl Web3ProxyApp { if let Some(allowed_user_agents) = rpc_key_model.allowed_user_agents { let x: Result, _> = allowed_user_agents .split(',') - .map(|x| x.trim().parse::()) + .map(|x| { + x.trim() + .parse::() + .or(Err(Web3ProxyError::InvalidUserAgent)) + }) .collect(); Some(x?) @@ -768,8 +778,7 @@ impl Web3ProxyApp { }) .await; - // TODO: what's the best way to handle this arc? try_unwrap will not work - authorization_checks.map_err(|err| anyhow::anyhow!(err)) + authorization_checks.map_err(Web3ProxyError::SeaRc) } /// Authorized the ip/origin/referer/useragent and rate limit and concurrency diff --git a/web3_proxy/src/frontend/errors.rs b/web3_proxy/src/frontend/errors.rs index d9692f7f..2546ae14 100644 --- a/web3_proxy/src/frontend/errors.rs +++ b/web3_proxy/src/frontend/errors.rs @@ -34,7 +34,10 @@ pub enum Web3ProxyError { #[error(ignore)] #[from(ignore)] BadRequest(String), + BadRouting, Database(DbErr), + #[display(fmt = "{:#?}, {:#?}", _0, _1)] + EipVerificationFailed(Box, Box), EthersHttpClientError(ethers::prelude::HttpClientError), EthersProviderError(ethers::prelude::ProviderError), EthersWsClientError(ethers::prelude::WsClientError), @@ -47,6 +50,12 @@ pub enum Web3ProxyError { max: u64, }, InvalidHeaderValue(InvalidHeaderValue), + InvalidEip, + InvalidInviteCode, + InvalidReferer, + InvalidSignatureLength, + InvalidUserAgent, + InvalidUserKey, IpAddrParse(AddrParseError), #[error(ignore)] #[from(ignore)] @@ -56,10 +65,14 @@ pub enum Web3ProxyError { NoServersSynced, NoHandleReady, NotFound, + NotImplemented, OriginRequired, #[error(ignore)] #[from(ignore)] OriginNotAllowed(headers::Origin), + ParseBytesError(ethers::types::ParseBytesError), + ParseMsgError(siwe::ParseError), + ParseAddressError, #[display(fmt = "{:?}, {:?}", _0, _1)] RateLimited(Authorization, Option), Redis(RedisError), @@ -84,9 +97,11 @@ pub enum Web3ProxyError { UserAgentRequired, #[error(ignore)] UserAgentNotAllowed(headers::UserAgent), + UserIdZero, + VerificationError(siwe::VerificationError), WatchRecvError(tokio::sync::watch::error::RecvError), WebsocketOnly, - #[display(fmt = "{}, {}", _0, _1)] + #[display(fmt = "{:?}, {}", _0, _1)] #[error(ignore)] WithContext(Option>, String), } @@ -130,6 +145,17 @@ impl Web3ProxyError { ), ) } + Self::BadRouting => { + error!("BadRouting"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcForwardedResponse::from_str( + "bad routing", + Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), + None, + ), + ) + } Self::Database(err) => { error!("database err={:?}", err); ( @@ -141,6 +167,23 @@ impl Web3ProxyError { ), ) } + Self::EipVerificationFailed(err_1, err_191) => { + warn!( + "EipVerificationFailed err_1={:#?} err2={:#?}", + err_1, err_191 + ); + ( + StatusCode::UNAUTHORIZED, + JsonRpcForwardedResponse::from_string( + format!( + "both the primary and eip191 verification failed: {:#?}; {:#?}", + err_1, err_191 + ), + Some(StatusCode::UNAUTHORIZED.as_u16().into()), + None, + ), + ) + } Self::EthersHttpClientError(err) => { warn!("EthersHttpClientError err={:?}", err); ( @@ -244,6 +287,72 @@ impl Web3ProxyError { ), ) } + Self::InvalidEip => { + warn!("InvalidEip"); + ( + StatusCode::UNAUTHORIZED, + JsonRpcForwardedResponse::from_str( + "invalid message eip given", + Some(StatusCode::UNAUTHORIZED.as_u16().into()), + None, + ), + ) + } + Self::InvalidInviteCode => { + warn!("InvalidInviteCode"); + ( + StatusCode::UNAUTHORIZED, + JsonRpcForwardedResponse::from_str( + "invalid invite code", + Some(StatusCode::UNAUTHORIZED.as_u16().into()), + None, + ), + ) + } + Self::InvalidReferer => { + warn!("InvalidReferer"); + ( + StatusCode::BAD_REQUEST, + JsonRpcForwardedResponse::from_str( + "invalid referer!", + Some(StatusCode::BAD_REQUEST.as_u16().into()), + None, + ), + ) + } + Self::InvalidSignatureLength => { + warn!("InvalidSignatureLength"); + ( + StatusCode::BAD_REQUEST, + JsonRpcForwardedResponse::from_str( + "invalid signature length", + Some(StatusCode::BAD_REQUEST.as_u16().into()), + None, + ), + ) + } + Self::InvalidUserAgent => { + warn!("InvalidUserAgent"); + ( + StatusCode::BAD_REQUEST, + JsonRpcForwardedResponse::from_str( + "invalid user agent!", + Some(StatusCode::BAD_REQUEST.as_u16().into()), + None, + ), + ) + } + Self::InvalidUserKey => { + warn!("InvalidUserKey"); + ( + StatusCode::BAD_REQUEST, + JsonRpcForwardedResponse::from_str( + "UserKey was not a ULID or UUID", + Some(StatusCode::BAD_REQUEST.as_u16().into()), + None, + ), + ) + } Self::JoinError(err) => { let code = if err.is_cancelled() { trace!("JoinError. likely shutting down. err={:?}", err); @@ -308,6 +417,17 @@ impl Web3ProxyError { ), ) } + Self::NotImplemented => { + warn!("NotImplemented"); + ( + StatusCode::NOT_IMPLEMENTED, + JsonRpcForwardedResponse::from_str( + "work in progress", + Some(StatusCode::NOT_IMPLEMENTED.as_u16().into()), + None, + ), + ) + } Self::OriginRequired => { warn!("OriginRequired"); ( @@ -330,6 +450,39 @@ impl Web3ProxyError { ), ) } + Self::ParseBytesError(err) => { + warn!("ParseBytesError err={:?}", err); + ( + StatusCode::BAD_REQUEST, + JsonRpcForwardedResponse::from_str( + "parse bytes error!", + Some(StatusCode::BAD_REQUEST.as_u16().into()), + None, + ), + ) + } + Self::ParseMsgError(err) => { + warn!("ParseMsgError err={:?}", err); + ( + StatusCode::BAD_REQUEST, + JsonRpcForwardedResponse::from_str( + "parse message error!", + Some(StatusCode::BAD_REQUEST.as_u16().into()), + None, + ), + ) + } + Self::ParseAddressError => { + warn!("ParseAddressError"); + ( + StatusCode::UNAUTHORIZED, + JsonRpcForwardedResponse::from_str( + "unable to parse address", + Some(StatusCode::BAD_REQUEST.as_u16().into()), + None, + ), + ) + } // TODO: this should actually by the id of the key. multiple users might control one key Self::RateLimited(authorization, retry_at) => { // TODO: emit a stat @@ -510,6 +663,28 @@ impl Web3ProxyError { ), ) } + Self::UserIdZero => { + warn!("UserIdZero"); + ( + StatusCode::BAD_REQUEST, + JsonRpcForwardedResponse::from_str( + "user ids should always be non-zero", + Some(StatusCode::BAD_REQUEST.as_u16().into()), + None, + ), + ) + } + Self::VerificationError(err) => { + warn!("VerificationError err={:?}", err); + ( + StatusCode::BAD_REQUEST, + JsonRpcForwardedResponse::from_str( + "verification error!", + Some(StatusCode::BAD_REQUEST.as_u16().into()), + None, + ), + ) + } Self::WatchRecvError(err) => { error!("WatchRecvError err={:?}", err); ( diff --git a/web3_proxy/src/frontend/users.rs b/web3_proxy/src/frontend/users.rs index e8a43ba3..c405b075 100644 --- a/web3_proxy/src/frontend/users.rs +++ b/web3_proxy/src/frontend/users.rs @@ -1,6 +1,6 @@ //! Handle registration, logins, and managing account data. use super::authorization::{login_is_authorized, RpcSecretKey}; -use super::errors::Web3ProxyResponse; +use super::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResponse}; use crate::app::Web3ProxyApp; use crate::http_params::{ get_chain_id_from_params, get_page_from_params, get_query_start_from_params, @@ -9,7 +9,6 @@ use crate::stats::influxdb_queries::query_user_stats; use crate::stats::StatType; use crate::user_token::UserBearerToken; use crate::{PostLogin, PostLoginQuery}; -use anyhow::Context; use axum::headers::{Header, Origin, Referer, UserAgent}; use axum::{ extract::{Path, Query}, @@ -81,11 +80,9 @@ pub async fn user_login_get( // TODO: allow ENS names here? let user_address: Address = params .remove("user_address") - // TODO: map_err so this becomes a 500. routing must be bad - .context("impossible")? + .ok_or(Web3ProxyError::BadRouting)? .parse() - // TODO: map_err so this becomes a 401 - .context("unable to parse address")?; + .or(Err(Web3ProxyError::ParseAddressError))?; let login_domain = app .config @@ -113,7 +110,7 @@ pub async fn user_login_get( resources: vec![], }; - let db_conn = app.db_conn().context("login requires a database")?; + let db_conn = app.db_conn().web3_context("login requires a database")?; // massage types to fit in the database. sea-orm does not make this very elegant let uuid = Uuid::from_u128(nonce.into()); @@ -135,7 +132,7 @@ pub async fn user_login_get( user_pending_login .save(&db_conn) .await - .context("saving user's pending_login")?; + .web3_context("saving user's pending_login")?; // there are multiple ways to sign messages and not all wallets support them // TODO: default message eip from config? @@ -148,8 +145,7 @@ pub async fn user_login_get( "eip191_hash" => Bytes::from(&message.eip191_hash().unwrap()).to_string(), "eip4361" => message.to_string(), _ => { - // TODO: custom error that is handled a 401 - return Err(anyhow::anyhow!("invalid message eip given").into()); + return Err(Web3ProxyError::InvalidEip); } }; @@ -169,9 +165,9 @@ pub async fn user_login_post( login_is_authorized(&app, ip).await?; // TODO: this seems too verbose. how can we simply convert a String into a [u8; 65] - let their_sig_bytes = Bytes::from_str(&payload.sig).context("parsing sig")?; + let their_sig_bytes = Bytes::from_str(&payload.sig).web3_context("parsing sig")?; if their_sig_bytes.len() != 65 { - return Err(anyhow::anyhow!("checking signature length").into()); + return Err(Web3ProxyError::InvalidSignatureLength); } let mut their_sig: [u8; 65] = [0; 65]; for x in 0..65 { @@ -181,17 +177,18 @@ pub async fn user_login_post( // we can't trust that they didn't tamper with the message in some way. like some clients return it hex encoded // TODO: checking 0x seems fragile, but I think it will be fine. siwe message text shouldn't ever start with 0x let their_msg: Message = if payload.msg.starts_with("0x") { - let their_msg_bytes = Bytes::from_str(&payload.msg).context("parsing payload message")?; + let their_msg_bytes = + Bytes::from_str(&payload.msg).web3_context("parsing payload message")?; // TODO: lossy or no? String::from_utf8_lossy(their_msg_bytes.as_ref()) .parse::() - .context("parsing hex string message")? + .web3_context("parsing hex string message")? } else { payload .msg .parse::() - .context("parsing string message")? + .web3_context("parsing string message")? }; // the only part of the message we will trust is their nonce @@ -199,7 +196,9 @@ pub async fn user_login_post( let login_nonce = UserBearerToken::from_str(&their_msg.nonce)?; // fetch the message we gave them from our database - let db_replica = app.db_replica().context("Getting database connection")?; + let db_replica = app + .db_replica() + .web3_context("Getting database connection")?; // massage type for the db let login_nonce_uuid: Uuid = login_nonce.clone().into(); @@ -208,13 +207,13 @@ pub async fn user_login_post( .filter(pending_login::Column::Nonce.eq(login_nonce_uuid)) .one(db_replica.conn()) .await - .context("database error while finding pending_login")? - .context("login nonce not found")?; + .web3_context("database error while finding pending_login")? + .web3_context("login nonce not found")?; let our_msg: siwe::Message = user_pending_login .message .parse() - .context("parsing siwe message")?; + .web3_context("parsing siwe message")?; // default options are fine. the message includes timestamp and domain and nonce let verify_config = VerificationOpts::default(); @@ -223,16 +222,16 @@ pub async fn user_login_post( if let Err(err_1) = our_msg .verify(&their_sig, &verify_config) .await - .context("verifying signature against our local message") + .web3_context("verifying signature against our local message") { // verification method 1 failed. try eip191 if let Err(err_191) = our_msg .verify_eip191(&their_sig) - .context("verifying eip191 signature against our local message") + .web3_context("verifying eip191 signature against our local message") { let db_conn = app .db_conn() - .context("deleting expired pending logins requires a db")?; + .web3_context("deleting expired pending logins requires a db")?; // delete ALL expired rows. let now = Utc::now(); @@ -244,12 +243,10 @@ pub async fn user_login_post( // TODO: emit a stat? if this is high something weird might be happening debug!("cleared expired pending_logins: {:?}", delete_result); - return Err(anyhow::anyhow!( - "both the primary and eip191 verification failed: {:#?}; {:#?}", - err_1, - err_191 - ) - .into()); + return Err(Web3ProxyError::EipVerificationFailed( + Box::new(err_1), + Box::new(err_191), + )); } } @@ -260,7 +257,7 @@ pub async fn user_login_post( .await .unwrap(); - let db_conn = app.db_conn().context("login requires a db")?; + let db_conn = app.db_conn().web3_context("login requires a db")?; let (u, uks, status_code) = match u { None => { @@ -270,7 +267,7 @@ pub async fn user_login_post( // TODO: more advanced invite codes that set different request/minute and concurrency limits if let Some(invite_code) = &app.config.invite_code { if query.invite_code.as_ref() != Some(invite_code) { - return Err(anyhow::anyhow!("checking invite_code").into()); + return Err(Web3ProxyError::InvalidInviteCode); } } @@ -300,7 +297,7 @@ pub async fn user_login_post( let uk = uk .insert(&txn) .await - .context("Failed saving new user key")?; + .web3_context("Failed saving new user key")?; let uks = vec![uk]; @@ -315,7 +312,7 @@ pub async fn user_login_post( .filter(rpc_key::Column::UserId.eq(u.id)) .all(db_replica.conn()) .await - .context("failed loading user's key")?; + .web3_context("failed loading user's key")?; (u, uks, StatusCode::OK) } @@ -355,7 +352,7 @@ pub async fn user_login_post( user_login .save(&db_conn) .await - .context("saving user login")?; + .web3_context("saving user login")?; if let Err(err) = user_pending_login .into_active_model() @@ -376,7 +373,9 @@ pub async fn user_logout_post( ) -> Web3ProxyResponse { let user_bearer = UserBearerToken::try_from(bearer)?; - let db_conn = app.db_conn().context("database needed for user logout")?; + let db_conn = app + .db_conn() + .web3_context("database needed for user logout")?; if let Err(err) = login::Entity::delete_many() .filter(login::Column::BearerToken.eq(user_bearer.uuid())) @@ -456,7 +455,7 @@ pub async fn user_post( // TODO: what else can we update here? password hash? subscription to newsletter? let user = if user.is_changed() { - let db_conn = app.db_conn().context("Getting database connection")?; + let db_conn = app.db_conn().web3_context("Getting database connection")?; user.save(&db_conn).await? } else { @@ -464,7 +463,7 @@ pub async fn user_post( user }; - let user: user::Model = user.try_into().context("Returning updated user")?; + let user: user::Model = user.try_into().web3_context("Returning updated user")?; Ok(Json(user).into_response()) } @@ -511,13 +510,13 @@ pub async fn rpc_keys_get( let db_replica = app .db_replica() - .context("db_replica is required to fetch a user's keys")?; + .web3_context("db_replica is required to fetch a user's keys")?; let uks = rpc_key::Entity::find() .filter(rpc_key::Column::UserId.eq(user.id)) .all(db_replica.conn()) .await - .context("failed loading user's key")?; + .web3_context("failed loading user's key")?; let response_json = json!({ "user_id": user.id, @@ -539,7 +538,7 @@ pub async fn rpc_keys_delete( let (_user, _semaphore) = app.bearer_is_authorized(bearer).await?; // TODO: think about how cascading deletes and billing should work - Err(anyhow::anyhow!("work in progress").into()) + Err(Web3ProxyError::NotImplemented) } /// the JSON input to the `rpc_keys_management` handler. @@ -572,7 +571,9 @@ pub async fn rpc_keys_management( let (user, _semaphore) = app.bearer_is_authorized(bearer).await?; - let db_replica = app.db_replica().context("getting db for user's keys")?; + let db_replica = app + .db_replica() + .web3_context("getting db for user's keys")?; let mut uk = if let Some(existing_key_id) = payload.key_id { // get the key and make sure it belongs to the user @@ -581,8 +582,8 @@ pub async fn rpc_keys_management( .filter(rpc_key::Column::Id.eq(existing_key_id)) .one(db_replica.conn()) .await - .context("failed loading user's key")? - .context("key does not exist or is not controlled by this bearer token")? + .web3_context("failed loading user's key")? + .web3_context("key does not exist or is not controlled by this bearer token")? .into_active_model() } else { // make a new key @@ -591,7 +592,7 @@ pub async fn rpc_keys_management( let log_level = payload .log_level - .context("log level must be 'none', 'detailed', or 'aggregated'")?; + .web3_context("log level must be 'none', 'detailed', or 'aggregated'")?; rpc_key::ActiveModel { user_id: sea_orm::Set(user.id), @@ -720,9 +721,11 @@ pub async fn rpc_keys_management( } let uk = if uk.is_changed() { - let db_conn = app.db_conn().context("login requires a db")?; + let db_conn = app.db_conn().web3_context("login requires a db")?; - uk.save(&db_conn).await.context("Failed saving user key")? + uk.save(&db_conn) + .await + .web3_context("Failed saving user key")? } else { uk }; @@ -757,13 +760,13 @@ pub async fn user_revert_logs_get( let db_replica = app .db_replica() - .context("getting replica db for user's revert logs")?; + .web3_context("getting replica db for user's revert logs")?; let uks = rpc_key::Entity::find() .filter(rpc_key::Column::UserId.eq(user.id)) .all(db_replica.conn()) .await - .context("failed loading user's key")?; + .web3_context("failed loading user's key")?; // TODO: only select the ids let uks: Vec<_> = uks.into_iter().map(|x| x.id).collect(); From 847e961eb063bdaa526ad4fe58ca66aa8afb0fac Mon Sep 17 00:00:00 2001 From: Rory Neithinger Date: Mon, 20 Mar 2023 13:07:24 -0700 Subject: [PATCH 7/9] remove more anyhows from app module the only ones left should be in the top level spawn functions. --- web3_proxy/src/app/mod.rs | 26 ++++++++-------- web3_proxy/src/app/ws.rs | 5 ++-- web3_proxy/src/frontend/authorization.rs | 2 +- web3_proxy/src/frontend/errors.rs | 38 ++++++++++++++++++++++-- 4 files changed, 51 insertions(+), 20 deletions(-) diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 2b8a6a4b..6215463b 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -4,7 +4,7 @@ mod ws; use crate::block_number::{block_needed, BlockNeeded}; use crate::config::{AppConfig, TopConfig}; use crate::frontend::authorization::{Authorization, RequestMetadata, RpcSecretKey}; -use crate::frontend::errors::{Web3ProxyError, Web3ProxyResult}; +use crate::frontend::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult}; use crate::frontend::rpc_proxy_ws::ProxyMode; use crate::jsonrpc::{ JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest, JsonRpcRequestEnum, @@ -1089,12 +1089,11 @@ impl Web3ProxyApp { // TODO: improve flattening // get the head block now so that any requests that need it all use the same block - // TODO: Web3ProxyError that handles "no servers synced" in a consistent way // TODO: this still has an edge condition if there is a reorg in the middle of the request!!! let head_block_num = self .balanced_rpcs .head_block_num() - .context(anyhow::anyhow!("no servers synced"))?; + .ok_or(Web3ProxyError::NoServersSynced)?; let responses = join_all( requests @@ -1366,7 +1365,7 @@ impl Web3ProxyApp { let mut gas_estimate: U256 = if let Some(gas_estimate) = response.result.take() { serde_json::from_str(gas_estimate.get()) - .context("gas estimate result is not an U256")? + .or(Err(Web3ProxyError::GasEstimateNotU256))? } else { // i think this is always an error response let rpcs = request_metadata.backend_requests.lock().clone(); @@ -1456,15 +1455,15 @@ impl Web3ProxyApp { { let params = request .params - .context("there must be params if we got this far")?; + .web3_context("there must be params if we got this far")?; let params = params .as_array() - .context("there must be an array if we got this far")? + .web3_context("there must be an array if we got this far")? .get(0) - .context("there must be an item if we got this far")? + .web3_context("there must be an item if we got this far")? .as_str() - .context("there must be a string if we got this far")?; + .web3_context("there must be a string if we got this far")?; let params = Bytes::from_str(params) .expect("there must be Bytes if we got this far"); @@ -1586,7 +1585,8 @@ impl Web3ProxyApp { let param = Bytes::from_str( params[0] .as_str() - .context("parsing params 0 into str then bytes")?, + .ok_or(Web3ProxyError::ParseBytesError(None)) + .web3_context("parsing params 0 into str then bytes")?, ) .map_err(|x| { trace!("bad request: {:?}", x); @@ -1635,7 +1635,7 @@ impl Web3ProxyApp { // TODO: if no servers synced, wait for them to be synced? probably better to error and let haproxy retry another server let head_block_num = head_block_num .or(self.balanced_rpcs.head_block_num()) - .context("no servers synced")?; + .ok_or(Web3ProxyError::NoServersSynced)?; // we do this check before checking caches because it might modify the request params // TODO: add a stat for archive vs full since they should probably cost different @@ -1793,7 +1793,7 @@ impl Web3ProxyApp { stat_sender .send_async(response_stat.into()) .await - .context("stat_sender sending response_stat")?; + .map_err(Web3ProxyError::SendAppStatError)?; } return Ok((response, rpcs)); @@ -1816,7 +1816,7 @@ impl Web3ProxyApp { stat_sender .send_async(response_stat.into()) .await - .context("stat_sender sending response stat")?; + .map_err(Web3ProxyError::SendAppStatError)?; } if let Some((kafka_topic, kafka_key, kafka_headers)) = kafka_stuff { @@ -1826,7 +1826,7 @@ impl Web3ProxyApp { .expect("if headers are set, producer must exist"); let response_bytes = - rmp_serde::to_vec(&response).context("failed msgpack serialize response")?; + rmp_serde::to_vec(&response).web3_context("failed msgpack serialize response")?; let f = async move { let produce_future = kafka_producer.send( diff --git a/web3_proxy/src/app/ws.rs b/web3_proxy/src/app/ws.rs index 2366ab31..cc36e9d6 100644 --- a/web3_proxy/src/app/ws.rs +++ b/web3_proxy/src/app/ws.rs @@ -2,12 +2,11 @@ use super::Web3ProxyApp; use crate::frontend::authorization::{Authorization, RequestMetadata}; -use crate::frontend::errors::Web3ProxyResult; +use crate::frontend::errors::{Web3ProxyErrorContext, Web3ProxyResult}; use crate::jsonrpc::JsonRpcForwardedResponse; use crate::jsonrpc::JsonRpcRequest; use crate::rpcs::transactions::TxStatus; use crate::stats::RpcQueryStats; -use anyhow::Context; use axum::extract::ws::Message; use ethers::prelude::U64; use futures::future::AbortHandle; @@ -31,7 +30,7 @@ impl Web3ProxyApp { ) -> Web3ProxyResult<(AbortHandle, JsonRpcForwardedResponse)> { // TODO: this is not efficient let request_bytes = serde_json::to_string(&request_json) - .context("finding request size")? + .web3_context("finding request size")? .len(); let request_metadata = Arc::new(RequestMetadata::new(request_bytes)); diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index 03e80ef8..3fac5717 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -778,7 +778,7 @@ impl Web3ProxyApp { }) .await; - authorization_checks.map_err(Web3ProxyError::SeaRc) + authorization_checks.map_err(Web3ProxyError::Arc) } /// Authorized the ip/origin/referer/useragent and rate limit and concurrency diff --git a/web3_proxy/src/frontend/errors.rs b/web3_proxy/src/frontend/errors.rs index 2546ae14..4e43fbf8 100644 --- a/web3_proxy/src/frontend/errors.rs +++ b/web3_proxy/src/frontend/errors.rs @@ -31,6 +31,7 @@ pub enum Web3ProxyError { AccessDenied, #[error(ignore)] Anyhow(anyhow::Error), + Arc(Arc), #[error(ignore)] #[from(ignore)] BadRequest(String), @@ -41,6 +42,7 @@ pub enum Web3ProxyError { EthersHttpClientError(ethers::prelude::HttpClientError), EthersProviderError(ethers::prelude::ProviderError), EthersWsClientError(ethers::prelude::WsClientError), + GasEstimateNotU256, Headers(headers::Error), HeaderToString(ToStrError), InfluxDb2RequestError(influxdb2::RequestError), @@ -70,7 +72,9 @@ pub enum Web3ProxyError { #[error(ignore)] #[from(ignore)] OriginNotAllowed(headers::Origin), - ParseBytesError(ethers::types::ParseBytesError), + #[display(fmt = "{:?}", _0)] + #[error(ignore)] + ParseBytesError(Option), ParseMsgError(siwe::ParseError), ParseAddressError, #[display(fmt = "{:?}, {:?}", _0, _1)] @@ -81,8 +85,8 @@ pub enum Web3ProxyError { #[error(ignore)] #[from(ignore)] RefererNotAllowed(headers::Referer), - SeaRc(Arc), SemaphoreAcquireError(AcquireError), + SendAppStatError(flume::SendError), SerdeJson(serde_json::Error), /// simple way to return an error message to the user and an anyhow to our logs #[display(fmt = "{}, {}, {:?}", _0, _1, _2)] @@ -217,6 +221,17 @@ impl Web3ProxyError { ), ) } + Self::GasEstimateNotU256 => { + warn!("GasEstimateNotU256"); + ( + StatusCode::BAD_REQUEST, + JsonRpcForwardedResponse::from_str( + "gas estimate result is not an U256", + Some(StatusCode::BAD_REQUEST.as_u16().into()), + None, + ), + ) + } Self::Headers(err) => { warn!("HeadersError {:?}", err); ( @@ -548,7 +563,7 @@ impl Web3ProxyError { ), ) } - Self::SeaRc(err) => match migration::SeaRc::try_unwrap(err) { + Self::Arc(err) => match Arc::try_unwrap(err) { Ok(err) => err, Err(err) => Self::Anyhow(anyhow::anyhow!("{}", err)), } @@ -565,6 +580,17 @@ impl Web3ProxyError { ), ) } + Self::SendAppStatError(err) => { + error!("SendAppStatError err={:?}", err); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcForwardedResponse::from_str( + "error stat_sender sending response_stat", + Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), + None, + ), + ) + } Self::SerdeJson(err) => { warn!("serde json err={:?}", err); ( @@ -725,6 +751,12 @@ impl Web3ProxyError { } } +impl From for Web3ProxyError { + fn from(err: ethers::types::ParseBytesError) -> Self { + Self::ParseBytesError(Some(err)) + } +} + impl From for Web3ProxyError { fn from(err: tokio::time::error::Elapsed) -> Self { Self::Timeout(Some(err)) From fe2a760c8a405933c3c61526c3541166fbf33c73 Mon Sep 17 00:00:00 2001 From: Rory Neithinger Date: Mon, 20 Mar 2023 13:45:21 -0700 Subject: [PATCH 8/9] more conversions to Web3ProxyError --- web3_proxy/src/app/ws.rs | 4 +- web3_proxy/src/frontend/errors.rs | 77 +++++++++++++++++++++++++++++-- web3_proxy/src/rpcs/blockchain.rs | 46 ++++++++++-------- web3_proxy/src/rpcs/consensus.rs | 32 ++++++------- 4 files changed, 117 insertions(+), 42 deletions(-) diff --git a/web3_proxy/src/app/ws.rs b/web3_proxy/src/app/ws.rs index cc36e9d6..8f42efe6 100644 --- a/web3_proxy/src/app/ws.rs +++ b/web3_proxy/src/app/ws.rs @@ -2,7 +2,7 @@ use super::Web3ProxyApp; use crate::frontend::authorization::{Authorization, RequestMetadata}; -use crate::frontend::errors::{Web3ProxyErrorContext, Web3ProxyResult}; +use crate::frontend::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult}; use crate::jsonrpc::JsonRpcForwardedResponse; use crate::jsonrpc::JsonRpcRequest; use crate::rpcs::transactions::TxStatus; @@ -341,7 +341,7 @@ impl Web3ProxyApp { ); }); } - _ => return Err(anyhow::anyhow!("unimplemented").into()), + _ => return Err(Web3ProxyError::NotImplemented), } // TODO: do something with subscription_join_handle? diff --git a/web3_proxy/src/frontend/errors.rs b/web3_proxy/src/frontend/errors.rs index 4e43fbf8..a7c01bf4 100644 --- a/web3_proxy/src/frontend/errors.rs +++ b/web3_proxy/src/frontend/errors.rs @@ -42,11 +42,13 @@ pub enum Web3ProxyError { EthersHttpClientError(ethers::prelude::HttpClientError), EthersProviderError(ethers::prelude::ProviderError), EthersWsClientError(ethers::prelude::WsClientError), + FlumeRecvError(flume::RecvError), GasEstimateNotU256, Headers(headers::Error), HeaderToString(ToStrError), InfluxDb2RequestError(influxdb2::RequestError), #[display(fmt = "{} > {}", min, max)] + #[from(ignore)] InvalidBlockBounds { min: u64, max: u64, @@ -64,8 +66,16 @@ pub enum Web3ProxyError { IpNotAllowed(IpAddr), JoinError(JoinError), MsgPackEncode(rmp_serde::encode::Error), - NoServersSynced, + NoBlockNumberOrHash, + NoBlocksKnown, NoHandleReady, + NoServersSynced, + #[display(fmt = "{}/{}", num_known, min_head_rpcs)] + #[from(ignore)] + NotEnoughRpcs { + num_known: usize, + min_head_rpcs: usize, + }, NotFound, NotImplemented, OriginRequired, @@ -104,6 +114,7 @@ pub enum Web3ProxyError { UserIdZero, VerificationError(siwe::VerificationError), WatchRecvError(tokio::sync::watch::error::RecvError), + WatchSendError, WebsocketOnly, #[display(fmt = "{:?}, {}", _0, _1)] #[error(ignore)] @@ -221,6 +232,17 @@ impl Web3ProxyError { ), ) } + Self::FlumeRecvError(err) => { + warn!("FlumeRecvError err={:#?}", err); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcForwardedResponse::from_str( + "flume recv error!", + Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), + None, + ), + ) + } Self::GasEstimateNotU256 => { warn!("GasEstimateNotU256"); ( @@ -398,12 +420,23 @@ impl Web3ProxyError { ), ) } - Self::NoServersSynced => { - warn!("NoServersSynced"); + Self::NoBlockNumberOrHash => { + warn!("NoBlockNumberOrHash"); + ( + StatusCode::BAD_REQUEST, + JsonRpcForwardedResponse::from_str( + "Blocks here must have a number or hash", + Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), + None, + ), + ) + } + Self::NoBlocksKnown => { + error!("NoBlocksKnown"); ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcForwardedResponse::from_str( - "no servers synced", + "no blocks known", Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), None, ), @@ -420,6 +453,31 @@ impl Web3ProxyError { ), ) } + Self::NoServersSynced => { + warn!("NoServersSynced"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcForwardedResponse::from_str( + "no servers synced", + Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), + None, + ), + ) + } + Self::NotEnoughRpcs { + num_known, + min_head_rpcs, + } => { + error!("NotEnoughRpcs {}/{}", num_known, min_head_rpcs); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcForwardedResponse::from_string( + format!("not enough rpcs connected {}/{}", num_known, min_head_rpcs), + Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), + None, + ), + ) + } Self::NotFound => { // TODO: emit a stat? // TODO: instead of an error, show a normal html page for 404 @@ -722,6 +780,17 @@ impl Web3ProxyError { ), ) } + Self::WatchSendError => { + error!("WatchSendError"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcForwardedResponse::from_str( + "watch send error!", + Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), + None, + ), + ) + } Self::WebsocketOnly => { warn!("WebsocketOnly"); ( diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index b6c60f2b..6cbfd9ae 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -4,9 +4,8 @@ use super::many::Web3Rpcs; use super::one::Web3Rpc; use super::transactions::TxStatus; use crate::frontend::authorization::Authorization; -use crate::frontend::errors::Web3ProxyResult; +use crate::frontend::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult}; use crate::{config::BlockAndRpc, jsonrpc::JsonRpcRequest}; -use anyhow::{anyhow, Context}; use derive_more::From; use ethers::prelude::{Block, TxHash, H256, U64}; use log::{debug, trace, warn, Level}; @@ -125,11 +124,11 @@ impl Web3ProxyBlock { } impl TryFrom for Web3ProxyBlock { - type Error = anyhow::Error; + type Error = Web3ProxyError; fn try_from(x: ArcBlock) -> Result { if x.number.is_none() || x.hash.is_none() { - return Err(anyhow!("Blocks here must have a number of hash")); + return Err(Web3ProxyError::NoBlockNumberOrHash); } let b = Web3ProxyBlock { @@ -191,7 +190,7 @@ impl Web3Rpcs { /// Get a block from caches with fallback. /// Will query a specific node or the best available. - /// TODO: return anyhow::Result>? + /// TODO: return Web3ProxyResult>? pub async fn block( &self, authorization: &Arc, @@ -226,7 +225,7 @@ impl Web3Rpcs { x.try_into().ok() } }) - .context("no block!")?, + .web3_context("no block!")?, None => { // TODO: helper for method+params => JsonRpcRequest // TODO: does this id matter? @@ -245,11 +244,11 @@ impl Web3Rpcs { ) .await?; - let block = response.result.context("failed fetching block")?; + let block = response.result.web3_context("failed fetching block")?; let block: Option = serde_json::from_str(block.get())?; - let block: ArcBlock = block.context("no block in the response")?; + let block: ArcBlock = block.web3_context("no block in the response")?; // TODO: received time is going to be weird Web3ProxyBlock::try_from(block)? @@ -290,7 +289,7 @@ impl Web3Rpcs { let mut consensus_head_receiver = self .watch_consensus_head_sender .as_ref() - .context("need new head subscriptions to fetch cannonical_block")? + .web3_context("need new head subscriptions to fetch cannonical_block")? .subscribe(); // be sure the requested block num exists @@ -298,7 +297,7 @@ impl Web3Rpcs { let mut head_block_num = *consensus_head_receiver .borrow_and_update() .as_ref() - .context("no consensus head block")? + .web3_context("no consensus head block")? .number(); loop { @@ -342,7 +341,7 @@ impl Web3Rpcs { debug!("could not find canonical block {}: {:?}", num, err); } - let raw_block = response.result.context("no cannonical block result")?; + let raw_block = response.result.web3_context("no cannonical block result")?; let block: ArcBlock = serde_json::from_str(raw_block.get())?; @@ -400,12 +399,12 @@ impl Web3Rpcs { new_block: Option, rpc: Arc, _pending_tx_sender: &Option>, - ) -> anyhow::Result<()> { + ) -> Web3ProxyResult<()> { // TODO: how should we handle an error here? if !consensus_finder .update_rpc(new_block.clone(), rpc.clone(), self) .await - .context("failed to update rpc")? + .web3_context("failed to update rpc")? { // nothing changed. no need to scan for a new consensus head return Ok(()); @@ -414,7 +413,7 @@ impl Web3Rpcs { let new_consensus = consensus_finder .best_consensus_connections(authorization, self) .await - .context("no consensus head block!") + .web3_context("no consensus head block!") .map_err(|err| { self.watch_consensus_rpcs_sender.send_replace(None); @@ -473,7 +472,8 @@ impl Web3Rpcs { watch_consensus_head_sender .send(Some(consensus_head_block)) - .context( + .or(Err(Web3ProxyError::WatchSendError)) + .web3_context( "watch_consensus_head_sender failed sending first consensus_head_block", )?; } @@ -529,11 +529,12 @@ impl Web3Rpcs { let consensus_head_block = self .try_cache_block(consensus_head_block, true) .await - .context("save consensus_head_block as heaviest chain")?; + .web3_context("save consensus_head_block as heaviest chain")?; watch_consensus_head_sender .send(Some(consensus_head_block)) - .context("watch_consensus_head_sender failed sending uncled consensus_head_block")?; + .or(Err(Web3ProxyError::WatchSendError)) + .web3_context("watch_consensus_head_sender failed sending uncled consensus_head_block")?; } } Ordering::Less => { @@ -563,11 +564,14 @@ impl Web3Rpcs { let consensus_head_block = self .try_cache_block(consensus_head_block, true) .await - .context("save_block sending consensus_head_block as heaviest chain")?; + .web3_context( + "save_block sending consensus_head_block as heaviest chain", + )?; watch_consensus_head_sender .send(Some(consensus_head_block)) - .context("watch_consensus_head_sender failed sending rollback consensus_head_block")?; + .or(Err(Web3ProxyError::WatchSendError)) + .web3_context("watch_consensus_head_sender failed sending rollback consensus_head_block")?; } Ordering::Greater => { debug!( @@ -592,7 +596,9 @@ impl Web3Rpcs { let consensus_head_block = self.try_cache_block(consensus_head_block, true).await?; - watch_consensus_head_sender.send(Some(consensus_head_block)).context("watch_consensus_head_sender failed sending new consensus_head_block")?; + watch_consensus_head_sender.send(Some(consensus_head_block)) + .or(Err(Web3ProxyError::WatchSendError)) + .web3_context("watch_consensus_head_sender failed sending new consensus_head_block")?; } } } diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index 373a1dd8..b9666db8 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -2,7 +2,7 @@ use super::blockchain::Web3ProxyBlock; use super::many::Web3Rpcs; use super::one::Web3Rpc; use crate::frontend::authorization::Authorization; -use anyhow::Context; +use crate::frontend::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult}; use ethers::prelude::{H256, U64}; use hashbrown::{HashMap, HashSet}; use log::{debug, trace, warn}; @@ -169,9 +169,9 @@ impl ConnectionsGroup { web3_rpcs: &Web3Rpcs, min_consensus_block_num: Option, tier: &u64, - ) -> anyhow::Result { + ) -> Web3ProxyResult { let mut maybe_head_block = match self.highest_block.clone() { - None => return Err(anyhow::anyhow!("no blocks known")), + None => return Err(Web3ProxyError::NoBlocksKnown), Some(x) => x, }; @@ -196,11 +196,10 @@ impl ConnectionsGroup { let num_known = self.rpc_to_block.len(); if num_known < web3_rpcs.min_head_rpcs { - return Err(anyhow::anyhow!( - "not enough rpcs connected: {}/{}", + return Err(Web3ProxyError::NotEnoughRpcs { num_known, - web3_rpcs.min_head_rpcs, - )); + min_head_rpcs: web3_rpcs.min_head_rpcs, + }); } let mut primary_rpcs_voted: Option = None; @@ -256,7 +255,7 @@ impl ConnectionsGroup { warn!("connection missing: {}", rpc_name); debug!("web3_rpcs.by_name: {:#?}", web3_rpcs.by_name); } else { - return Err(anyhow::anyhow!("not synced")); + return Err(Web3ProxyError::NoServersSynced); } } } @@ -309,7 +308,7 @@ impl ConnectionsGroup { warn!("{}", err_msg); break; } else { - return Err(anyhow::anyhow!(err_msg)); + return Err(anyhow::anyhow!(err_msg).into()); } } } @@ -334,7 +333,8 @@ impl ConnectionsGroup { primary_sum_soft_limit, web3_rpcs.min_sum_soft_limit, soft_limit_percent, - )); + ) + .into()); } // success! this block has enough soft limit and nodes on it (or on later blocks) @@ -462,7 +462,7 @@ impl ConsensusFinder { rpc: Arc, // we need this so we can save the block to caches. i don't like it though. maybe we should use a lazy_static Cache wrapper that has a "save_block" method?. i generally dislike globals but i also dislike all the types having to pass eachother around web3_connections: &Web3Rpcs, - ) -> anyhow::Result { + ) -> Web3ProxyResult { // add the rpc's block to connection_heads, or remove the rpc from connection_heads let changed = match rpc_head_block { Some(mut rpc_head_block) => { @@ -470,7 +470,7 @@ impl ConsensusFinder { rpc_head_block = web3_connections .try_cache_block(rpc_head_block, false) .await - .context("failed caching block")?; + .web3_context("failed caching block")?; // if let Some(max_block_lag) = max_block_lag { // if rpc_head_block.number() < ??? { @@ -509,14 +509,14 @@ impl ConsensusFinder { &mut self, authorization: &Arc, web3_connections: &Web3Rpcs, - ) -> anyhow::Result { + ) -> Web3ProxyResult { // TODO: attach context to these? let highest_known_block = self .all_rpcs_group() - .context("no rpcs")? + .web3_context("no rpcs")? .highest_block .as_ref() - .context("no highest block")?; + .web3_context("no highest block")?; trace!("highest_known_block: {}", highest_known_block); @@ -545,7 +545,7 @@ impl ConsensusFinder { } } - return Err(anyhow::anyhow!("failed finding consensus on all tiers")); + return Err(anyhow::anyhow!("failed finding consensus on all tiers").into()); } } From e9c1d019fc7b2109dc6c96d16be5417020fc3bd0 Mon Sep 17 00:00:00 2001 From: Rory Neithinger Date: Tue, 21 Mar 2023 22:52:46 -0700 Subject: [PATCH 9/9] fix some incorrect error codes and log levels --- web3_proxy/src/frontend/errors.rs | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/web3_proxy/src/frontend/errors.rs b/web3_proxy/src/frontend/errors.rs index a7c01bf4..c74d2d62 100644 --- a/web3_proxy/src/frontend/errors.rs +++ b/web3_proxy/src/frontend/errors.rs @@ -183,7 +183,7 @@ impl Web3ProxyError { ) } Self::EipVerificationFailed(err_1, err_191) => { - warn!( + info!( "EipVerificationFailed err_1={:#?} err2={:#?}", err_1, err_191 ); @@ -246,10 +246,10 @@ impl Web3ProxyError { Self::GasEstimateNotU256 => { warn!("GasEstimateNotU256"); ( - StatusCode::BAD_REQUEST, + StatusCode::INTERNAL_SERVER_ERROR, JsonRpcForwardedResponse::from_str( "gas estimate result is not an U256", - Some(StatusCode::BAD_REQUEST.as_u16().into()), + Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), None, ), ) @@ -278,7 +278,7 @@ impl Web3ProxyError { ) } Self::InvalidBlockBounds { min, max } => { - warn!("InvalidBlockBounds min={} max={}", min, max); + debug!("InvalidBlockBounds min={} max={}", min, max); ( StatusCode::BAD_REQUEST, JsonRpcForwardedResponse::from_string( @@ -426,7 +426,7 @@ impl Web3ProxyError { StatusCode::BAD_REQUEST, JsonRpcForwardedResponse::from_str( "Blocks here must have a number or hash", - Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), + Some(StatusCode::BAD_REQUEST.as_u16().into()), None, ), ) @@ -445,10 +445,10 @@ impl Web3ProxyError { Self::NoHandleReady => { error!("NoHandleReady"); ( - StatusCode::INTERNAL_SERVER_ERROR, + StatusCode::BAD_GATEWAY, JsonRpcForwardedResponse::from_str( "unable to retry for request handle", - Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), + Some(StatusCode::BAD_GATEWAY.as_u16().into()), None, ), ) @@ -456,10 +456,10 @@ impl Web3ProxyError { Self::NoServersSynced => { warn!("NoServersSynced"); ( - StatusCode::INTERNAL_SERVER_ERROR, + StatusCode::BAD_GATEWAY, JsonRpcForwardedResponse::from_str( "no servers synced", - Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), + Some(StatusCode::BAD_GATEWAY.as_u16().into()), None, ), ) @@ -470,10 +470,10 @@ impl Web3ProxyError { } => { error!("NotEnoughRpcs {}/{}", num_known, min_head_rpcs); ( - StatusCode::INTERNAL_SERVER_ERROR, + StatusCode::BAD_GATEWAY, JsonRpcForwardedResponse::from_string( format!("not enough rpcs connected {}/{}", num_known, min_head_rpcs), - Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), + Some(StatusCode::BAD_GATEWAY.as_u16().into()), None, ), ) @@ -491,7 +491,7 @@ impl Web3ProxyError { ) } Self::NotImplemented => { - warn!("NotImplemented"); + trace!("NotImplemented"); ( StatusCode::NOT_IMPLEMENTED, JsonRpcForwardedResponse::from_str(