diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index b415549d..f1e90af1 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -26,6 +26,7 @@ use std::fmt; use std::hash::{Hash, Hasher}; use std::sync::atomic::{self, AtomicU32, AtomicU64, AtomicUsize}; use std::{cmp::Ordering, sync::Arc}; +use tokio::select; use tokio::sync::{mpsc, watch, RwLock as AsyncRwLock}; use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior}; use tracing::{debug, error, info, trace, warn, Level}; @@ -983,6 +984,8 @@ impl Web3Rpc { web3_request: &Arc, error_handler: Option, ) -> Web3ProxyResult { + let mut head_block_sender = None; + loop { match self.try_request_handle(web3_request, error_handler).await { Ok(OpenRequestResult::Handle(handle)) => return Ok(handle), @@ -1013,14 +1016,28 @@ impl Web3Rpc { // TODO: when can this happen? log? emit a stat? trace!("{} has no handle ready", self); - // TODO: sleep how long? maybe just error? - // TODO: instead of an arbitrary sleep, subscribe to the head block on this? - // TODO: exponential backoff with jitter - todo!(); + if head_block_sender.is_none() { + head_block_sender = self.head_block_sender.as_ref().map(|x| x.subscribe()); + } + + if let Some(head_block_sender) = &mut head_block_sender { + select! { + _ = head_block_sender.changed() => { + head_block_sender.borrow_and_update(); + } + _ = sleep_until(web3_request.expire_instant) => { + break; + } + } + } else { + break; + } } Err(err) => return Err(err), } } + + Err(Web3ProxyError::NoServersSynced) } async fn wait_for_throttle(self: &Arc, wait_until: Instant) -> Web3ProxyResult {