diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs
index 9a980968..f4f18332 100644
--- a/web3_proxy/src/rpcs/consensus.rs
+++ b/web3_proxy/src/rpcs/consensus.rs
@@ -559,46 +559,12 @@ impl ConsensusFinder {
         // TODO: also track the sum of *available* hard_limits? if any servers have no hard limits, use their soft limit or no limit?
 
         // TODO: struct for the value of the votes hashmap?
-        let mut primary_votes: HashMap<Web3ProxyBlock, (HashSet<&str>, u32)> = Default::default();
-        let mut backup_votes: HashMap<Web3ProxyBlock, (HashSet<&str>, u32)> = Default::default();
-
-        let mut backup_consensus = None;
-
-        let mut rpc_heads_by_tier: Vec<_> = self.rpc_heads.iter().collect();
-        rpc_heads_by_tier.sort_by_cached_key(|(rpc, _)| rpc.tier.load(atomic::Ordering::Relaxed));
-
-        let current_tier = rpc_heads_by_tier
-            .first()
-            .expect("rpc_heads_by_tier should never be empty")
-            .0
-            .tier
-            .load(atomic::Ordering::Relaxed);
-
-        // trace!("first_tier: {}", current_tier);
-
-        // trace!("rpc_heads_by_tier: {:#?}", rpc_heads_by_tier);
-
-        // loop over all the rpc heads (grouped by tier) and their parents to find consensus
-        // TODO: i'm sure theres a lot of shortcuts that could be taken, but this is simplest to implement
-        for (rpc, rpc_head) in rpc_heads_by_tier.into_iter() {
-            let rpc_tier = rpc.tier.load(atomic::Ordering::Relaxed);
-
-            if current_tier != rpc_tier {
-                // we finished processing a tier. check for primary results
-                if let Some(consensus) = self.count_votes(&primary_votes, web3_rpcs) {
-                    trace!("found enough votes on tier {}", current_tier);
-                    return Ok(Some(consensus));
-                }
-
-                // only set backup consensus once. we don't want it to keep checking on worse tiers if it already found consensus
-                if backup_consensus.is_none() {
-                    if let Some(consensus) = self.count_votes(&backup_votes, web3_rpcs) {
-                        trace!("found backup votes on tier {}", current_tier);
-                        backup_consensus = Some(consensus)
-                    }
-                }
-            }
+        let mut primary_votes: HashMap<Web3ProxyBlock, (HashSet<&Arc<Web3Rpc>>, u32)> =
+            Default::default();
+        let mut backup_votes: HashMap<Web3ProxyBlock, (HashSet<&Arc<Web3Rpc>>, u32)> =
+            Default::default();
 
+        for (rpc, rpc_head) in self.rpc_heads.iter() {
             let mut block_to_check = rpc_head.clone();
 
             while block_to_check.number() >= lowest_block_number {
@@ -606,14 +572,14 @@ impl ConsensusFinder {
                     // backup nodes are excluded from the primary voting
                     let entry = primary_votes.entry(block_to_check.clone()).or_default();
 
-                    entry.0.insert(&rpc.name);
+                    entry.0.insert(rpc);
                     entry.1 += rpc.soft_limit;
                 }
 
                 // both primary and backup rpcs get included in the backup voting
                 let backup_entry = backup_votes.entry(block_to_check.clone()).or_default();
 
-                backup_entry.0.insert(&rpc.name);
+                backup_entry.0.insert(rpc);
                 backup_entry.1 += rpc.soft_limit;
 
                 match web3_rpcs
@@ -638,19 +604,14 @@ impl ConsensusFinder {
             return Ok(Some(consensus));
         }
 
-        // only set backup consensus once. we don't want it to keep checking on worse tiers if it already found consensus
-        if let Some(consensus) = backup_consensus {
-            return Ok(Some(consensus));
-        }
-
-        // count votes one last time
+        // primary votes didn't work. hopefully backup tiers are synced
        Ok(self.count_votes(&backup_votes, web3_rpcs))
     }
 
     // TODO: have min_sum_soft_limit and min_head_rpcs on self instead of on Web3Rpcs
     fn count_votes(
         &self,
-        votes: &HashMap<Web3ProxyBlock, (HashSet<&str>, u32)>,
+        votes: &HashMap<Web3ProxyBlock, (HashSet<&Arc<Web3Rpc>>, u32)>,
         web3_rpcs: &Web3Rpcs,
     ) -> Option<ConsensusWeb3Rpcs> {
         // sort the primary votes ascending by tier and descending by block num
@@ -678,16 +639,12 @@ impl ConsensusFinder {
 
             trace!("rpc_names: {:#?}", rpc_names);
 
-            // consensus likely found! load the rpcs to make sure they all have active connections
-            let consensus_rpcs: Vec<_> = rpc_names
-                .into_iter()
-                .filter_map(|x| web3_rpcs.get(x))
-                .collect();
-
-            if consensus_rpcs.len() < web3_rpcs.min_synced_rpcs {
+            if rpc_names.len() < web3_rpcs.min_synced_rpcs {
                 continue;
             }
 
+            // consensus found!
+            let consensus_rpcs: Vec<Arc<Web3Rpc>> = rpc_names.iter().map(|x| Arc::clone(x)).collect();
             let tier = consensus_rpcs
                 .iter()
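A minimal, self-contained sketch of the vote-counting shape this diff moves to. The Rpc and Block structs are simplified stand-ins for the crate's Web3Rpc and Web3ProxyBlock types, and the walk from each head down to lowest_block_number (plus the final count_votes ranking) is omitted; it only illustrates how primary and backup votes are now tallied per block, weighted by soft limit, without any grouping by tier:

use std::collections::{HashMap, HashSet};
use std::sync::Arc;

// simplified stand-ins for Web3Rpc / Web3ProxyBlock; only the fields needed
// for vote counting are modeled here
#[derive(PartialEq, Eq, Hash)]
struct Rpc {
    name: String,
    soft_limit: u32,
    backup: bool,
}

#[derive(Clone, PartialEq, Eq, Hash)]
struct Block {
    number: u64,
    hash: u64,
}

/// tally soft-limit-weighted votes per head block, with no tier grouping.
/// backup rpcs only count toward the backup tally, mirroring the diff above.
fn count_block_votes(
    heads: &HashMap<Arc<Rpc>, Block>,
) -> (
    HashMap<Block, (HashSet<&Arc<Rpc>>, u32)>,
    HashMap<Block, (HashSet<&Arc<Rpc>>, u32)>,
) {
    let mut primary_votes: HashMap<Block, (HashSet<&Arc<Rpc>>, u32)> = Default::default();
    let mut backup_votes: HashMap<Block, (HashSet<&Arc<Rpc>>, u32)> = Default::default();

    for (rpc, head) in heads.iter() {
        // the real code also walks each head's ancestors down to
        // lowest_block_number; this sketch only votes on the head itself
        if !rpc.backup {
            let entry = primary_votes.entry(head.clone()).or_default();
            entry.0.insert(rpc);
            entry.1 += rpc.soft_limit;
        }

        let backup_entry = backup_votes.entry(head.clone()).or_default();
        backup_entry.0.insert(rpc);
        backup_entry.1 += rpc.soft_limit;
    }

    (primary_votes, backup_votes)
}

fn main() {
    let block = Block { number: 100, hash: 0xabc };

    let mut heads = HashMap::new();
    heads.insert(
        Arc::new(Rpc { name: "primary-1".into(), soft_limit: 10, backup: false }),
        block.clone(),
    );
    heads.insert(
        Arc::new(Rpc { name: "backup-1".into(), soft_limit: 5, backup: true }),
        block.clone(),
    );

    let (primary, backup) = count_block_votes(&heads);

    // only the primary rpc votes in the primary tally; both vote in the backup tally
    assert_eq!(primary[&block].1, 10);
    assert_eq!(backup[&block].1, 15);
}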