diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index 12bba91a..ea4d56db 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -190,6 +190,10 @@ impl Web3Rpcs { &self, mut block_and_rpc_receiver: mpsc::UnboundedReceiver, ) -> Web3ProxyResult<()> { + if self.watch_head_block.is_none() { + return Ok(()); + } + let mut consensus_finder = ConsensusFinder::new(Some(self.max_head_block_age), Some(self.max_head_block_lag)); diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index a3168566..ebf7f483 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -2,7 +2,7 @@ use super::blockchain::{BlocksByHashCache, BlocksByNumberCache, Web3ProxyBlock}; use super::consensus::{RankedRpcs, RpcsForRequest}; use super::one::Web3Rpc; -use crate::app::{flatten_handle, App, Web3ProxyJoinHandle}; +use crate::app::{App, Web3ProxyJoinHandle}; use crate::config::{average_block_interval, BlockAndRpc, Web3RpcConfig}; use crate::errors::{Web3ProxyError, Web3ProxyResult}; use crate::frontend::rpc_proxy_ws::ProxyMode; @@ -12,7 +12,6 @@ use crate::jsonrpc::{self, JsonRpcErrorData, JsonRpcParams, JsonRpcResultData}; use deduped_broadcast::DedupedBroadcaster; use derive_more::From; use ethers::prelude::{TxHash, U64}; -use futures::future::try_join_all; use futures::stream::StreamExt; use futures_util::future::join_all; use hashbrown::HashMap; @@ -155,7 +154,11 @@ impl Web3Rpcs { let handle = { let connections = connections.clone(); - tokio::spawn(connections.subscribe(block_and_rpc_receiver)) + tokio::spawn(async move { + connections + .process_incoming_blocks(block_and_rpc_receiver) + .await + }) }; Ok((connections, handle, consensus_connections_watcher)) @@ -346,49 +349,6 @@ impl Web3Rpcs { self.min_synced_rpcs } - /// subscribe to blocks and transactions from all the backend rpcs. - /// blocks are processed by all the `Web3Rpc`s and then sent to the `block_receiver` - /// transaction ids from all the `Web3Rpc`s are deduplicated and forwarded to `pending_tx_sender` - /// returns True if the subscription is not needed. False if it exited after starting - async fn subscribe( - self: Arc, - block_and_rpc_receiver: mpsc::UnboundedReceiver, - ) -> Web3ProxyResult<()> { - let mut futures = vec![]; - - // TODO: do we need anything here to set up the transaction funnel - - // setup the block funnel - if self.watch_head_block.is_some() { - let connections = Arc::clone(&self); - - let handle = tokio::task::Builder::default() - .name("process_incoming_blocks") - .spawn(async move { - connections - .process_incoming_blocks(block_and_rpc_receiver) - .await - })?; - - futures.push(flatten_handle(handle)); - } - - if futures.is_empty() { - // no transaction or block subscriptions. - info!(?self, "no subscriptions needed"); - return Ok(()); - } - - if let Err(e) = try_join_all(futures).await { - error!(?self, "subscriptions over"); - // TODO: i think this should maybe be a panic - return Err(e); - } - - info!(?self, "subscriptions over"); - Ok(()) - } - /// TODO: i think this RpcsForRequest should be stored on the ValidatedRequest when its made. that way any waiting for sync happens early and we don't need waiting anywhere else in the app pub async fn wait_for_rpcs_for_request( &self,