From a01038eb3d2987a86b70fc0a61c838bf26098138 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Tue, 14 Nov 2023 15:10:11 -0800 Subject: [PATCH] optional block more places --- web3_proxy/src/rpcs/blockchain.rs | 3 +- web3_proxy/src/rpcs/consensus.rs | 131 +++++++++++++++++++++--------- 2 files changed, 95 insertions(+), 39 deletions(-) diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index 754f0d12..68b00eef 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -23,7 +23,8 @@ pub type BlocksByHashCache = Cache; pub type BlocksByNumberCache = Cache; /// A block and its age with a less verbose serialized format -#[derive(Clone, Debug, Default)] +/// This does **not** implement Default. We rarely want a block with number 0 and hash 0. +#[derive(Clone, Debug)] pub struct Web3ProxyBlock(pub ArcBlock); impl Serialize for Web3ProxyBlock { diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index 8fd28639..81e0eb7e 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -86,7 +86,7 @@ enum SortMethod { /// TODO: make serializing work. the key needs to be a string. I think we need `serialize_with` #[derive(Clone, Debug, Serialize)] pub struct RankedRpcs { - pub head_block: Web3ProxyBlock, + pub head_block: Option, pub num_synced: usize, pub backups_needed: bool, pub check_block_data: bool, @@ -113,9 +113,6 @@ impl RankedRpcs { // we don't need to sort the rpcs now. we will sort them when a request neds them // TODO: the shame about this is that we lose just being able to compare 2 random servers - // TODO: why is head_block not set here?! it should always be set! - let head_block = head_block.unwrap_or_default(); - let rpcs: HashSet<_> = rpcs.into_iter().collect(); let backups_needed = rpcs.iter().any(|x| x.backup); @@ -204,7 +201,7 @@ impl RankedRpcs { let consensus = RankedRpcs { backups_needed, check_block_data: true, - head_block: best_block, + head_block: Some(best_block), sort_mode, inner: best_rpcs, num_synced, @@ -221,7 +218,7 @@ impl RankedRpcs { return None; } - let head_block = self.head_block.number(); + let head_block_num = self.head_block.as_ref().map(|x| x.number()); let num_active = self.num_active_rpcs(); @@ -289,7 +286,7 @@ impl RankedRpcs { } if inner_for_request.is_empty() { - warn!(?inner_for_request, ?outer_for_request, %web3_request, %head_block, "no rpcs for request"); + warn!(?inner_for_request, ?outer_for_request, %web3_request, head_block=%MaybeBlockNum(&head_block_num), "no rpcs for request"); None } else { trace!(?inner_for_request, ?outer_for_request, %web3_request, "for_request"); @@ -467,7 +464,7 @@ impl ConsensusFinder { num_consensus_rpcs, num_active_rpcs, total_rpcs, - consensus_head_block, + MaybeBlock(&consensus_head_block), rpc_head_str, ); @@ -476,13 +473,20 @@ impl ConsensusFinder { warn!("Backup RPCs are in use!"); } - // this should already be cached - let consensus_head_block = web3_rpcs - .try_cache_block(consensus_head_block, true) - .await?; + // this should already be cached, but now we set to consensus_head + let consensus_head_block = if let Some(consensus_head_block) = consensus_head_block + { + let consensus_head_block = web3_rpcs + .try_cache_block(consensus_head_block, true) + .await?; + + Some(consensus_head_block) + } else { + None + }; watch_consensus_head_sender - .send(Some(consensus_head_block)) + .send(consensus_head_block) .or(Err(Web3ProxyError::WatchSendError)) .web3_context( "watch_consensus_head_sender failed sending first consensus_head_block", @@ -491,10 +495,16 @@ impl ConsensusFinder { Some(old_consensus_connections) => { let old_head_block = &old_consensus_connections.head_block; - match consensus_head_block.number().cmp(&old_head_block.number()) { + let consensus_num = consensus_head_block.as_ref().map(|x| x.number()); + let old_head_num = old_head_block.as_ref().map(|x| x.number()); + + let consensus_hash = consensus_head_block.as_ref().map(|x| x.hash()); + let old_head_hash = old_head_block.as_ref().map(|x| x.hash()); + + match consensus_num.cmp(&old_head_num) { Ordering::Equal => { - // multiple blocks with the same fork! - if consensus_head_block.hash() == old_head_block.hash() { + // multiple blocks with the same number! fork detected! + if consensus_hash == old_head_hash { // no change in hash. no need to use watch_consensus_head_sender // TODO: trace level if rpc is backup debug!( @@ -505,7 +515,7 @@ impl ConsensusFinder { num_consensus_rpcs, num_active_rpcs, total_rpcs, - consensus_head_block, + MaybeBlock(&consensus_head_block), rpc_head_str, ) } else { @@ -519,18 +529,26 @@ impl ConsensusFinder { num_consensus_rpcs, num_active_rpcs, total_rpcs, - consensus_head_block, - old_head_block, + MaybeBlock(&consensus_head_block), + MaybeBlock(old_head_block), rpc_head_str, ); - let consensus_head_block = web3_rpcs - .try_cache_block(consensus_head_block, true) - .await - .web3_context("save consensus_head_block as heaviest chain")?; + let consensus_head_block = if let Some(consensus_head_block) = + consensus_head_block + { + let consensus_head_block = web3_rpcs + .try_cache_block(consensus_head_block, true) + .await + .web3_context("save consensus_head_block as heaviest chain")?; + + Some(consensus_head_block) + } else { + None + }; watch_consensus_head_sender - .send(Some(consensus_head_block)) + .send(consensus_head_block) .or(Err(Web3ProxyError::WatchSendError)) .web3_context("watch_consensus_head_sender failed sending uncled consensus_head_block")?; } @@ -546,8 +564,8 @@ impl ConsensusFinder { num_consensus_rpcs, num_active_rpcs, total_rpcs, - consensus_head_block, - old_head_block, + MaybeBlock(&consensus_head_block), + MaybeBlock(old_head_block), rpc_head_str, ); @@ -557,15 +575,22 @@ impl ConsensusFinder { } // TODO: tell save_block to remove any higher block numbers from the cache. not needed because we have other checks on requested blocks being > head, but still seems like a good idea - let consensus_head_block = web3_rpcs - .try_cache_block(consensus_head_block, true) - .await - .web3_context( - "save_block sending consensus_head_block as heaviest chain", - )?; + let consensus_head_block = + if let Some(consensus_head_block) = consensus_head_block { + let consensus_head_block = web3_rpcs + .try_cache_block(consensus_head_block, true) + .await + .web3_context( + "save_block sending consensus_head_block as heaviest chain", + )?; + + Some(consensus_head_block) + } else { + None + }; watch_consensus_head_sender - .send(Some(consensus_head_block)) + .send(consensus_head_block) .or(Err(Web3ProxyError::WatchSendError)) .web3_context("watch_consensus_head_sender failed sending rollback consensus_head_block")?; } @@ -578,7 +603,7 @@ impl ConsensusFinder { num_consensus_rpcs, num_active_rpcs, total_rpcs, - consensus_head_block, + MaybeBlock(&consensus_head_block), rpc_head_str, ); @@ -587,11 +612,19 @@ impl ConsensusFinder { warn!("Backup RPCs are in use!"); } - let consensus_head_block = web3_rpcs - .try_cache_block(consensus_head_block, true) - .await?; + // this should already be cached, but now we set to consensus_head + let consensus_head_block = + if let Some(consensus_head_block) = consensus_head_block { + Some( + web3_rpcs + .try_cache_block(consensus_head_block, true) + .await?, + ) + } else { + None + }; - watch_consensus_head_sender.send(Some(consensus_head_block)) + watch_consensus_head_sender.send(consensus_head_block) .or(Err(Web3ProxyError::WatchSendError)) .web3_context("watch_consensus_head_sender failed sending new consensus_head_block")?; } @@ -1019,3 +1052,25 @@ impl RpcsForRequest { // TODO: log that no servers were available. this might not be a server error. the user might have requested something in the far future (common when people mix up chains) } } + +struct MaybeBlock<'a>(pub &'a Option); + +impl std::fmt::Display for MaybeBlock<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self.0 { + Some(x) => write!(f, "{}", x), + None => write!(f, "None"), + } + } +} + +struct MaybeBlockNum<'a>(pub &'a Option); + +impl std::fmt::Display for MaybeBlockNum<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self.0 { + Some(x) => write!(f, "{}", x), + None => write!(f, "None"), + } + } +}