diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index 944835f1..c4bb6917 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -334,7 +334,7 @@ impl Web3RpcConfig { block_interval: Duration, http_client: Option, blocks_by_hash_cache: BlocksByHashCache, - block_sender: Option>, + block_and_rpc_sender: Option>, max_head_block_age: Duration, ) -> anyhow::Result<(Arc, Web3ProxyJoinHandle<()>)> { if !self.extra.is_empty() { @@ -351,7 +351,7 @@ impl Web3RpcConfig { server_id, block_interval, blocks_by_hash_cache, - block_sender, + block_and_rpc_sender, max_head_block_age, ) .await diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index ecc5dd73..454de5cf 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -408,7 +408,7 @@ impl Web3Rpcs { pub(super) async fn process_incoming_blocks( &self, - mut block_receiver: mpsc::UnboundedReceiver, + mut block_and_rpc_receiver: mpsc::UnboundedReceiver, ) -> Web3ProxyResult<()> { let mut consensus_finder = ConsensusFinder::new(Some(self.max_head_block_age), Some(self.max_head_block_lag)); @@ -419,7 +419,7 @@ impl Web3Rpcs { let mut had_first_success = false; loop { - match timeout(double_block_time, block_receiver.recv()).await { + match timeout(double_block_time, block_and_rpc_receiver.recv()).await { Ok(Some((new_block, rpc))) => { let rpc_name = rpc.name.clone(); let rpc_is_backup = rpc.backup; diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 614ef4d1..201161b5 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -83,7 +83,7 @@ impl Web3Rpcs { Web3ProxyJoinHandle<()>, watch::Receiver>>, )> { - let (block_sender, block_receiver) = mpsc::unbounded_channel::(); + let (block_and_rpc_sender, block_and_rpc_receiver) = mpsc::unbounded_channel::(); // these blocks don't have full transactions, but they do have rather variable amounts of transaction hashes // TODO: actual weighter on this @@ -113,7 +113,7 @@ impl Web3Rpcs { average_block_interval(chain_id).mul_f32((max_head_block_lag.as_u64() * 10) as f32); let connections = Arc::new(Self { - block_sender, + block_sender: block_and_rpc_sender, blocks_by_hash, blocks_by_number, by_name, @@ -130,7 +130,7 @@ impl Web3Rpcs { let handle = { let connections = connections.clone(); - tokio::spawn(connections.subscribe(block_receiver)) + tokio::spawn(connections.subscribe(block_and_rpc_receiver)) }; Ok((connections, handle, consensus_connections_watcher)) @@ -313,7 +313,7 @@ impl Web3Rpcs { /// transaction ids from all the `Web3Rpc`s are deduplicated and forwarded to `pending_tx_sender` async fn subscribe( self: Arc, - block_receiver: mpsc::UnboundedReceiver, + block_and_rpc_receiver: mpsc::UnboundedReceiver, ) -> Web3ProxyResult<()> { let mut futures = vec![]; @@ -348,7 +348,7 @@ impl Web3Rpcs { let handle = tokio::task::Builder::default() .name("process_incoming_blocks") - .spawn(async move { connections.process_incoming_blocks(block_receiver).await })?; + .spawn(async move { connections.process_incoming_blocks(block_and_rpc_receiver).await })?; futures.push(flatten_handle(handle)); }