From 51ba53980061995b1493eb563d06f11b14344f34 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Thu, 12 Oct 2023 23:57:12 -0700 Subject: [PATCH] bring back outer rpc checking --- web3_proxy/src/block_number.rs | 3 +- web3_proxy/src/rpcs/consensus.rs | 88 ++++++++++++++++++-------------- web3_proxy/src/rpcs/many.rs | 2 +- 3 files changed, 53 insertions(+), 40 deletions(-) diff --git a/web3_proxy/src/block_number.rs b/web3_proxy/src/block_number.rs index 1f6f41bd..a13ab420 100644 --- a/web3_proxy/src/block_number.rs +++ b/web3_proxy/src/block_number.rs @@ -103,6 +103,7 @@ pub async fn clean_block_number<'a>( if block_hash == *head_block.hash() { (head_block.into(), false) } else if let Some(app) = app { + // TODO: query for the block let block = app .balanced_rpcs .blocks_by_hash @@ -161,7 +162,7 @@ pub async fn clean_block_number<'a>( let head_block_num = head_block.number(); if block_num > head_block_num { - // TODO: option to wait for the block + // todo!(option to wait for the block here) return Err(Web3ProxyError::UnknownBlockNumber { known: head_block_num, unknown: block_num, diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index 0fd28376..081408b1 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -97,6 +97,7 @@ pub struct RankedRpcs { // TODO: could these be refs? The owning RankedRpcs lifetime might work. `stream!` might make it complicated pub struct RpcsForRequest { inner: Vec>, + outer: Vec>, request: Arc, } @@ -205,8 +206,11 @@ impl RankedRpcs { let head_block = self.head_block.number(); + let num_active = self.num_active_rpcs(); + // these are bigger than we need, but how much does that matter? - let mut inner_for_request = Vec::>::with_capacity(self.num_active_rpcs()); + let mut inner_for_request = Vec::>::with_capacity(num_active); + let mut outer_for_request = Vec::with_capacity(num_active); // TODO: what if min is set to some future block? // TODO: what if max is set to some future block? @@ -214,21 +218,24 @@ impl RankedRpcs { let max_block_needed = web3_request.max_block_needed(); // TODO: max lag was already handled - for rpc in self.inner.iter() { + for rpc in self.inner.iter().cloned() { if let Some(block_needed) = min_block_needed { if !rpc.has_block_data(block_needed) { + outer_for_request.push(rpc); continue; } } if let Some(block_needed) = max_block_needed { if !rpc.has_block_data(block_needed) { + outer_for_request.push(rpc); continue; } } - inner_for_request.push(rpc.clone()); + inner_for_request.push(rpc); } + // TODO: use web3_request.start_instant? I think we want it to be as recent as possible let now = Instant::now(); match self.sort_mode { @@ -239,26 +246,31 @@ impl RankedRpcs { // we use shuffle instead of sort si that the load gets spread around more // we will still compare weights during `RpcsForRequest::to_stream` - // TODO: use web3_request.start_instant? I think we want it to be as recent as possible inner_for_request.sort_by_cached_key(|x| { x.shuffle_for_load_balancing_on(max_block_needed, &mut rng, now) }); + outer_for_request.sort_by_cached_key(|x| { + x.shuffle_for_load_balancing_on(max_block_needed, &mut rng, now) + }); } SortMethod::Sort => { // we sort so that the best nodes are always preferred. we will compare weights during `RpcsForRequest::to_stream` inner_for_request .sort_by_cached_key(|x| x.sort_for_load_balancing_on(max_block_needed, now)); + outer_for_request + .sort_by_cached_key(|x| x.sort_for_load_balancing_on(max_block_needed, now)); } } if inner_for_request.is_empty() { - warn!(?inner_for_request, %web3_request, %head_block, "no rpcs for request"); + warn!(?inner_for_request, ?outer_for_request, %web3_request, %head_block, "no rpcs for request"); None } else { - trace!(?inner_for_request, %web3_request, "for_request"); + trace!(?inner_for_request, ?outer_for_request, %web3_request, "for_request"); Some(RpcsForRequest { inner: inner_for_request, + outer: outer_for_request, request: web3_request.clone(), }) } @@ -870,46 +882,46 @@ impl RpcsForRequest { loop { if self.request.connect_timeout() { break; - } else { - // yield_now().await; } let mut earliest_retry_at = None; let mut wait_for_sync = Vec::new(); // TODO: we used to do a neat power of 2 random choices here, but it had bugs. bring that back - for best_rpc in self.inner.iter() { - match best_rpc - .try_request_handle(&self.request, error_handler, false) - .await - { - Ok(OpenRequestResult::Handle(handle)) => { - trace!("opened handle: {}", best_rpc); - yield handle; + for rpcs in [self.inner.iter(), self.outer.iter()] { + for best_rpc in rpcs { + match best_rpc + .try_request_handle(&self.request, error_handler, false) + .await + { + Ok(OpenRequestResult::Handle(handle)) => { + trace!("opened handle: {}", best_rpc); + yield handle; + } + Ok(OpenRequestResult::RetryAt(retry_at)) => { + trace!( + "retry on {} @ {}", + best_rpc, + retry_at.duration_since(Instant::now()).as_secs_f32() + ); + earliest_retry_at = earliest_retry_at.min(Some(retry_at, )); + } + Ok(OpenRequestResult::Lagged(x)) => { + // this will probably always be the same block, right? + trace!("{} is lagged. will not work now", best_rpc); + wait_for_sync.push(x); + } + Ok(OpenRequestResult::Failed) => { + // TODO: log a warning? emit a stat? + trace!("best_rpc not ready: {}", best_rpc); + } + Err(err) => { + trace!("No request handle for {}. err={:?}", best_rpc, err); + } } - Ok(OpenRequestResult::RetryAt(retry_at)) => { - trace!( - "retry on {} @ {}", - best_rpc, - retry_at.duration_since(Instant::now()).as_secs_f32() - ); - earliest_retry_at = earliest_retry_at.min(Some(retry_at, )); - } - Ok(OpenRequestResult::Lagged(x)) => { - // this will probably always be the same block, right? - trace!("{} is lagged. will not work now", best_rpc); - wait_for_sync.push(x); - } - Ok(OpenRequestResult::Failed) => { - // TODO: log a warning? emit a stat? - trace!("best_rpc not ready: {}", best_rpc); - } - Err(err) => { - trace!("No request handle for {}. err={:?}", best_rpc, err); - } - } - yield_now().await; + yield_now().await; + } } // if we got this far, no inner or outer rpcs are ready. thats suprising since an inner should have been ready. maybe it got rate limited diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 5c2d740a..77aefe6d 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -462,7 +462,7 @@ impl Web3Rpcs { // TODO: limit number of tries let rpcs = self.try_rpcs_for_request(web3_request).await?; - let stream = rpcs.to_stream().take(3); + let stream = rpcs.to_stream(); pin!(stream);