better error handling

This commit is contained in:
Bryan Stitt 2022-04-26 07:10:13 +00:00
parent 710cef5da3
commit 234f7a6f70

@ -465,7 +465,8 @@ impl Web3ProxyState {
match private_rpcs.get_upstream_servers().await {
Ok(upstream_servers) => {
let (tx, mut rx) = mpsc::unbounded_channel::<serde_json::Value>();
let (tx, mut rx) =
mpsc::unbounded_channel::<anyhow::Result<serde_json::Value>>();
let clone = self.clone();
let connections = private_rpcs.connections.clone();
@ -480,9 +481,11 @@ impl Web3ProxyState {
let response = rx
.recv()
.await
.ok_or_else(|| anyhow::anyhow!("no response"))?;
.ok_or_else(|| anyhow::anyhow!("no successful response"))?;
return Ok(warp::reply::json(&response));
if let Ok(response) = response {
return Ok(warp::reply::json(&response));
}
}
Err(not_until) => {
// TODO: move this to a helper function
@ -500,6 +503,7 @@ impl Web3ProxyState {
}
} else {
// this is not a private transaction (or no private relays are configured)
// try to send to each tier, stopping at the first success
loop {
let read_lock = self.balanced_rpc_ratelimiter_lock.read().await;
@ -509,7 +513,8 @@ impl Web3ProxyState {
for balanced_rpcs in self.balanced_rpc_tiers.iter() {
match balanced_rpcs.next_upstream_server().await {
Ok(upstream_server) => {
let (tx, mut rx) = mpsc::unbounded_channel::<serde_json::Value>();
let (tx, mut rx) =
mpsc::unbounded_channel::<anyhow::Result<serde_json::Value>>();
let clone = self.clone();
let connections = balanced_rpcs.connections.clone();
@ -529,9 +534,11 @@ impl Web3ProxyState {
let response = rx
.recv()
.await
.ok_or_else(|| anyhow::anyhow!("no response"))?;
.ok_or_else(|| anyhow::anyhow!("no successful response"))?;
return Ok(warp::reply::json(&response));
if let Ok(response) = response {
return Ok(warp::reply::json(&response));
}
}
Err(not_until) => {
// save the smallest not_until. if nothing succeeds, return an Err with not_until in it
@ -557,7 +564,12 @@ impl Web3ProxyState {
let write_lock = self.balanced_rpc_ratelimiter_lock.write().await;
// unwrap should be safe since we would have returned if it wasn't set
let deadline = earliest_not_until.unwrap().wait_time_from(self.clock.now());
let deadline = if let Some(earliest_not_until) = earliest_not_until {
earliest_not_until.wait_time_from(self.clock.now())
} else {
// TODO: exponential backoff?
Duration::from_secs(1)
};
sleep(deadline).await;
@ -571,7 +583,7 @@ impl Web3ProxyState {
rpc_servers: Vec<String>,
connections: Arc<ConnectionsMap>,
json_request_body: serde_json::Value,
tx: mpsc::UnboundedSender<serde_json::Value>,
tx: mpsc::UnboundedSender<anyhow::Result<serde_json::Value>>,
) -> anyhow::Result<()> {
// {"jsonrpc":"2.0","method":"eth_syncing","params":[],"id":1}
let incoming_id = json_request_body
@ -612,7 +624,7 @@ impl Web3ProxyState {
// send the first good response to a one shot channel. that way we respond quickly
// drop the result because errors are expected after the first send
// TODO: if "no block with that header" or some other jsonrpc errors, skip this response
let _ = tx.send(response);
let _ = tx.send(Ok(response));
Ok::<(), anyhow::Error>(())
}
@ -632,10 +644,19 @@ impl Web3ProxyState {
}
}
if !errs.is_empty() {
let e: anyhow::Result<serde_json::Value> = if !errs.is_empty() {
Err(errs.pop().unwrap())
} else {
Err(anyhow::anyhow!("no successful responses"))
};
// TODO: think about this more. we want to send it
if tx.send(e).is_ok() {
// if we were able to send an error, then we never sent a success
return Err(anyhow::anyhow!("no successful responses"));
} else {
// sending the error failed. the other side must be closed (which means we sent a success)
Ok(())
}
}
}