move error handling into a function

This commit is contained in:
Bryan Stitt 2022-05-22 18:11:42 +00:00
parent 8a2535da74
commit 8aacd84955
3 changed files with 93 additions and 83 deletions

@ -1,12 +1,11 @@
use crate::config::Web3ConnectionConfig;
use crate::connections::Web3Connections;
use crate::jsonrpc::JsonRpcErrorData;
use crate::jsonrpc::JsonRpcForwardedResponse;
use crate::jsonrpc::JsonRpcForwardedResponseEnum;
use crate::jsonrpc::JsonRpcRequest;
use crate::jsonrpc::JsonRpcRequestEnum;
use dashmap::DashMap;
use ethers::prelude::{HttpClientError, ProviderError, WsClientError, H256};
use ethers::prelude::H256;
use futures::future::join_all;
use linkedhashmap::LinkedHashMap;
use parking_lot::RwLock;
@ -44,7 +43,7 @@ pub struct Web3ProxyApp {
balanced_rpcs: Arc<Web3Connections>,
/// Send private requests (like eth_sendRawTransaction) to all these servers
private_rpcs: Arc<Web3Connections>,
active_requests: ActiveRequestsMap,
incoming_requests: ActiveRequestsMap,
response_cache: ResponseLrcCache,
}
@ -115,7 +114,7 @@ impl Web3ProxyApp {
Ok(Web3ProxyApp {
balanced_rpcs,
private_rpcs,
active_requests: Default::default(),
incoming_requests: Default::default(),
response_cache: Default::default(),
})
}
@ -129,7 +128,7 @@ impl Web3ProxyApp {
}
pub fn get_active_requests(&self) -> &ActiveRequestsMap {
&self.active_requests
&self.incoming_requests
}
/// send the request to the approriate RPCs
@ -301,7 +300,7 @@ impl Web3ProxyApp {
let (cache_key, response_cache) = match self.get_cached_response(&request) {
(cache_key, Ok(response)) => {
let _ = self.active_requests.remove(&cache_key);
let _ = self.incoming_requests.remove(&cache_key);
return Ok(response);
}
@ -309,27 +308,28 @@ impl Web3ProxyApp {
};
// check if this request is already in flight
let (in_flight_tx, in_flight_rx) = watch::channel(true);
let mut other_in_flight_rx = None;
match self.active_requests.entry(cache_key.clone()) {
// TODO: move this logic into an IncomingRequestHandler (ActiveRequestHandler has an rpc, but this won't)
let (incoming_tx, incoming_rx) = watch::channel(true);
let mut other_incoming_rx = None;
match self.incoming_requests.entry(cache_key.clone()) {
dashmap::mapref::entry::Entry::Occupied(entry) => {
other_in_flight_rx = Some(entry.get().clone());
other_incoming_rx = Some(entry.get().clone());
}
dashmap::mapref::entry::Entry::Vacant(entry) => {
entry.insert(in_flight_rx);
entry.insert(incoming_rx);
}
}
if let Some(mut other_in_flight_rx) = other_in_flight_rx {
// wait for the other request to finish. it can finish successfully or with an error
if let Some(mut other_incoming_rx) = other_incoming_rx {
// wait for the other request to finish. it might have finished successfully or with an error
trace!("{:?} waiting on in-flight request", request);
let _ = other_in_flight_rx.changed().await;
let _ = other_incoming_rx.changed().await;
// now that we've waited, lets check the cache again
if let Some(cached) = response_cache.read().get(&cache_key) {
let _ = self.active_requests.remove(&cache_key);
let _ = in_flight_tx.send(false);
let _ = self.incoming_requests.remove(&cache_key);
let _ = incoming_tx.send(false);
// TODO: emit a stat
trace!(
@ -379,77 +379,21 @@ impl Web3ProxyApp {
drop(response_cache);
// TODO: needing to remove manually here makes me think we should do this differently
let _ = self.active_requests.remove(&cache_key);
let _ = in_flight_tx.send(false);
let _ = self.incoming_requests.remove(&cache_key);
let _ = incoming_tx.send(false);
response
}
Err(e) => {
// send now since we aren't going to cache an error response
let _ = in_flight_tx.send(false);
let _ = incoming_tx.send(false);
// TODO: move this to a helper function?
let code;
let message: String;
let data;
match e {
ProviderError::JsonRpcClientError(e) => {
// TODO: we should check what type the provider is rather than trying to downcast both types of errors
if let Some(e) = e.downcast_ref::<HttpClientError>() {
match &*e {
HttpClientError::JsonRpcError(e) => {
code = e.code;
message = e.message.clone();
data = e.data.clone();
}
e => {
// TODO: improve this
code = -32603;
message = format!("{}", e);
data = None;
}
}
} else if let Some(e) = e.downcast_ref::<WsClientError>() {
match &*e {
WsClientError::JsonRpcError(e) => {
code = e.code;
message = e.message.clone();
data = e.data.clone();
}
e => {
// TODO: improve this
code = -32603;
message = format!("{}", e);
data = None;
}
}
} else {
unimplemented!();
}
}
_ => {
code = -32603;
message = format!("{}", e);
data = None;
}
}
JsonRpcForwardedResponse {
jsonrpc: "2.0".to_string(),
id: request.id,
result: None,
error: Some(JsonRpcErrorData {
code,
message,
data,
}),
}
JsonRpcForwardedResponse::from_ethers_error(e, request.id)
}
};
// TODO: needing to remove manually here makes me think we should do this differently
let _ = self.active_requests.remove(&cache_key);
let _ = self.incoming_requests.remove(&cache_key);
if response.error.is_some() {
trace!("Sending error reply: {:?}", response);
@ -458,7 +402,7 @@ impl Web3ProxyApp {
} else {
trace!("Sending reply: {:?}", response);
let _ = in_flight_tx.send(false);
let _ = incoming_tx.send(false);
}
return Ok(response);
@ -468,8 +412,8 @@ impl Web3ProxyApp {
warn!("No servers in sync!");
// TODO: needing to remove manually here makes me think we should do this differently
let _ = self.active_requests.remove(&cache_key);
let _ = in_flight_tx.send(false);
let _ = self.incoming_requests.remove(&cache_key);
let _ = incoming_tx.send(false);
return Err(anyhow::anyhow!("no servers in sync"));
}
@ -482,8 +426,8 @@ impl Web3ProxyApp {
sleep(retry_after).await;
// TODO: needing to remove manually here makes me think we should do this differently
let _ = self.active_requests.remove(&cache_key);
let _ = in_flight_tx.send(false);
let _ = self.incoming_requests.remove(&cache_key);
let _ = incoming_tx.send(false);
continue;
}

@ -1,3 +1,4 @@
use ethers::prelude::{HttpClientError, ProviderError, WsClientError};
use serde::de::{self, Deserialize, Deserializer, MapAccess, SeqAccess, Visitor};
use serde::Serialize;
use serde_json::value::RawValue;
@ -160,6 +161,68 @@ impl fmt::Debug for JsonRpcForwardedResponse {
}
}
impl JsonRpcForwardedResponse {
pub fn from_ethers_error(e: ProviderError, id: Box<serde_json::value::RawValue>) -> Self {
// TODO: move turning ClientError into json to a helper function?
let code;
let message: String;
let data;
match e {
ProviderError::JsonRpcClientError(e) => {
// TODO: we should check what type the provider is rather than trying to downcast both types of errors
if let Some(e) = e.downcast_ref::<HttpClientError>() {
match &*e {
HttpClientError::JsonRpcError(e) => {
code = e.code;
message = e.message.clone();
data = e.data.clone();
}
e => {
// TODO: improve this
code = -32603;
message = format!("{}", e);
data = None;
}
}
} else if let Some(e) = e.downcast_ref::<WsClientError>() {
match &*e {
WsClientError::JsonRpcError(e) => {
code = e.code;
message = e.message.clone();
data = e.data.clone();
}
e => {
// TODO: improve this
code = -32603;
message = format!("{}", e);
data = None;
}
}
} else {
unimplemented!();
}
}
_ => {
code = -32603;
message = format!("{}", e);
data = None;
}
}
Self {
jsonrpc: "2.0".to_string(),
id,
result: None,
error: Some(JsonRpcErrorData {
code,
message,
data,
}),
}
}
}
/// JSONRPC Responses can include one or many response objects.
#[derive(Clone, Debug, Serialize)]
#[serde(untagged)]

@ -19,8 +19,11 @@ use crate::app::Web3ProxyApp;
use crate::config::{CliConfig, RpcConfig};
fn main() -> anyhow::Result<()> {
// if RUST_LOG isn't set, configure a default
// TODO: is there a better way to do this?
// TODO: what should the default log level be?
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "info,web3_proxy=debug");
}
// install global collector configured based on RUST_LOG env var.
tracing_subscriber::fmt()