From 3c9576c13bc9421f8c94b423e645e287c9628639 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 15 Feb 2023 15:52:42 -0800 Subject: [PATCH] yes health, no channel --- web3_proxy/src/rpcs/one.rs | 84 ++++++++++++---------------------- web3_proxy/src/rpcs/request.rs | 10 ++-- 2 files changed, 35 insertions(+), 59 deletions(-) diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 8fc1b656..b4d6cebb 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -129,7 +129,6 @@ pub struct Web3Rpc { pub(super) head_latency: RwLock, /// Track request latency pub(super) request_latency: RwLock, - pub(super) request_latency_sender: Option>, /// Track total requests served /// TODO: maybe move this to graphana pub(super) total_requests: AtomicUsize, @@ -217,9 +216,6 @@ impl Web3Rpc { } } - // TODO: what max capacity? - let (request_latency_sender, request_latency_receiver) = flume::bounded(10_000); - let new_connection = Self { name, db_conn: db_conn.clone(), @@ -234,7 +230,6 @@ impl Web3Rpc { backup, block_data_limit, tier: config.tier, - request_latency_sender: Some(request_latency_sender), ..Default::default() }; @@ -254,7 +249,6 @@ impl Web3Rpc { block_sender, chain_id, http_interval_sender, - request_latency_receiver, reconnect, tx_id_sender, ) @@ -641,12 +635,9 @@ impl Web3Rpc { block_sender: Option>, chain_id: u64, http_interval_sender: Option>>, - request_latency_receiver: flume::Receiver, reconnect: bool, tx_id_sender: Option)>>, ) -> anyhow::Result<()> { - let request_latency_receiver = Arc::new(request_latency_receiver); - let revert_handler = if self.backup { RequestRevertHandler::DebugLevel } else { @@ -699,40 +690,40 @@ impl Web3Rpc { if new_total_requests - old_total_requests < 10 { // TODO: if this fails too many times, reset the connection - // let head_block = conn.head_block.read().clone(); + let head_block = conn.head_block.read().clone(); - // if let Some((block_hash, txid)) = head_block.and_then(|x| { - // let block = x.block.clone(); + if let Some((block_hash, txid)) = head_block.and_then(|x| { + let block = x.block.clone(); - // let block_hash = block.hash?; - // let txid = block.transactions.last().cloned()?; + let block_hash = block.hash?; + let txid = block.transactions.last().cloned()?; - // Some((block_hash, txid)) - // }) { - // let authorization = authorization.clone(); - // let conn = conn.clone(); + Some((block_hash, txid)) + }) { + let authorization = authorization.clone(); + let conn = conn.clone(); - // let x = async move { - // conn.try_request_handle(&authorization, Some(client)).await - // } - // .await; + let x = async move { + conn.try_request_handle(&authorization, Some(client)).await + } + .await; - // if let Ok(OpenRequestResult::Handle(x)) = x { - // if let Ok(Some(x)) = x - // .request::<_, Option>( - // "eth_getTransactionByHash", - // &txid, - // revert_handler, - // None, - // ) - // .await - // { - // // TODO: make this flatter - // // TODO: do more (fair, not random) things here - // // let = x.request("eth_getCode", (tx.to.unwrap_or(Address::zero()), block_hash), RequestRevertHandler::ErrorLevel, Some(client.clone())) - // } - // } - // } + if let Ok(OpenRequestResult::Handle(x)) = x { + if let Ok(Some(x)) = x + .request::<_, Option>( + "eth_getTransactionByHash", + &txid, + revert_handler, + None, + ) + .await + { + // TODO: make this flatter + // TODO: do more (fair, not random) things here + // let = x.request("eth_getCode", (tx.to.unwrap_or(Address::zero()), block_hash), RequestRevertHandler::ErrorLevel, Some(client.clone())) + } + } + } } old_total_requests = new_total_requests; @@ -765,23 +756,6 @@ impl Web3Rpc { futures.push(flatten_handle(tokio::spawn(f))); } - { - let conn = self.clone(); - let request_latency_receiver = request_latency_receiver.clone(); - - let f = async move { - while let Ok(latency) = request_latency_receiver.recv_async().await { - conn.request_latency - .write() - .record(latency.as_secs_f64() * 1000.0); - } - - Ok(()) - }; - - futures.push(flatten_handle(tokio::spawn(f))); - } - match try_join_all(futures).await { Ok(_) => { // futures all exited without error. break instead of restarting subscriptions diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index a4897579..11383b08 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -374,10 +374,12 @@ impl OpenRequestHandle { tokio::spawn(f); } } - } else if let Some(x) = self.rpc.request_latency_sender.as_ref() { - if let Err(err) = x.send(start.elapsed()) { - error!("no request latency sender! {:#?}", err); - } + } else { + let latency_ms = start.elapsed().as_secs_f64() * 1000.0; + + let mut latency_recording = self.rpc.request_latency.write(); + + latency_recording.record(latency_ms); } response