From aa3b40e03f6ea67505646325d90e35143b4ad25a Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Fri, 13 Oct 2023 00:22:57 -0700
Subject: [PATCH] add a spot for retries

---
 web3_proxy/src/app/mod.rs | 78 +++++++++++++++++++++++++++++----------
 1 file changed, 59 insertions(+), 19 deletions(-)

diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs
index 1b5940d1..5a3f7d84 100644
--- a/web3_proxy/src/app/mod.rs
+++ b/web3_proxy/src/app/mod.rs
@@ -47,7 +47,7 @@ use std::time::Duration;
 use tokio::select;
 use tokio::sync::{broadcast, mpsc, oneshot, watch, Semaphore};
 use tokio::task::{yield_now, JoinHandle};
-use tokio::time::{sleep, timeout, timeout_at, Instant};
+use tokio::time::{sleep, sleep_until, timeout, timeout_at, Instant};
 use tracing::{error, info, trace, warn};
 
 // TODO: make this customizable?
@@ -1095,28 +1095,68 @@ impl App {
         // TODO: this clone is only for an error response. refactor to not need it
         let error_id = request.id.clone();
 
-        let web3_request = match ValidatedRequest::new_with_app(
-            self,
-            authorization,
-            None,
-            None,
-            request.into(),
-            head_block,
-        )
-        .await
-        {
-            Ok(x) => x,
-            Err(err) => {
-                let (a, b) = err.as_json_response_parts(error_id);
+        let mut last_success = None;
+        let mut last_error = None;
+        let mut web3_request;
 
-                return (a, b, vec![]);
+        // TODO: think more about how to handle retries without hammering our servers with errors
+        let mut ranked_rpcs = self.balanced_rpcs.watch_ranked_rpcs.subscribe();
+
+        let latest_start = Instant::now() + Duration::from_secs(3);
+
+        // TODO: how many retries?
+        loop {
+            // TODO: refresh the request instead of making new each time. then we need less clones
+            web3_request = match ValidatedRequest::new_with_app(
+                self,
+                authorization.clone(),
+                None,
+                None,
+                request.clone().into(),
+                head_block.clone(),
+            )
+            .await
+            {
+                Ok(x) => x,
+                Err(err) => {
+                    let (a, b) = err.as_json_response_parts(error_id);
+
+                    let rpcs = vec![];
+
+                    return (a, b, rpcs);
+                }
+            };
+
+            // turn some of the Web3ProxyErrors into Ok results
+            match self._proxy_request_with_caching(&web3_request).await {
+                Ok(response_data) => {
+                    last_success = Some(response_data);
+                    break;
+                }
+                Err(err) => {
+                    last_error = Some(err);
+                }
             }
+
+            select! {
+                _ = ranked_rpcs.changed() => {
+                    // TODO: pass these RankedRpcs to ValidatedRequest::new_with_app
+                    ranked_rpcs.borrow_and_update();
+                }
+                _ = sleep_until(latest_start) => {
+                    // do not retry if we've already been trying for 3 seconds
+                    break;
+                }
+            }
+        }
+
+        let last_response = if let Some(last_success) = last_success {
+            Ok(last_success)
+        } else {
+            Err(last_error.unwrap_or(anyhow::anyhow!("no success or error").into()))
         };
 
-        // TODO: trace/kafka log request.params before we send them to _proxy_request_with_caching which might modify them
-
-        // turn some of the Web3ProxyErrors into Ok results
-        let (code, response) = match self._proxy_request_with_caching(&web3_request).await {
+        let (code, response) = match last_response {
             Ok(response_data) => {
                 web3_request.error_response.store(false, Ordering::Relaxed);
                 web3_request