move health check inside consensus finder

Bryan Stitt 2023-10-23 19:44:21 -07:00
parent 6cb2accd0d
commit d5ac4b5d78
3 changed files with 29 additions and 19 deletions
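
The effect of the change in the first file is that stale or unhealthy RPCs are now filtered out while the consensus finder ranks head blocks, instead of being evicted (or clearing their own head) as their blocks arrive. The standalone sketch below mirrors that filter with simplified stand-in types; Rpc, HeadBlock, and usable_head_range are illustrative names only, and the real code works over rpc_heads with Web3ProxyBlock, Arc<Web3Rpc>, and itertools' minmax_by_key.

use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};

// simplified stand-ins for Web3Rpc and Web3ProxyBlock
struct Rpc {
    healthy: AtomicBool,
}

struct HeadBlock {
    number: u64,
    received_at: Instant,
}

impl HeadBlock {
    fn age(&self) -> Duration {
        self.received_at.elapsed()
    }
}

// mirror of the filter added to the consensus finder: only healthy rpcs with a
// fresh enough head block are considered when picking the lowest/highest head
fn usable_head_range(
    heads: &[(Rpc, HeadBlock)],
    max_head_block_age: Option<Duration>,
) -> Option<(u64, u64)> {
    let usable: Vec<&HeadBlock> = heads
        .iter()
        .filter(|(rpc, head)| {
            if !rpc.healthy.load(Ordering::Relaxed) {
                // an unhealthy rpc is simply skipped; its stored head is left alone
                return false;
            }
            if let Some(max_age) = max_head_block_age {
                if head.age() > max_age {
                    return false;
                }
            }
            true
        })
        .map(|(_, head)| head)
        .collect();

    let lowest = usable.iter().map(|h| h.number).min()?;
    let highest = usable.iter().map(|h| h.number).max()?;

    Some((lowest, highest))
}

fn main() {
    let heads = vec![
        (
            Rpc { healthy: AtomicBool::new(true) },
            HeadBlock { number: 100, received_at: Instant::now() },
        ),
        (
            Rpc { healthy: AtomicBool::new(false) },
            HeadBlock { number: 104, received_at: Instant::now() },
        ),
    ];

    // the unhealthy rpc's higher head is ignored entirely
    assert_eq!(usable_head_range(&heads, Some(Duration::from_secs(60))), Some((100, 100)));
    println!("{:?}", usable_head_range(&heads, Some(Duration::from_secs(60))));
}
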

@@ -624,13 +624,6 @@ impl ConsensusFinder {
            .await
            .web3_context("failed caching block")?;
        if let Some(max_age) = self.max_head_block_age {
            if rpc_head_block.age() > max_age {
                warn!("rpc_head_block from {} is too old! {}", rpc, rpc_head_block);
                return Ok(self.remove(&rpc).is_some());
            }
        }
        if let Some(prev_block) = self.insert(rpc, rpc_head_block.clone()).await {
            // false if this block was already sent by this rpc
            // true if new block for this rpc
@@ -749,7 +742,25 @@ impl ConsensusFinder {
    ) -> Web3ProxyResult<Option<RankedRpcs>> {
        self.update_tiers().await?;
        let minmax_block = self.rpc_heads.values().minmax_by_key(|&x| x.number());
        let minmax_block = self
            .rpc_heads
            .iter()
            .filter(|(rpc, x)| {
                if !rpc.healthy.load(atomic::Ordering::Relaxed) {
                    // TODO: we might go backwards if this happens. make sure we hold on to highest block from a previous run
                    return false;
                }
                if let Some(max_block_age) = self.max_head_block_age {
                    if x.age() > max_block_age {
                        return false;
                    }
                }
                true
            })
            .map(|(_, x)| x)
            .minmax_by_key(|x| x.number());
        let (lowest_block, highest_block) = match minmax_block {
            MinMaxResult::NoElements => return Ok(None),
@@ -764,6 +775,7 @@ impl ConsensusFinder {
        trace!("lowest_block_number: {}", lowest_block.number());
        // TODO: move this default. should be in config, not here
        // TODO: arbitrum needs more slack
        let max_lag_block_number = highest_block_number
            .saturating_sub(self.max_head_block_lag.unwrap_or_else(|| U64::from(5)));
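
For reference, the lag cutoff right above tolerates a small spread between heads: anything more than max_head_block_lag blocks behind the highest head falls out of consideration, with a hard-coded default of 5 that the TODO wants moved into config. A tiny sketch of that arithmetic, using plain u64 instead of ethers' U64 and a hypothetical helper name:

// a minimal sketch of the lag cutoff, with plain u64 standing in for ethers' U64
// and max_lag_block_number as a hypothetical free function
fn max_lag_block_number(highest_block_number: u64, max_head_block_lag: Option<u64>) -> u64 {
    // the TODO in the source: the default of 5 should live in config (arbitrum needs more slack)
    highest_block_number.saturating_sub(max_head_block_lag.unwrap_or(5))
}

fn main() {
    assert_eq!(max_lag_block_number(1_000, None), 995);
    // saturating_sub keeps a very young chain from underflowing below block 0
    assert_eq!(max_lag_block_number(3, None), 0);
    println!("ok");
}
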

@@ -35,7 +35,7 @@ pub struct Web3Rpcs {
    pub(crate) name: Cow<'static, str>,
    pub(crate) chain_id: u64,
    /// if watch_head_block is some, Web3Rpc inside self will send blocks here when they get them
    pub(crate) block_sender: mpsc::UnboundedSender<(Option<Web3ProxyBlock>, Arc<Web3Rpc>)>,
    pub(crate) block_and_rpc_sender: mpsc::UnboundedSender<(Option<Web3ProxyBlock>, Arc<Web3Rpc>)>,
    /// any requests will be forwarded to one (or more) of these connections
    /// TODO: hopefully this not being an async lock will be okay. if you need it across awaits, clone the arc
    pub(crate) by_name: RwLock<HashMap<String, Arc<Web3Rpc>>>,
@@ -139,7 +139,7 @@ impl Web3Rpcs {
            average_block_interval(chain_id).mul_f32((max_head_block_lag.as_u64() * 10) as f32);
        let connections = Arc::new(Self {
            block_sender: block_and_rpc_sender,
            block_and_rpc_sender,
            blocks_by_hash,
            blocks_by_number,
            by_name,
@@ -220,7 +220,7 @@ impl Web3Rpcs {
        let vredis_pool = app.vredis_pool.clone();
        let block_and_rpc_sender = if self.watch_head_block.is_some() {
            Some(self.block_sender.clone())
            Some(self.block_and_rpc_sender.clone())
        } else {
            None
        };
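
The second file is just a rename so the field is called block_and_rpc_sender, matching the (Option<Web3ProxyBlock>, Arc<Web3Rpc>) pairs it carries. Below is a stripped-down sketch of that channel shape with placeholder Block and Rpc types; sending None for the block is how an rpc's head gets cleared, which is what the removed send_head_block_result(Ok(None), ..) call in the health check relied on.

use std::sync::Arc;
use tokio::sync::mpsc;

// placeholder types standing in for Web3ProxyBlock and Web3Rpc
#[derive(Debug)]
struct Block {
    number: u64,
}

#[derive(Debug)]
struct Rpc {
    name: &'static str,
}

fn main() {
    // same shape as block_and_rpc_sender: a new head block (or None to clear it) plus the rpc it came from
    let (block_and_rpc_sender, mut block_and_rpc_receiver) =
        mpsc::unbounded_channel::<(Option<Block>, Arc<Rpc>)>();

    let rpc = Arc::new(Rpc { name: "example" });

    // a fresh head block reported by this rpc
    block_and_rpc_sender
        .send((Some(Block { number: 18_000_000 }), rpc.clone()))
        .unwrap();

    // a None block clears this rpc's head
    block_and_rpc_sender.send((None, rpc)).unwrap();

    while let Ok((maybe_block, rpc)) = block_and_rpc_receiver.try_recv() {
        match maybe_block {
            Some(block) => println!("{} is at block {}", rpc.name, block.number),
            None => println!("{} cleared its head block", rpc.name),
        }
    }
}
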

@@ -779,10 +779,9 @@ impl Web3Rpc {
        let mut abort_handles = vec![];
        // health check that runs if there haven't been any recent requests
        let health_handle = if let Some(block_and_rpc_sender) = block_and_rpc_sender.clone() {
        let health_handle = if block_and_rpc_sender.is_some() {
            // TODO: move this into a proper function
            let rpc = self.clone();
            let block_map = block_map.clone();
            // TODO: how often? different depending on the chain?
            // TODO: reset this timeout when a new block is seen? we need to keep median_request_latency updated though
@@ -812,10 +811,6 @@ impl Web3Rpc {
                    } else {
                        error!(?err, "health check on {} failed", rpc);
                    }
                    // clear the head block since we are unhealthy and shouldn't serve any requests
                    rpc.send_head_block_result(Ok(None), &block_and_rpc_sender, &block_map)
                        .await?;
                } else {
                    rpc.healthy.store(true, atomic::Ordering::Relaxed);
                }
@@ -881,7 +876,7 @@ impl Web3Rpc {
            let f = async move {
                clone
                    .subscribe_new_heads(block_and_rpc_sender.clone(), block_map)
                    .subscribe_new_heads(block_and_rpc_sender, block_map)
                    .await
            };
@@ -915,6 +910,9 @@ impl Web3Rpc {
        // exit if any of the futures exit
        let (first_exit, _, _) = select_all(futures).await;
        // mark unhealthy
        self.healthy.store(false, atomic::Ordering::Relaxed);
        debug!(?first_exit, "subscriptions on {} exited", self);
        // clear the head block
@@ -928,7 +926,7 @@ impl Web3Rpc {
            a.abort();
        }
        // TODO: tell ethers to disconnect?
        // TODO: tell ethers to disconnect? i think dropping will do that
        self.ws_provider.store(None);
        Ok(())
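
Taken together, the healthy flag is now the only thing the health check task and the subscription exit path need to flip; the consensus finder does the filtering on its own side and the rpc's stored head block is left untouched. A small illustrative sketch of that flow (the Rpc type and highest_usable_head are simplified stand-ins, not the crate's API):

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

struct Rpc {
    healthy: AtomicBool,
    // the last head block this rpc reported; no longer cleared when it goes unhealthy
    head_block_number: u64,
}

// the consensus side: only healthy rpcs count toward the highest usable head
fn highest_usable_head(rpcs: &[Arc<Rpc>]) -> Option<u64> {
    rpcs.iter()
        .filter(|rpc| rpc.healthy.load(Ordering::Relaxed))
        .map(|rpc| rpc.head_block_number)
        .max()
}

fn main() {
    let a = Arc::new(Rpc { healthy: AtomicBool::new(true), head_block_number: 100 });
    let b = Arc::new(Rpc { healthy: AtomicBool::new(true), head_block_number: 102 });
    let rpcs = vec![a.clone(), b.clone()];

    assert_eq!(highest_usable_head(&rpcs), Some(102));

    // a failed health check (or the subscription futures exiting) just flips the flag...
    b.healthy.store(false, Ordering::Relaxed);

    // ...and the consensus finder ignores b's head on the next pass, keeping its stored block intact
    assert_eq!(highest_usable_head(&rpcs), Some(100));
    println!("highest usable head: {:?}", highest_usable_head(&rpcs));
}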