fixed one test. still more to fix though

This commit is contained in:
Bryan Stitt 2023-02-14 12:41:05 -08:00 committed by yenicelik
parent da33ec32eb
commit a519427fcf
3 changed files with 40 additions and 12 deletions

@ -8,6 +8,7 @@ use crate::{config::BlockAndRpc, jsonrpc::JsonRpcRequest};
use anyhow::Context;
use derive_more::From;
use ethers::prelude::{Block, TxHash, H256, U64};
use hashbrown::HashSet;
use log::{debug, error, trace, warn, Level};
use moka::future::Cache;
use serde::Serialize;
@ -315,7 +316,16 @@ impl Web3Rpcs {
) -> anyhow::Result<()> {
// TODO: indexmap or hashmap? what hasher? with_capacity?
// TODO: this will grow unbounded. prune old heads on this at the same time we prune the graph?
let mut connection_heads = ConsensusFinder::new(self.max_block_age, self.max_block_lag);
let configured_tiers: Vec<u64> = self
.conns
.values()
.map(|x| x.tier)
.collect::<HashSet<u64>>()
.into_iter()
.collect();
let mut connection_heads =
ConsensusFinder::new(&configured_tiers, self.max_block_age, self.max_block_lag);
loop {
match block_receiver.recv_async().await {

@ -3,6 +3,7 @@ use crate::frontend::authorization::Authorization;
use super::blockchain::Web3ProxyBlock;
use super::many::Web3Rpcs;
use super::one::Web3Rpc;
use anyhow::Context;
use ethers::prelude::{H256, U64};
use hashbrown::{HashMap, HashSet};
use log::{debug, trace, warn};
@ -371,9 +372,19 @@ pub struct ConsensusFinder {
}
impl ConsensusFinder {
pub fn new(max_block_age: Option<u64>, max_block_lag: Option<U64>) -> Self {
pub fn new(
configured_tiers: &[u64],
max_block_age: Option<u64>,
max_block_lag: Option<U64>,
) -> Self {
// TODO: this will need some thought when config reloading is written
let tiers = configured_tiers
.iter()
.map(|x| (*x, Default::default()))
.collect();
Self {
tiers: Default::default(),
tiers,
max_block_age,
max_block_lag,
}
@ -413,8 +424,10 @@ impl ConsensusFinder {
pub fn insert(&mut self, rpc: &Web3Rpc, new_block: Web3ProxyBlock) -> Option<Web3ProxyBlock> {
let mut old = None;
// TODO: error if rpc.tier is not in self.tiers
for (i, tier_group) in self.tiers.iter_mut().rev() {
if i > &rpc.tier {
if i < &rpc.tier {
break;
}
@ -443,7 +456,8 @@ impl ConsensusFinder {
// we don't know if its on the heaviest chain yet
rpc_head_block = web3_connections
.try_cache_block(rpc_head_block, false)
.await?;
.await
.context("failed caching block")?;
// if let Some(max_block_lag) = max_block_lag {
// if rpc_head_block.number() < ??? {
@ -486,14 +500,18 @@ impl ConsensusFinder {
Ok(changed)
}
// TODO: this could definitely be cleaner. i don't like the error handling/unwrapping
pub async fn best_consensus_connections(
&mut self,
authorization: &Arc<Authorization>,
web3_connections: &Web3Rpcs,
) -> Option<ConsensusWeb3Rpcs> {
) -> anyhow::Result<ConsensusWeb3Rpcs> {
// TODO: attach context to these?
let highest_known_block = self.all_rpcs_group()?.highest_block.as_ref()?;
let highest_known_block = self
.all_rpcs_group()
.context("no rpcs")?
.highest_block
.as_ref()
.context("no highest block")?;
trace!("highest_known_block: {}", highest_known_block);
@ -518,10 +536,10 @@ impl ConsensusFinder {
trace!("success on tier {}", i);
// we got one! hopefully it didn't need to use any backups.
// but even if it did need backup servers, that is better than going to a worse tier
return Some(consensus_head_connections);
return Ok(consensus_head_connections);
}
}
return None;
return Err(anyhow::anyhow!("failed finding consensus on all tiers"));
}
}

@ -1373,7 +1373,7 @@ mod tests {
let authorization = Arc::new(Authorization::internal(None).unwrap());
let (head_block_sender, _head_block_receiver) = watch::channel(Default::default());
let mut consensus_finder = ConsensusFinder::new(None, None);
let mut consensus_finder = ConsensusFinder::new(&[0, 1, 2, 3], None, None);
// process None so that
conns
@ -1584,7 +1584,7 @@ mod tests {
let authorization = Arc::new(Authorization::internal(None).unwrap());
let (head_block_sender, _head_block_receiver) = watch::channel(Default::default());
let mut connection_heads = ConsensusFinder::new(None, None);
let mut connection_heads = ConsensusFinder::new(&[0, 1, 2, 3], None, None);
conns
.process_block_from_rpc(