yes health, no channel

This commit is contained in:
Bryan Stitt 2023-02-15 15:52:42 -08:00 committed by yenicelik
parent 688cc62460
commit 1b7050d294
2 changed files with 35 additions and 59 deletions

View File

@ -129,7 +129,6 @@ pub struct Web3Rpc {
pub(super) head_latency: RwLock<Latency>, pub(super) head_latency: RwLock<Latency>,
/// Track request latency /// Track request latency
pub(super) request_latency: RwLock<Latency>, pub(super) request_latency: RwLock<Latency>,
pub(super) request_latency_sender: Option<flume::Sender<Duration>>,
/// Track total requests served /// Track total requests served
/// TODO: maybe move this to graphana /// TODO: maybe move this to graphana
pub(super) total_requests: AtomicUsize, pub(super) total_requests: AtomicUsize,
@ -217,9 +216,6 @@ impl Web3Rpc {
} }
} }
// TODO: what max capacity?
let (request_latency_sender, request_latency_receiver) = flume::bounded(10_000);
let new_connection = Self { let new_connection = Self {
name, name,
db_conn: db_conn.clone(), db_conn: db_conn.clone(),
@ -234,7 +230,6 @@ impl Web3Rpc {
backup, backup,
block_data_limit, block_data_limit,
tier: config.tier, tier: config.tier,
request_latency_sender: Some(request_latency_sender),
..Default::default() ..Default::default()
}; };
@ -254,7 +249,6 @@ impl Web3Rpc {
block_sender, block_sender,
chain_id, chain_id,
http_interval_sender, http_interval_sender,
request_latency_receiver,
reconnect, reconnect,
tx_id_sender, tx_id_sender,
) )
@ -641,12 +635,9 @@ impl Web3Rpc {
block_sender: Option<flume::Sender<BlockAndRpc>>, block_sender: Option<flume::Sender<BlockAndRpc>>,
chain_id: u64, chain_id: u64,
http_interval_sender: Option<Arc<broadcast::Sender<()>>>, http_interval_sender: Option<Arc<broadcast::Sender<()>>>,
request_latency_receiver: flume::Receiver<Duration>,
reconnect: bool, reconnect: bool,
tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>, tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let request_latency_receiver = Arc::new(request_latency_receiver);
let revert_handler = if self.backup { let revert_handler = if self.backup {
RequestRevertHandler::DebugLevel RequestRevertHandler::DebugLevel
} else { } else {
@ -699,40 +690,40 @@ impl Web3Rpc {
if new_total_requests - old_total_requests < 10 { if new_total_requests - old_total_requests < 10 {
// TODO: if this fails too many times, reset the connection // TODO: if this fails too many times, reset the connection
// let head_block = conn.head_block.read().clone(); let head_block = conn.head_block.read().clone();
// if let Some((block_hash, txid)) = head_block.and_then(|x| { if let Some((block_hash, txid)) = head_block.and_then(|x| {
// let block = x.block.clone(); let block = x.block.clone();
// let block_hash = block.hash?; let block_hash = block.hash?;
// let txid = block.transactions.last().cloned()?; let txid = block.transactions.last().cloned()?;
// Some((block_hash, txid)) Some((block_hash, txid))
// }) { }) {
// let authorization = authorization.clone(); let authorization = authorization.clone();
// let conn = conn.clone(); let conn = conn.clone();
// let x = async move { let x = async move {
// conn.try_request_handle(&authorization, Some(client)).await conn.try_request_handle(&authorization, Some(client)).await
// } }
// .await; .await;
// if let Ok(OpenRequestResult::Handle(x)) = x { if let Ok(OpenRequestResult::Handle(x)) = x {
// if let Ok(Some(x)) = x if let Ok(Some(x)) = x
// .request::<_, Option<Transaction>>( .request::<_, Option<Transaction>>(
// "eth_getTransactionByHash", "eth_getTransactionByHash",
// &txid, &txid,
// revert_handler, revert_handler,
// None, None,
// ) )
// .await .await
// { {
// // TODO: make this flatter // TODO: make this flatter
// // TODO: do more (fair, not random) things here // TODO: do more (fair, not random) things here
// // let = x.request("eth_getCode", (tx.to.unwrap_or(Address::zero()), block_hash), RequestRevertHandler::ErrorLevel, Some(client.clone())) // let = x.request("eth_getCode", (tx.to.unwrap_or(Address::zero()), block_hash), RequestRevertHandler::ErrorLevel, Some(client.clone()))
// } }
// } }
// } }
} }
old_total_requests = new_total_requests; old_total_requests = new_total_requests;
@ -765,23 +756,6 @@ impl Web3Rpc {
futures.push(flatten_handle(tokio::spawn(f))); futures.push(flatten_handle(tokio::spawn(f)));
} }
{
let conn = self.clone();
let request_latency_receiver = request_latency_receiver.clone();
let f = async move {
while let Ok(latency) = request_latency_receiver.recv_async().await {
conn.request_latency
.write()
.record(latency.as_secs_f64() * 1000.0);
}
Ok(())
};
futures.push(flatten_handle(tokio::spawn(f)));
}
match try_join_all(futures).await { match try_join_all(futures).await {
Ok(_) => { Ok(_) => {
// futures all exited without error. break instead of restarting subscriptions // futures all exited without error. break instead of restarting subscriptions

View File

@ -374,10 +374,12 @@ impl OpenRequestHandle {
tokio::spawn(f); tokio::spawn(f);
} }
} }
} else if let Some(x) = self.rpc.request_latency_sender.as_ref() { } else {
if let Err(err) = x.send(start.elapsed()) { let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
error!("no request latency sender! {:#?}", err);
} let mut latency_recording = self.rpc.request_latency.write();
latency_recording.record(latency_ms);
} }
response response