refactor wait_for_block into should_wait_for_block

Bryan Stitt 2023-05-16 22:29:36 -07:00
parent a07da30042
commit 89961331af
3 changed files with 121 additions and 140 deletions
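At a glance: the old best_block_num / watch_for_block pair returned an Option<&U64> that callers had to reinterpret, and the refactor replaces it with a ShouldWaitForBlock enum that callers match on directly. Below is a self-contained toy sketch of that control flow; u64 stands in for U64 and handle() stands in for the retry loop's match, so the types are illustrative, not the real ConsensusWeb3Rpcs API.

// toy illustration only: mirrors the enum added in consensus.rs and the
// match added to the request-retry loop further down, with simplified types
enum ShouldWaitForBlock {
    Ready,
    Wait { current: Option<u64> },
    NeverReady,
}

// stand-in for how the retry loop reacts to should_wait_for_block()
fn handle(decision: ShouldWaitForBlock) -> &'static str {
    match decision {
        // some usable rpc should already be able to serve the block: retry right away
        ShouldWaitForBlock::Ready => "retry now",
        // a lagging rpc should catch up: wait for the next consensus head
        ShouldWaitForBlock::Wait { .. } => "wait for a new head",
        // no rpc will ever serve this block (e.g. pruned history): give up
        ShouldWaitForBlock::NeverReady => "give up",
    }
}

fn main() {
    assert_eq!(
        handle(ShouldWaitForBlock::Wait { current: Some(17) }),
        "wait for a new head"
    );
}

The Wait { current } payload carries the best head number found among the usable rpcs, which callers previously had to infer from the bare Option<&U64> return.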

@@ -86,25 +86,31 @@ impl PartialOrd for RpcRanking {
pub type RankedRpcMap = BTreeMap<RpcRanking, Vec<Arc<Web3Rpc>>>;
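/// whether a request that needs a specific block can be served now, should
/// wait for a lagging rpc to catch up, or will never be served by the
/// currently ranked rpcs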
pub enum ShouldWaitForBlock {
Ready,
Wait { current: Option<U64> },
NeverReady,
}
/// A collection of Web3Rpcs that are on the same block.
/// Serialize is so we can print it on our debug endpoint
/// Serialize is so we can print it on our /status endpoint
#[derive(Clone, Serialize)]
pub struct ConsensusWeb3Rpcs {
pub(crate) tier: u64,
pub(crate) backups_needed: bool,
// TODO: this is already inside best_rpcs. give that a shorter serialize here and then include this again
// TODO: this is already inside best_rpcs. Don't skip, instead make a shorter serialize
#[serde(skip_serializing)]
pub(crate) head_block: Web3ProxyBlock,
// TODO: smaller serialize
pub(crate) best_rpcs: Vec<Arc<Web3Rpc>>,
pub(crate) head_rpcs: Vec<Arc<Web3Rpc>>,
// TODO: make this work. the key needs to be a string
// TODO: make this work. the key needs to be a string. I think we need `serialize_with`
#[serde(skip_serializing)]
pub(crate) other_rpcs: RankedRpcMap,
// TODO: make this work. the key needs to be a string
// TODO: make this work. the key needs to be a string. I think we need `serialize_with`
#[serde(skip_serializing)]
rpc_data: HashMap<Arc<Web3Rpc>, RpcData>,
}
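For the serialize TODOs above, one possible shape of the serialize_with approach is sketched below for the rpc_data field. Rpc and RpcData are stand-ins for the real Web3Rpc/RpcData types, and the helper simply stringifies each key; this is an assumption about how the TODO might be resolved, not the project's implementation.

// hypothetical sketch: serialize a non-string-keyed map by converting keys
// to strings. the types here are stand-ins, not the real web3-proxy structs.
use std::collections::HashMap;
use serde::ser::{SerializeMap, Serializer};

#[derive(Hash, PartialEq, Eq)]
struct Rpc(String); // stand-in for Arc<Web3Rpc>, identified by its name

#[derive(serde::Serialize)]
struct RpcData {
    head_block_num: u64,
}

fn serialize_rpc_keyed_map<S>(
    map: &HashMap<Rpc, RpcData>,
    serializer: S,
) -> Result<S::Ok, S::Error>
where
    S: Serializer,
{
    let mut m = serializer.serialize_map(Some(map.len()))?;
    for (k, v) in map {
        // json (and most formats) require string keys
        m.serialize_entry(&k.0, v)?;
    }
    m.end()
}

#[derive(serde::Serialize)]
struct Demo {
    // replaces #[serde(skip_serializing)] once the helper exists
    #[serde(serialize_with = "serialize_rpc_keyed_map")]
    rpc_data: HashMap<Rpc, RpcData>,
}

fn main() {
    let mut rpc_data = HashMap::new();
    rpc_data.insert(Rpc("rpc-a".into()), RpcData { head_block_num: 17_000_000 });
    println!("{}", serde_json::to_string(&Demo { rpc_data }).unwrap());
}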
@@ -112,79 +118,65 @@ pub struct ConsensusWeb3Rpcs {
impl ConsensusWeb3Rpcs {
#[inline]
pub fn num_consensus_rpcs(&self) -> usize {
self.best_rpcs.len()
self.head_rpcs.len()
}
pub fn best_block_num(
/// will tell you if you should wait for a block
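/// returns `Ready` if a ranked rpc should be able to serve the request now,
/// `Wait` (with the best head currently available) if waiting for a lagging
/// rpc to catch up might help, and `NeverReady` if no rpc ever will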
/// TODO: also include method (or maybe an enum representing the different prune types)
pub fn should_wait_for_block(
&self,
needed_block_num: Option<&U64>,
skip_rpcs: &[Arc<Web3Rpc>],
) -> Option<&U64> {
// TODO: dry this up with `filter`?
fn _best_block_num_filter(
x: &ConsensusWeb3Rpcs,
rpc: &Arc<Web3Rpc>,
needed_block_num: Option<&U64>,
skip_rpcs: &[Arc<Web3Rpc>],
) -> bool {
// return true if this rpc will never work for us. "false" is good
if skip_rpcs.contains(rpc) {
// if rpc is skipped, it must have already been determined it is unable to serve the request
true
} else if let Some(needed_block_num) = needed_block_num {
if let Some(rpc_data) = x.rpc_data.get(rpc).as_ref() {
match rpc_data.head_block_num.cmp(needed_block_num) {
Ordering::Less => {
// rpc is not synced. let it catch up
false
}
Ordering::Greater | Ordering::Equal => {
// rpc is synced past the needed block. make sure the block isn't too old for it
!x.has_block_data(rpc, needed_block_num)
}
}
} else {
// no rpc data for this rpc. thats not promising
true
}
} else {
false
) -> ShouldWaitForBlock {
if self
.head_rpcs
.iter()
.any(|rpc| self.rpc_will_work_eventually(rpc, needed_block_num, skip_rpcs))
{
let head_num = self.head_block.number();
if Some(head_num) >= needed_block_num {
debug!("best (head) block: {}", head_num);
return ShouldWaitForBlock::Ready;
}
}
if self
.best_rpcs
.iter()
.all(|rpc| _best_block_num_filter(self, rpc, needed_block_num, skip_rpcs))
{
// all of the consensus rpcs are skipped
// iterate the other rpc tiers to find the next best block
let mut best_num = None;
for (next_ranking, next_rpcs) in self.other_rpcs.iter() {
if next_rpcs
.iter()
.all(|rpc| _best_block_num_filter(self, rpc, needed_block_num, skip_rpcs))
{
// TODO: too verbose
debug!("everything in this ranking ({:?}) is skipped", next_ranking);
continue;
}
// all of the head rpcs are skipped
best_num = best_num.max(next_ranking.head_num.as_ref());
let mut best_num = None;
// iterate the other rpc tiers to find the next best block
for (next_ranking, next_rpcs) in self.other_rpcs.iter() {
if !next_rpcs
.iter()
.any(|rpc| self.rpc_will_work_eventually(rpc, needed_block_num, skip_rpcs))
{
// TODO: too verbose
debug!("everything in this ranking ({:?}) is skipped", next_ranking);
continue;
}
let next_head_num = next_ranking.head_num.as_ref();
if next_head_num >= needed_block_num {
debug!("best (head) block: {:?}", next_head_num);
return ShouldWaitForBlock::Ready;
}
best_num = next_head_num;
}
// TODO: this seems wrong
if best_num.is_some() {
// TODO: too verbose
debug!("best (old) block: {:?}", best_num);
best_num
ShouldWaitForBlock::Wait {
current: best_num.copied(),
}
} else {
// not all the best synced rpcs are skipped yet. use the best head block
let best_num = self.head_block.number();
// TODO: too verbose
debug!("best (head) block: {}", best_num);
Some(best_num)
debug!("never ready");
ShouldWaitForBlock::NeverReady
}
}
@@ -195,8 +187,48 @@ impl ConsensusWeb3Rpcs {
.unwrap_or(false)
}
// TODO: take method as a param, too. mark nodes with supported methods (maybe do it optimistically? on)
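/// returns `true` if this rpc might eventually serve a request for
/// `needed_block_num` (it has the block, is lagging but should catch up, or
/// we have no data for it yet), and `false` if it never will (it was already
/// skipped, or the needed block has been pruned)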
fn rpc_will_work_eventually(
&self,
rpc: &Arc<Web3Rpc>,
needed_block_num: Option<&U64>,
skip_rpcs: &[Arc<Web3Rpc>],
) -> bool {
// "true" means this rpc might serve the request eventually. "false" means it never will
if skip_rpcs.contains(rpc) {
// if rpc is skipped, it must have already been determined it is unable to serve the request
return false;
}
if let Some(needed_block_num) = needed_block_num {
if let Some(rpc_data) = self.rpc_data.get(rpc) {
match rpc_data.head_block_num.cmp(needed_block_num) {
Ordering::Less => {
debug!("{} is behind. let it catch up", rpc);
return true;
}
Ordering::Greater | Ordering::Equal => {
// rpc is synced past the needed block. make sure the block isn't too old for it
if self.has_block_data(rpc, needed_block_num) {
debug!("{} has {}", rpc, needed_block_num);
return true;
} else {
debug!("{} does not have {}", rpc, needed_block_num);
return false;
}
}
}
}
// no rpc data for this rpc. that's not promising
return true;
}
false
}
// TODO: better name for this
pub fn filter(
pub fn rpc_will_work_now(
&self,
skip: &[Arc<Web3Rpc>],
min_block_needed: Option<&U64>,
@@ -244,7 +276,7 @@ impl fmt::Debug for ConsensusWeb3Rpcs {
// TODO: print the actual conns?
f.debug_struct("ConsensusWeb3Rpcs")
.field("head_block", &self.head_block)
.field("num_conns", &self.best_rpcs.len())
.field("num_conns", &self.head_rpcs.len())
.finish_non_exhaustive()
}
}
@@ -272,7 +304,7 @@ impl Web3Rpcs {
let consensus = self.watch_consensus_rpcs_sender.borrow();
if let Some(consensus) = consensus.as_ref() {
!consensus.best_rpcs.is_empty()
!consensus.head_rpcs.is_empty()
} else {
false
}
@@ -282,7 +314,7 @@ impl Web3Rpcs {
let consensus = self.watch_consensus_rpcs_sender.borrow();
if let Some(consensus) = consensus.as_ref() {
consensus.best_rpcs.len()
consensus.head_rpcs.len()
} else {
0
}
@@ -598,7 +630,7 @@ impl ConsensusFinder {
let consensus = ConsensusWeb3Rpcs {
tier,
head_block: maybe_head_block.clone(),
best_rpcs: consensus_rpcs,
head_rpcs: consensus_rpcs,
other_rpcs,
backups_needed,
rpc_data,

@@ -1,6 +1,6 @@
///! Load balanced communication with a group of web3 rpc providers
use super::blockchain::{BlocksByHashCache, Web3ProxyBlock};
use super::consensus::ConsensusWeb3Rpcs;
use super::consensus::{ConsensusWeb3Rpcs, ShouldWaitForBlock};
use super::one::Web3Rpc;
use super::request::{OpenRequestHandle, OpenRequestResult, RequestErrorHandler};
use crate::app::{flatten_handle, AnyhowJoinHandle, Web3ProxyApp};
@@ -521,8 +521,8 @@ impl Web3Rpcs {
);
// todo: for now, build the map m here. once that works, do as much of it as possible while building ConsensusWeb3Rpcs
for x in consensus_rpcs.best_rpcs.iter().filter(|rpc| {
consensus_rpcs.filter(skip, min_block_needed, max_block_needed, rpc)
for x in consensus_rpcs.head_rpcs.iter().filter(|rpc| {
consensus_rpcs.rpc_will_work_now(skip, min_block_needed, max_block_needed, rpc)
}) {
m.entry(best_key).or_insert_with(Vec::new).push(x.clone());
}
@@ -533,7 +533,12 @@ impl Web3Rpcs {
let v: Vec<_> = v
.iter()
.filter(|rpc| {
consensus_rpcs.filter(skip, min_block_needed, max_block_needed, rpc)
consensus_rpcs.rpc_will_work_now(
skip,
min_block_needed,
max_block_needed,
rpc,
)
})
.cloned()
.collect();
@@ -698,7 +703,7 @@ impl Web3Rpcs {
let synced_rpcs = self.watch_consensus_rpcs_sender.borrow();
if let Some(synced_rpcs) = synced_rpcs.as_ref() {
synced_rpcs.best_rpcs.clone()
synced_rpcs.head_rpcs.clone()
} else {
vec![]
}
@@ -967,13 +972,15 @@ impl Web3Rpcs {
let waiting_for = min_block_needed.max(max_block_needed);
if watch_for_block(waiting_for, &skip_rpcs, &mut watch_consensus_rpcs).await? {
// block found! continue so we can check for another rpc
} else {
// rate limits are likely keeping us from serving the head block
watch_consensus_rpcs.changed().await?;
watch_consensus_rpcs.borrow_and_update();
if let Some(consensus_rpcs) = watch_consensus_rpcs.borrow_and_update().as_ref()
{
match consensus_rpcs.should_wait_for_block(waiting_for, &skip_rpcs) {
ShouldWaitForBlock::NeverReady => break,
ShouldWaitForBlock::Ready => continue,
ShouldWaitForBlock::Wait { .. } => {}
}
}
watch_consensus_rpcs.changed().await?;
}
}
}
@@ -1926,67 +1933,6 @@ mod tests {
}
}
/// returns `true` when the desired block number is available
/// TODO: max wait time? max number of blocks to wait for? time is probably best
async fn watch_for_block(
needed_block_num: Option<&U64>,
skip_rpcs: &[Arc<Web3Rpc>],
watch_consensus_rpcs: &mut watch::Receiver<Option<Arc<ConsensusWeb3Rpcs>>>,
) -> Web3ProxyResult<bool> {
let mut best_block_num: Option<U64> = watch_consensus_rpcs
.borrow_and_update()
.as_ref()
.and_then(|x| x.best_block_num(needed_block_num, skip_rpcs).copied());
debug!(
"waiting for {:?}. best {:?}",
needed_block_num, best_block_num
);
match (needed_block_num, best_block_num.as_ref()) {
(Some(x), Some(best)) => {
if x <= best {
// the best block is past the needed block and no servers have the needed data
// this happens if the block is old and all archive servers are offline
// there is no chance we will get this block without adding an archive server to the config
// TODO: i think this can also happen if we are being rate limited! but then waiting might work. need skip_rpcs to be smarter
warn!("watching for block {} will never succeed. best {}", x, best);
return Ok(false);
}
}
(None, None) => {
// i don't think this is possible
// maybe if we internally make a request for the latest block and all our servers are disconnected?
warn!("how'd this None/None happen?");
return Ok(false);
}
(Some(_), None) => {
// block requested but no servers synced. we will wait
// TODO: if the web3rpcs connected to this consensus isn't watching head blocks, exit with an error (waiting for blocks won't ever work)
}
(None, Some(head)) => {
// i don't think this is possible
// maybe if we internally make a request for the latest block and all our servers are disconnected?
warn!("how'd this None/Some({}) happen?", head);
return Ok(false);
}
};
// future block is requested
// wait for the block to arrive
while best_block_num.as_ref() < needed_block_num {
watch_consensus_rpcs.changed().await?;
let consensus_rpcs = watch_consensus_rpcs.borrow_and_update();
best_block_num = consensus_rpcs
.as_ref()
.and_then(|x| x.best_block_num(needed_block_num, skip_rpcs).copied());
}
Ok(true)
}
#[cfg(test)]
mod test {
use std::cmp::Reverse;

@@ -47,6 +47,8 @@ pub struct Web3Rpc {
/// it is an async lock because we hold it open across awaits
/// this provider is only used for new heads subscriptions
/// TODO: benchmark ArcSwapOption and a watch::Sender
/// TODO: only the websocket provider needs to be behind an asyncrwlock!
/// TODO: the http provider is just an http_client
pub(super) provider: AsyncRwLock<Option<Arc<Web3Provider>>>,
/// keep track of hard limits
/// this is only inside an Option so that the "Default" derive works. it will always be set.
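The two provider TODOs in the hunk above suggest a possible follow-up: only the websocket side (held across awaits by the new-heads subscription) needs the async lock, while http could be a plain client. A hypothetical sketch of that layout with stand-in types; WsProvider and HttpClient are assumptions here, not real web3-proxy types.

// hypothetical field layout for the TODO above
use std::sync::Arc;
use tokio::sync::RwLock as AsyncRwLock;

struct WsProvider;  // stand-in for the websocket subscription provider
struct HttpClient;  // stand-in for a plain request/response http client

struct ProviderSplit {
    // held open across awaits by the new-heads subscription, so it keeps the async lock
    ws_provider: AsyncRwLock<Option<Arc<WsProvider>>>,
    // simple request/response; no lock needed
    http_client: Option<HttpClient>,
}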
@@ -1216,6 +1218,7 @@ impl Web3Rpc {
if unlocked_provider.is_some() || self.provider.read().await.is_some() {
// we already have an unlocked provider. no need to lock
} else {
warn!("no provider on {}", self);
return Ok(OpenRequestResult::NotReady);
}