add max wait to fix tests

This commit is contained in:
Bryan Stitt 2023-05-23 20:46:27 -07:00
parent dafb69fae1
commit b1a0bcac57
2 changed files with 70 additions and 22 deletions

View File

@ -153,7 +153,7 @@ pub async fn user_balance_post(
// Just make an rpc request, idk if i need to call this super extensive code
let transaction_receipt: TransactionReceipt = match app
.balanced_rpcs
.wait_for_best_rpc(&authorization, None, &mut vec![], None, None)
.wait_for_best_rpc(&authorization, None, &mut vec![], None, None, None)
.await
{
Ok(OpenRequestResult::Handle(handle)) => {
@ -187,7 +187,7 @@ pub async fn user_balance_post(
debug!("Transaction receipt is: {:?}", transaction_receipt);
let accepted_token: Address = match app
.balanced_rpcs
.wait_for_best_rpc(&authorization, None, &mut vec![], None, None)
.wait_for_best_rpc(&authorization, None, &mut vec![], None, None, None)
.await
{
Ok(OpenRequestResult::Handle(handle)) => {
@ -242,7 +242,7 @@ pub async fn user_balance_post(
debug!("Accepted token is: {:?}", accepted_token);
let decimals: u32 = match app
.balanced_rpcs
.wait_for_best_rpc(&authorization, None, &mut vec![], None, None)
.wait_for_best_rpc(&authorization, None, &mut vec![], None, None, None)
.await
{
Ok(OpenRequestResult::Handle(handle)) => {

View File

@ -34,6 +34,7 @@ use std::fmt::{self, Display};
use std::sync::atomic::Ordering;
use std::sync::Arc;
use thread_fast_rng::rand::seq::SliceRandom;
use tokio::select;
use tokio::sync::{broadcast, watch};
use tokio::time::{sleep, sleep_until, Duration, Instant};
@ -485,6 +486,7 @@ impl Web3Rpcs {
skip_rpcs: &mut Vec<Arc<Web3Rpc>>,
min_block_needed: Option<&U64>,
max_block_needed: Option<&U64>,
max_wait: Option<Duration>,
) -> Web3ProxyResult<OpenRequestResult> {
let mut earliest_retry_at: Option<Instant> = None;
@ -526,16 +528,14 @@ impl Web3Rpcs {
}
}
} else {
let start = Instant::now();
// TODO: get from config or argument
let max_wait = Duration::from_secs(10);
let stop_trying_at =
Instant::now() + max_wait.unwrap_or_else(|| Duration::from_secs(10));
let mut watch_consensus_rpcs = self.watch_consensus_rpcs_sender.subscribe();
let mut potential_rpcs = Vec::with_capacity(self.by_name.load().len());
while start.elapsed() < max_wait {
loop {
let consensus_rpcs = watch_consensus_rpcs.borrow_and_update().clone();
potential_rpcs.clear();
@ -653,12 +653,16 @@ impl Web3Rpcs {
match consensus_rpcs.should_wait_for_block(waiting_for, skip_rpcs) {
ShouldWaitForBlock::NeverReady => break,
ShouldWaitForBlock::Ready => continue,
ShouldWaitForBlock::Wait { .. } => {}
ShouldWaitForBlock::Ready => {}
ShouldWaitForBlock::Wait { .. } => select! {
_ = watch_consensus_rpcs.changed() => {},
_ = sleep_until(stop_trying_at) => {},
},
}
}
// TODO: select on consensus_rpcs changing and on earliest_retry_at
watch_consensus_rpcs.changed().await?;
if Instant::now() > stop_trying_at {
break;
}
}
}
@ -823,6 +827,7 @@ impl Web3Rpcs {
&mut skip_rpcs,
min_block_needed,
max_block_needed,
None,
)
.await?
{
@ -1493,6 +1498,7 @@ mod tests {
&mut vec![],
Some(head_block.number.as_ref().unwrap()),
None,
Some(Duration::from_secs(0)),
)
.await
.unwrap();
@ -1584,28 +1590,56 @@ mod tests {
// TODO: make sure the handle is for the expected rpc
assert!(matches!(
rpcs.wait_for_best_rpc(&authorization, None, &mut vec![], None, None)
.await,
rpcs.wait_for_best_rpc(
&authorization,
None,
&mut vec![],
None,
None,
Some(Duration::from_secs(0))
)
.await,
Ok(OpenRequestResult::Handle(_))
));
// TODO: make sure the handle is for the expected rpc
assert!(matches!(
rpcs.wait_for_best_rpc(&authorization, None, &mut vec![], Some(&0.into()), None)
.await,
rpcs.wait_for_best_rpc(
&authorization,
None,
&mut vec![],
Some(&0.into()),
None,
Some(Duration::from_secs(0)),
)
.await,
Ok(OpenRequestResult::Handle(_))
));
// TODO: make sure the handle is for the expected rpc
assert!(matches!(
rpcs.wait_for_best_rpc(&authorization, None, &mut vec![], Some(&1.into()), None)
.await,
rpcs.wait_for_best_rpc(
&authorization,
None,
&mut vec![],
Some(&1.into()),
None,
Some(Duration::from_secs(0)),
)
.await,
Ok(OpenRequestResult::Handle(_))
));
// future block should not get a handle
let future_rpc = rpcs
.wait_for_best_rpc(&authorization, None, &mut vec![], Some(&2.into()), None)
.wait_for_best_rpc(
&authorization,
None,
&mut vec![],
Some(&2.into()),
None,
Some(Duration::from_secs(0)),
)
.await;
assert!(matches!(future_rpc, Ok(OpenRequestResult::NotReady)));
}
@ -1675,7 +1709,6 @@ mod tests {
let (watch_consensus_rpcs_sender, _) = watch::channel(None);
let (watch_consensus_head_sender, _watch_consensus_head_receiver) = watch::channel(None);
// TODO: make a Web3Rpcs::new
let rpcs = Web3Rpcs {
block_sender,
by_name: ArcSwap::from_pointee(rpcs_by_name),
@ -1733,6 +1766,7 @@ mod tests {
&mut vec![],
Some(head_block.number()),
None,
Some(Duration::from_secs(0)),
)
.await;
@ -1744,13 +1778,27 @@ mod tests {
));
let _best_available_server_from_none = rpcs
.wait_for_best_rpc(&authorization, None, &mut vec![], None, None)
.wait_for_best_rpc(
&authorization,
None,
&mut vec![],
None,
None,
Some(Duration::from_secs(0)),
)
.await;
// assert_eq!(best_available_server, best_available_server_from_none);
let best_archive_server = rpcs
.wait_for_best_rpc(&authorization, None, &mut vec![], Some(&1.into()), None)
.wait_for_best_rpc(
&authorization,
None,
&mut vec![],
Some(&1.into()),
None,
Some(Duration::from_secs(0)),
)
.await;
match best_archive_server {