continue rebase

parent df87a41d5b
commit fc1fdaaaf1

web3_proxy/src/atomics.rs (new file, 22 lines)
@@ -0,0 +1,22 @@
+use std::sync::atomic::{AtomicU64, Ordering};
+
+pub struct AtomicF64 {
+    storage: AtomicU64,
+}
+
+impl AtomicF64 {
+    pub fn new(value: f64) -> Self {
+        let as_u64 = value.to_bits();
+
+        Self {
+            storage: AtomicU64::new(as_u64),
+        }
+    }
+
+    pub fn store(&self, value: f64, ordering: Ordering) {
+        let as_u64 = value.to_bits();
+
+        self.storage.store(as_u64, ordering)
+    }
+
+    pub fn load(&self, ordering: Ordering) -> f64 {
+        let as_u64 = self.storage.load(ordering);
+
+        f64::from_bits(as_u64)
+    }
+}
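AtomicF64 stores an f64's raw bit pattern in an AtomicU64, so readers and writers never take a lock; to_bits()/from_bits() round-trip losslessly. A minimal standalone sketch of the same trick (editor's illustration, not part of the commit):

use std::sync::atomic::{AtomicU64, Ordering};

// Same shape as AtomicF64 above: an f64 round-trips losslessly
// through its u64 bit pattern via to_bits()/from_bits().
fn main() {
    let storage = AtomicU64::new(1.5f64.to_bits());

    // a writer thread would publish a new latency sample like this:
    storage.store(12.25f64.to_bits(), Ordering::Relaxed);

    // readers decode the bits back into an f64 without locking:
    let value = f64::from_bits(storage.load(Ordering::Relaxed));
    assert_eq!(value, 12.25);
}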
web3_proxy/src/lib.rs
@@ -1,6 +1,7 @@
 pub mod app;
 pub mod app_stats;
 pub mod admin_queries;
+pub mod atomics;
 pub mod block_number;
 pub mod config;
 pub mod frontend;
web3_proxy/src/peak_ewma.rs (new file, 397 lines)
@@ -0,0 +1,397 @@
+//! Code from [tower](https://github.com/tower-rs/tower/blob/3f31ffd2cf15f1e905142e5f43ab39ac995c22ed/tower/src/load/peak_ewma.rs)
+//! Measures load using the PeakEWMA response latency.
+//! TODO: refactor to work with our code
+
+use std::task::{Context, Poll};
+use std::{
+    sync::{Arc, Mutex},
+    time::Duration,
+};
+use tokio::time::Instant;
+use tower_service::Service;
+use tracing::trace;
+
+/// Measures the load of the underlying service using Peak-EWMA load measurement.
+///
+/// [`PeakEwma`] implements [`Load`] with the [`Cost`] metric that estimates the amount of
+/// pending work to an endpoint. Work is calculated by multiplying the
+/// exponentially-weighted moving average (EWMA) of response latencies by the number of
+/// pending requests. The Peak-EWMA algorithm is designed to be especially sensitive to
+/// worst-case latencies. Over time, the peak latency value decays towards the moving
+/// average of latencies to the endpoint.
+///
+/// When no latency information has been measured for an endpoint, an arbitrary default
+/// RTT of 1 second is used to prevent the endpoint from being overloaded before a
+/// meaningful baseline can be established.
+///
+/// ## Note
+///
+/// This is derived from [Finagle][finagle], which is distributed under the Apache V2
+/// license. Copyright 2017, Twitter Inc.
+///
+/// [finagle]:
+/// https://github.com/twitter/finagle/blob/9cc08d15216497bb03a1cafda96b7266cfbbcff1/finagle-core/src/main/scala/com/twitter/finagle/loadbalancer/PeakEwma.scala
+#[derive(Debug)]
+pub struct PeakEwma<S, C = CompleteOnResponse> {
+    service: S,
+    decay_ns: f64,
+    rtt_estimate: Arc<Mutex<RttEstimate>>,
+    completion: C,
+}
+
+#[cfg(feature = "discover")]
+pin_project! {
+    /// Wraps a `D`-typed stream of discovered services with `PeakEwma`.
+    #[cfg_attr(docsrs, doc(cfg(feature = "discover")))]
+    #[derive(Debug)]
+    pub struct PeakEwmaDiscover<D, C = CompleteOnResponse> {
+        #[pin]
+        discover: D,
+        decay_ns: f64,
+        default_rtt: Duration,
+        completion: C,
+    }
+}
+
+/// Represents the relative cost of communicating with a service.
+///
+/// The underlying value estimates the amount of pending work to a service: the Peak-EWMA
+/// latency estimate multiplied by the number of pending requests.
+#[derive(Copy, Clone, Debug, PartialEq, PartialOrd)]
+pub struct Cost(f64);
+
+/// Tracks an in-flight request and updates the RTT-estimate on Drop.
+#[derive(Debug)]
+pub struct Handle {
+    sent_at: Instant,
+    decay_ns: f64,
+    rtt_estimate: Arc<Mutex<RttEstimate>>,
+}
+
+/// Holds the current RTT estimate and the last time this value was updated.
+#[derive(Debug)]
+struct RttEstimate {
+    update_at: Instant,
+    rtt_ns: f64,
+}
+
+const NANOS_PER_MILLI: f64 = 1_000_000.0;
+
+// ===== impl PeakEwma =====
+
+impl<S, C> PeakEwma<S, C> {
+    /// Wraps an `S`-typed service so that its load is tracked by the EWMA of its peak latency.
+    pub fn new(service: S, default_rtt: Duration, decay_ns: f64, completion: C) -> Self {
+        debug_assert!(decay_ns > 0.0, "decay_ns must be positive");
+        Self {
+            service,
+            decay_ns,
+            rtt_estimate: Arc::new(Mutex::new(RttEstimate::new(nanos(default_rtt)))),
+            completion,
+        }
+    }
+
+    fn handle(&self) -> Handle {
+        Handle {
+            decay_ns: self.decay_ns,
+            sent_at: Instant::now(),
+            rtt_estimate: self.rtt_estimate.clone(),
+        }
+    }
+}
+
+impl<S, C, Request> Service<Request> for PeakEwma<S, C>
+where
+    S: Service<Request>,
+    C: TrackCompletion<Handle, S::Response>,
+{
+    type Response = C::Output;
+    type Error = S::Error;
+    type Future = TrackCompletionFuture<S::Future, C, Handle>;
+
+    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        self.service.poll_ready(cx)
+    }
+
+    fn call(&mut self, req: Request) -> Self::Future {
+        TrackCompletionFuture::new(
+            self.completion.clone(),
+            self.handle(),
+            self.service.call(req),
+        )
+    }
+}
+
+impl<S, C> Load for PeakEwma<S, C> {
+    type Metric = Cost;
+
+    fn load(&self) -> Self::Metric {
+        let pending = Arc::strong_count(&self.rtt_estimate) as u32 - 1;
+
+        // Update the RTT estimate to account for decay since the last update.
+        // If an estimate has not been established, a default is provided
+        let estimate = self.update_estimate();
+
+        let cost = Cost(estimate * f64::from(pending + 1));
+        trace!(
+            "load estimate={:.0}ms pending={} cost={:?}",
+            estimate / NANOS_PER_MILLI,
+            pending,
+            cost,
+        );
+        cost
+    }
+}
+
+impl<S, C> PeakEwma<S, C> {
+    fn update_estimate(&self) -> f64 {
+        let mut rtt = self.rtt_estimate.lock().expect("peak ewma prior_estimate");
+        rtt.decay(self.decay_ns)
+    }
+}
+
+// ===== impl PeakEwmaDiscover =====
+
+#[cfg(feature = "discover")]
+impl<D, C> PeakEwmaDiscover<D, C> {
+    /// Wraps a `D`-typed [`Discover`] so that services have a [`PeakEwma`] load metric.
+    ///
+    /// The provided `default_rtt` is used as the default RTT estimate for newly
+    /// added services.
+    ///
+    /// The `decay` value determines over what time period a RTT estimate should
+    /// decay.
+    pub fn new<Request>(discover: D, default_rtt: Duration, decay: Duration, completion: C) -> Self
+    where
+        D: Discover,
+        D::Service: Service<Request>,
+        C: TrackCompletion<Handle, <D::Service as Service<Request>>::Response>,
+    {
+        PeakEwmaDiscover {
+            discover,
+            decay_ns: nanos(decay),
+            default_rtt,
+            completion,
+        }
+    }
+}
+
+#[cfg(feature = "discover")]
+impl<D, C> Stream for PeakEwmaDiscover<D, C>
+where
+    D: Discover,
+    C: Clone,
+{
+    type Item = Result<Change<D::Key, PeakEwma<D::Service, C>>, D::Error>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let this = self.project();
+        let change = match ready!(this.discover.poll_discover(cx)).transpose()? {
+            None => return Poll::Ready(None),
+            Some(Change::Remove(k)) => Change::Remove(k),
+            Some(Change::Insert(k, svc)) => {
+                let peak_ewma = PeakEwma::new(
+                    svc,
+                    *this.default_rtt,
+                    *this.decay_ns,
+                    this.completion.clone(),
+                );
+                Change::Insert(k, peak_ewma)
+            }
+        };
+
+        Poll::Ready(Some(Ok(change)))
+    }
+}
+
+// ===== impl RttEstimate =====
+
+impl RttEstimate {
+    fn new(rtt_ns: f64) -> Self {
+        debug_assert!(0.0 < rtt_ns, "rtt must be positive");
+        Self {
+            rtt_ns,
+            update_at: Instant::now(),
+        }
+    }
+
+    /// Decays the RTT estimate with a decay period of `decay_ns`.
+    fn decay(&mut self, decay_ns: f64) -> f64 {
+        // Updates with a 0 duration so that the estimate decays towards 0.
+        let now = Instant::now();
+        self.update(now, now, decay_ns)
+    }
+
+    /// Updates the Peak-EWMA RTT estimate.
+    ///
+    /// The elapsed time from `sent_at` to `recv_at` is added
+    fn update(&mut self, sent_at: Instant, recv_at: Instant, decay_ns: f64) -> f64 {
+        debug_assert!(
+            sent_at <= recv_at,
+            "recv_at={:?} after sent_at={:?}",
+            recv_at,
+            sent_at
+        );
+        let rtt = nanos(recv_at.saturating_duration_since(sent_at));
+
+        let now = Instant::now();
+        debug_assert!(
+            self.update_at <= now,
+            "update_at={:?} in the future",
+            self.update_at
+        );
+
+        self.rtt_ns = if self.rtt_ns < rtt {
+            // For Peak-EWMA, always use the worst-case (peak) value as the estimate for
+            // subsequent requests.
+            trace!(
+                "update peak rtt={}ms prior={}ms",
+                rtt / NANOS_PER_MILLI,
+                self.rtt_ns / NANOS_PER_MILLI,
+            );
+            rtt
+        } else {
+            // When an RTT is observed that is less than the estimated RTT, we decay the
+            // prior estimate according to how much time has elapsed since the last
+            // update. The inverse of the decay is used to scale the estimate towards the
+            // observed RTT value.
+            let elapsed = nanos(now.saturating_duration_since(self.update_at));
+            let decay = (-elapsed / decay_ns).exp();
+            let recency = 1.0 - decay;
+            let next_estimate = (self.rtt_ns * decay) + (rtt * recency);
+            trace!(
+                "update rtt={:03.0}ms decay={:06.0}ns; next={:03.0}ms",
+                rtt / NANOS_PER_MILLI,
+                self.rtt_ns - next_estimate,
+                next_estimate / NANOS_PER_MILLI,
+            );
+            next_estimate
+        };
+        self.update_at = now;
+
+        self.rtt_ns
+    }
+}
+
+// ===== impl Handle =====
+
+impl Drop for Handle {
+    fn drop(&mut self) {
+        let recv_at = Instant::now();
+
+        if let Ok(mut rtt) = self.rtt_estimate.lock() {
+            rtt.update(self.sent_at, recv_at, self.decay_ns);
+        }
+    }
+}
+
+// ===== impl Cost =====
+
+// Utility that converts durations to nanos in f64.
+//
+// Due to a lossy transformation, the maximum value that can be represented is ~585 years,
+// which, I hope, is more than enough to represent request latencies.
+fn nanos(d: Duration) -> f64 {
+    const NANOS_PER_SEC: u64 = 1_000_000_000;
+    let n = f64::from(d.subsec_nanos());
+    let s = d.as_secs().saturating_mul(NANOS_PER_SEC) as f64;
+    n + s
+}
+
+#[cfg(test)]
+mod tests {
+    use futures_util::future;
+    use std::time::Duration;
+    use tokio::time;
+    use tokio_test::{assert_ready, assert_ready_ok, task};
+
+    use super::*;
+
+    struct Svc;
+    impl Service<()> for Svc {
+        type Response = ();
+        type Error = ();
+        type Future = future::Ready<Result<(), ()>>;
+
+        fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), ()>> {
+            Poll::Ready(Ok(()))
+        }
+
+        fn call(&mut self, (): ()) -> Self::Future {
+            future::ok(())
+        }
+    }
+
+    /// The default RTT estimate decays, so that new nodes are considered if the
+    /// default RTT is too high.
+    #[tokio::test]
+    async fn default_decay() {
+        time::pause();
+
+        let svc = PeakEwma::new(
+            Svc,
+            Duration::from_millis(10),
+            NANOS_PER_MILLI * 1_000.0,
+            CompleteOnResponse,
+        );
+        let Cost(load) = svc.load();
+        assert_eq!(load, 10.0 * NANOS_PER_MILLI);
+
+        time::advance(Duration::from_millis(100)).await;
+        let Cost(load) = svc.load();
+        assert!(9.0 * NANOS_PER_MILLI < load && load < 10.0 * NANOS_PER_MILLI);
+
+        time::advance(Duration::from_millis(100)).await;
+        let Cost(load) = svc.load();
+        assert!(8.0 * NANOS_PER_MILLI < load && load < 9.0 * NANOS_PER_MILLI);
+    }
+
+    // The default RTT estimate decays, so that new nodes are considered if the default RTT is too
+    // high.
+    #[tokio::test]
+    async fn compound_decay() {
+        time::pause();
+
+        let mut svc = PeakEwma::new(
+            Svc,
+            Duration::from_millis(20),
+            NANOS_PER_MILLI * 1_000.0,
+            CompleteOnResponse,
+        );
+        assert_eq!(svc.load(), Cost(20.0 * NANOS_PER_MILLI));
+
+        time::advance(Duration::from_millis(100)).await;
+        let mut rsp0 = task::spawn(svc.call(()));
+        assert!(svc.load() > Cost(20.0 * NANOS_PER_MILLI));
+
+        time::advance(Duration::from_millis(100)).await;
+        let mut rsp1 = task::spawn(svc.call(()));
+        assert!(svc.load() > Cost(40.0 * NANOS_PER_MILLI));
+
+        time::advance(Duration::from_millis(100)).await;
+        let () = assert_ready_ok!(rsp0.poll());
+        assert_eq!(svc.load(), Cost(400_000_000.0));
+
+        time::advance(Duration::from_millis(100)).await;
+        let () = assert_ready_ok!(rsp1.poll());
+        assert_eq!(svc.load(), Cost(200_000_000.0));
+
+        // Check that values decay as time elapses
+        time::advance(Duration::from_secs(1)).await;
+        assert!(svc.load() < Cost(100_000_000.0));
+
+        time::advance(Duration::from_secs(10)).await;
+        assert!(svc.load() < Cost(100_000.0));
+    }
+
+    #[test]
+    fn nanos() {
+        assert_eq!(super::nanos(Duration::new(0, 0)), 0.0);
+        assert_eq!(super::nanos(Duration::new(0, 123)), 123.0);
+        assert_eq!(super::nanos(Duration::new(1, 23)), 1_000_000_023.0);
+        assert_eq!(
+            super::nanos(Duration::new(::std::u64::MAX, 999_999_999)),
+            18446744074709553000.0
+        );
+    }
+}
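The decay branch of RttEstimate::update above computes next = prior * e^(-elapsed/decay_ns) + rtt * (1 - e^(-elapsed/decay_ns)), while samples above the estimate are adopted immediately as the peak. A standalone sketch of that update rule with illustrative numbers (editor's illustration, not part of the commit):

// Standalone sketch of the Peak-EWMA update rule from peak_ewma.rs.
// `estimate_ns` is the current estimate, `rtt_ns` the new sample, and
// `elapsed_ns` the time since the estimate was last updated.
fn peak_ewma_update(estimate_ns: f64, rtt_ns: f64, elapsed_ns: f64, decay_ns: f64) -> f64 {
    if estimate_ns < rtt_ns {
        // jump straight to the worst-case (peak) latency
        rtt_ns
    } else {
        // otherwise decay towards the observed value
        let decay = (-elapsed_ns / decay_ns).exp();
        estimate_ns * decay + rtt_ns * (1.0 - decay)
    }
}

fn main() {
    let decay_ns = 1_000.0 * 1_000_000.0; // 1s decay period, as in the tests

    // a 500ms spike is adopted immediately...
    let est = peak_ewma_update(100.0e6, 500.0e6, 0.0, decay_ns);
    assert_eq!(est, 500.0e6);

    // ...then a fast 50ms response one second later pulls it most of the way back:
    let est = peak_ewma_update(est, 50.0e6, 1_000.0e6, decay_ns);
    // decay = e^-1 ~ 0.368, so est ~ 500e6*0.368 + 50e6*0.632 ~ 215.5e6 (about 216ms)
    assert!(200.0e6 < est && est < 230.0e6);
}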
@@ -127,10 +127,11 @@ impl ConnectionsGroup {
             .get_with(*block.hash(), async move { Instant::now() })
             .await;

-        // TODO: this should be 0 if we are first seen, but i think it will be slightly non-zero
-        rpc.head_latency
-            .write()
-            .record(first_seen.elapsed().as_secs_f64() * 1000.0);
+        // TODO: this should be 0 if we are first seen, but i think it will be slightly non-zero.
+        // calculate elapsed time before trying to lock.
+        let latency = first_seen.elapsed();
+
+        rpc.head_latency.write().record(latency);

         // TODO: what about a reorg to the same height?
         if Some(block.number()) > self.highest_block.as_ref().map(|x| x.number()) {
@@ -27,7 +27,7 @@ use serde_json::json;
 use serde_json::value::RawValue;
 use std::cmp::min_by_key;
 use std::collections::BTreeMap;
-use std::sync::atomic::Ordering;
+use std::sync::atomic::{self, Ordering};
 use std::sync::Arc;
 use std::{cmp, fmt};
 use thread_fast_rng::rand::seq::SliceRandom;
@@ -592,7 +592,7 @@ impl Web3Rpcs {
         // TODO: cached key to save a read lock
         // TODO: ties to the server with the smallest block_data_limit
         let best_rpc = min_by_key(rpc_a, rpc_b, |x| {
-            OrderedFloat(x.request_latency.read().ewma.value())
+            OrderedFloat(x.head_latency.read().value())
         });
         trace!("winner: {}", best_rpc);

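A note on the OrderedFloat wrapper above: std::cmp::min_by_key requires a key type that is Ord, and f64 is only PartialOrd because of NaN, so wrapping the EWMA value is what makes the comparison compile. A small standalone demonstration (illustrative values, not the commit's code):

use ordered_float::OrderedFloat;
use std::cmp::min_by_key;

fn main() {
    // hypothetical EWMA head latencies (ms) for two backends
    let rpc_a = ("rpc_a", 42.5_f64);
    let rpc_b = ("rpc_b", 17.0_f64);

    // f64 is not Ord, so min_by_key with a bare f64 key would not compile;
    // OrderedFloat supplies a total order (NaN sorts greatest).
    let best = min_by_key(rpc_a, rpc_b, |x| OrderedFloat(x.1));
    assert_eq!(best.0, "rpc_b");
}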
@@ -1159,9 +1159,16 @@ fn rpc_sync_status_sort_key(x: &Arc<Web3Rpc>) -> (U64, u64, OrderedFloat<f64>) {

     let tier = x.tier;

-    let request_ewma = OrderedFloat(x.request_latency.read().ewma.value());
+    // TODO: use request instead of head latency
+    let head_ewma = x.head_latency.read().value();
+
+    let active_requests = x.active_requests.load(atomic::Ordering::Relaxed) as f64;
+
+    // TODO: i'm not sure head * active is exactly right. but we'll see
+    // TODO: i don't think this actually counts as peak. investigate with atomics.rs and peak_ewma.rs
+    let peak_ewma = OrderedFloat(head_ewma * active_requests);

-    (reversed_head_block, tier, request_ewma)
+    (reversed_head_block, tier, peak_ewma)
 }

 mod tests {
@@ -8,12 +8,12 @@ use crate::frontend::authorization::Authorization;
 use crate::rpcs::request::RequestRevertHandler;
 use anyhow::{anyhow, Context};
 use ethers::prelude::{Bytes, Middleware, ProviderError, TxHash, H256, U64};
-use ethers::types::{Transaction, U256};
+use ethers::types::{Address, Transaction, U256};
 use futures::future::try_join_all;
 use futures::StreamExt;
-use hdrhistogram::Histogram;
 use log::{debug, error, info, trace, warn, Level};
 use migration::sea_orm::DatabaseConnection;
+use ordered_float::OrderedFloat;
 use parking_lot::RwLock;
 use redis_rate_limiter::{RedisPool, RedisRateLimitResult, RedisRateLimiter};
 use serde::ser::{SerializeStruct, Serializer};

@@ -30,10 +30,8 @@ use tokio::sync::{broadcast, oneshot, watch, RwLock as AsyncRwLock};
 use tokio::time::{sleep, sleep_until, timeout, Duration, Instant};

 pub struct Latency {
-    /// Track how many milliseconds slower we are than the fastest node
-    pub histogram: Histogram<u64>,
     /// exponentially weighted moving average of how many milliseconds behind the fastest node we are
-    pub ewma: ewma::EWMA,
+    ewma: ewma::EWMA,
 }

 impl Serialize for Latency {

@@ -41,51 +39,52 @@ impl Serialize for Latency {
     where
         S: Serializer,
     {
-        let mut state = serializer.serialize_struct("latency", 6)?;
-
-        state.serialize_field("ewma_ms", &self.ewma.value())?;
-
-        state.serialize_field("histogram_len", &self.histogram.len())?;
-        state.serialize_field("mean_ms", &self.histogram.mean())?;
-        state.serialize_field("p50_ms", &self.histogram.value_at_quantile(0.50))?;
-        state.serialize_field("p75_ms", &self.histogram.value_at_quantile(0.75))?;
-        state.serialize_field("p99_ms", &self.histogram.value_at_quantile(0.99))?;
-
-        state.end()
+        serializer.serialize_f64(self.ewma.value())
     }
 }

 impl Latency {
-    pub fn record(&mut self, milliseconds: f64) {
-        self.ewma.add(milliseconds);
+    #[inline(always)]
+    pub fn record(&mut self, duration: Duration) {
+        self.record_ms(duration.as_secs_f64() * 1000.0);
+    }

-        // histogram needs ints and not floats
-        self.histogram.record(milliseconds as u64).unwrap();
+    #[inline(always)]
+    pub fn record_ms(&mut self, milliseconds: f64) {
+        self.ewma.add(milliseconds);
+    }
+
+    #[inline(always)]
+    pub fn value(&self) -> f64 {
+        self.ewma.value()
     }
 }

 impl Default for Latency {
     fn default() -> Self {
-        // TODO: what should the default sigfig be?
-        let sigfig = 0;
-
         // TODO: what should the default span be? 25 requests? have a "new"
         let span = 25.0;

-        Self::new(sigfig, span).expect("default histogram sigfigs should always work")
+        let start = 1000.0;
+
+        Self::new(span, start)
     }
 }

 impl Latency {
-    pub fn new(sigfig: u8, span: f64) -> Result<Self, hdrhistogram::CreationError> {
+    // depending on the span, start might not be perfect
+    pub fn new(span: f64, start: f64) -> Self {
         let alpha = Self::span_to_alpha(span);

-        let histogram = Histogram::new(sigfig)?;
+        let mut ewma = ewma::EWMA::new(alpha);

-        Ok(Self {
-            histogram,
-            ewma: ewma::EWMA::new(alpha),
-        })
+        if start > 0.0 {
+            for _ in 0..(span as u64) {
+                ewma.add(start);
+            }
+        }
+
+        Self { ewma }
     }

     fn span_to_alpha(span: f64) -> f64 {
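The span_to_alpha helper is unchanged here; assuming the common pandas-style mapping alpha = 2 / (span + 1), the new warm start behaves roughly like this sketch (editor's illustration, not the commit's code):

// Sketch of how `Latency::new(span, start)` seeds the EWMA, assuming the
// usual span -> alpha conversion of alpha = 2 / (span + 1).
fn main() {
    let span = 25.0;
    let start = 1000.0; // milliseconds, the pessimistic default above

    let alpha = 2.0 / (span + 1.0); // ~0.077

    // feeding `start` into the EWMA `span` times moves it most of the way there,
    // which is why the comment warns that "start might not be perfect":
    let mut ewma = 0.0;
    for _ in 0..span as u64 {
        ewma = alpha * start + (1.0 - alpha) * ewma;
    }

    println!("seeded ewma = {ewma:.0} ms"); // ~865 ms, approaching 1000
}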
@@ -127,11 +126,13 @@ pub struct Web3Rpc {
     pub(super) head_block: RwLock<Option<Web3ProxyBlock>>,
     /// Track head block latency
     pub(super) head_latency: RwLock<Latency>,
-    /// Track request latency
-    pub(super) request_latency: RwLock<Latency>,
+    // /// Track request latency
+    // /// TODO: refactor this. this lock kills perf. for now just use head_latency
+    // pub(super) request_latency: RwLock<Latency>,
     /// Track total requests served
     /// TODO: maybe move this to graphana
     pub(super) total_requests: AtomicUsize,
+    pub(super) active_requests: AtomicUsize,
 }

 impl Web3Rpc {
|
|||||||
Ok((new_connection, handle))
|
Ok((new_connection, handle))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn peak_ewma(&self) -> OrderedFloat<f64> {
|
||||||
|
// TODO: use request instead of head latency? that was killing perf though
|
||||||
|
let head_ewma = self.head_latency.read().value();
|
||||||
|
|
||||||
|
// TODO: what ordering?
|
||||||
|
let active_requests = self.active_requests.load(atomic::Ordering::Relaxed) as f64;
|
||||||
|
|
||||||
|
// TODO: i'm not sure head * active is exactly right. but we'll see
|
||||||
|
// TODO: i don't think this actually counts as peak. investigate with atomics.rs and peak_ewma.rs
|
||||||
|
OrderedFloat(head_ewma * active_requests)
|
||||||
|
}
|
||||||
|
|
||||||
// TODO: would be great if rpcs exposed this. see https://github.com/ledgerwatch/erigon/issues/6391
|
// TODO: would be great if rpcs exposed this. see https://github.com/ledgerwatch/erigon/issues/6391
|
||||||
async fn check_block_data_limit(
|
async fn check_block_data_limit(
|
||||||
self: &Arc<Self>,
|
self: &Arc<Self>,
|
||||||
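peak_ewma scores a server as its head-latency EWMA times the number of in-flight requests, so a loaded server looks proportionally more expensive; note that zero active requests zeroes the cost entirely, which may be part of why the TODOs question the formula. A worked example with illustrative numbers (editor's illustration):

use ordered_float::OrderedFloat;

// Illustrative cost calculation matching peak_ewma() above:
// cost = head-latency EWMA (ms) * number of active requests.
fn cost(head_ewma_ms: f64, active_requests: usize) -> OrderedFloat<f64> {
    OrderedFloat(head_ewma_ms * active_requests as f64)
}

fn main() {
    // a 50ms server with 4 requests in flight loses to a slower, idler one:
    assert!(cost(120.0, 1) < cost(50.0, 4)); // 120 < 200

    // ...but any latency is "free" when nothing is in flight:
    assert_eq!(cost(5000.0, 0), OrderedFloat(0.0));
}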
@@ -671,14 +684,9 @@ impl Web3Rpc {

            // TODO: how often? different depending on the chain?
            // TODO: reset this timeout when a new block is seen? we need to keep request_latency updated though
-           // let health_sleep_seconds = 10;
-
-           futures::future::pending::<()>().await;
-
-           Ok(())
+           let health_sleep_seconds = 10;

            // TODO: benchmark this and lock contention
-           /*
            let mut old_total_requests = 0;
            let mut new_total_requests;

@@ -696,6 +704,7 @@ impl Web3Rpc {

                if new_total_requests - old_total_requests < 10 {
                    // TODO: if this fails too many times, reset the connection
+                   // TODO: move this into a function and the chaining should be easier
                    let head_block = conn.head_block.read().clone();

                    if let Some((block_hash, txid)) = head_block.and_then(|x| {
@@ -706,28 +715,65 @@ impl Web3Rpc {

                        Some((block_hash, txid))
                    }) {
-                       let authorization = authorization.clone();
-                       let conn = conn.clone();
-
-                       let x = async move {
-                           conn.try_request_handle(&authorization, Some(client)).await
-                       }
-                       .await;
-
-                       if let Ok(OpenRequestResult::Handle(x)) = x {
-                           if let Ok(Some(x)) = x
-                               .request::<_, Option<Transaction>>(
-                                   "eth_getTransactionByHash",
-                                   &(txid,),
-                                   revert_handler,
-                                   None,
-                               )
-                               .await
-                           {
-                               // TODO: make this flatter
-                               // TODO: do more (fair, not random) things here
-                               // let = x.request("eth_getCode", (tx.to.unwrap_or(Address::zero()), block_hash), RequestRevertHandler::ErrorLevel, Some(client.clone()))
-                           }
-                       }
+                       let to = conn
+                           .wait_for_query::<_, Option<Transaction>>(
+                               "eth_getTransactionByHash",
+                               &(txid,),
+                               revert_handler,
+                               authorization.clone(),
+                               Some(client.clone()),
+                           )
+                           .await
+                           .and_then(|tx| {
+                               let tx = tx.context("no transaction found")?;
+
+                               // TODO: what default? something real?
+                               let to = tx.to.unwrap_or_else(|| {
+                                   "0xdead00000000000000000000000000000000beef"
+                                       .parse::<Address>()
+                                       .expect("deafbeef")
+                               });
+
+                               Ok(to)
+                           });
+
+                       let code = match to {
+                           Err(err) => {
+                               if conn.backup {
+                                   debug!(
+                                       "{} failed health check query! {:#?}",
+                                       conn, err
+                                   );
+                               } else {
+                                   warn!(
+                                       "{} failed health check query! {:#?}",
+                                       conn, err
+                                   );
+                               }
+                               continue;
+                           }
+                           Ok(to) => {
+                               conn.wait_for_query::<_, Option<Bytes>>(
+                                   "eth_getCode",
+                                   &(to, block_hash),
+                                   revert_handler,
+                                   authorization.clone(),
+                                   Some(client),
+                               )
+                               .await
+                           }
+                       };
+
+                       if let Err(err) = code {
+                           if conn.backup {
+                               debug!(
+                                   "{} failed health check query! {:#?}",
+                                   conn, err
+                               );
+                           } else {
+                               warn!("{} failed health check query! {:#?}", conn, err);
+                           }
+                           continue;
+                       }
                    }
                }
            }
@@ -735,7 +781,6 @@ impl Web3Rpc {
                old_total_requests = new_total_requests;
            }
        }
-       */
    };

    futures.push(flatten_handle(tokio::spawn(f)));
@@ -1144,6 +1189,26 @@ impl Web3Rpc {

        Ok(OpenRequestResult::Handle(handle))
    }

+   pub async fn wait_for_query<P, R>(
+       self: &Arc<Self>,
+       method: &str,
+       params: &P,
+       revert_handler: RequestRevertHandler,
+       authorization: Arc<Authorization>,
+       unlocked_provider: Option<Arc<Web3Provider>>,
+   ) -> anyhow::Result<R>
+   where
+       // TODO: not sure about this type. would be better to not need clones, but measure and spawns combine to need it
+       P: Clone + fmt::Debug + serde::Serialize + Send + Sync + 'static,
+       R: serde::Serialize + serde::de::DeserializeOwned + fmt::Debug,
+   {
+       self.wait_for_request_handle(&authorization, None, None)
+           .await?
+           .request::<P, R>(method, params, revert_handler, unlocked_provider)
+           .await
+           .context("ProviderError from the backend")
+   }
 }

 impl fmt::Debug for Web3Provider {
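wait_for_query chains acquiring a request handle, issuing the request, and attaching error context, which is what lets the health check above read as one expression. A simplified standalone sketch of that chaining pattern (all types and values here are illustrative, not the commit's):

use anyhow::Context;

// Simplified stand-in for the wait_for_query chaining above: run the request,
// then attach context so backend failures are attributable in logs.
fn wait_for_query(method: &str) -> anyhow::Result<String> {
    send_request(method).context("ProviderError from the backend")
}

// hypothetical backend call, standing in for the real request handle
fn send_request(method: &str) -> anyhow::Result<String> {
    if method == "eth_getTransactionByHash" {
        Ok("0xabc...".to_string())
    } else {
        Err(anyhow::anyhow!("unsupported method"))
    }
}

fn main() {
    assert!(wait_for_query("eth_getTransactionByHash").is_ok());

    // the context string is prepended to the underlying error chain:
    let err = wait_for_query("eth_call").unwrap_err();
    assert!(format!("{err:#}").contains("ProviderError from the backend"));
}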
@@ -1211,9 +1276,7 @@ impl Serialize for Web3Rpc {
        // TODO: maybe this is too much data. serialize less?
        state.serialize_field("head_block", &*self.head_block.read())?;

-       state.serialize_field("head_latency", &*self.head_latency.read())?;
-
-       state.serialize_field("request_latency", &*self.request_latency.read())?;
+       state.serialize_field("head_latency", &self.head_latency.read().value())?;

        state.serialize_field(
            "total_requests",
@@ -139,6 +139,7 @@ impl OpenRequestHandle {

    /// Send a web3 request
    /// By having the request method here, we ensure that the rate limiter was called and connection counts were properly incremented
+   /// depending on how things are locked, you might need to pass the provider in
    pub async fn request<P, R>(
        self,
        method: &str,
@@ -156,30 +157,23 @@ impl OpenRequestHandle {
        // trace!(rpc=%self.conn, %method, "request");
        trace!("requesting from {}", self.rpc);

-       let mut provider: Option<Arc<Web3Provider>> = None;
+       let mut provider = if unlocked_provider.is_some() {
+           unlocked_provider
+       } else {
+           self.rpc.provider.read().await.clone()
+       };
+
        let mut logged = false;
        while provider.is_none() {
            // trace!("waiting on provider: locking...");
-           // TODO: this should *not* be new_head_client. that is dedicated to only new heads
-           if let Some(unlocked_provider) = unlocked_provider {
-               provider = Some(unlocked_provider);
-               break;
-           }
-
-           let unlocked_provider = self.rpc.provider.read().await;
-
-           if let Some(unlocked_provider) = unlocked_provider.clone() {
-               provider = Some(unlocked_provider);
-               break;
-           }
+           sleep(Duration::from_millis(100)).await;

            if !logged {
                debug!("no provider for open handle on {}", self.rpc);
                logged = true;
            }

-           sleep(Duration::from_millis(100)).await;
+           provider = self.rpc.provider.read().await.clone();
        }

        let provider = provider.expect("provider was checked already");
@@ -188,7 +182,11 @@ impl OpenRequestHandle {
            .total_requests
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);

-       let start = Instant::now();
+       self.rpc
+           .active_requests
+           .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+
+       // let latency = Instant::now();

        // TODO: replace ethers-rs providers with our own that supports streaming the responses
        let response = match provider.as_ref() {
@@ -201,6 +199,13 @@ impl OpenRequestHandle {
            }
        };

+       // note. we intentionally do not record this latency now. we do NOT want to measure errors
+       // let latency = latency.elapsed();
+
+       self.rpc
+           .active_requests
+           .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
+
        // // TODO: i think ethers already has trace logging (and does it much more fancy)
        // trace!(
        //     "response from {} for {} {:?}: {:?}",
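active_requests is incremented before the call and decremented after the match, so every path between the two must fall through to the fetch_sub. A hedged sketch of an RAII alternative that decrements on Drop instead (editor's illustration, not what the commit does):

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Sketch of an RAII alternative to the manual fetch_add/fetch_sub pair above:
// the counter is decremented in Drop, so early returns and errors are covered.
struct ActiveRequestGuard(Arc<AtomicUsize>);

impl ActiveRequestGuard {
    fn new(counter: &Arc<AtomicUsize>) -> Self {
        counter.fetch_add(1, Ordering::Relaxed);
        Self(counter.clone())
    }
}

impl Drop for ActiveRequestGuard {
    fn drop(&mut self) {
        self.0.fetch_sub(1, Ordering::Relaxed);
    }
}

fn main() {
    let active = Arc::new(AtomicUsize::new(0));

    {
        let _guard = ActiveRequestGuard::new(&active);
        assert_eq!(active.load(Ordering::Relaxed), 1);
        // ... the request would run here ...
    } // guard dropped: counter goes back down on every exit path

    assert_eq!(active.load(Ordering::Relaxed), 0);
}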