use pin for some sleeps

Bryan Stitt 2023-10-23 23:06:02 -07:00
parent d09dab846d
commit 5fb64614fe
4 changed files with 12 additions and 7 deletions
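The core of this change: build a tokio Sleep future once, pin it, and poll it by mutable reference from inside a select! loop, so one deadline is shared across iterations instead of a fresh timer being created on every pass. A minimal sketch of that pattern follows; the function name, the watch channel, and the 3-second deadline are illustrative and not taken from this repository:

    use std::time::Duration;
    use tokio::time::{sleep_until, Instant};
    use tokio::{pin, select};

    // Hypothetical helper: retry until new data arrives or a fixed deadline passes.
    async fn retry_until_deadline(mut updates: tokio::sync::watch::Receiver<u64>) {
        // build the Sleep future once, outside the loop...
        let deadline = sleep_until(Instant::now() + Duration::from_secs(3));
        // ...and pin it so the same timer can be polled by `&mut` on every iteration
        pin!(deadline);

        loop {
            select! {
                res = updates.changed() => {
                    if res.is_err() {
                        // the sender was dropped; nothing more will arrive
                        break;
                    }
                    // a new value is available; loop and retry
                }
                _ = &mut deadline => {
                    // the one shared deadline fired; stop retrying
                    break;
                }
            }
        }
    }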

View File

@@ -44,10 +44,10 @@ use std::str::FromStr;
 use std::sync::atomic::{self, AtomicU16, Ordering};
 use std::sync::Arc;
 use std::time::Duration;
-use tokio::select;
 use tokio::sync::{broadcast, mpsc, oneshot, watch, Semaphore};
 use tokio::task::{yield_now, JoinHandle};
 use tokio::time::{sleep, sleep_until, timeout_at, Instant};
+use tokio::{pin, select};
 use tracing::{error, info, trace, warn};
 
 // TODO: make this customizable?
@@ -1240,7 +1240,8 @@ impl App {
         // TODO: think more about how to handle retries without hammering our servers with errors
         let mut ranked_rpcs = self.balanced_rpcs.watch_ranked_rpcs.subscribe();
 
-        let latest_start = Instant::now() + Duration::from_secs(3);
+        let latest_start = sleep_until(Instant::now() + Duration::from_secs(3));
+        pin!(latest_start);
 
         // TODO: how many retries?
         loop {
@@ -1281,7 +1282,7 @@ impl App {
                     // TODO: pass these RankedRpcs to ValidatedRequest::new_with_app
                     ranked_rpcs.borrow_and_update();
                 }
-                _ = sleep_until(latest_start) => {
+                _ = &mut latest_start => {
                     // do not retry if we've already been trying for 3 seconds
                     break;
                 }
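Note on the hunks above: tokio's Sleep type is not Unpin, so select! can only poll it by reference once it has been pinned. pin!(latest_start) keeps a single timer alive across loop iterations, whereas the old sleep_until(latest_start) call registered a fresh timer on every retry.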

View File

@@ -956,10 +956,10 @@ impl RpcsForRequest {
                 earliest_retry_at = Some(self.request.connect_timeout_at());
             }
 
-            let retry_at = earliest_retry_at.expect("retry_at should always be set by now");
+            let retry_until = sleep_until(earliest_retry_at.expect("retry_at should always be set by now"));
 
             if wait_for_sync.is_empty() {
-                sleep_until(retry_at).await;
+                retry_until.await;
             } else {
                 select!{
                     (x, _, _) = select_all(wait_for_sync) => {
@@ -979,7 +979,7 @@ impl RpcsForRequest {
                         },
                     }
                 },
-                _ = sleep_until(retry_at) => {
+                _ = retry_until => {
                     // we've waited long enough that trying again might work
                 },
             }
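No pin! is needed in this file: the single retry_until future is consumed by value, either awaited directly or moved into the select!, which pins its branch futures internally.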

View File

@@ -1044,6 +1044,9 @@ impl Web3Rpc {
         error_handler: Option<RequestErrorHandler>,
         allow_unhealthy: bool,
     ) -> Web3ProxyResult<OpenRequestHandle> {
+        let connect_timeout_at = sleep_until(web3_request.connect_timeout_at());
+        tokio::pin!(connect_timeout_at);
+
         loop {
             match self
                 .try_request_handle(web3_request, error_handler, allow_unhealthy)
@@ -1076,7 +1079,7 @@ impl Web3Rpc {
                 Ok(OpenRequestResult::Lagged(now_synced_f)) => {
                     select! {
                         _ = now_synced_f => {}
-                        _ = sleep_until(web3_request.connect_timeout_at()) => {
+                        _ = &mut connect_timeout_at => {
                            break;
                         }
                     }
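Here the connect-timeout Sleep is built once before the retry loop and pinned with the fully qualified tokio::pin! macro, so repeated Lagged retries poll the same timer via &mut connect_timeout_at instead of calling sleep_until(web3_request.connect_timeout_at()) again on every pass.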

View File

@@ -310,6 +310,7 @@ impl ProxydSubCommand {
         }
 
         // now that the frontend is complete, tell all the other futures to finish
+        // TODO: can we use tokio::spawn Handle's abort?
         if let Err(err) = app_shutdown_sender.send(()) {
             warn!(?err, "backend sender");
         };