diff --git a/web3_proxy/src/frontend/users/payment.rs b/web3_proxy/src/frontend/users/payment.rs index 728728d0..1ceb8532 100644 --- a/web3_proxy/src/frontend/users/payment.rs +++ b/web3_proxy/src/frontend/users/payment.rs @@ -153,7 +153,7 @@ pub async fn user_balance_post( // Just make an rpc request, idk if i need to call this super extensive code let transaction_receipt: TransactionReceipt = match app .balanced_rpcs - .wait_for_best_rpc(&authorization, None, &mut vec![], None, None) + .wait_for_best_rpc(&authorization, None, &mut vec![], None, None, None) .await { Ok(OpenRequestResult::Handle(handle)) => { @@ -187,7 +187,7 @@ pub async fn user_balance_post( debug!("Transaction receipt is: {:?}", transaction_receipt); let accepted_token: Address = match app .balanced_rpcs - .wait_for_best_rpc(&authorization, None, &mut vec![], None, None) + .wait_for_best_rpc(&authorization, None, &mut vec![], None, None, None) .await { Ok(OpenRequestResult::Handle(handle)) => { @@ -242,7 +242,7 @@ pub async fn user_balance_post( debug!("Accepted token is: {:?}", accepted_token); let decimals: u32 = match app .balanced_rpcs - .wait_for_best_rpc(&authorization, None, &mut vec![], None, None) + .wait_for_best_rpc(&authorization, None, &mut vec![], None, None, None) .await { Ok(OpenRequestResult::Handle(handle)) => { diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 5bad1334..4bf536cf 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -34,6 +34,7 @@ use std::fmt::{self, Display}; use std::sync::atomic::Ordering; use std::sync::Arc; use thread_fast_rng::rand::seq::SliceRandom; +use tokio::select; use tokio::sync::{broadcast, watch}; use tokio::time::{sleep, sleep_until, Duration, Instant}; @@ -485,6 +486,7 @@ impl Web3Rpcs { skip_rpcs: &mut Vec>, min_block_needed: Option<&U64>, max_block_needed: Option<&U64>, + max_wait: Option, ) -> Web3ProxyResult { let mut earliest_retry_at: Option = None; @@ -526,16 +528,14 @@ impl Web3Rpcs { } } } else { - let start = Instant::now(); - - // TODO: get from config or argument - let max_wait = Duration::from_secs(10); + let stop_trying_at = + Instant::now() + max_wait.unwrap_or_else(|| Duration::from_secs(10)); let mut watch_consensus_rpcs = self.watch_consensus_rpcs_sender.subscribe(); let mut potential_rpcs = Vec::with_capacity(self.by_name.load().len()); - while start.elapsed() < max_wait { + loop { let consensus_rpcs = watch_consensus_rpcs.borrow_and_update().clone(); potential_rpcs.clear(); @@ -653,12 +653,16 @@ impl Web3Rpcs { match consensus_rpcs.should_wait_for_block(waiting_for, skip_rpcs) { ShouldWaitForBlock::NeverReady => break, - ShouldWaitForBlock::Ready => continue, - ShouldWaitForBlock::Wait { .. } => {} + ShouldWaitForBlock::Ready => {} + ShouldWaitForBlock::Wait { .. } => select! { + _ = watch_consensus_rpcs.changed() => {}, + _ = sleep_until(stop_trying_at) => {}, + }, } + } - // TODO: select on consensus_rpcs changing and on earliest_retry_at - watch_consensus_rpcs.changed().await?; + if Instant::now() > stop_trying_at { + break; } } } @@ -823,6 +827,7 @@ impl Web3Rpcs { &mut skip_rpcs, min_block_needed, max_block_needed, + None, ) .await? { @@ -1493,6 +1498,7 @@ mod tests { &mut vec![], Some(head_block.number.as_ref().unwrap()), None, + Some(Duration::from_secs(0)), ) .await .unwrap(); @@ -1584,28 +1590,56 @@ mod tests { // TODO: make sure the handle is for the expected rpc assert!(matches!( - rpcs.wait_for_best_rpc(&authorization, None, &mut vec![], None, None) - .await, + rpcs.wait_for_best_rpc( + &authorization, + None, + &mut vec![], + None, + None, + Some(Duration::from_secs(0)) + ) + .await, Ok(OpenRequestResult::Handle(_)) )); // TODO: make sure the handle is for the expected rpc assert!(matches!( - rpcs.wait_for_best_rpc(&authorization, None, &mut vec![], Some(&0.into()), None) - .await, + rpcs.wait_for_best_rpc( + &authorization, + None, + &mut vec![], + Some(&0.into()), + None, + Some(Duration::from_secs(0)), + ) + .await, Ok(OpenRequestResult::Handle(_)) )); // TODO: make sure the handle is for the expected rpc assert!(matches!( - rpcs.wait_for_best_rpc(&authorization, None, &mut vec![], Some(&1.into()), None) - .await, + rpcs.wait_for_best_rpc( + &authorization, + None, + &mut vec![], + Some(&1.into()), + None, + Some(Duration::from_secs(0)), + ) + .await, Ok(OpenRequestResult::Handle(_)) )); // future block should not get a handle let future_rpc = rpcs - .wait_for_best_rpc(&authorization, None, &mut vec![], Some(&2.into()), None) + .wait_for_best_rpc( + &authorization, + None, + &mut vec![], + Some(&2.into()), + None, + Some(Duration::from_secs(0)), + ) .await; assert!(matches!(future_rpc, Ok(OpenRequestResult::NotReady))); } @@ -1675,7 +1709,6 @@ mod tests { let (watch_consensus_rpcs_sender, _) = watch::channel(None); let (watch_consensus_head_sender, _watch_consensus_head_receiver) = watch::channel(None); - // TODO: make a Web3Rpcs::new let rpcs = Web3Rpcs { block_sender, by_name: ArcSwap::from_pointee(rpcs_by_name), @@ -1733,6 +1766,7 @@ mod tests { &mut vec![], Some(head_block.number()), None, + Some(Duration::from_secs(0)), ) .await; @@ -1744,13 +1778,27 @@ mod tests { )); let _best_available_server_from_none = rpcs - .wait_for_best_rpc(&authorization, None, &mut vec![], None, None) + .wait_for_best_rpc( + &authorization, + None, + &mut vec![], + None, + None, + Some(Duration::from_secs(0)), + ) .await; // assert_eq!(best_available_server, best_available_server_from_none); let best_archive_server = rpcs - .wait_for_best_rpc(&authorization, None, &mut vec![], Some(&1.into()), None) + .wait_for_best_rpc( + &authorization, + None, + &mut vec![], + Some(&1.into()), + None, + Some(Duration::from_secs(0)), + ) .await; match best_archive_server {