best block num has to check has_block_data

This commit is contained in:
Bryan Stitt 2023-05-16 18:18:56 -07:00
parent 30ea532c70
commit a0d8218eb1
2 changed files with 66 additions and 13 deletions

View File

@ -7,10 +7,10 @@ use derive_more::Constructor;
use ethers::prelude::{H256, U64};
use hashbrown::{HashMap, HashSet};
use itertools::{Itertools, MinMaxResult};
use log::{trace, warn};
use log::{debug, trace, warn};
use moka::future::Cache;
use serde::Serialize;
use std::cmp::Reverse;
use std::cmp::{Ordering, Reverse};
use std::collections::BTreeMap;
use std::fmt;
use std::sync::Arc;
@ -115,24 +115,76 @@ impl ConsensusWeb3Rpcs {
self.best_rpcs.len()
}
pub fn best_block_num(&self, skip_rpcs: &[Arc<Web3Rpc>]) -> Option<&U64> {
if self.best_rpcs.iter().all(|rpc| skip_rpcs.contains(rpc)) {
pub fn best_block_num(
&self,
needed_block_num: Option<&U64>,
skip_rpcs: &[Arc<Web3Rpc>],
) -> Option<&U64> {
// TODO: dry this up with `filter`?
fn _best_block_num_filter(
x: &ConsensusWeb3Rpcs,
rpc: &Arc<Web3Rpc>,
needed_block_num: Option<&U64>,
skip_rpcs: &[Arc<Web3Rpc>],
) -> bool {
// return true if this rpc will never work for us. "false" is good
if skip_rpcs.contains(rpc) {
// if rpc is skipped, it must have already been determined it is unable to serve the request
true
} else if let Some(needed_block_num) = needed_block_num {
if let Some(rpc_data) = x.rpc_data.get(rpc).as_ref() {
match rpc_data.head_block_num.cmp(needed_block_num) {
Ordering::Less => {
// rpc is not synced. let it catch up
false
}
Ordering::Greater | Ordering::Equal => {
// rpc is synced past the needed block. make sure the block isn't too old for it
!x.has_block_data(rpc, needed_block_num)
}
}
} else {
// no rpc data for this rpc. thats not promising
true
}
} else {
false
}
}
if self
.best_rpcs
.iter()
.all(|rpc| _best_block_num_filter(self, rpc, needed_block_num, skip_rpcs))
{
// all of the consensus rpcs are skipped
// iterate the other rpc tiers to find the next best block
let mut best_block = None;
let mut best_num = None;
for (next_ranking, next_rpcs) in self.other_rpcs.iter() {
if next_rpcs.iter().all(|rpc| skip_rpcs.contains(rpc)) {
// everything in this ranking is skipped
if next_rpcs
.iter()
.all(|rpc| _best_block_num_filter(self, rpc, needed_block_num, skip_rpcs))
{
// TODO: too verbose
debug!("everything in this ranking ({:?}) is skipped", next_ranking);
continue;
}
best_block = best_block.max(next_ranking.head_num.as_ref());
best_num = best_num.max(next_ranking.head_num.as_ref());
}
best_block
// TODO: too verbose
debug!("best (old) block: {:?}", best_num);
best_num
} else {
// not all the best synced rpcs are skipped yet. use the best head block
Some(self.head_block.number())
let best_num = self.head_block.number();
// TODO: too verbose
debug!("best (head) block: {}", best_num);
Some(best_num)
}
}
@ -143,6 +195,7 @@ impl ConsensusWeb3Rpcs {
.unwrap_or(false)
}
// TODO: better name for this
pub fn filter(
&self,
skip: &[Arc<Web3Rpc>],

View File

@ -1933,12 +1933,12 @@ async fn watch_for_block(
skip_rpcs: &[Arc<Web3Rpc>],
watch_consensus_rpcs: &mut watch::Receiver<Option<Arc<ConsensusWeb3Rpcs>>>,
) -> Web3ProxyResult<bool> {
info!("waiting for {:?}", needed_block_num);
debug!("waiting for {:?}", needed_block_num);
let mut best_block_num: Option<U64> = watch_consensus_rpcs
.borrow_and_update()
.as_ref()
.and_then(|x| x.best_block_num(skip_rpcs).copied());
.and_then(|x| x.best_block_num(needed_block_num, skip_rpcs).copied());
match (needed_block_num, best_block_num.as_ref()) {
(Some(x), Some(best)) => {
@ -1977,7 +1977,7 @@ async fn watch_for_block(
best_block_num = consensus_rpcs
.as_ref()
.and_then(|x| x.best_block_num(skip_rpcs).copied());
.and_then(|x| x.best_block_num(needed_block_num, skip_rpcs).copied());
}
Ok(true)