diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index d4d5e552..8fd28639 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -402,6 +402,8 @@ impl ConsensusFinder { rpc: Option<&Arc>, new_block: Option, ) -> Web3ProxyResult { + let rpc_block_sender = rpc.and_then(|x| x.head_block_sender.as_ref()); + let new_ranked_rpcs = match self .rank_rpcs(web3_rpcs) .await @@ -409,6 +411,11 @@ impl ConsensusFinder { { None => { warn!(?rpc, ?new_block, "no ranked rpcs found!"); + + if let Some(rpc_block_sender) = rpc_block_sender { + rpc_block_sender.send_replace(new_block); + } + return Ok(false); } Some(x) => x, @@ -428,6 +435,10 @@ impl ConsensusFinder { let new_ranked_rpcs = Arc::new(new_ranked_rpcs); + if let Some(rpc_block_sender) = rpc_block_sender { + rpc_block_sender.send_replace(new_block.clone()); + } + let old_ranked_rpcs = web3_rpcs .watch_ranked_rpcs .send_replace(Some(new_ranked_rpcs.clone())); diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 71b5c653..84c44266 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -567,7 +567,10 @@ impl Web3Rpc { self: &Arc, new_head_block: Web3ProxyResult>, ) -> Web3ProxyResult<()> { - let head_block_sender = self.head_block_sender.as_ref().unwrap(); + let head_block_sender = self + .head_block_sender + .as_ref() + .expect("head_block_sender is always set"); let new_head_block = match new_head_block { Ok(x) => { @@ -584,11 +587,9 @@ impl Web3Rpc { trace!("clearing head block on {} ({}ms old)!", self, age); - // send an empty block to take this server out of rotation - head_block_sender.send_replace(None); - // TODO: clear self.block_data_limit? + // send an empty block to take this server out of rotation None } Some(new_head_block) => { @@ -603,7 +604,6 @@ impl Web3Rpc { .await; // we are synced! yey! - head_block_sender.send_replace(Some(new_head_block.clone())); // TODO: checking this every time seems excessive if self.block_data_limit() == U64::zero() { @@ -631,11 +631,14 @@ impl Web3Rpc { } }; - // tell web3rpcs about this rpc having this block if let Some(block_and_rpc_sender) = &self.block_and_rpc_sender { + // tell web3rpcs about this rpc having this block + // web3rpcs will do `self.head_block_sender.send_replace(new_head_block)` block_and_rpc_sender .send((new_head_block, self.clone())) .context("block_and_rpc_sender failed sending")?; + } else { + head_block_sender.send_replace(new_head_block); } Ok(())