do rpc head and ranked rpc sending closer together

This commit is contained in:
Bryan Stitt 2023-11-08 15:34:26 -08:00
parent a0e586dac2
commit 63ddd9c0af
2 changed files with 20 additions and 6 deletions

@ -402,6 +402,8 @@ impl ConsensusFinder {
rpc: Option<&Arc<Web3Rpc>>,
new_block: Option<Web3ProxyBlock>,
) -> Web3ProxyResult<bool> {
let rpc_block_sender = rpc.and_then(|x| x.head_block_sender.as_ref());
let new_ranked_rpcs = match self
.rank_rpcs(web3_rpcs)
.await
@ -409,6 +411,11 @@ impl ConsensusFinder {
{
None => {
warn!(?rpc, ?new_block, "no ranked rpcs found!");
if let Some(rpc_block_sender) = rpc_block_sender {
rpc_block_sender.send_replace(new_block);
}
return Ok(false);
}
Some(x) => x,
@ -428,6 +435,10 @@ impl ConsensusFinder {
let new_ranked_rpcs = Arc::new(new_ranked_rpcs);
if let Some(rpc_block_sender) = rpc_block_sender {
rpc_block_sender.send_replace(new_block.clone());
}
let old_ranked_rpcs = web3_rpcs
.watch_ranked_rpcs
.send_replace(Some(new_ranked_rpcs.clone()));

@ -567,7 +567,10 @@ impl Web3Rpc {
self: &Arc<Self>,
new_head_block: Web3ProxyResult<Option<ArcBlock>>,
) -> Web3ProxyResult<()> {
let head_block_sender = self.head_block_sender.as_ref().unwrap();
let head_block_sender = self
.head_block_sender
.as_ref()
.expect("head_block_sender is always set");
let new_head_block = match new_head_block {
Ok(x) => {
@ -584,11 +587,9 @@ impl Web3Rpc {
trace!("clearing head block on {} ({}ms old)!", self, age);
// send an empty block to take this server out of rotation
head_block_sender.send_replace(None);
// TODO: clear self.block_data_limit?
// send an empty block to take this server out of rotation
None
}
Some(new_head_block) => {
@ -603,7 +604,6 @@ impl Web3Rpc {
.await;
// we are synced! yey!
head_block_sender.send_replace(Some(new_head_block.clone()));
// TODO: checking this every time seems excessive
if self.block_data_limit() == U64::zero() {
@ -631,11 +631,14 @@ impl Web3Rpc {
}
};
// tell web3rpcs about this rpc having this block
if let Some(block_and_rpc_sender) = &self.block_and_rpc_sender {
// tell web3rpcs about this rpc having this block
// web3rpcs will do `self.head_block_sender.send_replace(new_head_block)`
block_and_rpc_sender
.send((new_head_block, self.clone()))
.context("block_and_rpc_sender failed sending")?;
} else {
head_block_sender.send_replace(new_head_block);
}
Ok(())