From 5fb64614fe7a00ca4f27917dc044caf31b5aaa89 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Mon, 23 Oct 2023 23:06:02 -0700 Subject: [PATCH] use pin for some sleeps --- web3_proxy/src/app/mod.rs | 7 ++++--- web3_proxy/src/rpcs/consensus.rs | 6 +++--- web3_proxy/src/rpcs/one.rs | 5 ++++- web3_proxy_cli/src/sub_commands/proxyd.rs | 1 + 4 files changed, 12 insertions(+), 7 deletions(-) diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 79006f5d..4671b776 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -44,10 +44,10 @@ use std::str::FromStr; use std::sync::atomic::{self, AtomicU16, Ordering}; use std::sync::Arc; use std::time::Duration; -use tokio::select; use tokio::sync::{broadcast, mpsc, oneshot, watch, Semaphore}; use tokio::task::{yield_now, JoinHandle}; use tokio::time::{sleep, sleep_until, timeout_at, Instant}; +use tokio::{pin, select}; use tracing::{error, info, trace, warn}; // TODO: make this customizable? @@ -1240,7 +1240,8 @@ impl App { // TODO: think more about how to handle retries without hammering our servers with errors let mut ranked_rpcs = self.balanced_rpcs.watch_ranked_rpcs.subscribe(); - let latest_start = Instant::now() + Duration::from_secs(3); + let latest_start = sleep_until(Instant::now() + Duration::from_secs(3)); + pin!(latest_start); // TODO: how many retries? loop { @@ -1281,7 +1282,7 @@ impl App { // TODO: pass these RankedRpcs to ValidatedRequest::new_with_app ranked_rpcs.borrow_and_update(); } - _ = sleep_until(latest_start) => { + _ = &mut latest_start => { // do not retry if we've already been trying for 3 seconds break; } diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index 28ac3940..eadac935 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -956,10 +956,10 @@ impl RpcsForRequest { earliest_retry_at = Some(self.request.connect_timeout_at()); } - let retry_at = earliest_retry_at.expect("retry_at should always be set by now"); + let retry_until = sleep_until(earliest_retry_at.expect("retry_at should always be set by now")); if wait_for_sync.is_empty() { - sleep_until(retry_at).await; + retry_until.await; } else { select!{ (x, _, _) = select_all(wait_for_sync) => { @@ -979,7 +979,7 @@ impl RpcsForRequest { }, } }, - _ = sleep_until(retry_at) => { + _ = retry_until => { // we've waited long enough that trying again might work }, } diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 5085ef14..22301ab6 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -1044,6 +1044,9 @@ impl Web3Rpc { error_handler: Option, allow_unhealthy: bool, ) -> Web3ProxyResult { + let connect_timeout_at = sleep_until(web3_request.connect_timeout_at()); + tokio::pin!(connect_timeout_at); + loop { match self .try_request_handle(web3_request, error_handler, allow_unhealthy) @@ -1076,7 +1079,7 @@ impl Web3Rpc { Ok(OpenRequestResult::Lagged(now_synced_f)) => { select! { _ = now_synced_f => {} - _ = sleep_until(web3_request.connect_timeout_at()) => { + _ = &mut connect_timeout_at => { break; } } diff --git a/web3_proxy_cli/src/sub_commands/proxyd.rs b/web3_proxy_cli/src/sub_commands/proxyd.rs index e67af9ad..ab08b50f 100644 --- a/web3_proxy_cli/src/sub_commands/proxyd.rs +++ b/web3_proxy_cli/src/sub_commands/proxyd.rs @@ -310,6 +310,7 @@ impl ProxydSubCommand { } // now that the frontend is complete, tell all the other futures to finish + // TODO: can we use tokio::spawn Handle's abort? if let Err(err) = app_shutdown_sender.send(()) { warn!(?err, "backend sender"); };