From a519427fcf95ade26343734cc81146492e53907b Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Tue, 14 Feb 2023 12:41:05 -0800 Subject: [PATCH] fixed one test. still more to fix though --- web3_proxy/src/rpcs/blockchain.rs | 12 ++++++++++- web3_proxy/src/rpcs/consensus.rs | 36 +++++++++++++++++++++++-------- web3_proxy/src/rpcs/many.rs | 4 ++-- 3 files changed, 40 insertions(+), 12 deletions(-) diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index b6bfd01e..8ce7f495 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -8,6 +8,7 @@ use crate::{config::BlockAndRpc, jsonrpc::JsonRpcRequest}; use anyhow::Context; use derive_more::From; use ethers::prelude::{Block, TxHash, H256, U64}; +use hashbrown::HashSet; use log::{debug, error, trace, warn, Level}; use moka::future::Cache; use serde::Serialize; @@ -315,7 +316,16 @@ impl Web3Rpcs { ) -> anyhow::Result<()> { // TODO: indexmap or hashmap? what hasher? with_capacity? // TODO: this will grow unbounded. prune old heads on this at the same time we prune the graph? - let mut connection_heads = ConsensusFinder::new(self.max_block_age, self.max_block_lag); + let configured_tiers: Vec = self + .conns + .values() + .map(|x| x.tier) + .collect::>() + .into_iter() + .collect(); + + let mut connection_heads = + ConsensusFinder::new(&configured_tiers, self.max_block_age, self.max_block_lag); loop { match block_receiver.recv_async().await { diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index 289be536..bcaf1f56 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -3,6 +3,7 @@ use crate::frontend::authorization::Authorization; use super::blockchain::Web3ProxyBlock; use super::many::Web3Rpcs; use super::one::Web3Rpc; +use anyhow::Context; use ethers::prelude::{H256, U64}; use hashbrown::{HashMap, HashSet}; use log::{debug, trace, warn}; @@ -371,9 +372,19 @@ pub struct ConsensusFinder { } impl ConsensusFinder { - pub fn new(max_block_age: Option, max_block_lag: Option) -> Self { + pub fn new( + configured_tiers: &[u64], + max_block_age: Option, + max_block_lag: Option, + ) -> Self { + // TODO: this will need some thought when config reloading is written + let tiers = configured_tiers + .iter() + .map(|x| (*x, Default::default())) + .collect(); + Self { - tiers: Default::default(), + tiers, max_block_age, max_block_lag, } @@ -413,8 +424,10 @@ impl ConsensusFinder { pub fn insert(&mut self, rpc: &Web3Rpc, new_block: Web3ProxyBlock) -> Option { let mut old = None; + // TODO: error if rpc.tier is not in self.tiers + for (i, tier_group) in self.tiers.iter_mut().rev() { - if i > &rpc.tier { + if i < &rpc.tier { break; } @@ -443,7 +456,8 @@ impl ConsensusFinder { // we don't know if its on the heaviest chain yet rpc_head_block = web3_connections .try_cache_block(rpc_head_block, false) - .await?; + .await + .context("failed caching block")?; // if let Some(max_block_lag) = max_block_lag { // if rpc_head_block.number() < ??? { @@ -486,14 +500,18 @@ impl ConsensusFinder { Ok(changed) } - // TODO: this could definitely be cleaner. i don't like the error handling/unwrapping pub async fn best_consensus_connections( &mut self, authorization: &Arc, web3_connections: &Web3Rpcs, - ) -> Option { + ) -> anyhow::Result { // TODO: attach context to these? - let highest_known_block = self.all_rpcs_group()?.highest_block.as_ref()?; + let highest_known_block = self + .all_rpcs_group() + .context("no rpcs")? + .highest_block + .as_ref() + .context("no highest block")?; trace!("highest_known_block: {}", highest_known_block); @@ -518,10 +536,10 @@ impl ConsensusFinder { trace!("success on tier {}", i); // we got one! hopefully it didn't need to use any backups. // but even if it did need backup servers, that is better than going to a worse tier - return Some(consensus_head_connections); + return Ok(consensus_head_connections); } } - return None; + return Err(anyhow::anyhow!("failed finding consensus on all tiers")); } } diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index e5293d2d..2a3bd24a 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -1373,7 +1373,7 @@ mod tests { let authorization = Arc::new(Authorization::internal(None).unwrap()); let (head_block_sender, _head_block_receiver) = watch::channel(Default::default()); - let mut consensus_finder = ConsensusFinder::new(None, None); + let mut consensus_finder = ConsensusFinder::new(&[0, 1, 2, 3], None, None); // process None so that conns @@ -1584,7 +1584,7 @@ mod tests { let authorization = Arc::new(Authorization::internal(None).unwrap()); let (head_block_sender, _head_block_receiver) = watch::channel(Default::default()); - let mut connection_heads = ConsensusFinder::new(None, None); + let mut connection_heads = ConsensusFinder::new(&[0, 1, 2, 3], None, None); conns .process_block_from_rpc(