From fe2a760c8a405933c3c61526c3541166fbf33c73 Mon Sep 17 00:00:00 2001 From: Rory Neithinger Date: Mon, 20 Mar 2023 13:45:21 -0700 Subject: [PATCH] more conversions to Web3ProxyError --- web3_proxy/src/app/ws.rs | 4 +- web3_proxy/src/frontend/errors.rs | 77 +++++++++++++++++++++++++++++-- web3_proxy/src/rpcs/blockchain.rs | 46 ++++++++++-------- web3_proxy/src/rpcs/consensus.rs | 32 ++++++------- 4 files changed, 117 insertions(+), 42 deletions(-) diff --git a/web3_proxy/src/app/ws.rs b/web3_proxy/src/app/ws.rs index cc36e9d6..8f42efe6 100644 --- a/web3_proxy/src/app/ws.rs +++ b/web3_proxy/src/app/ws.rs @@ -2,7 +2,7 @@ use super::Web3ProxyApp; use crate::frontend::authorization::{Authorization, RequestMetadata}; -use crate::frontend::errors::{Web3ProxyErrorContext, Web3ProxyResult}; +use crate::frontend::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult}; use crate::jsonrpc::JsonRpcForwardedResponse; use crate::jsonrpc::JsonRpcRequest; use crate::rpcs::transactions::TxStatus; @@ -341,7 +341,7 @@ impl Web3ProxyApp { ); }); } - _ => return Err(anyhow::anyhow!("unimplemented").into()), + _ => return Err(Web3ProxyError::NotImplemented), } // TODO: do something with subscription_join_handle? diff --git a/web3_proxy/src/frontend/errors.rs b/web3_proxy/src/frontend/errors.rs index 4e43fbf8..a7c01bf4 100644 --- a/web3_proxy/src/frontend/errors.rs +++ b/web3_proxy/src/frontend/errors.rs @@ -42,11 +42,13 @@ pub enum Web3ProxyError { EthersHttpClientError(ethers::prelude::HttpClientError), EthersProviderError(ethers::prelude::ProviderError), EthersWsClientError(ethers::prelude::WsClientError), + FlumeRecvError(flume::RecvError), GasEstimateNotU256, Headers(headers::Error), HeaderToString(ToStrError), InfluxDb2RequestError(influxdb2::RequestError), #[display(fmt = "{} > {}", min, max)] + #[from(ignore)] InvalidBlockBounds { min: u64, max: u64, @@ -64,8 +66,16 @@ pub enum Web3ProxyError { IpNotAllowed(IpAddr), JoinError(JoinError), MsgPackEncode(rmp_serde::encode::Error), - NoServersSynced, + NoBlockNumberOrHash, + NoBlocksKnown, NoHandleReady, + NoServersSynced, + #[display(fmt = "{}/{}", num_known, min_head_rpcs)] + #[from(ignore)] + NotEnoughRpcs { + num_known: usize, + min_head_rpcs: usize, + }, NotFound, NotImplemented, OriginRequired, @@ -104,6 +114,7 @@ pub enum Web3ProxyError { UserIdZero, VerificationError(siwe::VerificationError), WatchRecvError(tokio::sync::watch::error::RecvError), + WatchSendError, WebsocketOnly, #[display(fmt = "{:?}, {}", _0, _1)] #[error(ignore)] @@ -221,6 +232,17 @@ impl Web3ProxyError { ), ) } + Self::FlumeRecvError(err) => { + warn!("FlumeRecvError err={:#?}", err); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcForwardedResponse::from_str( + "flume recv error!", + Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), + None, + ), + ) + } Self::GasEstimateNotU256 => { warn!("GasEstimateNotU256"); ( @@ -398,12 +420,23 @@ impl Web3ProxyError { ), ) } - Self::NoServersSynced => { - warn!("NoServersSynced"); + Self::NoBlockNumberOrHash => { + warn!("NoBlockNumberOrHash"); + ( + StatusCode::BAD_REQUEST, + JsonRpcForwardedResponse::from_str( + "Blocks here must have a number or hash", + Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), + None, + ), + ) + } + Self::NoBlocksKnown => { + error!("NoBlocksKnown"); ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcForwardedResponse::from_str( - "no servers synced", + "no blocks known", Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), None, ), @@ -420,6 +453,31 @@ impl Web3ProxyError { ), ) } + Self::NoServersSynced => { + warn!("NoServersSynced"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcForwardedResponse::from_str( + "no servers synced", + Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), + None, + ), + ) + } + Self::NotEnoughRpcs { + num_known, + min_head_rpcs, + } => { + error!("NotEnoughRpcs {}/{}", num_known, min_head_rpcs); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcForwardedResponse::from_string( + format!("not enough rpcs connected {}/{}", num_known, min_head_rpcs), + Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), + None, + ), + ) + } Self::NotFound => { // TODO: emit a stat? // TODO: instead of an error, show a normal html page for 404 @@ -722,6 +780,17 @@ impl Web3ProxyError { ), ) } + Self::WatchSendError => { + error!("WatchSendError"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcForwardedResponse::from_str( + "watch send error!", + Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), + None, + ), + ) + } Self::WebsocketOnly => { warn!("WebsocketOnly"); ( diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index b6c60f2b..6cbfd9ae 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -4,9 +4,8 @@ use super::many::Web3Rpcs; use super::one::Web3Rpc; use super::transactions::TxStatus; use crate::frontend::authorization::Authorization; -use crate::frontend::errors::Web3ProxyResult; +use crate::frontend::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult}; use crate::{config::BlockAndRpc, jsonrpc::JsonRpcRequest}; -use anyhow::{anyhow, Context}; use derive_more::From; use ethers::prelude::{Block, TxHash, H256, U64}; use log::{debug, trace, warn, Level}; @@ -125,11 +124,11 @@ impl Web3ProxyBlock { } impl TryFrom for Web3ProxyBlock { - type Error = anyhow::Error; + type Error = Web3ProxyError; fn try_from(x: ArcBlock) -> Result { if x.number.is_none() || x.hash.is_none() { - return Err(anyhow!("Blocks here must have a number of hash")); + return Err(Web3ProxyError::NoBlockNumberOrHash); } let b = Web3ProxyBlock { @@ -191,7 +190,7 @@ impl Web3Rpcs { /// Get a block from caches with fallback. /// Will query a specific node or the best available. - /// TODO: return anyhow::Result>? + /// TODO: return Web3ProxyResult>? pub async fn block( &self, authorization: &Arc, @@ -226,7 +225,7 @@ impl Web3Rpcs { x.try_into().ok() } }) - .context("no block!")?, + .web3_context("no block!")?, None => { // TODO: helper for method+params => JsonRpcRequest // TODO: does this id matter? @@ -245,11 +244,11 @@ impl Web3Rpcs { ) .await?; - let block = response.result.context("failed fetching block")?; + let block = response.result.web3_context("failed fetching block")?; let block: Option = serde_json::from_str(block.get())?; - let block: ArcBlock = block.context("no block in the response")?; + let block: ArcBlock = block.web3_context("no block in the response")?; // TODO: received time is going to be weird Web3ProxyBlock::try_from(block)? @@ -290,7 +289,7 @@ impl Web3Rpcs { let mut consensus_head_receiver = self .watch_consensus_head_sender .as_ref() - .context("need new head subscriptions to fetch cannonical_block")? + .web3_context("need new head subscriptions to fetch cannonical_block")? .subscribe(); // be sure the requested block num exists @@ -298,7 +297,7 @@ impl Web3Rpcs { let mut head_block_num = *consensus_head_receiver .borrow_and_update() .as_ref() - .context("no consensus head block")? + .web3_context("no consensus head block")? .number(); loop { @@ -342,7 +341,7 @@ impl Web3Rpcs { debug!("could not find canonical block {}: {:?}", num, err); } - let raw_block = response.result.context("no cannonical block result")?; + let raw_block = response.result.web3_context("no cannonical block result")?; let block: ArcBlock = serde_json::from_str(raw_block.get())?; @@ -400,12 +399,12 @@ impl Web3Rpcs { new_block: Option, rpc: Arc, _pending_tx_sender: &Option>, - ) -> anyhow::Result<()> { + ) -> Web3ProxyResult<()> { // TODO: how should we handle an error here? if !consensus_finder .update_rpc(new_block.clone(), rpc.clone(), self) .await - .context("failed to update rpc")? + .web3_context("failed to update rpc")? { // nothing changed. no need to scan for a new consensus head return Ok(()); @@ -414,7 +413,7 @@ impl Web3Rpcs { let new_consensus = consensus_finder .best_consensus_connections(authorization, self) .await - .context("no consensus head block!") + .web3_context("no consensus head block!") .map_err(|err| { self.watch_consensus_rpcs_sender.send_replace(None); @@ -473,7 +472,8 @@ impl Web3Rpcs { watch_consensus_head_sender .send(Some(consensus_head_block)) - .context( + .or(Err(Web3ProxyError::WatchSendError)) + .web3_context( "watch_consensus_head_sender failed sending first consensus_head_block", )?; } @@ -529,11 +529,12 @@ impl Web3Rpcs { let consensus_head_block = self .try_cache_block(consensus_head_block, true) .await - .context("save consensus_head_block as heaviest chain")?; + .web3_context("save consensus_head_block as heaviest chain")?; watch_consensus_head_sender .send(Some(consensus_head_block)) - .context("watch_consensus_head_sender failed sending uncled consensus_head_block")?; + .or(Err(Web3ProxyError::WatchSendError)) + .web3_context("watch_consensus_head_sender failed sending uncled consensus_head_block")?; } } Ordering::Less => { @@ -563,11 +564,14 @@ impl Web3Rpcs { let consensus_head_block = self .try_cache_block(consensus_head_block, true) .await - .context("save_block sending consensus_head_block as heaviest chain")?; + .web3_context( + "save_block sending consensus_head_block as heaviest chain", + )?; watch_consensus_head_sender .send(Some(consensus_head_block)) - .context("watch_consensus_head_sender failed sending rollback consensus_head_block")?; + .or(Err(Web3ProxyError::WatchSendError)) + .web3_context("watch_consensus_head_sender failed sending rollback consensus_head_block")?; } Ordering::Greater => { debug!( @@ -592,7 +596,9 @@ impl Web3Rpcs { let consensus_head_block = self.try_cache_block(consensus_head_block, true).await?; - watch_consensus_head_sender.send(Some(consensus_head_block)).context("watch_consensus_head_sender failed sending new consensus_head_block")?; + watch_consensus_head_sender.send(Some(consensus_head_block)) + .or(Err(Web3ProxyError::WatchSendError)) + .web3_context("watch_consensus_head_sender failed sending new consensus_head_block")?; } } } diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index 373a1dd8..b9666db8 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -2,7 +2,7 @@ use super::blockchain::Web3ProxyBlock; use super::many::Web3Rpcs; use super::one::Web3Rpc; use crate::frontend::authorization::Authorization; -use anyhow::Context; +use crate::frontend::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult}; use ethers::prelude::{H256, U64}; use hashbrown::{HashMap, HashSet}; use log::{debug, trace, warn}; @@ -169,9 +169,9 @@ impl ConnectionsGroup { web3_rpcs: &Web3Rpcs, min_consensus_block_num: Option, tier: &u64, - ) -> anyhow::Result { + ) -> Web3ProxyResult { let mut maybe_head_block = match self.highest_block.clone() { - None => return Err(anyhow::anyhow!("no blocks known")), + None => return Err(Web3ProxyError::NoBlocksKnown), Some(x) => x, }; @@ -196,11 +196,10 @@ impl ConnectionsGroup { let num_known = self.rpc_to_block.len(); if num_known < web3_rpcs.min_head_rpcs { - return Err(anyhow::anyhow!( - "not enough rpcs connected: {}/{}", + return Err(Web3ProxyError::NotEnoughRpcs { num_known, - web3_rpcs.min_head_rpcs, - )); + min_head_rpcs: web3_rpcs.min_head_rpcs, + }); } let mut primary_rpcs_voted: Option = None; @@ -256,7 +255,7 @@ impl ConnectionsGroup { warn!("connection missing: {}", rpc_name); debug!("web3_rpcs.by_name: {:#?}", web3_rpcs.by_name); } else { - return Err(anyhow::anyhow!("not synced")); + return Err(Web3ProxyError::NoServersSynced); } } } @@ -309,7 +308,7 @@ impl ConnectionsGroup { warn!("{}", err_msg); break; } else { - return Err(anyhow::anyhow!(err_msg)); + return Err(anyhow::anyhow!(err_msg).into()); } } } @@ -334,7 +333,8 @@ impl ConnectionsGroup { primary_sum_soft_limit, web3_rpcs.min_sum_soft_limit, soft_limit_percent, - )); + ) + .into()); } // success! this block has enough soft limit and nodes on it (or on later blocks) @@ -462,7 +462,7 @@ impl ConsensusFinder { rpc: Arc, // we need this so we can save the block to caches. i don't like it though. maybe we should use a lazy_static Cache wrapper that has a "save_block" method?. i generally dislike globals but i also dislike all the types having to pass eachother around web3_connections: &Web3Rpcs, - ) -> anyhow::Result { + ) -> Web3ProxyResult { // add the rpc's block to connection_heads, or remove the rpc from connection_heads let changed = match rpc_head_block { Some(mut rpc_head_block) => { @@ -470,7 +470,7 @@ impl ConsensusFinder { rpc_head_block = web3_connections .try_cache_block(rpc_head_block, false) .await - .context("failed caching block")?; + .web3_context("failed caching block")?; // if let Some(max_block_lag) = max_block_lag { // if rpc_head_block.number() < ??? { @@ -509,14 +509,14 @@ impl ConsensusFinder { &mut self, authorization: &Arc, web3_connections: &Web3Rpcs, - ) -> anyhow::Result { + ) -> Web3ProxyResult { // TODO: attach context to these? let highest_known_block = self .all_rpcs_group() - .context("no rpcs")? + .web3_context("no rpcs")? .highest_block .as_ref() - .context("no highest block")?; + .web3_context("no highest block")?; trace!("highest_known_block: {}", highest_known_block); @@ -545,7 +545,7 @@ impl ConsensusFinder { } } - return Err(anyhow::anyhow!("failed finding consensus on all tiers")); + return Err(anyhow::anyhow!("failed finding consensus on all tiers").into()); } }