i think it works

Bryan Stitt 2023-02-15 12:33:43 -08:00
parent df668a5dfd
commit 1fb4dd6ccc
5 changed files with 174 additions and 70 deletions

@ -415,6 +415,8 @@ impl Web3Rpcs {
// TODO: what should we do if the block number of new_synced_connections is < old_synced_connections? wait?
let consensus_tier = new_synced_connections.tier;
let total_tiers = consensus_finder.len();
let backups_needed = new_synced_connections.backups_needed;
let consensus_head_block = new_synced_connections.head_block.clone();
let num_consensus_rpcs = new_synced_connections.num_conns();
@ -434,7 +436,9 @@ impl Web3Rpcs {
match &old_consensus_head_connections.head_block {
None => {
debug!(
"first {}{}/{}/{} block={}, rpc={}",
"first {}/{} {}{}/{}/{} block={}, rpc={}",
consensus_tier,
total_tiers,
backups_voted_str,
num_consensus_rpcs,
num_active_rpcs,
@ -469,7 +473,9 @@ impl Web3Rpcs {
// no change in hash. no need to use head_block_sender
// TODO: trace level if rpc is backup
debug!(
"con {}{}/{}/{} con={} rpc={}@{}",
"con {}/{} {}{}/{}/{} con={} rpc={}@{}",
consensus_tier,
total_tiers,
backups_voted_str,
num_consensus_rpcs,
num_active_rpcs,
@ -486,7 +492,9 @@ impl Web3Rpcs {
}
debug!(
"unc {}{}/{}/{} con_head={} old={} rpc={}@{}",
"unc {}/{} {}{}/{}/{} con_head={} old={} rpc={}@{}",
consensus_tier,
total_tiers,
backups_voted_str,
num_consensus_rpcs,
num_active_rpcs,
@ -511,7 +519,9 @@ impl Web3Rpcs {
// this is unlikely but possible
// TODO: better log
warn!(
"chain rolled back {}{}/{}/{} con={} old={} rpc={}@{}",
"chain rolled back {}/{} {}{}/{}/{} con={} old={} rpc={}@{}",
consensus_tier,
total_tiers,
backups_voted_str,
num_consensus_rpcs,
num_active_rpcs,
@ -541,7 +551,9 @@ impl Web3Rpcs {
}
Ordering::Greater => {
debug!(
"new {}{}/{}/{} con={} rpc={}@{}",
"new {}/{} {}{}/{}/{} con={} rpc={}@{}",
consensus_tier,
total_tiers,
backups_voted_str,
num_consensus_rpcs,
num_active_rpcs,
@ -573,7 +585,9 @@ impl Web3Rpcs {
if num_active_rpcs >= self.min_head_rpcs {
// no consensus!!!
error!(
"non {}{}/{}/{} rpc={}@{}",
"non {}/{} {}{}/{}/{} rpc={}@{}",
consensus_tier,
total_tiers,
backups_voted_str,
num_consensus_rpcs,
num_active_rpcs,
@ -584,7 +598,9 @@ impl Web3Rpcs {
} else {
// no consensus, but we do not have enough rpcs connected yet to panic
debug!(
"non {}{}/{}/{} rpc={}@{}",
"non {}/{} {}{}/{}/{} rpc={}@{}",
consensus_tier,
total_tiers,
backups_voted_str,
num_consensus_rpcs,
num_active_rpcs,

@ -7,15 +7,18 @@ use anyhow::Context;
use ethers::prelude::{H256, U64};
use hashbrown::{HashMap, HashSet};
use log::{debug, trace, warn};
use moka::future::Cache;
use serde::Serialize;
use std::collections::BTreeMap;
use std::fmt;
use std::sync::Arc;
use tokio::time::Instant;
/// A collection of Web3Rpcs that are on the same block.
/// Serialize is implemented so we can print it on our debug endpoint
#[derive(Clone, Default, Serialize)]
pub struct ConsensusWeb3Rpcs {
pub(super) tier: u64,
pub(super) head_block: Option<Web3ProxyBlock>,
// TODO: this should be able to serialize, but it isn't
#[serde(skip_serializing)]
@ -74,22 +77,25 @@ impl Web3Rpcs {
}
}
type FirstSeenCache = Cache<H256, Instant, hashbrown::hash_map::DefaultHashBuilder>;
pub struct ConnectionsGroup {
rpc_name_to_block: HashMap<String, Web3ProxyBlock>,
// TODO: what if there are two blocks with the same number?
highest_block: Option<Web3ProxyBlock>,
}
impl Default for ConnectionsGroup {
fn default() -> Self {
Self {
rpc_name_to_block: Default::default(),
highest_block: Default::default(),
}
}
/// used to track rpc.head_latency. The same cache should be shared between all ConnectionsGroups
first_seen: FirstSeenCache,
}
impl ConnectionsGroup {
pub fn new(first_seen: FirstSeenCache) -> Self {
Self {
rpc_name_to_block: Default::default(),
highest_block: Default::default(),
first_seen,
}
}
pub fn len(&self) -> usize {
self.rpc_name_to_block.len()
}
@ -115,7 +121,17 @@ impl ConnectionsGroup {
}
}
fn insert(&mut self, rpc: &Web3Rpc, block: Web3ProxyBlock) -> Option<Web3ProxyBlock> {
async fn insert(&mut self, rpc: &Web3Rpc, block: Web3ProxyBlock) -> Option<Web3ProxyBlock> {
let first_seen = self
.first_seen
.get_with(*block.hash(), async move { Instant::now() })
.await;
// TODO: this should be 0 if we are first seen, but I think it will be slightly non-zero
rpc.head_latency
.write()
.record(first_seen.elapsed().as_secs_f64() * 1000.0);
// TODO: what about a reorg to the same height?
if Some(block.number()) > self.highest_block.as_ref().map(|x| x.number()) {
self.highest_block = Some(block.clone());
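The insert above timestamps each block hash the first time any rpc reports it, then charges every later reporter the elapsed time as head latency. A minimal sketch of that bookkeeping, using a plain HashMap and string keys in place of the shared moka cache and H256 hashes (the names here are hypothetical):

use std::collections::HashMap;
use std::time::Instant;

// toy stand-in for the shared FirstSeenCache; the real code uses a
// moka::future::Cache<H256, Instant> shared by all ConnectionsGroups
struct FirstSeen {
    seen: HashMap<String, Instant>,
}

impl FirstSeen {
    // milliseconds between the first report of this block and this rpc's report
    fn head_delay_ms(&mut self, block_hash: &str) -> f64 {
        let first = *self
            .seen
            .entry(block_hash.to_string())
            .or_insert_with(Instant::now);
        first.elapsed().as_secs_f64() * 1000.0
    }
}

fn main() {
    let mut cache = FirstSeen { seen: HashMap::new() };
    let fast = cache.head_delay_ms("0xabc"); // first reporter: ~0 ms
    std::thread::sleep(std::time::Duration::from_millis(5));
    let slow = cache.head_delay_ms("0xabc"); // later reporter: ~5 ms behind
    println!("fast={fast:.2}ms slow={slow:.2}ms");
}

As the TODO notes, even the first reporter records a slightly non-zero value, since a little time passes between storing the timestamp and reading it back.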
@ -179,6 +195,7 @@ impl ConnectionsGroup {
authorization: &Arc<Authorization>,
web3_rpcs: &Web3Rpcs,
min_consensus_block_num: Option<U64>,
tier: &u64,
) -> anyhow::Result<ConsensusWeb3Rpcs> {
let mut maybe_head_block = match self.highest_block.clone() {
None => return Err(anyhow::anyhow!("no blocks known")),
@ -191,13 +208,18 @@ impl ConnectionsGroup {
if let Some(min_consensus_block_num) = min_consensus_block_num {
maybe_head_block
.number()
.saturating_add(1.into())
.saturating_sub(min_consensus_block_num)
.as_u64()
} else {
// TODO: get from app config? different chains probably should have different values. 10 is probably too much
10
};
trace!(
"max_lag_consensus_to_highest: {}",
max_lag_consensus_to_highest
);
let num_known = self.rpc_name_to_block.len();
if num_known < web3_rpcs.min_head_rpcs {
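A worked example of the lag budget computed above, with hypothetical block numbers: if the highest known block is 100 and the minimum acceptable consensus block is 95, consensus may trail the tip by up to (100 + 1) - 95 = 6 blocks.

use ethers::prelude::U64;

fn main() {
    let highest_block_num = U64::from(100u64);
    let min_consensus_block_num = U64::from(95u64);

    // mirrors the saturating expression above
    let max_lag_consensus_to_highest = highest_block_num
        .saturating_add(1.into())
        .saturating_sub(min_consensus_block_num)
        .as_u64();

    assert_eq!(max_lag_consensus_to_highest, 6);
}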
@ -338,7 +360,7 @@ impl ConnectionsGroup {
}
// success! this block has enough soft limit and nodes on it (or on later blocks)
let conns: Vec<Arc<Web3Rpc>> = primary_consensus_rpcs
let rpcs: Vec<Arc<Web3Rpc>> = primary_consensus_rpcs
.into_iter()
.filter_map(|conn_name| web3_rpcs.by_name.get(conn_name).cloned())
.collect();
@ -349,8 +371,9 @@ impl ConnectionsGroup {
let _ = maybe_head_block.number();
Ok(ConsensusWeb3Rpcs {
tier: *tier,
head_block: Some(maybe_head_block),
rpcs: conns,
rpcs,
backups_voted: backup_rpcs_voted,
backups_needed: primary_rpcs_voted.is_none(),
})
@ -377,10 +400,15 @@ impl ConsensusFinder {
max_block_age: Option<u64>,
max_block_lag: Option<U64>,
) -> Self {
// TODO: what's a good capacity for this?
let first_seen = Cache::builder()
.max_capacity(16)
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
// TODO: this will need some thought when config reloading is written
let tiers = configured_tiers
.iter()
.map(|x| (*x, Default::default()))
.map(|x| (*x, ConnectionsGroup::new(first_seen.clone())))
.collect();
Self {
@ -389,9 +417,11 @@ impl ConsensusFinder {
max_block_lag,
}
}
}
impl ConsensusFinder {
pub fn len(&self) -> usize {
self.tiers.len()
}
/// get the ConnectionsGroup that contains all rpcs
/// panics if there are no tiers
pub fn all_rpcs_group(&self) -> Option<&ConnectionsGroup> {
@ -421,7 +451,11 @@ impl ConsensusFinder {
}
/// returns the block that the rpc was on before updating to the new_block
pub fn insert(&mut self, rpc: &Web3Rpc, new_block: Web3ProxyBlock) -> Option<Web3ProxyBlock> {
pub async fn insert(
&mut self,
rpc: &Web3Rpc,
new_block: Web3ProxyBlock,
) -> Option<Web3ProxyBlock> {
let mut old = None;
// TODO: error if rpc.tier is not in self.tiers
@ -432,7 +466,7 @@ impl ConsensusFinder {
}
// TODO: should new_block be a ref?
let x = tier_group.insert(rpc, new_block.clone());
let x = tier_group.insert(rpc, new_block.clone()).await;
if old.is_none() && x.is_some() {
old = x;
@ -473,7 +507,7 @@ impl ConsensusFinder {
}
}
if let Some(prev_block) = self.insert(&rpc, rpc_head_block.clone()) {
if let Some(prev_block) = self.insert(&rpc, rpc_head_block.clone()).await {
if prev_block.hash() == rpc_head_block.hash() {
// this block was already sent by this rpc. return early
false
@ -527,13 +561,13 @@ impl ConsensusFinder {
// TODO: how should errors be handled?
// TODO: find the best tier with a connectionsgroup. best case, this only queries the first tier
// TODO: do we need to calculate all of them? I think having highest_known_block included as part of min_block_num should make that unnecessary
for (i, x) in self.tiers.iter() {
trace!("checking tier {}: {:#?}", i, x.rpc_name_to_block);
for (tier, x) in self.tiers.iter() {
trace!("checking tier {}: {:#?}", tier, x.rpc_name_to_block);
if let Ok(consensus_head_connections) = x
.consensus_head_connections(authorization, web3_connections, min_block_num)
.consensus_head_connections(authorization, web3_connections, min_block_num, tier)
.await
{
trace!("success on tier {}", i);
trace!("success on tier {}", tier);
// we got one! hopefully it didn't need to use any backups.
// but even if it did need backup servers, that is better than going to a worse tier
return Ok(consensus_head_connections);
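The loop above tries tiers in ascending order and returns the first one whose rpcs can form a consensus head, only falling back to a worse tier when the better one fails. A minimal sketch of that fallback, with a made-up quorum check standing in for consensus_head_connections and hypothetical rpc names:

use std::collections::BTreeMap;

// a tier "succeeds" here if it simply has enough rpcs; the real check is
// consensus_head_connections, which also weighs blocks and soft limits
fn best_tier(tiers: &BTreeMap<u64, Vec<&str>>, min_head_rpcs: usize) -> Option<u64> {
    for (tier, rpcs) in tiers {
        if rpcs.len() >= min_head_rpcs {
            // best case: this is the lowest-numbered tier
            return Some(*tier);
        }
    }
    None
}

fn main() {
    let mut tiers = BTreeMap::new();
    tiers.insert(0, vec!["local-node"]);               // preferred tier, no quorum
    tiers.insert(1, vec!["provider-a", "provider-b"]); // worse tier with quorum
    // tier 0 fails the check, so we fall through to tier 1 instead of giving up
    assert_eq!(best_tier(&tiers, 2), Some(1));
}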
@ -546,8 +580,8 @@ impl ConsensusFinder {
#[cfg(test)]
mod test {
#[test]
fn test_simplest_case_consensus_head_connections() {
todo!();
}
// #[test]
// fn test_simplest_case_consensus_head_connections() {
// todo!();
// }
}

@ -458,10 +458,7 @@ impl Web3Rpcs {
max_block_needed
))
}
cmp::Ordering::Less => {
// hmmmm
todo!("now what do we do?");
}
cmp::Ordering::Less => min_block_needed.cmp(head_block_num),
}
}
};

@ -21,33 +21,74 @@ use serde_json::json;
use std::cmp::min;
use std::fmt;
use std::hash::{Hash, Hasher};
use std::sync::atomic::{self, AtomicU64};
use std::sync::atomic::{self, AtomicU64, AtomicUsize};
use std::{cmp::Ordering, sync::Arc};
use thread_fast_rng::rand::Rng;
use thread_fast_rng::thread_fast_rng;
use tokio::sync::{broadcast, oneshot, watch, RwLock as AsyncRwLock};
use tokio::time::{sleep, sleep_until, timeout, Duration, Instant};
pub struct Web3RpcLatencies {
/// Track how far behind the fastest node we are
pub new_head: Histogram<u64>,
/// exponentially weighted moving average of how far behind the fastest node we are
pub new_head_ewma: u32,
/// Track how long an rpc call takes on average
pub request: Histogram<u64>,
/// exponentially weighted moving average of how far behind the fastest node we are
pub request_ewma: u32,
pub struct Latency {
/// Track how many milliseconds slower we are than the fastest node
pub histogram: Histogram<u64>,
/// exponentially weighted moving average of how many milliseconds behind the fastest node we are
pub ewma: ewma::EWMA,
}
impl Default for Web3RpcLatencies {
impl Serialize for Latency {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut state = serializer.serialize_struct("latency", 6)?;
state.serialize_field("ewma_ms", &self.ewma.value())?;
state.serialize_field("histogram_len", &self.histogram.len())?;
state.serialize_field("mean_ms", &self.histogram.mean())?;
state.serialize_field("p50_ms", &self.histogram.value_at_quantile(0.50))?;
state.serialize_field("p75_ms", &self.histogram.value_at_quantile(0.75))?;
state.serialize_field("p99_ms", &self.histogram.value_at_quantile(0.99))?;
state.end()
}
}
impl Latency {
pub fn record(&mut self, milliseconds: f64) {
self.ewma.add(milliseconds);
// histogram needs ints and not floats
self.histogram.record(milliseconds as u64).unwrap();
}
}
impl Default for Latency {
fn default() -> Self {
todo!("use ewma crate, not u32");
Self {
new_head: Histogram::new(3).unwrap(),
new_head_ewma: 0,
request: Histogram::new(3).unwrap(),
request_ewma: 0,
}
// TODO: what should the default sigfig be?
let sigfig = 0;
// TODO: what should the default span be? 25 requests? have a "new"
let span = 25.0;
Self::new(sigfig, span).expect("default histogram sigfigs should always work")
}
}
impl Latency {
pub fn new(sigfig: u8, span: f64) -> Result<Self, hdrhistogram::CreationError> {
let alpha = Self::span_to_alpha(span);
let histogram = Histogram::new(sigfig)?;
Ok(Self {
histogram,
ewma: ewma::EWMA::new(alpha),
})
}
fn span_to_alpha(span: f64) -> f64 {
2.0 / (span + 1.0)
}
}
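A quick check of the span-to-alpha conversion and of how the two trackers respond differently to an outlier, using the same ewma and hdrhistogram calls as the code above (the sample values are made up):

use hdrhistogram::Histogram;

fn main() {
    // a span of 25 samples maps to alpha = 2 / (25 + 1) ≈ 0.077, so each new
    // sample contributes roughly 7.7% of the updated average
    let alpha = 2.0 / (25.0 + 1.0);

    let mut average = ewma::EWMA::new(alpha);
    let mut histogram: Histogram<u64> = Histogram::new(2).expect("2 sigfigs is valid");

    for milliseconds in [10.0_f64, 12.0, 11.0, 250.0] {
        average.add(milliseconds);
        histogram.record(milliseconds as u64).unwrap();
    }

    // the histogram keeps the 250ms outlier in its tail, while the EWMA only
    // moves part of the way toward it
    println!(
        "ewma={:.1}ms p99={}ms",
        average.value(),
        histogram.value_at_quantile(0.99)
    );
}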
@ -83,8 +124,13 @@ pub struct Web3Rpc {
pub(super) tier: u64,
/// TODO: change this to a watch channel so that http providers can subscribe and take action on change.
pub(super) head_block: RwLock<Option<Web3ProxyBlock>>,
/// Track how fast this RPC is
pub(super) latency: Web3RpcLatencies,
/// Track head block latency
pub(super) head_latency: RwLock<Latency>,
/// Track request latency
pub(super) request_latency: RwLock<Latency>,
/// Track total requests served
/// TODO: maybe move this to Grafana
pub(super) total_requests: AtomicUsize,
}
impl Web3Rpc {
@ -1081,7 +1127,7 @@ impl Serialize for Web3Rpc {
S: Serializer,
{
// 10 is the number of fields in the struct that we serialize.
let mut state = serializer.serialize_struct("Web3Rpc", 9)?;
let mut state = serializer.serialize_struct("Web3Rpc", 10)?;
// the url is excluded because it likely includes private information. just show the name that we use in keys
state.serialize_field("name", &self.name)?;
@ -1103,17 +1149,17 @@ impl Serialize for Web3Rpc {
state.serialize_field("soft_limit", &self.soft_limit)?;
// TODO: keep this for the "popularity_contest" command? or maybe better to just use Grafana?
// state.serialize_field(
// "frontend_requests",
// &self.frontend_requests.load(atomic::Ordering::Relaxed),
// )?;
// TODO: maybe this is too much data. serialize less?
state.serialize_field("head_block", &*self.head_block.read())?;
{
// TODO: maybe this is too much data. serialize less?
let head_block = &*self.head_block.read();
state.serialize_field("head_block", head_block)?;
}
state.serialize_field("head_latency", &*self.head_latency.read())?;
state.serialize_field("request_latency", &*self.request_latency.read())?;
state.serialize_field(
"total_requests",
&self.total_requests.load(atomic::Ordering::Relaxed),
)?;
state.end()
}
@ -1207,7 +1253,6 @@ mod tests {
let block_data_limit = 64;
// TODO: this is getting long. have an `impl Default`
let x = Web3Rpc {
name: "name".to_string(),
soft_limit: 1_000,

@ -183,6 +183,12 @@ impl OpenRequestHandle {
let provider = provider.expect("provider was checked already");
self.rpc
.total_requests
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let start = Instant::now();
// TODO: replace ethers-rs providers with our own that supports streaming the responses
let response = match provider.as_ref() {
#[cfg(test)]
@ -367,6 +373,12 @@ impl OpenRequestHandle {
tokio::spawn(f);
}
}
} else {
// TODO: locking now will slow us down. send latency into a channel instead
self.rpc
.request_latency
.write()
.record(start.elapsed().as_secs_f64() * 1000.0);
}
response
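The TODO above flags that taking the request_latency write lock on the hot path could slow requests down. One possible sketch of the channel approach it suggests, where handlers send raw samples and a single task owns the aggregation (the names and the aggregator task are hypothetical, not part of this commit):

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (latency_tx, mut latency_rx) = mpsc::unbounded_channel::<f64>();

    // one task owns the stats, so request handlers never take a lock
    let aggregator = tokio::spawn(async move {
        let mut total_ms = 0.0;
        let mut samples = 0u64;
        while let Some(milliseconds) = latency_rx.recv().await {
            // in the real code this is where request_latency.record(milliseconds) would run
            total_ms += milliseconds;
            samples += 1;
        }
        (samples, total_ms)
    });

    // a request handler only does a cheap, non-blocking send
    latency_tx.send(12.5).unwrap();
    latency_tx.send(48.0).unwrap();
    drop(latency_tx); // closing the channel lets the aggregator drain and finish

    let (samples, total_ms) = aggregator.await.unwrap();
    println!("{} samples, {:.1}ms total", samples, total_ms);
}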