From 8aacd849554d7d4b5d796105ee2983bb92a61aa1 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Sun, 22 May 2022 18:11:42 +0000 Subject: [PATCH] move error handling into a function --- web3-proxy/src/app.rs | 108 +++++++++----------------------------- web3-proxy/src/jsonrpc.rs | 63 ++++++++++++++++++++++ web3-proxy/src/main.rs | 5 +- 3 files changed, 93 insertions(+), 83 deletions(-) diff --git a/web3-proxy/src/app.rs b/web3-proxy/src/app.rs index 1b0eed8a..5cc9a8c1 100644 --- a/web3-proxy/src/app.rs +++ b/web3-proxy/src/app.rs @@ -1,12 +1,11 @@ use crate::config::Web3ConnectionConfig; use crate::connections::Web3Connections; -use crate::jsonrpc::JsonRpcErrorData; use crate::jsonrpc::JsonRpcForwardedResponse; use crate::jsonrpc::JsonRpcForwardedResponseEnum; use crate::jsonrpc::JsonRpcRequest; use crate::jsonrpc::JsonRpcRequestEnum; use dashmap::DashMap; -use ethers::prelude::{HttpClientError, ProviderError, WsClientError, H256}; +use ethers::prelude::H256; use futures::future::join_all; use linkedhashmap::LinkedHashMap; use parking_lot::RwLock; @@ -44,7 +43,7 @@ pub struct Web3ProxyApp { balanced_rpcs: Arc, /// Send private requests (like eth_sendRawTransaction) to all these servers private_rpcs: Arc, - active_requests: ActiveRequestsMap, + incoming_requests: ActiveRequestsMap, response_cache: ResponseLrcCache, } @@ -115,7 +114,7 @@ impl Web3ProxyApp { Ok(Web3ProxyApp { balanced_rpcs, private_rpcs, - active_requests: Default::default(), + incoming_requests: Default::default(), response_cache: Default::default(), }) } @@ -129,7 +128,7 @@ impl Web3ProxyApp { } pub fn get_active_requests(&self) -> &ActiveRequestsMap { - &self.active_requests + &self.incoming_requests } /// send the request to the approriate RPCs @@ -301,7 +300,7 @@ impl Web3ProxyApp { let (cache_key, response_cache) = match self.get_cached_response(&request) { (cache_key, Ok(response)) => { - let _ = self.active_requests.remove(&cache_key); + let _ = self.incoming_requests.remove(&cache_key); return Ok(response); } @@ -309,27 +308,28 @@ impl Web3ProxyApp { }; // check if this request is already in flight - let (in_flight_tx, in_flight_rx) = watch::channel(true); - let mut other_in_flight_rx = None; - match self.active_requests.entry(cache_key.clone()) { + // TODO: move this logic into an IncomingRequestHandler (ActiveRequestHandler has an rpc, but this won't) + let (incoming_tx, incoming_rx) = watch::channel(true); + let mut other_incoming_rx = None; + match self.incoming_requests.entry(cache_key.clone()) { dashmap::mapref::entry::Entry::Occupied(entry) => { - other_in_flight_rx = Some(entry.get().clone()); + other_incoming_rx = Some(entry.get().clone()); } dashmap::mapref::entry::Entry::Vacant(entry) => { - entry.insert(in_flight_rx); + entry.insert(incoming_rx); } } - if let Some(mut other_in_flight_rx) = other_in_flight_rx { - // wait for the other request to finish. it can finish successfully or with an error + if let Some(mut other_incoming_rx) = other_incoming_rx { + // wait for the other request to finish. it might have finished successfully or with an error trace!("{:?} waiting on in-flight request", request); - let _ = other_in_flight_rx.changed().await; + let _ = other_incoming_rx.changed().await; // now that we've waited, lets check the cache again if let Some(cached) = response_cache.read().get(&cache_key) { - let _ = self.active_requests.remove(&cache_key); - let _ = in_flight_tx.send(false); + let _ = self.incoming_requests.remove(&cache_key); + let _ = incoming_tx.send(false); // TODO: emit a stat trace!( @@ -379,77 +379,21 @@ impl Web3ProxyApp { drop(response_cache); // TODO: needing to remove manually here makes me think we should do this differently - let _ = self.active_requests.remove(&cache_key); - let _ = in_flight_tx.send(false); + let _ = self.incoming_requests.remove(&cache_key); + let _ = incoming_tx.send(false); response } Err(e) => { // send now since we aren't going to cache an error response - let _ = in_flight_tx.send(false); + let _ = incoming_tx.send(false); - // TODO: move this to a helper function? - let code; - let message: String; - let data; - - match e { - ProviderError::JsonRpcClientError(e) => { - // TODO: we should check what type the provider is rather than trying to downcast both types of errors - if let Some(e) = e.downcast_ref::() { - match &*e { - HttpClientError::JsonRpcError(e) => { - code = e.code; - message = e.message.clone(); - data = e.data.clone(); - } - e => { - // TODO: improve this - code = -32603; - message = format!("{}", e); - data = None; - } - } - } else if let Some(e) = e.downcast_ref::() { - match &*e { - WsClientError::JsonRpcError(e) => { - code = e.code; - message = e.message.clone(); - data = e.data.clone(); - } - e => { - // TODO: improve this - code = -32603; - message = format!("{}", e); - data = None; - } - } - } else { - unimplemented!(); - } - } - _ => { - code = -32603; - message = format!("{}", e); - data = None; - } - } - - JsonRpcForwardedResponse { - jsonrpc: "2.0".to_string(), - id: request.id, - result: None, - error: Some(JsonRpcErrorData { - code, - message, - data, - }), - } + JsonRpcForwardedResponse::from_ethers_error(e, request.id) } }; // TODO: needing to remove manually here makes me think we should do this differently - let _ = self.active_requests.remove(&cache_key); + let _ = self.incoming_requests.remove(&cache_key); if response.error.is_some() { trace!("Sending error reply: {:?}", response); @@ -458,7 +402,7 @@ impl Web3ProxyApp { } else { trace!("Sending reply: {:?}", response); - let _ = in_flight_tx.send(false); + let _ = incoming_tx.send(false); } return Ok(response); @@ -468,8 +412,8 @@ impl Web3ProxyApp { warn!("No servers in sync!"); // TODO: needing to remove manually here makes me think we should do this differently - let _ = self.active_requests.remove(&cache_key); - let _ = in_flight_tx.send(false); + let _ = self.incoming_requests.remove(&cache_key); + let _ = incoming_tx.send(false); return Err(anyhow::anyhow!("no servers in sync")); } @@ -482,8 +426,8 @@ impl Web3ProxyApp { sleep(retry_after).await; // TODO: needing to remove manually here makes me think we should do this differently - let _ = self.active_requests.remove(&cache_key); - let _ = in_flight_tx.send(false); + let _ = self.incoming_requests.remove(&cache_key); + let _ = incoming_tx.send(false); continue; } diff --git a/web3-proxy/src/jsonrpc.rs b/web3-proxy/src/jsonrpc.rs index 59802b77..48dfafed 100644 --- a/web3-proxy/src/jsonrpc.rs +++ b/web3-proxy/src/jsonrpc.rs @@ -1,3 +1,4 @@ +use ethers::prelude::{HttpClientError, ProviderError, WsClientError}; use serde::de::{self, Deserialize, Deserializer, MapAccess, SeqAccess, Visitor}; use serde::Serialize; use serde_json::value::RawValue; @@ -160,6 +161,68 @@ impl fmt::Debug for JsonRpcForwardedResponse { } } +impl JsonRpcForwardedResponse { + pub fn from_ethers_error(e: ProviderError, id: Box) -> Self { + // TODO: move turning ClientError into json to a helper function? + let code; + let message: String; + let data; + + match e { + ProviderError::JsonRpcClientError(e) => { + // TODO: we should check what type the provider is rather than trying to downcast both types of errors + if let Some(e) = e.downcast_ref::() { + match &*e { + HttpClientError::JsonRpcError(e) => { + code = e.code; + message = e.message.clone(); + data = e.data.clone(); + } + e => { + // TODO: improve this + code = -32603; + message = format!("{}", e); + data = None; + } + } + } else if let Some(e) = e.downcast_ref::() { + match &*e { + WsClientError::JsonRpcError(e) => { + code = e.code; + message = e.message.clone(); + data = e.data.clone(); + } + e => { + // TODO: improve this + code = -32603; + message = format!("{}", e); + data = None; + } + } + } else { + unimplemented!(); + } + } + _ => { + code = -32603; + message = format!("{}", e); + data = None; + } + } + + Self { + jsonrpc: "2.0".to_string(), + id, + result: None, + error: Some(JsonRpcErrorData { + code, + message, + data, + }), + } + } +} + /// JSONRPC Responses can include one or many response objects. #[derive(Clone, Debug, Serialize)] #[serde(untagged)] diff --git a/web3-proxy/src/main.rs b/web3-proxy/src/main.rs index 4f874088..23db7e1a 100644 --- a/web3-proxy/src/main.rs +++ b/web3-proxy/src/main.rs @@ -19,8 +19,11 @@ use crate::app::Web3ProxyApp; use crate::config::{CliConfig, RpcConfig}; fn main() -> anyhow::Result<()> { + // if RUST_LOG isn't set, configure a default // TODO: is there a better way to do this? - // TODO: what should the default log level be? + if std::env::var("RUST_LOG").is_err() { + std::env::set_var("RUST_LOG", "info,web3_proxy=debug"); + } // install global collector configured based on RUST_LOG env var. tracing_subscriber::fmt()