diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 2b8a6a4b..6215463b 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -4,7 +4,7 @@ mod ws; use crate::block_number::{block_needed, BlockNeeded}; use crate::config::{AppConfig, TopConfig}; use crate::frontend::authorization::{Authorization, RequestMetadata, RpcSecretKey}; -use crate::frontend::errors::{Web3ProxyError, Web3ProxyResult}; +use crate::frontend::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult}; use crate::frontend::rpc_proxy_ws::ProxyMode; use crate::jsonrpc::{ JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest, JsonRpcRequestEnum, @@ -1089,12 +1089,11 @@ impl Web3ProxyApp { // TODO: improve flattening // get the head block now so that any requests that need it all use the same block - // TODO: Web3ProxyError that handles "no servers synced" in a consistent way // TODO: this still has an edge condition if there is a reorg in the middle of the request!!! let head_block_num = self .balanced_rpcs .head_block_num() - .context(anyhow::anyhow!("no servers synced"))?; + .ok_or(Web3ProxyError::NoServersSynced)?; let responses = join_all( requests @@ -1366,7 +1365,7 @@ impl Web3ProxyApp { let mut gas_estimate: U256 = if let Some(gas_estimate) = response.result.take() { serde_json::from_str(gas_estimate.get()) - .context("gas estimate result is not an U256")? + .or(Err(Web3ProxyError::GasEstimateNotU256))? } else { // i think this is always an error response let rpcs = request_metadata.backend_requests.lock().clone(); @@ -1456,15 +1455,15 @@ impl Web3ProxyApp { { let params = request .params - .context("there must be params if we got this far")?; + .web3_context("there must be params if we got this far")?; let params = params .as_array() - .context("there must be an array if we got this far")? + .web3_context("there must be an array if we got this far")? .get(0) - .context("there must be an item if we got this far")? + .web3_context("there must be an item if we got this far")? .as_str() - .context("there must be a string if we got this far")?; + .web3_context("there must be a string if we got this far")?; let params = Bytes::from_str(params) .expect("there must be Bytes if we got this far"); @@ -1586,7 +1585,8 @@ impl Web3ProxyApp { let param = Bytes::from_str( params[0] .as_str() - .context("parsing params 0 into str then bytes")?, + .ok_or(Web3ProxyError::ParseBytesError(None)) + .web3_context("parsing params 0 into str then bytes")?, ) .map_err(|x| { trace!("bad request: {:?}", x); @@ -1635,7 +1635,7 @@ impl Web3ProxyApp { // TODO: if no servers synced, wait for them to be synced? probably better to error and let haproxy retry another server let head_block_num = head_block_num .or(self.balanced_rpcs.head_block_num()) - .context("no servers synced")?; + .ok_or(Web3ProxyError::NoServersSynced)?; // we do this check before checking caches because it might modify the request params // TODO: add a stat for archive vs full since they should probably cost different @@ -1793,7 +1793,7 @@ impl Web3ProxyApp { stat_sender .send_async(response_stat.into()) .await - .context("stat_sender sending response_stat")?; + .map_err(Web3ProxyError::SendAppStatError)?; } return Ok((response, rpcs)); @@ -1816,7 +1816,7 @@ impl Web3ProxyApp { stat_sender .send_async(response_stat.into()) .await - .context("stat_sender sending response stat")?; + .map_err(Web3ProxyError::SendAppStatError)?; } if let Some((kafka_topic, kafka_key, kafka_headers)) = kafka_stuff { @@ -1826,7 +1826,7 @@ impl Web3ProxyApp { .expect("if headers are set, producer must exist"); let response_bytes = - rmp_serde::to_vec(&response).context("failed msgpack serialize response")?; + rmp_serde::to_vec(&response).web3_context("failed msgpack serialize response")?; let f = async move { let produce_future = kafka_producer.send( diff --git a/web3_proxy/src/app/ws.rs b/web3_proxy/src/app/ws.rs index 2366ab31..cc36e9d6 100644 --- a/web3_proxy/src/app/ws.rs +++ b/web3_proxy/src/app/ws.rs @@ -2,12 +2,11 @@ use super::Web3ProxyApp; use crate::frontend::authorization::{Authorization, RequestMetadata}; -use crate::frontend::errors::Web3ProxyResult; +use crate::frontend::errors::{Web3ProxyErrorContext, Web3ProxyResult}; use crate::jsonrpc::JsonRpcForwardedResponse; use crate::jsonrpc::JsonRpcRequest; use crate::rpcs::transactions::TxStatus; use crate::stats::RpcQueryStats; -use anyhow::Context; use axum::extract::ws::Message; use ethers::prelude::U64; use futures::future::AbortHandle; @@ -31,7 +30,7 @@ impl Web3ProxyApp { ) -> Web3ProxyResult<(AbortHandle, JsonRpcForwardedResponse)> { // TODO: this is not efficient let request_bytes = serde_json::to_string(&request_json) - .context("finding request size")? + .web3_context("finding request size")? .len(); let request_metadata = Arc::new(RequestMetadata::new(request_bytes)); diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index 03e80ef8..3fac5717 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -778,7 +778,7 @@ impl Web3ProxyApp { }) .await; - authorization_checks.map_err(Web3ProxyError::SeaRc) + authorization_checks.map_err(Web3ProxyError::Arc) } /// Authorized the ip/origin/referer/useragent and rate limit and concurrency diff --git a/web3_proxy/src/frontend/errors.rs b/web3_proxy/src/frontend/errors.rs index 2546ae14..4e43fbf8 100644 --- a/web3_proxy/src/frontend/errors.rs +++ b/web3_proxy/src/frontend/errors.rs @@ -31,6 +31,7 @@ pub enum Web3ProxyError { AccessDenied, #[error(ignore)] Anyhow(anyhow::Error), + Arc(Arc), #[error(ignore)] #[from(ignore)] BadRequest(String), @@ -41,6 +42,7 @@ pub enum Web3ProxyError { EthersHttpClientError(ethers::prelude::HttpClientError), EthersProviderError(ethers::prelude::ProviderError), EthersWsClientError(ethers::prelude::WsClientError), + GasEstimateNotU256, Headers(headers::Error), HeaderToString(ToStrError), InfluxDb2RequestError(influxdb2::RequestError), @@ -70,7 +72,9 @@ pub enum Web3ProxyError { #[error(ignore)] #[from(ignore)] OriginNotAllowed(headers::Origin), - ParseBytesError(ethers::types::ParseBytesError), + #[display(fmt = "{:?}", _0)] + #[error(ignore)] + ParseBytesError(Option), ParseMsgError(siwe::ParseError), ParseAddressError, #[display(fmt = "{:?}, {:?}", _0, _1)] @@ -81,8 +85,8 @@ pub enum Web3ProxyError { #[error(ignore)] #[from(ignore)] RefererNotAllowed(headers::Referer), - SeaRc(Arc), SemaphoreAcquireError(AcquireError), + SendAppStatError(flume::SendError), SerdeJson(serde_json::Error), /// simple way to return an error message to the user and an anyhow to our logs #[display(fmt = "{}, {}, {:?}", _0, _1, _2)] @@ -217,6 +221,17 @@ impl Web3ProxyError { ), ) } + Self::GasEstimateNotU256 => { + warn!("GasEstimateNotU256"); + ( + StatusCode::BAD_REQUEST, + JsonRpcForwardedResponse::from_str( + "gas estimate result is not an U256", + Some(StatusCode::BAD_REQUEST.as_u16().into()), + None, + ), + ) + } Self::Headers(err) => { warn!("HeadersError {:?}", err); ( @@ -548,7 +563,7 @@ impl Web3ProxyError { ), ) } - Self::SeaRc(err) => match migration::SeaRc::try_unwrap(err) { + Self::Arc(err) => match Arc::try_unwrap(err) { Ok(err) => err, Err(err) => Self::Anyhow(anyhow::anyhow!("{}", err)), } @@ -565,6 +580,17 @@ impl Web3ProxyError { ), ) } + Self::SendAppStatError(err) => { + error!("SendAppStatError err={:?}", err); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcForwardedResponse::from_str( + "error stat_sender sending response_stat", + Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), + None, + ), + ) + } Self::SerdeJson(err) => { warn!("serde json err={:?}", err); ( @@ -725,6 +751,12 @@ impl Web3ProxyError { } } +impl From for Web3ProxyError { + fn from(err: ethers::types::ParseBytesError) -> Self { + Self::ParseBytesError(Some(err)) + } +} + impl From for Web3ProxyError { fn from(err: tokio::time::error::Elapsed) -> Self { Self::Timeout(Some(err))