always have some kind of health check

This commit is contained in:
Bryan Stitt 2023-10-12 21:05:17 -07:00
parent 51d86ade59
commit 3b45256c9a

View File

@@ -743,7 +743,7 @@ impl Web3Rpc {
let mut abort_handles = vec![]; let mut abort_handles = vec![];
// health check that runs if there haven't been any recent requests // health check that runs if there haven't been any recent requests
if let Some(block_and_rpc_sender) = block_and_rpc_sender.clone() { let health_handle = if let Some(block_and_rpc_sender) = block_and_rpc_sender.clone() {
// TODO: move this into a proper function // TODO: move this into a proper function
let rpc = self.clone(); let rpc = self.clone();
let block_map = block_map.clone(); let block_map = block_map.clone();
@@ -758,7 +758,7 @@ impl Web3Rpc {
let mut old_total_requests = 0; let mut old_total_requests = 0;
let mut new_total_requests; let mut new_total_requests;
// errors here should not cause the loop to exit! // errors here should not cause the loop to exit! only mark unhealthy
loop { loop {
new_total_requests = rpc.internal_requests.load(atomic::Ordering::Relaxed) new_total_requests = rpc.internal_requests.load(atomic::Ordering::Relaxed)
+ rpc.external_requests.load(atomic::Ordering::Relaxed); + rpc.external_requests.load(atomic::Ordering::Relaxed);
@@ -806,14 +806,39 @@ impl Web3Rpc {
self.healthy.store(initial_check, atomic::Ordering::Relaxed); self.healthy.store(initial_check, atomic::Ordering::Relaxed);
let h = tokio::spawn(f); tokio::spawn(f)
let a = h.abort_handle();
futures.push(h);
abort_handles.push(a);
} else { } else {
self.healthy.store(true, atomic::Ordering::Relaxed); self.healthy.store(true, atomic::Ordering::Relaxed);
}
let rpc = self.clone();
let health_sleep_seconds = 60;
let f = async move {
// errors here should not cause the loop to exit! only mark unhealthy
loop {
// TODO: if this fails too many times, reset the connection
if let Err(err) = rpc.check_provider(chain_id).await {
rpc.healthy.store(false, atomic::Ordering::Relaxed);
// TODO: if rate limit error, set "retry_at"
if rpc.backup {
warn!(?err, "provider check on {} failed", rpc);
} else {
error!(?err, "provider check on {} failed", rpc);
}
} else {
rpc.healthy.store(true, atomic::Ordering::Relaxed);
}
sleep(Duration::from_secs(health_sleep_seconds)).await;
}
};
tokio::spawn(f)
};
abort_handles.push(health_handle.abort_handle());
futures.push(health_handle);
// subscribe to new heads // subscribe to new heads
if let Some(block_and_rpc_sender) = block_and_rpc_sender.clone() { if let Some(block_and_rpc_sender) = block_and_rpc_sender.clone() {
@@ -853,23 +878,10 @@ impl Web3Rpc {
} }
} }
if futures.is_empty() { // exit if any of the futures exit
// we didn't have anything to subscribe to. what should happen? let (first_exit, _, _) = select_all(futures).await;
let clone = self.clone();
loop { debug!(?first_exit, "subscriptions on {} exited", self);
sleep(Duration::from_secs(60)).await;
if self.should_disconnect() {
break;
}
}
} else {
// exit if any of the futures exit
// TODO: have an enum for which one exited?
let (first_exit, _, _) = select_all(futures).await;
debug!(?first_exit, "subscriptions on {} exited", self);
}
// clear the head block // clear the head block
if let Some(block_and_rpc_sender) = block_and_rpc_sender { if let Some(block_and_rpc_sender) = block_and_rpc_sender {