diff --git a/src/main.rs b/src/main.rs index a4ef4802..98096ff3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -465,7 +465,8 @@ impl Web3ProxyState { match private_rpcs.get_upstream_servers().await { Ok(upstream_servers) => { - let (tx, mut rx) = mpsc::unbounded_channel::(); + let (tx, mut rx) = + mpsc::unbounded_channel::>(); let clone = self.clone(); let connections = private_rpcs.connections.clone(); @@ -480,9 +481,11 @@ impl Web3ProxyState { let response = rx .recv() .await - .ok_or_else(|| anyhow::anyhow!("no response"))?; + .ok_or_else(|| anyhow::anyhow!("no successful response"))?; - return Ok(warp::reply::json(&response)); + if let Ok(response) = response { + return Ok(warp::reply::json(&response)); + } } Err(not_until) => { // TODO: move this to a helper function @@ -500,6 +503,7 @@ impl Web3ProxyState { } } else { // this is not a private transaction (or no private relays are configured) + // try to send to each tier, stopping at the first success loop { let read_lock = self.balanced_rpc_ratelimiter_lock.read().await; @@ -509,7 +513,8 @@ impl Web3ProxyState { for balanced_rpcs in self.balanced_rpc_tiers.iter() { match balanced_rpcs.next_upstream_server().await { Ok(upstream_server) => { - let (tx, mut rx) = mpsc::unbounded_channel::(); + let (tx, mut rx) = + mpsc::unbounded_channel::>(); let clone = self.clone(); let connections = balanced_rpcs.connections.clone(); @@ -529,9 +534,11 @@ impl Web3ProxyState { let response = rx .recv() .await - .ok_or_else(|| anyhow::anyhow!("no response"))?; + .ok_or_else(|| anyhow::anyhow!("no successful response"))?; - return Ok(warp::reply::json(&response)); + if let Ok(response) = response { + return Ok(warp::reply::json(&response)); + } } Err(not_until) => { // save the smallest not_until. if nothing succeeds, return an Err with not_until in it @@ -557,7 +564,12 @@ impl Web3ProxyState { let write_lock = self.balanced_rpc_ratelimiter_lock.write().await; // unwrap should be safe since we would have returned if it wasn't set - let deadline = earliest_not_until.unwrap().wait_time_from(self.clock.now()); + let deadline = if let Some(earliest_not_until) = earliest_not_until { + earliest_not_until.wait_time_from(self.clock.now()) + } else { + // TODO: exponential backoff? + Duration::from_secs(1) + }; sleep(deadline).await; @@ -571,7 +583,7 @@ impl Web3ProxyState { rpc_servers: Vec, connections: Arc, json_request_body: serde_json::Value, - tx: mpsc::UnboundedSender, + tx: mpsc::UnboundedSender>, ) -> anyhow::Result<()> { // {"jsonrpc":"2.0","method":"eth_syncing","params":[],"id":1} let incoming_id = json_request_body @@ -612,7 +624,7 @@ impl Web3ProxyState { // send the first good response to a one shot channel. that way we respond quickly // drop the result because errors are expected after the first send // TODO: if "no block with that header" or some other jsonrpc errors, skip this response - let _ = tx.send(response); + let _ = tx.send(Ok(response)); Ok::<(), anyhow::Error>(()) } @@ -632,10 +644,19 @@ impl Web3ProxyState { } } - if !errs.is_empty() { + let e: anyhow::Result = if !errs.is_empty() { Err(errs.pop().unwrap()) } else { + Err(anyhow::anyhow!("no successful responses")) + }; + + // TODO: think about this more. we want to send it + if tx.send(e).is_ok() { + // if we were able to send an error, then we never sent a success return Err(anyhow::anyhow!("no successful responses")); + } else { + // sending the error failed. the other side must be closed (which means we sent a success) + Ok(()) } } }