change ewma calculation

Bryan Stitt 2023-02-16 00:26:58 -08:00
parent 4916188d5c
commit 738815244d
7 changed files with 578 additions and 82 deletions

22
web3_proxy/src/atomics.rs Normal file

@ -0,0 +1,22 @@
use std::sync::atomic::{AtomicU64, Ordering};
pub struct AtomicF64 {
storage: AtomicU64,
}
impl AtomicF64 {
pub fn new(value: f64) -> Self {
let as_u64 = value.to_bits();
Self {
storage: AtomicU64::new(as_u64),
}
}
pub fn store(&self, value: f64, ordering: Ordering) {
let as_u64 = value.to_bits();
self.storage.store(as_u64, ordering)
}
pub fn load(&self, ordering: Ordering) -> f64 {
let as_u64 = self.storage.load(ordering);
f64::from_bits(as_u64)
}
}
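A minimal usage sketch of the new AtomicF64 (editorial illustration, not part of this commit; it assumes the crate is importable as web3_proxy). Note that store and load only round-trip the f64 through the underlying u64 bit pattern; a read-modify-write such as updating an EWMA in place would still need compare_exchange on the raw AtomicU64.

use std::sync::atomic::Ordering;
use web3_proxy::atomics::AtomicF64;

fn main() {
    // seed with a pessimistic default latency, in milliseconds
    let latency_ms = AtomicF64::new(1_000.0);

    // store() converts the f64 to its bit pattern and writes it atomically
    latency_ms.store(250.5, Ordering::Relaxed);

    // load() reads the bits back and reinterprets them as f64, losslessly
    assert_eq!(latency_ms.load(Ordering::Relaxed), 250.5);
}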

@ -1,5 +1,6 @@
pub mod app;
pub mod app_stats;
pub mod atomics;
pub mod block_number;
pub mod config;
pub mod frontend;

397
web3_proxy/src/peak_ewma.rs Normal file

@ -0,0 +1,397 @@
//! Code from [tower](https://github.com/tower-rs/tower/blob/3f31ffd2cf15f1e905142e5f43ab39ac995c22ed/tower/src/load/peak_ewma.rs)
//! Measures load using the PeakEWMA response latency.
//! TODO: refactor to work with our code
use std::task::{Context, Poll};
use std::{
sync::{Arc, Mutex},
time::Duration,
};
use tokio::time::Instant;
use tower_service::Service;
use tracing::trace;
/// Measures the load of the underlying service using Peak-EWMA load measurement.
///
/// [`PeakEwma`] implements [`Load`] with the [`Cost`] metric that estimates the amount of
/// pending work to an endpoint. Work is calculated by multiplying the
/// exponentially-weighted moving average (EWMA) of response latencies by the number of
/// pending requests. The Peak-EWMA algorithm is designed to be especially sensitive to
/// worst-case latencies. Over time, the peak latency value decays towards the moving
/// average of latencies to the endpoint.
///
/// When no latency information has been measured for an endpoint, an arbitrary default
/// RTT of 1 second is used to prevent the endpoint from being overloaded before a
/// meaningful baseline can be established.
///
/// ## Note
///
/// This is derived from [Finagle][finagle], which is distributed under the Apache V2
/// license. Copyright 2017, Twitter Inc.
///
/// [finagle]:
/// https://github.com/twitter/finagle/blob/9cc08d15216497bb03a1cafda96b7266cfbbcff1/finagle-core/src/main/scala/com/twitter/finagle/loadbalancer/PeakEwma.scala
#[derive(Debug)]
pub struct PeakEwma<S, C = CompleteOnResponse> {
service: S,
decay_ns: f64,
rtt_estimate: Arc<Mutex<RttEstimate>>,
completion: C,
}
#[cfg(feature = "discover")]
pin_project! {
/// Wraps a `D`-typed stream of discovered services with `PeakEwma`.
#[cfg_attr(docsrs, doc(cfg(feature = "discover")))]
#[derive(Debug)]
pub struct PeakEwmaDiscover<D, C = CompleteOnResponse> {
#[pin]
discover: D,
decay_ns: f64,
default_rtt: Duration,
completion: C,
}
}
/// Represents the relative cost of communicating with a service.
///
/// The underlying value estimates the amount of pending work to a service: the Peak-EWMA
/// latency estimate multiplied by the number of pending requests.
#[derive(Copy, Clone, Debug, PartialEq, PartialOrd)]
pub struct Cost(f64);
/// Tracks an in-flight request and updates the RTT-estimate on Drop.
#[derive(Debug)]
pub struct Handle {
sent_at: Instant,
decay_ns: f64,
rtt_estimate: Arc<Mutex<RttEstimate>>,
}
/// Holds the current RTT estimate and the last time this value was updated.
#[derive(Debug)]
struct RttEstimate {
update_at: Instant,
rtt_ns: f64,
}
const NANOS_PER_MILLI: f64 = 1_000_000.0;
// ===== impl PeakEwma =====
impl<S, C> PeakEwma<S, C> {
/// Wraps an `S`-typed service so that its load is tracked by the EWMA of its peak latency.
pub fn new(service: S, default_rtt: Duration, decay_ns: f64, completion: C) -> Self {
debug_assert!(decay_ns > 0.0, "decay_ns must be positive");
Self {
service,
decay_ns,
rtt_estimate: Arc::new(Mutex::new(RttEstimate::new(nanos(default_rtt)))),
completion,
}
}
fn handle(&self) -> Handle {
Handle {
decay_ns: self.decay_ns,
sent_at: Instant::now(),
rtt_estimate: self.rtt_estimate.clone(),
}
}
}
impl<S, C, Request> Service<Request> for PeakEwma<S, C>
where
S: Service<Request>,
C: TrackCompletion<Handle, S::Response>,
{
type Response = C::Output;
type Error = S::Error;
type Future = TrackCompletionFuture<S::Future, C, Handle>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx)
}
fn call(&mut self, req: Request) -> Self::Future {
TrackCompletionFuture::new(
self.completion.clone(),
self.handle(),
self.service.call(req),
)
}
}
impl<S, C> Load for PeakEwma<S, C> {
type Metric = Cost;
fn load(&self) -> Self::Metric {
let pending = Arc::strong_count(&self.rtt_estimate) as u32 - 1;
// Update the RTT estimate to account for decay since the last update.
// If an estimate has not been established, a default is provided
let estimate = self.update_estimate();
let cost = Cost(estimate * f64::from(pending + 1));
trace!(
"load estimate={:.0}ms pending={} cost={:?}",
estimate / NANOS_PER_MILLI,
pending,
cost,
);
cost
}
}
impl<S, C> PeakEwma<S, C> {
fn update_estimate(&self) -> f64 {
let mut rtt = self.rtt_estimate.lock().expect("peak ewma prior_estimate");
rtt.decay(self.decay_ns)
}
}
// ===== impl PeakEwmaDiscover =====
#[cfg(feature = "discover")]
impl<D, C> PeakEwmaDiscover<D, C> {
/// Wraps a `D`-typed [`Discover`] so that services have a [`PeakEwma`] load metric.
///
/// The provided `default_rtt` is used as the default RTT estimate for newly
/// added services.
///
/// The `decay` value determines over what time period an RTT estimate should
/// decay.
pub fn new<Request>(discover: D, default_rtt: Duration, decay: Duration, completion: C) -> Self
where
D: Discover,
D::Service: Service<Request>,
C: TrackCompletion<Handle, <D::Service as Service<Request>>::Response>,
{
PeakEwmaDiscover {
discover,
decay_ns: nanos(decay),
default_rtt,
completion,
}
}
}
#[cfg(feature = "discover")]
impl<D, C> Stream for PeakEwmaDiscover<D, C>
where
D: Discover,
C: Clone,
{
type Item = Result<Change<D::Key, PeakEwma<D::Service, C>>, D::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
let change = match ready!(this.discover.poll_discover(cx)).transpose()? {
None => return Poll::Ready(None),
Some(Change::Remove(k)) => Change::Remove(k),
Some(Change::Insert(k, svc)) => {
let peak_ewma = PeakEwma::new(
svc,
*this.default_rtt,
*this.decay_ns,
this.completion.clone(),
);
Change::Insert(k, peak_ewma)
}
};
Poll::Ready(Some(Ok(change)))
}
}
// ===== impl RttEstimate =====
impl RttEstimate {
fn new(rtt_ns: f64) -> Self {
debug_assert!(0.0 < rtt_ns, "rtt must be positive");
Self {
rtt_ns,
update_at: Instant::now(),
}
}
/// Decays the RTT estimate with a decay period of `decay_ns`.
fn decay(&mut self, decay_ns: f64) -> f64 {
// Updates with a 0 duration so that the estimate decays towards 0.
let now = Instant::now();
self.update(now, now, decay_ns)
}
/// Updates the Peak-EWMA RTT estimate.
///
/// The elapsed time from `sent_at` to `recv_at` is added to the estimate.
fn update(&mut self, sent_at: Instant, recv_at: Instant, decay_ns: f64) -> f64 {
debug_assert!(
sent_at <= recv_at,
"recv_at={:?} after sent_at={:?}",
recv_at,
sent_at
);
let rtt = nanos(recv_at.saturating_duration_since(sent_at));
let now = Instant::now();
debug_assert!(
self.update_at <= now,
"update_at={:?} in the future",
self.update_at
);
self.rtt_ns = if self.rtt_ns < rtt {
// For Peak-EWMA, always use the worst-case (peak) value as the estimate for
// subsequent requests.
trace!(
"update peak rtt={}ms prior={}ms",
rtt / NANOS_PER_MILLI,
self.rtt_ns / NANOS_PER_MILLI,
);
rtt
} else {
// When an RTT is observed that is less than the estimated RTT, we decay the
// prior estimate according to how much time has elapsed since the last
// update. The inverse of the decay is used to scale the estimate towards the
// observed RTT value.
let elapsed = nanos(now.saturating_duration_since(self.update_at));
let decay = (-elapsed / decay_ns).exp();
let recency = 1.0 - decay;
let next_estimate = (self.rtt_ns * decay) + (rtt * recency);
trace!(
"update rtt={:03.0}ms decay={:06.0}ns; next={:03.0}ms",
rtt / NANOS_PER_MILLI,
self.rtt_ns - next_estimate,
next_estimate / NANOS_PER_MILLI,
);
next_estimate
};
self.update_at = now;
self.rtt_ns
}
}
// ===== impl Handle =====
impl Drop for Handle {
fn drop(&mut self) {
let recv_at = Instant::now();
if let Ok(mut rtt) = self.rtt_estimate.lock() {
rtt.update(self.sent_at, recv_at, self.decay_ns);
}
}
}
// ===== impl Cost =====
// Utility that converts durations to nanos in f64.
//
// Due to a lossy transformation, the maximum value that can be represented is ~585 years,
// which, I hope, is more than enough to represent request latencies.
fn nanos(d: Duration) -> f64 {
const NANOS_PER_SEC: u64 = 1_000_000_000;
let n = f64::from(d.subsec_nanos());
let s = d.as_secs().saturating_mul(NANOS_PER_SEC) as f64;
n + s
}
#[cfg(test)]
mod tests {
use futures_util::future;
use std::time::Duration;
use tokio::time;
use tokio_test::{assert_ready, assert_ready_ok, task};
use super::*;
struct Svc;
impl Service<()> for Svc {
type Response = ();
type Error = ();
type Future = future::Ready<Result<(), ()>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), ()>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, (): ()) -> Self::Future {
future::ok(())
}
}
/// The default RTT estimate decays, so that new nodes are considered if the
/// default RTT is too high.
#[tokio::test]
async fn default_decay() {
time::pause();
let svc = PeakEwma::new(
Svc,
Duration::from_millis(10),
NANOS_PER_MILLI * 1_000.0,
CompleteOnResponse,
);
let Cost(load) = svc.load();
assert_eq!(load, 10.0 * NANOS_PER_MILLI);
time::advance(Duration::from_millis(100)).await;
let Cost(load) = svc.load();
assert!(9.0 * NANOS_PER_MILLI < load && load < 10.0 * NANOS_PER_MILLI);
time::advance(Duration::from_millis(100)).await;
let Cost(load) = svc.load();
assert!(8.0 * NANOS_PER_MILLI < load && load < 9.0 * NANOS_PER_MILLI);
}
// The default RTT estimate decays, so that new nodes are considered if the default RTT is too
// high.
#[tokio::test]
async fn compound_decay() {
time::pause();
let mut svc = PeakEwma::new(
Svc,
Duration::from_millis(20),
NANOS_PER_MILLI * 1_000.0,
CompleteOnResponse,
);
assert_eq!(svc.load(), Cost(20.0 * NANOS_PER_MILLI));
time::advance(Duration::from_millis(100)).await;
let mut rsp0 = task::spawn(svc.call(()));
assert!(svc.load() > Cost(20.0 * NANOS_PER_MILLI));
time::advance(Duration::from_millis(100)).await;
let mut rsp1 = task::spawn(svc.call(()));
assert!(svc.load() > Cost(40.0 * NANOS_PER_MILLI));
time::advance(Duration::from_millis(100)).await;
let () = assert_ready_ok!(rsp0.poll());
assert_eq!(svc.load(), Cost(400_000_000.0));
time::advance(Duration::from_millis(100)).await;
let () = assert_ready_ok!(rsp1.poll());
assert_eq!(svc.load(), Cost(200_000_000.0));
// Check that values decay as time elapses
time::advance(Duration::from_secs(1)).await;
assert!(svc.load() < Cost(100_000_000.0));
time::advance(Duration::from_secs(10)).await;
assert!(svc.load() < Cost(100_000.0));
}
#[test]
fn nanos() {
assert_eq!(super::nanos(Duration::new(0, 0)), 0.0);
assert_eq!(super::nanos(Duration::new(0, 123)), 123.0);
assert_eq!(super::nanos(Duration::new(1, 23)), 1_000_000_023.0);
assert_eq!(
super::nanos(Duration::new(::std::u64::MAX, 999_999_999)),
18446744074709553000.0
);
}
}
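Two editorial notes on the vendored file above (not part of the commit). First, it still references tower-internal items (Load, TrackCompletion, CompleteOnResponse, TrackCompletionFuture, pin_project!) whose imports are not shown, and the lib.rs hunk above only adds pub mod atomics;, so the module does not appear to be wired into the crate yet — consistent with the "TODO: refactor to work with our code" note. Second, the decay step in RttEstimate::update is plain time-keyed exponential smoothing plus a jump-to-peak rule; a standalone sketch of the same arithmetic:

// next = prior * e^(-elapsed/decay) + observed * (1 - e^(-elapsed/decay)),
// except that an observation above the prior becomes the new estimate outright
// (the "peak" in Peak-EWMA).
fn next_rtt_estimate(prior_ns: f64, observed_ns: f64, elapsed_ns: f64, decay_ns: f64) -> f64 {
    if observed_ns > prior_ns {
        return observed_ns;
    }
    let decay = (-elapsed_ns / decay_ns).exp();
    prior_ns * decay + observed_ns * (1.0 - decay)
}

fn main() {
    let ms = 1_000_000.0; // NANOS_PER_MILLI
    // prior = 10ms, observed = 0ms (a pure decay() call), 100ms since the last
    // update, 1s decay window:
    let next = next_rtt_estimate(10.0 * ms, 0.0, 100.0 * ms, 1_000.0 * ms);
    // e^(-0.1) is roughly 0.905, so the estimate relaxes to about 9.05ms — the same
    // window the default_decay test above asserts (between 9ms and 10ms).
    assert!(9.0 * ms < next && next < 10.0 * ms);
}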

@ -127,10 +127,11 @@ impl ConnectionsGroup {
.get_with(*block.hash(), async move { Instant::now() })
.await;
// TODO: this should be 0 if we are first seen, but i think it will be slightly non-zero
rpc.head_latency
.write()
.record(first_seen.elapsed().as_secs_f64() * 1000.0);
// TODO: this should be 0 if we are first seen, but i think it will be slightly non-zero.
// calculate elapsed time before trying to lock.
let latency = first_seen.elapsed();
rpc.head_latency.write().record(latency);
// TODO: what about a reorg to the same height?
if Some(block.number()) > self.highest_block.as_ref().map(|x| x.number()) {

@ -27,7 +27,7 @@ use serde_json::json;
use serde_json::value::RawValue;
use std::cmp::min_by_key;
use std::collections::BTreeMap;
use std::sync::atomic::Ordering;
use std::sync::atomic::{self, Ordering};
use std::sync::Arc;
use std::{cmp, fmt};
use thread_fast_rng::rand::seq::SliceRandom;
@ -592,7 +592,7 @@ impl Web3Rpcs {
// TODO: cached key to save a read lock
// TODO: ties to the server with the smallest block_data_limit
let best_rpc = min_by_key(rpc_a, rpc_b, |x| {
OrderedFloat(x.request_latency.read().ewma.value())
OrderedFloat(x.head_latency.read().value())
});
trace!("winner: {}", best_rpc);
@ -1159,9 +1159,16 @@ fn rpc_sync_status_sort_key(x: &Arc<Web3Rpc>) -> (U64, u64, OrderedFloat<f64>) {
let tier = x.tier;
let request_ewma = OrderedFloat(x.request_latency.read().ewma.value());
// TODO: use request instead of head latency
let head_ewma = x.head_latency.read().value();
(reversed_head_block, tier, request_ewma)
let active_requests = x.active_requests.load(atomic::Ordering::Relaxed) as f64;
// TODO: i'm not sure head * active is exactly right. but we'll see
// TODO: i don't think this actually counts as peak. investigate with atomics.rs and peak_ewma.rs
let peak_ewma = OrderedFloat(head_ewma * active_requests);
(reversed_head_block, tier, peak_ewma)
}
mod tests {
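An editorial sketch (not part of this commit) of how the new sort key behaves: peak_ewma multiplies the head-latency EWMA by the number of in-flight requests, and because the key tuples compare lexicographically it only breaks ties between servers on the same reversed head block and tier. Types are simplified here; the real key uses ethers' U64 for the block component.

use ordered_float::OrderedFloat;

fn main() {
    // (reversed_head_block, tier, peak_ewma) — lower sorts first
    let a = (0u64, 1u64, OrderedFloat(10.0 * 2.0)); // 10ms head ewma, 2 active requests
    let b = (0u64, 1u64, OrderedFloat(30.0 * 1.0)); // 30ms head ewma, 1 active request
    // the faster server still wins even though it is busier: 20.0 < 30.0
    assert!(a < b);
}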

@ -8,12 +8,12 @@ use crate::frontend::authorization::Authorization;
use crate::rpcs::request::RequestRevertHandler;
use anyhow::{anyhow, Context};
use ethers::prelude::{Bytes, Middleware, ProviderError, TxHash, H256, U64};
use ethers::types::{Transaction, U256};
use ethers::types::{Address, Transaction, U256};
use futures::future::try_join_all;
use futures::StreamExt;
use hdrhistogram::Histogram;
use log::{debug, error, info, trace, warn, Level};
use migration::sea_orm::DatabaseConnection;
use ordered_float::OrderedFloat;
use parking_lot::RwLock;
use redis_rate_limiter::{RedisPool, RedisRateLimitResult, RedisRateLimiter};
use serde::ser::{SerializeStruct, Serializer};
@ -30,10 +30,8 @@ use tokio::sync::{broadcast, oneshot, watch, RwLock as AsyncRwLock};
use tokio::time::{sleep, sleep_until, timeout, Duration, Instant};
pub struct Latency {
/// Track how many milliseconds slower we are than the fastest node
pub histogram: Histogram<u64>,
/// exponentially weighted moving average of how many milliseconds behind the fastest node we are
pub ewma: ewma::EWMA,
ewma: ewma::EWMA,
}
impl Serialize for Latency {
@ -41,51 +39,52 @@ impl Serialize for Latency {
where
S: Serializer,
{
let mut state = serializer.serialize_struct("latency", 6)?;
state.serialize_field("ewma_ms", &self.ewma.value())?;
state.serialize_field("histogram_len", &self.histogram.len())?;
state.serialize_field("mean_ms", &self.histogram.mean())?;
state.serialize_field("p50_ms", &self.histogram.value_at_quantile(0.50))?;
state.serialize_field("p75_ms", &self.histogram.value_at_quantile(0.75))?;
state.serialize_field("p99_ms", &self.histogram.value_at_quantile(0.99))?;
state.end()
serializer.serialize_f64(self.ewma.value())
}
}
impl Latency {
pub fn record(&mut self, milliseconds: f64) {
self.ewma.add(milliseconds);
#[inline(always)]
pub fn record(&mut self, duration: Duration) {
self.record_ms(duration.as_secs_f64() * 1000.0);
}
// histogram needs ints and not floats
self.histogram.record(milliseconds as u64).unwrap();
#[inline(always)]
pub fn record_ms(&mut self, milliseconds: f64) {
self.ewma.add(milliseconds);
}
#[inline(always)]
pub fn value(&self) -> f64 {
self.ewma.value()
}
}
impl Default for Latency {
fn default() -> Self {
// TODO: what should the default sigfig be?
let sigfig = 0;
// TODO: what should the default span be? 25 requests? have a "new"
let span = 25.0;
Self::new(sigfig, span).expect("default histogram sigfigs should always work")
let start = 1000.0;
Self::new(span, start)
}
}
impl Latency {
pub fn new(sigfig: u8, span: f64) -> Result<Self, hdrhistogram::CreationError> {
// depending on the span, start might not be perfect
pub fn new(span: f64, start: f64) -> Self {
let alpha = Self::span_to_alpha(span);
let histogram = Histogram::new(sigfig)?;
let mut ewma = ewma::EWMA::new(alpha);
Ok(Self {
histogram,
ewma: ewma::EWMA::new(alpha),
})
if start > 0.0 {
for _ in 0..(span as u64) {
ewma.add(start);
}
}
Self { ewma }
}
fn span_to_alpha(span: f64) -> f64 {
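The hunk cuts off at span_to_alpha, so the conversion itself is not shown. A common EWMA convention — only an assumption about what this helper does — is alpha = 2 / (span + 1). Under that assumption, the warm-up loop in Latency::new (adding start to the EWMA span times) pulls the initial estimate most of the way toward start, so the first real samples adjust an already-reasonable value instead of swinging it up from zero; the exact warm-up behavior depends on how ewma::EWMA initializes.

// Assumption: a pandas-style span-to-alpha conversion. The real body is not shown in this hunk.
fn span_to_alpha(span: f64) -> f64 {
    2.0 / (span + 1.0)
}

// With span = 25.0 and start = 1000.0, alpha ~= 0.077, and 25 warm-up samples of
// 1000.0 bring the moving average close to 1000 ms before any real latency is recorded.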
@ -127,11 +126,13 @@ pub struct Web3Rpc {
pub(super) head_block: RwLock<Option<Web3ProxyBlock>>,
/// Track head block latency
pub(super) head_latency: RwLock<Latency>,
/// Track request latency
pub(super) request_latency: RwLock<Latency>,
// /// Track request latency
// /// TODO: refactor this. this lock kills perf. for now just use head_latency
// pub(super) request_latency: RwLock<Latency>,
/// Track total requests served
/// TODO: maybe move this to grafana
pub(super) total_requests: AtomicUsize,
pub(super) active_requests: AtomicUsize,
}
impl Web3Rpc {
@ -259,6 +260,18 @@ impl Web3Rpc {
Ok((new_connection, handle))
}
pub async fn peak_ewma(&self) -> OrderedFloat<f64> {
// TODO: use request instead of head latency? that was killing perf though
let head_ewma = self.head_latency.read().value();
// TODO: what ordering?
let active_requests = self.active_requests.load(atomic::Ordering::Relaxed) as f64;
// TODO: i'm not sure head * active is exactly right. but we'll see
// TODO: i don't think this actually counts as peak. investigate with atomics.rs and peak_ewma.rs
OrderedFloat(head_ewma * active_requests)
}
// TODO: would be great if rpcs exposed this. see https://github.com/ledgerwatch/erigon/issues/6391
async fn check_block_data_limit(
self: &Arc<Self>,
@ -671,14 +684,9 @@ impl Web3Rpc {
// TODO: how often? different depending on the chain?
// TODO: reset this timeout when a new block is seen? we need to keep request_latency updated though
// let health_sleep_seconds = 10;
futures::future::pending::<()>().await;
Ok(())
let health_sleep_seconds = 10;
// TODO: benchmark this and lock contention
/*
let mut old_total_requests = 0;
let mut new_total_requests;
@ -696,6 +704,7 @@ impl Web3Rpc {
if new_total_requests - old_total_requests < 10 {
// TODO: if this fails too many times, reset the connection
// TODO: move this into a function and the chaining should be easier
let head_block = conn.head_block.read().clone();
if let Some((block_hash, txid)) = head_block.and_then(|x| {
@ -706,28 +715,65 @@ impl Web3Rpc {
Some((block_hash, txid))
}) {
let authorization = authorization.clone();
let conn = conn.clone();
let to = conn
.wait_for_query::<_, Option<Transaction>>(
"eth_getTransactionByHash",
&(txid,),
revert_handler,
authorization.clone(),
Some(client.clone()),
)
.await
.and_then(|tx| {
let tx = tx.context("no transaction found")?;
let x = async move {
conn.try_request_handle(&authorization, Some(client)).await
}
.await;
// TODO: what default? something real?
let to = tx.to.unwrap_or_else(|| {
"0xdead00000000000000000000000000000000beef"
.parse::<Address>()
.expect("deafbeef")
});
if let Ok(OpenRequestResult::Handle(x)) = x {
if let Ok(Some(x)) = x
.request::<_, Option<Transaction>>(
"eth_getTransactionByHash",
&(txid,),
Ok(to)
});
let code = match to {
Err(err) => {
if conn.backup {
debug!(
"{} failed health check query! {:#?}",
conn, err
);
} else {
warn!(
"{} failed health check query! {:#?}",
conn, err
);
}
continue;
}
Ok(to) => {
conn.wait_for_query::<_, Option<Bytes>>(
"eth_getCode",
&(to, block_hash),
revert_handler,
None,
authorization.clone(),
Some(client),
)
.await
{
// TODO: make this flatter
// TODO: do more (fair, not random) things here
// let = x.request("eth_getCode", (tx.to.unwrap_or(Address::zero()), block_hash), RequestRevertHandler::ErrorLevel, Some(client.clone()))
}
};
if let Err(err) = code {
if conn.backup {
debug!(
"{} failed health check query! {:#?}",
conn, err
);
} else {
warn!("{} failed health check query! {:#?}", conn, err);
}
continue;
}
}
}
@ -735,7 +781,6 @@ impl Web3Rpc {
old_total_requests = new_total_requests;
}
}
*/
};
futures.push(flatten_handle(tokio::spawn(f)));
@ -1144,6 +1189,26 @@ impl Web3Rpc {
Ok(OpenRequestResult::Handle(handle))
}
pub async fn wait_for_query<P, R>(
self: &Arc<Self>,
method: &str,
params: &P,
revert_handler: RequestRevertHandler,
authorization: Arc<Authorization>,
unlocked_provider: Option<Arc<Web3Provider>>,
) -> anyhow::Result<R>
where
// TODO: not sure about this type. would be better to not need clones, but measure and spawns combine to need it
P: Clone + fmt::Debug + serde::Serialize + Send + Sync + 'static,
R: serde::Serialize + serde::de::DeserializeOwned + fmt::Debug,
{
self.wait_for_request_handle(&authorization, None, None)
.await?
.request::<P, R>(method, params, revert_handler, unlocked_provider)
.await
.context("ProviderError from the backend")
}
}
impl fmt::Debug for Web3Provider {
@ -1211,9 +1276,7 @@ impl Serialize for Web3Rpc {
// TODO: maybe this is too much data. serialize less?
state.serialize_field("head_block", &*self.head_block.read())?;
state.serialize_field("head_latency", &*self.head_latency.read())?;
state.serialize_field("request_latency", &*self.request_latency.read())?;
state.serialize_field("head_latency", &self.head_latency.read().value())?;
state.serialize_field(
"total_requests",

@ -139,6 +139,7 @@ impl OpenRequestHandle {
/// Send a web3 request
/// By having the request method here, we ensure that the rate limiter was called and connection counts were properly incremented
/// depending on how things are locked, you might need to pass the provider in
pub async fn request<P, R>(
self,
method: &str,
@ -156,30 +157,23 @@ impl OpenRequestHandle {
// trace!(rpc=%self.conn, %method, "request");
trace!("requesting from {}", self.rpc);
let mut provider: Option<Arc<Web3Provider>> = None;
let mut provider = if unlocked_provider.is_some() {
unlocked_provider
} else {
self.rpc.provider.read().await.clone()
};
let mut logged = false;
while provider.is_none() {
// trace!("waiting on provider: locking...");
// TODO: this should *not* be new_head_client. that is dedicated to only new heads
if let Some(unlocked_provider) = unlocked_provider {
provider = Some(unlocked_provider);
break;
}
let unlocked_provider = self.rpc.provider.read().await;
if let Some(unlocked_provider) = unlocked_provider.clone() {
provider = Some(unlocked_provider);
break;
}
sleep(Duration::from_millis(100)).await;
if !logged {
debug!("no provider for open handle on {}", self.rpc);
logged = true;
}
sleep(Duration::from_millis(100)).await;
provider = self.rpc.provider.read().await.clone();
}
let provider = provider.expect("provider was checked already");
@ -188,7 +182,11 @@ impl OpenRequestHandle {
.total_requests
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let start = Instant::now();
self.rpc
.active_requests
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
// let latency = Instant::now();
// TODO: replace ethers-rs providers with our own that supports streaming the responses
let response = match provider.as_ref() {
@ -201,6 +199,13 @@ impl OpenRequestHandle {
}
};
// note. we intentionally do not record this latency now. we do NOT want to measure errors
// let latency = latency.elapsed();
self.rpc
.active_requests
.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
// // TODO: i think ethers already has trace logging (and does it much more fancy)
// trace!(
// "response from {} for {} {:?}: {:?}",