diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index f1fe2e80..f8fda474 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -403,9 +403,11 @@ impl Web3Rpcs { let num_active_rpcs = consensus_finder.len(); let total_rpcs = self.by_name.load().len(); + let new_consensus_rpcs = Arc::new(new_consensus_rpcs); + let old_consensus_head_connections = self .watch_consensus_rpcs_sender - .send_replace(Some(Arc::new(new_consensus_rpcs))); + .send_replace(Some(new_consensus_rpcs.clone())); let backups_voted_str = if backups_needed { "B " } else { "" }; @@ -494,9 +496,9 @@ impl Web3Rpcs { } Ordering::Less => { // this is unlikely but possible - // TODO: better log + // TODO: better log that includes all the votes warn!( - "chain rolled back {}/{} {}{}/{}/{} con={} old={} rpc={}@{}", + "chain rolled back {}/{} {}{}/{}/{} con={} old={} rpc={}@{}. {:#?} -> {:#?}", consensus_tier, total_tiers, backups_voted_str, @@ -507,6 +509,8 @@ impl Web3Rpcs { old_head_block, rpc, rpc_head_str, + old_consensus_connections, + new_consensus_rpcs, ); if backups_needed { diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index 483a76df..1987b23c 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -470,6 +470,9 @@ impl ConsensusFinder { // create the histogram let mut hist = Histogram::::new_with_bounds(1, max_latency, 3).unwrap(); + // TODO: resize shouldn't be necessary, but i've seen it error + hist.auto(true); + for weighted_latency_ms in weighted_latencies.values() { hist.record(*weighted_latency_ms)?; } @@ -550,7 +553,7 @@ impl ConsensusFinder { let num_known = self.rpc_heads.len(); - if num_known < web3_rpcs.min_head_rpcs { + if num_known < web3_rpcs.min_synced_rpcs { // this keeps us from serving requests when the proxy first starts trace!("not enough servers known"); return Ok(None); @@ -671,7 +674,7 @@ impl ConsensusFinder { continue; } // TODO: different mins for backup vs primary - if rpc_names.len() < web3_rpcs.min_head_rpcs { + if rpc_names.len() < web3_rpcs.min_synced_rpcs { continue; } @@ -683,7 +686,7 @@ impl ConsensusFinder { .filter_map(|x| web3_rpcs.get(x)) .collect(); - if consensus_rpcs.len() < web3_rpcs.min_head_rpcs { + if consensus_rpcs.len() < web3_rpcs.min_synced_rpcs { continue; } // consensus found! diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 4644bac4..5d0f2694 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -61,7 +61,7 @@ pub struct Web3Rpcs { /// blocks on the heaviest chain pub(super) blocks_by_number: BlocksByNumberCache, /// the number of rpcs required to agree on consensus for the head block (thundering herd protection) - pub(super) min_head_rpcs: usize, + pub(super) min_synced_rpcs: usize, /// the soft limit required to agree on consensus for the head block. (thundering herd protection) pub(super) min_sum_soft_limit: u32, /// how far behind the highest known block height we can be before we stop serving requests @@ -121,7 +121,7 @@ impl Web3Rpcs { by_name, max_block_age, max_block_lag, - min_head_rpcs, + min_synced_rpcs: min_head_rpcs, min_sum_soft_limit, name, pending_transaction_cache, @@ -254,10 +254,10 @@ impl Web3Rpcs { let num_rpcs = self.by_name.load().len(); - if num_rpcs < self.min_head_rpcs { + if num_rpcs < self.min_synced_rpcs { return Err(Web3ProxyError::NotEnoughRpcs { num_known: num_rpcs, - min_head_rpcs: self.min_head_rpcs, + min_head_rpcs: self.min_synced_rpcs, }); } @@ -277,7 +277,7 @@ impl Web3Rpcs { } pub fn min_head_rpcs(&self) -> usize { - self.min_head_rpcs + self.min_synced_rpcs } /// subscribe to blocks and transactions from all the backend rpcs. @@ -570,7 +570,7 @@ impl Web3Rpcs { .cloned(), ); - if potential_rpcs.len() >= self.min_head_rpcs { + if potential_rpcs.len() >= self.min_synced_rpcs { // we have enough potential rpcs. try to load balance potential_rpcs.sort_by_cached_key(|x| { x.shuffle_for_load_balancing_on(max_block_needed.copied()) @@ -617,7 +617,7 @@ impl Web3Rpcs { potential_rpcs.extend(more_rpcs); - if potential_rpcs.len() >= self.min_head_rpcs { + if potential_rpcs.len() >= self.min_synced_rpcs { // we have enough potential rpcs. try to load balance potential_rpcs.sort_by_cached_key(|x| { x.shuffle_for_load_balancing_on(max_block_needed.copied()) @@ -1501,7 +1501,7 @@ mod tests { max_block_age: None, // TODO: test max_block_lag? max_block_lag: None, - min_head_rpcs: 1, + min_synced_rpcs: 1, min_sum_soft_limit: 1, }; @@ -1777,7 +1777,7 @@ mod tests { blocks_by_number: CacheBuilder::new(100) .time_to_live(Duration::from_secs(120)) .build(), - min_head_rpcs: 1, + min_synced_rpcs: 1, min_sum_soft_limit: 4_000, max_block_age: None, max_block_lag: None, @@ -1957,7 +1957,7 @@ mod tests { pending_tx_id_sender, blocks_by_hash: Cache::new(10_000), blocks_by_number: Cache::new(10_000), - min_head_rpcs: 1, + min_synced_rpcs: 1, min_sum_soft_limit: 1_000, max_block_age: None, max_block_lag: None,