Revert "Revert "fixes for disconnect and config reloads""

This reverts commit 3cfbc3cbf124caf9154ffe6a444082cb0c3e20a1.
This commit is contained in:
Bryan Stitt 2023-07-10 20:59:03 -07:00
parent 3cfbc3cbf1
commit fe39ad1e1e
2 changed files with 35 additions and 25 deletions

@ -184,6 +184,8 @@ impl Web3Rpcs {
let block_interval = average_block_interval(chain_id);
let mut names_to_keep = vec![];
// turn configs into connections (in parallel)
let mut spawn_handles: FuturesUnordered<_> = rpc_configs
.into_iter()
@ -208,6 +210,8 @@ impl Web3Rpcs {
debug!("spawning tasks for {}", server_name);
names_to_keep.push(server_name.clone());
let handle = tokio::spawn(server_config.spawn(
server_name,
db_conn,
@ -255,7 +259,7 @@ impl Web3Rpcs {
// tell the old rpc to disconnect
if let Some(ref disconnect_sender) = old_rpc.disconnect_watch {
trace!("telling {} to disconnect", old_rpc);
debug!("telling old {} to disconnect", old_rpc);
disconnect_sender.send_replace(true);
}
} else {
@ -275,6 +279,20 @@ impl Web3Rpcs {
}
}
// TODO: remove any RPCs that were part of the config, but are now removed
let active_names: Vec<_> = self.by_name.read().keys().cloned().collect();
for name in active_names {
if names_to_keep.contains(&name) {
continue;
}
if let Some(old_rpc) = self.by_name.write().remove(&name) {
if let Some(ref disconnect_sender) = old_rpc.disconnect_watch {
debug!("telling {} to disconnect. no longer needed", old_rpc);
disconnect_sender.send_replace(true);
}
}
}
let num_rpcs = self.len();
if num_rpcs < self.min_synced_rpcs {

@ -12,7 +12,7 @@ use anyhow::{anyhow, Context};
use arc_swap::ArcSwapOption;
use ethers::prelude::{Bytes, Middleware, TxHash, U64};
use ethers::types::{Address, Transaction, U256};
use futures::future::try_join_all;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use latency::{EwmaLatency, PeakEwmaLatency, RollingQuantileLatency};
use migration::sea_orm::DatabaseConnection;
@ -672,7 +672,7 @@ impl Web3Rpc {
.await
.web3_context("failed check_provider")?;
let mut futures = vec![];
let mut futures = FuturesUnordered::new();
// TODO: use this channel instead of self.disconnect_watch
let (subscribe_stop_tx, subscribe_stop_rx) = watch::channel(false);
@ -745,26 +745,14 @@ impl Web3Rpc {
if let Some(block_and_rpc_sender) = block_and_rpc_sender.clone() {
let clone = self.clone();
let subscribe_stop_rx = subscribe_stop_tx.subscribe();
let block_map = block_map.clone();
let f = async move {
let x = clone
.subscribe_new_heads(
block_and_rpc_sender.clone(),
block_map.clone(),
subscribe_stop_rx,
)
.await;
// error or success, we clear the block when subscribe_new_heads exits
clone
.send_head_block_result(Ok(None), &block_and_rpc_sender, &block_map)
.await?;
x
.subscribe_new_heads(block_and_rpc_sender.clone(), block_map, subscribe_stop_rx)
.await
};
// TODO: if
futures.push(flatten_handle(tokio::spawn(f)));
}
@ -780,12 +768,16 @@ impl Web3Rpc {
futures.push(flatten_handle(tokio::spawn(f)));
}
// try_join on the futures
if let Err(err) = try_join_all(futures).await {
warn!(?err, "subscription erred");
}
// exit if any of the futures exit
let first_exit = futures.next().await;
debug!("subscriptions on {} exited", self);
debug!(?first_exit, "subscriptions on {} exited", self);
// clear the head block
if let Some(block_and_rpc_sender) = block_and_rpc_sender {
self.send_head_block_result(Ok(None), &block_and_rpc_sender, &block_map)
.await?
};
subscribe_stop_tx.send_replace(true);
@ -856,7 +848,7 @@ impl Web3Rpc {
loop {
if *subscribe_stop_rx.borrow() {
trace!("stopping http block subscription on {}", self);
trace!(%self, "stopping http block subscription");
break;
}
@ -885,7 +877,7 @@ impl Web3Rpc {
.await?;
if *subscribe_stop_rx.borrow() {
debug!("new heads subscription exited");
debug!(%self, "new heads subscription exited");
Ok(())
} else {
Err(anyhow!("new_heads subscription exited. reconnect needed").into())