count all the tiers at once
This commit is contained in:
parent
bba7ccf7eb
commit
c192e03fa2
@ -559,46 +559,12 @@ impl ConsensusFinder {
|
|||||||
|
|
||||||
// TODO: also track the sum of *available* hard_limits? if any servers have no hard limits, use their soft limit or no limit?
|
// TODO: also track the sum of *available* hard_limits? if any servers have no hard limits, use their soft limit or no limit?
|
||||||
// TODO: struct for the value of the votes hashmap?
|
// TODO: struct for the value of the votes hashmap?
|
||||||
let mut primary_votes: HashMap<Web3ProxyBlock, (HashSet<&str>, u32)> = Default::default();
|
let mut primary_votes: HashMap<Web3ProxyBlock, (HashSet<&Arc<Web3Rpc>>, u32)> =
|
||||||
let mut backup_votes: HashMap<Web3ProxyBlock, (HashSet<&str>, u32)> = Default::default();
|
Default::default();
|
||||||
|
let mut backup_votes: HashMap<Web3ProxyBlock, (HashSet<&Arc<Web3Rpc>>, u32)> =
|
||||||
let mut backup_consensus = None;
|
Default::default();
|
||||||
|
|
||||||
let mut rpc_heads_by_tier: Vec<_> = self.rpc_heads.iter().collect();
|
|
||||||
rpc_heads_by_tier.sort_by_cached_key(|(rpc, _)| rpc.tier.load(atomic::Ordering::Relaxed));
|
|
||||||
|
|
||||||
let current_tier = rpc_heads_by_tier
|
|
||||||
.first()
|
|
||||||
.expect("rpc_heads_by_tier should never be empty")
|
|
||||||
.0
|
|
||||||
.tier
|
|
||||||
.load(atomic::Ordering::Relaxed);
|
|
||||||
|
|
||||||
// trace!("first_tier: {}", current_tier);
|
|
||||||
|
|
||||||
// trace!("rpc_heads_by_tier: {:#?}", rpc_heads_by_tier);
|
|
||||||
|
|
||||||
// loop over all the rpc heads (grouped by tier) and their parents to find consensus
|
|
||||||
// TODO: i'm sure theres a lot of shortcuts that could be taken, but this is simplest to implement
|
|
||||||
for (rpc, rpc_head) in rpc_heads_by_tier.into_iter() {
|
|
||||||
let rpc_tier = rpc.tier.load(atomic::Ordering::Relaxed);
|
|
||||||
|
|
||||||
if current_tier != rpc_tier {
|
|
||||||
// we finished processing a tier. check for primary results
|
|
||||||
if let Some(consensus) = self.count_votes(&primary_votes, web3_rpcs) {
|
|
||||||
trace!("found enough votes on tier {}", current_tier);
|
|
||||||
return Ok(Some(consensus));
|
|
||||||
}
|
|
||||||
|
|
||||||
// only set backup consensus once. we don't want it to keep checking on worse tiers if it already found consensus
|
|
||||||
if backup_consensus.is_none() {
|
|
||||||
if let Some(consensus) = self.count_votes(&backup_votes, web3_rpcs) {
|
|
||||||
trace!("found backup votes on tier {}", current_tier);
|
|
||||||
backup_consensus = Some(consensus)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
for (rpc, rpc_head) in self.rpc_heads.iter() {
|
||||||
let mut block_to_check = rpc_head.clone();
|
let mut block_to_check = rpc_head.clone();
|
||||||
|
|
||||||
while block_to_check.number() >= lowest_block_number {
|
while block_to_check.number() >= lowest_block_number {
|
||||||
@ -606,14 +572,14 @@ impl ConsensusFinder {
|
|||||||
// backup nodes are excluded from the primary voting
|
// backup nodes are excluded from the primary voting
|
||||||
let entry = primary_votes.entry(block_to_check.clone()).or_default();
|
let entry = primary_votes.entry(block_to_check.clone()).or_default();
|
||||||
|
|
||||||
entry.0.insert(&rpc.name);
|
entry.0.insert(rpc);
|
||||||
entry.1 += rpc.soft_limit;
|
entry.1 += rpc.soft_limit;
|
||||||
}
|
}
|
||||||
|
|
||||||
// both primary and backup rpcs get included in the backup voting
|
// both primary and backup rpcs get included in the backup voting
|
||||||
let backup_entry = backup_votes.entry(block_to_check.clone()).or_default();
|
let backup_entry = backup_votes.entry(block_to_check.clone()).or_default();
|
||||||
|
|
||||||
backup_entry.0.insert(&rpc.name);
|
backup_entry.0.insert(rpc);
|
||||||
backup_entry.1 += rpc.soft_limit;
|
backup_entry.1 += rpc.soft_limit;
|
||||||
|
|
||||||
match web3_rpcs
|
match web3_rpcs
|
||||||
@ -638,19 +604,14 @@ impl ConsensusFinder {
|
|||||||
return Ok(Some(consensus));
|
return Ok(Some(consensus));
|
||||||
}
|
}
|
||||||
|
|
||||||
// only set backup consensus once. we don't want it to keep checking on worse tiers if it already found consensus
|
// primary votes didn't work. hopefully backup tiers are synced
|
||||||
if let Some(consensus) = backup_consensus {
|
|
||||||
return Ok(Some(consensus));
|
|
||||||
}
|
|
||||||
|
|
||||||
// count votes one last time
|
|
||||||
Ok(self.count_votes(&backup_votes, web3_rpcs))
|
Ok(self.count_votes(&backup_votes, web3_rpcs))
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: have min_sum_soft_limit and min_head_rpcs on self instead of on Web3Rpcs
|
// TODO: have min_sum_soft_limit and min_head_rpcs on self instead of on Web3Rpcs
|
||||||
fn count_votes(
|
fn count_votes(
|
||||||
&self,
|
&self,
|
||||||
votes: &HashMap<Web3ProxyBlock, (HashSet<&str>, u32)>,
|
votes: &HashMap<Web3ProxyBlock, (HashSet<&Arc<Web3Rpc>>, u32)>,
|
||||||
web3_rpcs: &Web3Rpcs,
|
web3_rpcs: &Web3Rpcs,
|
||||||
) -> Option<ConsensusWeb3Rpcs> {
|
) -> Option<ConsensusWeb3Rpcs> {
|
||||||
// sort the primary votes ascending by tier and descending by block num
|
// sort the primary votes ascending by tier and descending by block num
|
||||||
@ -678,16 +639,12 @@ impl ConsensusFinder {
|
|||||||
|
|
||||||
trace!("rpc_names: {:#?}", rpc_names);
|
trace!("rpc_names: {:#?}", rpc_names);
|
||||||
|
|
||||||
// consensus likely found! load the rpcs to make sure they all have active connections
|
if rpc_names.len() < web3_rpcs.min_synced_rpcs {
|
||||||
let consensus_rpcs: Vec<_> = rpc_names
|
|
||||||
.into_iter()
|
|
||||||
.filter_map(|x| web3_rpcs.get(x))
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
if consensus_rpcs.len() < web3_rpcs.min_synced_rpcs {
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// consensus found!
|
// consensus found!
|
||||||
|
let consensus_rpcs: Vec<Arc<_>> = rpc_names.iter().map(|x| Arc::clone(x)).collect();
|
||||||
|
|
||||||
let tier = consensus_rpcs
|
let tier = consensus_rpcs
|
||||||
.iter()
|
.iter()
|
||||||
|
Loading…
Reference in New Issue
Block a user