diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs
index 63e0dc6a..28ac3940 100644
--- a/web3_proxy/src/rpcs/consensus.rs
+++ b/web3_proxy/src/rpcs/consensus.rs
@@ -624,13 +624,6 @@ impl ConsensusFinder {
             .await
             .web3_context("failed caching block")?;
 
-        if let Some(max_age) = self.max_head_block_age {
-            if rpc_head_block.age() > max_age {
-                warn!("rpc_head_block from {} is too old! {}", rpc, rpc_head_block);
-                return Ok(self.remove(&rpc).is_some());
-            }
-        }
-
         if let Some(prev_block) = self.insert(rpc, rpc_head_block.clone()).await {
             // false if this block was already sent by this rpc
             // true if new block for this rpc
@@ -749,7 +742,25 @@ impl ConsensusFinder {
     ) -> Web3ProxyResult<Option<RankedRpcs>> {
         self.update_tiers().await?;
 
-        let minmax_block = self.rpc_heads.values().minmax_by_key(|&x| x.number());
+        let minmax_block = self
+            .rpc_heads
+            .iter()
+            .filter(|(rpc, x)| {
+                if !rpc.healthy.load(atomic::Ordering::Relaxed) {
+                    // TODO: we might go backwards if this happens. make sure we hold on to highest block from a previous run
+                    return false;
+                }
+
+                if let Some(max_block_age) = self.max_head_block_age {
+                    if x.age() > max_block_age {
+                        return false;
+                    }
+                }
+
+                true
+            })
+            .map(|(_, x)| x)
+            .minmax_by_key(|x| x.number());
 
         let (lowest_block, highest_block) = match minmax_block {
             MinMaxResult::NoElements => return Ok(None),
@@ -764,6 +775,7 @@ impl ConsensusFinder {
         trace!("lowest_block_number: {}", lowest_block.number());
 
         // TODO: move this default. should be in config, not here
+        // TODO: arbitrum needs more slack
        let max_lag_block_number = highest_block_number
             .saturating_sub(self.max_head_block_lag.unwrap_or_else(|| U64::from(5)));
 
diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs
index 9fc914b0..23c0ba31 100644
--- a/web3_proxy/src/rpcs/many.rs
+++ b/web3_proxy/src/rpcs/many.rs
@@ -35,7 +35,7 @@ pub struct Web3Rpcs {
     pub(crate) name: Cow<'static, str>,
     pub(crate) chain_id: u64,
     /// if watch_head_block is some, Web3Rpc inside self will send blocks here when they get them
-    pub(crate) block_sender: mpsc::UnboundedSender<(Option<Web3ProxyBlock>, Arc<Web3Rpc>)>,
+    pub(crate) block_and_rpc_sender: mpsc::UnboundedSender<(Option<Web3ProxyBlock>, Arc<Web3Rpc>)>,
     /// any requests will be forwarded to one (or more) of these connections
     /// TODO: hopefully this not being an async lock will be okay. if you need it across awaits, clone the arc
     pub(crate) by_name: RwLock<HashMap<String, Arc<Web3Rpc>>>,
@@ -139,7 +139,7 @@ impl Web3Rpcs {
             average_block_interval(chain_id).mul_f32((max_head_block_lag.as_u64() * 10) as f32);
 
         let connections = Arc::new(Self {
-            block_sender: block_and_rpc_sender,
+            block_and_rpc_sender,
             blocks_by_hash,
             blocks_by_number,
             by_name,
@@ -220,7 +220,7 @@ impl Web3Rpcs {
         let vredis_pool = app.vredis_pool.clone();
 
         let block_and_rpc_sender = if self.watch_head_block.is_some() {
-            Some(self.block_sender.clone())
+            Some(self.block_and_rpc_sender.clone())
         } else {
             None
         };
diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs
index 9450c0d0..5085ef14 100644
--- a/web3_proxy/src/rpcs/one.rs
+++ b/web3_proxy/src/rpcs/one.rs
@@ -779,10 +779,9 @@ impl Web3Rpc {
         let mut abort_handles = vec![];
 
         // health check that runs if there haven't been any recent requests
-        let health_handle = if let Some(block_and_rpc_sender) = block_and_rpc_sender.clone() {
+        let health_handle = if block_and_rpc_sender.is_some() {
             // TODO: move this into a proper function
             let rpc = self.clone();
-            let block_map = block_map.clone();
 
             // TODO: how often? different depending on the chain?
             // TODO: reset this timeout when a new block is seen? we need to keep median_request_latency updated though
@@ -812,10 +811,6 @@ impl Web3Rpc {
                     } else {
                         error!(?err, "health check on {} failed", rpc);
                     }
-
-                    // clear the head block since we are unhealthy and shouldn't serve any requests
-                    rpc.send_head_block_result(Ok(None), &block_and_rpc_sender, &block_map)
-                        .await?;
                 } else {
                     rpc.healthy.store(true, atomic::Ordering::Relaxed);
                 }
@@ -881,7 +876,7 @@ impl Web3Rpc {
 
             let f = async move {
                 clone
-                    .subscribe_new_heads(block_and_rpc_sender.clone(), block_map)
+                    .subscribe_new_heads(block_and_rpc_sender, block_map)
                     .await
             };
 
@@ -915,6 +910,9 @@ impl Web3Rpc {
         // exit if any of the futures exit
         let (first_exit, _, _) = select_all(futures).await;
 
+        // mark unhealthy
+        self.healthy.store(false, atomic::Ordering::Relaxed);
+
         debug!(?first_exit, "subscriptions on {} exited", self);
 
         // clear the head block
@@ -928,7 +926,7 @@ impl Web3Rpc {
             a.abort();
         }
 
-        // TODO: tell ethers to disconnect?
+        // TODO: tell ethers to disconnect? i think dropping will do that
         self.ws_provider.store(None);
 
         Ok(())
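For context, the key behavioral change in consensus.rs is that staleness and health checks move out of block ingestion and into head selection: instead of evicting an RPC when a too-old head arrives, unhealthy RPCs and stale heads are simply skipped when computing the lowest/highest head block. Below is a minimal, self-contained sketch of that pattern. It uses itertools' `minmax_by_key` and `MinMaxResult` as the diff does, but the types `Rpc`, `HeadBlock`, and the function `lowest_and_highest` are hypothetical stand-ins, not the project's `Web3Rpc`/`Web3ProxyBlock`:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;

use itertools::{Itertools, MinMaxResult};

/// hypothetical stand-in for Web3Rpc: only the health flag matters here
struct Rpc {
    healthy: AtomicBool,
}

/// hypothetical stand-in for Web3ProxyBlock
struct HeadBlock {
    number: u64,
    age: Duration,
}

/// pick the lowest and highest head block, skipping unhealthy rpcs and
/// heads older than `max_head_block_age` (mirrors the new filter above)
fn lowest_and_highest<'a>(
    rpc_heads: impl Iterator<Item = (&'a Rpc, &'a HeadBlock)>,
    max_head_block_age: Option<Duration>,
) -> Option<(u64, u64)> {
    let minmax = rpc_heads
        .filter(|(rpc, head)| {
            // an rpc that failed its health check no longer votes for a head
            if !rpc.healthy.load(Ordering::Relaxed) {
                return false;
            }
            // a head that is too old no longer votes either
            if let Some(max_age) = max_head_block_age {
                if head.age > max_age {
                    return false;
                }
            }
            true
        })
        .map(|(_, head)| head)
        .minmax_by_key(|head| head.number);

    match minmax {
        MinMaxResult::NoElements => None,
        MinMaxResult::OneElement(x) => Some((x.number, x.number)),
        MinMaxResult::MinMax(lo, hi) => Some((lo.number, hi.number)),
    }
}

fn main() {
    let rpcs = [
        (
            Rpc { healthy: AtomicBool::new(true) },
            HeadBlock { number: 100, age: Duration::from_secs(2) },
        ),
        (
            Rpc { healthy: AtomicBool::new(false) },
            HeadBlock { number: 105, age: Duration::from_secs(1) },
        ),
    ];

    let heads = rpcs.iter().map(|(rpc, head)| (rpc, head));

    // the unhealthy rpc's newer block 105 is ignored, so both bounds are 100
    assert_eq!(
        lowest_and_highest(heads, Some(Duration::from_secs(60))),
        Some((100, 100))
    );
}
```

This pairs with the one.rs changes: a failed health check only flips `healthy` to false (and subscription exit now marks the RPC unhealthy), so the consensus finder can keep the RPC's last head around while ignoring it, rather than clearing the head block through `send_head_block_result` as before.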