From 3cfbc3cbf124caf9154ffe6a444082cb0c3e20a1 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Mon, 10 Jul 2023 20:30:10 -0700 Subject: [PATCH] Revert "fixes for disconnect and config reloads" This reverts commit 50fed0b0e79778d17bb40df96a11e44341dde0d1. --- web3_proxy/src/rpcs/many.rs | 20 +------------------ web3_proxy/src/rpcs/one.rs | 40 ++++++++++++++++++++++--------------- 2 files changed, 25 insertions(+), 35 deletions(-) diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 63e0f297..1f9e493b 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -184,8 +184,6 @@ impl Web3Rpcs { let block_interval = average_block_interval(chain_id); - let mut names_to_keep = vec![]; - // turn configs into connections (in parallel) let mut spawn_handles: FuturesUnordered<_> = rpc_configs .into_iter() @@ -210,8 +208,6 @@ impl Web3Rpcs { debug!("spawning tasks for {}", server_name); - names_to_keep.push(server_name.clone()); - let handle = tokio::spawn(server_config.spawn( server_name, db_conn, @@ -259,7 +255,7 @@ impl Web3Rpcs { // tell the old rpc to disconnect if let Some(ref disconnect_sender) = old_rpc.disconnect_watch { - debug!("telling old {} to disconnect", old_rpc); + trace!("telling {} to disconnect", old_rpc); disconnect_sender.send_replace(true); } } else { @@ -279,20 +275,6 @@ impl Web3Rpcs { } } - // TODO: remove any RPCs that were part of the config, but are now removed - let active_names: Vec<_> = self.by_name.read().keys().cloned().collect(); - for name in active_names { - if names_to_keep.contains(&name) { - continue; - } - if let Some(old_rpc) = self.by_name.write().remove(&name) { - if let Some(ref disconnect_sender) = old_rpc.disconnect_watch { - debug!("telling {} to disconnect. no longer needed", old_rpc); - disconnect_sender.send_replace(true); - } - } - } - let num_rpcs = self.len(); if num_rpcs < self.min_synced_rpcs { diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index a07de685..ce8c9cf8 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -12,7 +12,7 @@ use anyhow::{anyhow, Context}; use arc_swap::ArcSwapOption; use ethers::prelude::{Bytes, Middleware, TxHash, U64}; use ethers::types::{Address, Transaction, U256}; -use futures::stream::FuturesUnordered; +use futures::future::try_join_all; use futures::StreamExt; use latency::{EwmaLatency, PeakEwmaLatency, RollingQuantileLatency}; use migration::sea_orm::DatabaseConnection; @@ -672,7 +672,7 @@ impl Web3Rpc { .await .web3_context("failed check_provider")?; - let mut futures = FuturesUnordered::new(); + let mut futures = vec![]; // TODO: use this channel instead of self.disconnect_watch let (subscribe_stop_tx, subscribe_stop_rx) = watch::channel(false); @@ -745,14 +745,26 @@ impl Web3Rpc { if let Some(block_and_rpc_sender) = block_and_rpc_sender.clone() { let clone = self.clone(); let subscribe_stop_rx = subscribe_stop_tx.subscribe(); - let block_map = block_map.clone(); let f = async move { + let x = clone + .subscribe_new_heads( + block_and_rpc_sender.clone(), + block_map.clone(), + subscribe_stop_rx, + ) + .await; + + // error or success, we clear the block when subscribe_new_heads exits clone - .subscribe_new_heads(block_and_rpc_sender.clone(), block_map, subscribe_stop_rx) - .await + .send_head_block_result(Ok(None), &block_and_rpc_sender, &block_map) + .await?; + + x }; + // TODO: if + futures.push(flatten_handle(tokio::spawn(f))); } @@ -768,16 +780,12 @@ impl Web3Rpc { futures.push(flatten_handle(tokio::spawn(f))); } - // exit if any of the futures exit - let first_exit = futures.next().await; + // try_join on the futures + if let Err(err) = try_join_all(futures).await { + warn!(?err, "subscription erred"); + } - debug!(?first_exit, "subscriptions on {} exited", self); - - // clear the head block - if let Some(block_and_rpc_sender) = block_and_rpc_sender { - self.send_head_block_result(Ok(None), &block_and_rpc_sender, &block_map) - .await? - }; + debug!("subscriptions on {} exited", self); subscribe_stop_tx.send_replace(true); @@ -848,7 +856,7 @@ impl Web3Rpc { loop { if *subscribe_stop_rx.borrow() { - trace!(%self, "stopping http block subscription"); + trace!("stopping http block subscription on {}", self); break; } @@ -877,7 +885,7 @@ impl Web3Rpc { .await?; if *subscribe_stop_rx.borrow() { - debug!(%self, "new heads subscription exited"); + debug!("new heads subscription exited"); Ok(()) } else { Err(anyhow!("new_heads subscription exited. reconnect needed").into())