health check and latency recordings
This commit is contained in:
parent
774bd5b232
commit
056670d29a
@ -5,9 +5,10 @@ use super::request::{OpenRequestHandle, OpenRequestResult};
|
||||
use crate::app::{flatten_handle, AnyhowJoinHandle};
|
||||
use crate::config::{BlockAndRpc, Web3RpcConfig};
|
||||
use crate::frontend::authorization::Authorization;
|
||||
use crate::rpcs::request::RequestRevertHandler;
|
||||
use anyhow::{anyhow, Context};
|
||||
use ethers::prelude::{Bytes, Middleware, ProviderError, TxHash, H256, U64};
|
||||
use ethers::types::U256;
|
||||
use ethers::types::{Transaction, U256};
|
||||
use futures::future::try_join_all;
|
||||
use futures::StreamExt;
|
||||
use hdrhistogram::Histogram;
|
||||
@ -128,6 +129,7 @@ pub struct Web3Rpc {
|
||||
pub(super) head_latency: RwLock<Latency>,
|
||||
/// Track request latency
|
||||
pub(super) request_latency: RwLock<Latency>,
|
||||
pub(super) request_latency_sender: Option<flume::Sender<Duration>>,
|
||||
/// Track total requests served
|
||||
/// TODO: maybe move this to graphana
|
||||
pub(super) total_requests: AtomicUsize,
|
||||
@ -215,6 +217,9 @@ impl Web3Rpc {
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: what max capacity?
|
||||
let (request_latency_sender, request_latency_receiver) = flume::bounded(10_000);
|
||||
|
||||
let new_connection = Self {
|
||||
name,
|
||||
db_conn: db_conn.clone(),
|
||||
@ -229,6 +234,7 @@ impl Web3Rpc {
|
||||
backup,
|
||||
block_data_limit,
|
||||
tier: config.tier,
|
||||
request_latency_sender: Some(request_latency_sender),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
@ -248,6 +254,7 @@ impl Web3Rpc {
|
||||
block_sender,
|
||||
chain_id,
|
||||
http_interval_sender,
|
||||
request_latency_receiver,
|
||||
reconnect,
|
||||
tx_id_sender,
|
||||
)
|
||||
@ -634,9 +641,18 @@ impl Web3Rpc {
|
||||
block_sender: Option<flume::Sender<BlockAndRpc>>,
|
||||
chain_id: u64,
|
||||
http_interval_sender: Option<Arc<broadcast::Sender<()>>>,
|
||||
request_latency_receiver: flume::Receiver<Duration>,
|
||||
reconnect: bool,
|
||||
tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
|
||||
) -> anyhow::Result<()> {
|
||||
let request_latency_receiver = Arc::new(request_latency_receiver);
|
||||
|
||||
let revert_handler = if self.backup {
|
||||
RequestRevertHandler::DebugLevel
|
||||
} else {
|
||||
RequestRevertHandler::ErrorLevel
|
||||
};
|
||||
|
||||
loop {
|
||||
let http_interval_receiver = http_interval_sender.as_ref().map(|x| x.subscribe());
|
||||
|
||||
@ -662,24 +678,63 @@ impl Web3Rpc {
|
||||
// provider is ready
|
||||
ready_tx.send(()).unwrap();
|
||||
|
||||
// TODO: how often?
|
||||
// TODO: reset this timeout every time a new block is seen
|
||||
// TODO: how often? different depending on the chain?
|
||||
// TODO: reset this timeout when a new block is seen? we need to keep request_latency updated though
|
||||
let health_sleep_seconds = 10;
|
||||
|
||||
// wait before doing the initial health check
|
||||
sleep(Duration::from_secs(health_sleep_seconds)).await;
|
||||
let mut old_total_requests = 0;
|
||||
let mut new_total_requests;
|
||||
|
||||
loop {
|
||||
sleep(Duration::from_secs(health_sleep_seconds)).await;
|
||||
|
||||
// TODO: what if we just happened to have this check line up with another restart?
|
||||
// TODO: think more about this
|
||||
if let Some(client) = &*conn.provider.read().await {
|
||||
// trace!("health check unlocked with error on {}", conn);
|
||||
// returning error will trigger a reconnect
|
||||
// also, do the health check as a way of keeping this rpc's request_ewma accurate
|
||||
// TODO: do a query of some kind
|
||||
}
|
||||
// health check as a way of keeping this rpc's request_ewma accurate
|
||||
// TODO: do something different if this is a backup server?
|
||||
|
||||
sleep(Duration::from_secs(health_sleep_seconds)).await;
|
||||
new_total_requests =
|
||||
conn.total_requests.load(atomic::Ordering::Relaxed);
|
||||
|
||||
if new_total_requests - old_total_requests < 10 {
|
||||
// TODO: if this fails too many times, reset the connection
|
||||
let head_block = conn.head_block.read().clone();
|
||||
|
||||
if let Some((block_hash, txid)) = head_block.and_then(|x| {
|
||||
let block = x.block.clone();
|
||||
|
||||
let block_hash = block.hash?;
|
||||
let txid = block.transactions.last().cloned()?;
|
||||
|
||||
Some((block_hash, txid))
|
||||
}) {
|
||||
let authorization = authorization.clone();
|
||||
let conn = conn.clone();
|
||||
let x = async move {
|
||||
conn.try_request_handle(&authorization, None).await
|
||||
};
|
||||
|
||||
if let Ok(OpenRequestResult::Handle(x)) = x.await {
|
||||
if let Ok(Some(x)) = x
|
||||
.request::<_, Option<Transaction>>(
|
||||
"eth_getTransactionByHash",
|
||||
&txid,
|
||||
revert_handler,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
{
|
||||
// TODO: make this flatter
|
||||
// TODO: do more (fair, not random) things here
|
||||
// let = x.request("eth_getCode", (tx.to.unwrap_or(Address::zero()), block_hash), RequestRevertHandler::ErrorLevel, Some(client.clone()))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
old_total_requests = new_total_requests;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@ -708,6 +763,23 @@ impl Web3Rpc {
|
||||
futures.push(flatten_handle(tokio::spawn(f)));
|
||||
}
|
||||
|
||||
{
|
||||
let conn = self.clone();
|
||||
let request_latency_receiver = request_latency_receiver.clone();
|
||||
|
||||
let f = async move {
|
||||
while let Ok(latency) = request_latency_receiver.recv_async().await {
|
||||
conn.request_latency
|
||||
.write()
|
||||
.record(latency.as_secs_f64() * 1000.0);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
};
|
||||
|
||||
futures.push(flatten_handle(tokio::spawn(f)));
|
||||
}
|
||||
|
||||
match try_join_all(futures).await {
|
||||
Ok(_) => {
|
||||
// futures all exited without error. break instead of restarting subscriptions
|
||||
|
@ -33,6 +33,7 @@ pub struct OpenRequestHandle {
|
||||
}
|
||||
|
||||
/// Depending on the context, RPC errors can require different handling.
|
||||
#[derive(Copy, Clone)]
|
||||
pub enum RequestRevertHandler {
|
||||
/// Log at the trace level. Use when errors are expected.
|
||||
TraceLevel,
|
||||
@ -373,12 +374,10 @@ impl OpenRequestHandle {
|
||||
tokio::spawn(f);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// TODO: locking now will slow us down. send latency into a channel instead
|
||||
self.rpc
|
||||
.request_latency
|
||||
.write()
|
||||
.record(start.elapsed().as_secs_f64() * 1000.0);
|
||||
} else if let Some(x) = self.rpc.request_latency_sender.as_ref() {
|
||||
if let Err(err) = x.send(start.elapsed()) {
|
||||
error!("no request latency sender! {:#?}", err);
|
||||
}
|
||||
}
|
||||
|
||||
response
|
||||
|
Loading…
Reference in New Issue
Block a user