From ba7eba41db125a851a80dbf094bf06d19875bf41 Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Fri, 13 Oct 2023 20:26:02 -0700
Subject: [PATCH] allow_unhealthy should also skip checking block heights

---
 web3_proxy/src/rpcs/one.rs | 115 ++++++++++++++++++++-----------------
 1 file changed, 62 insertions(+), 53 deletions(-)

diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs
index ef6d3254..ef5076ea 100644
--- a/web3_proxy/src/rpcs/one.rs
+++ b/web3_proxy/src/rpcs/one.rs
@@ -737,9 +737,19 @@ impl Web3Rpc {
 
         trace!("starting subscriptions on {}", self);
 
-        self.check_provider(chain_id)
+        // start optimistically
+        // we need healthy to start at "true" or else check_provider and other things will not send requests
+        // self.healthy.store(true, atomic::Ordering::Relaxed);
+
+        // TODO: this should set healthy to false. its not strictly necessary since block checks will fail, but this is a faster check
+        if let Err(err) = self
+            .check_provider(chain_id)
             .await
-            .web3_context("failed check_provider")?;
+            .web3_context("failed check_provider")
+        {
+            self.healthy.store(false, atomic::Ordering::Relaxed);
+            return Err(err);
+        }
 
         let mut futures = Vec::new();
         let mut abort_handles = vec![];
@@ -810,8 +820,6 @@ impl Web3Rpc {
 
             tokio::spawn(f)
         } else {
-            self.healthy.store(true, atomic::Ordering::Relaxed);
-
             let rpc = self.clone();
 
             let health_sleep_seconds = 60;
@@ -1153,69 +1161,70 @@ impl Web3Rpc {
     ) -> Web3ProxyResult<OpenRequestResult> {
         // TODO: if websocket is reconnecting, return an error?
 
-        // TODO: what if this is a health check?!
-        if !(self.healthy.load(atomic::Ordering::Relaxed) || allow_unhealthy) {
-            return Ok(OpenRequestResult::Failed);
-        }
-
-        // make sure this block has the oldest block that this request needs
-        // TODO: should this check be optional? we've probably already done it for RpcForRuest::inner. for now its fine to duplicate the check
-        if let Some(block_needed) = web3_request.min_block_needed() {
-            if !self.has_block_data(block_needed) {
-                trace!(%web3_request, %block_needed, "{} cannot serve this request. Missing min block", self);
+        if !allow_unhealthy {
+            if !(self.healthy.load(atomic::Ordering::Relaxed)) {
                 return Ok(OpenRequestResult::Failed);
             }
-        }
 
-        // make sure this block has the newest block that this request needs
-        // TODO: should this check be optional? we've probably already done it for RpcForRuest::inner. for now its fine to duplicate the check
-        if let Some(block_needed) = web3_request.max_block_needed() {
-            if !self.has_block_data(block_needed) {
-                trace!(%web3_request, %block_needed, "{} cannot serve this request. Missing max block", self);
+            // make sure this block has the oldest block that this request needs
+            // TODO: should this check be optional? we've probably already done it for RpcForRuest::inner. for now its fine to duplicate the check
+            if let Some(block_needed) = web3_request.min_block_needed() {
+                if !self.has_block_data(block_needed) {
+                    trace!(%web3_request, %block_needed, "{} cannot serve this request. Missing min block", self);
+                    return Ok(OpenRequestResult::Failed);
+                }
+            }
 
-                let clone = self.clone();
-                let connect_timeout_at = web3_request.connect_timeout_at();
+            // make sure this block has the newest block that this request needs
+            // TODO: should this check be optional? we've probably already done it for RpcForRuest::inner. for now its fine to duplicate the check
+            if let Some(block_needed) = web3_request.max_block_needed() {
+                if !self.has_block_data(block_needed) {
+                    trace!(%web3_request, %block_needed, "{} cannot serve this request. Missing max block", self);
 
-                // create a future that resolves once this rpc can serve this request
-                // TODO: i don't love this future. think about it more
-                let synced_f = async move {
-                    let mut head_block_receiver =
-                        clone.head_block_sender.as_ref().unwrap().subscribe();
+                    let clone = self.clone();
+                    let connect_timeout_at = web3_request.connect_timeout_at();
 
-                    // TODO: if head_block is far behind block_needed, retrurn now
-                    head_block_receiver.borrow_and_update();
+                    // create a future that resolves once this rpc can serve this request
+                    // TODO: i don't love this future. think about it more
+                    let synced_f = async move {
+                        let mut head_block_receiver =
+                            clone.head_block_sender.as_ref().unwrap().subscribe();
 
-                    loop {
-                        select! {
-                            _ = head_block_receiver.changed() => {
-                                if let Some(head_block_number) = head_block_receiver.borrow_and_update().as_ref().map(|x| x.number()) {
-                                    if head_block_number >= block_needed {
-                                        trace!("the block we needed has arrived!");
-                                        break;
-                                    }
-
-                                    // wait up to 2 blocks
-                                    // TODO: configurable wait per chain
-                                    if head_block_number + U64::from(2) < block_needed {
-                                        return Err(Web3ProxyError::FarFutureBlock { head: head_block_number, requested: block_needed });
-                                    }
-                                } else {
-                                    // TODO: what should we do? this server has no blocks at all. we can wait, but i think exiting now is best
-                                    // yield_now().await;
-                                    error!("no head block during try_request_handle on {}", clone);
-                                    return Err(Web3ProxyError::NoServersSynced);
-                                }
-                            }
-                            _ = sleep_until(connect_timeout_at) => {
-                                error!("connection timeout on {}", clone);
-                                return Err(Web3ProxyError::NoServersSynced);
-                            }
-                        }
-                    }
-
-                    Ok(clone)
-                };
-
-                return Ok(OpenRequestResult::Lagged(Box::pin(synced_f)));
+                        // TODO: if head_block is far behind block_needed, retrurn now
+                        head_block_receiver.borrow_and_update();
+
+                        loop {
+                            select! {
+                                _ = head_block_receiver.changed() => {
+                                    if let Some(head_block_number) = head_block_receiver.borrow_and_update().as_ref().map(|x| x.number()) {
+                                        if head_block_number >= block_needed {
+                                            trace!("the block we needed has arrived!");
+                                            break;
+                                        }
+
+                                        // wait up to 2 blocks
+                                        // TODO: configurable wait per chain
+                                        if head_block_number + U64::from(2) < block_needed {
+                                            return Err(Web3ProxyError::FarFutureBlock { head: head_block_number, requested: block_needed });
+                                        }
+                                    } else {
+                                        // TODO: what should we do? this server has no blocks at all. we can wait, but i think exiting now is best
+                                        // yield_now().await;
+                                        error!("no head block during try_request_handle on {}", clone);
+                                        return Err(Web3ProxyError::NoServersSynced);
+                                    }
+                                }
+                                _ = sleep_until(connect_timeout_at) => {
+                                    error!("connection timeout on {}", clone);
+                                    return Err(Web3ProxyError::NoServersSynced);
+                                }
+                            }
+                        }
+
+                        Ok(clone)
+                    };
+
+                    return Ok(OpenRequestResult::Lagged(Box::pin(synced_f)));
+                }
             }
         }
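
The net effect of the change is easier to see outside the diff. What follows is a minimal standalone sketch, not code from web3_proxy: the MockRpc type, its fields, and the simplified try_request_handle signature are hypothetical, and the separate min/max block checks are collapsed into a single bound. It only illustrates the control flow the patch settles on, where allow_unhealthy now bypasses both the healthy flag and the block-height checks instead of only the healthy flag.

use std::sync::atomic::{AtomicBool, Ordering};

// Hypothetical stand-ins for the real web3_proxy types; this is not the crate's API.
enum OpenRequestResult {
    Handle,
    Failed,
}

struct MockRpc {
    healthy: AtomicBool,
    head_block: u64, // newest block this rpc is known to have
}

impl MockRpc {
    fn has_block_data(&self, needed: u64) -> bool {
        needed <= self.head_block
    }

    // After the patch: with `allow_unhealthy` set, neither the healthy flag
    // nor the block-height bound is consulted.
    fn try_request_handle(&self, max_block_needed: Option<u64>, allow_unhealthy: bool) -> OpenRequestResult {
        if !allow_unhealthy {
            if !self.healthy.load(Ordering::Relaxed) {
                return OpenRequestResult::Failed;
            }
            if let Some(needed) = max_block_needed {
                if !self.has_block_data(needed) {
                    return OpenRequestResult::Failed;
                }
            }
        }
        OpenRequestResult::Handle
    }
}

fn main() {
    let rpc = MockRpc {
        healthy: AtomicBool::new(false),
        head_block: 100,
    };

    // a normal request fails: the rpc is marked unhealthy and lacks block 101
    assert!(matches!(rpc.try_request_handle(Some(101), false), OpenRequestResult::Failed));

    // a health-check style request succeeds: allow_unhealthy skips both checks
    assert!(matches!(rpc.try_request_handle(Some(101), true), OpenRequestResult::Handle));

    println!("allow_unhealthy skipped the health and block-height checks");
}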