better error handling for proxy_web3_rpc()

This commit is contained in:
Rory Neithinger 2023-03-19 18:52:28 -07:00
parent c32d12b5e0
commit beac7ee017
10 changed files with 175 additions and 53 deletions

@ -18,6 +18,7 @@ use crate::stats::{AppStat, RpcQueryStats, StatBuffer};
use crate::user_token::UserBearerToken;
use anyhow::Context;
use axum::headers::{Origin, Referer, UserAgent};
use axum::http::StatusCode;
use chrono::Utc;
use deferred_rate_limiter::DeferredRateLimiter;
use derive_more::From;
@ -1157,7 +1158,7 @@ impl Web3ProxyApp {
) -> Web3ProxyResult<(JsonRpcForwardedResponse, Vec<Arc<Web3Rpc>>)> {
// trace!("Received request: {:?}", request);
let request_metadata = Arc::new(RequestMetadata::new(request.num_bytes())?);
let request_metadata = Arc::new(RequestMetadata::new(request.num_bytes()));
let mut kafka_stuff = None;
@ -1338,10 +1339,7 @@ impl Web3ProxyApp {
}
None => {
// TODO: what does geth do if this happens?
// TODO: i think we want a 502 so that haproxy retries on another server
return Err(
anyhow::anyhow!("no servers synced. unknown eth_blockNumber").into(),
);
return Err(Web3ProxyError::UnknownBlockNumber);
}
}
}
@ -1429,7 +1427,7 @@ impl Web3ProxyApp {
let head_block_num = head_block_num
.or(self.balanced_rpcs.head_block_num())
.ok_or_else(|| anyhow::anyhow!("no servers synced"))?;
.ok_or_else(|| Web3ProxyError::NoServersSynced)?;
// TODO: error/wait if no head block!
@ -1607,7 +1605,7 @@ impl Web3ProxyApp {
return Ok((
JsonRpcForwardedResponse::from_str(
"invalid request",
None,
Some(StatusCode::BAD_REQUEST.as_u16().into()),
Some(request_id),
),
vec![],
@ -1760,17 +1758,10 @@ impl Web3ProxyApp {
// TODO: only cache the inner response
// TODO: how are we going to stream this?
// TODO: check response size. if its very large, return it in a custom Error type that bypasses caching? or will moka do that for us?
Ok::<_, anyhow::Error>(response)
Ok::<_, Web3ProxyError>(response)
})
.await
// TODO: what is the best way to handle an Arc here?
.map_err(|err| {
// TODO: emit a stat for an error
anyhow::anyhow!(
"error while caching and forwarding response: {}",
err
)
})?
// TODO: add context (error while caching and forwarding response {})
.await?
} else {
self.balanced_rpcs
.try_proxy_connection(

@ -33,7 +33,7 @@ impl Web3ProxyApp {
.context("finding request size")?
.len();
let request_metadata = Arc::new(RequestMetadata::new(request_bytes).unwrap());
let request_metadata = Arc::new(RequestMetadata::new(request_bytes));
let (subscription_abort_handle, subscription_registration) = AbortHandle::new_pair();
@ -67,7 +67,7 @@ impl Web3ProxyApp {
};
// TODO: what should the payload for RequestMetadata be?
let request_metadata = Arc::new(RequestMetadata::new(0).unwrap());
let request_metadata = Arc::new(RequestMetadata::new(0));
// TODO: make a struct for this? using our JsonRpcForwardedResponse won't work because it needs an id
let response_json = json!({
@ -133,7 +133,7 @@ impl Web3ProxyApp {
// TODO: do something with this handle?
tokio::spawn(async move {
while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await {
let request_metadata = Arc::new(RequestMetadata::new(0).unwrap());
let request_metadata = Arc::new(RequestMetadata::new(0));
let new_tx = match new_tx_state {
TxStatus::Pending(tx) => tx,
@ -208,7 +208,7 @@ impl Web3ProxyApp {
// TODO: do something with this handle?
tokio::spawn(async move {
while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await {
let request_metadata = Arc::new(RequestMetadata::new(0).unwrap());
let request_metadata = Arc::new(RequestMetadata::new(0));
let new_tx = match new_tx_state {
TxStatus::Pending(tx) => tx,
@ -284,7 +284,7 @@ impl Web3ProxyApp {
// TODO: do something with this handle?
tokio::spawn(async move {
while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await {
let request_metadata = Arc::new(RequestMetadata::new(0).unwrap());
let request_metadata = Arc::new(RequestMetadata::new(0));
let new_tx = match new_tx_state {
TxStatus::Pending(tx) => tx,

@ -1,4 +1,5 @@
//! Helper functions for turning ether's BlockNumber into numbers and updating incoming queries to match.
use crate::frontend::errors::{Web3ProxyError, Web3ProxyResult};
use anyhow::Context;
use ethers::{
prelude::{BlockNumber, U64},
@ -126,7 +127,7 @@ pub async fn block_needed(
params: Option<&mut serde_json::Value>,
head_block_num: U64,
rpcs: &Web3Rpcs,
) -> anyhow::Result<BlockNeeded> {
) -> Web3ProxyResult<BlockNeeded> {
// some requests have potentially very large responses
// TODO: only skip caching if the response actually is large
if method.starts_with("trace_") || method == "debug_traceTransaction" {
@ -179,7 +180,7 @@ pub async fn block_needed(
// TODO: this shouldn't be a 500. this should be a 400. 500 will make haproxy retry a bunch
let obj = params[0]
.as_object_mut()
.ok_or_else(|| anyhow::anyhow!("invalid format"))?;
.ok_or_else(|| Web3ProxyError::BadRequest("invalid format".to_string()))?;
if obj.contains_key("blockHash") {
return Ok(BlockNeeded::CacheSuccessForever);

@ -88,11 +88,11 @@ pub struct RequestMetadata {
}
impl RequestMetadata {
pub fn new(request_bytes: usize) -> anyhow::Result<Self> {
pub fn new(request_bytes: usize) -> Self {
// TODO: how can we do this without turning it into a string first. this is going to slow us down!
let request_bytes = request_bytes as u64;
let new = Self {
Self {
start_instant: Instant::now(),
request_bytes,
archive_request: false.into(),
@ -102,9 +102,7 @@ impl RequestMetadata {
response_bytes: 0.into(),
response_millis: 0.into(),
response_from_backup_rpc: false.into(),
};
Ok(new)
}
}
}

@ -4,6 +4,7 @@ use super::authorization::Authorization;
use crate::jsonrpc::JsonRpcForwardedResponse;
use std::net::IpAddr;
use std::sync::Arc;
use axum::{
headers,
@ -33,11 +34,18 @@ pub enum Web3ProxyError {
#[error(ignore)]
#[from(ignore)]
BadRequest(String),
SemaphoreAcquireError(AcquireError),
Database(DbErr),
EthersHttpClientError(ethers::prelude::HttpClientError),
EthersProviderError(ethers::prelude::ProviderError),
EthersWsClientError(ethers::prelude::WsClientError),
Headers(headers::Error),
HeaderToString(ToStrError),
InfluxDb2RequestError(influxdb2::RequestError),
#[display(fmt = "{} > {}", min, max)]
InvalidBlockBounds {
min: u64,
max: u64,
},
InvalidHeaderValue(InvalidHeaderValue),
IpAddrParse(AddrParseError),
#[error(ignore)]
@ -45,6 +53,8 @@ pub enum Web3ProxyError {
IpNotAllowed(IpAddr),
JoinError(JoinError),
MsgPackEncode(rmp_serde::encode::Error),
NoServersSynced,
NoHandleReady,
NotFound,
OriginRequired,
#[error(ignore)]
@ -58,16 +68,23 @@ pub enum Web3ProxyError {
#[error(ignore)]
#[from(ignore)]
RefererNotAllowed(headers::Referer),
SeaRc(Arc<Web3ProxyError>),
SemaphoreAcquireError(AcquireError),
SerdeJson(serde_json::Error),
/// simple way to return an error message to the user and an anyhow to our logs
#[display(fmt = "{}, {}, {:?}", _0, _1, _2)]
StatusCode(StatusCode, String, Option<anyhow::Error>),
/// TODO: what should be attached to the timout?
Timeout(tokio::time::error::Elapsed),
#[display(fmt = "{:?}", _0)]
#[error(ignore)]
Timeout(Option<tokio::time::error::Elapsed>),
UlidDecode(ulid::DecodeError),
UnknownBlockNumber,
UnknownKey,
UserAgentRequired,
#[error(ignore)]
UserAgentNotAllowed(headers::UserAgent),
WatchRecvError(tokio::sync::watch::error::RecvError),
}
impl Web3ProxyError {
@ -120,6 +137,39 @@ impl Web3ProxyError {
),
)
}
Self::EthersHttpClientError(err) => {
warn!("EthersHttpClientError err={:?}", err);
(
StatusCode::INTERNAL_SERVER_ERROR,
JsonRpcForwardedResponse::from_str(
"ether http client error",
Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()),
None,
),
)
}
Self::EthersProviderError(err) => {
warn!("EthersProviderError err={:?}", err);
(
StatusCode::INTERNAL_SERVER_ERROR,
JsonRpcForwardedResponse::from_str(
"ether provider error",
Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()),
None,
),
)
}
Self::EthersWsClientError(err) => {
warn!("EthersWsClientError err={:?}", err);
(
StatusCode::INTERNAL_SERVER_ERROR,
JsonRpcForwardedResponse::from_str(
"ether ws client error",
Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()),
None,
),
)
}
Self::Headers(err) => {
warn!("HeadersError {:?}", err);
(
@ -143,6 +193,20 @@ impl Web3ProxyError {
),
)
}
Self::InvalidBlockBounds { min, max } => {
warn!("InvalidBlockBounds min={} max={}", min, max);
(
StatusCode::BAD_REQUEST,
JsonRpcForwardedResponse::from_string(
format!(
"Invalid blocks bounds requested. min ({}) > max ({})",
min, max
),
Some(StatusCode::BAD_REQUEST.as_u16().into()),
None,
),
)
}
Self::IpAddrParse(err) => {
warn!("IpAddrParse err={:?}", err);
(
@ -206,6 +270,28 @@ impl Web3ProxyError {
),
)
}
Self::NoServersSynced => {
warn!("NoServersSynced");
(
StatusCode::INTERNAL_SERVER_ERROR,
JsonRpcForwardedResponse::from_str(
"no servers synced",
Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()),
None,
),
)
}
Self::NoHandleReady => {
error!("NoHandleReady");
(
StatusCode::INTERNAL_SERVER_ERROR,
JsonRpcForwardedResponse::from_str(
"unable to retry for request handle",
Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()),
None,
),
)
}
Self::NotFound => {
// TODO: emit a stat?
// TODO: instead of an error, show a normal html page for 404
@ -305,6 +391,11 @@ impl Web3ProxyError {
),
)
}
Self::SeaRc(err) => match migration::SeaRc::try_unwrap(err) {
Ok(err) => err,
Err(err) => Self::Anyhow(anyhow::anyhow!("{}", err)),
}
.into_response_parts(),
Self::SemaphoreAcquireError(err) => {
warn!("semaphore acquire err={:?}", err);
(
@ -317,6 +408,17 @@ impl Web3ProxyError {
),
)
}
Self::SerdeJson(err) => {
warn!("serde json err={:?}", err);
(
StatusCode::BAD_REQUEST,
JsonRpcForwardedResponse::from_str(
"de/serialization error!",
Some(StatusCode::BAD_REQUEST.as_u16().into()),
None,
),
)
}
Self::StatusCode(status_code, err_msg, err) => {
// different status codes should get different error levels. 500s should warn. 400s should stat
let code = status_code.as_u16();
@ -362,6 +464,17 @@ impl Web3ProxyError {
),
)
}
Self::UnknownBlockNumber => {
error!("UnknownBlockNumber");
(
StatusCode::BAD_GATEWAY,
JsonRpcForwardedResponse::from_str(
"no servers synced. unknown eth_blockNumber",
Some(StatusCode::BAD_GATEWAY.as_u16().into()),
None,
),
)
}
// TODO: stat?
Self::UnknownKey => (
StatusCode::UNAUTHORIZED,
@ -393,10 +506,27 @@ impl Web3ProxyError {
),
)
}
Self::WatchRecvError(err) => {
error!("WatchRecvError err={:?}", err);
(
StatusCode::INTERNAL_SERVER_ERROR,
JsonRpcForwardedResponse::from_str(
"watch recv error!",
Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()),
None,
),
)
}
}
}
}
impl From<tokio::time::error::Elapsed> for Web3ProxyError {
fn from(err: tokio::time::error::Elapsed) -> Self {
Self::Timeout(Some(err))
}
}
impl IntoResponse for Web3ProxyError {
fn into_response(self) -> Response {
// TODO: include the request id in these so that users can give us something that will point to logs

@ -378,7 +378,7 @@ async fn handle_socket_payload(
// TODO: move this logic into the app?
let request_bytes = json_request.num_bytes();
let request_metadata = Arc::new(RequestMetadata::new(request_bytes).unwrap());
let request_metadata = Arc::new(RequestMetadata::new(request_bytes));
let subscription_id = json_request.params.unwrap().to_string();

@ -1,3 +1,4 @@
use crate::frontend::errors::Web3ProxyResult;
use derive_more::From;
use ethers::prelude::{HttpClientError, ProviderError, WsClientError};
use serde::de::{self, Deserializer, MapAccess, SeqAccess, Visitor};
@ -240,7 +241,7 @@ impl JsonRpcForwardedResponse {
}
}
pub fn from_ethers_error(e: ProviderError, id: Box<RawValue>) -> anyhow::Result<Self> {
pub fn from_ethers_error(e: ProviderError, id: Box<RawValue>) -> Web3ProxyResult<Self> {
// TODO: move turning ClientError into json to a helper function?
let code;
let message: String;
@ -302,7 +303,7 @@ impl JsonRpcForwardedResponse {
pub fn try_from_response_result(
result: Result<Box<RawValue>, ProviderError>,
id: Box<RawValue>,
) -> anyhow::Result<Self> {
) -> Web3ProxyResult<Self> {
match result {
Ok(response) => Ok(Self::from_response(response, id)),
Err(e) => Self::from_ethers_error(e, id),

@ -4,6 +4,7 @@ use super::many::Web3Rpcs;
use super::one::Web3Rpc;
use super::transactions::TxStatus;
use crate::frontend::authorization::Authorization;
use crate::frontend::errors::Web3ProxyResult;
use crate::{config::BlockAndRpc, jsonrpc::JsonRpcRequest};
use anyhow::{anyhow, Context};
use derive_more::From;
@ -158,7 +159,7 @@ impl Web3Rpcs {
&self,
block: Web3ProxyBlock,
heaviest_chain: bool,
) -> anyhow::Result<Web3ProxyBlock> {
) -> Web3ProxyResult<Web3ProxyBlock> {
// TODO: i think we can rearrange this function to make it faster on the hot path
let block_hash = block.hash();
@ -196,7 +197,7 @@ impl Web3Rpcs {
authorization: &Arc<Authorization>,
hash: &H256,
rpc: Option<&Arc<Web3Rpc>>,
) -> anyhow::Result<Web3ProxyBlock> {
) -> Web3ProxyResult<Web3ProxyBlock> {
// first, try to get the hash from our cache
// the cache is set last, so if its here, its everywhere
// TODO: use try_get_with
@ -267,7 +268,7 @@ impl Web3Rpcs {
&self,
authorization: &Arc<Authorization>,
num: &U64,
) -> anyhow::Result<(H256, u64)> {
) -> Web3ProxyResult<(H256, u64)> {
let (block, block_depth) = self.cannonical_block(authorization, num).await?;
let hash = *block.hash();
@ -281,7 +282,7 @@ impl Web3Rpcs {
&self,
authorization: &Arc<Authorization>,
num: &U64,
) -> anyhow::Result<(Web3ProxyBlock, u64)> {
) -> Web3ProxyResult<(Web3ProxyBlock, u64)> {
// we only have blocks by hash now
// maybe save them during save_block in a blocks_by_number Cache<U64, Vec<ArcBlock>>
// if theres multiple, use petgraph to find the one on the main chain (and remove the others if they have enough confirmations)

@ -7,6 +7,7 @@ use crate::app::{flatten_handle, AnyhowJoinHandle, Web3ProxyApp};
///! Load balanced communication with a group of web3 providers
use crate::config::{BlockAndRpc, TxHashAndRpc, Web3RpcConfig};
use crate::frontend::authorization::{Authorization, RequestMetadata};
use crate::frontend::errors::{Web3ProxyError, Web3ProxyResult};
use crate::frontend::rpc_proxy_ws::ProxyMode;
use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest};
use crate::rpcs::transactions::TxStatus;
@ -462,7 +463,7 @@ impl Web3Rpcs {
params: Option<&serde_json::Value>,
error_level: Level,
// TODO: remove this box once i figure out how to do the options
) -> anyhow::Result<JsonRpcForwardedResponse> {
) -> Web3ProxyResult<JsonRpcForwardedResponse> {
// TODO: if only 1 active_request_handles, do self.try_send_request?
let responses = active_request_handles
@ -540,7 +541,7 @@ impl Web3Rpcs {
// TODO: if we are checking for the consensus head, i don' think we need min_block_needed/max_block_needed
min_block_needed: Option<&U64>,
max_block_needed: Option<&U64>,
) -> anyhow::Result<OpenRequestResult> {
) -> Web3ProxyResult<OpenRequestResult> {
let usable_rpcs_by_tier_and_head_number: BTreeMap<(u64, Option<U64>), Vec<Arc<Web3Rpc>>> = {
let synced_connections = self.watch_consensus_rpcs_sender.borrow().clone();
@ -569,11 +570,10 @@ impl Web3Rpcs {
cmp::Ordering::Greater => {
// TODO: force a debug log of the original request to see if our logic is wrong?
// TODO: attach the rpc_key_id so we can find the user to ask if they need help
return Err(anyhow::anyhow!(
"Invalid blocks bounds requested. min ({}) > max ({})",
min_block_needed,
max_block_needed
));
return Err(Web3ProxyError::InvalidBlockBounds {
min: min_block_needed.as_u64(),
max: max_block_needed.as_u64(),
});
}
}
}
@ -877,7 +877,7 @@ impl Web3Rpcs {
request_metadata: Option<&Arc<RequestMetadata>>,
min_block_needed: Option<&U64>,
max_block_needed: Option<&U64>,
) -> anyhow::Result<JsonRpcForwardedResponse> {
) -> Web3ProxyResult<JsonRpcForwardedResponse> {
let mut skip_rpcs = vec![];
let mut method_not_available_response = None;
@ -1099,7 +1099,7 @@ impl Web3Rpcs {
error_level: Level,
max_count: Option<usize>,
always_include_backups: bool,
) -> anyhow::Result<JsonRpcForwardedResponse> {
) -> Web3ProxyResult<JsonRpcForwardedResponse> {
let mut watch_consensus_rpcs = self.watch_consensus_rpcs_sender.subscribe();
loop {
@ -1205,7 +1205,7 @@ impl Web3Rpcs {
request_metadata: Option<&Arc<RequestMetadata>>,
min_block_needed: Option<&U64>,
max_block_needed: Option<&U64>,
) -> anyhow::Result<JsonRpcForwardedResponse> {
) -> Web3ProxyResult<JsonRpcForwardedResponse> {
match authorization.checks.proxy_mode {
ProxyMode::Debug | ProxyMode::Best => {
self.try_send_best_consensus_head_connection(

@ -5,6 +5,7 @@ use super::request::{OpenRequestHandle, OpenRequestResult};
use crate::app::{flatten_handle, AnyhowJoinHandle};
use crate::config::{BlockAndRpc, Web3RpcConfig};
use crate::frontend::authorization::Authorization;
use crate::frontend::errors::{Web3ProxyError, Web3ProxyResult};
use crate::rpcs::request::RequestErrorHandler;
use anyhow::{anyhow, Context};
use ethers::prelude::{Bytes, Middleware, ProviderError, TxHash, H256, U64};
@ -1159,7 +1160,7 @@ impl Web3Rpc {
authorization: &'a Arc<Authorization>,
max_wait: Option<Duration>,
unlocked_provider: Option<Arc<Web3Provider>>,
) -> anyhow::Result<OpenRequestHandle> {
) -> Web3ProxyResult<OpenRequestHandle> {
let max_wait = max_wait.map(|x| Instant::now() + x);
loop {
@ -1181,8 +1182,7 @@ impl Web3Rpc {
if let Some(max_wait) = max_wait {
if retry_at > max_wait {
// break now since we will wait past our maximum wait time
// TODO: don't use anyhow. use specific error type
return Err(anyhow::anyhow!("timeout waiting for request handle"));
return Err(Web3ProxyError::Timeout(None));
}
}
@ -1196,7 +1196,7 @@ impl Web3Rpc {
let now = Instant::now();
if now > max_wait {
return Err(anyhow::anyhow!("unable to retry for request handle"));
return Err(Web3ProxyError::NoHandleReady);
}
}
@ -1214,7 +1214,7 @@ impl Web3Rpc {
authorization: &Arc<Authorization>,
// TODO: borrow on this instead of needing to clone the Arc?
unlocked_provider: Option<Arc<Web3Provider>>,
) -> anyhow::Result<OpenRequestResult> {
) -> Web3ProxyResult<OpenRequestResult> {
// TODO: think more about this read block
// TODO: this should *not* be new_head_client. this should be a separate object
if unlocked_provider.is_some() || self.provider.read().await.is_some() {