use Web3Rpc instead of the name as a key

Bryan Stitt 2023-02-28 11:01:34 -08:00
parent 26970c2d88
commit d83a1f0169
3 changed files with 52 additions and 28 deletions
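
Note on the change: wherever the proxy tracked per-rpc state keyed by rpc.name
(a String), it now keys directly on Arc<Web3Rpc>. That only works because Hash
and Eq for Arc<T> delegate to T, so Web3Rpc itself must implement them; the
last hunk below expands Web3Rpc's Hash impl accordingly. A minimal standalone
sketch of the mechanic, using an illustrative Rpc struct rather than the
project's real types:

use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::sync::Arc;

#[derive(PartialEq, Eq)]
struct Rpc {
    name: String,
    tier: u64,
}

impl Hash for Rpc {
    fn hash<H: Hasher>(&self, state: &mut H) {
        // hash the fields that identify the rpc, as the last hunk does for Web3Rpc
        self.name.hash(state);
        self.tier.hash(state);
    }
}

fn main() {
    let rpc = Arc::new(Rpc { name: "example".into(), tier: 0 });

    // Arc<Rpc> now works directly as a map key; no more cloning name strings
    let mut head_blocks: HashMap<Arc<Rpc>, u64> = HashMap::new();
    head_blocks.insert(rpc.clone(), 17_000_000);

    assert_eq!(head_blocks.get(&rpc), Some(&17_000_000));
}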


@@ -1016,16 +1016,22 @@ impl Web3ProxyApp {
         // TODO: i'm sure this could be done better with iterators
         // TODO: stream the response?
         let mut collected: Vec<JsonRpcForwardedResponse> = Vec::with_capacity(num_requests);
-        let mut collected_rpcs: HashSet<Arc<Web3Rpc>> = HashSet::new();
+        let mut collected_rpc_names: HashSet<String> = HashSet::new();
+        let mut collected_rpcs: Vec<Arc<Web3Rpc>> = vec![];

         for response in responses {
             // TODO: any way to attach the tried rpcs to the error? it is likely helpful
             let (response, rpcs) = response?;

             collected.push(response);
-            collected_rpcs.extend(rpcs.into_iter());
+            collected_rpcs.extend(rpcs.into_iter().filter(|x| {
+                if collected_rpc_names.contains(&x.name) {
+                    false
+                } else {
+                    collected_rpc_names.insert(x.name.clone());
+                    true
+                }
+            }));
         }

-        let collected_rpcs: Vec<_> = collected_rpcs.into_iter().collect();
-
         Ok((collected, collected_rpcs))
     }
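
Unlike the hunks below, this one moves away from hashing Arc<Web3Rpc>: the
batch handler now dedups by rpc.name while collecting into a Vec, which keeps
the order the rpcs were tried in (a HashSet would not). A standalone sketch of
the filter pattern, with illustrative names; note that HashSet::insert already
returns false for duplicates, so the contains-then-insert pair above could
collapse into a single call:

use std::collections::HashSet;

// keep only the first occurrence of each name, preserving order
fn dedup_by_name(names: Vec<&str>) -> Vec<&str> {
    let mut seen: HashSet<String> = HashSet::new();

    names
        .into_iter()
        // insert returns false when the value was already present
        .filter(|x| seen.insert(x.to_string()))
        .collect()
}

fn main() {
    assert_eq!(dedup_by_name(vec!["a", "b", "a", "c"]), vec!["a", "b", "c"]);
}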


@@ -93,7 +93,7 @@ impl Web3Rpcs {
 type FirstSeenCache = Cache<H256, Instant, hashbrown::hash_map::DefaultHashBuilder>;

 pub struct ConnectionsGroup {
-    rpc_name_to_block: HashMap<String, Web3ProxyBlock>,
+    rpc_to_block: HashMap<Arc<Web3Rpc>, Web3ProxyBlock>,
     // TODO: what if there are two blocks with the same number?
     highest_block: Option<Web3ProxyBlock>,
     /// used to track rpc.head_latency. The same cache should be shared between all ConnectionsGroups
@@ -103,27 +103,27 @@ pub struct ConnectionsGroup {
 impl ConnectionsGroup {
     pub fn new(first_seen: FirstSeenCache) -> Self {
         Self {
-            rpc_name_to_block: Default::default(),
+            rpc_to_block: Default::default(),
             highest_block: Default::default(),
             first_seen,
         }
     }

     pub fn len(&self) -> usize {
-        self.rpc_name_to_block.len()
+        self.rpc_to_block.len()
     }

     pub fn is_empty(&self) -> bool {
-        self.rpc_name_to_block.is_empty()
+        self.rpc_to_block.is_empty()
     }

-    fn remove(&mut self, rpc_name: &str) -> Option<Web3ProxyBlock> {
-        if let Some(removed_block) = self.rpc_name_to_block.remove(rpc_name) {
+    fn remove(&mut self, rpc: &Arc<Web3Rpc>) -> Option<Web3ProxyBlock> {
+        if let Some(removed_block) = self.rpc_to_block.remove(rpc) {
             match self.highest_block.as_mut() {
                 None => {}
                 Some(current_highest_block) => {
                     if removed_block.hash() == current_highest_block.hash() {
-                        for maybe_highest_block in self.rpc_name_to_block.values() {
+                        for maybe_highest_block in self.rpc_to_block.values() {
                             if maybe_highest_block.number() > current_highest_block.number() {
                                 *current_highest_block = maybe_highest_block.clone();
                             };
@@ -138,7 +138,7 @@ impl ConnectionsGroup {
         }
     }

-    async fn insert(&mut self, rpc: &Web3Rpc, block: Web3ProxyBlock) -> Option<Web3ProxyBlock> {
+    async fn insert(&mut self, rpc: Arc<Web3Rpc>, block: Web3ProxyBlock) -> Option<Web3ProxyBlock> {
         let first_seen = self
             .first_seen
             .get_with(*block.hash(), async move { Instant::now() })
@@ -155,7 +155,7 @@ impl ConnectionsGroup {
             self.highest_block = Some(block.clone());
         }

-        self.rpc_name_to_block.insert(rpc.name.clone(), block)
+        self.rpc_to_block.insert(rpc, block)
     }

     /// min_consensus_block_num keeps us from ever going backwards.
@@ -190,7 +190,7 @@ impl ConnectionsGroup {
             max_lag_consensus_to_highest
         );

-        let num_known = self.rpc_name_to_block.len();
+        let num_known = self.rpc_to_block.len();

         if num_known < web3_rpcs.min_head_rpcs {
             return Err(anyhow::anyhow!(
@@ -221,21 +221,22 @@ impl ConnectionsGroup {
         let maybe_head_hash = maybe_head_block.hash();

         // find all rpcs with maybe_head_hash as their current head
-        for (rpc_name, rpc_head) in self.rpc_name_to_block.iter() {
+        for (rpc, rpc_head) in self.rpc_to_block.iter() {
             if rpc_head.hash() != maybe_head_hash {
                 // connection is not on the desired block
                 continue;
             }

-            if backup_consensus_rpcs.contains(rpc_name.as_str()) {
+            let rpc_name = rpc.name.as_str();
+
+            if backup_consensus_rpcs.contains(rpc_name) {
                 // connection is on a later block in this same chain
                 continue;
             }

-            if primary_consensus_rpcs.contains(rpc_name.as_str()) {
+            if primary_consensus_rpcs.contains(rpc_name) {
                 // connection is on a later block in this same chain
                 continue;
             }

-            if let Some(rpc) = web3_rpcs.by_name.read().get(rpc_name.as_str()) {
+            if let Some(rpc) = web3_rpcs.by_name.read().get(rpc_name) {
                 if backup_rpcs_voted.is_some() {
                     // backups already voted for a head block. don't change it
                 } else {
@@ -407,14 +408,14 @@ impl ConsensusFinder {
         self.tiers.values_mut().last()
     }

-    pub fn remove(&mut self, rpc: &Web3Rpc) -> Option<Web3ProxyBlock> {
+    pub fn remove(&mut self, rpc: &Arc<Web3Rpc>) -> Option<Web3ProxyBlock> {
         let mut removed = None;

         for (i, tier_group) in self.tiers.iter_mut().rev() {
             if i < &rpc.tier {
                 break;
             }

-            let x = tier_group.remove(rpc.name.as_str());
+            let x = tier_group.remove(rpc);

             if removed.is_none() && x.is_some() {
                 removed = x;
@@ -427,7 +428,7 @@ impl ConsensusFinder {
     /// returns the block that the rpc was on before updating to the new_block
     pub async fn insert(
         &mut self,
-        rpc: &Web3Rpc,
+        rpc: &Arc<Web3Rpc>,
         new_block: Web3ProxyBlock,
     ) -> Option<Web3ProxyBlock> {
         let mut old = None;
@@ -440,7 +441,7 @@ impl ConsensusFinder {
         }

         // TODO: should new_block be a ref?
-        let x = tier_group.insert(rpc, new_block.clone()).await;
+        let x = tier_group.insert(rpc.clone(), new_block.clone()).await;

         if old.is_none() && x.is_some() {
             old = x;
@@ -528,7 +529,7 @@ impl ConsensusFinder {
         // TODO: find the best tier with a connectionsgroup. best case, this only queries the first tier
         // TODO: do we need to calculate all of them? I think having highest_known_block included as part of min_block_num should make that unnecessary
         for (tier, x) in self.tiers.iter() {
-            trace!("checking tier {}: {:#?}", tier, x.rpc_name_to_block);
+            trace!("checking tier {}: {:#?}", tier, x.rpc_to_block);

             if let Ok(consensus_head_connections) = x
                 .consensus_head_connections(authorization, web3_connections, min_block_num, tier)
                 .await
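
A note on the signatures above: remove(&mut self, rpc: &Arc<Web3Rpc>) can pass
rpc straight to HashMap::remove because Borrow is reflexive for the key type,
and Arc<T>: Borrow<T> would also allow lookups by a plain &Web3Rpc. A minimal
sketch of both lookup forms, with Arc<str> standing in for Arc<Web3Rpc>:

use std::collections::HashMap;
use std::sync::Arc;

fn main() {
    let mut map: HashMap<Arc<str>, u64> = HashMap::new();
    let key: Arc<str> = Arc::from("rpc-1");

    map.insert(key.clone(), 1);

    // look up by &Arc<str> (Borrow is reflexive)...
    assert_eq!(map.get(&key), Some(&1));
    // ...or by &str, since Arc<str>: Borrow<str> and their hashes agree
    assert_eq!(map.get("rpc-1"), Some(&1));
}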


@@ -135,6 +135,7 @@ pub struct Web3Rpc {
     pub(super) active_requests: AtomicUsize,
     pub(super) reconnect: AtomicBool,
     pub(super) disconnect_watch: Option<watch::Sender<bool>>,
+    pub(super) created_at: Option<Instant>,
 }

 impl Web3Rpc {
@@ -158,6 +159,8 @@ impl Web3Rpc {
         tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
         reconnect: bool,
     ) -> anyhow::Result<(Arc<Web3Rpc>, AnyhowJoinHandle<()>)> {
+        let created_at = Instant::now();
+
         let hard_limit = match (config.hard_limit, redis_pool) {
             (None, None) => None,
             (Some(hard_limit), Some(redis_pool)) => {
@@ -238,6 +241,7 @@ impl Web3Rpc {
             reconnect,
             tier: config.tier,
             disconnect_watch: Some(disconnect_sender),
+            created_at: Some(created_at),
             ..Default::default()
         };
@@ -573,7 +577,9 @@ impl Web3Rpc {
     }

     pub async fn disconnect(&self) -> anyhow::Result<()> {
-        info!("disconnecting {}", self);
+        let age = self.created_at.unwrap().elapsed().as_millis();
+
+        info!("disconnecting {} ({}ms old)", self, age);

         self.reconnect.store(false, atomic::Ordering::Release);
@@ -581,11 +587,11 @@ impl Web3Rpc {
             warn!("failed sending disconnect watch: {:?}", err);
         };

-        trace!("disconnecting (locking) {}", self);
+        trace!("disconnecting (locking) {} ({}ms old)", self, age);

         let mut provider = self.provider.write().await;

-        trace!("disconnecting (clearing provider) {}", self);
+        trace!("disconnecting (clearing provider) {} ({}ms old)", self, age);

         *provider = None;
@@ -607,7 +613,10 @@ impl Web3Rpc {
                 // we previously sent a None. return early
                 return Ok(());
             }
-            warn!("clearing head block on {}!", self);
+
+            let age = self.created_at.unwrap().elapsed().as_millis();
+
+            warn!("clearing head block on {} ({}ms old)!", self, age);

             *head_block = None;
         }
@@ -1279,8 +1288,16 @@ impl fmt::Debug for Web3Provider {
 impl Hash for Web3Rpc {
     fn hash<H: Hasher>(&self, state: &mut H) {
-        // TODO: is this enough?
         self.name.hash(state);
+        self.display_name.hash(state);
+        self.http_url.hash(state);
+        self.ws_url.hash(state);
+        self.automatic_block_limit.hash(state);
+        self.backup.hash(state);
+        // TODO: including soft_limit might need to change if we change them to be dynamic
+        self.soft_limit.hash(state);
+        self.tier.hash(state);
+        self.created_at.hash(state);
     }
 }
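
The expanded Hash impl covers most of Web3Rpc's identifying fields. The diff
does not show a matching PartialEq, so this is an assumption, but the HashMap
contract is that a == b must imply hash(a) == hash(b); one way to keep that
invariant visible is to compare exactly the fields that get hashed, as in this
illustrative sketch:

use std::hash::{Hash, Hasher};

struct Rpc {
    name: String,
    tier: u64,
    soft_limit: u32,
}

impl PartialEq for Rpc {
    fn eq(&self, other: &Self) -> bool {
        // compare exactly the fields hashed below, keeping Eq and Hash in sync
        self.name == other.name && self.tier == other.tier && self.soft_limit == other.soft_limit
    }
}

impl Eq for Rpc {}

impl Hash for Rpc {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.name.hash(state);
        self.tier.hash(state);
        self.soft_limit.hash(state);
    }
}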