From c5e2402d6e1af79c7b0868a2274448e3d5faf942 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Mon, 27 Feb 2023 13:29:07 -0800 Subject: [PATCH] disconnect is noisy but it works --- web3_proxy/src/rpcs/consensus.rs | 8 ++++++-- web3_proxy/src/rpcs/many.rs | 5 +++++ web3_proxy/src/rpcs/one.rs | 26 ++++++++++++++++++-------- 3 files changed, 29 insertions(+), 10 deletions(-) diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index d9564df3..defbac07 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -248,8 +248,12 @@ impl ConnectionsGroup { } } else { // i don't think this is an error. i think its just if a reconnect is currently happening - warn!("connection missing: {}", rpc_name); - debug!("web3_rpcs.by_name: {:#?}", web3_rpcs.by_name); + if web3_rpcs.synced() { + warn!("connection missing: {}", rpc_name); + debug!("web3_rpcs.by_name: {:#?}", web3_rpcs.by_name); + } else { + return Err(anyhow::anyhow!("not synced")); + } } } diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index b5e6c8fb..44973a2a 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -276,6 +276,11 @@ impl Web3Rpcs { while let Some(x) = spawn_handles.next().await { match x { Ok(Ok((connection, _handle))) => { + // TODO: wait for connection to have a block by watching a channel instead of looping + while connection.head_block.read().is_none() { + sleep(Duration::from_millis(100)).await; + } + // web3 connection worked let old_rpc = self .by_name diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 17faee6c..e945d7b6 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -573,18 +573,22 @@ impl Web3Rpc { } pub async fn disconnect(&self) -> anyhow::Result<()> { - self.reconnect.store(false, atomic::Ordering::Release); - - let mut provider = self.provider.write().await; - info!("disconnecting {}", self); - *provider = None; + self.reconnect.store(false, atomic::Ordering::Release); if let Err(err) = self.disconnect_watch.as_ref().unwrap().send(true) { warn!("failed sending disconnect watch: {:?}", err); }; + trace!("disconnecting (locking) {}", self); + + let mut provider = self.provider.write().await; + + trace!("disconnecting (clearing provider) {}", self); + + *provider = None; + Ok(()) } @@ -715,12 +719,15 @@ impl Web3Rpc { let mut old_total_requests = 0; let mut new_total_requests; + // health check loop loop { + if rpc.should_disconnect() { + break; + } + sleep(Duration::from_secs(health_sleep_seconds)).await; - // health check - // TODO: lower this log level once disconnect works - debug!("health check on {}", rpc); + trace!("health check on {}", rpc); // TODO: what if we just happened to have this check line up with another restart? // TODO: think more about this @@ -806,6 +813,9 @@ impl Web3Rpc { old_total_requests = new_total_requests; } } + debug!("health checks for {} exited", rpc); + + Ok(()) }; futures.push(flatten_handle(tokio::spawn(f)));