From 50fed0b0e79778d17bb40df96a11e44341dde0d1 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Mon, 10 Jul 2023 19:41:14 -0700 Subject: [PATCH] fixes for disconnect and config reloads --- web3_proxy/src/rpcs/many.rs | 20 ++++++++++++++++++- web3_proxy/src/rpcs/one.rs | 40 +++++++++++++++---------------------- 2 files changed, 35 insertions(+), 25 deletions(-) diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 1f9e493b..63e0f297 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -184,6 +184,8 @@ impl Web3Rpcs { let block_interval = average_block_interval(chain_id); + let mut names_to_keep = vec![]; + // turn configs into connections (in parallel) let mut spawn_handles: FuturesUnordered<_> = rpc_configs .into_iter() @@ -208,6 +210,8 @@ impl Web3Rpcs { debug!("spawning tasks for {}", server_name); + names_to_keep.push(server_name.clone()); + let handle = tokio::spawn(server_config.spawn( server_name, db_conn, @@ -255,7 +259,7 @@ impl Web3Rpcs { // tell the old rpc to disconnect if let Some(ref disconnect_sender) = old_rpc.disconnect_watch { - trace!("telling {} to disconnect", old_rpc); + debug!("telling old {} to disconnect", old_rpc); disconnect_sender.send_replace(true); } } else { @@ -275,6 +279,20 @@ impl Web3Rpcs { } } + // TODO: remove any RPCs that were part of the config, but are now removed + let active_names: Vec<_> = self.by_name.read().keys().cloned().collect(); + for name in active_names { + if names_to_keep.contains(&name) { + continue; + } + if let Some(old_rpc) = self.by_name.write().remove(&name) { + if let Some(ref disconnect_sender) = old_rpc.disconnect_watch { + debug!("telling {} to disconnect. no longer needed", old_rpc); + disconnect_sender.send_replace(true); + } + } + } + let num_rpcs = self.len(); if num_rpcs < self.min_synced_rpcs { diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index ce8c9cf8..a07de685 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -12,7 +12,7 @@ use anyhow::{anyhow, Context}; use arc_swap::ArcSwapOption; use ethers::prelude::{Bytes, Middleware, TxHash, U64}; use ethers::types::{Address, Transaction, U256}; -use futures::future::try_join_all; +use futures::stream::FuturesUnordered; use futures::StreamExt; use latency::{EwmaLatency, PeakEwmaLatency, RollingQuantileLatency}; use migration::sea_orm::DatabaseConnection; @@ -672,7 +672,7 @@ impl Web3Rpc { .await .web3_context("failed check_provider")?; - let mut futures = vec![]; + let mut futures = FuturesUnordered::new(); // TODO: use this channel instead of self.disconnect_watch let (subscribe_stop_tx, subscribe_stop_rx) = watch::channel(false); @@ -745,26 +745,14 @@ impl Web3Rpc { if let Some(block_and_rpc_sender) = block_and_rpc_sender.clone() { let clone = self.clone(); let subscribe_stop_rx = subscribe_stop_tx.subscribe(); + let block_map = block_map.clone(); let f = async move { - let x = clone - .subscribe_new_heads( - block_and_rpc_sender.clone(), - block_map.clone(), - subscribe_stop_rx, - ) - .await; - - // error or success, we clear the block when subscribe_new_heads exits clone - .send_head_block_result(Ok(None), &block_and_rpc_sender, &block_map) - .await?; - - x + .subscribe_new_heads(block_and_rpc_sender.clone(), block_map, subscribe_stop_rx) + .await }; - // TODO: if - futures.push(flatten_handle(tokio::spawn(f))); } @@ -780,12 +768,16 @@ impl Web3Rpc { futures.push(flatten_handle(tokio::spawn(f))); } - // try_join on the futures - if let Err(err) = try_join_all(futures).await { - warn!(?err, "subscription erred"); - } + // exit if any of the futures exit + let first_exit = futures.next().await; - debug!("subscriptions on {} exited", self); + debug!(?first_exit, "subscriptions on {} exited", self); + + // clear the head block + if let Some(block_and_rpc_sender) = block_and_rpc_sender { + self.send_head_block_result(Ok(None), &block_and_rpc_sender, &block_map) + .await? + }; subscribe_stop_tx.send_replace(true); @@ -856,7 +848,7 @@ impl Web3Rpc { loop { if *subscribe_stop_rx.borrow() { - trace!("stopping http block subscription on {}", self); + trace!(%self, "stopping http block subscription"); break; } @@ -885,7 +877,7 @@ impl Web3Rpc { .await?; if *subscribe_stop_rx.borrow() { - debug!("new heads subscription exited"); + debug!(%self, "new heads subscription exited"); Ok(()) } else { Err(anyhow!("new_heads subscription exited. reconnect needed").into())