diff --git a/web3_proxy/src/atomics.rs b/web3_proxy/src/atomics.rs
new file mode 100644
index 00000000..8b0e8e5e
--- /dev/null
+++ b/web3_proxy/src/atomics.rs
@@ -0,0 +1,22 @@
+use std::sync::atomic::{AtomicU64, Ordering};
+
+pub struct AtomicF64 {
+    storage: AtomicU64,
+}
+
+impl AtomicF64 {
+    pub fn new(value: f64) -> Self {
+        let as_u64 = value.to_bits();
+        Self {
+            storage: AtomicU64::new(as_u64),
+        }
+    }
+    pub fn store(&self, value: f64, ordering: Ordering) {
+        let as_u64 = value.to_bits();
+        self.storage.store(as_u64, ordering)
+    }
+    pub fn load(&self, ordering: Ordering) -> f64 {
+        let as_u64 = self.storage.load(ordering);
+        f64::from_bits(as_u64)
+    }
+}
diff --git a/web3_proxy/src/lib.rs b/web3_proxy/src/lib.rs
index aab98d57..e31d0972 100644
--- a/web3_proxy/src/lib.rs
+++ b/web3_proxy/src/lib.rs
@@ -1,6 +1,7 @@
 pub mod app;
 pub mod app_stats;
 pub mod admin_queries;
+pub mod atomics;
 pub mod block_number;
 pub mod config;
 pub mod frontend;
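For context on the new `AtomicF64`: Rust has no native atomic float, so the value round-trips through its IEEE-754 bit pattern in an `AtomicU64` via the lossless `f64::to_bits`/`f64::from_bits` pair. A minimal standalone sketch of the same trick (not part of the diff, std only):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

fn main() {
    // Store an f64 atomically by reinterpreting its bits as a u64...
    let storage = AtomicU64::new(0.0_f64.to_bits());
    storage.store(123.456_f64.to_bits(), Ordering::Relaxed);

    // ...and recover exactly the same f64 on load.
    let value = f64::from_bits(storage.load(Ordering::Relaxed));
    assert_eq!(value, 123.456);
}
```

This gives lock-free load/store of a latency value, which is what the "investigate with atomics.rs" TODOs later in this diff are pointing at.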
diff --git a/web3_proxy/src/peak_ewma.rs b/web3_proxy/src/peak_ewma.rs
new file mode 100644
index 00000000..9adb34d9
--- /dev/null
+++ b/web3_proxy/src/peak_ewma.rs
@@ -0,0 +1,397 @@
+//! Code from [tower](https://github.com/tower-rs/tower/blob/3f31ffd2cf15f1e905142e5f43ab39ac995c22ed/tower/src/load/peak_ewma.rs)
+//! Measures load using the PeakEWMA response latency.
+//! TODO: refactor to work with our code
+
+use std::task::{Context, Poll};
+use std::{
+    sync::{Arc, Mutex},
+    time::Duration,
+};
+use tokio::time::Instant;
+use tower_service::Service;
+use tracing::trace;
+
+/// Measures the load of the underlying service using Peak-EWMA load measurement.
+///
+/// [`PeakEwma`] implements [`Load`] with the [`Cost`] metric that estimates the amount of
+/// pending work to an endpoint. Work is calculated by multiplying the
+/// exponentially-weighted moving average (EWMA) of response latencies by the number of
+/// pending requests. The Peak-EWMA algorithm is designed to be especially sensitive to
+/// worst-case latencies. Over time, the peak latency value decays towards the moving
+/// average of latencies to the endpoint.
+///
+/// When no latency information has been measured for an endpoint, an arbitrary default
+/// RTT of 1 second is used to prevent the endpoint from being overloaded before a
+/// meaningful baseline can be established.
+///
+/// ## Note
+///
+/// This is derived from [Finagle][finagle], which is distributed under the Apache V2
+/// license. Copyright 2017, Twitter Inc.
+///
+/// [finagle]:
+/// https://github.com/twitter/finagle/blob/9cc08d15216497bb03a1cafda96b7266cfbbcff1/finagle-core/src/main/scala/com/twitter/finagle/loadbalancer/PeakEwma.scala
+#[derive(Debug)]
+pub struct PeakEwma<S, C = CompleteOnResponse> {
+    service: S,
+    decay_ns: f64,
+    rtt_estimate: Arc<Mutex<RttEstimate>>,
+    completion: C,
+}
+
+#[cfg(feature = "discover")]
+pin_project! {
+    /// Wraps a `D`-typed stream of discovered services with `PeakEwma`.
+    #[cfg_attr(docsrs, doc(cfg(feature = "discover")))]
+    #[derive(Debug)]
+    pub struct PeakEwmaDiscover<D, C = CompleteOnResponse> {
+        #[pin]
+        discover: D,
+        decay_ns: f64,
+        default_rtt: Duration,
+        completion: C,
+    }
+}
+
+/// Represents the relative cost of communicating with a service.
+///
+/// The underlying value estimates the amount of pending work to a service: the Peak-EWMA
+/// latency estimate multiplied by the number of pending requests.
+#[derive(Copy, Clone, Debug, PartialEq, PartialOrd)]
+pub struct Cost(f64);
+
+/// Tracks an in-flight request and updates the RTT-estimate on Drop.
+#[derive(Debug)]
+pub struct Handle {
+    sent_at: Instant,
+    decay_ns: f64,
+    rtt_estimate: Arc<Mutex<RttEstimate>>,
+}
+
+/// Holds the current RTT estimate and the last time this value was updated.
+#[derive(Debug)]
+struct RttEstimate {
+    update_at: Instant,
+    rtt_ns: f64,
+}
+
+const NANOS_PER_MILLI: f64 = 1_000_000.0;
+
+// ===== impl PeakEwma =====
+
+impl<S, C> PeakEwma<S, C> {
+    /// Wraps an `S`-typed service so that its load is tracked by the EWMA of its peak latency.
+    pub fn new(service: S, default_rtt: Duration, decay_ns: f64, completion: C) -> Self {
+        debug_assert!(decay_ns > 0.0, "decay_ns must be positive");
+        Self {
+            service,
+            decay_ns,
+            rtt_estimate: Arc::new(Mutex::new(RttEstimate::new(nanos(default_rtt)))),
+            completion,
+        }
+    }
+
+    fn handle(&self) -> Handle {
+        Handle {
+            decay_ns: self.decay_ns,
+            sent_at: Instant::now(),
+            rtt_estimate: self.rtt_estimate.clone(),
+        }
+    }
+}
+
+impl<S, C, Request> Service<Request> for PeakEwma<S, C>
+where
+    S: Service<Request>,
+    C: TrackCompletion<Handle, S::Response>,
+{
+    type Response = C::Output;
+    type Error = S::Error;
+    type Future = TrackCompletionFuture<S::Future, C, Handle>;
+
+    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        self.service.poll_ready(cx)
+    }
+
+    fn call(&mut self, req: Request) -> Self::Future {
+        TrackCompletionFuture::new(
+            self.completion.clone(),
+            self.handle(),
+            self.service.call(req),
+        )
+    }
+}
+
+impl<S, C> Load for PeakEwma<S, C> {
+    type Metric = Cost;
+
+    fn load(&self) -> Self::Metric {
+        let pending = Arc::strong_count(&self.rtt_estimate) as u32 - 1;
+
+        // Update the RTT estimate to account for decay since the last update.
+        // If an estimate has not been established, a default is provided
+        let estimate = self.update_estimate();
+
+        let cost = Cost(estimate * f64::from(pending + 1));
+        trace!(
+            "load estimate={:.0}ms pending={} cost={:?}",
+            estimate / NANOS_PER_MILLI,
+            pending,
+            cost,
+        );
+        cost
+    }
+}
+
+impl<S, C> PeakEwma<S, C> {
+    fn update_estimate(&self) -> f64 {
+        let mut rtt = self.rtt_estimate.lock().expect("peak ewma prior_estimate");
+        rtt.decay(self.decay_ns)
+    }
+}
+
+// ===== impl PeakEwmaDiscover =====
+
+#[cfg(feature = "discover")]
+impl<D, C> PeakEwmaDiscover<D, C> {
+    /// Wraps a `D`-typed [`Discover`] so that services have a [`PeakEwma`] load metric.
+    ///
+    /// The provided `default_rtt` is used as the default RTT estimate for newly
+    /// added services.
+    ///
+    /// The `decay` value determines over what time period an RTT estimate should
+    /// decay.
+    pub fn new<Request>(discover: D, default_rtt: Duration, decay: Duration, completion: C) -> Self
+    where
+        D: Discover,
+        D::Service: Service<Request>,
+        C: TrackCompletion<Handle, <D::Service as Service<Request>>::Response>,
+    {
+        PeakEwmaDiscover {
+            discover,
+            decay_ns: nanos(decay),
+            default_rtt,
+            completion,
+        }
+    }
+}
+
+#[cfg(feature = "discover")]
+impl<D, C> Stream for PeakEwmaDiscover<D, C>
+where
+    D: Discover,
+    C: Clone,
+{
+    type Item = Result<Change<D::Key, PeakEwma<D::Service, C>>, D::Error>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let this = self.project();
+        let change = match ready!(this.discover.poll_discover(cx)).transpose()? {
+            None => return Poll::Ready(None),
+            Some(Change::Remove(k)) => Change::Remove(k),
+            Some(Change::Insert(k, svc)) => {
+                let peak_ewma = PeakEwma::new(
+                    svc,
+                    *this.default_rtt,
+                    *this.decay_ns,
+                    this.completion.clone(),
+                );
+                Change::Insert(k, peak_ewma)
+            }
+        };
+
+        Poll::Ready(Some(Ok(change)))
+    }
+}
+
+// ===== impl RttEstimate =====
+
+impl RttEstimate {
+    fn new(rtt_ns: f64) -> Self {
+        debug_assert!(0.0 < rtt_ns, "rtt must be positive");
+        Self {
+            rtt_ns,
+            update_at: Instant::now(),
+        }
+    }
+
+    /// Decays the RTT estimate with a decay period of `decay_ns`.
+    fn decay(&mut self, decay_ns: f64) -> f64 {
+        // Updates with a 0 duration so that the estimate decays towards 0.
+        let now = Instant::now();
+        self.update(now, now, decay_ns)
+    }
+
+    /// Updates the Peak-EWMA RTT estimate.
+    ///
+    /// The elapsed time from `sent_at` to `recv_at` is added to the estimate.
+    fn update(&mut self, sent_at: Instant, recv_at: Instant, decay_ns: f64) -> f64 {
+        debug_assert!(
+            sent_at <= recv_at,
+            "recv_at={:?} after sent_at={:?}",
+            recv_at,
+            sent_at
+        );
+        let rtt = nanos(recv_at.saturating_duration_since(sent_at));
+
+        let now = Instant::now();
+        debug_assert!(
+            self.update_at <= now,
+            "update_at={:?} in the future",
+            self.update_at
+        );
+
+        self.rtt_ns = if self.rtt_ns < rtt {
+            // For Peak-EWMA, always use the worst-case (peak) value as the estimate for
+            // subsequent requests.
+            trace!(
+                "update peak rtt={}ms prior={}ms",
+                rtt / NANOS_PER_MILLI,
+                self.rtt_ns / NANOS_PER_MILLI,
+            );
+            rtt
+        } else {
+            // When an RTT is observed that is less than the estimated RTT, we decay the
+            // prior estimate according to how much time has elapsed since the last
+            // update. The inverse of the decay is used to scale the estimate towards the
+            // observed RTT value.
+            let elapsed = nanos(now.saturating_duration_since(self.update_at));
+            let decay = (-elapsed / decay_ns).exp();
+            let recency = 1.0 - decay;
+            let next_estimate = (self.rtt_ns * decay) + (rtt * recency);
+            trace!(
+                "update rtt={:03.0}ms decay={:06.0}ns; next={:03.0}ms",
+                rtt / NANOS_PER_MILLI,
+                self.rtt_ns - next_estimate,
+                next_estimate / NANOS_PER_MILLI,
+            );
+            next_estimate
+        };
+        self.update_at = now;
+
+        self.rtt_ns
+    }
+}
+
+// ===== impl Handle =====
+
+impl Drop for Handle {
+    fn drop(&mut self) {
+        let recv_at = Instant::now();
+
+        if let Ok(mut rtt) = self.rtt_estimate.lock() {
+            rtt.update(self.sent_at, recv_at, self.decay_ns);
+        }
+    }
+}
+
+// ===== impl Cost =====
+
+// Utility that converts durations to nanos in f64.
+//
+// Due to a lossy transformation, the maximum value that can be represented is ~585 years,
+// which, I hope, is more than enough to represent request latencies.
+fn nanos(d: Duration) -> f64 {
+    const NANOS_PER_SEC: u64 = 1_000_000_000;
+    let n = f64::from(d.subsec_nanos());
+    let s = d.as_secs().saturating_mul(NANOS_PER_SEC) as f64;
+    n + s
+}
+
+#[cfg(test)]
+mod tests {
+    use futures_util::future;
+    use std::time::Duration;
+    use tokio::time;
+    use tokio_test::{assert_ready, assert_ready_ok, task};
+
+    use super::*;
+
+    struct Svc;
+    impl Service<()> for Svc {
+        type Response = ();
+        type Error = ();
+        type Future = future::Ready<Result<(), ()>>;
+
+        fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), ()>> {
+            Poll::Ready(Ok(()))
+        }
+
+        fn call(&mut self, (): ()) -> Self::Future {
+            future::ok(())
+        }
+    }
+
+    /// The default RTT estimate decays, so that new nodes are considered if the
+    /// default RTT is too high.
+    #[tokio::test]
+    async fn default_decay() {
+        time::pause();
+
+        let svc = PeakEwma::new(
+            Svc,
+            Duration::from_millis(10),
+            NANOS_PER_MILLI * 1_000.0,
+            CompleteOnResponse,
+        );
+        let Cost(load) = svc.load();
+        assert_eq!(load, 10.0 * NANOS_PER_MILLI);
+
+        time::advance(Duration::from_millis(100)).await;
+        let Cost(load) = svc.load();
+        assert!(9.0 * NANOS_PER_MILLI < load && load < 10.0 * NANOS_PER_MILLI);
+
+        time::advance(Duration::from_millis(100)).await;
+        let Cost(load) = svc.load();
+        assert!(8.0 * NANOS_PER_MILLI < load && load < 9.0 * NANOS_PER_MILLI);
+    }
+
+    // Pending requests increase the load estimate, and the estimate decays back
+    // down as responses complete and time elapses.
+    #[tokio::test]
+    async fn compound_decay() {
+        time::pause();
+
+        let mut svc = PeakEwma::new(
+            Svc,
+            Duration::from_millis(20),
+            NANOS_PER_MILLI * 1_000.0,
+            CompleteOnResponse,
+        );
+        assert_eq!(svc.load(), Cost(20.0 * NANOS_PER_MILLI));
+
+        time::advance(Duration::from_millis(100)).await;
+        let mut rsp0 = task::spawn(svc.call(()));
+        assert!(svc.load() > Cost(20.0 * NANOS_PER_MILLI));
+
+        time::advance(Duration::from_millis(100)).await;
+        let mut rsp1 = task::spawn(svc.call(()));
+        assert!(svc.load() > Cost(40.0 * NANOS_PER_MILLI));
+
+        time::advance(Duration::from_millis(100)).await;
+        let () = assert_ready_ok!(rsp0.poll());
+        assert_eq!(svc.load(), Cost(400_000_000.0));
+
+        time::advance(Duration::from_millis(100)).await;
+        let () = assert_ready_ok!(rsp1.poll());
+        assert_eq!(svc.load(), Cost(200_000_000.0));
+
+        // Check that values decay as time elapses
+        time::advance(Duration::from_secs(1)).await;
+        assert!(svc.load() < Cost(100_000_000.0));
+
+        time::advance(Duration::from_secs(10)).await;
+        assert!(svc.load() < Cost(100_000.0));
+    }
+
+    #[test]
+    fn nanos() {
+        assert_eq!(super::nanos(Duration::new(0, 0)), 0.0);
+        assert_eq!(super::nanos(Duration::new(0, 123)), 123.0);
+        assert_eq!(super::nanos(Duration::new(1, 23)), 1_000_000_023.0);
+        assert_eq!(
+            super::nanos(Duration::new(::std::u64::MAX, 999_999_999)),
+            18446744074709553000.0
+        );
+    }
+}
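The decay arithmetic in `RttEstimate::update` above is worth seeing in isolation: an observation above the current estimate is adopted immediately (the "peak" behavior), while an observation below it pulls the estimate down by a factor of `exp(-elapsed / decay_ns)`. A standalone sketch of that rule (function name and sample numbers are illustrative, not from the diff):

```rust
// Peak-EWMA update rule, extracted from RttEstimate::update for illustration.
fn update_estimate(estimate_ns: f64, observed_ns: f64, elapsed_ns: f64, decay_ns: f64) -> f64 {
    if observed_ns > estimate_ns {
        // Peak behavior: jump straight to the worst-case latency.
        observed_ns
    } else {
        // Decay toward the observed value based on time since the last update.
        let decay = (-elapsed_ns / decay_ns).exp();
        estimate_ns * decay + observed_ns * (1.0 - decay)
    }
}

fn main() {
    const MS: f64 = 1_000_000.0; // nanos per millisecond

    // A 5ms observation against a 10ms estimate, 10ms after the last update,
    // with a 10ms decay period: decay = e^-1 ≈ 0.368, so the next estimate is
    // roughly 10ms * 0.368 + 5ms * 0.632 ≈ 6.84ms.
    let next = update_estimate(10.0 * MS, 5.0 * MS, 10.0 * MS, 10.0 * MS);
    assert!(6.8 * MS < next && next < 6.9 * MS);
}
```

This is also why `decay` feeding a zero-duration "observation" through `update` (as `load` does via `update_estimate`) makes an idle endpoint's cost fall back down over time.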
diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs
index 62901b59..a348b9d6 100644
--- a/web3_proxy/src/rpcs/consensus.rs
+++ b/web3_proxy/src/rpcs/consensus.rs
@@ -127,10 +127,11 @@
             .get_with(*block.hash(), async move { Instant::now() })
             .await;
 
-        // TODO: this should be 0 if we are first seen, but i think it will be slightly non-zero
-        rpc.head_latency
-            .write()
-            .record(first_seen.elapsed().as_secs_f64() * 1000.0);
+        // TODO: this should be 0 if we are first seen, but i think it will be slightly non-zero.
+        // calculate elapsed time before trying to lock.
+        let latency = first_seen.elapsed();
+
+        rpc.head_latency.write().record(latency);
 
         // TODO: what about a reorg to the same height?
         if Some(block.number()) > self.highest_block.as_ref().map(|x| x.number()) {
diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs
index cc671ab7..d53f0531 100644
--- a/web3_proxy/src/rpcs/many.rs
+++ b/web3_proxy/src/rpcs/many.rs
@@ -27,7 +27,7 @@ use serde_json::json;
 use serde_json::value::RawValue;
 use std::cmp::min_by_key;
 use std::collections::BTreeMap;
-use std::sync::atomic::Ordering;
+use std::sync::atomic::{self, Ordering};
 use std::sync::Arc;
 use std::{cmp, fmt};
 use thread_fast_rng::rand::seq::SliceRandom;
@@ -592,7 +592,7 @@ impl Web3Rpcs {
             // TODO: cached key to save a read lock
             // TODO: ties to the server with the smallest block_data_limit
             let best_rpc = min_by_key(rpc_a, rpc_b, |x| {
-                OrderedFloat(x.request_latency.read().ewma.value())
+                OrderedFloat(x.head_latency.read().value())
             });
             trace!("winner: {}", best_rpc);
 
@@ -1159,9 +1159,16 @@
 fn rpc_sync_status_sort_key(x: &Arc<Web3Rpc>) -> (U64, u64, OrderedFloat<f64>) {
     let tier = x.tier;
 
-    let request_ewma = OrderedFloat(x.request_latency.read().ewma.value());
+    // TODO: use request instead of head latency
+    let head_ewma = x.head_latency.read().value();
 
-    (reversed_head_block, tier, request_ewma)
+    let active_requests = x.active_requests.load(atomic::Ordering::Relaxed) as f64;
+
+    // TODO: i'm not sure head * active is exactly right. but we'll see
+    // TODO: i don't think this actually counts as peak. investigate with atomics.rs and peak_ewma.rs
+    let peak_ewma = OrderedFloat(head_ewma * active_requests);
+
+    (reversed_head_block, tier, peak_ewma)
 }
 
 mod tests {
diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs
index dcbfe220..1fc80ea4 100644
--- a/web3_proxy/src/rpcs/one.rs
+++ b/web3_proxy/src/rpcs/one.rs
@@ -8,12 +8,12 @@ use crate::frontend::authorization::Authorization;
 use crate::rpcs::request::RequestRevertHandler;
 use anyhow::{anyhow, Context};
 use ethers::prelude::{Bytes, Middleware, ProviderError, TxHash, H256, U64};
-use ethers::types::{Transaction, U256};
+use ethers::types::{Address, Transaction, U256};
 use futures::future::try_join_all;
 use futures::StreamExt;
-use hdrhistogram::Histogram;
 use log::{debug, error, info, trace, warn, Level};
 use migration::sea_orm::DatabaseConnection;
+use ordered_float::OrderedFloat;
 use parking_lot::RwLock;
 use redis_rate_limiter::{RedisPool, RedisRateLimitResult, RedisRateLimiter};
 use serde::ser::{SerializeStruct, Serializer};
@@ -30,10 +30,8 @@
 use tokio::sync::{broadcast, oneshot, watch, RwLock as AsyncRwLock};
 use tokio::time::{sleep, sleep_until, timeout, Duration, Instant};
 pub struct Latency {
-    /// Track how many milliseconds slower we are than the fastest node
-    pub histogram: Histogram<u64>,
     /// exponentially weighted moving average of how many milliseconds behind the fastest node we are
-    pub ewma: ewma::EWMA,
+    ewma: ewma::EWMA,
 }
 
 impl Serialize for Latency {
@@ -41,51 +39,52 @@
     where
         S: Serializer,
     {
-        let mut state = serializer.serialize_struct("latency", 6)?;
-
-        state.serialize_field("ewma_ms", &self.ewma.value())?;
-
-        state.serialize_field("histogram_len", &self.histogram.len())?;
-        state.serialize_field("mean_ms", &self.histogram.mean())?;
-        state.serialize_field("p50_ms", &self.histogram.value_at_quantile(0.50))?;
-        state.serialize_field("p75_ms", &self.histogram.value_at_quantile(0.75))?;
-        state.serialize_field("p99_ms", &self.histogram.value_at_quantile(0.99))?;
-
-        state.end()
+        serializer.serialize_f64(self.ewma.value())
     }
 }
 
 impl Latency {
-    pub fn record(&mut self, milliseconds: f64) {
-        self.ewma.add(milliseconds);
+    #[inline(always)]
+    pub fn record(&mut self, duration: Duration) {
+        self.record_ms(duration.as_secs_f64() * 1000.0);
+    }
 
-        // histogram needs ints and not floats
-        self.histogram.record(milliseconds as u64).unwrap();
+    #[inline(always)]
+    pub fn record_ms(&mut self, milliseconds: f64) {
+        self.ewma.add(milliseconds);
+    }
+
+    #[inline(always)]
+    pub fn value(&self) -> f64 {
+        self.ewma.value()
    }
 }
 
 impl Default for Latency {
     fn default() -> Self {
-        // TODO: what should the default sigfig be?
-        let sigfig = 0;
-
-        // TODO: what should the default span be? 25 requests? have a "new"
         let span = 25.0;
 
-        Self::new(sigfig, span).expect("default histogram sigfigs should always work")
+        let start = 1000.0;
+
+        Self::new(span, start)
     }
 }
 
 impl Latency {
-    pub fn new(sigfig: u8, span: f64) -> Result<Self, hdrhistogram::CreationError> {
+    // depending on the span, start might not be perfect
+    pub fn new(span: f64, start: f64) -> Self {
         let alpha = Self::span_to_alpha(span);
 
-        let histogram = Histogram::new(sigfig)?;
+        let mut ewma = ewma::EWMA::new(alpha);
 
-        Ok(Self {
-            histogram,
-            ewma: ewma::EWMA::new(alpha),
-        })
+        if start > 0.0 {
+            for _ in 0..(span as u64) {
+                ewma.add(start);
+            }
+        }
+
+        Self { ewma }
     }
 
     fn span_to_alpha(span: f64) -> f64 {
@@ -127,11 +126,13 @@ pub struct Web3Rpc {
     pub(super) head_block: RwLock<Option<Web3ProxyBlock>>,
     /// Track head block latency
     pub(super) head_latency: RwLock<Latency>,
-    /// Track request latency
-    pub(super) request_latency: RwLock<Latency>,
+    // /// Track request latency
+    // /// TODO: refactor this. this lock kills perf. for now just use head_latency
+    // pub(super) request_latency: RwLock<Latency>,
     /// Track total requests served
     /// TODO: maybe move this to graphana
     pub(super) total_requests: AtomicUsize,
+    pub(super) active_requests: AtomicUsize,
 }
 
 impl Web3Rpc {
@@ -259,6 +260,18 @@
         Ok((new_connection, handle))
     }
 
+    pub async fn peak_ewma(&self) -> OrderedFloat<f64> {
+        // TODO: use request instead of head latency? that was killing perf though
+        let head_ewma = self.head_latency.read().value();
+
+        // TODO: what ordering?
+        let active_requests = self.active_requests.load(atomic::Ordering::Relaxed) as f64;
+
+        // TODO: i'm not sure head * active is exactly right. but we'll see
+        // TODO: i don't think this actually counts as peak. investigate with atomics.rs and peak_ewma.rs
+        OrderedFloat(head_ewma * active_requests)
+    }
+
     // TODO: would be great if rpcs exposed this. see https://github.com/ledgerwatch/erigon/issues/6391
     async fn check_block_data_limit(
         self: &Arc<Self>,
@@ -671,14 +684,9 @@
             // TODO: how often? different depending on the chain?
             // TODO: reset this timeout when a new block is seen? we need to keep request_latency updated though
-            // let health_sleep_seconds = 10;
-
-            futures::future::pending::<()>().await;
-
-            Ok(())
+            let health_sleep_seconds = 10;
 
             // TODO: benchmark this and lock contention
-            /*
             let mut old_total_requests = 0;
             let mut new_total_requests;
@@ -696,6 +704,7 @@
 
                 if new_total_requests - old_total_requests < 10 {
                     // TODO: if this fails too many times, reset the connection
+                    // TODO: move this into a function and the chaining should be easier
                     let head_block = conn.head_block.read().clone();
 
                     if let Some((block_hash, txid)) = head_block.and_then(|x| {
@@ -706,28 +715,65 @@
                         Some((block_hash, txid))
                     }) {
-                        let authorization = authorization.clone();
-                        let conn = conn.clone();
+                        let to = conn
+                            .wait_for_query::<_, Option<Transaction>>(
+                                "eth_getTransactionByHash",
+                                &(txid,),
+                                revert_handler,
+                                authorization.clone(),
+                                Some(client.clone()),
+                            )
+                            .await
+                            .and_then(|tx| {
+                                let tx = tx.context("no transaction found")?;
 
-                        let x = async move {
-                            conn.try_request_handle(&authorization, Some(client)).await
-                        }
-                        .await;
+                                // TODO: what default? something real?
+                                let to = tx.to.unwrap_or_else(|| {
+                                    "0xdead00000000000000000000000000000000beef"
+                                        .parse::<Address>()
+                                        .expect("deafbeef")
+                                });
 
-                        if let Ok(OpenRequestResult::Handle(x)) = x {
-                            if let Ok(Some(x)) = x
-                                .request::<_, Option<Transaction>>(
-                                    "eth_getTransactionByHash",
-                                    &(txid,),
+                                Ok(to)
+                            });
+
+                        let code = match to {
+                            Err(err) => {
+                                if conn.backup {
+                                    debug!(
+                                        "{} failed health check query! {:#?}",
+                                        conn, err
+                                    );
+                                } else {
+                                    warn!(
+                                        "{} failed health check query! {:#?}",
+                                        conn, err
+                                    );
+                                }
+                                continue;
+                            }
+                            Ok(to) => {
+                                conn.wait_for_query::<_, Option<Bytes>>(
+                                    "eth_getCode",
+                                    &(to, block_hash),
                                     revert_handler,
-                                    None,
+                                    authorization.clone(),
+                                    Some(client),
                                 )
                                 .await
-                            {
-                                // TODO: make this flatter
-                                // TODO: do more (fair, not random) things here
-                                // let = x.request("eth_getCode", (tx.to.unwrap_or(Address::zero()), block_hash), RequestRevertHandler::ErrorLevel, Some(client.clone()))
                             }
+                        };
+
+                        if let Err(err) = code {
+                            if conn.backup {
+                                debug!(
+                                    "{} failed health check query! {:#?}",
+                                    conn, err
+                                );
+                            } else {
+                                warn!("{} failed health check query! {:#?}", conn, err);
+                            }
+                            continue;
                         }
                     }
                 }
@@ -735,7 +781,6 @@
                 old_total_requests = new_total_requests;
             }
         }
-        */
         };
 
         futures.push(flatten_handle(tokio::spawn(f)));
@@ -1144,6 +1189,26 @@
         Ok(OpenRequestResult::Handle(handle))
     }
+
+    pub async fn wait_for_query<P, R>(
+        self: &Arc<Self>,
+        method: &str,
+        params: &P,
+        revert_handler: RequestRevertHandler,
+        authorization: Arc<Authorization>,
+        unlocked_provider: Option<Arc<Web3Provider>>,
+    ) -> anyhow::Result<R>
+    where
+        // TODO: not sure about this type. would be better to not need clones, but measure and spawns combine to need it
+        P: Clone + fmt::Debug + serde::Serialize + Send + Sync + 'static,
+        R: serde::Serialize + serde::de::DeserializeOwned + fmt::Debug,
+    {
+        self.wait_for_request_handle(&authorization, None, None)
+            .await?
+            .request::<P, R>(method, params, revert_handler, unlocked_provider)
+            .await
+            .context("ProviderError from the backend")
+    }
 }
 
 impl fmt::Debug for Web3Provider {
@@ -1211,9 +1276,7 @@
         // TODO: maybe this is too much data. serialize less?
         state.serialize_field("head_block", &*self.head_block.read())?;
 
-        state.serialize_field("head_latency", &*self.head_latency.read())?;
-
-        state.serialize_field("request_latency", &*self.request_latency.read())?;
+        state.serialize_field("head_latency", &self.head_latency.read().value())?;
 
         state.serialize_field(
             "total_requests",
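One subtlety in the new `Latency::new(span, start)` in one.rs above: the warm-up loop feeds `start` into the EWMA `span` times, so the initial estimate gets most of the way to `start` but not all of it (hence the "start might not be perfect" comment). A rough sketch with an inlined stand-in for the `ewma` crate, assuming the accumulator starts at 0.0 and that `span_to_alpha` (whose body is outside the hunk) uses the conventional mapping `alpha = 2 / (span + 1)`:

```rust
// Minimal EWMA stand-in, just to show the warm-up behavior of Latency::new.
struct Ewma {
    alpha: f64,
    value: f64,
}

impl Ewma {
    fn add(&mut self, v: f64) {
        self.value = self.alpha * v + (1.0 - self.alpha) * self.value;
    }
}

fn main() {
    let span = 25.0;
    let alpha = 2.0 / (span + 1.0); // assumed span_to_alpha

    let mut ewma = Ewma { alpha, value: 0.0 };
    for _ in 0..span as u64 {
        ewma.add(1000.0); // start = 1000.0 ms, as in Latency::default
    }

    // After 25 warm-up samples: value = 1000 * (1 - (1 - alpha)^25) ≈ 865ms,
    // close to (but below) the requested start value.
    assert!(850.0 < ewma.value && ewma.value < 900.0);
}
```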
diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs
index 8a9254a6..139e3bba 100644
--- a/web3_proxy/src/rpcs/request.rs
+++ b/web3_proxy/src/rpcs/request.rs
@@ -139,6 +139,7 @@ impl OpenRequestHandle {
     /// Send a web3 request
     /// By having the request method here, we ensure that the rate limiter was called and connection counts were properly incremented
+    /// depending on how things are locked, you might need to pass the provider in
     pub async fn request<P, R>(
         self,
         method: &str,
@@ -156,30 +157,23 @@
         // trace!(rpc=%self.conn, %method, "request");
         trace!("requesting from {}", self.rpc);
 
-        let mut provider: Option<Arc<Web3Provider>> = None;
+        let mut provider = if unlocked_provider.is_some() {
+            unlocked_provider
+        } else {
+            self.rpc.provider.read().await.clone()
+        };
+
         let mut logged = false;
+
         while provider.is_none() {
             // trace!("waiting on provider: locking...");
-
-            // TODO: this should *not* be new_head_client. that is dedicated to only new heads
-            if let Some(unlocked_provider) = unlocked_provider {
-                provider = Some(unlocked_provider);
-                break;
-            }
-
-            let unlocked_provider = self.rpc.provider.read().await;
-
-            if let Some(unlocked_provider) = unlocked_provider.clone() {
-                provider = Some(unlocked_provider);
-                break;
-            }
+            sleep(Duration::from_millis(100)).await;
 
             if !logged {
                 debug!("no provider for open handle on {}", self.rpc);
                 logged = true;
             }
 
-            sleep(Duration::from_millis(100)).await;
+            provider = self.rpc.provider.read().await.clone();
         }
 
         let provider = provider.expect("provider was checked already");
@@ -188,7 +182,11 @@
         self.rpc
             .total_requests
             .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
-        let start = Instant::now();
+        self.rpc
+            .active_requests
+            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+
+        // let latency = Instant::now();
 
         // TODO: replace ethers-rs providers with our own that supports streaming the responses
         let response = match provider.as_ref() {
@@ -201,6 +199,13 @@
             }
         };
 
+        // note. we intentionally do not record this latency now. we do NOT want to measure errors
+        // let latency = latency.elapsed();
+
+        self.rpc
+            .active_requests
+            .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
+
         // // TODO: i think ethers already has trace logging (and does it much more fancy)
         // trace!(
         //     "response from {} for {} {:?}: {:?}",
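Taken together, the sort key in many.rs and `Web3Rpc::peak_ewma` above rank servers by head-latency EWMA times in-flight request count, with `active_requests` incremented and decremented around each call in request.rs. A toy illustration of what that ordering does (numbers invented):

```rust
// Cost as computed by the diff's peak_ewma: head-latency EWMA (ms) times
// the number of requests currently in flight.
fn cost(head_ewma_ms: f64, active_requests: usize) -> f64 {
    head_ewma_ms * active_requests as f64
}

fn main() {
    // A fast server with a deep queue can rank worse than a slower, idler one:
    assert!(cost(50.0, 4) > cost(120.0, 1)); // 200.0 > 120.0

    // And, as the TODOs note, a server with zero in-flight requests costs 0
    // regardless of its latency; tower's PeakEwma avoids this by multiplying
    // the estimate by pending + 1 rather than the raw pending count.
    assert_eq!(cost(500.0, 0), 0.0);
}
```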