// web3-proxy/web3_proxy/src/rpcs/consensus.rs
use super::blockchain::Web3ProxyBlock;
use super::many::Web3Rpcs;
use super::one::Web3Rpc;
use crate::frontend::authorization::Authorization;
use crate::frontend::errors::{Web3ProxyErrorContext, Web3ProxyResult};
use derive_more::Constructor;
use ethers::prelude::{H256, U64};
use hashbrown::{HashMap, HashSet};
use itertools::{Itertools, MinMaxResult};
use log::{debug, trace, warn};
use quick_cache_ttl::Cache;
use serde::Serialize;
use std::cmp::{Ordering, Reverse};
use std::collections::BTreeMap;
use std::convert::Infallible;
use std::fmt;
use std::sync::Arc;
use tokio::time::Instant;
#[derive(Clone, Serialize)]
struct RpcData {
head_block_num: U64,
// TODO: this is too simple. erigon has 4 prune levels (hrct)
oldest_block_num: U64,
}
impl RpcData {
fn new(rpc: &Web3Rpc, head: &Web3ProxyBlock) -> Self {
let head_block_num = *head.number();
let block_data_limit = rpc.block_data_limit();
let oldest_block_num = head_block_num.saturating_sub(block_data_limit);
Self {
head_block_num,
oldest_block_num,
}
}
// TODO: take an enum for the type of data (hrtc)
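// an inclusive range check: oldest_block_num <= block_num <= head_block_num (see the sketch test at the bottom of this file)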
fn data_available(&self, block_num: &U64) -> bool {
*block_num >= self.oldest_block_num && *block_num <= self.head_block_num
}
}
#[derive(Constructor, Clone, Copy, Debug, Default, Eq, PartialEq, Serialize)]
pub struct RpcRanking {
tier: u64,
backup: bool,
head_num: Option<U64>,
}
impl RpcRanking {
pub fn add_offset(&self, offset: u64) -> Self {
Self {
tier: self.tier + offset,
backup: self.backup,
head_num: self.head_num,
}
}
pub fn default_with_backup(backup: bool) -> Self {
Self {
backup,
..Default::default()
}
}
fn sort_key(&self) -> (u64, bool, Reverse<Option<U64>>) {
// TODO: add soft_limit here? add peak_ewma here?
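// note: lower tiers sort first, and within a tier a higher head_num sorts first (because of Reverse).
// this ordering is exercised by a sketch test at the bottom of this file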
(self.tier, !self.backup, Reverse(self.head_num))
}
}
impl Ord for RpcRanking {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.sort_key().cmp(&other.sort_key())
}
}
impl PartialOrd for RpcRanking {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
pub type RankedRpcMap = BTreeMap<RpcRanking, Vec<Arc<Web3Rpc>>>;
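/// the answer to "can this group of rpcs serve a request for the needed block, either now or after waiting?"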
pub enum ShouldWaitForBlock {
Ready,
Wait { current: Option<U64> },
NeverReady,
}
/// A collection of Web3Rpcs that are on the same block.
/// Serialize is so we can print it on our /status endpoint
#[derive(Clone, Serialize)]
pub struct ConsensusWeb3Rpcs {
pub(crate) tier: u64,
pub(crate) backups_needed: bool,
// TODO: this is already inside best_rpcs. Don't skip, instead make a shorter serialize
#[serde(skip_serializing)]
pub(crate) head_block: Web3ProxyBlock,
// TODO: smaller serialize
pub(crate) head_rpcs: Vec<Arc<Web3Rpc>>,
// TODO: make this work. the key needs to be a string. I think we need `serialize_with`
#[serde(skip_serializing)]
pub(crate) other_rpcs: RankedRpcMap,
// TODO: make this work. the key needs to be a string. I think we need `serialize_with`
#[serde(skip_serializing)]
rpc_data: HashMap<Arc<Web3Rpc>, RpcData>,
}
impl ConsensusWeb3Rpcs {
#[inline]
pub fn num_consensus_rpcs(&self) -> usize {
self.head_rpcs.len()
}
/// will tell you if you should wait for a block
/// TODO: also include method (or maybe an enum representing the different prune types)
pub fn should_wait_for_block(
&self,
needed_block_num: Option<&U64>,
skip_rpcs: &[Arc<Web3Rpc>],
) -> ShouldWaitForBlock {
// TODO: i think checking synced is always a waste of time. though i guess there could be a race
// if self
// .head_rpcs
// .iter()
// .any(|rpc| self.rpc_will_work_eventually(rpc, needed_block_num, skip_rpcs))
// {
// let head_num = self.head_block.number();
// if Some(head_num) >= needed_block_num {
// debug!("best (head) block: {}", head_num);
// return ShouldWaitForBlock::Ready;
// }
// }
// all of the head rpcs are skipped
let mut best_num = None;
// iterate the other rpc tiers to find the next best block
for (next_ranking, next_rpcs) in self.other_rpcs.iter() {
if !next_rpcs
.iter()
.any(|rpc| self.rpc_will_work_eventually(rpc, needed_block_num, skip_rpcs))
{
// TODO: too verbose
debug!("everything in this ranking ({:?}) is skipped", next_ranking);
continue;
}
let next_head_num = next_ranking.head_num.as_ref();
if next_head_num >= needed_block_num {
debug!("best (head) block: {:?}", next_head_num);
return ShouldWaitForBlock::Ready;
}
best_num = next_head_num;
}
// TODO: this seems wrong
if best_num.is_some() {
// TODO: too verbose
debug!("best (old) block: {:?}", best_num);
ShouldWaitForBlock::Wait {
current: best_num.copied(),
}
} else {
// TODO: too verbose
debug!("never ready");
ShouldWaitForBlock::NeverReady
}
}
pub fn has_block_data(&self, rpc: &Web3Rpc, block_num: &U64) -> bool {
self.rpc_data
.get(rpc)
.map(|x| x.data_available(block_num))
.unwrap_or(false)
}
// TODO: take method as a param, too. mark nodes with supported methods (maybe do it optimistically? on)
fn rpc_will_work_eventually(
&self,
rpc: &Arc<Web3Rpc>,
needed_block_num: Option<&U64>,
skip_rpcs: &[Arc<Web3Rpc>],
) -> bool {
// return true if this rpc might eventually be able to serve the request. false means it never will
if skip_rpcs.contains(rpc) {
// if rpc is skipped, it must have already been determined it is unable to serve the request
return false;
}
if let Some(needed_block_num) = needed_block_num {
if let Some(rpc_data) = self.rpc_data.get(rpc) {
match rpc_data.head_block_num.cmp(needed_block_num) {
Ordering::Less => {
debug!("{} is behind. let it catch up", rpc);
return true;
}
Ordering::Greater | Ordering::Equal => {
// rpc is synced past the needed block. make sure the block isn't too old for it
if self.has_block_data(rpc, needed_block_num) {
debug!("{} has {}", rpc, needed_block_num);
return true;
} else {
debug!("{} does not have {}", rpc, needed_block_num);
return false;
}
}
}
}
// no rpc data for this rpc. that's not promising
return true;
}
false
}
// TODO: better name for this
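/// true if the rpc is not being skipped and already has data for the requested block range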
pub fn rpc_will_work_now(
&self,
skip: &[Arc<Web3Rpc>],
min_block_needed: Option<&U64>,
max_block_needed: Option<&U64>,
rpc: &Arc<Web3Rpc>,
) -> bool {
if skip.contains(rpc) {
trace!("skipping {}", rpc);
return false;
}
if let Some(min_block_needed) = min_block_needed {
if !self.has_block_data(rpc, min_block_needed) {
trace!(
"{} is missing min_block_needed ({}). skipping",
rpc,
min_block_needed,
);
return false;
}
}
if let Some(max_block_needed) = max_block_needed {
if !self.has_block_data(rpc, max_block_needed) {
trace!(
"{} is missing max_block_needed ({}). skipping",
rpc,
max_block_needed,
);
return false;
}
}
// we could check hard rate limits here, but i think it is faster to do later
true
}
// TODO: sum_hard_limit?
}
impl fmt::Debug for ConsensusWeb3Rpcs {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// TODO: the default formatter takes forever to write. this is too quiet though
// TODO: print the actual conns?
f.debug_struct("ConsensusWeb3Rpcs")
.field("head_block", &self.head_block)
.field("num_conns", &self.head_rpcs.len())
.finish_non_exhaustive()
}
}
// TODO: refs for all of these. borrow on a Sender is cheap enough
impl Web3Rpcs {
// TODO: return a ref?
pub fn head_block(&self) -> Option<Web3ProxyBlock> {
self.watch_consensus_head_sender
.as_ref()
.and_then(|x| x.borrow().clone())
}
// TODO: return a ref?
pub fn head_block_hash(&self) -> Option<H256> {
self.head_block().map(|x| *x.hash())
}
// TODO: return a ref?
pub fn head_block_num(&self) -> Option<U64> {
self.head_block().map(|x| *x.number())
}
pub fn synced(&self) -> bool {
let consensus = self.watch_consensus_rpcs_sender.borrow();
if let Some(consensus) = consensus.as_ref() {
!consensus.head_rpcs.is_empty()
} else {
false
}
}
pub fn num_synced_rpcs(&self) -> usize {
let consensus = self.watch_consensus_rpcs_sender.borrow();
if let Some(consensus) = consensus.as_ref() {
consensus.head_rpcs.len()
} else {
0
}
}
}
type FirstSeenCache = Cache<H256, Instant>;
/// A ConsensusWeb3Rpcs builder that tracks all connection heads across multiple groups of servers
pub struct ConsensusFinder {
/// the latest head block reported by each connected rpc.
/// backup rpcs are included here, but they are only used if the primaries can't reach consensus
rpc_heads: HashMap<Arc<Web3Rpc>, Web3ProxyBlock>,
/// never serve blocks that are too old
max_block_age: Option<u64>,
/// tier 0 will be preferred as long as the distance between it and the other tiers is <= max_block_lag
max_block_lag: Option<U64>,
/// used to track rpc.head_latency. The same cache should be shared between all ConnectionsGroups
first_seen: FirstSeenCache,
}
impl ConsensusFinder {
pub fn new(max_block_age: Option<u64>, max_block_lag: Option<U64>) -> Self {
// TODO: what's a good capacity for this? it shouldn't need to be very large
// TODO: if we change Web3ProxyBlock to store the instance, i think we could use the block_by_hash cache
let first_seen = Cache::new(16);
// TODO: hard coding 0-9 isn't great, but it's easier than refactoring this to be smart about config reloading
let rpc_heads = HashMap::new();
Self {
rpc_heads,
max_block_age,
max_block_lag,
first_seen,
}
}
pub fn len(&self) -> usize {
self.rpc_heads.len()
}
pub fn is_empty(&self) -> bool {
self.rpc_heads.is_empty()
}
fn remove(&mut self, rpc: &Arc<Web3Rpc>) -> Option<Web3ProxyBlock> {
self.rpc_heads.remove(rpc)
}
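/// record when this block hash was first seen (to track head latency) and update this rpc's head block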
async fn insert(&mut self, rpc: Arc<Web3Rpc>, block: Web3ProxyBlock) -> Option<Web3ProxyBlock> {
let first_seen = self
.first_seen
.get_or_insert_async::<Infallible>(block.hash(), async { Ok(Instant::now()) })
.await
.unwrap();
// calculate elapsed time before trying to lock
let latency = first_seen.elapsed();
// record the time behind the fastest node
rpc.head_latency.write().record(latency);
// update the local mapping of rpc -> block
self.rpc_heads.insert(rpc, block)
}
/// Update our tracking of the rpc and return true if something changed
pub(crate) async fn update_rpc(
&mut self,
rpc_head_block: Option<Web3ProxyBlock>,
rpc: Arc<Web3Rpc>,
// we need this so we can save the block to caches. i don't like it though. maybe we should use a lazy_static Cache wrapper that has a "save_block" method? i generally dislike globals, but i also dislike all the types having to pass each other around
web3_connections: &Web3Rpcs,
) -> Web3ProxyResult<bool> {
// add the rpc's block to rpc_heads, or remove the rpc from rpc_heads
let changed = match rpc_head_block {
Some(mut rpc_head_block) => {
// we don't know if it's on the heaviest chain yet
rpc_head_block = web3_connections
.try_cache_block(rpc_head_block, false)
.await
.web3_context("failed caching block")?;
if let Some(max_age) = self.max_block_age {
if rpc_head_block.age() > max_age {
trace!("rpc_head_block from {} is too old! {}", rpc, rpc_head_block);
return Ok(self.remove(&rpc).is_some());
}
}
if let Some(prev_block) = self.insert(rpc, rpc_head_block.clone()).await {
// false if this block was already sent by this rpc
// true if new block for this rpc
prev_block.hash() != rpc_head_block.hash()
} else {
// first block for this rpc
true
}
}
None => {
// false if this rpc was already removed
// true if rpc head changed from being synced to not
self.remove(&rpc).is_some()
}
};
Ok(changed)
}
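/// Walk every rpc's head block (and its ancestors, back to the allowed lag) tier by tier and
/// return the best block that has enough votes. Primary votes are checked first; backup votes
/// are only used if the primaries never reach the configured minimums.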
pub async fn find_consensus_connections(
&mut self,
authorization: &Arc<Authorization>,
web3_rpcs: &Web3Rpcs,
) -> Web3ProxyResult<Option<ConsensusWeb3Rpcs>> {
let minmax_block = self.rpc_heads.values().minmax_by_key(|&x| x.number());
let (lowest_block, highest_block) = match minmax_block {
MinMaxResult::NoElements => return Ok(None),
MinMaxResult::OneElement(x) => (x, x),
MinMaxResult::MinMax(min, max) => (min, max),
};
let highest_block_number = highest_block.number();
trace!("highest_block_number: {}", highest_block_number);
trace!("lowest_block_number: {}", lowest_block.number());
let max_lag_block_number =
highest_block_number.saturating_sub(self.max_block_lag.unwrap_or_else(|| U64::from(5)));
trace!("max_lag_block_number: {}", max_lag_block_number);
let lowest_block_number = lowest_block.number().max(&max_lag_block_number);
trace!("safe lowest_block_number: {}", lowest_block_number);
let num_known = self.rpc_heads.len();
if num_known < web3_rpcs.min_head_rpcs {
// this keeps us from serving requests when the proxy first starts
trace!("not enough servers known");
return Ok(None);
}
// TODO: also track the sum of *available* hard_limits? if any servers have no hard limits, use their soft limit or no limit?
// TODO: struct for the value of the votes hashmap?
let mut primary_votes: HashMap<Web3ProxyBlock, (HashSet<&str>, u32)> = Default::default();
let mut backup_votes: HashMap<Web3ProxyBlock, (HashSet<&str>, u32)> = Default::default();
let mut backup_consensus = None;
let mut rpc_heads_by_tier: Vec<_> = self.rpc_heads.iter().collect();
rpc_heads_by_tier.sort_by_cached_key(|(rpc, _)| rpc.tier);
let current_tier = rpc_heads_by_tier
.first()
.expect("rpc_heads_by_tier should never be empty")
.0
.tier;
// trace!("first_tier: {}", current_tier);
// trace!("rpc_heads_by_tier: {:#?}", rpc_heads_by_tier);
// loop over all the rpc heads (grouped by tier) and their parents to find consensus
// TODO: i'm sure there are a lot of shortcuts that could be taken, but this is simplest to implement
for (rpc, rpc_head) in rpc_heads_by_tier.into_iter() {
if current_tier != rpc.tier {
// we finished processing a tier. check for primary results
if let Some(consensus) = self.count_votes(&primary_votes, web3_rpcs) {
trace!("found enough votes on tier {}", current_tier);
return Ok(Some(consensus));
}
// only set backup consensus once. we don't want it to keep checking on worse tiers if it already found consensus
if backup_consensus.is_none() {
if let Some(consensus) = self.count_votes(&backup_votes, web3_rpcs) {
trace!("found backup votes on tier {}", current_tier);
backup_consensus = Some(consensus)
}
}
}
let mut block_to_check = rpc_head.clone();
while block_to_check.number() >= lowest_block_number {
if !rpc.backup {
// backup nodes are excluded from the primary voting
let entry = primary_votes.entry(block_to_check.clone()).or_default();
entry.0.insert(&rpc.name);
entry.1 += rpc.soft_limit;
}
// both primary and backup rpcs get included in the backup voting
let backup_entry = backup_votes.entry(block_to_check.clone()).or_default();
backup_entry.0.insert(&rpc.name);
backup_entry.1 += rpc.soft_limit;
match web3_rpcs
.block(authorization, block_to_check.parent_hash(), Some(rpc))
.await
{
Ok(parent_block) => block_to_check = parent_block,
Err(err) => {
warn!(
"Problem fetching parent block of {:?} during consensus finding: {:#?}",
block_to_check.hash(),
err
);
break;
}
}
}
}
// we finished processing all tiers. check for primary results (if anything but the last tier found consensus, we already returned above)
if let Some(consensus) = self.count_votes(&primary_votes, web3_rpcs) {
return Ok(Some(consensus));
}
// if no tier reached primary consensus, fall back to the backup consensus found while iterating the tiers (if any)
if let Some(consensus) = backup_consensus {
return Ok(Some(consensus));
}
// count votes one last time
Ok(self.count_votes(&backup_votes, web3_rpcs))
}
// TODO: have min_sum_soft_limit and min_head_rpcs on self instead of on Web3Rpcs
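/// return the best vote (highest block, then highest total soft limit, then most rpcs) that meets
/// the configured minimum sum of soft limits and minimum number of head rpcs, if any does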
fn count_votes(
&self,
votes: &HashMap<Web3ProxyBlock, (HashSet<&str>, u32)>,
web3_rpcs: &Web3Rpcs,
) -> Option<ConsensusWeb3Rpcs> {
// sort the votes: highest block number first, then highest sum of soft limits, then most rpcs
let mut votes: Vec<_> = votes
.into_iter()
.map(|(block, (rpc_names, sum_soft_limit))| (block, sum_soft_limit, rpc_names))
.collect();
votes.sort_by_cached_key(|(block, sum_soft_limit, rpc_names)| {
(
Reverse(*block.number()),
Reverse(*sum_soft_limit),
Reverse(rpc_names.len()),
)
});
// return the first result that meets the configured minimums (if any)
for (maybe_head_block, sum_soft_limit, rpc_names) in votes {
if *sum_soft_limit < web3_rpcs.min_sum_soft_limit {
continue;
}
// TODO: different mins for backup vs primary
if rpc_names.len() < web3_rpcs.min_head_rpcs {
continue;
}
trace!("rpc_names: {:#?}", rpc_names);
// consensus likely found! load the rpcs to make sure they all have active connections
let consensus_rpcs: Vec<_> = rpc_names
.into_iter()
.filter_map(|x| web3_rpcs.get(x))
.collect();
if consensus_rpcs.len() < web3_rpcs.min_head_rpcs {
continue;
}
// consensus found!
let tier = consensus_rpcs
.iter()
.map(|x| x.tier)
.max()
.expect("there should always be a max");
let backups_needed = consensus_rpcs.iter().any(|x| x.backup);
let mut other_rpcs = BTreeMap::new();
for (x, x_head) in self
.rpc_heads
.iter()
.filter(|(k, _)| !consensus_rpcs.contains(k))
{
let x_head_num = *x_head.number();
let key: RpcRanking = RpcRanking::new(x.tier, x.backup, Some(x_head_num));
other_rpcs
.entry(key)
.or_insert_with(Vec::new)
.push(x.clone());
}
// TODO: how should we populate this?
let mut rpc_data = HashMap::with_capacity(self.rpc_heads.len());
for (x, x_head) in self.rpc_heads.iter() {
let y = RpcData::new(x, x_head);
rpc_data.insert(x.clone(), y);
}
let consensus = ConsensusWeb3Rpcs {
tier,
head_block: maybe_head_block.clone(),
head_rpcs: consensus_rpcs,
other_rpcs,
backups_needed,
rpc_data,
};
return Some(consensus);
}
None
}
pub fn worst_tier(&self) -> Option<u64> {
self.rpc_heads.iter().map(|(x, _)| x.tier).max()
}
}
#[cfg(test)]
mod test {
// #[test]
// fn test_simplest_case_consensus_head_connections() {
// todo!();
// }
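    use super::*;
    use ethers::prelude::U64;

    // Minimal sketch tests, not part of the original file. They only exercise the plain data
    // types defined above (RpcRanking and RpcData), so no Web3Rpc connections or async runtime
    // are needed. Test names and the specific block numbers are illustrative.

    #[test]
    fn test_rpc_ranking_ordering() {
        let tier0_high = RpcRanking::new(0, false, Some(U64::from(100)));
        let tier0_low = RpcRanking::new(0, false, Some(U64::from(90)));
        let tier1_high = RpcRanking::new(1, false, Some(U64::from(100)));

        // a lower tier ranks ahead of a higher tier
        assert!(tier0_high < tier1_high);
        // within the same tier, the higher head block ranks first (because of Reverse(head_num))
        assert!(tier0_high < tier0_low);
        // a ranking with no head block at all sorts after one that has a head block
        assert!(tier0_low < RpcRanking::new(0, false, None));
    }

    #[test]
    fn test_rpc_data_available_is_inclusive() {
        // data_available is an inclusive range check between the oldest kept block and the head
        let data = RpcData {
            head_block_num: U64::from(100),
            oldest_block_num: U64::from(50),
        };

        assert!(data.data_available(&U64::from(50)));
        assert!(data.data_available(&U64::from(75)));
        assert!(data.data_available(&U64::from(100)));
        assert!(!data.data_available(&U64::from(49)));
        assert!(!data.data_available(&U64::from(101)));
    }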
}