diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index d0b5882d..85860e23 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -1011,6 +1011,12 @@ impl RpcsForRequest { stream! { loop { + if self.request.ttl_expired() { + break; + } else { + yield_now().await; + } + let mut earliest_retry_at = None; let mut wait_for_sync = None; @@ -1130,7 +1136,17 @@ impl RpcsForRequest { (Some(wait_for_sync), Some(retry_at)) => { select! { x = wait_for_sync => { - todo!() + match x { + Ok(rpc) => { + trace!(%rpc, "rpc ready. it might be used on the next loop"); + // TODO: try a handle now? + continue; + }, + Err(err) => { + trace!(?err, "problem while waiting for an rpc for a request"); + break; + }, + } } _ = sleep_until(retry_at) => { yield_now().await; diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 284c13db..5501de6f 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -259,7 +259,7 @@ impl Web3Rpc { &self, max_block: Option, start_instant: Instant, - ) -> (Reverse, bool, Reverse, u32) { + ) -> (Instant, bool, Reverse, u32) { let mut head_block = self .head_block_sender .as_ref() @@ -276,7 +276,7 @@ impl Web3Rpc { let next_available = self.next_available(start_instant); - (Reverse(next_available), !backup, Reverse(head_block), tier) + (next_available, !backup, Reverse(head_block), tier) } /// sort with `sort_on` and then on `weighted_peak_latency`