diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 2ff67bee..2b8a6a4b 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -18,6 +18,7 @@ use crate::stats::{AppStat, RpcQueryStats, StatBuffer}; use crate::user_token::UserBearerToken; use anyhow::Context; use axum::headers::{Origin, Referer, UserAgent}; +use axum::http::StatusCode; use chrono::Utc; use deferred_rate_limiter::DeferredRateLimiter; use derive_more::From; @@ -1157,7 +1158,7 @@ impl Web3ProxyApp { ) -> Web3ProxyResult<(JsonRpcForwardedResponse, Vec>)> { // trace!("Received request: {:?}", request); - let request_metadata = Arc::new(RequestMetadata::new(request.num_bytes())?); + let request_metadata = Arc::new(RequestMetadata::new(request.num_bytes())); let mut kafka_stuff = None; @@ -1338,10 +1339,7 @@ impl Web3ProxyApp { } None => { // TODO: what does geth do if this happens? - // TODO: i think we want a 502 so that haproxy retries on another server - return Err( - anyhow::anyhow!("no servers synced. unknown eth_blockNumber").into(), - ); + return Err(Web3ProxyError::UnknownBlockNumber); } } } @@ -1429,7 +1427,7 @@ impl Web3ProxyApp { let head_block_num = head_block_num .or(self.balanced_rpcs.head_block_num()) - .ok_or_else(|| anyhow::anyhow!("no servers synced"))?; + .ok_or_else(|| Web3ProxyError::NoServersSynced)?; // TODO: error/wait if no head block! @@ -1607,7 +1605,7 @@ impl Web3ProxyApp { return Ok(( JsonRpcForwardedResponse::from_str( "invalid request", - None, + Some(StatusCode::BAD_REQUEST.as_u16().into()), Some(request_id), ), vec![], @@ -1760,17 +1758,10 @@ impl Web3ProxyApp { // TODO: only cache the inner response // TODO: how are we going to stream this? // TODO: check response size. if its very large, return it in a custom Error type that bypasses caching? or will moka do that for us? - Ok::<_, anyhow::Error>(response) + Ok::<_, Web3ProxyError>(response) }) - .await - // TODO: what is the best way to handle an Arc here? - .map_err(|err| { - // TODO: emit a stat for an error - anyhow::anyhow!( - "error while caching and forwarding response: {}", - err - ) - })? + // TODO: add context (error while caching and forwarding response {}) + .await? } else { self.balanced_rpcs .try_proxy_connection( diff --git a/web3_proxy/src/app/ws.rs b/web3_proxy/src/app/ws.rs index b69cdcc9..79617f7b 100644 --- a/web3_proxy/src/app/ws.rs +++ b/web3_proxy/src/app/ws.rs @@ -33,7 +33,7 @@ impl Web3ProxyApp { .context("finding request size")? .len(); - let request_metadata = Arc::new(RequestMetadata::new(request_bytes).unwrap()); + let request_metadata = Arc::new(RequestMetadata::new(request_bytes)); let (subscription_abort_handle, subscription_registration) = AbortHandle::new_pair(); @@ -67,7 +67,7 @@ impl Web3ProxyApp { }; // TODO: what should the payload for RequestMetadata be? - let request_metadata = Arc::new(RequestMetadata::new(0).unwrap()); + let request_metadata = Arc::new(RequestMetadata::new(0)); // TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id let response_json = json!({ @@ -133,7 +133,7 @@ impl Web3ProxyApp { // TODO: do something with this handle? tokio::spawn(async move { while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await { - let request_metadata = Arc::new(RequestMetadata::new(0).unwrap()); + let request_metadata = Arc::new(RequestMetadata::new(0)); let new_tx = match new_tx_state { TxStatus::Pending(tx) => tx, @@ -208,7 +208,7 @@ impl Web3ProxyApp { // TODO: do something with this handle? tokio::spawn(async move { while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await { - let request_metadata = Arc::new(RequestMetadata::new(0).unwrap()); + let request_metadata = Arc::new(RequestMetadata::new(0)); let new_tx = match new_tx_state { TxStatus::Pending(tx) => tx, @@ -284,7 +284,7 @@ impl Web3ProxyApp { // TODO: do something with this handle? tokio::spawn(async move { while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await { - let request_metadata = Arc::new(RequestMetadata::new(0).unwrap()); + let request_metadata = Arc::new(RequestMetadata::new(0)); let new_tx = match new_tx_state { TxStatus::Pending(tx) => tx, diff --git a/web3_proxy/src/block_number.rs b/web3_proxy/src/block_number.rs index bfb39299..b75375a1 100644 --- a/web3_proxy/src/block_number.rs +++ b/web3_proxy/src/block_number.rs @@ -1,4 +1,5 @@ //! Helper functions for turning ether's BlockNumber into numbers and updating incoming queries to match. +use crate::frontend::errors::{Web3ProxyError, Web3ProxyResult}; use anyhow::Context; use ethers::{ prelude::{BlockNumber, U64}, @@ -126,7 +127,7 @@ pub async fn block_needed( params: Option<&mut serde_json::Value>, head_block_num: U64, rpcs: &Web3Rpcs, -) -> anyhow::Result { +) -> Web3ProxyResult { // some requests have potentially very large responses // TODO: only skip caching if the response actually is large if method.starts_with("trace_") || method == "debug_traceTransaction" { @@ -179,7 +180,7 @@ pub async fn block_needed( // TODO: this shouldn't be a 500. this should be a 400. 500 will make haproxy retry a bunch let obj = params[0] .as_object_mut() - .ok_or_else(|| anyhow::anyhow!("invalid format"))?; + .ok_or_else(|| Web3ProxyError::BadRequest("invalid format".to_string()))?; if obj.contains_key("blockHash") { return Ok(BlockNeeded::CacheSuccessForever); diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index 0920d9d3..9a8723f1 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -88,11 +88,11 @@ pub struct RequestMetadata { } impl RequestMetadata { - pub fn new(request_bytes: usize) -> anyhow::Result { + pub fn new(request_bytes: usize) -> Self { // TODO: how can we do this without turning it into a string first. this is going to slow us down! let request_bytes = request_bytes as u64; - let new = Self { + Self { start_instant: Instant::now(), request_bytes, archive_request: false.into(), @@ -102,9 +102,7 @@ impl RequestMetadata { response_bytes: 0.into(), response_millis: 0.into(), response_from_backup_rpc: false.into(), - }; - - Ok(new) + } } } diff --git a/web3_proxy/src/frontend/errors.rs b/web3_proxy/src/frontend/errors.rs index 14bf0185..8acd662c 100644 --- a/web3_proxy/src/frontend/errors.rs +++ b/web3_proxy/src/frontend/errors.rs @@ -4,6 +4,7 @@ use super::authorization::Authorization; use crate::jsonrpc::JsonRpcForwardedResponse; use std::net::IpAddr; +use std::sync::Arc; use axum::{ headers, @@ -33,11 +34,18 @@ pub enum Web3ProxyError { #[error(ignore)] #[from(ignore)] BadRequest(String), - SemaphoreAcquireError(AcquireError), Database(DbErr), + EthersHttpClientError(ethers::prelude::HttpClientError), + EthersProviderError(ethers::prelude::ProviderError), + EthersWsClientError(ethers::prelude::WsClientError), Headers(headers::Error), HeaderToString(ToStrError), InfluxDb2RequestError(influxdb2::RequestError), + #[display(fmt = "{} > {}", min, max)] + InvalidBlockBounds { + min: u64, + max: u64, + }, InvalidHeaderValue(InvalidHeaderValue), IpAddrParse(AddrParseError), #[error(ignore)] @@ -45,6 +53,8 @@ pub enum Web3ProxyError { IpNotAllowed(IpAddr), JoinError(JoinError), MsgPackEncode(rmp_serde::encode::Error), + NoServersSynced, + NoHandleReady, NotFound, OriginRequired, #[error(ignore)] @@ -58,16 +68,23 @@ pub enum Web3ProxyError { #[error(ignore)] #[from(ignore)] RefererNotAllowed(headers::Referer), + SeaRc(Arc), + SemaphoreAcquireError(AcquireError), + SerdeJson(serde_json::Error), /// simple way to return an error message to the user and an anyhow to our logs #[display(fmt = "{}, {}, {:?}", _0, _1, _2)] StatusCode(StatusCode, String, Option), /// TODO: what should be attached to the timout? - Timeout(tokio::time::error::Elapsed), + #[display(fmt = "{:?}", _0)] + #[error(ignore)] + Timeout(Option), UlidDecode(ulid::DecodeError), + UnknownBlockNumber, UnknownKey, UserAgentRequired, #[error(ignore)] UserAgentNotAllowed(headers::UserAgent), + WatchRecvError(tokio::sync::watch::error::RecvError), } impl Web3ProxyError { @@ -120,6 +137,39 @@ impl Web3ProxyError { ), ) } + Self::EthersHttpClientError(err) => { + warn!("EthersHttpClientError err={:?}", err); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcForwardedResponse::from_str( + "ether http client error", + Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), + None, + ), + ) + } + Self::EthersProviderError(err) => { + warn!("EthersProviderError err={:?}", err); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcForwardedResponse::from_str( + "ether provider error", + Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), + None, + ), + ) + } + Self::EthersWsClientError(err) => { + warn!("EthersWsClientError err={:?}", err); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcForwardedResponse::from_str( + "ether ws client error", + Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), + None, + ), + ) + } Self::Headers(err) => { warn!("HeadersError {:?}", err); ( @@ -143,6 +193,20 @@ impl Web3ProxyError { ), ) } + Self::InvalidBlockBounds { min, max } => { + warn!("InvalidBlockBounds min={} max={}", min, max); + ( + StatusCode::BAD_REQUEST, + JsonRpcForwardedResponse::from_string( + format!( + "Invalid blocks bounds requested. min ({}) > max ({})", + min, max + ), + Some(StatusCode::BAD_REQUEST.as_u16().into()), + None, + ), + ) + } Self::IpAddrParse(err) => { warn!("IpAddrParse err={:?}", err); ( @@ -206,6 +270,28 @@ impl Web3ProxyError { ), ) } + Self::NoServersSynced => { + warn!("NoServersSynced"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcForwardedResponse::from_str( + "no servers synced", + Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), + None, + ), + ) + } + Self::NoHandleReady => { + error!("NoHandleReady"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcForwardedResponse::from_str( + "unable to retry for request handle", + Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), + None, + ), + ) + } Self::NotFound => { // TODO: emit a stat? // TODO: instead of an error, show a normal html page for 404 @@ -305,6 +391,11 @@ impl Web3ProxyError { ), ) } + Self::SeaRc(err) => match migration::SeaRc::try_unwrap(err) { + Ok(err) => err, + Err(err) => Self::Anyhow(anyhow::anyhow!("{}", err)), + } + .into_response_parts(), Self::SemaphoreAcquireError(err) => { warn!("semaphore acquire err={:?}", err); ( @@ -317,6 +408,17 @@ impl Web3ProxyError { ), ) } + Self::SerdeJson(err) => { + warn!("serde json err={:?}", err); + ( + StatusCode::BAD_REQUEST, + JsonRpcForwardedResponse::from_str( + "de/serialization error!", + Some(StatusCode::BAD_REQUEST.as_u16().into()), + None, + ), + ) + } Self::StatusCode(status_code, err_msg, err) => { // different status codes should get different error levels. 500s should warn. 400s should stat let code = status_code.as_u16(); @@ -362,6 +464,17 @@ impl Web3ProxyError { ), ) } + Self::UnknownBlockNumber => { + error!("UnknownBlockNumber"); + ( + StatusCode::BAD_GATEWAY, + JsonRpcForwardedResponse::from_str( + "no servers synced. unknown eth_blockNumber", + Some(StatusCode::BAD_GATEWAY.as_u16().into()), + None, + ), + ) + } // TODO: stat? Self::UnknownKey => ( StatusCode::UNAUTHORIZED, @@ -393,10 +506,27 @@ impl Web3ProxyError { ), ) } + Self::WatchRecvError(err) => { + error!("WatchRecvError err={:?}", err); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcForwardedResponse::from_str( + "watch recv error!", + Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), + None, + ), + ) + } } } } +impl From for Web3ProxyError { + fn from(err: tokio::time::error::Elapsed) -> Self { + Self::Timeout(Some(err)) + } +} + impl IntoResponse for Web3ProxyError { fn into_response(self) -> Response { // TODO: include the request id in these so that users can give us something that will point to logs diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs index 3e3a6957..9f249555 100644 --- a/web3_proxy/src/frontend/rpc_proxy_ws.rs +++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs @@ -378,7 +378,7 @@ async fn handle_socket_payload( // TODO: move this logic into the app? let request_bytes = json_request.num_bytes(); - let request_metadata = Arc::new(RequestMetadata::new(request_bytes).unwrap()); + let request_metadata = Arc::new(RequestMetadata::new(request_bytes)); let subscription_id = json_request.params.unwrap().to_string(); diff --git a/web3_proxy/src/jsonrpc.rs b/web3_proxy/src/jsonrpc.rs index 0a6435c6..f5a2dde6 100644 --- a/web3_proxy/src/jsonrpc.rs +++ b/web3_proxy/src/jsonrpc.rs @@ -1,3 +1,4 @@ +use crate::frontend::errors::Web3ProxyResult; use derive_more::From; use ethers::prelude::{HttpClientError, ProviderError, WsClientError}; use serde::de::{self, Deserializer, MapAccess, SeqAccess, Visitor}; @@ -240,7 +241,7 @@ impl JsonRpcForwardedResponse { } } - pub fn from_ethers_error(e: ProviderError, id: Box) -> anyhow::Result { + pub fn from_ethers_error(e: ProviderError, id: Box) -> Web3ProxyResult { // TODO: move turning ClientError into json to a helper function? let code; let message: String; @@ -302,7 +303,7 @@ impl JsonRpcForwardedResponse { pub fn try_from_response_result( result: Result, ProviderError>, id: Box, - ) -> anyhow::Result { + ) -> Web3ProxyResult { match result { Ok(response) => Ok(Self::from_response(response, id)), Err(e) => Self::from_ethers_error(e, id), diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index 480f02be..b6c60f2b 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -4,6 +4,7 @@ use super::many::Web3Rpcs; use super::one::Web3Rpc; use super::transactions::TxStatus; use crate::frontend::authorization::Authorization; +use crate::frontend::errors::Web3ProxyResult; use crate::{config::BlockAndRpc, jsonrpc::JsonRpcRequest}; use anyhow::{anyhow, Context}; use derive_more::From; @@ -158,7 +159,7 @@ impl Web3Rpcs { &self, block: Web3ProxyBlock, heaviest_chain: bool, - ) -> anyhow::Result { + ) -> Web3ProxyResult { // TODO: i think we can rearrange this function to make it faster on the hot path let block_hash = block.hash(); @@ -196,7 +197,7 @@ impl Web3Rpcs { authorization: &Arc, hash: &H256, rpc: Option<&Arc>, - ) -> anyhow::Result { + ) -> Web3ProxyResult { // first, try to get the hash from our cache // the cache is set last, so if its here, its everywhere // TODO: use try_get_with @@ -267,7 +268,7 @@ impl Web3Rpcs { &self, authorization: &Arc, num: &U64, - ) -> anyhow::Result<(H256, u64)> { + ) -> Web3ProxyResult<(H256, u64)> { let (block, block_depth) = self.cannonical_block(authorization, num).await?; let hash = *block.hash(); @@ -281,7 +282,7 @@ impl Web3Rpcs { &self, authorization: &Arc, num: &U64, - ) -> anyhow::Result<(Web3ProxyBlock, u64)> { + ) -> Web3ProxyResult<(Web3ProxyBlock, u64)> { // we only have blocks by hash now // maybe save them during save_block in a blocks_by_number Cache> // if theres multiple, use petgraph to find the one on the main chain (and remove the others if they have enough confirmations) diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index a309b2bd..412a26dd 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -7,6 +7,7 @@ use crate::app::{flatten_handle, AnyhowJoinHandle, Web3ProxyApp}; ///! Load balanced communication with a group of web3 providers use crate::config::{BlockAndRpc, TxHashAndRpc, Web3RpcConfig}; use crate::frontend::authorization::{Authorization, RequestMetadata}; +use crate::frontend::errors::{Web3ProxyError, Web3ProxyResult}; use crate::frontend::rpc_proxy_ws::ProxyMode; use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest}; use crate::rpcs::transactions::TxStatus; @@ -462,7 +463,7 @@ impl Web3Rpcs { params: Option<&serde_json::Value>, error_level: Level, // TODO: remove this box once i figure out how to do the options - ) -> anyhow::Result { + ) -> Web3ProxyResult { // TODO: if only 1 active_request_handles, do self.try_send_request? let responses = active_request_handles @@ -540,7 +541,7 @@ impl Web3Rpcs { // TODO: if we are checking for the consensus head, i don' think we need min_block_needed/max_block_needed min_block_needed: Option<&U64>, max_block_needed: Option<&U64>, - ) -> anyhow::Result { + ) -> Web3ProxyResult { let usable_rpcs_by_tier_and_head_number: BTreeMap<(u64, Option), Vec>> = { let synced_connections = self.watch_consensus_rpcs_sender.borrow().clone(); @@ -569,11 +570,10 @@ impl Web3Rpcs { cmp::Ordering::Greater => { // TODO: force a debug log of the original request to see if our logic is wrong? // TODO: attach the rpc_key_id so we can find the user to ask if they need help - return Err(anyhow::anyhow!( - "Invalid blocks bounds requested. min ({}) > max ({})", - min_block_needed, - max_block_needed - )); + return Err(Web3ProxyError::InvalidBlockBounds { + min: min_block_needed.as_u64(), + max: max_block_needed.as_u64(), + }); } } } @@ -877,7 +877,7 @@ impl Web3Rpcs { request_metadata: Option<&Arc>, min_block_needed: Option<&U64>, max_block_needed: Option<&U64>, - ) -> anyhow::Result { + ) -> Web3ProxyResult { let mut skip_rpcs = vec![]; let mut method_not_available_response = None; @@ -1099,7 +1099,7 @@ impl Web3Rpcs { error_level: Level, max_count: Option, always_include_backups: bool, - ) -> anyhow::Result { + ) -> Web3ProxyResult { let mut watch_consensus_rpcs = self.watch_consensus_rpcs_sender.subscribe(); loop { @@ -1205,7 +1205,7 @@ impl Web3Rpcs { request_metadata: Option<&Arc>, min_block_needed: Option<&U64>, max_block_needed: Option<&U64>, - ) -> anyhow::Result { + ) -> Web3ProxyResult { match authorization.checks.proxy_mode { ProxyMode::Debug | ProxyMode::Best => { self.try_send_best_consensus_head_connection( diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 93263a54..b7b32fba 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -5,6 +5,7 @@ use super::request::{OpenRequestHandle, OpenRequestResult}; use crate::app::{flatten_handle, AnyhowJoinHandle}; use crate::config::{BlockAndRpc, Web3RpcConfig}; use crate::frontend::authorization::Authorization; +use crate::frontend::errors::{Web3ProxyError, Web3ProxyResult}; use crate::rpcs::request::RequestErrorHandler; use anyhow::{anyhow, Context}; use ethers::prelude::{Bytes, Middleware, ProviderError, TxHash, H256, U64}; @@ -1159,7 +1160,7 @@ impl Web3Rpc { authorization: &'a Arc, max_wait: Option, unlocked_provider: Option>, - ) -> anyhow::Result { + ) -> Web3ProxyResult { let max_wait = max_wait.map(|x| Instant::now() + x); loop { @@ -1181,8 +1182,7 @@ impl Web3Rpc { if let Some(max_wait) = max_wait { if retry_at > max_wait { // break now since we will wait past our maximum wait time - // TODO: don't use anyhow. use specific error type - return Err(anyhow::anyhow!("timeout waiting for request handle")); + return Err(Web3ProxyError::Timeout(None)); } } @@ -1196,7 +1196,7 @@ impl Web3Rpc { let now = Instant::now(); if now > max_wait { - return Err(anyhow::anyhow!("unable to retry for request handle")); + return Err(Web3ProxyError::NoHandleReady); } } @@ -1214,7 +1214,7 @@ impl Web3Rpc { authorization: &Arc, // TODO: borrow on this instead of needing to clone the Arc? unlocked_provider: Option>, - ) -> anyhow::Result { + ) -> Web3ProxyResult { // TODO: think more about this read block // TODO: this should *not* be new_head_client. this should be a separate object if unlocked_provider.is_some() || self.provider.read().await.is_some() {