From b9d68c0f465670c1fd89bcb6ba92f7e2c840d55b Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Tue, 24 Oct 2023 23:41:45 -0700 Subject: [PATCH] better disconnect health check --- web3_proxy/src/rpcs/consensus.rs | 5 ++--- web3_proxy/src/rpcs/one.rs | 17 +++++++++++++---- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index d6be5815..7717843a 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -757,8 +757,8 @@ impl ConsensusFinder { let minmax_block = self .rpc_heads - .iter() - .filter(|(rpc, x)| { + .values() + .filter(|x| { if let Some(max_block_age) = self.max_head_block_age { if x.age() > max_block_age { return false; @@ -767,7 +767,6 @@ impl ConsensusFinder { true }) - .map(|(_, x)| x) .minmax_by_key(|x| x.number()); let (lowest_block, highest_block) = match minmax_block { diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index a9421d96..0082b226 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -630,6 +630,7 @@ impl Web3Rpc { Ok(()) } + #[inline(always)] fn should_disconnect(&self) -> bool { *self.disconnect_watch.as_ref().unwrap().borrow() } @@ -647,10 +648,6 @@ impl Web3Rpc { return Err(Web3ProxyError::OldHead(self.clone(), head_block)); } - if self.should_disconnect() { - return Err(anyhow::anyhow!("rpc should disconnect").into()); - } - if detailed_healthcheck { let block_number = head_block.number(); @@ -799,6 +796,10 @@ impl Web3Rpc { // errors here should not cause the loop to exit! only mark unhealthy loop { + if rpc.should_disconnect() { + break; + } + new_total_requests = rpc.internal_requests.load(atomic::Ordering::Relaxed) + rpc.external_requests.load(atomic::Ordering::Relaxed); @@ -824,6 +825,8 @@ impl Web3Rpc { sleep(Duration::from_secs(health_sleep_seconds)).await; } + + Ok(()) }; // TODO: log quick_check lik @@ -849,6 +852,10 @@ impl Web3Rpc { let f = async move { // errors here should not cause the loop to exit! only mark unhealthy loop { + if rpc.should_disconnect() { + break; + } + // TODO: if this fails too many times, reset the connection if let Err(err) = rpc.check_provider(chain_id).await { rpc.healthy.store(false, atomic::Ordering::Relaxed); @@ -865,6 +872,8 @@ impl Web3Rpc { sleep(Duration::from_secs(health_sleep_seconds)).await; } + + Ok(()) }; tokio::spawn(f)