From 519ba473d99e0cf643fb00c9eb70dff145b578e4 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Mon, 28 Nov 2022 06:52:16 +0000 Subject: [PATCH] improve rpc filtering --- web3_proxy/src/rpcs/connections.rs | 88 +++++++++++++++--------------- 1 file changed, 43 insertions(+), 45 deletions(-) diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs index cc0f3f60..f00431c4 100644 --- a/web3_proxy/src/rpcs/connections.rs +++ b/web3_proxy/src/rpcs/connections.rs @@ -390,41 +390,42 @@ impl Web3Connections { ) -> anyhow::Result { let mut earliest_retry_at = None; - let min_block_needed = if let Some(min_block_needed) = min_block_needed { - *min_block_needed + let usable_rpcs: Vec> = if let Some(min_block_needed) = min_block_needed + { + // need a potentially old block. check all the rpcs + // TODO: we are going to be checking "has_block_data" a lot now + self.conns + .values() + .filter(|x| !skip.contains(x)) + .filter(|x| x.has_block_data(min_block_needed)) + .cloned() + .collect() } else { - match self.head_block_num() { - Some(x) => x, - None => { - trace!("no head block on {:?}", self); - return Ok(OpenRequestResult::NotSynced); - } - } + // need latest. filter the synced rpcs + // TODO: double check has_block_data? + self.synced_connections + .load() + .conns + .iter() + .filter(|x| !skip.contains(x)) + .cloned() + .collect() }; - trace!("min block needed: {}", min_block_needed); - - // filter the synced rpcs - // TODO: we are going to be checking "has_block_data" a lot now - let head_rpcs: Vec> = self - .synced_connections - .load() - .conns - .iter() - .filter(|x| !skip.contains(x)) - .filter(|x| x.has_block_data(&min_block_needed)) - .cloned() - .collect(); - - match head_rpcs.len() { + match usable_rpcs.len() { 0 => { - warn!("no head rpcs: {:?} (skipped {:?})", self, skip); + warn!( + "no rpcs @ {:?}: {:?} (skipped {:?})", + min_block_needed, + self.synced_connections.load(), + skip.iter().map(|x| &x.name).collect::>() + ); // TODO: what should happen here? automatic retry? // TODO: more detailed error return Ok(OpenRequestResult::NotSynced); } 1 => { - let rpc = head_rpcs.get(0).expect("len is 1"); + let rpc = usable_rpcs.get(0).expect("len is 1"); // TODO: try or wait for a request handle? let handle = rpc @@ -441,7 +442,7 @@ impl Web3Connections { let mut minimum = f64::MAX; // we sort on a bunch of values. cache them here so that we don't do this math multiple times. - let available_request_map: HashMap<_, f64> = head_rpcs + let available_request_map: HashMap<_, f64> = usable_rpcs .iter() .map(|rpc| { // TODO: are active requests what we want? do we want a counter for requests in the last second + any actives longer than that? @@ -482,13 +483,14 @@ impl Web3Connections { }; let sorted_rpcs = { - if head_rpcs.len() == 1 { - vec![head_rpcs.get(0).expect("there should be 1")] + if usable_rpcs.len() == 1 { + vec![usable_rpcs.get(0).expect("there should be 1")] } else { let mut rng = thread_fast_rng::thread_fast_rng(); - head_rpcs - .choose_multiple_weighted(&mut rng, head_rpcs.len(), |rpc| { + // TODO: sort or weight the non-archive nodes to be first + usable_rpcs + .choose_multiple_weighted(&mut rng, usable_rpcs.len(), |rpc| { *available_request_map .get(rpc) .expect("rpc should always be in the weight map") @@ -709,8 +711,6 @@ impl Web3Connections { continue; } OpenRequestResult::NotSynced => { - warn!("No synced servers! {:?}", self); - if let Some(request_metadata) = request_metadata { request_metadata.no_servers.fetch_add(1, Ordering::Release); } @@ -730,6 +730,8 @@ impl Web3Connections { .store(true, Ordering::Release); } + warn!("No synced servers! {:?}", self.synced_connections.load()); + // TODO: what error code? 502? Err(anyhow::anyhow!("all {} tries exhausted", skip_rpcs.len())) } @@ -996,18 +998,14 @@ mod tests { ); // best_synced_backend_connection requires servers to be synced with the head block - assert!(matches!( - conns - .best_synced_backend_connection( - &authorization, - None, - &[], - lagged_block.number.as_ref() - ) - .await - .unwrap(), - OpenRequestResult::NotSynced - )); + let x = conns + .best_synced_backend_connection(&authorization, None, &[], None) + .await + .unwrap(); + + dbg!(&x); + + assert!(matches!(x, OpenRequestResult::NotSynced)); // add lagged blocks to the conns. both servers should be allowed conns.save_block(&lagged_block, true).await.unwrap();