fixes for disconnect and config reloads

This commit is contained in:
Bryan Stitt 2023-07-10 19:41:14 -07:00
parent 2dfc5dcdf3
commit 50fed0b0e7
2 changed files with 35 additions and 25 deletions

View File

@ -184,6 +184,8 @@ impl Web3Rpcs {
let block_interval = average_block_interval(chain_id); let block_interval = average_block_interval(chain_id);
let mut names_to_keep = vec![];
// turn configs into connections (in parallel) // turn configs into connections (in parallel)
let mut spawn_handles: FuturesUnordered<_> = rpc_configs let mut spawn_handles: FuturesUnordered<_> = rpc_configs
.into_iter() .into_iter()
@ -208,6 +210,8 @@ impl Web3Rpcs {
debug!("spawning tasks for {}", server_name); debug!("spawning tasks for {}", server_name);
names_to_keep.push(server_name.clone());
let handle = tokio::spawn(server_config.spawn( let handle = tokio::spawn(server_config.spawn(
server_name, server_name,
db_conn, db_conn,
@ -255,7 +259,7 @@ impl Web3Rpcs {
// tell the old rpc to disconnect // tell the old rpc to disconnect
if let Some(ref disconnect_sender) = old_rpc.disconnect_watch { if let Some(ref disconnect_sender) = old_rpc.disconnect_watch {
trace!("telling {} to disconnect", old_rpc); debug!("telling old {} to disconnect", old_rpc);
disconnect_sender.send_replace(true); disconnect_sender.send_replace(true);
} }
} else { } else {
@ -275,6 +279,20 @@ impl Web3Rpcs {
} }
} }
// TODO: remove any RPCs that were part of the config, but are now removed
let active_names: Vec<_> = self.by_name.read().keys().cloned().collect();
for name in active_names {
if names_to_keep.contains(&name) {
continue;
}
if let Some(old_rpc) = self.by_name.write().remove(&name) {
if let Some(ref disconnect_sender) = old_rpc.disconnect_watch {
debug!("telling {} to disconnect. no longer needed", old_rpc);
disconnect_sender.send_replace(true);
}
}
}
let num_rpcs = self.len(); let num_rpcs = self.len();
if num_rpcs < self.min_synced_rpcs { if num_rpcs < self.min_synced_rpcs {

View File

@ -12,7 +12,7 @@ use anyhow::{anyhow, Context};
use arc_swap::ArcSwapOption; use arc_swap::ArcSwapOption;
use ethers::prelude::{Bytes, Middleware, TxHash, U64}; use ethers::prelude::{Bytes, Middleware, TxHash, U64};
use ethers::types::{Address, Transaction, U256}; use ethers::types::{Address, Transaction, U256};
use futures::future::try_join_all; use futures::stream::FuturesUnordered;
use futures::StreamExt; use futures::StreamExt;
use latency::{EwmaLatency, PeakEwmaLatency, RollingQuantileLatency}; use latency::{EwmaLatency, PeakEwmaLatency, RollingQuantileLatency};
use migration::sea_orm::DatabaseConnection; use migration::sea_orm::DatabaseConnection;
@ -672,7 +672,7 @@ impl Web3Rpc {
.await .await
.web3_context("failed check_provider")?; .web3_context("failed check_provider")?;
let mut futures = vec![]; let mut futures = FuturesUnordered::new();
// TODO: use this channel instead of self.disconnect_watch // TODO: use this channel instead of self.disconnect_watch
let (subscribe_stop_tx, subscribe_stop_rx) = watch::channel(false); let (subscribe_stop_tx, subscribe_stop_rx) = watch::channel(false);
@ -745,26 +745,14 @@ impl Web3Rpc {
if let Some(block_and_rpc_sender) = block_and_rpc_sender.clone() { if let Some(block_and_rpc_sender) = block_and_rpc_sender.clone() {
let clone = self.clone(); let clone = self.clone();
let subscribe_stop_rx = subscribe_stop_tx.subscribe(); let subscribe_stop_rx = subscribe_stop_tx.subscribe();
let block_map = block_map.clone();
let f = async move { let f = async move {
let x = clone
.subscribe_new_heads(
block_and_rpc_sender.clone(),
block_map.clone(),
subscribe_stop_rx,
)
.await;
// error or success, we clear the block when subscribe_new_heads exits
clone clone
.send_head_block_result(Ok(None), &block_and_rpc_sender, &block_map) .subscribe_new_heads(block_and_rpc_sender.clone(), block_map, subscribe_stop_rx)
.await?; .await
x
}; };
// TODO: if
futures.push(flatten_handle(tokio::spawn(f))); futures.push(flatten_handle(tokio::spawn(f)));
} }
@ -780,12 +768,16 @@ impl Web3Rpc {
futures.push(flatten_handle(tokio::spawn(f))); futures.push(flatten_handle(tokio::spawn(f)));
} }
// try_join on the futures // exit if any of the futures exit
if let Err(err) = try_join_all(futures).await { let first_exit = futures.next().await;
warn!(?err, "subscription erred");
}
debug!("subscriptions on {} exited", self); debug!(?first_exit, "subscriptions on {} exited", self);
// clear the head block
if let Some(block_and_rpc_sender) = block_and_rpc_sender {
self.send_head_block_result(Ok(None), &block_and_rpc_sender, &block_map)
.await?
};
subscribe_stop_tx.send_replace(true); subscribe_stop_tx.send_replace(true);
@ -856,7 +848,7 @@ impl Web3Rpc {
loop { loop {
if *subscribe_stop_rx.borrow() { if *subscribe_stop_rx.borrow() {
trace!("stopping http block subscription on {}", self); trace!(%self, "stopping http block subscription");
break; break;
} }
@ -885,7 +877,7 @@ impl Web3Rpc {
.await?; .await?;
if *subscribe_stop_rx.borrow() { if *subscribe_stop_rx.borrow() {
debug!("new heads subscription exited"); debug!(%self, "new heads subscription exited");
Ok(()) Ok(())
} else { } else {
Err(anyhow!("new_heads subscription exited. reconnect needed").into()) Err(anyhow!("new_heads subscription exited. reconnect needed").into())