automated tiers

Bryan Stitt 2023-06-09 12:21:50 -07:00
parent b6ff14210d
commit 1f3040f6c7
10 changed files with 184 additions and 73 deletions

Cargo.lock (generated, 3 changes)

@ -2440,6 +2440,7 @@ checksum = "7f19b9f54f7c7f55e31401bb647626ce0cf0f67b0004982ce815b3ee72a02aa8"
dependencies = [
"base64 0.13.1",
"byteorder",
"crossbeam-channel",
"flate2",
"nom",
"num-traits",
@ -6719,6 +6720,7 @@ dependencies = [
"axum",
"axum-client-ip",
"axum-macros",
"base64 0.21.2",
"chrono",
"console-subscriber",
"counter",
@ -6737,6 +6739,7 @@ dependencies = [
"glob",
"handlebars",
"hashbrown 0.14.0",
"hdrhistogram",
"hostname",
"http",
"hyper",

docs/histograms.txt (new file, 5 additions)

@ -0,0 +1,5 @@
[2023-06-09T18:45:50Z WARN web3_proxy::rpcs::consensus] TODO: find the troughs in the histogram: HISTFAAAADV4nC3GoQ0AIAxE0eNAkRAEwbELQxE2QGG7aEeoaL95fz0ZACq8HKbwb/U5bGXystMAZl8EMw==
Paste the HIST data here: <https://hdrhistogram.github.io/HdrHistogramJSDemo/decoding-demo.html>
Save the "Decoded histogram data" to a file and upload it here: <http://hdrhistogram.github.io/HdrHistogram/plotFiles.html>

@ -100,6 +100,8 @@ tracing-subscriber = "0.3"
ulid = { version = "1.0.0", features = ["rand", "uuid", "serde"] }
url = "2.4.0"
uuid = { version = "1.3.3", default-features = false, features = ["fast-rng", "serde", "v4", "zerocopy"] }
hdrhistogram = "7.5.2"
base64 = "0.21.2"
[dev-dependencies]
tokio = { version = "1.28.2", features = ["full", "test-util"] }

@ -310,7 +310,6 @@ mod tests {
Web3RpcConfig {
http_url: Some(anvil.endpoint()),
soft_limit: 100,
tier: 0,
..Default::default()
},
),
@ -319,7 +318,6 @@ mod tests {
Web3RpcConfig {
ws_url: Some(anvil.ws_endpoint()),
soft_limit: 100,
tier: 0,
..Default::default()
},
),

@ -256,9 +256,6 @@ pub struct Web3RpcConfig {
/// only use this rpc if everything else is lagging too far. this allows us to ignore fast but very low limit rpcs
#[serde(default)]
pub backup: bool,
/// All else equal, a server with a lower tier receives all requests
#[serde(default = "default_tier")]
pub tier: u64,
/// Subscribe to the firehose of pending transactions
/// Don't do this with free rpcs
#[serde(default)]
@ -268,10 +265,6 @@ pub struct Web3RpcConfig {
pub extra: HashMap<String, serde_json::Value>,
}
fn default_tier() -> u64 {
0
}
impl Web3RpcConfig {
/// Create a Web3Rpc from config
/// TODO: move this into Web3Rpc? (just need to make things pub(crate))

@ -61,6 +61,7 @@ pub enum Web3ProxyError {
EthersWsClient(ethers::prelude::WsClientError),
FlumeRecv(flume::RecvError),
GasEstimateNotU256,
HdrRecord(hdrhistogram::errors::RecordError),
Headers(headers::Error),
HeaderToString(ToStrError),
Hyper(hyper::Error),
@ -338,6 +339,17 @@ impl Web3ProxyError {
},
)
}
Self::HdrRecord(err) => {
warn!("HdrRecord {:?}", err);
(
StatusCode::INTERNAL_SERVER_ERROR,
JsonRpcErrorData {
message: format!("{}", err).into(),
code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(),
data: None,
},
)
}
Self::Headers(err) => {
warn!("HeadersError {:?}", err);
(

@ -13,7 +13,6 @@ use ethbloom::Input as BloomInput;
use ethers::abi::AbiEncode;
use ethers::types::{Address, TransactionReceipt, ValueOrArray, H256};
use hashbrown::HashMap;
// use http::StatusCode;
use http::StatusCode;
use log::{debug, info, trace};
use migration::sea_orm::prelude::Decimal;
@ -21,7 +20,6 @@ use migration::sea_orm::{
self, ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, TransactionTrait,
};
use migration::{Expr, OnConflict};
use num_traits::Pow;
use payment_contracts::ierc20::IERC20;
use payment_contracts::payment_factory::{self, PaymentFactory};
use serde_json::json;

@ -3,27 +3,31 @@ use super::many::Web3Rpcs;
use super::one::Web3Rpc;
use crate::errors::{Web3ProxyErrorContext, Web3ProxyResult};
use crate::frontend::authorization::Authorization;
use anyhow::Context;
use base64::engine::general_purpose;
use derive_more::Constructor;
use ethers::prelude::{H256, U64};
use hashbrown::{HashMap, HashSet};
use hdrhistogram::serialization::{Serializer, V2DeflateSerializer};
use hdrhistogram::Histogram;
use itertools::{Itertools, MinMaxResult};
use log::{trace, warn};
use log::{debug, log_enabled, trace, warn, Level};
use moka::future::Cache;
use serde::Serialize;
use std::cmp::{Ordering, Reverse};
use std::collections::BTreeMap;
use std::fmt;
use std::sync::Arc;
use std::sync::{atomic, Arc};
use tokio::time::Instant;
#[derive(Clone, Serialize)]
struct RpcData {
struct ConsensusRpcData {
head_block_num: U64,
// TODO: this is too simple. erigon has 4 prune levels (hrct)
oldest_block_num: U64,
}
impl RpcData {
impl ConsensusRpcData {
fn new(rpc: &Web3Rpc, head: &Web3ProxyBlock) -> Self {
let head_block_num = *head.number();
@ -45,13 +49,13 @@ impl RpcData {
#[derive(Constructor, Clone, Copy, Debug, Default, Eq, PartialEq, Serialize)]
pub struct RpcRanking {
tier: u64,
tier: u8,
backup: bool,
head_num: Option<U64>,
}
impl RpcRanking {
pub fn add_offset(&self, offset: u64) -> Self {
pub fn add_offset(&self, offset: u8) -> Self {
Self {
tier: self.tier + offset,
backup: self.backup,
@ -66,9 +70,10 @@ impl RpcRanking {
}
}
fn sort_key(&self) -> (u64, bool, Reverse<Option<U64>>) {
fn sort_key(&self) -> (bool, u8, Reverse<Option<U64>>) {
// TODO: add soft_limit here? add peak_ewma here?
(self.tier, !self.backup, Reverse(self.head_num))
// TODO: should backup or tier be checked first? now that tiers are automated, backups
(!self.backup, self.tier, Reverse(self.head_num))
}
}
@ -94,9 +99,10 @@ pub enum ShouldWaitForBlock {
/// A collection of Web3Rpcs that are on the same block.
/// Serialize is so we can print it on our /status endpoint
/// TODO: one data structure of head_rpcs and other_rpcs that is sorted best first
#[derive(Clone, Serialize)]
pub struct ConsensusWeb3Rpcs {
pub(crate) tier: u64,
pub(crate) tier: u8,
pub(crate) backups_needed: bool,
// TODO: this is already inside best_rpcs. Don't skip, instead make a shorter serialize
@ -112,7 +118,7 @@ pub struct ConsensusWeb3Rpcs {
// TODO: make this work. the key needs to be a string. I think we need `serialize_with`
#[serde(skip_serializing)]
rpc_data: HashMap<Arc<Web3Rpc>, RpcData>,
rpc_data: HashMap<Arc<Web3Rpc>, ConsensusRpcData>,
}
impl ConsensusWeb3Rpcs {
@ -332,26 +338,20 @@ type FirstSeenCache = Cache<H256, Instant>;
/// A ConsensusConnections builder that tracks all connection heads across multiple groups of servers
pub struct ConsensusFinder {
/// backups for all tiers are only used if necessary
/// `tiers[0] = only tier 0`
/// `tiers[1] = tier 0 and tier 1`
/// `tiers[n] = tier 0..=n`
rpc_heads: HashMap<Arc<Web3Rpc>, Web3ProxyBlock>,
/// never serve blocks that are too old
max_block_age: Option<u64>,
/// tier 0 will be preferred as long as the distance between it and the other tiers is <= max_tier_lag
max_block_lag: Option<U64>,
/// used to track rpc.head_latency. The same cache should be shared between all ConnectionsGroups
/// Block Hash -> First Seen Instant. used to track rpc.head_latency. The same cache should be shared between all ConnectionsGroups
first_seen: FirstSeenCache,
}
impl ConsensusFinder {
pub fn new(max_block_age: Option<u64>, max_block_lag: Option<U64>) -> Self {
// TODO: what's a good capacity for this? it shouldn't need to be very large
// TODO: if we change Web3ProxyBlock to store the instance, i think we could use the block_by_hash cache
let first_seen = Cache::new(16);
// TODO: hard coding 0-9 isn't great, but its easier than refactoring this to be smart about config reloading
let rpc_heads = HashMap::new();
Self {
@ -433,11 +433,100 @@ impl ConsensusFinder {
Ok(changed)
}
pub async fn update_tiers(
&mut self,
authorization: &Arc<Authorization>,
web3_rpcs: &Web3Rpcs,
) -> Web3ProxyResult<()> {
match self.rpc_heads.len() {
0 => {}
1 => {
for rpc in self.rpc_heads.keys() {
rpc.tier.store(0, atomic::Ordering::Relaxed)
}
}
_ => {
// iterate first to find bounds
let mut min_latency = u64::MAX;
let mut max_latency = u64::MIN;
let mut weighted_latencies = HashMap::new();
for rpc in self.rpc_heads.keys() {
let weighted_latency_seconds = rpc.weighted_peak_ewma_seconds();
let weighted_latency_ms = (weighted_latency_seconds * 1000.0).round() as i64;
let weighted_latency_ms: u64 = weighted_latency_ms
.try_into()
.context("weighted_latency_ms does not fit in a u64")?;
min_latency = min_latency.min(weighted_latency_ms);
max_latency = max_latency.max(weighted_latency_ms);
weighted_latencies.insert(rpc, weighted_latency_ms);
}
// // histogram requires high to be at least 2 x low
// // using min_latency for low does not work how we want it though
// max_latency = max_latency.max(2 * min_latency);
// create the histogram
let mut hist = Histogram::<u32>::new_with_bounds(1, max_latency, 3).unwrap();
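// note: new_with_bounds requires high to be at least 2 x low, so this unwrap would panic
// if every weighted latency rounded down to 1ms or less; real upstreams are slower than that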
for weighted_latency_ms in weighted_latencies.values() {
hist.record(*weighted_latency_ms)?;
}
// dev logging
if log_enabled!(Level::Trace) {
// print the histogram. see docs/histograms.txt for more info
let mut encoder =
base64::write::EncoderWriter::new(Vec::new(), &general_purpose::STANDARD);
V2DeflateSerializer::new()
.serialize(&hist, &mut encoder)
.unwrap();
let encoded = encoder.finish().unwrap();
let encoded = String::from_utf8(encoded).unwrap();
trace!("weighted_latencies: {}", encoded);
}
// TODO: get someone who is better at math to do something smarter
// this is not a very good use of stddev, but it works for now
let stddev = hist.stdev();
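// hypothetical example: weighted latencies of 100ms, 120ms and 400ms give min_latency = 100
// and a stddev of roughly 140ms, so the loop below assigns tiers 0, 0 and 2 respectively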
for (rpc, weighted_latency_ms) in weighted_latencies.into_iter() {
let tier = (weighted_latency_ms - min_latency) as f64 / stddev;
let tier = tier.floor() as u64;
let tier = tier.clamp(u8::MIN.into(), u8::MAX.into()) as u8;
trace!(
"{} - weighted_latency: {}ms, tier {}",
rpc,
weighted_latency_ms,
tier
);
rpc.tier.store(tier, atomic::Ordering::Relaxed);
}
}
}
Ok(())
}
pub async fn find_consensus_connections(
&mut self,
authorization: &Arc<Authorization>,
web3_rpcs: &Web3Rpcs,
) -> Web3ProxyResult<Option<ConsensusWeb3Rpcs>> {
self.update_tiers(authorization, web3_rpcs).await?;
let minmax_block = self.rpc_heads.values().minmax_by_key(|&x| x.number());
let (lowest_block, highest_block) = match minmax_block {
@ -477,13 +566,14 @@ impl ConsensusFinder {
let mut backup_consensus = None;
let mut rpc_heads_by_tier: Vec<_> = self.rpc_heads.iter().collect();
rpc_heads_by_tier.sort_by_cached_key(|(rpc, _)| rpc.tier);
rpc_heads_by_tier.sort_by_cached_key(|(rpc, _)| rpc.tier.load(atomic::Ordering::Relaxed));
let current_tier = rpc_heads_by_tier
.first()
.expect("rpc_heads_by_tier should never be empty")
.0
.tier;
.tier
.load(atomic::Ordering::Relaxed);
// trace!("first_tier: {}", current_tier);
@ -492,7 +582,9 @@ impl ConsensusFinder {
// loop over all the rpc heads (grouped by tier) and their parents to find consensus
// TODO: i'm sure theres a lot of shortcuts that could be taken, but this is simplest to implement
for (rpc, rpc_head) in rpc_heads_by_tier.into_iter() {
if current_tier != rpc.tier {
let rpc_tier = rpc.tier.load(atomic::Ordering::Relaxed);
if current_tier != rpc_tier {
// we finished processing a tier. check for primary results
if let Some(consensus) = self.count_votes(&primary_votes, web3_rpcs) {
trace!("found enough votes on tier {}", current_tier);
@ -600,7 +692,7 @@ impl ConsensusFinder {
let tier = consensus_rpcs
.iter()
.map(|x| x.tier)
.map(|x| x.tier.load(atomic::Ordering::Relaxed))
.max()
.expect("there should always be a max");
@ -615,7 +707,11 @@ impl ConsensusFinder {
{
let x_head_num = *x_head.number();
let key: RpcRanking = RpcRanking::new(x.tier, x.backup, Some(x_head_num));
let key: RpcRanking = RpcRanking::new(
x.tier.load(atomic::Ordering::Relaxed),
x.backup,
Some(x_head_num),
);
other_rpcs
.entry(key)
@ -627,7 +723,7 @@ impl ConsensusFinder {
let mut rpc_data = HashMap::with_capacity(self.rpc_heads.len());
for (x, x_head) in self.rpc_heads.iter() {
let y = RpcData::new(x, x_head);
let y = ConsensusRpcData::new(x, x_head);
rpc_data.insert(x.clone(), y);
}
@ -647,8 +743,11 @@ impl ConsensusFinder {
None
}
pub fn worst_tier(&self) -> Option<u64> {
self.rpc_heads.iter().map(|(x, _)| x.tier).max()
pub fn worst_tier(&self) -> Option<u8> {
self.rpc_heads
.iter()
.map(|(x, _)| x.tier.load(atomic::Ordering::Relaxed))
.max()
}
}

@ -31,7 +31,7 @@ use serde_json::value::RawValue;
use std::borrow::Cow;
use std::cmp::{min_by_key, Reverse};
use std::fmt::{self, Display};
use std::sync::atomic::Ordering;
use std::sync::atomic::{self, Ordering};
use std::sync::Arc;
use thread_fast_rng::rand::seq::SliceRandom;
use tokio::select;
@ -440,7 +440,7 @@ impl Web3Rpcs {
trace!("{} vs {}", rpc_a, rpc_b);
// TODO: cached key to save a read lock
// TODO: ties to the server with the smallest block_data_limit
let faster_rpc = min_by_key(rpc_a, rpc_b, |x| x.peak_ewma());
let faster_rpc = min_by_key(rpc_a, rpc_b, |x| x.weighted_peak_ewma_seconds());
trace!("winner: {}", faster_rpc);
// add to the skip list in case this one fails
@ -1288,20 +1288,21 @@ impl Serialize for Web3Rpcs {
/// TODO: should this be moved into a `impl Web3Rpc`?
/// TODO: i think we still have sorts scattered around the code that should use this
/// TODO: take AsRef or something like that? We don't need an Arc here
fn rpc_sync_status_sort_key(x: &Arc<Web3Rpc>) -> (Reverse<U64>, u64, bool, OrderedFloat<f64>) {
/// TODO: tests on this!
fn rpc_sync_status_sort_key(x: &Arc<Web3Rpc>) -> (bool, Reverse<U64>, u8, OrderedFloat<f64>) {
let head_block = x
.head_block
.as_ref()
.and_then(|x| x.borrow().as_ref().map(|x| *x.number()))
.unwrap_or_default();
let tier = x.tier;
let tier = x.tier.load(atomic::Ordering::Relaxed);
let peak_ewma = x.peak_ewma();
let peak_ewma = x.weighted_peak_ewma_seconds();
let backup = x.backup;
(Reverse(head_block), tier, backup, peak_ewma)
(!backup, Reverse(head_block), tier, peak_ewma)
}
mod tests {
@ -1361,42 +1362,42 @@ mod tests {
let mut rpcs: Vec<_> = [
Web3Rpc {
name: "a".to_string(),
tier: 0,
// tier: 0,
head_block: Some(tx_a),
peak_latency: Some(new_peak_latency()),
..Default::default()
},
Web3Rpc {
name: "b".to_string(),
tier: 0,
// tier: 0,
head_block: Some(tx_b),
peak_latency: Some(new_peak_latency()),
..Default::default()
},
Web3Rpc {
name: "c".to_string(),
tier: 0,
// tier: 0,
head_block: Some(tx_c),
peak_latency: Some(new_peak_latency()),
..Default::default()
},
Web3Rpc {
name: "d".to_string(),
tier: 1,
// tier: 1,
head_block: Some(tx_d),
peak_latency: Some(new_peak_latency()),
..Default::default()
},
Web3Rpc {
name: "e".to_string(),
tier: 1,
// tier: 1,
head_block: Some(tx_e),
peak_latency: Some(new_peak_latency()),
..Default::default()
},
Web3Rpc {
name: "f".to_string(),
tier: 1,
// tier: 1,
head_block: Some(tx_f),
peak_latency: Some(new_peak_latency()),
..Default::default()
@ -1410,6 +1411,7 @@ mod tests {
let names_in_sort_order: Vec<_> = rpcs.iter().map(|x| x.name.as_str()).collect();
// TODO: the tier refactor likely broke this
assert_eq!(names_in_sort_order, ["c", "f", "b", "e", "a", "d"]);
}
@ -1452,7 +1454,7 @@ mod tests {
automatic_block_limit: false,
backup: false,
block_data_limit: block_data_limit.into(),
tier: 0,
// tier: 0,
head_block: Some(tx_synced),
peak_latency: Some(new_peak_latency()),
..Default::default()
@ -1466,7 +1468,7 @@ mod tests {
automatic_block_limit: false,
backup: false,
block_data_limit: block_data_limit.into(),
tier: 0,
// tier: 0,
head_block: Some(tx_lagged),
peak_latency: Some(new_peak_latency()),
..Default::default()
@ -1736,7 +1738,7 @@ mod tests {
automatic_block_limit: false,
backup: false,
block_data_limit: 64.into(),
tier: 1,
// tier: 1,
head_block: Some(tx_pruned),
..Default::default()
};
@ -1749,7 +1751,7 @@ mod tests {
automatic_block_limit: false,
backup: false,
block_data_limit: u64::MAX.into(),
tier: 2,
// tier: 2,
head_block: Some(tx_archive),
..Default::default()
};
@ -1918,7 +1920,7 @@ mod tests {
automatic_block_limit: false,
backup: false,
block_data_limit: 64.into(),
tier: 0,
// tier: 0,
head_block: Some(tx_mock_geth),
peak_latency: Some(new_peak_latency()),
..Default::default()
@ -1930,7 +1932,7 @@ mod tests {
automatic_block_limit: false,
backup: false,
block_data_limit: u64::MAX.into(),
tier: 1,
// tier: 1,
head_block: Some(tx_mock_erigon_archive),
peak_latency: Some(new_peak_latency()),
..Default::default()

@ -24,7 +24,7 @@ use serde::Serialize;
use serde_json::json;
use std::fmt;
use std::hash::{Hash, Hasher};
use std::sync::atomic::{self, AtomicU64, AtomicUsize};
use std::sync::atomic::{self, AtomicU64, AtomicU8, AtomicUsize};
use std::{cmp::Ordering, sync::Arc};
use tokio::sync::watch;
use tokio::time::{sleep, sleep_until, timeout, Duration, Instant};
@ -54,8 +54,6 @@ pub struct Web3Rpc {
pub backup: bool,
/// TODO: have an enum for this so that "no limit" prints pretty?
pub(super) block_data_limit: AtomicU64,
/// Lower tiers are higher priority when sending requests
pub(super) tier: u64,
/// TODO: change this to a watch channel so that http providers can subscribe and take action on change.
/// this is only inside an Option so that the "Default" derive works. it will always be set.
pub(super) head_block: Option<watch::Sender<Option<Web3ProxyBlock>>>,
@ -64,6 +62,8 @@ pub struct Web3Rpc {
/// Track peak request latency
/// This is only inside an Option so that the "Default" derive works. it will always be set.
pub(super) peak_latency: Option<PeakEwmaLatency>,
/// Automatically set priority
pub(super) tier: AtomicU8,
/// Track total requests served
/// TODO: maybe move this to graphana
pub(super) total_requests: AtomicUsize,
@ -196,7 +196,6 @@ impl Web3Rpc {
name,
peak_latency: Some(peak_latency),
soft_limit: config.soft_limit,
tier: config.tier,
ws_provider,
disconnect_watch: Some(disconnect_watch),
..Default::default()
@ -219,7 +218,7 @@ impl Web3Rpc {
Ok((new_connection, handle))
}
pub fn peak_ewma(&self) -> OrderedFloat<f64> {
pub fn weighted_peak_ewma_seconds(&self) -> OrderedFloat<f64> {
let peak_latency = if let Some(peak_latency) = self.peak_latency.as_ref() {
peak_latency.latency().as_secs_f64()
} else {
@ -538,7 +537,7 @@ impl Web3Rpc {
// TODO: how often? different depending on the chain?
// TODO: reset this timeout when a new block is seen? we need to keep request_latency updated though
let health_sleep_seconds = 10;
let health_sleep_seconds = 5;
// health check loop
let f = async move {
@ -550,7 +549,7 @@ impl Web3Rpc {
while !rpc.should_disconnect() {
new_total_requests = rpc.total_requests.load(atomic::Ordering::Relaxed);
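// if the rpc served only a few organic requests since the last loop, send an explicit
// health check; otherwise the recent traffic already demonstrates that it is healthy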
if new_total_requests - old_total_requests < 10 {
if new_total_requests - old_total_requests < 5 {
// TODO: if this fails too many times, reset the connection
// TODO: move this into a function and the chaining should be easier
if let Err(err) = rpc.healthcheck(error_handler).await {
@ -900,18 +899,20 @@ impl Web3Rpc {
impl Hash for Web3Rpc {
fn hash<H: Hasher>(&self, state: &mut H) {
self.name.hash(state);
self.display_name.hash(state);
self.http_provider.as_ref().map(|x| x.url()).hash(state);
// TODO: figure out how to get the url for the provider
// TODO: url does NOT include the authorization data. i think created_at should protect us if auth changes without anything else
// self.ws_provider.map(|x| x.url()).hash(state);
self.automatic_block_limit.hash(state);
// do not include automatic block limit because it can change
// do not include tier because it can change
self.backup.hash(state);
self.created_at.hash(state);
self.display_name.hash(state);
self.name.hash(state);
// TODO: url does NOT include the authorization data. i think created_at should protect us if auth changes without anything else
self.http_provider.as_ref().map(|x| x.url()).hash(state);
// TODO: figure out how to get the url for the ws provider
// self.ws_provider.map(|x| x.url()).hash(state);
// TODO: don't include soft_limit if we change them to be dynamic
self.soft_limit.hash(state);
self.tier.hash(state);
self.created_at.hash(state);
}
}
@ -988,7 +989,7 @@ impl Serialize for Web3Rpc {
&self.peak_latency.as_ref().unwrap().latency().as_millis(),
)?;
state.serialize_field("peak_ewma_s", self.peak_ewma().as_ref())?;
state.serialize_field("peak_ewma_s", self.weighted_peak_ewma_seconds().as_ref())?;
state.end()
}
@ -1047,7 +1048,6 @@ mod tests {
automatic_block_limit: false,
backup: false,
block_data_limit: block_data_limit.into(),
tier: 0,
head_block: Some(tx),
..Default::default()
};
@ -1082,7 +1082,6 @@ mod tests {
automatic_block_limit: false,
backup: false,
block_data_limit: block_data_limit.into(),
tier: 0,
head_block: Some(tx),
..Default::default()
};