diff --git a/web3_proxy/src/admin_queries.rs b/web3_proxy/src/admin_queries.rs index 8538a691..06500213 100644 --- a/web3_proxy/src/admin_queries.rs +++ b/web3_proxy/src/admin_queries.rs @@ -1,8 +1,8 @@ use crate::app::Web3ProxyApp; -use crate::frontend::errors::FrontendErrorResponse; +use crate::frontend::errors::{Web3ProxyError, Web3ProxyResponse}; use crate::http_params::get_user_id_from_params; use anyhow::Context; -use axum::response::{IntoResponse, Response}; +use axum::response::IntoResponse; use axum::{ headers::{authorization::Bearer, Authorization}, Json, TypedHeader, @@ -24,25 +24,21 @@ pub async fn query_admin_modify_usertier<'a>( app: &'a Web3ProxyApp, bearer: Option>>, params: &'a HashMap, -) -> Result { +) -> Web3ProxyResponse { // Quickly return if any of the input tokens are bad let user_address: Vec = params .get("user_address") .ok_or_else(|| { - FrontendErrorResponse::BadRequest( - "Unable to find user_address key in request".to_string(), - ) + Web3ProxyError::BadRequest("Unable to find user_address key in request".to_string()) })? .parse::
() .map_err(|_| { - FrontendErrorResponse::BadRequest( - "Unable to parse user_address as an Address".to_string(), - ) + Web3ProxyError::BadRequest("Unable to parse user_address as an Address".to_string()) })? .to_fixed_bytes() .into(); let user_tier_title = params.get("user_tier_title").ok_or_else(|| { - FrontendErrorResponse::BadRequest( + Web3ProxyError::BadRequest( "Unable to get the user_tier_title key from the request".to_string(), ) })?; @@ -78,7 +74,7 @@ pub async fn query_admin_modify_usertier<'a>( .filter(admin::Column::UserId.eq(caller_id)) .one(&db_conn) .await? - .ok_or(FrontendErrorResponse::AccessDenied)?; + .ok_or(Web3ProxyError::AccessDenied)?; // If we are here, that means an admin was found, and we can safely proceed @@ -87,7 +83,7 @@ pub async fn query_admin_modify_usertier<'a>( .filter(user::Column::Address.eq(user_address)) .one(&db_conn) .await? - .ok_or(FrontendErrorResponse::BadRequest( + .ok_or(Web3ProxyError::BadRequest( "No user with this id found".to_string(), ))?; // Return early if the target user_tier_id is the same as the original user_tier_id @@ -101,7 +97,7 @@ pub async fn query_admin_modify_usertier<'a>( .filter(user_tier::Column::Title.eq(user_tier_title.clone())) .one(&db_conn) .await? - .ok_or(FrontendErrorResponse::BadRequest( + .ok_or(Web3ProxyError::BadRequest( "User Tier name was not found".to_string(), ))?; diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 2c93faac..ed5ac64b 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -4,12 +4,12 @@ mod ws; use crate::block_number::{block_needed, BlockNeeded}; use crate::config::{AppConfig, TopConfig}; use crate::frontend::authorization::{Authorization, RequestMetadata, RpcSecretKey}; -use crate::frontend::errors::FrontendErrorResponse; +use crate::frontend::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult}; use crate::frontend::rpc_proxy_ws::ProxyMode; use crate::jsonrpc::{ JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest, JsonRpcRequestEnum, }; -use crate::rpcs::blockchain::{BlocksByHashCache, Web3ProxyBlock}; +use crate::rpcs::blockchain::Web3ProxyBlock; use crate::rpcs::consensus::ConsensusWeb3Rpcs; use crate::rpcs::many::Web3Rpcs; use crate::rpcs::one::Web3Rpc; @@ -18,6 +18,7 @@ use crate::stats::{AppStat, RpcQueryStats, StatBuffer}; use crate::user_token::UserBearerToken; use anyhow::Context; use axum::headers::{Origin, Referer, UserAgent}; +use axum::http::StatusCode; use chrono::Utc; use deferred_rate_limiter::DeferredRateLimiter; use derive_more::From; @@ -1051,7 +1052,7 @@ impl Web3ProxyApp { self: &Arc, authorization: Arc, request: JsonRpcRequestEnum, - ) -> Result<(JsonRpcForwardedResponseEnum, Vec>), FrontendErrorResponse> { + ) -> Web3ProxyResult<(JsonRpcForwardedResponseEnum, Vec>)> { // trace!(?request, "proxy_web3_rpc"); // even though we have timeouts on the requests to our backend providers, @@ -1089,7 +1090,7 @@ impl Web3ProxyApp { self: &Arc, authorization: &Arc, requests: Vec, - ) -> Result<(Vec, Vec>), FrontendErrorResponse> { + ) -> Web3ProxyResult<(Vec, Vec>)> { // TODO: we should probably change ethers-rs to support this directly. they pushed this off to v2 though let num_requests = requests.len(); @@ -1097,12 +1098,11 @@ impl Web3ProxyApp { // TODO: improve flattening // get the head block now so that any requests that need it all use the same block - // TODO: FrontendErrorResponse that handles "no servers synced" in a consistent way // TODO: this still has an edge condition if there is a reorg in the middle of the request!!! let head_block_num = self .balanced_rpcs .head_block_num() - .context(anyhow::anyhow!("no servers synced"))?; + .ok_or(Web3ProxyError::NoServersSynced)?; let responses = join_all( requests @@ -1163,10 +1163,10 @@ impl Web3ProxyApp { authorization: &Arc, mut request: JsonRpcRequest, head_block_num: Option, - ) -> Result<(JsonRpcForwardedResponse, Vec>), FrontendErrorResponse> { + ) -> Web3ProxyResult<(JsonRpcForwardedResponse, Vec>)> { // trace!("Received request: {:?}", request); - let request_metadata = Arc::new(RequestMetadata::new(request.num_bytes())?); + let request_metadata = Arc::new(RequestMetadata::new(request.num_bytes())); let mut kafka_stuff = None; @@ -1347,10 +1347,7 @@ impl Web3ProxyApp { } None => { // TODO: what does geth do if this happens? - // TODO: i think we want a 502 so that haproxy retries on another server - return Err( - anyhow::anyhow!("no servers synced. unknown eth_blockNumber").into(), - ); + return Err(Web3ProxyError::UnknownBlockNumber); } } } @@ -1377,7 +1374,7 @@ impl Web3ProxyApp { let mut gas_estimate: U256 = if let Some(gas_estimate) = response.result.take() { serde_json::from_str(gas_estimate.get()) - .context("gas estimate result is not an U256")? + .or(Err(Web3ProxyError::GasEstimateNotU256))? } else { // i think this is always an error response let rpcs = request_metadata.backend_requests.lock().clone(); @@ -1438,7 +1435,7 @@ impl Web3ProxyApp { let head_block_num = head_block_num .or(self.balanced_rpcs.head_block_num()) - .ok_or_else(|| anyhow::anyhow!("no servers synced"))?; + .ok_or_else(|| Web3ProxyError::NoServersSynced)?; // TODO: error/wait if no head block! @@ -1467,15 +1464,15 @@ impl Web3ProxyApp { { let params = request .params - .context("there must be params if we got this far")?; + .web3_context("there must be params if we got this far")?; let params = params .as_array() - .context("there must be an array if we got this far")? + .web3_context("there must be an array if we got this far")? .get(0) - .context("there must be an item if we got this far")? + .web3_context("there must be an item if we got this far")? .as_str() - .context("there must be a string if we got this far")?; + .web3_context("there must be a string if we got this far")?; let params = Bytes::from_str(params) .expect("there must be Bytes if we got this far"); @@ -1599,11 +1596,12 @@ impl Web3ProxyApp { let param = Bytes::from_str( params[0] .as_str() - .context("parsing params 0 into str then bytes")?, + .ok_or(Web3ProxyError::ParseBytesError(None)) + .web3_context("parsing params 0 into str then bytes")?, ) .map_err(|x| { trace!("bad request: {:?}", x); - FrontendErrorResponse::BadRequest( + Web3ProxyError::BadRequest( "param 0 could not be read as H256".to_string(), ) })?; @@ -1618,7 +1616,7 @@ impl Web3ProxyApp { return Ok(( JsonRpcForwardedResponse::from_str( "invalid request", - None, + Some(StatusCode::BAD_REQUEST.as_u16().into()), Some(request_id), ), vec![], @@ -1640,7 +1638,7 @@ impl Web3ProxyApp { method => { if method.starts_with("admin_") { // TODO: emit a stat? will probably just be noise - return Err(FrontendErrorResponse::AccessDenied); + return Err(Web3ProxyError::AccessDenied); } // emit stats @@ -1648,7 +1646,7 @@ impl Web3ProxyApp { // TODO: if no servers synced, wait for them to be synced? probably better to error and let haproxy retry another server let head_block_num = head_block_num .or(self.balanced_rpcs.head_block_num()) - .context("no servers synced")?; + .ok_or(Web3ProxyError::NoServersSynced)?; // we do this check before checking caches because it might modify the request params // TODO: add a stat for archive vs full since they should probably cost different @@ -1771,17 +1769,10 @@ impl Web3ProxyApp { // TODO: only cache the inner response // TODO: how are we going to stream this? // TODO: check response size. if its very large, return it in a custom Error type that bypasses caching? or will moka do that for us? - Ok::<_, anyhow::Error>(response) + Ok::<_, Web3ProxyError>(response) }) - .await - // TODO: what is the best way to handle an Arc here? - .map_err(|err| { - // TODO: emit a stat for an error - anyhow::anyhow!( - "error while caching and forwarding response: {}", - err - ) - })? + // TODO: add context (error while caching and forwarding response {}) + .await? } else { self.balanced_rpcs .try_proxy_connection( @@ -1813,7 +1804,7 @@ impl Web3ProxyApp { stat_sender .send_async(response_stat.into()) .await - .context("stat_sender sending response_stat")?; + .map_err(Web3ProxyError::SendAppStatError)?; } return Ok((response, rpcs)); @@ -1836,7 +1827,7 @@ impl Web3ProxyApp { stat_sender .send_async(response_stat.into()) .await - .context("stat_sender sending response stat")?; + .map_err(Web3ProxyError::SendAppStatError)?; } if let Some((kafka_topic, kafka_key, kafka_headers)) = kafka_stuff { @@ -1846,7 +1837,7 @@ impl Web3ProxyApp { .expect("if headers are set, producer must exist"); let response_bytes = - rmp_serde::to_vec(&response).context("failed msgpack serialize response")?; + rmp_serde::to_vec(&response).web3_context("failed msgpack serialize response")?; let f = async move { let produce_future = kafka_producer.send( diff --git a/web3_proxy/src/app/ws.rs b/web3_proxy/src/app/ws.rs index 037e560b..4b57c8a8 100644 --- a/web3_proxy/src/app/ws.rs +++ b/web3_proxy/src/app/ws.rs @@ -2,11 +2,11 @@ use super::Web3ProxyApp; use crate::frontend::authorization::{Authorization, RequestMetadata}; +use crate::frontend::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult}; use crate::jsonrpc::JsonRpcForwardedResponse; use crate::jsonrpc::JsonRpcRequest; use crate::rpcs::transactions::TxStatus; use crate::stats::RpcQueryStats; -use anyhow::Context; use axum::extract::ws::Message; use ethers::prelude::U64; use futures::future::AbortHandle; @@ -27,13 +27,13 @@ impl Web3ProxyApp { subscription_count: &'a AtomicUsize, // TODO: taking a sender for Message instead of the exact json we are planning to send feels wrong, but its easier for now response_sender: flume::Sender, - ) -> anyhow::Result<(AbortHandle, JsonRpcForwardedResponse)> { + ) -> Web3ProxyResult<(AbortHandle, JsonRpcForwardedResponse)> { // TODO: this is not efficient let request_bytes = serde_json::to_string(&request_json) - .context("finding request size")? + .web3_context("finding request size")? .len(); - let request_metadata = Arc::new(RequestMetadata::new(request_bytes).unwrap()); + let request_metadata = Arc::new(RequestMetadata::new(request_bytes)); let (subscription_abort_handle, subscription_registration) = AbortHandle::new_pair(); @@ -67,7 +67,7 @@ impl Web3ProxyApp { }; // TODO: what should the payload for RequestMetadata be? - let request_metadata = Arc::new(RequestMetadata::new(0).unwrap()); + let request_metadata = Arc::new(RequestMetadata::new(0)); // TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id let response_json = json!({ @@ -133,7 +133,7 @@ impl Web3ProxyApp { // TODO: do something with this handle? tokio::spawn(async move { while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await { - let request_metadata = Arc::new(RequestMetadata::new(0).unwrap()); + let request_metadata = Arc::new(RequestMetadata::new(0)); let new_tx = match new_tx_state { TxStatus::Pending(tx) => tx, @@ -208,7 +208,7 @@ impl Web3ProxyApp { // TODO: do something with this handle? tokio::spawn(async move { while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await { - let request_metadata = Arc::new(RequestMetadata::new(0).unwrap()); + let request_metadata = Arc::new(RequestMetadata::new(0)); let new_tx = match new_tx_state { TxStatus::Pending(tx) => tx, @@ -284,7 +284,7 @@ impl Web3ProxyApp { // TODO: do something with this handle? tokio::spawn(async move { while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await { - let request_metadata = Arc::new(RequestMetadata::new(0).unwrap()); + let request_metadata = Arc::new(RequestMetadata::new(0)); let new_tx = match new_tx_state { TxStatus::Pending(tx) => tx, @@ -341,7 +341,7 @@ impl Web3ProxyApp { ); }); } - _ => return Err(anyhow::anyhow!("unimplemented")), + _ => return Err(Web3ProxyError::NotImplemented), } // TODO: do something with subscription_join_handle? diff --git a/web3_proxy/src/block_number.rs b/web3_proxy/src/block_number.rs index 82741d49..a32392e1 100644 --- a/web3_proxy/src/block_number.rs +++ b/web3_proxy/src/block_number.rs @@ -1,4 +1,5 @@ //! Helper functions for turning ether's BlockNumber into numbers and updating incoming queries to match. +use crate::frontend::errors::{Web3ProxyError, Web3ProxyResult}; use anyhow::Context; use ethers::{ prelude::{BlockNumber, U64}, @@ -126,7 +127,7 @@ pub async fn block_needed( params: Option<&mut serde_json::Value>, head_block_num: U64, rpcs: &Web3Rpcs, -) -> anyhow::Result { +) -> Web3ProxyResult { // some requests have potentially very large responses // TODO: only skip caching if the response actually is large if method.starts_with("trace_") || method == "debug_traceTransaction" { @@ -181,7 +182,7 @@ pub async fn block_needed( .get_mut(0) .ok_or_else(|| anyhow::anyhow!("invalid format. no params"))? .as_object_mut() - .ok_or_else(|| anyhow::anyhow!("invalid format"))?; + .ok_or_else(|| Web3ProxyError::BadRequest("invalid format".to_string()))?; if obj.contains_key("blockHash") { return Ok(BlockNeeded::CacheSuccessForever); diff --git a/web3_proxy/src/frontend/admin.rs b/web3_proxy/src/frontend/admin.rs index 7af212f9..72a49b67 100644 --- a/web3_proxy/src/frontend/admin.rs +++ b/web3_proxy/src/frontend/admin.rs @@ -1,13 +1,12 @@ //! Handle admin helper logic use super::authorization::login_is_authorized; -use super::errors::FrontendResult; +use super::errors::Web3ProxyResponse; use crate::admin_queries::query_admin_modify_usertier; use crate::app::Web3ProxyApp; -use crate::frontend::errors::FrontendErrorResponse; +use crate::frontend::errors::{Web3ProxyError, Web3ProxyErrorContext}; use crate::user_token::UserBearerToken; use crate::PostLogin; -use anyhow::Context; use axum::{ extract::{Path, Query}, headers::{authorization::Bearer, Authorization}, @@ -43,7 +42,7 @@ pub async fn admin_change_user_roles( Extension(app): Extension>, bearer: Option>>, Query(params): Query>, -) -> FrontendResult { +) -> Web3ProxyResponse { let response = query_admin_modify_usertier(&app, bearer, ¶ms).await?; Ok(response) @@ -58,7 +57,7 @@ pub async fn admin_login_get( Extension(app): Extension>, InsecureClientIp(ip): InsecureClientIp, Path(mut params): Path>, -) -> FrontendResult { +) -> Web3ProxyResponse { // First check if the login is authorized login_is_authorized(&app, ip).await?; @@ -85,30 +84,22 @@ pub async fn admin_login_get( let admin_address: Address = params .get("admin_address") .ok_or_else(|| { - FrontendErrorResponse::BadRequest( - "Unable to find admin_address key in request".to_string(), - ) + Web3ProxyError::BadRequest("Unable to find admin_address key in request".to_string()) })? .parse::
() .map_err(|_err| { - FrontendErrorResponse::BadRequest( - "Unable to parse admin_address as an Address".to_string(), - ) + Web3ProxyError::BadRequest("Unable to parse admin_address as an Address".to_string()) })?; // Fetch the user_address parameter from the login string ... (as who we want to be logging in ...) let user_address: Vec = params .get("user_address") .ok_or_else(|| { - FrontendErrorResponse::BadRequest( - "Unable to find user_address key in request".to_string(), - ) + Web3ProxyError::BadRequest("Unable to find user_address key in request".to_string()) })? .parse::
() .map_err(|_err| { - FrontendErrorResponse::BadRequest( - "Unable to parse user_address as an Address".to_string(), - ) + Web3ProxyError::BadRequest("Unable to parse user_address as an Address".to_string()) })? .to_fixed_bytes() .into(); @@ -147,10 +138,10 @@ pub async fn admin_login_get( let admin_address: Vec = admin_address.to_fixed_bytes().into(); - let db_conn = app.db_conn().context("login requires a database")?; + let db_conn = app.db_conn().web3_context("login requires a database")?; let db_replica = app .db_replica() - .context("login requires a replica database")?; + .web3_context("login requires a replica database")?; // Get the user that we want to imitate from the read-only database (their id ...) // TODO: Only get the id, not the whole user object ... @@ -158,7 +149,7 @@ pub async fn admin_login_get( .filter(user::Column::Address.eq(user_address)) .one(db_replica.conn()) .await? - .ok_or(FrontendErrorResponse::BadRequest( + .ok_or(Web3ProxyError::BadRequest( "Could not find user in db".to_string(), ))?; @@ -169,7 +160,7 @@ pub async fn admin_login_get( .filter(user::Column::Address.eq(admin_address)) .one(db_replica.conn()) .await? - .ok_or(FrontendErrorResponse::BadRequest( + .ok_or(Web3ProxyError::BadRequest( "Could not find admin in db".to_string(), ))?; @@ -184,7 +175,7 @@ pub async fn admin_login_get( trail .save(&db_conn) .await - .context("saving user's pending_login")?; + .web3_context("saving user's pending_login")?; // Can there be two login-sessions at the same time? // I supposed if the user logs in, the admin would be logged out and vice versa @@ -209,7 +200,7 @@ pub async fn admin_login_get( user_pending_login .save(&db_conn) .await - .context("saving an admin trail pre login")?; + .web3_context("saving an admin trail pre login")?; // there are multiple ways to sign messages and not all wallets support them // TODO: default message eip from config? @@ -223,7 +214,7 @@ pub async fn admin_login_get( "eip4361" => message.to_string(), _ => { // TODO: custom error that is handled a 401 - return Err(anyhow::anyhow!("invalid message eip given").into()); + return Err(Web3ProxyError::InvalidEip); } }; @@ -238,14 +229,14 @@ pub async fn admin_login_post( Extension(app): Extension>, InsecureClientIp(ip): InsecureClientIp, Json(payload): Json, -) -> FrontendResult { +) -> Web3ProxyResponse { login_is_authorized(&app, ip).await?; // Check for the signed bytes .. // TODO: this seems too verbose. how can we simply convert a String into a [u8; 65] - let their_sig_bytes = Bytes::from_str(&payload.sig).context("parsing sig")?; + let their_sig_bytes = Bytes::from_str(&payload.sig).web3_context("parsing sig")?; if their_sig_bytes.len() != 65 { - return Err(anyhow::anyhow!("checking signature length").into()); + return Err(Web3ProxyError::InvalidSignatureLength); } let mut their_sig: [u8; 65] = [0; 65]; for x in 0..65 { @@ -255,17 +246,18 @@ pub async fn admin_login_post( // we can't trust that they didn't tamper with the message in some way. like some clients return it hex encoded // TODO: checking 0x seems fragile, but I think it will be fine. siwe message text shouldn't ever start with 0x let their_msg: Message = if payload.msg.starts_with("0x") { - let their_msg_bytes = Bytes::from_str(&payload.msg).context("parsing payload message")?; + let their_msg_bytes = + Bytes::from_str(&payload.msg).web3_context("parsing payload message")?; // TODO: lossy or no? String::from_utf8_lossy(their_msg_bytes.as_ref()) .parse::() - .context("parsing hex string message")? + .web3_context("parsing hex string message")? } else { payload .msg .parse::() - .context("parsing string message")? + .web3_context("parsing string message")? }; // the only part of the message we will trust is their nonce @@ -273,7 +265,9 @@ pub async fn admin_login_post( let login_nonce = UserBearerToken::from_str(&their_msg.nonce)?; // fetch the message we gave them from our database - let db_replica = app.db_replica().context("Getting database connection")?; + let db_replica = app + .db_replica() + .web3_context("Getting database connection")?; // massage type for the db let login_nonce_uuid: Uuid = login_nonce.clone().into(); @@ -283,30 +277,30 @@ pub async fn admin_login_post( .filter(pending_login::Column::Nonce.eq(login_nonce_uuid)) .one(db_replica.conn()) .await - .context("database error while finding pending_login")? - .context("login nonce not found")?; + .web3_context("database error while finding pending_login")? + .web3_context("login nonce not found")?; let our_msg: siwe::Message = user_pending_login .message .parse() - .context("parsing siwe message")?; + .web3_context("parsing siwe message")?; // default options are fine. the message includes timestamp and domain and nonce let verify_config = VerificationOpts::default(); let db_conn = app .db_conn() - .context("deleting expired pending logins requires a db")?; + .web3_context("deleting expired pending logins requires a db")?; if let Err(err_1) = our_msg .verify(&their_sig, &verify_config) .await - .context("verifying signature against our local message") + .web3_context("verifying signature against our local message") { // verification method 1 failed. try eip191 if let Err(err_191) = our_msg .verify_eip191(&their_sig) - .context("verifying eip191 signature against our local message") + .web3_context("verifying eip191 signature against our local message") { // delete ALL expired rows. let now = Utc::now(); @@ -318,18 +312,16 @@ pub async fn admin_login_post( // TODO: emit a stat? if this is high something weird might be happening debug!("cleared expired pending_logins: {:?}", delete_result); - return Err(anyhow::anyhow!( - "both the primary and eip191 verification failed: {:#?}; {:#?}", - err_1, - err_191 - ) - .into()); + return Err(Web3ProxyError::EipVerificationFailed( + Box::new(err_1), + Box::new(err_191), + )); } } let imitating_user_id = user_pending_login .imitating_user - .context("getting address of the imitating user")?; + .web3_context("getting address of the imitating user")?; // TODO: limit columns or load whole user? // TODO: Right now this loads the whole admin. I assume we might want to load the user though (?) figure this out as we go along... @@ -337,13 +329,13 @@ pub async fn admin_login_post( .filter(user::Column::Address.eq(our_msg.address.as_ref())) .one(db_replica.conn()) .await? - .context("getting admin address")?; + .web3_context("getting admin address")?; let imitating_user = user::Entity::find() .filter(user::Column::Id.eq(imitating_user_id)) .one(db_replica.conn()) .await? - .context("admin address was not found!")?; + .web3_context("admin address was not found!")?; // Add a message that the admin has logged in // Note that the admin is trying to log in as this user @@ -357,7 +349,7 @@ pub async fn admin_login_post( trail .save(&db_conn) .await - .context("saving an admin trail post login")?; + .web3_context("saving an admin trail post login")?; // I supposed we also get the rpc_key, whatever this is used for (?). // I think the RPC key should still belong to the admin though in this case ... @@ -367,7 +359,7 @@ pub async fn admin_login_post( .filter(rpc_key::Column::UserId.eq(admin.id)) .all(db_replica.conn()) .await - .context("failed loading user's key")?; + .web3_context("failed loading user's key")?; // create a bearer token for the user. let user_bearer_token = UserBearerToken::default(); @@ -408,7 +400,7 @@ pub async fn admin_login_post( user_login .save(&db_conn) .await - .context("saving user login")?; + .web3_context("saving user login")?; if let Err(err) = user_pending_login .into_active_model() @@ -427,10 +419,12 @@ pub async fn admin_login_post( pub async fn admin_logout_post( Extension(app): Extension>, TypedHeader(Authorization(bearer)): TypedHeader>, -) -> FrontendResult { +) -> Web3ProxyResponse { let user_bearer = UserBearerToken::try_from(bearer)?; - let db_conn = app.db_conn().context("database needed for user logout")?; + let db_conn = app + .db_conn() + .web3_context("database needed for user logout")?; if let Err(err) = login::Entity::delete_many() .filter(login::Column::BearerToken.eq(user_bearer.uuid())) diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index 39577c62..39e0c9c1 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -1,11 +1,10 @@ //! Utilities for authorization of logged in and anonymous users. -use super::errors::FrontendErrorResponse; +use super::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult}; use super::rpc_proxy_ws::ProxyMode; use crate::app::{AuthorizationChecks, Web3ProxyApp, APP_USER_AGENT}; use crate::rpcs::one::Web3Rpc; use crate::user_token::UserBearerToken; -use anyhow::Context; use axum::headers::authorization::Bearer; use axum::headers::{Header, Origin, Referer, UserAgent}; use chrono::Utc; @@ -88,11 +87,11 @@ pub struct RequestMetadata { } impl RequestMetadata { - pub fn new(request_bytes: usize) -> anyhow::Result { + pub fn new(request_bytes: usize) -> Self { // TODO: how can we do this without turning it into a string first. this is going to slow us down! let request_bytes = request_bytes as u64; - let new = Self { + Self { start_instant: Instant::now(), request_bytes, archive_request: false.into(), @@ -102,9 +101,7 @@ impl RequestMetadata { response_bytes: 0.into(), response_millis: 0.into(), response_from_backup_rpc: false.into(), - }; - - Ok(new) + } } } @@ -130,7 +127,7 @@ impl Display for RpcSecretKey { } impl FromStr for RpcSecretKey { - type Err = anyhow::Error; + type Err = Web3ProxyError; fn from_str(s: &str) -> Result { if let Ok(ulid) = s.parse::() { @@ -139,7 +136,7 @@ impl FromStr for RpcSecretKey { Ok(uuid.into()) } else { // TODO: custom error type so that this shows as a 400 - Err(anyhow::anyhow!("UserKey was not a ULID or UUID")) + Err(Web3ProxyError::InvalidUserKey) } } } @@ -175,7 +172,7 @@ impl From for Uuid { } impl Authorization { - pub fn internal(db_conn: Option) -> anyhow::Result { + pub fn internal(db_conn: Option) -> Web3ProxyResult { let authorization_checks = AuthorizationChecks { // any error logs on a local (internal) query are likely problems. log them all log_revert_chance: 1.0, @@ -206,7 +203,7 @@ impl Authorization { proxy_mode: ProxyMode, referer: Option, user_agent: Option, - ) -> anyhow::Result { + ) -> Web3ProxyResult { // some origins can override max_requests_per_period for anon users let max_requests_per_period = origin .as_ref() @@ -244,13 +241,13 @@ impl Authorization { referer: Option, user_agent: Option, authorization_type: AuthorizationType, - ) -> anyhow::Result { + ) -> Web3ProxyResult { // check ip match &authorization_checks.allowed_ips { None => {} Some(allowed_ips) => { if !allowed_ips.iter().any(|x| x.contains(&ip)) { - return Err(anyhow::anyhow!("IP ({}) is not allowed!", ip)); + return Err(Web3ProxyError::IpNotAllowed(ip)); } } } @@ -259,10 +256,10 @@ impl Authorization { match (&origin, &authorization_checks.allowed_origins) { (None, None) => {} (Some(_), None) => {} - (None, Some(_)) => return Err(anyhow::anyhow!("Origin required")), + (None, Some(_)) => return Err(Web3ProxyError::OriginRequired), (Some(origin), Some(allowed_origins)) => { if !allowed_origins.contains(origin) { - return Err(anyhow::anyhow!("Origin ({}) is not allowed!", origin)); + return Err(Web3ProxyError::OriginNotAllowed(origin.clone())); } } } @@ -271,10 +268,10 @@ impl Authorization { match (&referer, &authorization_checks.allowed_referers) { (None, None) => {} (Some(_), None) => {} - (None, Some(_)) => return Err(anyhow::anyhow!("Referer required")), + (None, Some(_)) => return Err(Web3ProxyError::RefererRequired), (Some(referer), Some(allowed_referers)) => { if !allowed_referers.contains(referer) { - return Err(anyhow::anyhow!("Referer ({:?}) is not allowed!", referer)); + return Err(Web3ProxyError::RefererNotAllowed(referer.clone())); } } } @@ -283,13 +280,10 @@ impl Authorization { match (&user_agent, &authorization_checks.allowed_user_agents) { (None, None) => {} (Some(_), None) => {} - (None, Some(_)) => return Err(anyhow::anyhow!("User agent required")), + (None, Some(_)) => return Err(Web3ProxyError::UserAgentRequired), (Some(user_agent), Some(allowed_user_agents)) => { if !allowed_user_agents.contains(user_agent) { - return Err(anyhow::anyhow!( - "User agent ({}) is not allowed!", - user_agent - )); + return Err(Web3ProxyError::UserAgentNotAllowed(user_agent.clone())); } } } @@ -308,14 +302,11 @@ impl Authorization { /// rate limit logins only by ip. /// we want all origins and referers and user agents to count together -pub async fn login_is_authorized( - app: &Web3ProxyApp, - ip: IpAddr, -) -> Result { +pub async fn login_is_authorized(app: &Web3ProxyApp, ip: IpAddr) -> Web3ProxyResult { let authorization = match app.rate_limit_login(ip, ProxyMode::Best).await? { RateLimitResult::Allowed(authorization, None) => authorization, RateLimitResult::RateLimited(authorization, retry_at) => { - return Err(FrontendErrorResponse::RateLimited(authorization, retry_at)); + return Err(Web3ProxyError::RateLimited(authorization, retry_at)); } // TODO: don't panic. give the user an error x => unimplemented!("rate_limit_login shouldn't ever see these: {:?}", x), @@ -330,7 +321,7 @@ pub async fn ip_is_authorized( ip: IpAddr, origin: Option, proxy_mode: ProxyMode, -) -> Result<(Authorization, Option), FrontendErrorResponse> { +) -> Web3ProxyResult<(Authorization, Option)> { // TODO: i think we could write an `impl From` for this // TODO: move this to an AuthorizedUser extrator let (authorization, semaphore) = match app @@ -345,7 +336,7 @@ pub async fn ip_is_authorized( RateLimitResult::Allowed(authorization, semaphore) => (authorization, semaphore), RateLimitResult::RateLimited(authorization, retry_at) => { // TODO: in the background, emit a stat (maybe simplest to use a channel?) - return Err(FrontendErrorResponse::RateLimited(authorization, retry_at)); + return Err(Web3ProxyError::RateLimited(authorization, retry_at)); } // TODO: don't panic. give the user an error x => unimplemented!("rate_limit_by_ip shouldn't ever see these: {:?}", x), @@ -375,7 +366,7 @@ pub async fn ip_is_authorized( .await?; }; - Ok::<_, anyhow::Error>(()) + Ok::<_, Web3ProxyError>(()) } .map_err(|err| { warn!("background update of recent_users:ip failed: {}", err); @@ -389,7 +380,7 @@ pub async fn ip_is_authorized( Ok((authorization, semaphore)) } -/// like app.rate_limit_by_rpc_key but converts to a FrontendErrorResponse; +/// like app.rate_limit_by_rpc_key but converts to a Web3ProxyError; pub async fn key_is_authorized( app: &Arc, rpc_key: RpcSecretKey, @@ -398,7 +389,7 @@ pub async fn key_is_authorized( proxy_mode: ProxyMode, referer: Option, user_agent: Option, -) -> Result<(Authorization, Option), FrontendErrorResponse> { +) -> Web3ProxyResult<(Authorization, Option)> { // check the rate limits. error if over the limit // TODO: i think this should be in an "impl From" or "impl Into" let (authorization, semaphore) = match app @@ -407,9 +398,9 @@ pub async fn key_is_authorized( { RateLimitResult::Allowed(authorization, semaphore) => (authorization, semaphore), RateLimitResult::RateLimited(authorization, retry_at) => { - return Err(FrontendErrorResponse::RateLimited(authorization, retry_at)); + return Err(Web3ProxyError::RateLimited(authorization, retry_at)); } - RateLimitResult::UnknownKey => return Err(FrontendErrorResponse::UnknownKey), + RateLimitResult::UnknownKey => return Err(Web3ProxyError::UnknownKey), }; // TODO: DRY and maybe optimize the hashing @@ -438,7 +429,7 @@ pub async fn key_is_authorized( .await?; } - Ok::<_, anyhow::Error>(()) + Ok::<_, Web3ProxyError>(()) } .map_err(|err| { warn!("background update of recent_users:id failed: {}", err); @@ -454,7 +445,7 @@ pub async fn key_is_authorized( impl Web3ProxyApp { /// Limit the number of concurrent requests from the given ip address. - pub async fn ip_semaphore(&self, ip: IpAddr) -> anyhow::Result> { + pub async fn ip_semaphore(&self, ip: IpAddr) -> Web3ProxyResult> { if let Some(max_concurrent_requests) = self.config.public_max_concurrent_requests { let semaphore = self .ip_semaphores @@ -482,12 +473,13 @@ impl Web3ProxyApp { pub async fn registered_user_semaphore( &self, authorization_checks: &AuthorizationChecks, - ) -> anyhow::Result> { + ) -> Web3ProxyResult> { if let Some(max_concurrent_requests) = authorization_checks.max_concurrent_requests { let user_id = authorization_checks .user_id .try_into() - .context("user ids should always be non-zero")?; + .or(Err(Web3ProxyError::UserIdZero)) + .web3_context("user ids should always be non-zero")?; let semaphore = self .registered_user_semaphores @@ -517,7 +509,7 @@ impl Web3ProxyApp { pub async fn bearer_is_authorized( &self, bearer: Bearer, - ) -> Result<(user::Model, OwnedSemaphorePermit), FrontendErrorResponse> { + ) -> Web3ProxyResult<(user::Model, OwnedSemaphorePermit)> { // get the user id for this bearer token let user_bearer_token = UserBearerToken::try_from(bearer)?; @@ -535,7 +527,7 @@ impl Web3ProxyApp { // get the attached address from the database for the given auth_token. let db_replica = self .db_replica() - .context("checking if bearer token is authorized")?; + .web3_context("checking if bearer token is authorized")?; let user_bearer_uuid: Uuid = user_bearer_token.into(); @@ -544,8 +536,8 @@ impl Web3ProxyApp { .filter(login::Column::BearerToken.eq(user_bearer_uuid)) .one(db_replica.conn()) .await - .context("fetching user from db by bearer token")? - .context("unknown bearer token")?; + .web3_context("fetching user from db by bearer token")? + .web3_context("unknown bearer token")?; Ok((user, semaphore_permit)) } @@ -554,7 +546,7 @@ impl Web3ProxyApp { &self, ip: IpAddr, proxy_mode: ProxyMode, - ) -> anyhow::Result { + ) -> Web3ProxyResult { // TODO: dry this up with rate_limit_by_rpc_key? // we don't care about user agent or origin or referer @@ -611,7 +603,7 @@ impl Web3ProxyApp { ip: IpAddr, origin: Option, proxy_mode: ProxyMode, - ) -> anyhow::Result { + ) -> Web3ProxyResult { // ip rate limits don't check referer or user agent // they do check origin because we can override rate limits for some origins let authorization = Authorization::external( @@ -670,13 +662,15 @@ impl Web3ProxyApp { &self, proxy_mode: ProxyMode, rpc_secret_key: RpcSecretKey, - ) -> anyhow::Result { - let authorization_checks: Result<_, Arc> = self + ) -> Web3ProxyResult { + let authorization_checks: Result<_, Arc> = self .rpc_secret_key_cache .try_get_with(rpc_secret_key.into(), async move { // trace!(?rpc_secret_key, "user cache miss"); - let db_replica = self.db_replica().context("Getting database connection")?; + let db_replica = self + .db_replica() + .web3_context("Getting database connection")?; // TODO: join the user table to this to return the User? we don't always need it // TODO: join on secondary users @@ -732,7 +726,11 @@ impl Web3ProxyApp { if let Some(allowed_referers) = rpc_key_model.allowed_referers { let x = allowed_referers .split(',') - .map(|x| x.trim().parse::()) + .map(|x| { + x.trim() + .parse::() + .or(Err(Web3ProxyError::InvalidReferer)) + }) .collect::, _>>()?; Some(x) @@ -744,7 +742,11 @@ impl Web3ProxyApp { if let Some(allowed_user_agents) = rpc_key_model.allowed_user_agents { let x: Result, _> = allowed_user_agents .split(',') - .map(|x| x.trim().parse::()) + .map(|x| { + x.trim() + .parse::() + .or(Err(Web3ProxyError::InvalidUserAgent)) + }) .collect(); Some(x?) @@ -776,8 +778,7 @@ impl Web3ProxyApp { }) .await; - // TODO: what's the best way to handle this arc? try_unwrap will not work - authorization_checks.map_err(|err| anyhow::anyhow!(err)) + authorization_checks.map_err(Web3ProxyError::Arc) } /// Authorized the ip/origin/referer/useragent and rate limit and concurrency @@ -789,7 +790,7 @@ impl Web3ProxyApp { referer: Option, rpc_key: RpcSecretKey, user_agent: Option, - ) -> anyhow::Result { + ) -> Web3ProxyResult { let authorization_checks = self.authorization_checks(proxy_mode, rpc_key).await?; // if no rpc_key_id matching the given rpc was found, then we can't rate limit by key @@ -869,7 +870,7 @@ impl Authorization { pub async fn check_again( &self, app: &Arc, - ) -> Result<(Arc, Option), FrontendErrorResponse> { + ) -> Web3ProxyResult<(Arc, Option)> { // TODO: we could probably do this without clones. but this is easy let (a, s) = if let Some(rpc_secret_key) = self.checks.rpc_secret_key { key_is_authorized( diff --git a/web3_proxy/src/frontend/errors.rs b/web3_proxy/src/frontend/errors.rs index cf791e88..3d6e8164 100644 --- a/web3_proxy/src/frontend/errors.rs +++ b/web3_proxy/src/frontend/errors.rs @@ -2,51 +2,130 @@ use super::authorization::Authorization; use crate::jsonrpc::JsonRpcForwardedResponse; + +use std::net::IpAddr; +use std::sync::Arc; + use axum::{ headers, http::StatusCode, response::{IntoResponse, Response}, Json, }; -use derive_more::From; +use derive_more::{Display, Error, From}; use http::header::InvalidHeaderValue; use ipnet::AddrParseError; -use log::{debug, error, trace, warn}; +use log::{debug, error, info, trace, warn}; use migration::sea_orm::DbErr; use redis_rate_limiter::redis::RedisError; use reqwest::header::ToStrError; use tokio::{sync::AcquireError, task::JoinError, time::Instant}; +pub type Web3ProxyResult = Result; // TODO: take "IntoResponse" instead of Response? -pub type FrontendResult = Result; +pub type Web3ProxyResponse = Web3ProxyResult; // TODO: -#[derive(Debug, From)] -pub enum FrontendErrorResponse { +#[derive(Debug, Display, Error, From)] +pub enum Web3ProxyError { AccessDenied, + #[error(ignore)] Anyhow(anyhow::Error), + Arc(Arc), + #[error(ignore)] + #[from(ignore)] BadRequest(String), - SemaphoreAcquireError(AcquireError), + BadRouting, Database(DbErr), + #[display(fmt = "{:#?}, {:#?}", _0, _1)] + EipVerificationFailed(Box, Box), + EthersHttpClientError(ethers::prelude::HttpClientError), + EthersProviderError(ethers::prelude::ProviderError), + EthersWsClientError(ethers::prelude::WsClientError), + FlumeRecvError(flume::RecvError), + GasEstimateNotU256, Headers(headers::Error), HeaderToString(ToStrError), InfluxDb2RequestError(influxdb2::RequestError), + #[display(fmt = "{} > {}", min, max)] + #[from(ignore)] + InvalidBlockBounds { + min: u64, + max: u64, + }, InvalidHeaderValue(InvalidHeaderValue), + InvalidEip, + InvalidInviteCode, + InvalidReferer, + InvalidSignatureLength, + InvalidUserAgent, + InvalidUserKey, IpAddrParse(AddrParseError), + #[error(ignore)] + #[from(ignore)] + IpNotAllowed(IpAddr), JoinError(JoinError), + #[display(fmt = "{:?}", _0)] + #[error(ignore)] + JsonRpc(crate::jsonrpc::JsonRpcErrorData), MsgPackEncode(rmp_serde::encode::Error), + NoBlockNumberOrHash, + NoBlocksKnown, + NoConsensusHeadBlock, + NoHandleReady, + NoServersSynced, + #[display(fmt = "{}/{}", num_known, min_head_rpcs)] + #[from(ignore)] + NotEnoughRpcs { + num_known: usize, + min_head_rpcs: usize, + }, NotFound, + NotImplemented, + OriginRequired, + #[error(ignore)] + #[from(ignore)] + OriginNotAllowed(headers::Origin), + #[display(fmt = "{:?}", _0)] + #[error(ignore)] + ParseBytesError(Option), + ParseMsgError(siwe::ParseError), + ParseAddressError, + #[display(fmt = "{:?}, {:?}", _0, _1)] RateLimited(Authorization, Option), Redis(RedisError), + RefererRequired, + #[display(fmt = "{:?}", _0)] + #[error(ignore)] + #[from(ignore)] + RefererNotAllowed(headers::Referer), + SemaphoreAcquireError(AcquireError), + SendAppStatError(flume::SendError), + SerdeJson(serde_json::Error), /// simple way to return an error message to the user and an anyhow to our logs + #[display(fmt = "{}, {}, {:?}", _0, _1, _2)] StatusCode(StatusCode, String, Option), /// TODO: what should be attached to the timout? - Timeout(tokio::time::error::Elapsed), + #[display(fmt = "{:?}", _0)] + #[error(ignore)] + Timeout(Option), UlidDecode(ulid::DecodeError), + UnknownBlockNumber, UnknownKey, + UserAgentRequired, + #[error(ignore)] + UserAgentNotAllowed(headers::UserAgent), + UserIdZero, + VerificationError(siwe::VerificationError), + WatchRecvError(tokio::sync::watch::error::RecvError), + WatchSendError, + WebsocketOnly, + #[display(fmt = "{:?}, {}", _0, _1)] + #[error(ignore)] + WithContext(Option>, String), } -impl FrontendErrorResponse { +impl Web3ProxyError { pub fn into_response_parts(self) -> (StatusCode, JsonRpcForwardedResponse) { match self { Self::AccessDenied => { @@ -85,6 +164,17 @@ impl FrontendErrorResponse { ), ) } + Self::BadRouting => { + error!("BadRouting"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcForwardedResponse::from_str( + "bad routing", + Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), + None, + ), + ) + } Self::Database(err) => { error!("database err={:?}", err); ( @@ -96,6 +186,78 @@ impl FrontendErrorResponse { ), ) } + Self::EipVerificationFailed(err_1, err_191) => { + info!( + "EipVerificationFailed err_1={:#?} err2={:#?}", + err_1, err_191 + ); + ( + StatusCode::UNAUTHORIZED, + JsonRpcForwardedResponse::from_string( + format!( + "both the primary and eip191 verification failed: {:#?}; {:#?}", + err_1, err_191 + ), + Some(StatusCode::UNAUTHORIZED.as_u16().into()), + None, + ), + ) + } + Self::EthersHttpClientError(err) => { + warn!("EthersHttpClientError err={:?}", err); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcForwardedResponse::from_str( + "ether http client error", + Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), + None, + ), + ) + } + Self::EthersProviderError(err) => { + warn!("EthersProviderError err={:?}", err); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcForwardedResponse::from_str( + "ether provider error", + Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), + None, + ), + ) + } + Self::EthersWsClientError(err) => { + warn!("EthersWsClientError err={:?}", err); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcForwardedResponse::from_str( + "ether ws client error", + Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), + None, + ), + ) + } + Self::FlumeRecvError(err) => { + warn!("FlumeRecvError err={:#?}", err); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcForwardedResponse::from_str( + "flume recv error!", + Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), + None, + ), + ) + } + Self::GasEstimateNotU256 => { + warn!("GasEstimateNotU256"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcForwardedResponse::from_str( + "gas estimate result is not an U256", + Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), + None, + ), + ) + } Self::Headers(err) => { warn!("HeadersError {:?}", err); ( @@ -119,6 +281,20 @@ impl FrontendErrorResponse { ), ) } + Self::InvalidBlockBounds { min, max } => { + debug!("InvalidBlockBounds min={} max={}", min, max); + ( + StatusCode::BAD_REQUEST, + JsonRpcForwardedResponse::from_string( + format!( + "Invalid blocks bounds requested. min ({}) > max ({})", + min, max + ), + Some(StatusCode::BAD_REQUEST.as_u16().into()), + None, + ), + ) + } Self::IpAddrParse(err) => { warn!("IpAddrParse err={:?}", err); ( @@ -130,6 +306,17 @@ impl FrontendErrorResponse { ), ) } + Self::IpNotAllowed(ip) => { + warn!("IpNotAllowed ip={})", ip); + ( + StatusCode::FORBIDDEN, + JsonRpcForwardedResponse::from_string( + format!("IP ({}) is not allowed!", ip), + Some(StatusCode::FORBIDDEN.as_u16().into()), + None, + ), + ) + } Self::InvalidHeaderValue(err) => { warn!("InvalidHeaderValue err={:?}", err); ( @@ -141,6 +328,72 @@ impl FrontendErrorResponse { ), ) } + Self::InvalidEip => { + warn!("InvalidEip"); + ( + StatusCode::UNAUTHORIZED, + JsonRpcForwardedResponse::from_str( + "invalid message eip given", + Some(StatusCode::UNAUTHORIZED.as_u16().into()), + None, + ), + ) + } + Self::InvalidInviteCode => { + warn!("InvalidInviteCode"); + ( + StatusCode::UNAUTHORIZED, + JsonRpcForwardedResponse::from_str( + "invalid invite code", + Some(StatusCode::UNAUTHORIZED.as_u16().into()), + None, + ), + ) + } + Self::InvalidReferer => { + warn!("InvalidReferer"); + ( + StatusCode::BAD_REQUEST, + JsonRpcForwardedResponse::from_str( + "invalid referer!", + Some(StatusCode::BAD_REQUEST.as_u16().into()), + None, + ), + ) + } + Self::InvalidSignatureLength => { + warn!("InvalidSignatureLength"); + ( + StatusCode::BAD_REQUEST, + JsonRpcForwardedResponse::from_str( + "invalid signature length", + Some(StatusCode::BAD_REQUEST.as_u16().into()), + None, + ), + ) + } + Self::InvalidUserAgent => { + warn!("InvalidUserAgent"); + ( + StatusCode::BAD_REQUEST, + JsonRpcForwardedResponse::from_str( + "invalid user agent!", + Some(StatusCode::BAD_REQUEST.as_u16().into()), + None, + ), + ) + } + Self::InvalidUserKey => { + warn!("InvalidUserKey"); + ( + StatusCode::BAD_REQUEST, + JsonRpcForwardedResponse::from_str( + "UserKey was not a ULID or UUID", + Some(StatusCode::BAD_REQUEST.as_u16().into()), + None, + ), + ) + } Self::JoinError(err) => { let code = if err.is_cancelled() { trace!("JoinError. likely shutting down. err={:?}", err); @@ -160,6 +413,17 @@ impl FrontendErrorResponse { ), ) } + Self::JsonRpc(err) => { + debug!("JsonRpc err={:?}", err); + ( + StatusCode::BAD_REQUEST, + JsonRpcForwardedResponse::from_str( + "json rpc error!", + Some(StatusCode::BAD_REQUEST.as_u16().into()), + None, + ), + ) + } Self::MsgPackEncode(err) => { debug!("MsgPackEncode Error: {}", err); ( @@ -171,6 +435,75 @@ impl FrontendErrorResponse { ), ) } + Self::NoBlockNumberOrHash => { + warn!("NoBlockNumberOrHash"); + ( + StatusCode::BAD_REQUEST, + JsonRpcForwardedResponse::from_str( + "Blocks here must have a number or hash", + Some(StatusCode::BAD_REQUEST.as_u16().into()), + None, + ), + ) + } + Self::NoBlocksKnown => { + error!("NoBlocksKnown"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcForwardedResponse::from_str( + "no blocks known", + Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), + None, + ), + ) + } + Self::NoConsensusHeadBlock => { + error!("NoConsensusHeadBlock"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcForwardedResponse::from_str( + "no consensus head block", + Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), + None, + ), + ) + } + Self::NoHandleReady => { + error!("NoHandleReady"); + ( + StatusCode::BAD_GATEWAY, + JsonRpcForwardedResponse::from_str( + "unable to retry for request handle", + Some(StatusCode::BAD_GATEWAY.as_u16().into()), + None, + ), + ) + } + Self::NoServersSynced => { + warn!("NoServersSynced"); + ( + StatusCode::BAD_GATEWAY, + JsonRpcForwardedResponse::from_str( + "no servers synced", + Some(StatusCode::BAD_GATEWAY.as_u16().into()), + None, + ), + ) + } + Self::NotEnoughRpcs { + num_known, + min_head_rpcs, + } => { + error!("NotEnoughRpcs {}/{}", num_known, min_head_rpcs); + ( + StatusCode::BAD_GATEWAY, + JsonRpcForwardedResponse::from_string( + format!("not enough rpcs connected {}/{}", num_known, min_head_rpcs), + Some(StatusCode::BAD_GATEWAY.as_u16().into()), + None, + ), + ) + } Self::NotFound => { // TODO: emit a stat? // TODO: instead of an error, show a normal html page for 404 @@ -183,6 +516,72 @@ impl FrontendErrorResponse { ), ) } + Self::NotImplemented => { + trace!("NotImplemented"); + ( + StatusCode::NOT_IMPLEMENTED, + JsonRpcForwardedResponse::from_str( + "work in progress", + Some(StatusCode::NOT_IMPLEMENTED.as_u16().into()), + None, + ), + ) + } + Self::OriginRequired => { + warn!("OriginRequired"); + ( + StatusCode::BAD_REQUEST, + JsonRpcForwardedResponse::from_str( + "Origin required", + Some(StatusCode::BAD_REQUEST.as_u16().into()), + None, + ), + ) + } + Self::OriginNotAllowed(origin) => { + warn!("OriginNotAllowed origin={}", origin); + ( + StatusCode::FORBIDDEN, + JsonRpcForwardedResponse::from_string( + format!("Origin ({}) is not allowed!", origin), + Some(StatusCode::FORBIDDEN.as_u16().into()), + None, + ), + ) + } + Self::ParseBytesError(err) => { + warn!("ParseBytesError err={:?}", err); + ( + StatusCode::BAD_REQUEST, + JsonRpcForwardedResponse::from_str( + "parse bytes error!", + Some(StatusCode::BAD_REQUEST.as_u16().into()), + None, + ), + ) + } + Self::ParseMsgError(err) => { + warn!("ParseMsgError err={:?}", err); + ( + StatusCode::BAD_REQUEST, + JsonRpcForwardedResponse::from_str( + "parse message error!", + Some(StatusCode::BAD_REQUEST.as_u16().into()), + None, + ), + ) + } + Self::ParseAddressError => { + warn!("ParseAddressError"); + ( + StatusCode::UNAUTHORIZED, + JsonRpcForwardedResponse::from_str( + "unable to parse address", + Some(StatusCode::BAD_REQUEST.as_u16().into()), + None, + ), + ) + } // TODO: this should actually by the id of the key. multiple users might control one key Self::RateLimited(authorization, retry_at) => { // TODO: emit a stat @@ -226,6 +625,33 @@ impl FrontendErrorResponse { ), ) } + Self::RefererRequired => { + warn!("referer required"); + ( + StatusCode::BAD_REQUEST, + JsonRpcForwardedResponse::from_str( + "Referer required", + Some(StatusCode::BAD_REQUEST.as_u16().into()), + None, + ), + ) + } + Self::RefererNotAllowed(referer) => { + warn!("referer not allowed referer={:?}", referer); + ( + StatusCode::FORBIDDEN, + JsonRpcForwardedResponse::from_string( + format!("Referer ({:?}) is not allowed", referer), + Some(StatusCode::FORBIDDEN.as_u16().into()), + None, + ), + ) + } + Self::Arc(err) => match Arc::try_unwrap(err) { + Ok(err) => err, + Err(err) => Self::Anyhow(anyhow::anyhow!("{}", err)), + } + .into_response_parts(), Self::SemaphoreAcquireError(err) => { warn!("semaphore acquire err={:?}", err); ( @@ -238,6 +664,28 @@ impl FrontendErrorResponse { ), ) } + Self::SendAppStatError(err) => { + error!("SendAppStatError err={:?}", err); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcForwardedResponse::from_str( + "error stat_sender sending response_stat", + Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), + None, + ), + ) + } + Self::SerdeJson(err) => { + warn!("serde json err={:?}", err); + ( + StatusCode::BAD_REQUEST, + JsonRpcForwardedResponse::from_str( + "de/serialization error!", + Some(StatusCode::BAD_REQUEST.as_u16().into()), + None, + ), + ) + } Self::StatusCode(status_code, err_msg, err) => { // different status codes should get different error levels. 500s should warn. 400s should stat let code = status_code.as_u16(); @@ -283,6 +731,17 @@ impl FrontendErrorResponse { ), ) } + Self::UnknownBlockNumber => { + error!("UnknownBlockNumber"); + ( + StatusCode::BAD_GATEWAY, + JsonRpcForwardedResponse::from_str( + "no servers synced. unknown eth_blockNumber", + Some(StatusCode::BAD_GATEWAY.as_u16().into()), + None, + ), + ) + } // TODO: stat? Self::UnknownKey => ( StatusCode::UNAUTHORIZED, @@ -292,11 +751,114 @@ impl FrontendErrorResponse { None, ), ), + Self::UserAgentRequired => { + warn!("UserAgentRequired"); + ( + StatusCode::BAD_REQUEST, + JsonRpcForwardedResponse::from_str( + "User agent required", + Some(StatusCode::BAD_REQUEST.as_u16().into()), + None, + ), + ) + } + Self::UserAgentNotAllowed(ua) => { + warn!("UserAgentNotAllowed ua={}", ua); + ( + StatusCode::FORBIDDEN, + JsonRpcForwardedResponse::from_string( + format!("User agent ({}) is not allowed!", ua), + Some(StatusCode::FORBIDDEN.as_u16().into()), + None, + ), + ) + } + Self::UserIdZero => { + warn!("UserIdZero"); + ( + StatusCode::BAD_REQUEST, + JsonRpcForwardedResponse::from_str( + "user ids should always be non-zero", + Some(StatusCode::BAD_REQUEST.as_u16().into()), + None, + ), + ) + } + Self::VerificationError(err) => { + warn!("VerificationError err={:?}", err); + ( + StatusCode::BAD_REQUEST, + JsonRpcForwardedResponse::from_str( + "verification error!", + Some(StatusCode::BAD_REQUEST.as_u16().into()), + None, + ), + ) + } + Self::WatchRecvError(err) => { + error!("WatchRecvError err={:?}", err); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcForwardedResponse::from_str( + "watch recv error!", + Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), + None, + ), + ) + } + Self::WatchSendError => { + error!("WatchSendError"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcForwardedResponse::from_str( + "watch send error!", + Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), + None, + ), + ) + } + Self::WebsocketOnly => { + warn!("WebsocketOnly"); + ( + StatusCode::BAD_REQUEST, + JsonRpcForwardedResponse::from_str( + "redirect_public_url not set. only websockets work here", + Some(StatusCode::BAD_REQUEST.as_u16().into()), + None, + ), + ) + } + Self::WithContext(err, msg) => { + info!("in context: {}", msg); + match err { + Some(err) => err.into_response_parts(), + None => ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcForwardedResponse::from_string( + msg, + Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), + None, + ), + ), + } + } } } } -impl IntoResponse for FrontendErrorResponse { +impl From for Web3ProxyError { + fn from(err: ethers::types::ParseBytesError) -> Self { + Self::ParseBytesError(Some(err)) + } +} + +impl From for Web3ProxyError { + fn from(err: tokio::time::error::Elapsed) -> Self { + Self::Timeout(Some(err)) + } +} + +impl IntoResponse for Web3ProxyError { fn into_response(self) -> Response { // TODO: include the request id in these so that users can give us something that will point to logs // TODO: status code is in the jsonrpc response and is also the first item in the tuple. DRY @@ -307,5 +869,24 @@ impl IntoResponse for FrontendErrorResponse { } pub async fn handler_404() -> Response { - FrontendErrorResponse::NotFound.into_response() + Web3ProxyError::NotFound.into_response() +} + +pub trait Web3ProxyErrorContext { + fn web3_context>(self, msg: S) -> Result; +} + +impl Web3ProxyErrorContext for Option { + fn web3_context>(self, msg: S) -> Result { + self.ok_or(Web3ProxyError::WithContext(None, msg.into())) + } +} + +impl Web3ProxyErrorContext for Result +where + E: Into, +{ + fn web3_context>(self, msg: S) -> Result { + self.map_err(|err| Web3ProxyError::WithContext(Some(Box::new(err.into())), msg.into())) + } } diff --git a/web3_proxy/src/frontend/rpc_proxy_http.rs b/web3_proxy/src/frontend/rpc_proxy_http.rs index d27f1533..2d938adb 100644 --- a/web3_proxy/src/frontend/rpc_proxy_http.rs +++ b/web3_proxy/src/frontend/rpc_proxy_http.rs @@ -1,7 +1,7 @@ //! Take a user's HTTP JSON-RPC requests and either respond from local data or proxy the request to a backend rpc server. use super::authorization::{ip_is_authorized, key_is_authorized}; -use super::errors::FrontendResult; +use super::errors::Web3ProxyResponse; use super::rpc_proxy_ws::ProxyMode; use crate::{app::Web3ProxyApp, jsonrpc::JsonRpcRequestEnum}; use axum::extract::Path; @@ -22,7 +22,7 @@ pub async fn proxy_web3_rpc( ip: InsecureClientIp, origin: Option>, Json(payload): Json, -) -> FrontendResult { +) -> Web3ProxyResponse { _proxy_web3_rpc(app, ip, origin, payload, ProxyMode::Best).await } @@ -32,7 +32,7 @@ pub async fn fastest_proxy_web3_rpc( ip: InsecureClientIp, origin: Option>, Json(payload): Json, -) -> FrontendResult { +) -> Web3ProxyResponse { // TODO: read the fastest number from params // TODO: check that the app allows this without authentication _proxy_web3_rpc(app, ip, origin, payload, ProxyMode::Fastest(0)).await @@ -44,7 +44,7 @@ pub async fn versus_proxy_web3_rpc( ip: InsecureClientIp, origin: Option>, Json(payload): Json, -) -> FrontendResult { +) -> Web3ProxyResponse { _proxy_web3_rpc(app, ip, origin, payload, ProxyMode::Versus).await } @@ -54,7 +54,7 @@ async fn _proxy_web3_rpc( origin: Option>, payload: JsonRpcRequestEnum, proxy_mode: ProxyMode, -) -> FrontendResult { +) -> Web3ProxyResponse { // TODO: benchmark spawning this // TODO: do we care about keeping the TypedHeader wrapper? let origin = origin.map(|x| x.0); @@ -115,7 +115,7 @@ pub async fn proxy_web3_rpc_with_key( user_agent: Option>, Path(rpc_key): Path, Json(payload): Json, -) -> FrontendResult { +) -> Web3ProxyResponse { _proxy_web3_rpc_with_key( app, ip, @@ -138,7 +138,7 @@ pub async fn debug_proxy_web3_rpc_with_key( user_agent: Option>, Path(rpc_key): Path, Json(payload): Json, -) -> FrontendResult { +) -> Web3ProxyResponse { _proxy_web3_rpc_with_key( app, ip, @@ -161,7 +161,7 @@ pub async fn fastest_proxy_web3_rpc_with_key( user_agent: Option>, Path(rpc_key): Path, Json(payload): Json, -) -> FrontendResult { +) -> Web3ProxyResponse { _proxy_web3_rpc_with_key( app, ip, @@ -184,7 +184,7 @@ pub async fn versus_proxy_web3_rpc_with_key( user_agent: Option>, Path(rpc_key): Path, Json(payload): Json, -) -> FrontendResult { +) -> Web3ProxyResponse { _proxy_web3_rpc_with_key( app, ip, @@ -208,7 +208,7 @@ async fn _proxy_web3_rpc_with_key( rpc_key: String, payload: JsonRpcRequestEnum, proxy_mode: ProxyMode, -) -> FrontendResult { +) -> Web3ProxyResponse { // TODO: DRY w/ proxy_web3_rpc // the request can take a while, so we spawn so that we can start serving another request let rpc_key = rpc_key.parse()?; diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs index 968e5eca..28c9f934 100644 --- a/web3_proxy/src/frontend/rpc_proxy_ws.rs +++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs @@ -3,10 +3,11 @@ //! WebSockets are the preferred method of receiving requests, but not all clients have good support. use super::authorization::{ip_is_authorized, key_is_authorized, Authorization, RequestMetadata}; -use super::errors::{FrontendErrorResponse, FrontendResult}; +use super::errors::{Web3ProxyError, Web3ProxyResponse}; use crate::stats::RpcQueryStats; use crate::{ app::Web3ProxyApp, + frontend::errors::Web3ProxyResult, jsonrpc::{JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest}, }; use axum::headers::{Origin, Referer, UserAgent}; @@ -26,7 +27,7 @@ use futures::{ use handlebars::Handlebars; use hashbrown::HashMap; use http::StatusCode; -use log::{error, info, trace, warn}; +use log::{info, trace, warn}; use serde_json::json; use serde_json::value::to_raw_value; use std::sync::Arc; @@ -59,7 +60,7 @@ pub async fn websocket_handler( ip: InsecureClientIp, origin: Option>, ws_upgrade: Option, -) -> FrontendResult { +) -> Web3ProxyResponse { _websocket_handler(ProxyMode::Best, app, ip, origin, ws_upgrade).await } @@ -71,7 +72,7 @@ pub async fn fastest_websocket_handler( ip: InsecureClientIp, origin: Option>, ws_upgrade: Option, -) -> FrontendResult { +) -> Web3ProxyResponse { // TODO: get the fastest number from the url params (default to 0/all) // TODO: config to disable this _websocket_handler(ProxyMode::Fastest(0), app, ip, origin, ws_upgrade).await @@ -85,7 +86,7 @@ pub async fn versus_websocket_handler( ip: InsecureClientIp, origin: Option>, ws_upgrade: Option, -) -> FrontendResult { +) -> Web3ProxyResponse { // TODO: config to disable this _websocket_handler(ProxyMode::Versus, app, ip, origin, ws_upgrade).await } @@ -96,7 +97,7 @@ async fn _websocket_handler( InsecureClientIp(ip): InsecureClientIp, origin: Option>, ws_upgrade: Option, -) -> FrontendResult { +) -> Web3ProxyResponse { let origin = origin.map(|x| x.0); let (authorization, _semaphore) = ip_is_authorized(&app, ip, origin, proxy_mode).await?; @@ -112,11 +113,7 @@ async fn _websocket_handler( // this is not a websocket. redirect to a friendly page Ok(Redirect::permanent(redirect).into_response()) } else { - // TODO: do not use an anyhow error. send the user a 400 - Err( - anyhow::anyhow!("redirect_public_url not set. only websockets work here") - .into(), - ) + Err(Web3ProxyError::WebsocketOnly) } } } @@ -134,7 +131,7 @@ pub async fn websocket_handler_with_key( referer: Option>, user_agent: Option>, ws_upgrade: Option, -) -> FrontendResult { +) -> Web3ProxyResponse { _websocket_handler_with_key( ProxyMode::Best, app, @@ -157,7 +154,7 @@ pub async fn debug_websocket_handler_with_key( referer: Option>, user_agent: Option>, ws_upgrade: Option, -) -> FrontendResult { +) -> Web3ProxyResponse { _websocket_handler_with_key( ProxyMode::Debug, app, @@ -180,7 +177,7 @@ pub async fn fastest_websocket_handler_with_key( referer: Option>, user_agent: Option>, ws_upgrade: Option, -) -> FrontendResult { +) -> Web3ProxyResponse { // TODO: get the fastest number from the url params (default to 0/all) _websocket_handler_with_key( ProxyMode::Fastest(0), @@ -204,7 +201,7 @@ pub async fn versus_websocket_handler_with_key( referer: Option>, user_agent: Option>, ws_upgrade: Option, -) -> FrontendResult { +) -> Web3ProxyResponse { _websocket_handler_with_key( ProxyMode::Versus, app, @@ -228,7 +225,7 @@ async fn _websocket_handler_with_key( referer: Option>, user_agent: Option>, ws_upgrade: Option, -) -> FrontendResult { +) -> Web3ProxyResponse { let rpc_key = rpc_key.parse()?; let (authorization, _semaphore) = key_is_authorized( @@ -260,7 +257,7 @@ async fn _websocket_handler_with_key( &app.config.redirect_rpc_key_url, authorization.checks.rpc_secret_key_id, ) { - (None, None, _) => Err(FrontendErrorResponse::StatusCode( + (None, None, _) => Err(Web3ProxyError::StatusCode( StatusCode::BAD_REQUEST, "this page is for rpcs".to_string(), None, @@ -273,7 +270,7 @@ async fn _websocket_handler_with_key( if authorization.checks.rpc_secret_key_id.is_none() { // i don't think this is possible - Err(FrontendErrorResponse::StatusCode( + Err(Web3ProxyError::StatusCode( StatusCode::UNAUTHORIZED, "AUTHORIZATION header required".to_string(), None, @@ -291,7 +288,7 @@ async fn _websocket_handler_with_key( } } // any other combinations get a simple error - _ => Err(FrontendErrorResponse::StatusCode( + _ => Err(Web3ProxyError::StatusCode( StatusCode::BAD_REQUEST, "this page is for rpcs".to_string(), None, @@ -341,7 +338,7 @@ async fn handle_socket_payload( Ok(json_request) => { let id = json_request.id.clone(); - let response: anyhow::Result = match &json_request.method + let response: Web3ProxyResult = match &json_request.method [..] { "eth_subscribe" => { @@ -378,7 +375,7 @@ async fn handle_socket_payload( // TODO: move this logic into the app? let request_bytes = json_request.num_bytes(); - let request_metadata = Arc::new(RequestMetadata::new(request_bytes).unwrap()); + let request_metadata = Arc::new(RequestMetadata::new(request_bytes)); let subscription_id = json_request.params.unwrap().to_string(); @@ -417,16 +414,7 @@ async fn handle_socket_payload( _ => app .proxy_web3_rpc(authorization.clone(), json_request.into()) .await - .map_or_else( - |err| match err { - FrontendErrorResponse::Anyhow(err) => Err(err), - _ => { - error!("handle this better! {:?}", err); - Err(anyhow::anyhow!("unexpected error! {:?}", err)) - } - }, - |(response, _)| Ok(response), - ), + .map(|(response, _)| response), }; (id, response) @@ -442,9 +430,8 @@ async fn handle_socket_payload( let response_str = match response { Ok(x) => serde_json::to_string(&x).expect("to_string should always work here"), Err(err) => { - // we have an anyhow error. turn it into a response - let response = JsonRpcForwardedResponse::from_anyhow_error(err, None, Some(id)); - + let (_, mut response) = err.into_response_parts(); + response.id = id; serde_json::to_string(&response).expect("to_string should always work here") } }; diff --git a/web3_proxy/src/frontend/users.rs b/web3_proxy/src/frontend/users.rs index 29210ae4..c405b075 100644 --- a/web3_proxy/src/frontend/users.rs +++ b/web3_proxy/src/frontend/users.rs @@ -1,6 +1,6 @@ //! Handle registration, logins, and managing account data. use super::authorization::{login_is_authorized, RpcSecretKey}; -use super::errors::FrontendResult; +use super::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResponse}; use crate::app::Web3ProxyApp; use crate::http_params::{ get_chain_id_from_params, get_page_from_params, get_query_start_from_params, @@ -9,7 +9,6 @@ use crate::stats::influxdb_queries::query_user_stats; use crate::stats::StatType; use crate::user_token::UserBearerToken; use crate::{PostLogin, PostLoginQuery}; -use anyhow::Context; use axum::headers::{Header, Origin, Referer, UserAgent}; use axum::{ extract::{Path, Query}, @@ -65,7 +64,7 @@ pub async fn user_login_get( InsecureClientIp(ip): InsecureClientIp, // TODO: what does axum's error handling look like if the path fails to parse? Path(mut params): Path>, -) -> FrontendResult { +) -> Web3ProxyResponse { login_is_authorized(&app, ip).await?; // create a message and save it in redis @@ -81,11 +80,9 @@ pub async fn user_login_get( // TODO: allow ENS names here? let user_address: Address = params .remove("user_address") - // TODO: map_err so this becomes a 500. routing must be bad - .context("impossible")? + .ok_or(Web3ProxyError::BadRouting)? .parse() - // TODO: map_err so this becomes a 401 - .context("unable to parse address")?; + .or(Err(Web3ProxyError::ParseAddressError))?; let login_domain = app .config @@ -113,7 +110,7 @@ pub async fn user_login_get( resources: vec![], }; - let db_conn = app.db_conn().context("login requires a database")?; + let db_conn = app.db_conn().web3_context("login requires a database")?; // massage types to fit in the database. sea-orm does not make this very elegant let uuid = Uuid::from_u128(nonce.into()); @@ -135,7 +132,7 @@ pub async fn user_login_get( user_pending_login .save(&db_conn) .await - .context("saving user's pending_login")?; + .web3_context("saving user's pending_login")?; // there are multiple ways to sign messages and not all wallets support them // TODO: default message eip from config? @@ -148,8 +145,7 @@ pub async fn user_login_get( "eip191_hash" => Bytes::from(&message.eip191_hash().unwrap()).to_string(), "eip4361" => message.to_string(), _ => { - // TODO: custom error that is handled a 401 - return Err(anyhow::anyhow!("invalid message eip given").into()); + return Err(Web3ProxyError::InvalidEip); } }; @@ -165,13 +161,13 @@ pub async fn user_login_post( InsecureClientIp(ip): InsecureClientIp, Query(query): Query, Json(payload): Json, -) -> FrontendResult { +) -> Web3ProxyResponse { login_is_authorized(&app, ip).await?; // TODO: this seems too verbose. how can we simply convert a String into a [u8; 65] - let their_sig_bytes = Bytes::from_str(&payload.sig).context("parsing sig")?; + let their_sig_bytes = Bytes::from_str(&payload.sig).web3_context("parsing sig")?; if their_sig_bytes.len() != 65 { - return Err(anyhow::anyhow!("checking signature length").into()); + return Err(Web3ProxyError::InvalidSignatureLength); } let mut their_sig: [u8; 65] = [0; 65]; for x in 0..65 { @@ -181,17 +177,18 @@ pub async fn user_login_post( // we can't trust that they didn't tamper with the message in some way. like some clients return it hex encoded // TODO: checking 0x seems fragile, but I think it will be fine. siwe message text shouldn't ever start with 0x let their_msg: Message = if payload.msg.starts_with("0x") { - let their_msg_bytes = Bytes::from_str(&payload.msg).context("parsing payload message")?; + let their_msg_bytes = + Bytes::from_str(&payload.msg).web3_context("parsing payload message")?; // TODO: lossy or no? String::from_utf8_lossy(their_msg_bytes.as_ref()) .parse::() - .context("parsing hex string message")? + .web3_context("parsing hex string message")? } else { payload .msg .parse::() - .context("parsing string message")? + .web3_context("parsing string message")? }; // the only part of the message we will trust is their nonce @@ -199,7 +196,9 @@ pub async fn user_login_post( let login_nonce = UserBearerToken::from_str(&their_msg.nonce)?; // fetch the message we gave them from our database - let db_replica = app.db_replica().context("Getting database connection")?; + let db_replica = app + .db_replica() + .web3_context("Getting database connection")?; // massage type for the db let login_nonce_uuid: Uuid = login_nonce.clone().into(); @@ -208,13 +207,13 @@ pub async fn user_login_post( .filter(pending_login::Column::Nonce.eq(login_nonce_uuid)) .one(db_replica.conn()) .await - .context("database error while finding pending_login")? - .context("login nonce not found")?; + .web3_context("database error while finding pending_login")? + .web3_context("login nonce not found")?; let our_msg: siwe::Message = user_pending_login .message .parse() - .context("parsing siwe message")?; + .web3_context("parsing siwe message")?; // default options are fine. the message includes timestamp and domain and nonce let verify_config = VerificationOpts::default(); @@ -223,16 +222,16 @@ pub async fn user_login_post( if let Err(err_1) = our_msg .verify(&their_sig, &verify_config) .await - .context("verifying signature against our local message") + .web3_context("verifying signature against our local message") { // verification method 1 failed. try eip191 if let Err(err_191) = our_msg .verify_eip191(&their_sig) - .context("verifying eip191 signature against our local message") + .web3_context("verifying eip191 signature against our local message") { let db_conn = app .db_conn() - .context("deleting expired pending logins requires a db")?; + .web3_context("deleting expired pending logins requires a db")?; // delete ALL expired rows. let now = Utc::now(); @@ -244,12 +243,10 @@ pub async fn user_login_post( // TODO: emit a stat? if this is high something weird might be happening debug!("cleared expired pending_logins: {:?}", delete_result); - return Err(anyhow::anyhow!( - "both the primary and eip191 verification failed: {:#?}; {:#?}", - err_1, - err_191 - ) - .into()); + return Err(Web3ProxyError::EipVerificationFailed( + Box::new(err_1), + Box::new(err_191), + )); } } @@ -260,7 +257,7 @@ pub async fn user_login_post( .await .unwrap(); - let db_conn = app.db_conn().context("login requires a db")?; + let db_conn = app.db_conn().web3_context("login requires a db")?; let (u, uks, status_code) = match u { None => { @@ -270,7 +267,7 @@ pub async fn user_login_post( // TODO: more advanced invite codes that set different request/minute and concurrency limits if let Some(invite_code) = &app.config.invite_code { if query.invite_code.as_ref() != Some(invite_code) { - return Err(anyhow::anyhow!("checking invite_code").into()); + return Err(Web3ProxyError::InvalidInviteCode); } } @@ -300,7 +297,7 @@ pub async fn user_login_post( let uk = uk .insert(&txn) .await - .context("Failed saving new user key")?; + .web3_context("Failed saving new user key")?; let uks = vec![uk]; @@ -315,7 +312,7 @@ pub async fn user_login_post( .filter(rpc_key::Column::UserId.eq(u.id)) .all(db_replica.conn()) .await - .context("failed loading user's key")?; + .web3_context("failed loading user's key")?; (u, uks, StatusCode::OK) } @@ -355,7 +352,7 @@ pub async fn user_login_post( user_login .save(&db_conn) .await - .context("saving user login")?; + .web3_context("saving user login")?; if let Err(err) = user_pending_login .into_active_model() @@ -373,10 +370,12 @@ pub async fn user_login_post( pub async fn user_logout_post( Extension(app): Extension>, TypedHeader(Authorization(bearer)): TypedHeader>, -) -> FrontendResult { +) -> Web3ProxyResponse { let user_bearer = UserBearerToken::try_from(bearer)?; - let db_conn = app.db_conn().context("database needed for user logout")?; + let db_conn = app + .db_conn() + .web3_context("database needed for user logout")?; if let Err(err) = login::Entity::delete_many() .filter(login::Column::BearerToken.eq(user_bearer.uuid())) @@ -417,7 +416,7 @@ pub async fn user_logout_post( pub async fn user_get( Extension(app): Extension>, TypedHeader(Authorization(bearer_token)): TypedHeader>, -) -> FrontendResult { +) -> Web3ProxyResponse { let (user, _semaphore) = app.bearer_is_authorized(bearer_token).await?; Ok(Json(user).into_response()) @@ -435,7 +434,7 @@ pub async fn user_post( Extension(app): Extension>, TypedHeader(Authorization(bearer_token)): TypedHeader>, Json(payload): Json, -) -> FrontendResult { +) -> Web3ProxyResponse { let (user, _semaphore) = app.bearer_is_authorized(bearer_token).await?; let mut user: user::ActiveModel = user.into(); @@ -456,7 +455,7 @@ pub async fn user_post( // TODO: what else can we update here? password hash? subscription to newsletter? let user = if user.is_changed() { - let db_conn = app.db_conn().context("Getting database connection")?; + let db_conn = app.db_conn().web3_context("Getting database connection")?; user.save(&db_conn).await? } else { @@ -464,7 +463,7 @@ pub async fn user_post( user }; - let user: user::Model = user.try_into().context("Returning updated user")?; + let user: user::Model = user.try_into().web3_context("Returning updated user")?; Ok(Json(user).into_response()) } @@ -480,8 +479,8 @@ pub async fn user_post( pub async fn user_balance_get( Extension(app): Extension>, TypedHeader(Authorization(bearer)): TypedHeader>, -) -> FrontendResult { - let (user, _semaphore) = app.bearer_is_authorized(bearer).await?; +) -> Web3ProxyResponse { + let (_user, _semaphore) = app.bearer_is_authorized(bearer).await?; todo!("user_balance_get"); } @@ -495,8 +494,8 @@ pub async fn user_balance_get( pub async fn user_balance_post( Extension(app): Extension>, TypedHeader(Authorization(bearer)): TypedHeader>, -) -> FrontendResult { - let (user, _semaphore) = app.bearer_is_authorized(bearer).await?; +) -> Web3ProxyResponse { + let (_user, _semaphore) = app.bearer_is_authorized(bearer).await?; todo!("user_balance_post"); } @@ -506,18 +505,18 @@ pub async fn user_balance_post( pub async fn rpc_keys_get( Extension(app): Extension>, TypedHeader(Authorization(bearer)): TypedHeader>, -) -> FrontendResult { +) -> Web3ProxyResponse { let (user, _semaphore) = app.bearer_is_authorized(bearer).await?; let db_replica = app .db_replica() - .context("db_replica is required to fetch a user's keys")?; + .web3_context("db_replica is required to fetch a user's keys")?; let uks = rpc_key::Entity::find() .filter(rpc_key::Column::UserId.eq(user.id)) .all(db_replica.conn()) .await - .context("failed loading user's key")?; + .web3_context("failed loading user's key")?; let response_json = json!({ "user_id": user.id, @@ -535,11 +534,11 @@ pub async fn rpc_keys_get( pub async fn rpc_keys_delete( Extension(app): Extension>, TypedHeader(Authorization(bearer)): TypedHeader>, -) -> FrontendResult { - let (user, _semaphore) = app.bearer_is_authorized(bearer).await?; +) -> Web3ProxyResponse { + let (_user, _semaphore) = app.bearer_is_authorized(bearer).await?; // TODO: think about how cascading deletes and billing should work - Err(anyhow::anyhow!("work in progress").into()) + Err(Web3ProxyError::NotImplemented) } /// the JSON input to the `rpc_keys_management` handler. @@ -567,12 +566,14 @@ pub async fn rpc_keys_management( Extension(app): Extension>, TypedHeader(Authorization(bearer)): TypedHeader>, Json(payload): Json, -) -> FrontendResult { +) -> Web3ProxyResponse { // TODO: is there a way we can know if this is a PUT or POST? right now we can modify or create keys with either. though that probably doesn't matter let (user, _semaphore) = app.bearer_is_authorized(bearer).await?; - let db_replica = app.db_replica().context("getting db for user's keys")?; + let db_replica = app + .db_replica() + .web3_context("getting db for user's keys")?; let mut uk = if let Some(existing_key_id) = payload.key_id { // get the key and make sure it belongs to the user @@ -581,8 +582,8 @@ pub async fn rpc_keys_management( .filter(rpc_key::Column::Id.eq(existing_key_id)) .one(db_replica.conn()) .await - .context("failed loading user's key")? - .context("key does not exist or is not controlled by this bearer token")? + .web3_context("failed loading user's key")? + .web3_context("key does not exist or is not controlled by this bearer token")? .into_active_model() } else { // make a new key @@ -591,7 +592,7 @@ pub async fn rpc_keys_management( let log_level = payload .log_level - .context("log level must be 'none', 'detailed', or 'aggregated'")?; + .web3_context("log level must be 'none', 'detailed', or 'aggregated'")?; rpc_key::ActiveModel { user_id: sea_orm::Set(user.id), @@ -720,9 +721,11 @@ pub async fn rpc_keys_management( } let uk = if uk.is_changed() { - let db_conn = app.db_conn().context("login requires a db")?; + let db_conn = app.db_conn().web3_context("login requires a db")?; - uk.save(&db_conn).await.context("Failed saving user key")? + uk.save(&db_conn) + .await + .web3_context("Failed saving user key")? } else { uk }; @@ -738,7 +741,7 @@ pub async fn user_revert_logs_get( Extension(app): Extension>, TypedHeader(Authorization(bearer)): TypedHeader>, Query(params): Query>, -) -> FrontendResult { +) -> Web3ProxyResponse { let (user, _semaphore) = app.bearer_is_authorized(bearer).await?; let chain_id = get_chain_id_from_params(app.as_ref(), ¶ms)?; @@ -757,13 +760,13 @@ pub async fn user_revert_logs_get( let db_replica = app .db_replica() - .context("getting replica db for user's revert logs")?; + .web3_context("getting replica db for user's revert logs")?; let uks = rpc_key::Entity::find() .filter(rpc_key::Column::UserId.eq(user.id)) .all(db_replica.conn()) .await - .context("failed loading user's key")?; + .web3_context("failed loading user's key")?; // TODO: only select the ids let uks: Vec<_> = uks.into_iter().map(|x| x.id).collect(); @@ -808,7 +811,7 @@ pub async fn user_stats_aggregated_get( Extension(app): Extension>, bearer: Option>>, Query(params): Query>, -) -> FrontendResult { +) -> Web3ProxyResponse { let response = query_user_stats(&app, bearer, ¶ms, StatType::Aggregated).await?; Ok(response) @@ -828,7 +831,7 @@ pub async fn user_stats_detailed_get( Extension(app): Extension>, bearer: Option>>, Query(params): Query>, -) -> FrontendResult { +) -> Web3ProxyResponse { let response = query_user_stats(&app, bearer, ¶ms, StatType::Detailed).await?; Ok(response) diff --git a/web3_proxy/src/http_params.rs b/web3_proxy/src/http_params.rs index 35236465..754857cf 100644 --- a/web3_proxy/src/http_params.rs +++ b/web3_proxy/src/http_params.rs @@ -1,5 +1,5 @@ use crate::app::DatabaseReplica; -use crate::frontend::errors::FrontendErrorResponse; +use crate::frontend::errors::{Web3ProxyError, Web3ProxyResult}; use crate::{app::Web3ProxyApp, user_token::UserBearerToken}; use anyhow::Context; use axum::{ @@ -24,7 +24,7 @@ pub async fn get_user_id_from_params( // this is a long type. should we strip it down? bearer: Option>>, params: &HashMap, -) -> Result { +) -> Web3ProxyResult { match (bearer, params.get("user_id")) { (Some(TypedHeader(Authorization(bearer))), Some(user_id)) => { // check for the bearer cache key @@ -45,7 +45,7 @@ pub async fn get_user_id_from_params( .one(db_replica.conn()) .await .context("database error while querying for user")? - .ok_or(FrontendErrorResponse::AccessDenied)?; + .ok_or(Web3ProxyError::AccessDenied)?; // if expired, delete ALL expired logins let now = Utc::now(); @@ -60,7 +60,7 @@ pub async fn get_user_id_from_params( // TODO: emit a stat? if this is high something weird might be happening debug!("cleared expired logins: {:?}", delete_result); - return Err(FrontendErrorResponse::AccessDenied); + return Err(Web3ProxyError::AccessDenied); } save_to_redis = true; @@ -76,7 +76,7 @@ pub async fn get_user_id_from_params( let user_id: u64 = user_id.parse().context("Parsing user_id param")?; if bearer_user_id != user_id { - return Err(FrontendErrorResponse::AccessDenied); + return Err(Web3ProxyError::AccessDenied); } if save_to_redis { @@ -103,7 +103,7 @@ pub async fn get_user_id_from_params( // TODO: proper error code from a useful error code // TODO: maybe instead of this sharp edged warn, we have a config value? // TODO: check config for if we should deny or allow this - Err(FrontendErrorResponse::AccessDenied) + Err(Web3ProxyError::AccessDenied) // // TODO: make this a flag // warn!("allowing without auth during development!"); // Ok(x.parse()?) @@ -215,7 +215,7 @@ pub fn get_query_stop_from_params( pub fn get_query_window_seconds_from_params( params: &HashMap, -) -> Result { +) -> Web3ProxyResult { params.get("query_window_seconds").map_or_else( || { // no page in params. set default @@ -225,15 +225,13 @@ pub fn get_query_window_seconds_from_params( // parse the given timestamp query_window_seconds.parse::().map_err(|err| { trace!("Unable to parse rpc_key_id: {:#?}", err); - FrontendErrorResponse::BadRequest("Unable to parse rpc_key_id".to_string()) + Web3ProxyError::BadRequest("Unable to parse rpc_key_id".to_string()) }) }, ) } -pub fn get_stats_column_from_params( - params: &HashMap, -) -> Result<&str, FrontendErrorResponse> { +pub fn get_stats_column_from_params(params: &HashMap) -> Web3ProxyResult<&str> { params.get("query_stats_column").map_or_else( || Ok("frontend_requests"), |query_stats_column: &String| { @@ -247,7 +245,7 @@ pub fn get_stats_column_from_params( | "sum_request_bytes" | "sum_response_bytes" | "sum_response_millis" => Ok(query_stats_column), - _ => Err(FrontendErrorResponse::BadRequest( + _ => Err(Web3ProxyError::BadRequest( "Unable to parse query_stats_column. It must be one of: \ frontend_requests, \ backend_requests, \ diff --git a/web3_proxy/src/jsonrpc.rs b/web3_proxy/src/jsonrpc.rs index ac205f12..2438eb44 100644 --- a/web3_proxy/src/jsonrpc.rs +++ b/web3_proxy/src/jsonrpc.rs @@ -1,3 +1,4 @@ +use crate::frontend::errors::Web3ProxyResult; use derive_more::From; use ethers::prelude::{HttpClientError, ProviderError, WsClientError}; use serde::de::{self, Deserializer, MapAccess, SeqAccess, Visitor}; @@ -240,7 +241,7 @@ impl JsonRpcForwardedResponse { } } - pub fn from_ethers_error(e: ProviderError, id: Box) -> anyhow::Result { + pub fn from_ethers_error(e: ProviderError, id: Box) -> Web3ProxyResult { // TODO: move turning ClientError into json to a helper function? let code; let message: String; @@ -280,7 +281,7 @@ impl JsonRpcForwardedResponse { } } } else { - return Err(anyhow::anyhow!("unexpected ethers error!")); + return Err(anyhow::anyhow!("unexpected ethers error!").into()); } } } @@ -302,7 +303,7 @@ impl JsonRpcForwardedResponse { pub fn try_from_response_result( result: Result, ProviderError>, id: Box, - ) -> anyhow::Result { + ) -> Web3ProxyResult { match result { Ok(response) => Ok(Self::from_response(response, id)), Err(e) => Self::from_ethers_error(e, id), diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index 4544386f..39271ee0 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -4,8 +4,8 @@ use super::many::Web3Rpcs; use super::one::Web3Rpc; use super::transactions::TxStatus; use crate::frontend::authorization::Authorization; +use crate::frontend::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult}; use crate::{config::BlockAndRpc, jsonrpc::JsonRpcRequest}; -use anyhow::{anyhow, Context}; use derive_more::From; use ethers::prelude::{Block, TxHash, H256, U64}; use log::{debug, trace, warn, Level}; @@ -132,11 +132,11 @@ impl Web3ProxyBlock { } impl TryFrom for Web3ProxyBlock { - type Error = anyhow::Error; + type Error = Web3ProxyError; fn try_from(x: ArcBlock) -> Result { if x.number.is_none() || x.hash.is_none() { - return Err(anyhow!("Blocks here must have a number of hash")); + return Err(Web3ProxyError::NoBlockNumberOrHash); } let b = Web3ProxyBlock { @@ -166,7 +166,7 @@ impl Web3Rpcs { &self, block: Web3ProxyBlock, heaviest_chain: bool, - ) -> anyhow::Result { + ) -> Web3ProxyResult { // TODO: i think we can rearrange this function to make it faster on the hot path let block_hash = block.hash(); @@ -198,13 +198,13 @@ impl Web3Rpcs { /// Get a block from caches with fallback. /// Will query a specific node or the best available. - /// TODO: return anyhow::Result>? + /// TODO: return Web3ProxyResult>? pub async fn block( &self, authorization: &Arc, hash: &H256, rpc: Option<&Arc>, - ) -> anyhow::Result { + ) -> Web3ProxyResult { // first, try to get the hash from our cache // the cache is set last, so if its here, its everywhere // TODO: use try_get_with @@ -233,7 +233,7 @@ impl Web3Rpcs { x.try_into().ok() } }) - .context("no block!")?, + .web3_context("no block!")?, None => { // TODO: helper for method+params => JsonRpcRequest // TODO: does this id matter? @@ -253,16 +253,16 @@ impl Web3Rpcs { .await?; if let Some(err) = response.error { - let err = anyhow::anyhow!("{:#?}", err); - - return Err(err.context("failed fetching block")); + return Err(err).web3_context("failed fetching block"); } - let block = response.result.context("no error, but also no block")?; + let block = response + .result + .web3_context("no error, but also no block")?; let block: Option = serde_json::from_str(block.get())?; - let block: ArcBlock = block.context("no block in the response")?; + let block: ArcBlock = block.web3_context("no block in the response")?; // TODO: received time is going to be weird Web3ProxyBlock::try_from(block)? @@ -281,7 +281,7 @@ impl Web3Rpcs { &self, authorization: &Arc, num: &U64, - ) -> anyhow::Result<(H256, u64)> { + ) -> Web3ProxyResult<(H256, u64)> { let (block, block_depth) = self.cannonical_block(authorization, num).await?; let hash = *block.hash(); @@ -295,7 +295,7 @@ impl Web3Rpcs { &self, authorization: &Arc, num: &U64, - ) -> anyhow::Result<(Web3ProxyBlock, u64)> { + ) -> Web3ProxyResult<(Web3ProxyBlock, u64)> { // we only have blocks by hash now // maybe save them during save_block in a blocks_by_number Cache> // if theres multiple, use petgraph to find the one on the main chain (and remove the others if they have enough confirmations) @@ -303,7 +303,7 @@ impl Web3Rpcs { let mut consensus_head_receiver = self .watch_consensus_head_sender .as_ref() - .context("need new head subscriptions to fetch cannonical_block")? + .web3_context("need new head subscriptions to fetch cannonical_block")? .subscribe(); // be sure the requested block num exists @@ -311,7 +311,7 @@ impl Web3Rpcs { let mut head_block_num = *consensus_head_receiver .borrow_and_update() .as_ref() - .context("no consensus head block")? + .web3_context("no consensus head block")? .number(); loop { @@ -352,7 +352,7 @@ impl Web3Rpcs { debug!("could not find canonical block {}: {:?}", num, err); } - let raw_block = response.result.context("no cannonical block result")?; + let raw_block = response.result.web3_context("no cannonical block result")?; let block: ArcBlock = serde_json::from_str(raw_block.get())?; @@ -412,13 +412,13 @@ impl Web3Rpcs { consensus_finder: &mut ConsensusFinder, new_block: Option, rpc: Arc, - pending_tx_sender: &Option>, - ) -> anyhow::Result<()> { + _pending_tx_sender: &Option>, + ) -> Web3ProxyResult<()> { // TODO: how should we handle an error here? if !consensus_finder .update_rpc(new_block.clone(), rpc.clone(), self) .await - .context("failed to update rpc")? + .web3_context("failed to update rpc")? { // nothing changed. no need to scan for a new consensus head return Ok(()); @@ -429,12 +429,10 @@ impl Web3Rpcs { .await { Err(err) => { - let err = err.context("error while finding consensus head block!"); - - return Err(err); + return Err(err).web3_context("error while finding consensus head block!"); } Ok(None) => { - return Err(anyhow!("no consensus head block!")); + return Err(Web3ProxyError::NoConsensusHeadBlock); } Ok(Some(x)) => x, }; @@ -479,7 +477,8 @@ impl Web3Rpcs { watch_consensus_head_sender .send(Some(consensus_head_block)) - .context( + .or(Err(Web3ProxyError::WatchSendError)) + .web3_context( "watch_consensus_head_sender failed sending first consensus_head_block", )?; } @@ -529,11 +528,12 @@ impl Web3Rpcs { let consensus_head_block = self .try_cache_block(consensus_head_block, true) .await - .context("save consensus_head_block as heaviest chain")?; + .web3_context("save consensus_head_block as heaviest chain")?; watch_consensus_head_sender .send(Some(consensus_head_block)) - .context("watch_consensus_head_sender failed sending uncled consensus_head_block")?; + .or(Err(Web3ProxyError::WatchSendError)) + .web3_context("watch_consensus_head_sender failed sending uncled consensus_head_block")?; } } Ordering::Less => { @@ -562,11 +562,14 @@ impl Web3Rpcs { let consensus_head_block = self .try_cache_block(consensus_head_block, true) .await - .context("save_block sending consensus_head_block as heaviest chain")?; + .web3_context( + "save_block sending consensus_head_block as heaviest chain", + )?; watch_consensus_head_sender .send(Some(consensus_head_block)) - .context("watch_consensus_head_sender failed sending rollback consensus_head_block")?; + .or(Err(Web3ProxyError::WatchSendError)) + .web3_context("watch_consensus_head_sender failed sending rollback consensus_head_block")?; } Ordering::Greater => { debug!( @@ -590,7 +593,9 @@ impl Web3Rpcs { let consensus_head_block = self.try_cache_block(consensus_head_block, true).await?; - watch_consensus_head_sender.send(Some(consensus_head_block)).context("watch_consensus_head_sender failed sending new consensus_head_block")?; + watch_consensus_head_sender.send(Some(consensus_head_block)) + .or(Err(Web3ProxyError::WatchSendError)) + .web3_context("watch_consensus_head_sender failed sending new consensus_head_block")?; } } } diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index f70f73eb..6f3e6a2b 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -2,7 +2,7 @@ use super::blockchain::Web3ProxyBlock; use super::many::Web3Rpcs; use super::one::Web3Rpc; use crate::frontend::authorization::Authorization; -use anyhow::Context; +use crate::frontend::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult}; use ethers::prelude::{H256, U64}; use hashbrown::{HashMap, HashSet}; use itertools::{Itertools, MinMaxResult}; @@ -156,7 +156,7 @@ impl ConsensusFinder { rpc: Arc, // we need this so we can save the block to caches. i don't like it though. maybe we should use a lazy_static Cache wrapper that has a "save_block" method?. i generally dislike globals but i also dislike all the types having to pass eachother around web3_connections: &Web3Rpcs, - ) -> anyhow::Result { + ) -> Web3ProxyResult { // add the rpc's block to connection_heads, or remove the rpc from connection_heads let changed = match rpc_head_block { Some(mut rpc_head_block) => { @@ -164,7 +164,7 @@ impl ConsensusFinder { rpc_head_block = web3_connections .try_cache_block(rpc_head_block, false) .await - .context("failed caching block")?; + .web3_context("failed caching block")?; // if let Some(max_block_lag) = max_block_lag { // if rpc_head_block.number() < ??? { @@ -203,7 +203,7 @@ impl ConsensusFinder { &mut self, authorization: &Arc, web3_rpcs: &Web3Rpcs, - ) -> anyhow::Result> { + ) -> Web3ProxyResult> { let minmax_block = self.rpc_heads.values().minmax_by_key(|&x| x.number()); let (lowest_block, highest_block) = match minmax_block { diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index ce89dcf5..cc49561b 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -7,6 +7,7 @@ use crate::app::{flatten_handle, AnyhowJoinHandle, Web3ProxyApp}; ///! Load balanced communication with a group of web3 providers use crate::config::{BlockAndRpc, TxHashAndRpc, Web3RpcConfig}; use crate::frontend::authorization::{Authorization, RequestMetadata}; +use crate::frontend::errors::{Web3ProxyError, Web3ProxyResult}; use crate::frontend::rpc_proxy_ws::ProxyMode; use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest}; use crate::rpcs::transactions::TxStatus; @@ -422,7 +423,7 @@ impl Web3Rpcs { params: Option<&serde_json::Value>, error_level: Level, // TODO: remove this box once i figure out how to do the options - ) -> anyhow::Result { + ) -> Web3ProxyResult { // TODO: if only 1 active_request_handles, do self.try_send_request? let responses = active_request_handles @@ -500,7 +501,7 @@ impl Web3Rpcs { // TODO: if we are checking for the consensus head, i don' think we need min_block_needed/max_block_needed min_block_needed: Option<&U64>, max_block_needed: Option<&U64>, - ) -> anyhow::Result { + ) -> Web3ProxyResult { let usable_rpcs_by_tier_and_head_number: BTreeMap<(u64, Option), Vec>> = { let synced_connections = self.watch_consensus_rpcs_sender.borrow().clone(); @@ -529,11 +530,10 @@ impl Web3Rpcs { cmp::Ordering::Greater => { // TODO: force a debug log of the original request to see if our logic is wrong? // TODO: attach the rpc_key_id so we can find the user to ask if they need help - return Err(anyhow::anyhow!( - "Invalid blocks bounds requested. min ({}) > max ({})", - min_block_needed, - max_block_needed - )); + return Err(Web3ProxyError::InvalidBlockBounds { + min: min_block_needed.as_u64(), + max: max_block_needed.as_u64(), + }); } } } @@ -837,7 +837,7 @@ impl Web3Rpcs { request_metadata: Option<&Arc>, min_block_needed: Option<&U64>, max_block_needed: Option<&U64>, - ) -> anyhow::Result { + ) -> Web3ProxyResult { let mut skip_rpcs = vec![]; let mut method_not_available_response = None; @@ -1065,7 +1065,7 @@ impl Web3Rpcs { error_level: Level, max_count: Option, always_include_backups: bool, - ) -> anyhow::Result { + ) -> Web3ProxyResult { let mut watch_consensus_rpcs = self.watch_consensus_rpcs_sender.subscribe(); loop { @@ -1161,7 +1161,7 @@ impl Web3Rpcs { request_metadata: Option<&Arc>, min_block_needed: Option<&U64>, max_block_needed: Option<&U64>, - ) -> anyhow::Result { + ) -> Web3ProxyResult { match authorization.checks.proxy_mode { ProxyMode::Debug | ProxyMode::Best => { self.try_send_best_consensus_head_connection( @@ -1173,7 +1173,7 @@ impl Web3Rpcs { ) .await } - ProxyMode::Fastest(x) => todo!("Fastest"), + ProxyMode::Fastest(_x) => todo!("Fastest"), ProxyMode::Versus => todo!("Versus"), } } @@ -1676,7 +1676,7 @@ mod tests { OpenRequestResult::Handle(_) )); - let best_available_server_from_none = rpcs + let _best_available_server_from_none = rpcs .best_available_rpc(&authorization, None, &[], None, None) .await; diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 63114df3..e0775768 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -5,6 +5,7 @@ use super::request::{OpenRequestHandle, OpenRequestResult}; use crate::app::{flatten_handle, AnyhowJoinHandle}; use crate::config::{BlockAndRpc, Web3RpcConfig}; use crate::frontend::authorization::Authorization; +use crate::frontend::errors::{Web3ProxyError, Web3ProxyResult}; use crate::rpcs::request::RequestRevertHandler; use anyhow::{anyhow, Context}; use ethers::prelude::{Bytes, Middleware, ProviderError, TxHash, H256, U64}; @@ -1129,7 +1130,7 @@ impl Web3Rpc { trace!("watching pending transactions on {}", self); // TODO: does this keep the lock open for too long? match provider.as_ref() { - Web3Provider::Http(provider) => { + Web3Provider::Http(_provider) => { // there is a "watch_pending_transactions" function, but a lot of public nodes do not support the necessary rpc endpoints self.wait_for_disconnect().await?; } @@ -1184,7 +1185,7 @@ impl Web3Rpc { authorization: &'a Arc, max_wait: Option, unlocked_provider: Option>, - ) -> anyhow::Result { + ) -> Web3ProxyResult { let max_wait = max_wait.map(|x| Instant::now() + x); loop { @@ -1206,8 +1207,7 @@ impl Web3Rpc { if let Some(max_wait) = max_wait { if retry_at > max_wait { // break now since we will wait past our maximum wait time - // TODO: don't use anyhow. use specific error type - return Err(anyhow::anyhow!("timeout waiting for request handle")); + return Err(Web3ProxyError::Timeout(None)); } } @@ -1221,7 +1221,7 @@ impl Web3Rpc { let now = Instant::now(); if now > max_wait { - return Err(anyhow::anyhow!("unable to retry for request handle")); + return Err(Web3ProxyError::NoHandleReady); } } @@ -1239,7 +1239,7 @@ impl Web3Rpc { authorization: &Arc, // TODO: borrow on this instead of needing to clone the Arc? unlocked_provider: Option>, - ) -> anyhow::Result { + ) -> Web3ProxyResult { // TODO: think more about this read block // TODO: this should *not* be new_head_client. this should be a separate object if unlocked_provider.is_some() || self.provider.read().await.is_some() { diff --git a/web3_proxy/src/stats/db_queries.rs b/web3_proxy/src/stats/db_queries.rs index 247bc360..eab9495b 100644 --- a/web3_proxy/src/stats/db_queries.rs +++ b/web3_proxy/src/stats/db_queries.rs @@ -1,12 +1,12 @@ use super::StatType; use crate::app::Web3ProxyApp; -use crate::frontend::errors::FrontendErrorResponse; +use crate::frontend::errors::{Web3ProxyError, Web3ProxyResponse, Web3ProxyResult}; use crate::http_params::{ get_chain_id_from_params, get_page_from_params, get_query_start_from_params, get_query_window_seconds_from_params, get_user_id_from_params, }; use anyhow::Context; -use axum::response::{IntoResponse, Response}; +use axum::response::IntoResponse; use axum::Json; use axum::{ headers::{authorization::Bearer, Authorization}, @@ -28,7 +28,7 @@ pub fn filter_query_window_seconds( query_window_seconds: u64, response: &mut HashMap<&str, serde_json::Value>, q: Select, -) -> Result, FrontendErrorResponse> { +) -> Web3ProxyResult> { if query_window_seconds == 0 { // TODO: order by more than this? // query_window_seconds is not set so we aggregate all records @@ -62,7 +62,7 @@ pub async fn query_user_stats<'a>( bearer: Option>>, params: &'a HashMap, stat_response_type: StatType, -) -> Result { +) -> Web3ProxyResponse { let db_conn = app.db_conn().context("query_user_stats needs a db")?; let db_replica = app .db_replica() @@ -209,7 +209,7 @@ pub async fn query_user_stats<'a>( // TODO: move getting the param and checking the bearer token into a helper function if let Some(rpc_key_id) = params.get("rpc_key_id") { let rpc_key_id = rpc_key_id.parse::().map_err(|e| { - FrontendErrorResponse::StatusCode( + Web3ProxyError::StatusCode( StatusCode::BAD_REQUEST, "Unable to parse rpc_key_id".to_string(), Some(e.into()), diff --git a/web3_proxy/src/stats/influxdb_queries.rs b/web3_proxy/src/stats/influxdb_queries.rs index 4bb6d294..1fa267b1 100644 --- a/web3_proxy/src/stats/influxdb_queries.rs +++ b/web3_proxy/src/stats/influxdb_queries.rs @@ -2,7 +2,7 @@ use super::StatType; use crate::http_params::get_stats_column_from_params; use crate::{ app::Web3ProxyApp, - frontend::errors::FrontendErrorResponse, + frontend::errors::{Web3ProxyError, Web3ProxyResponse}, http_params::{ get_chain_id_from_params, get_query_start_from_params, get_query_stop_from_params, get_query_window_seconds_from_params, get_user_id_from_params, @@ -11,7 +11,7 @@ use crate::{ use anyhow::Context; use axum::{ headers::{authorization::Bearer, Authorization}, - response::{IntoResponse, Response}, + response::IntoResponse, Json, TypedHeader, }; use chrono::{DateTime, FixedOffset}; @@ -60,7 +60,7 @@ pub async fn query_user_stats<'a>( bearer: Option>>, params: &'a HashMap, stat_response_type: StatType, -) -> Result { +) -> Web3ProxyResponse { info!("Got this far 1"); let db_conn = app.db_conn().context("query_user_stats needs a db")?; let db_replica = app @@ -96,7 +96,7 @@ pub async fn query_user_stats<'a>( // Return a bad request if query_start == query_stop, because then the query is empty basically if query_start == query_stop { - return Err(FrontendErrorResponse::BadRequest( + return Err(Web3ProxyError::BadRequest( "Start and Stop date cannot be equal. Please specify a (different) start date." .to_owned(), )); @@ -462,9 +462,9 @@ pub async fn query_user_stats<'a>( // Also optionally add the rpc_key_id: if let Some(rpc_key_id) = params.get("rpc_key_id") { - let rpc_key_id = rpc_key_id.parse::().map_err(|e| { - FrontendErrorResponse::BadRequest("Unable to parse rpc_key_id".to_string()) - })?; + let rpc_key_id = rpc_key_id + .parse::() + .map_err(|e| Web3ProxyError::BadRequest("Unable to parse rpc_key_id".to_string()))?; response_body.insert("rpc_key_id", serde_json::Value::Number(rpc_key_id.into())); }