From 6eff5e3bf1e703d477fdb26d59fa428a7ded333a Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Wed, 15 Feb 2023 15:31:59 -0800
Subject: [PATCH] health check and latency recordings

---
 web3_proxy/src/rpcs/one.rs     | 94 ++++++++++++++++++++++++++++++----
 web3_proxy/src/rpcs/request.rs | 11 ++--
 2 files changed, 88 insertions(+), 17 deletions(-)

diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs
index 8bc94243..7603f7e6 100644
--- a/web3_proxy/src/rpcs/one.rs
+++ b/web3_proxy/src/rpcs/one.rs
@@ -5,9 +5,10 @@ use super::request::{OpenRequestHandle, OpenRequestResult};
 use crate::app::{flatten_handle, AnyhowJoinHandle};
 use crate::config::{BlockAndRpc, Web3RpcConfig};
 use crate::frontend::authorization::Authorization;
+use crate::rpcs::request::RequestRevertHandler;
 use anyhow::{anyhow, Context};
 use ethers::prelude::{Bytes, Middleware, ProviderError, TxHash, H256, U64};
-use ethers::types::U256;
+use ethers::types::{Transaction, U256};
 use futures::future::try_join_all;
 use futures::StreamExt;
 use hdrhistogram::Histogram;
@@ -128,6 +129,7 @@ pub struct Web3Rpc {
     pub(super) head_latency: RwLock,
     /// Track request latency
     pub(super) request_latency: RwLock,
+    pub(super) request_latency_sender: Option<flume::Sender<Duration>>,
     /// Track total requests served
     /// TODO: maybe move this to graphana
     pub(super) total_requests: AtomicUsize,
@@ -215,6 +217,9 @@ impl Web3Rpc {
             }
         }
 
+        // TODO: what max capacity?
+        let (request_latency_sender, request_latency_receiver) = flume::bounded(10_000);
+
         let new_connection = Self {
             name,
             db_conn: db_conn.clone(),
@@ -229,6 +234,7 @@ impl Web3Rpc {
             backup,
             block_data_limit,
             tier: config.tier,
+            request_latency_sender: Some(request_latency_sender),
             ..Default::default()
         };
 
@@ -248,6 +254,7 @@ impl Web3Rpc {
                 block_sender,
                 chain_id,
                 http_interval_sender,
+                request_latency_receiver,
                 reconnect,
                 tx_id_sender,
             )
@@ -634,9 +641,18 @@ impl Web3Rpc {
         block_sender: Option<flume::Sender<BlockAndRpc>>,
         chain_id: u64,
         http_interval_sender: Option<Arc<broadcast::Sender<()>>>,
+        request_latency_receiver: flume::Receiver<Duration>,
         reconnect: bool,
         tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
     ) -> anyhow::Result<()> {
+        let request_latency_receiver = Arc::new(request_latency_receiver);
+
+        let revert_handler = if self.backup {
+            RequestRevertHandler::DebugLevel
+        } else {
+            RequestRevertHandler::ErrorLevel
+        };
+
         loop {
             let http_interval_receiver = http_interval_sender.as_ref().map(|x| x.subscribe());
 
@@ -662,24 +678,63 @@ impl Web3Rpc {
                 // provider is ready
                 ready_tx.send(()).unwrap();
 
-                // TODO: how often?
-                // TODO: reset this timeout every time a new block is seen
+                // TODO: how often? different depending on the chain?
+                // TODO: reset this timeout when a new block is seen? we need to keep request_latency updated though
                 let health_sleep_seconds = 10;
 
-                // wait before doing the initial health check
-                sleep(Duration::from_secs(health_sleep_seconds)).await;
+                let mut old_total_requests = 0;
+                let mut new_total_requests;
 
                 loop {
+                    sleep(Duration::from_secs(health_sleep_seconds)).await;
+
                     // TODO: what if we just happened to have this check line up with another restart?
                     // TODO: think more about this
                     if let Some(client) = &*conn.provider.read().await {
-                        // trace!("health check unlocked with error on {}", conn);
-                        // returning error will trigger a reconnect
-                        // also, do the health check as a way of keeping this rpc's request_ewma accurate
-                        // TODO: do a query of some kind
-                    }
+                        // health check as a way of keeping this rpc's request_ewma accurate
+                        // TODO: do something different if this is a backup server?
 
-                    sleep(Duration::from_secs(health_sleep_seconds)).await;
+                        new_total_requests =
+                            conn.total_requests.load(atomic::Ordering::Relaxed);
+
+                        if new_total_requests - old_total_requests < 10 {
+                            // TODO: if this fails too many times, reset the connection
+                            let head_block = conn.head_block.read().clone();
+
+                            if let Some((block_hash, txid)) = head_block.and_then(|x| {
+                                let block = x.block.clone();
+
+                                let block_hash = block.hash?;
+                                let txid = block.transactions.last().cloned()?;
+
+                                Some((block_hash, txid))
+                            }) {
+                                let authorization = authorization.clone();
+                                let conn = conn.clone();
+                                let x = async move {
+                                    conn.try_request_handle(&authorization, None).await
+                                };
+
+                                if let Ok(OpenRequestResult::Handle(x)) = x.await {
+                                    if let Ok(Some(x)) = x
+                                        .request::<_, Option<Transaction>>(
+                                            "eth_getTransactionByHash",
+                                            &txid,
+                                            revert_handler,
+                                            None,
+                                        )
+                                        .await
+                                    {
+                                        // TODO: make this flatter
+                                        // TODO: do more (fair, not random) things here
+                                        // let = x.request("eth_getCode", (tx.to.unwrap_or(Address::zero()), block_hash), RequestRevertHandler::ErrorLevel, Some(client.clone()))
+                                    }
+                                }
+                            }
+                        }
+
+                        old_total_requests = new_total_requests;
+                    }
                 }
             };
 
@@ -708,6 +763,23 @@ impl Web3Rpc {
             futures.push(flatten_handle(tokio::spawn(f)));
         }
 
+        {
+            let conn = self.clone();
+            let request_latency_receiver = request_latency_receiver.clone();
+
+            let f = async move {
+                while let Ok(latency) = request_latency_receiver.recv_async().await {
+                    conn.request_latency
+                        .write()
+                        .record(latency.as_secs_f64() * 1000.0);
+                }
+
+                Ok(())
+            };
+
+            futures.push(flatten_handle(tokio::spawn(f)));
+        }
+
         match try_join_all(futures).await {
             Ok(_) => {
                 // futures all exited without error. break instead of restarting subscriptions
diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs
index 7a2d735d..a4897579 100644
--- a/web3_proxy/src/rpcs/request.rs
+++ b/web3_proxy/src/rpcs/request.rs
@@ -33,6 +33,7 @@ pub struct OpenRequestHandle {
 }
 
 /// Depending on the context, RPC errors can require different handling.
+#[derive(Copy, Clone)]
 pub enum RequestRevertHandler {
     /// Log at the trace level. Use when errors are expected.
     TraceLevel,
@@ -373,12 +374,10 @@ impl OpenRequestHandle {
                     tokio::spawn(f);
                 }
             }
-        } else {
-            // TODO: locking now will slow us down. send latency into a channel instead
-            self.rpc
-                .request_latency
-                .write()
-                .record(start.elapsed().as_secs_f64() * 1000.0);
+        } else if let Some(x) = self.rpc.request_latency_sender.as_ref() {
+            if let Err(err) = x.send(start.elapsed()) {
+                error!("no request latency sender! {:#?}", err);
+            }
         }
 
         response
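Note on the pattern above: the request path used to take the request_latency write lock on every response; this patch instead sends each elapsed Duration over a bounded flume channel and lets one spawned task drain it into the latency tracker. Below is a minimal, self-contained sketch of that channel-based recording pattern, not the web3_proxy code itself: LatencyStats and the task layout are illustrative stand-ins, and it assumes flume and tokio (with the sync, macros, and rt-multi-thread features) as dependencies.

    use std::sync::Arc;
    use std::time::{Duration, Instant};

    use tokio::sync::RwLock;

    /// Illustrative stand-in for the proxy's per-RPC latency aggregate.
    #[derive(Default, Debug)]
    struct LatencyStats {
        total_ms: f64,
        samples: u64,
    }

    #[tokio::main]
    async fn main() {
        // Bounded, like flume::bounded(10_000) in the patch, so a slow consumer
        // applies backpressure instead of buffering without limit.
        let (latency_tx, latency_rx) = flume::bounded::<Duration>(10_000);

        let stats = Arc::new(RwLock::new(LatencyStats::default()));

        // One consumer task owns the write-lock traffic; request handlers only
        // send on the channel and never contend for the lock.
        let consumer = {
            let stats = stats.clone();
            tokio::spawn(async move {
                while let Ok(latency) = latency_rx.recv_async().await {
                    let mut stats = stats.write().await;
                    stats.total_ms += latency.as_secs_f64() * 1000.0;
                    stats.samples += 1;
                }
            })
        };

        // Request path: time the upstream call, then send the elapsed duration.
        let start = Instant::now();
        // ... the upstream RPC request would happen here ...
        if let Err(err) = latency_tx.send(start.elapsed()) {
            // send only fails if the receiver (the consumer task) is gone
            eprintln!("latency receiver dropped: {:?}", err);
        }

        // Dropping every sender closes the channel and ends the consumer loop.
        drop(latency_tx);
        consumer.await.unwrap();

        println!("{:?}", *stats.read().await);
    }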