i think it works
This commit is contained in:
parent
658f658a0b
commit
88da33c7e4
@ -415,6 +415,8 @@ impl Web3Rpcs {
|
||||
|
||||
// TODO: what should we do if the block number of new_synced_connections is < old_synced_connections? wait?
|
||||
|
||||
let consensus_tier = new_synced_connections.tier;
|
||||
let total_tiers = consensus_finder.len();
|
||||
let backups_needed = new_synced_connections.backups_needed;
|
||||
let consensus_head_block = new_synced_connections.head_block.clone();
|
||||
let num_consensus_rpcs = new_synced_connections.num_conns();
|
||||
@ -434,7 +436,9 @@ impl Web3Rpcs {
|
||||
match &old_consensus_head_connections.head_block {
|
||||
None => {
|
||||
debug!(
|
||||
"first {}{}/{}/{} block={}, rpc={}",
|
||||
"first {}/{} {}{}/{}/{} block={}, rpc={}",
|
||||
consensus_tier,
|
||||
total_tiers,
|
||||
backups_voted_str,
|
||||
num_consensus_rpcs,
|
||||
num_active_rpcs,
|
||||
@ -469,7 +473,9 @@ impl Web3Rpcs {
|
||||
// no change in hash. no need to use head_block_sender
|
||||
// TODO: trace level if rpc is backup
|
||||
debug!(
|
||||
"con {}{}/{}/{} con={} rpc={}@{}",
|
||||
"con {}/{} {}{}/{}/{} con={} rpc={}@{}",
|
||||
consensus_tier,
|
||||
total_tiers,
|
||||
backups_voted_str,
|
||||
num_consensus_rpcs,
|
||||
num_active_rpcs,
|
||||
@ -486,7 +492,9 @@ impl Web3Rpcs {
|
||||
}
|
||||
|
||||
debug!(
|
||||
"unc {}{}/{}/{} con_head={} old={} rpc={}@{}",
|
||||
"unc {}/{} {}{}/{}/{} con_head={} old={} rpc={}@{}",
|
||||
consensus_tier,
|
||||
total_tiers,
|
||||
backups_voted_str,
|
||||
num_consensus_rpcs,
|
||||
num_active_rpcs,
|
||||
@ -511,7 +519,9 @@ impl Web3Rpcs {
|
||||
// this is unlikely but possible
|
||||
// TODO: better log
|
||||
warn!(
|
||||
"chain rolled back {}{}/{}/{} con={} old={} rpc={}@{}",
|
||||
"chain rolled back {}/{} {}{}/{}/{} con={} old={} rpc={}@{}",
|
||||
consensus_tier,
|
||||
total_tiers,
|
||||
backups_voted_str,
|
||||
num_consensus_rpcs,
|
||||
num_active_rpcs,
|
||||
@ -541,7 +551,9 @@ impl Web3Rpcs {
|
||||
}
|
||||
Ordering::Greater => {
|
||||
debug!(
|
||||
"new {}{}/{}/{} con={} rpc={}@{}",
|
||||
"new {}/{} {}{}/{}/{} con={} rpc={}@{}",
|
||||
consensus_tier,
|
||||
total_tiers,
|
||||
backups_voted_str,
|
||||
num_consensus_rpcs,
|
||||
num_active_rpcs,
|
||||
@ -573,7 +585,9 @@ impl Web3Rpcs {
|
||||
if num_active_rpcs >= self.min_head_rpcs {
|
||||
// no consensus!!!
|
||||
error!(
|
||||
"non {}{}/{}/{} rpc={}@{}",
|
||||
"non {}/{} {}{}/{}/{} rpc={}@{}",
|
||||
consensus_tier,
|
||||
total_tiers,
|
||||
backups_voted_str,
|
||||
num_consensus_rpcs,
|
||||
num_active_rpcs,
|
||||
@ -584,7 +598,9 @@ impl Web3Rpcs {
|
||||
} else {
|
||||
// no consensus, but we do not have enough rpcs connected yet to panic
|
||||
debug!(
|
||||
"non {}{}/{}/{} rpc={}@{}",
|
||||
"non {}/{} {}{}/{}/{} rpc={}@{}",
|
||||
consensus_tier,
|
||||
total_tiers,
|
||||
backups_voted_str,
|
||||
num_consensus_rpcs,
|
||||
num_active_rpcs,
|
||||
|
@ -7,15 +7,18 @@ use anyhow::Context;
|
||||
use ethers::prelude::{H256, U64};
|
||||
use hashbrown::{HashMap, HashSet};
|
||||
use log::{debug, trace, warn};
|
||||
use moka::future::Cache;
|
||||
use serde::Serialize;
|
||||
use std::collections::BTreeMap;
|
||||
use std::fmt;
|
||||
use std::sync::Arc;
|
||||
use tokio::time::Instant;
|
||||
|
||||
/// A collection of Web3Rpcs that are on the same block.
|
||||
/// Serialize is so we can print it on our debug endpoint
|
||||
#[derive(Clone, Default, Serialize)]
|
||||
pub struct ConsensusWeb3Rpcs {
|
||||
pub(super) tier: u64,
|
||||
pub(super) head_block: Option<Web3ProxyBlock>,
|
||||
// TODO: this should be able to serialize, but it isn't
|
||||
#[serde(skip_serializing)]
|
||||
@ -74,22 +77,25 @@ impl Web3Rpcs {
|
||||
}
|
||||
}
|
||||
|
||||
type FirstSeenCache = Cache<H256, Instant, hashbrown::hash_map::DefaultHashBuilder>;
|
||||
|
||||
pub struct ConnectionsGroup {
|
||||
rpc_name_to_block: HashMap<String, Web3ProxyBlock>,
|
||||
// TODO: what if there are two blocks with the same number?
|
||||
highest_block: Option<Web3ProxyBlock>,
|
||||
}
|
||||
|
||||
impl Default for ConnectionsGroup {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
rpc_name_to_block: Default::default(),
|
||||
highest_block: Default::default(),
|
||||
}
|
||||
}
|
||||
/// used to track rpc.head_latency. The same cache should be shared between all ConnectionsGroups
|
||||
first_seen: FirstSeenCache,
|
||||
}
|
||||
|
||||
impl ConnectionsGroup {
|
||||
pub fn new(first_seen: FirstSeenCache) -> Self {
|
||||
Self {
|
||||
rpc_name_to_block: Default::default(),
|
||||
highest_block: Default::default(),
|
||||
first_seen,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize {
|
||||
self.rpc_name_to_block.len()
|
||||
}
|
||||
@ -115,7 +121,17 @@ impl ConnectionsGroup {
|
||||
}
|
||||
}
|
||||
|
||||
fn insert(&mut self, rpc: &Web3Rpc, block: Web3ProxyBlock) -> Option<Web3ProxyBlock> {
|
||||
async fn insert(&mut self, rpc: &Web3Rpc, block: Web3ProxyBlock) -> Option<Web3ProxyBlock> {
|
||||
let first_seen = self
|
||||
.first_seen
|
||||
.get_with(*block.hash(), async move { Instant::now() })
|
||||
.await;
|
||||
|
||||
// TODO: this should be 0 if we are first seen, but i think it will be slightly non-zero
|
||||
rpc.head_latency
|
||||
.write()
|
||||
.record(first_seen.elapsed().as_secs_f64() * 1000.0);
|
||||
|
||||
// TODO: what about a reorg to the same height?
|
||||
if Some(block.number()) > self.highest_block.as_ref().map(|x| x.number()) {
|
||||
self.highest_block = Some(block.clone());
|
||||
@ -179,6 +195,7 @@ impl ConnectionsGroup {
|
||||
authorization: &Arc<Authorization>,
|
||||
web3_rpcs: &Web3Rpcs,
|
||||
min_consensus_block_num: Option<U64>,
|
||||
tier: &u64,
|
||||
) -> anyhow::Result<ConsensusWeb3Rpcs> {
|
||||
let mut maybe_head_block = match self.highest_block.clone() {
|
||||
None => return Err(anyhow::anyhow!("no blocks known")),
|
||||
@ -191,13 +208,18 @@ impl ConnectionsGroup {
|
||||
if let Some(min_consensus_block_num) = min_consensus_block_num {
|
||||
maybe_head_block
|
||||
.number()
|
||||
.saturating_add(1.into())
|
||||
.saturating_sub(min_consensus_block_num)
|
||||
.as_u64()
|
||||
} else {
|
||||
// TODO: get from app config? different chains probably should have different values. 10 is probably too much
|
||||
10
|
||||
};
|
||||
|
||||
trace!(
|
||||
"max_lag_consensus_to_highest: {}",
|
||||
max_lag_consensus_to_highest
|
||||
);
|
||||
|
||||
let num_known = self.rpc_name_to_block.len();
|
||||
|
||||
if num_known < web3_rpcs.min_head_rpcs {
|
||||
@ -338,7 +360,7 @@ impl ConnectionsGroup {
|
||||
}
|
||||
|
||||
// success! this block has enough soft limit and nodes on it (or on later blocks)
|
||||
let conns: Vec<Arc<Web3Rpc>> = primary_consensus_rpcs
|
||||
let rpcs: Vec<Arc<Web3Rpc>> = primary_consensus_rpcs
|
||||
.into_iter()
|
||||
.filter_map(|conn_name| web3_rpcs.by_name.get(conn_name).cloned())
|
||||
.collect();
|
||||
@ -349,8 +371,9 @@ impl ConnectionsGroup {
|
||||
let _ = maybe_head_block.number();
|
||||
|
||||
Ok(ConsensusWeb3Rpcs {
|
||||
tier: *tier,
|
||||
head_block: Some(maybe_head_block),
|
||||
rpcs: conns,
|
||||
rpcs,
|
||||
backups_voted: backup_rpcs_voted,
|
||||
backups_needed: primary_rpcs_voted.is_none(),
|
||||
})
|
||||
@ -377,10 +400,15 @@ impl ConsensusFinder {
|
||||
max_block_age: Option<u64>,
|
||||
max_block_lag: Option<U64>,
|
||||
) -> Self {
|
||||
// TODO: what's a good capacity for this?
|
||||
let first_seen = Cache::builder()
|
||||
.max_capacity(16)
|
||||
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
|
||||
|
||||
// TODO: this will need some thought when config reloading is written
|
||||
let tiers = configured_tiers
|
||||
.iter()
|
||||
.map(|x| (*x, Default::default()))
|
||||
.map(|x| (*x, ConnectionsGroup::new(first_seen.clone())))
|
||||
.collect();
|
||||
|
||||
Self {
|
||||
@ -389,9 +417,11 @@ impl ConsensusFinder {
|
||||
max_block_lag,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ConsensusFinder {
|
||||
pub fn len(&self) -> usize {
|
||||
self.tiers.len()
|
||||
}
|
||||
|
||||
/// get the ConnectionsGroup that contains all rpcs
|
||||
/// panics if there are no tiers
|
||||
pub fn all_rpcs_group(&self) -> Option<&ConnectionsGroup> {
|
||||
@ -421,7 +451,11 @@ impl ConsensusFinder {
|
||||
}
|
||||
|
||||
/// returns the block that the rpc was on before updating to the new_block
|
||||
pub fn insert(&mut self, rpc: &Web3Rpc, new_block: Web3ProxyBlock) -> Option<Web3ProxyBlock> {
|
||||
pub async fn insert(
|
||||
&mut self,
|
||||
rpc: &Web3Rpc,
|
||||
new_block: Web3ProxyBlock,
|
||||
) -> Option<Web3ProxyBlock> {
|
||||
let mut old = None;
|
||||
|
||||
// TODO: error if rpc.tier is not in self.tiers
|
||||
@ -432,7 +466,7 @@ impl ConsensusFinder {
|
||||
}
|
||||
|
||||
// TODO: should new_block be a ref?
|
||||
let x = tier_group.insert(rpc, new_block.clone());
|
||||
let x = tier_group.insert(rpc, new_block.clone()).await;
|
||||
|
||||
if old.is_none() && x.is_some() {
|
||||
old = x;
|
||||
@ -473,7 +507,7 @@ impl ConsensusFinder {
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(prev_block) = self.insert(&rpc, rpc_head_block.clone()) {
|
||||
if let Some(prev_block) = self.insert(&rpc, rpc_head_block.clone()).await {
|
||||
if prev_block.hash() == rpc_head_block.hash() {
|
||||
// this block was already sent by this rpc. return early
|
||||
false
|
||||
@ -527,13 +561,13 @@ impl ConsensusFinder {
|
||||
// TODO: how should errors be handled?
|
||||
// TODO: find the best tier with a connectionsgroup. best case, this only queries the first tier
|
||||
// TODO: do we need to calculate all of them? I think having highest_known_block included as part of min_block_num should make that unnecessary
|
||||
for (i, x) in self.tiers.iter() {
|
||||
trace!("checking tier {}: {:#?}", i, x.rpc_name_to_block);
|
||||
for (tier, x) in self.tiers.iter() {
|
||||
trace!("checking tier {}: {:#?}", tier, x.rpc_name_to_block);
|
||||
if let Ok(consensus_head_connections) = x
|
||||
.consensus_head_connections(authorization, web3_connections, min_block_num)
|
||||
.consensus_head_connections(authorization, web3_connections, min_block_num, tier)
|
||||
.await
|
||||
{
|
||||
trace!("success on tier {}", i);
|
||||
trace!("success on tier {}", tier);
|
||||
// we got one! hopefully it didn't need to use any backups.
|
||||
// but even if it did need backup servers, that is better than going to a worse tier
|
||||
return Ok(consensus_head_connections);
|
||||
@ -546,8 +580,8 @@ impl ConsensusFinder {
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
#[test]
|
||||
fn test_simplest_case_consensus_head_connections() {
|
||||
todo!();
|
||||
}
|
||||
// #[test]
|
||||
// fn test_simplest_case_consensus_head_connections() {
|
||||
// todo!();
|
||||
// }
|
||||
}
|
||||
|
@ -458,10 +458,7 @@ impl Web3Rpcs {
|
||||
max_block_needed
|
||||
))
|
||||
}
|
||||
cmp::Ordering::Less => {
|
||||
// hmmmm
|
||||
todo!("now what do we do?");
|
||||
}
|
||||
cmp::Ordering::Less => min_block_needed.cmp(head_block_num),
|
||||
}
|
||||
}
|
||||
};
|
||||
|
@ -21,33 +21,74 @@ use serde_json::json;
|
||||
use std::cmp::min;
|
||||
use std::fmt;
|
||||
use std::hash::{Hash, Hasher};
|
||||
use std::sync::atomic::{self, AtomicU64};
|
||||
use std::sync::atomic::{self, AtomicU64, AtomicUsize};
|
||||
use std::{cmp::Ordering, sync::Arc};
|
||||
use thread_fast_rng::rand::Rng;
|
||||
use thread_fast_rng::thread_fast_rng;
|
||||
use tokio::sync::{broadcast, oneshot, watch, RwLock as AsyncRwLock};
|
||||
use tokio::time::{sleep, sleep_until, timeout, Duration, Instant};
|
||||
|
||||
pub struct Web3RpcLatencies {
|
||||
/// Traack how far behind the fastest node we are
|
||||
pub new_head: Histogram<u64>,
|
||||
/// exponentially weighted moving average of how far behind the fastest node we are
|
||||
pub new_head_ewma: u32,
|
||||
/// Track how long an rpc call takes on average
|
||||
pub request: Histogram<u64>,
|
||||
/// exponentially weighted moving average of how far behind the fastest node we are
|
||||
pub request_ewma: u32,
|
||||
pub struct Latency {
|
||||
/// Track how many milliseconds slower we are than the fastest node
|
||||
pub histogram: Histogram<u64>,
|
||||
/// exponentially weighted moving average of how many milliseconds behind the fastest node we are
|
||||
pub ewma: ewma::EWMA,
|
||||
}
|
||||
|
||||
impl Default for Web3RpcLatencies {
|
||||
impl Serialize for Latency {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
let mut state = serializer.serialize_struct("latency", 6)?;
|
||||
|
||||
state.serialize_field("ewma_ms", &self.ewma.value())?;
|
||||
|
||||
state.serialize_field("histogram_len", &self.histogram.len())?;
|
||||
state.serialize_field("mean_ms", &self.histogram.mean())?;
|
||||
state.serialize_field("p50_ms", &self.histogram.value_at_quantile(0.50))?;
|
||||
state.serialize_field("p75_ms", &self.histogram.value_at_quantile(0.75))?;
|
||||
state.serialize_field("p99_ms", &self.histogram.value_at_quantile(0.99))?;
|
||||
|
||||
state.end()
|
||||
}
|
||||
}
|
||||
|
||||
impl Latency {
|
||||
pub fn record(&mut self, milliseconds: f64) {
|
||||
self.ewma.add(milliseconds);
|
||||
|
||||
// histogram needs ints and not floats
|
||||
self.histogram.record(milliseconds as u64).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for Latency {
|
||||
fn default() -> Self {
|
||||
todo!("use ewma crate, not u32");
|
||||
Self {
|
||||
new_head: Histogram::new(3).unwrap(),
|
||||
new_head_ewma: 0,
|
||||
request: Histogram::new(3).unwrap(),
|
||||
request_ewma: 0,
|
||||
}
|
||||
// TODO: what should the default sigfig be?
|
||||
let sigfig = 0;
|
||||
|
||||
// TODO: what should the default span be? 25 requests? have a "new"
|
||||
let span = 25.0;
|
||||
|
||||
Self::new(sigfig, span).expect("default histogram sigfigs should always work")
|
||||
}
|
||||
}
|
||||
|
||||
impl Latency {
|
||||
pub fn new(sigfig: u8, span: f64) -> Result<Self, hdrhistogram::CreationError> {
|
||||
let alpha = Self::span_to_alpha(span);
|
||||
|
||||
let histogram = Histogram::new(sigfig)?;
|
||||
|
||||
Ok(Self {
|
||||
histogram,
|
||||
ewma: ewma::EWMA::new(alpha),
|
||||
})
|
||||
}
|
||||
|
||||
fn span_to_alpha(span: f64) -> f64 {
|
||||
2.0 / (span + 1.0)
|
||||
}
|
||||
}
|
||||
|
||||
@ -83,8 +124,13 @@ pub struct Web3Rpc {
|
||||
pub(super) tier: u64,
|
||||
/// TODO: change this to a watch channel so that http providers can subscribe and take action on change.
|
||||
pub(super) head_block: RwLock<Option<Web3ProxyBlock>>,
|
||||
/// Track how fast this RPC is
|
||||
pub(super) latency: Web3RpcLatencies,
|
||||
/// Track head block latency
|
||||
pub(super) head_latency: RwLock<Latency>,
|
||||
/// Track request latency
|
||||
pub(super) request_latency: RwLock<Latency>,
|
||||
/// Track total requests served
|
||||
/// TODO: maybe move this to graphana
|
||||
pub(super) total_requests: AtomicUsize,
|
||||
}
|
||||
|
||||
impl Web3Rpc {
|
||||
@ -1081,7 +1127,7 @@ impl Serialize for Web3Rpc {
|
||||
S: Serializer,
|
||||
{
|
||||
// 3 is the number of fields in the struct.
|
||||
let mut state = serializer.serialize_struct("Web3Rpc", 9)?;
|
||||
let mut state = serializer.serialize_struct("Web3Rpc", 10)?;
|
||||
|
||||
// the url is excluded because it likely includes private information. just show the name that we use in keys
|
||||
state.serialize_field("name", &self.name)?;
|
||||
@ -1103,17 +1149,17 @@ impl Serialize for Web3Rpc {
|
||||
|
||||
state.serialize_field("soft_limit", &self.soft_limit)?;
|
||||
|
||||
// TODO: keep this for the "popularity_contest" command? or maybe better to just use graphana?
|
||||
// state.serialize_field(
|
||||
// "frontend_requests",
|
||||
// &self.frontend_requests.load(atomic::Ordering::Relaxed),
|
||||
// )?;
|
||||
// TODO: maybe this is too much data. serialize less?
|
||||
state.serialize_field("head_block", &*self.head_block.read())?;
|
||||
|
||||
{
|
||||
// TODO: maybe this is too much data. serialize less?
|
||||
let head_block = &*self.head_block.read();
|
||||
state.serialize_field("head_block", head_block)?;
|
||||
}
|
||||
state.serialize_field("head_latency", &*self.head_latency.read())?;
|
||||
|
||||
state.serialize_field("request_latency", &*self.request_latency.read())?;
|
||||
|
||||
state.serialize_field(
|
||||
"total_requests",
|
||||
&self.total_requests.load(atomic::Ordering::Relaxed),
|
||||
)?;
|
||||
|
||||
state.end()
|
||||
}
|
||||
@ -1207,7 +1253,6 @@ mod tests {
|
||||
|
||||
let block_data_limit = 64;
|
||||
|
||||
// TODO: this is getting long. have a `impl Default`
|
||||
let x = Web3Rpc {
|
||||
name: "name".to_string(),
|
||||
soft_limit: 1_000,
|
||||
|
@ -183,6 +183,12 @@ impl OpenRequestHandle {
|
||||
|
||||
let provider = provider.expect("provider was checked already");
|
||||
|
||||
self.rpc
|
||||
.total_requests
|
||||
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
|
||||
let start = Instant::now();
|
||||
|
||||
// TODO: replace ethers-rs providers with our own that supports streaming the responses
|
||||
let response = match provider.as_ref() {
|
||||
#[cfg(test)]
|
||||
@ -367,6 +373,12 @@ impl OpenRequestHandle {
|
||||
tokio::spawn(f);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// TODO: locking now will slow us down. send latency into a channel instead
|
||||
self.rpc
|
||||
.request_latency
|
||||
.write()
|
||||
.record(start.elapsed().as_secs_f64() * 1000.0);
|
||||
}
|
||||
|
||||
response
|
||||
|
Loading…
Reference in New Issue
Block a user