disconnect is noisy but it works

This commit is contained in:
Bryan Stitt 2023-02-27 13:29:07 -08:00
parent 11ee0aafe9
commit c5e2402d6e
3 changed files with 29 additions and 10 deletions

@ -248,8 +248,12 @@ impl ConnectionsGroup {
}
} else {
// i don't think this is an error. i think its just if a reconnect is currently happening
warn!("connection missing: {}", rpc_name);
debug!("web3_rpcs.by_name: {:#?}", web3_rpcs.by_name);
if web3_rpcs.synced() {
warn!("connection missing: {}", rpc_name);
debug!("web3_rpcs.by_name: {:#?}", web3_rpcs.by_name);
} else {
return Err(anyhow::anyhow!("not synced"));
}
}
}

@ -276,6 +276,11 @@ impl Web3Rpcs {
while let Some(x) = spawn_handles.next().await {
match x {
Ok(Ok((connection, _handle))) => {
// TODO: wait for connection to have a block by watching a channel instead of looping
while connection.head_block.read().is_none() {
sleep(Duration::from_millis(100)).await;
}
// web3 connection worked
let old_rpc = self
.by_name

@ -573,18 +573,22 @@ impl Web3Rpc {
}
pub async fn disconnect(&self) -> anyhow::Result<()> {
self.reconnect.store(false, atomic::Ordering::Release);
let mut provider = self.provider.write().await;
info!("disconnecting {}", self);
*provider = None;
self.reconnect.store(false, atomic::Ordering::Release);
if let Err(err) = self.disconnect_watch.as_ref().unwrap().send(true) {
warn!("failed sending disconnect watch: {:?}", err);
};
trace!("disconnecting (locking) {}", self);
let mut provider = self.provider.write().await;
trace!("disconnecting (clearing provider) {}", self);
*provider = None;
Ok(())
}
@ -715,12 +719,15 @@ impl Web3Rpc {
let mut old_total_requests = 0;
let mut new_total_requests;
// health check loop
loop {
if rpc.should_disconnect() {
break;
}
sleep(Duration::from_secs(health_sleep_seconds)).await;
// health check
// TODO: lower this log level once disconnect works
debug!("health check on {}", rpc);
trace!("health check on {}", rpc);
// TODO: what if we just happened to have this check line up with another restart?
// TODO: think more about this
@ -806,6 +813,9 @@ impl Web3Rpc {
old_total_requests = new_total_requests;
}
}
debug!("health checks for {} exited", rpc);
Ok(())
};
futures.push(flatten_handle(tokio::spawn(f)));