optional block more places

This commit is contained in:
Bryan Stitt 2023-11-14 15:10:11 -08:00
parent 0d060033d0
commit a01038eb3d
2 changed files with 95 additions and 39 deletions

View File

@ -23,7 +23,8 @@ pub type BlocksByHashCache = Cache<H256, Web3ProxyBlock>;
pub type BlocksByNumberCache = Cache<U64, H256>;
/// A block and its age with a less verbose serialized format
#[derive(Clone, Debug, Default)]
/// This does **not** implement Default. We rarely want a block with number 0 and hash 0.
#[derive(Clone, Debug)]
pub struct Web3ProxyBlock(pub ArcBlock);
impl Serialize for Web3ProxyBlock {

View File

@ -86,7 +86,7 @@ enum SortMethod {
/// TODO: make serializing work. the key needs to be a string. I think we need `serialize_with`
#[derive(Clone, Debug, Serialize)]
pub struct RankedRpcs {
pub head_block: Web3ProxyBlock,
pub head_block: Option<Web3ProxyBlock>,
pub num_synced: usize,
pub backups_needed: bool,
pub check_block_data: bool,
@ -113,9 +113,6 @@ impl RankedRpcs {
// we don't need to sort the rpcs now. we will sort them when a request neds them
// TODO: the shame about this is that we lose just being able to compare 2 random servers
// TODO: why is head_block not set here?! it should always be set!
let head_block = head_block.unwrap_or_default();
let rpcs: HashSet<_> = rpcs.into_iter().collect();
let backups_needed = rpcs.iter().any(|x| x.backup);
@ -204,7 +201,7 @@ impl RankedRpcs {
let consensus = RankedRpcs {
backups_needed,
check_block_data: true,
head_block: best_block,
head_block: Some(best_block),
sort_mode,
inner: best_rpcs,
num_synced,
@ -221,7 +218,7 @@ impl RankedRpcs {
return None;
}
let head_block = self.head_block.number();
let head_block_num = self.head_block.as_ref().map(|x| x.number());
let num_active = self.num_active_rpcs();
@ -289,7 +286,7 @@ impl RankedRpcs {
}
if inner_for_request.is_empty() {
warn!(?inner_for_request, ?outer_for_request, %web3_request, %head_block, "no rpcs for request");
warn!(?inner_for_request, ?outer_for_request, %web3_request, head_block=%MaybeBlockNum(&head_block_num), "no rpcs for request");
None
} else {
trace!(?inner_for_request, ?outer_for_request, %web3_request, "for_request");
@ -467,7 +464,7 @@ impl ConsensusFinder {
num_consensus_rpcs,
num_active_rpcs,
total_rpcs,
consensus_head_block,
MaybeBlock(&consensus_head_block),
rpc_head_str,
);
@ -476,13 +473,20 @@ impl ConsensusFinder {
warn!("Backup RPCs are in use!");
}
// this should already be cached
let consensus_head_block = web3_rpcs
.try_cache_block(consensus_head_block, true)
.await?;
// this should already be cached, but now we set to consensus_head
let consensus_head_block = if let Some(consensus_head_block) = consensus_head_block
{
let consensus_head_block = web3_rpcs
.try_cache_block(consensus_head_block, true)
.await?;
Some(consensus_head_block)
} else {
None
};
watch_consensus_head_sender
.send(Some(consensus_head_block))
.send(consensus_head_block)
.or(Err(Web3ProxyError::WatchSendError))
.web3_context(
"watch_consensus_head_sender failed sending first consensus_head_block",
@ -491,10 +495,16 @@ impl ConsensusFinder {
Some(old_consensus_connections) => {
let old_head_block = &old_consensus_connections.head_block;
match consensus_head_block.number().cmp(&old_head_block.number()) {
let consensus_num = consensus_head_block.as_ref().map(|x| x.number());
let old_head_num = old_head_block.as_ref().map(|x| x.number());
let consensus_hash = consensus_head_block.as_ref().map(|x| x.hash());
let old_head_hash = old_head_block.as_ref().map(|x| x.hash());
match consensus_num.cmp(&old_head_num) {
Ordering::Equal => {
// multiple blocks with the same fork!
if consensus_head_block.hash() == old_head_block.hash() {
// multiple blocks with the same number! fork detected!
if consensus_hash == old_head_hash {
// no change in hash. no need to use watch_consensus_head_sender
// TODO: trace level if rpc is backup
debug!(
@ -505,7 +515,7 @@ impl ConsensusFinder {
num_consensus_rpcs,
num_active_rpcs,
total_rpcs,
consensus_head_block,
MaybeBlock(&consensus_head_block),
rpc_head_str,
)
} else {
@ -519,18 +529,26 @@ impl ConsensusFinder {
num_consensus_rpcs,
num_active_rpcs,
total_rpcs,
consensus_head_block,
old_head_block,
MaybeBlock(&consensus_head_block),
MaybeBlock(old_head_block),
rpc_head_str,
);
let consensus_head_block = web3_rpcs
.try_cache_block(consensus_head_block, true)
.await
.web3_context("save consensus_head_block as heaviest chain")?;
let consensus_head_block = if let Some(consensus_head_block) =
consensus_head_block
{
let consensus_head_block = web3_rpcs
.try_cache_block(consensus_head_block, true)
.await
.web3_context("save consensus_head_block as heaviest chain")?;
Some(consensus_head_block)
} else {
None
};
watch_consensus_head_sender
.send(Some(consensus_head_block))
.send(consensus_head_block)
.or(Err(Web3ProxyError::WatchSendError))
.web3_context("watch_consensus_head_sender failed sending uncled consensus_head_block")?;
}
@ -546,8 +564,8 @@ impl ConsensusFinder {
num_consensus_rpcs,
num_active_rpcs,
total_rpcs,
consensus_head_block,
old_head_block,
MaybeBlock(&consensus_head_block),
MaybeBlock(old_head_block),
rpc_head_str,
);
@ -557,15 +575,22 @@ impl ConsensusFinder {
}
// TODO: tell save_block to remove any higher block numbers from the cache. not needed because we have other checks on requested blocks being > head, but still seems like a good idea
let consensus_head_block = web3_rpcs
.try_cache_block(consensus_head_block, true)
.await
.web3_context(
"save_block sending consensus_head_block as heaviest chain",
)?;
let consensus_head_block =
if let Some(consensus_head_block) = consensus_head_block {
let consensus_head_block = web3_rpcs
.try_cache_block(consensus_head_block, true)
.await
.web3_context(
"save_block sending consensus_head_block as heaviest chain",
)?;
Some(consensus_head_block)
} else {
None
};
watch_consensus_head_sender
.send(Some(consensus_head_block))
.send(consensus_head_block)
.or(Err(Web3ProxyError::WatchSendError))
.web3_context("watch_consensus_head_sender failed sending rollback consensus_head_block")?;
}
@ -578,7 +603,7 @@ impl ConsensusFinder {
num_consensus_rpcs,
num_active_rpcs,
total_rpcs,
consensus_head_block,
MaybeBlock(&consensus_head_block),
rpc_head_str,
);
@ -587,11 +612,19 @@ impl ConsensusFinder {
warn!("Backup RPCs are in use!");
}
let consensus_head_block = web3_rpcs
.try_cache_block(consensus_head_block, true)
.await?;
// this should already be cached, but now we set to consensus_head
let consensus_head_block =
if let Some(consensus_head_block) = consensus_head_block {
Some(
web3_rpcs
.try_cache_block(consensus_head_block, true)
.await?,
)
} else {
None
};
watch_consensus_head_sender.send(Some(consensus_head_block))
watch_consensus_head_sender.send(consensus_head_block)
.or(Err(Web3ProxyError::WatchSendError))
.web3_context("watch_consensus_head_sender failed sending new consensus_head_block")?;
}
@ -1019,3 +1052,25 @@ impl RpcsForRequest {
// TODO: log that no servers were available. this might not be a server error. the user might have requested something in the far future (common when people mix up chains)
}
}
struct MaybeBlock<'a>(pub &'a Option<Web3ProxyBlock>);
impl std::fmt::Display for MaybeBlock<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self.0 {
Some(x) => write!(f, "{}", x),
None => write!(f, "None"),
}
}
}
struct MaybeBlockNum<'a>(pub &'a Option<U64>);
impl std::fmt::Display for MaybeBlockNum<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self.0 {
Some(x) => write!(f, "{}", x),
None => write!(f, "None"),
}
}
}