From d83a1f016942a5e24aa4a0164669efbf3df6377e Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Tue, 28 Feb 2023 11:01:34 -0800
Subject: [PATCH] use Web3Rpc instead of the name as a key

---
 web3_proxy/src/app/mod.rs        | 14 ++++++++----
 web3_proxy/src/rpcs/consensus.rs | 39 ++++++++++++++++----------------
 web3_proxy/src/rpcs/one.rs       | 27 ++++++++++++++++++----
 3 files changed, 52 insertions(+), 28 deletions(-)

diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs
index f69362f7..5687df69 100644
--- a/web3_proxy/src/app/mod.rs
+++ b/web3_proxy/src/app/mod.rs
@@ -1016,17 +1016,23 @@ impl Web3ProxyApp {
         // TODO: i'm sure this could be done better with iterators
         // TODO: stream the response?
         let mut collected: Vec<JsonRpcForwardedResponse> = Vec::with_capacity(num_requests);
-        let mut collected_rpcs: HashSet<Arc<Web3Rpc>> = HashSet::new();
+        let mut collected_rpc_names: HashSet<String> = HashSet::new();
+        let mut collected_rpcs: Vec<Arc<Web3Rpc>> = vec![];
         for response in responses {
             // TODO: any way to attach the tried rpcs to the error? it is likely helpful
             let (response, rpcs) = response?;
 
             collected.push(response);
-            collected_rpcs.extend(rpcs.into_iter());
+            collected_rpcs.extend(rpcs.into_iter().filter(|x| {
+                if collected_rpc_names.contains(&x.name) {
+                    false
+                } else {
+                    collected_rpc_names.insert(x.name.clone());
+                    true
+                }
+            }));
         }
 
-        let collected_rpcs: Vec<_> = collected_rpcs.into_iter().collect();
-
         Ok((collected, collected_rpcs))
     }
 
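The app/mod.rs hunk above swaps the `HashSet` accumulator for a `Vec` guarded by a set of seen names, so the tried rpcs are deduplicated while the order in which they responded is preserved (a `HashSet` discards order). A minimal sketch of the same seen-set pattern, with plain `String` names standing in for `Arc<Web3Rpc>`; note that `HashSet::insert` already reports whether the value was new, so the explicit `contains` check in the patch could be folded into it:

```rust
use std::collections::HashSet;

/// Keep the first occurrence of each name, preserving iteration order.
/// This mirrors the `filter` closure in the hunk above.
fn dedup_preserving_order(names: Vec<String>) -> Vec<String> {
    let mut seen: HashSet<String> = HashSet::new();

    names
        .into_iter()
        // `insert` returns false when the name was already present,
        // so duplicates are dropped and first-seen order is kept.
        .filter(|name| seen.insert(name.clone()))
        .collect()
}

fn main() {
    let rpcs = vec!["a".to_string(), "b".to_string(), "a".to_string()];
    assert_eq!(dedup_preserving_order(rpcs), vec!["a", "b"]);
}
```
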
diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs
index defbac07..f4e63259 100644
--- a/web3_proxy/src/rpcs/consensus.rs
+++ b/web3_proxy/src/rpcs/consensus.rs
@@ -93,7 +93,7 @@ impl Web3Rpcs {
 type FirstSeenCache = Cache<H256, Instant>;
 
 pub struct ConnectionsGroup {
-    rpc_name_to_block: HashMap<String, Web3ProxyBlock>,
+    rpc_to_block: HashMap<Arc<Web3Rpc>, Web3ProxyBlock>,
     // TODO: what if there are two blocks with the same number?
     highest_block: Option<Web3ProxyBlock>,
     /// used to track rpc.head_latency. The same cache should be shared between all ConnectionsGroups
@@ -103,27 +103,27 @@ pub struct ConnectionsGroup {
 impl ConnectionsGroup {
     pub fn new(first_seen: FirstSeenCache) -> Self {
         Self {
-            rpc_name_to_block: Default::default(),
+            rpc_to_block: Default::default(),
             highest_block: Default::default(),
             first_seen,
         }
     }
 
     pub fn len(&self) -> usize {
-        self.rpc_name_to_block.len()
+        self.rpc_to_block.len()
     }
 
     pub fn is_empty(&self) -> bool {
-        self.rpc_name_to_block.is_empty()
+        self.rpc_to_block.is_empty()
     }
 
-    fn remove(&mut self, rpc_name: &str) -> Option<Web3ProxyBlock> {
-        if let Some(removed_block) = self.rpc_name_to_block.remove(rpc_name) {
+    fn remove(&mut self, rpc: &Arc<Web3Rpc>) -> Option<Web3ProxyBlock> {
+        if let Some(removed_block) = self.rpc_to_block.remove(rpc) {
             match self.highest_block.as_mut() {
                 None => {}
                 Some(current_highest_block) => {
                     if removed_block.hash() == current_highest_block.hash() {
-                        for maybe_highest_block in self.rpc_name_to_block.values() {
+                        for maybe_highest_block in self.rpc_to_block.values() {
                             if maybe_highest_block.number() > current_highest_block.number() {
                                 *current_highest_block = maybe_highest_block.clone();
                             };
@@ -138,7 +138,7 @@ impl ConnectionsGroup {
         }
     }
 
-    async fn insert(&mut self, rpc: &Web3Rpc, block: Web3ProxyBlock) -> Option<Web3ProxyBlock> {
+    async fn insert(&mut self, rpc: Arc<Web3Rpc>, block: Web3ProxyBlock) -> Option<Web3ProxyBlock> {
         let first_seen = self
             .first_seen
             .get_with(*block.hash(), async move { Instant::now() })
@@ -155,7 +155,7 @@ impl ConnectionsGroup {
             self.highest_block = Some(block.clone());
         }
 
-        self.rpc_name_to_block.insert(rpc.name.clone(), block)
+        self.rpc_to_block.insert(rpc, block)
     }
 
     /// min_consensus_block_num keeps us from ever going backwards.
@@ -190,7 +190,7 @@ impl ConnectionsGroup {
             max_lag_consensus_to_highest
         );
 
-        let num_known = self.rpc_name_to_block.len();
+        let num_known = self.rpc_to_block.len();
 
         if num_known < web3_rpcs.min_head_rpcs {
             return Err(anyhow::anyhow!(
@@ -221,21 +221,22 @@ impl ConnectionsGroup {
         let maybe_head_hash = maybe_head_block.hash();
 
         // find all rpcs with maybe_head_hash as their current head
-        for (rpc_name, rpc_head) in self.rpc_name_to_block.iter() {
+        for (rpc, rpc_head) in self.rpc_to_block.iter() {
             if rpc_head.hash() != maybe_head_hash {
                 // connection is not on the desired block
                 continue;
             }
 
-            if backup_consensus_rpcs.contains(rpc_name.as_str()) {
+            let rpc_name = rpc.name.as_str();
+            if backup_consensus_rpcs.contains(rpc_name) {
                 // connection is on a later block in this same chain
                 continue;
             }
-            if primary_consensus_rpcs.contains(rpc_name.as_str()) {
+            if primary_consensus_rpcs.contains(rpc_name) {
                 // connection is on a later block in this same chain
                 continue;
             }
-            if let Some(rpc) = web3_rpcs.by_name.read().get(rpc_name.as_str()) {
+            if let Some(rpc) = web3_rpcs.by_name.read().get(rpc_name) {
                 if backup_rpcs_voted.is_some() {
                     // backups already voted for a head block. don't change it
                 } else {
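With `rpc_to_block` keyed by `Arc<Web3Rpc>`, map lookups rely on the standard-library rule that `Hash` and `Eq` on `Arc<T>` delegate to `T`; the widened `Hash` impl at the bottom of this patch is what makes that key usable. A self-contained sketch of the idea, with a hypothetical two-field `Rpc` standing in for `Web3Rpc` (whose real `Eq` must agree with its `Hash`):

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Hash and Eq on Arc<T> delegate to T, so a consistent Hash + Eq on
// the inner type is all that's needed to key a map by Arc<Rpc>.
#[derive(Hash, PartialEq, Eq, Debug)]
struct Rpc {
    name: String,
    tier: u64,
}

fn main() {
    let rpc = Arc::new(Rpc { name: "a".into(), tier: 0 });

    let mut rpc_to_block: HashMap<Arc<Rpc>, u64> = HashMap::new();
    rpc_to_block.insert(rpc.clone(), 16_700_000);

    // lookups go through the shared Arc instead of a name string
    assert_eq!(rpc_to_block.get(&rpc), Some(&16_700_000));
}
```
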
@@ -407,14 +408,14 @@ impl ConsensusFinder {
         self.tiers.values_mut().last()
     }
 
-    pub fn remove(&mut self, rpc: &Web3Rpc) -> Option<Web3ProxyBlock> {
+    pub fn remove(&mut self, rpc: &Arc<Web3Rpc>) -> Option<Web3ProxyBlock> {
         let mut removed = None;
         for (i, tier_group) in self.tiers.iter_mut().rev() {
             if i < &rpc.tier {
                 break;
             }
 
-            let x = tier_group.remove(rpc.name.as_str());
+            let x = tier_group.remove(rpc);
 
             if removed.is_none() && x.is_some() {
                 removed = x;
@@ -427,7 +428,7 @@ impl ConsensusFinder {
     /// returns the block that the rpc was on before updating to the new_block
     pub async fn insert(
         &mut self,
-        rpc: &Web3Rpc,
+        rpc: &Arc<Web3Rpc>,
         new_block: Web3ProxyBlock,
     ) -> Option<Web3ProxyBlock> {
         let mut old = None;
@@ -440,7 +441,7 @@ impl ConsensusFinder {
             }
 
             // TODO: should new_block be a ref?
-            let x = tier_group.insert(rpc, new_block.clone()).await;
+            let x = tier_group.insert(rpc.clone(), new_block.clone()).await;
 
             if old.is_none() && x.is_some() {
                 old = x;
@@ -528,7 +529,7 @@ impl ConsensusFinder {
         // TODO: find the best tier with a connectionsgroup. best case, this only queries the first tier
         // TODO: do we need to calculate all of them? I think having highest_known_block included as part of min_block_num should make that unnecessary
         for (tier, x) in self.tiers.iter() {
-            trace!("checking tier {}: {:#?}", tier, x.rpc_name_to_block);
+            trace!("checking tier {}: {:#?}", tier, x.rpc_to_block);
             if let Ok(consensus_head_connections) = x
                 .consensus_head_connections(authorization, web3_connections, min_block_num, tier)
                 .await
diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs
index e945d7b6..63b82b4b 100644
--- a/web3_proxy/src/rpcs/one.rs
+++ b/web3_proxy/src/rpcs/one.rs
@@ -135,6 +135,7 @@ pub struct Web3Rpc {
     pub(super) active_requests: AtomicUsize,
     pub(super) reconnect: AtomicBool,
     pub(super) disconnect_watch: Option<watch::Sender<bool>>,
+    pub(super) created_at: Option<Instant>,
 }
 
 impl Web3Rpc {
@@ -158,6 +159,8 @@ impl Web3Rpc {
         tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
         reconnect: bool,
     ) -> anyhow::Result<(Arc<Self>, AnyhowJoinHandle<()>)> {
+        let created_at = Instant::now();
+
         let hard_limit = match (config.hard_limit, redis_pool) {
             (None, None) => None,
             (Some(hard_limit), Some(redis_pool)) => {
@@ -238,6 +241,7 @@ impl Web3Rpc {
             reconnect,
             tier: config.tier,
             disconnect_watch: Some(disconnect_sender),
+            created_at: Some(created_at),
             ..Default::default()
         };
 
@@ -573,7 +577,9 @@ impl Web3Rpc {
     }
 
     pub async fn disconnect(&self) -> anyhow::Result<()> {
-        info!("disconnecting {}", self);
+        let age = self.created_at.unwrap().elapsed().as_millis();
+
+        info!("disconnecting {} ({}ms old)", self, age);
 
         self.reconnect.store(false, atomic::Ordering::Release);
 
@@ -581,11 +587,11 @@ impl Web3Rpc {
             warn!("failed sending disconnect watch: {:?}", err);
         };
 
-        trace!("disconnecting (locking) {}", self);
+        trace!("disconnecting (locking) {} ({}ms old)", self, age);
 
         let mut provider = self.provider.write().await;
 
-        trace!("disconnecting (clearing provider) {}", self);
+        trace!("disconnecting (clearing provider) {} ({}ms old)", self, age);
 
         *provider = None;
 
@@ -607,7 +613,10 @@ impl Web3Rpc {
                 // we previously sent a None. return early
                 return Ok(());
             }
-            warn!("clearing head block on {}!", self);
+
+            let age = self.created_at.unwrap().elapsed().as_millis();
+
+            warn!("clearing head block on {} ({}ms old)!", self, age);
 
             *head_block = None;
         }
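The `created_at` additions let the disconnect path report how long the rpc lived, but `self.created_at.unwrap()` panics on an instance built purely from `..Default::default()`, where the field stays `None`. A small sketch of a panic-free variant of the same age-logging pattern; the `Rpc` type and `age` helper here are illustrative, not part of the patch:

```rust
use std::time::{Duration, Instant};

// Illustrative stand-in: created_at is Option because instances built
// from Default alone have no creation timestamp.
struct Rpc {
    created_at: Option<Instant>,
}

impl Rpc {
    fn new() -> Self {
        Self {
            created_at: Some(Instant::now()),
        }
    }

    /// Age since construction; Duration::ZERO instead of a panic
    /// when created_at was never set.
    fn age(&self) -> Duration {
        self.created_at.map_or(Duration::ZERO, |t| t.elapsed())
    }
}

fn main() {
    let rpc = Rpc::new();
    println!("disconnecting ({}ms old)", rpc.age().as_millis());
}
```
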
@@ -1279,8 +1288,16 @@ impl fmt::Debug for Web3Provider {
 
 impl Hash for Web3Rpc {
     fn hash<H: Hasher>(&self, state: &mut H) {
-        // TODO: is this enough?
         self.name.hash(state);
+        self.display_name.hash(state);
+        self.http_url.hash(state);
+        self.ws_url.hash(state);
+        self.automatic_block_limit.hash(state);
+        self.backup.hash(state);
+        // TODO: including soft_limit might need to change if we change them to be dynamic
+        self.soft_limit.hash(state);
+        self.tier.hash(state);
+        self.created_at.hash(state);
     }
 }
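This widened `Hash` impl only behaves correctly if it stays consistent with `Web3Rpc`'s equality: two values that compare equal must produce equal hashes, or `HashMap<Arc<Web3Rpc>, _>` lookups will silently miss. Hashing `created_at` presumably also distinguishes a reconnected instance from its predecessor with identical config. A sketch of the invariant with a hypothetical type that, like the patch, hashes identity fields and skips runtime state:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

#[derive(Debug)]
struct Rpc {
    name: String,
    tier: u64,
    // runtime-only state: excluded from both Eq and Hash
    active_requests: usize,
}

// Eq and Hash must cover the same field set, or map lookups break.
impl PartialEq for Rpc {
    fn eq(&self, other: &Self) -> bool {
        self.name == other.name && self.tier == other.tier
    }
}
impl Eq for Rpc {}

impl Hash for Rpc {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.name.hash(state);
        self.tier.hash(state);
    }
}

fn hash_of<T: Hash>(t: &T) -> u64 {
    let mut hasher = DefaultHasher::new();
    t.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    let a = Rpc { name: "a".into(), tier: 1, active_requests: 0 };
    let b = Rpc { name: "a".into(), tier: 1, active_requests: 9 };

    // equal values must produce equal hashes
    assert_eq!(a, b);
    assert_eq!(hash_of(&a), hash_of(&b));
}
```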