diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 0e5f628f..fe296024 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -743,7 +743,7 @@ impl Web3Rpc { let mut abort_handles = vec![]; // health check that runs if there haven't been any recent requests - if let Some(block_and_rpc_sender) = block_and_rpc_sender.clone() { + let health_handle = if let Some(block_and_rpc_sender) = block_and_rpc_sender.clone() { // TODO: move this into a proper function let rpc = self.clone(); let block_map = block_map.clone(); @@ -758,7 +758,7 @@ impl Web3Rpc { let mut old_total_requests = 0; let mut new_total_requests; - // errors here should not cause the loop to exit! + // errors here should not cause the loop to exit! only mark unhealthy loop { new_total_requests = rpc.internal_requests.load(atomic::Ordering::Relaxed) + rpc.external_requests.load(atomic::Ordering::Relaxed); @@ -806,14 +806,39 @@ impl Web3Rpc { self.healthy.store(initial_check, atomic::Ordering::Relaxed); - let h = tokio::spawn(f); - let a = h.abort_handle(); - - futures.push(h); - abort_handles.push(a); + tokio::spawn(f) } else { self.healthy.store(true, atomic::Ordering::Relaxed); - } + + let rpc = self.clone(); + let health_sleep_seconds = 60; + + let f = async move { + // errors here should not cause the loop to exit! only mark unhealthy + loop { + // TODO: if this fails too many times, reset the connection + if let Err(err) = rpc.check_provider(chain_id).await { + rpc.healthy.store(false, atomic::Ordering::Relaxed); + + // TODO: if rate limit error, set "retry_at" + if rpc.backup { + warn!(?err, "provider check on {} failed", rpc); + } else { + error!(?err, "provider check on {} failed", rpc); + } + } else { + rpc.healthy.store(true, atomic::Ordering::Relaxed); + } + + sleep(Duration::from_secs(health_sleep_seconds)).await; + } + }; + + tokio::spawn(f) + }; + + abort_handles.push(health_handle.abort_handle()); + futures.push(health_handle); // subscribe to new heads if let Some(block_and_rpc_sender) = block_and_rpc_sender.clone() { @@ -853,23 +878,10 @@ impl Web3Rpc { } } - if futures.is_empty() { - // we didn't have anything to subscribe to. what should happen? - let clone = self.clone(); + // exit if any of the futures exit + let (first_exit, _, _) = select_all(futures).await; - loop { - sleep(Duration::from_secs(60)).await; - if self.should_disconnect() { - break; - } - } - } else { - // exit if any of the futures exit - // TODO: have an enum for which one exited? - let (first_exit, _, _) = select_all(futures).await; - - debug!(?first_exit, "subscriptions on {} exited", self); - } + debug!(?first_exit, "subscriptions on {} exited", self); // clear the head block if let Some(block_and_rpc_sender) = block_and_rpc_sender {