From 9ba2045dbd1f2159a82d0dd198d3ec35c118ad0d Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Tue, 10 Oct 2023 15:46:11 -0700
Subject: [PATCH] improve health checking

---
 web3_proxy/src/rpcs/consensus.rs |   2 +-
 web3_proxy/src/rpcs/one.rs       | 263 +++++++++++++++++++++----------
 2 files changed, 181 insertions(+), 84 deletions(-)

diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs
index 9ac03f33..11176be9 100644
--- a/web3_proxy/src/rpcs/consensus.rs
+++ b/web3_proxy/src/rpcs/consensus.rs
@@ -1055,7 +1055,7 @@ impl RpcsForRequest {
             attempted.push(best_rpc);
 
             match best_rpc
-                .try_request_handle(&self.request, error_handler)
+                .try_request_handle(&self.request, error_handler, false)
                 .await
             {
                 Ok(OpenRequestResult::Handle(handle)) => {
diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs
index 3099becd..6e989c09 100644
--- a/web3_proxy/src/rpcs/one.rs
+++ b/web3_proxy/src/rpcs/one.rs
@@ -26,7 +26,7 @@ use serde_json::json;
 use std::cmp::Reverse;
 use std::fmt;
 use std::hash::{Hash, Hasher};
-use std::sync::atomic::{self, AtomicU32, AtomicU64, AtomicUsize};
+use std::sync::atomic::{self, AtomicBool, AtomicU32, AtomicU64, AtomicUsize};
 use std::{cmp::Ordering, sync::Arc};
 use tokio::select;
 use tokio::sync::{mpsc, watch, RwLock as AsyncRwLock};
@@ -89,6 +89,8 @@ pub struct Web3Rpc {
     pub(super) disconnect_watch: Option<watch::Sender<bool>>,
     /// created_at is only inside an Option so that the "Default" derive works. it will always be set.
     pub(super) created_at: Option<Instant>,
+    /// false if a health check has failed
+    pub(super) healthy: AtomicBool,
 }
 
 impl Web3Rpc {
@@ -190,6 +192,9 @@ impl Web3Rpc {
 
         let (disconnect_watch, _) = watch::channel(false);
 
+        // TODO: start optimistically?
+        let healthy = false.into();
+
         let new_rpc = Self {
             automatic_block_limit,
             backup,
@@ -210,6 +215,7 @@ impl Web3Rpc {
             subscribe_txs: config.subscribe_txs,
             ws_url,
             disconnect_watch: Some(disconnect_watch),
+            healthy,
             ..Default::default()
         };
@@ -591,8 +597,9 @@ impl Web3Rpc {
         *self.disconnect_watch.as_ref().unwrap().borrow()
     }
 
-    async fn healthcheck(
+    async fn check_health(
         self: &Arc<Self>,
+        detailed_healthcheck: bool,
         error_handler: Option<RequestErrorHandler>,
     ) -> Web3ProxyResult<()> {
         let head_block = self.head_block_sender.as_ref().unwrap().borrow().clone();
@@ -603,39 +610,41 @@ impl Web3Rpc {
             return Err(anyhow::anyhow!("head_block is too old!").into());
         }
 
-        let block_number = head_block.number();
+        if detailed_healthcheck {
+            let block_number = head_block.number();
 
-        let to = if let Some(txid) = head_block.transactions().last().cloned() {
-            let tx = self
-                .internal_request::<_, Option<Transaction>>(
-                    "eth_getTransactionByHash",
-                    &(txid,),
-                    error_handler,
-                    Some(Duration::from_secs(5)),
-                )
-                .await?
-                .context("no transaction")?;
+            let to = if let Some(txid) = head_block.transactions().last().cloned() {
+                let tx = self
+                    .internal_request::<_, Option<Transaction>>(
+                        "eth_getTransactionByHash",
+                        &(txid,),
+                        error_handler,
+                        Some(Duration::from_secs(5)),
+                    )
+                    .await?
+                    .context("no transaction")?;
 
-            // TODO: what default? something real?
-            tx.to.unwrap_or_else(|| {
+                // TODO: what default? something real?
+                tx.to.unwrap_or_else(|| {
+                    "0xdead00000000000000000000000000000000beef"
+                        .parse::<Address>()
+                        .expect("deafbeef")
+                })
+            } else {
+                "0xdead00000000000000000000000000000000beef"
+                    .parse::<Address>()
+                    .expect("deafbeef")
-            })
-        } else {
-            "0xdead00000000000000000000000000000000beef"
-                .parse::<Address>()
-                .expect("deafbeef")
-        };
+            };
 
-        let _code = self
-            .internal_request::<_, Option<Bytes>>(
-                "eth_getCode",
-                &(to, block_number),
-                error_handler,
-                Some(Duration::from_secs(5)),
-            )
-            .await?;
+            let _code = self
+                .internal_request::<_, Option<Bytes>>(
+                    "eth_getCode",
+                    &(to, block_number),
+                    error_handler,
+                    Some(Duration::from_secs(5)),
+                )
+                .await?;
+        }
     } else {
         // TODO: if head block is none for too long, give an error
     }
@@ -671,14 +680,13 @@ impl Web3Rpc {
                 break;
             }
 
-            if self.backup {
-                debug!("reconnecting to {} in 30 seconds", self);
-            } else {
-                info!("reconnecting to {} in 30 seconds", self);
-            }
-            // TODO: exponential backoff with jitter
-            sleep(Duration::from_secs(30)).await;
+            if self.backup {
+                debug!("reconnecting to {} in 10 seconds", self);
+            } else {
+                info!("reconnecting to {} in 10 seconds", self);
+            }
+            sleep(Duration::from_secs(10)).await;
         }
 
         Ok(())
     }
@@ -728,7 +736,7 @@ impl Web3Rpc {
         let mut futures = FuturesUnordered::new();
 
         // TODO: use this channel instead of self.disconnect_watch
-        let (subscribe_stop_tx, subscribe_stop_rx) = watch::channel(false);
+        let (subscribe_stop_tx, mut subscribe_stop_rx) = watch::channel(false);
 
         // subscribe to the disconnect watch. the app uses this when shutting down or when configs change
         if let Some(disconnect_watch_tx) = self.disconnect_watch.as_ref() {
@@ -753,13 +761,14 @@ impl Web3Rpc {
         }
 
         // health check that runs if there haven't been any recent requests
-        if block_and_rpc_sender.is_some() {
+        if let Some(block_and_rpc_sender) = block_and_rpc_sender.clone() {
             // TODO: move this into a proper function
             let rpc = self.clone();
+            let block_map = block_map.clone();
 
             // TODO: how often? different depending on the chain?
             // TODO: reset this timeout when a new block is seen? we need to keep median_request_latency updated though
-            let health_sleep_seconds = 5;
+            let health_sleep_seconds = 10;
 
             // health check loop
             let f = async move {
@@ -767,29 +776,54 @@ impl Web3Rpc {
                 let mut old_total_requests = 0;
                 let mut new_total_requests;
 
+                if *subscribe_stop_rx.borrow_and_update() {
+                    trace!(
+                        "stopping healthcheck loop before even starting it on {}",
+                        rpc
+                    );
+                    return Ok(());
+                }
+
                 // errors here should not cause the loop to exit!
-                while !(*subscribe_stop_rx.borrow()) {
+                loop {
                     new_total_requests = rpc.internal_requests.load(atomic::Ordering::Relaxed)
                         + rpc.external_requests.load(atomic::Ordering::Relaxed);
 
-                    if new_total_requests - old_total_requests < 5 {
-                        // TODO: if this fails too many times, reset the connection
-                        // TODO: move this into a function and the chaining should be easier
-                        if let Err(err) = rpc.healthcheck(error_handler).await {
-                            // TODO: different level depending on the error handler
-                            // TODO: if rate limit error, set "retry_at"
-                            if rpc.backup {
-                                warn!(?err, "health check on {} failed", rpc);
-                            } else {
-                                error!(?err, "health check on {} failed", rpc);
-                            }
+                    let detailed_healthcheck = new_total_requests - old_total_requests < 5;
+
+                    // TODO: if this fails too many times, reset the connection
+                    if let Err(err) = rpc.check_health(detailed_healthcheck, error_handler).await {
+                        rpc.healthy.store(false, atomic::Ordering::Relaxed);
+
+                        // TODO: different level depending on the error handler
+                        // TODO: if rate limit error, set "retry_at"
+                        if rpc.backup {
+                            warn!(?err, "health check on {} failed", rpc);
+                        } else {
+                            error!(?err, "health check on {} failed", rpc);
                         }
+
+                        // clear the head block since we are unhealthy and shouldn't serve any requests
+                        rpc.send_head_block_result(Ok(None), &block_and_rpc_sender, &block_map)
+                            .await?;
+                    } else {
+                        rpc.healthy.store(true, atomic::Ordering::Relaxed);
                     }
 
                     // TODO: should we count the requests done inside this health check
                     old_total_requests = new_total_requests;
 
-                    sleep(Duration::from_secs(health_sleep_seconds)).await;
+                    select! {
+                        x = subscribe_stop_rx.changed() => {
+                            if x.is_err() || *subscribe_stop_rx.borrow_and_update() {
+                                trace!(%rpc, "stopping healthcheck loop");
+                                break;
+                            }
+                        }
+                        _ = sleep(Duration::from_secs(health_sleep_seconds)) => {
+                            // continue
+                        }
+                    }
                 }
 
                 trace!("healthcheck loop on {} exited", rpc);
 
                 Ok(())
             };
 
+            // TODO: log quick_check lik
+            let initial_check = if let Err(err) = self.check_health(false, error_handler).await {
+                if self.backup {
+                    warn!(?err, "initial health check on {} failed", self);
+                } else {
+                    error!(?err, "initial health check on {} failed", self);
+                }
+
+                false
+            } else {
+                true
+            };
+
+            self.healthy.store(initial_check, atomic::Ordering::Relaxed);
+
             futures.push(flatten_handle(tokio::spawn(f)));
+        } else {
+            self.healthy.store(true, atomic::Ordering::Relaxed);
         }
 
         // subscribe to new heads
         if let Some(block_and_rpc_sender) = block_and_rpc_sender.clone() {
             let clone = self.clone();
-            let subscribe_stop_rx = subscribe_stop_tx.subscribe();
+            let mut subscribe_stop_rx = subscribe_stop_tx.subscribe();
             let block_map = block_map.clone();
 
+            if *subscribe_stop_rx.borrow_and_update() {
+                return Err(anyhow::anyhow!(
+                    "exiting subscribe_new_heads before even starting on {}",
+                    self
+                )
+                .into());
+            }
+
             let f = async move {
                 clone
                     .subscribe_new_heads(block_and_rpc_sender.clone(), block_map, subscribe_stop_rx)
@@ -818,7 +877,15 @@ impl Web3Rpc {
         // subscribe to new transactions
         if let Some(pending_txid_firehose) = pending_txid_firehose.clone() {
             let clone = self.clone();
-            let subscribe_stop_rx = subscribe_stop_tx.subscribe();
+            let mut subscribe_stop_rx = subscribe_stop_tx.subscribe();
+
+            if *subscribe_stop_rx.borrow_and_update() {
+                return Err(anyhow::anyhow!(
+                    "exiting subscribe_pending_txs before even starting on {}",
+                    self
+                )
+                .into());
+            }
 
             let f = async move {
                 clone
@@ -877,19 +944,23 @@ impl Web3Rpc {
             // TODO: only subscribe if a user has subscribed
             let mut pending_txs_sub = ws_provider.subscribe_pending_txs().await?;
-            while let Some(x) = pending_txs_sub.next().await {
-                // TODO: check this less often
-                if *subscribe_stop_rx.borrow() {
-                    // TODO: this is checking way too often. have this on a timer instead
-                    trace!("stopping ws block subscription on {}", self);
-                    break;
+            loop {
+                select! {
+                    x = subscribe_stop_rx.changed() => {
+                        if x.is_err() || *subscribe_stop_rx.borrow_and_update() {
+                            break;
+                        }
+                    }
+                    x = pending_txs_sub.next() => {
+                        if let Some(x) = x {
+                            pending_txid_firehose.send(x).await;
+                        }
+                    }
                 }
-
-                pending_txid_firehose.send(x).await;
             }
         } else {
             // only websockets subscribe to pending transactions
-            // its possibel to do with http, but not recommended
+            // its possible to do with http, but not recommended
             // TODO: what should we do here?
             loop {
                 if *subscribe_stop_rx.borrow_and_update() {
@@ -942,16 +1013,21 @@ impl Web3Rpc {
         self.send_head_block_result(latest_block, &block_sender, &block_map)
             .await?;
 
-        while let Some(block) = blocks.next().await {
-            if *subscribe_stop_rx.borrow() {
-                trace!("stopping ws block subscription on {}", self);
-                break;
+        loop {
+            select! {
+                x = subscribe_stop_rx.changed() => {
+                    if x.is_err() || *subscribe_stop_rx.borrow_and_update() {
+                        trace!(%self, "stopping websocket block subscription");
+                        break;
+                    }
+                }
+                block = blocks.next() => {
+                    let block = block.map(Arc::new);
+
+                    self.send_head_block_result(Ok(block), &block_sender, &block_map)
+                        .await?;
+                }
             }
-
-            let block = Arc::new(block);
-
-            self.send_head_block_result(Ok(Some(block)), &block_sender, &block_map)
-                .await?;
         }
     } else if self.http_client.is_some() {
         // there is a "watch_blocks" function, but a lot of public nodes (including llamanodes) do not support the necessary rpc endpoints
 
         i.set_missed_tick_behavior(MissedTickBehavior::Delay);
 
         loop {
-            if *subscribe_stop_rx.borrow_and_update() {
-                trace!(%self, "stopping http block subscription");
-                break;
-            }
-
             let block_result = self
                 .internal_request::<_, Option<ArcBlock>>(
                     "eth_getBlockByNumber",
                     &("latest", false),
                     None,
                     Some(Duration::from_secs(5)),
                 )
                 .await;
 
             self.send_head_block_result(block_result, &block_sender, &block_map)
                 .await?;
 
-            i.tick().await;
+            // TODO: should this select be at the start or end of the loop?
+            select! {
+                x = subscribe_stop_rx.changed() => {
+                    if x.is_err() || *subscribe_stop_rx.borrow_and_update() {
+                        trace!(%self, "stopping http block subscription");
+                        break;
+                    }
+                }
+                _ = i.tick() => {}
+            }
         }
     } else {
         return Err(anyhow!("no ws or http provider!").into());
@@ -999,9 +1079,13 @@ impl Web3Rpc {
         self: &Arc<Self>,
         web3_request: &Arc<Web3Request>,
         error_handler: Option<RequestErrorHandler>,
+        allow_unhealthy: bool,
     ) -> Web3ProxyResult<OpenRequestHandle> {
         loop {
-            match self.try_request_handle(web3_request, error_handler).await {
+            match self
+                .try_request_handle(web3_request, error_handler, allow_unhealthy)
+                .await
+            {
                 Ok(OpenRequestResult::Handle(handle)) => return Ok(handle),
                 Ok(OpenRequestResult::RetryAt(retry_at)) => {
                     // TODO: emit a stat?
@@ -1137,10 +1221,15 @@ impl Web3Rpc {
         self: &Arc<Self>,
         web3_request: &Arc<Web3Request>,
         error_handler: Option<RequestErrorHandler>,
+        allow_unhealthy: bool,
     ) -> Web3ProxyResult<OpenRequestResult> {
-        // TODO: check a boolean set by health checks?
         // TODO: if websocket is reconnecting, return an error?
 
+        // TODO: what if this is a health check?!
+        if !(self.healthy.load(atomic::Ordering::Relaxed) || allow_unhealthy) {
+            return Ok(OpenRequestResult::Failed);
+        }
+
         // make sure this block has the oldest block that this request needs
         // TODO: should this check be optional? we've probably already done it for RpcsForRequest::inner. for now its fine to duplicate the check
         if let Some(block_needed) = web3_request.min_block_needed() {
@@ -1230,16 +1319,20 @@ impl Web3Rpc {
         // TODO: think about this more. its hard to do this without being self-referential!
         let web3_request = Web3Request::new_internal(method.into(), params, None, max_wait).await?;
 
-        self.authorized_request(&web3_request, error_handler).await
+        // TODO: if we are inside the health checks and we aren't healthy yet. we need some sort of flag to force try_handle to not error
+
+        self.authorized_request(&web3_request, error_handler, true)
+            .await
     }
 
     pub async fn authorized_request<R: JsonRpcResultData>(
         self: &Arc<Self>,
         web3_request: &Arc<Web3Request>,
         error_handler: Option<RequestErrorHandler>,
+        allow_unhealthy: bool,
     ) -> Web3ProxyResult<R> {
         let handle = self
-            .wait_for_request_handle(web3_request, error_handler)
+            .wait_for_request_handle(web3_request, error_handler, allow_unhealthy)
             .await?;
 
         let response = handle.request().await?;
@@ -1295,7 +1388,7 @@ impl Serialize for Web3Rpc {
         S: Serializer,
     {
         // 14 if we bring head_delay back
-        let mut state = serializer.serialize_struct("Web3Rpc", 13)?;
+        let mut state = serializer.serialize_struct("Web3Rpc", 14)?;
 
         // the url is excluded because it likely includes private information. just show the name that we use in keys
         state.serialize_field("name", &self.name)?;
@@ -1365,6 +1458,10 @@ impl Serialize for Web3Rpc {
             let weighted_latency_ms = self.weighted_peak_latency().as_secs_f32() * 1000.0;
             state.serialize_field("weighted_latency_ms", &weighted_latency_ms)?;
         }
+        {
+            let healthy = self.healthy.load(atomic::Ordering::Relaxed);
+            state.serialize_field("healthy", &healthy)?;
+        }
 
         state.end()
     }
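
Note on the new request gating: the sketch below condenses the admission rule that try_request_handle now enforces. It is illustrative only; Rpc, Admission, and admit are hypothetical stand-ins for Web3Rpc, OpenRequestResult, and try_request_handle, and only the boolean predicate mirrors the diff.

    use std::sync::atomic::{AtomicBool, Ordering};

    struct Rpc {
        // set by the health check loop: false whenever check_health errors
        healthy: AtomicBool,
    }

    enum Admission {
        Allowed,
        Failed,
    }

    fn admit(rpc: &Rpc, allow_unhealthy: bool) -> Admission {
        // same shape as the patch: reject unless the rpc is healthy or the
        // caller explicitly opted in. internal health probes pass
        // allow_unhealthy = true so an unhealthy rpc can still be probed
        // and marked healthy again.
        if !(rpc.healthy.load(Ordering::Relaxed) || allow_unhealthy) {
            Admission::Failed
        } else {
            Admission::Allowed
        }
    }

    fn main() {
        let rpc = Rpc { healthy: AtomicBool::new(false) };
        // user traffic against an unhealthy rpc is rejected...
        assert!(matches!(admit(&rpc, false), Admission::Failed));
        // ...but a health check still gets through, so the rpc can recover
        assert!(matches!(admit(&rpc, true), Admission::Allowed));
    }

This is also why internal_request now calls authorized_request with allow_unhealthy = true: health checks run through the same request path that they gate.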