more direct consensus finding code
this hopefully has fewer bugs. speed isn't super important since this isn't on the hot path.
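For reviewers, a rough sketch of the vote-counting idea behind the new `ConsensusFinder::count_votes`, using simplified stand-in types (`Block`, `Rpc`) instead of the real `Web3ProxyBlock` / `Arc<Web3Rpc>`. The real `find_consensus_connections` also walks parent blocks (so a vote for a child block counts for its ancestors), keeps separate primary and backup tallies, and checks tiers in order; this only shows the core selection rule against `min_head_rpcs` and `min_sum_soft_limit`.

```rust
use std::cmp::Reverse;
use std::collections::{HashMap, HashSet};

// stand-in for Web3ProxyBlock: identity is number + hash
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
struct Block {
    number: u64,
    hash: [u8; 32],
}

// stand-in for Web3Rpc
struct Rpc {
    name: String,
    soft_limit: u32,
}

/// Pick the highest block that has enough voters and enough summed soft limit behind it.
fn count_votes<'a>(
    heads: &'a [(Rpc, Block)],
    min_head_rpcs: usize,
    min_sum_soft_limit: u32,
) -> Option<(&'a Block, Vec<&'a str>)> {
    // block -> (rpc names voting for it, sum of their soft limits)
    let mut votes: HashMap<&Block, (HashSet<&str>, u32)> = HashMap::new();
    for (rpc, head) in heads {
        let entry = votes.entry(head).or_default();
        entry.0.insert(&rpc.name);
        entry.1 += rpc.soft_limit;
    }

    // highest block first, then most capacity, then most voters
    let mut votes: Vec<_> = votes.into_iter().collect();
    votes.sort_by_key(|(block, (names, sum))| {
        (Reverse(block.number), Reverse(*sum), Reverse(names.len()))
    });

    // the first candidate that clears both minimums wins
    votes
        .into_iter()
        .find(|(_, (names, sum))| names.len() >= min_head_rpcs && *sum >= min_sum_soft_limit)
        .map(|(block, (names, _))| (block, names.into_iter().collect()))
}

fn main() {
    let heads = vec![
        (Rpc { name: "a".into(), soft_limit: 100 }, Block { number: 10, hash: [1; 32] }),
        (Rpc { name: "b".into(), soft_limit: 100 }, Block { number: 10, hash: [1; 32] }),
        (Rpc { name: "c".into(), soft_limit: 100 }, Block { number: 11, hash: [2; 32] }),
    ];

    // block 11 only has one voter, so consensus falls back to block 10
    let best = count_votes(&heads, 2, 150);
    println!("{:?}", best.map(|(block, names)| (block.number, names)));
}
```

Running the example prints block number 10 with rpcs a and b (in hash-map order): block 11 is newer but only one rpc is on it, so the older block with enough capacity wins.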
This commit is contained in:
parent 9fae137e45
commit 8eff48611c
TODO.md (2 changed lines)
@@ -390,6 +390,7 @@ These are not yet ordered. There might be duplicates. We might not actually need
- [x] maybe we shouldn't route eth_getLogs to syncing nodes. serving queries slows down sync significantly
  - change the send_best function to only include servers that are at least close to fully synced
- [-] have private transactions be enabled by a url setting rather than a setting on the key
- [ ] eth_sendRawTransaction should only forward if the chain_id matches what we are running
- [ ] cli for adding rpc keys to an existing user
- [ ] rate limiting/throttling on query_user_stats
- [ ] web3rpc configs should have a max_concurrent_requests
@@ -425,6 +426,7 @@ These are not yet ordered. There might be duplicates. We might not actually need
- [ ] implement remaining subscriptions
  - would be nice if our subscriptions had better guarantees than geth/erigon do, but maybe simpler to just set up a broadcast channel and proxy all the responses to a backend instead
- [ ] tests should use `test-env-log = "0.2.8"`
- [ ] eth_sendRawTransaction should only forward if the chain_id matches what we are running
- [ ] weighted random choice should still prioritize non-archive servers
  - maybe shuffle randomly and then sort by (block_limit, random_index)?
  - maybe sum available_requests grouped by archive/non-archive. only limit to non-archive if they have enough?
@@ -232,6 +232,7 @@ pub async fn serve(port: u16, proxy_app: Arc<Web3ProxyApp>) -> anyhow::Result<()
axum::Server::bind(&addr)
// TODO: option to use with_connect_info. we want it in dev, but not when running behind a proxy, but not
.serve(service)
.await
.map_err(Into::into)
.await?;

Ok(())
}
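One hedged way the `with_connect_info` TODO above could look, assuming the axum 0.6-era API that the surrounding `axum::Server::bind` code appears to use; the router, handler, and port here are made up for illustration, and behind a reverse proxy you would skip `ConnectInfo` and trust forwarded headers instead.

```rust
use std::net::SocketAddr;

use axum::{extract::ConnectInfo, routing::get, Router};

// ConnectInfo gives the peer's socket address, which is only the real client in dev
async fn who(ConnectInfo(addr): ConnectInfo<SocketAddr>) -> String {
    format!("hello {}", addr)
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let app = Router::new().route("/", get(who));

    // arbitrary example port, not the proxy's real config
    let addr = SocketAddr::from(([127, 0, 0, 1], 8544));

    axum::Server::bind(&addr)
        // ConnectInfo is only available when the service is built this way
        .serve(app.into_make_service_with_connect_info::<SocketAddr>())
        .await?;

    Ok(())
}
```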
@@ -12,6 +12,7 @@ use log::{debug, trace, warn, Level};
use moka::future::Cache;
use serde::Serialize;
use serde_json::json;
use std::hash::Hash;
use std::time::{SystemTime, UNIX_EPOCH};
use std::{cmp::Ordering, fmt::Display, sync::Arc};
use tokio::sync::broadcast;
@@ -42,6 +43,14 @@ impl PartialEq for Web3ProxyBlock {
}
}

impl Eq for Web3ProxyBlock {}

impl Hash for Web3ProxyBlock {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.block.hash.hash(state);
}
}

impl Web3ProxyBlock {
/// A new block has arrived over a subscription
pub fn try_new(block: ArcBlock) -> Option<Self> {
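The `Eq`/`Hash` impls above make a `Web3ProxyBlock`'s identity its underlying block hash, which is what lets blocks act as keys in the new vote-counting maps. A minimal sketch of the same pattern with a hypothetical `MyBlock` type (not the real struct):

```rust
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

// hypothetical stand-in: identity is the block hash only
#[derive(Clone, Debug)]
struct MyBlock {
    hash: [u8; 32],
    number: u64,
    seen_at: u64, // per-proxy metadata that must NOT affect equality
}

impl PartialEq for MyBlock {
    fn eq(&self, other: &Self) -> bool {
        self.hash == other.hash
    }
}
impl Eq for MyBlock {}

impl Hash for MyBlock {
    fn hash<H: Hasher>(&self, state: &mut H) {
        // must agree with PartialEq: hash only the fields used for equality
        self.hash.hash(state);
    }
}

fn main() {
    let a = MyBlock { hash: [1; 32], number: 10, seen_at: 111 };
    let b = MyBlock { hash: [1; 32], number: 10, seen_at: 222 };

    let mut votes: HashMap<MyBlock, u32> = HashMap::new();
    *votes.entry(a).or_default() += 1;
    *votes.entry(b).or_default() += 1;

    // both entries land on the same key because equality ignores seen_at
    assert_eq!(votes.len(), 1);
    println!("{:?}", votes);
}
```

Keeping `Hash` consistent with `PartialEq` matters here: hashing extra fields that equality ignores would split votes for the same block across different map entries.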
@@ -393,36 +402,28 @@ impl Web3Rpcs {
return Ok(());
}

let new_synced_connections = consensus_finder
.best_consensus_connections(authorization, self)
.await
.context("no consensus head block!")
.map_err(|err| {
self.watch_consensus_rpcs_sender.send_replace(None);
let new_synced_connections = match consensus_finder
.find_consensus_connections(authorization, self)
.await {
Err(err) => {
let err = err.context("error while finding consensus head block!");

err
})?;

// TODO: what should we do if the block number of new_synced_connections is < old_synced_connections? wait?
return Err(err);
}
Ok(None) => {
return Err(anyhow!("no consensus head block!"));
}
Ok(Some(x)) => x
};

let watch_consensus_head_sender = self.watch_consensus_head_sender.as_ref().unwrap();
let consensus_tier = new_synced_connections.tier;
let total_tiers = consensus_finder.len();
// TODO: think more about this unwrap
let total_tiers = consensus_finder.worst_tier().unwrap_or(10);
let backups_needed = new_synced_connections.backups_needed;
let consensus_head_block = new_synced_connections.head_block.clone();
let num_consensus_rpcs = new_synced_connections.num_conns();
let mut num_synced_rpcs = 0;
let num_active_rpcs = consensus_finder
.all_rpcs_group()
.map(|x| {
for v in x.rpc_to_block.values() {
if *v == consensus_head_block {
num_synced_rpcs += 1;
}
}
x.len()
})
.unwrap_or_default();
let num_active_rpcs = consensus_finder.len();
let total_rpcs = self.by_name.read().len();

let old_consensus_head_connections = self
@@ -474,12 +475,11 @@ impl Web3Rpcs {
// no change in hash. no need to use watch_consensus_head_sender
// TODO: trace level if rpc is backup
debug!(
"con {}/{} {}{}/{}/{}/{} con={} rpc={}@{}",
"con {}/{} {}{}/{}/{} con={} rpc={}@{}",
consensus_tier,
total_tiers,
backups_voted_str,
num_consensus_rpcs,
num_synced_rpcs,
num_active_rpcs,
total_rpcs,
consensus_head_block,
@@ -490,12 +490,11 @@ impl Web3Rpcs {
// hash changed

debug!(
"unc {}/{} {}{}/{}/{}/{} con_head={} old={} rpc={}@{}",
"unc {}/{} {}{}/{}/{} con_head={} old={} rpc={}@{}",
consensus_tier,
total_tiers,
backups_voted_str,
num_consensus_rpcs,
num_synced_rpcs,
num_active_rpcs,
total_rpcs,
consensus_head_block,
@@ -518,12 +517,11 @@ impl Web3Rpcs {
// this is unlikely but possible
// TODO: better log
warn!(
"chain rolled back {}/{} {}{}/{}/{}/{} con={} old={} rpc={}@{}",
"chain rolled back t{}/{} {}{}/{}/{} con={} old={} rpc={}@{}",
consensus_tier,
total_tiers,
backups_voted_str,
num_consensus_rpcs,
num_synced_rpcs,
num_active_rpcs,
total_rpcs,
consensus_head_block,
@@ -549,12 +547,11 @@ impl Web3Rpcs {
}
Ordering::Greater => {
debug!(
"new {}/{} {}{}/{}/{}/{} con={} rpc={}@{}",
"new {}/{} {}{}/{}/{} con={} rpc={}@{}",
consensus_tier,
total_tiers,
backups_voted_str,
num_consensus_rpcs,
num_synced_rpcs,
num_active_rpcs,
total_rpcs,
consensus_head_block,
@@ -6,10 +6,11 @@ use super::one::Web3Rpc;
use anyhow::Context;
use ethers::prelude::{H256, U64};
use hashbrown::{HashMap, HashSet};
use log::{debug, trace, warn};
use itertools::{Itertools, MinMaxResult};
use log::{trace, warn};
use moka::future::Cache;
use serde::Serialize;
use std::collections::BTreeMap;
use std::cmp::Reverse;
use std::fmt;
use std::sync::Arc;
use tokio::time::Instant;
@@ -18,23 +19,19 @@ use tokio::time::Instant;
/// Serialize is so we can print it on our debug endpoint
#[derive(Clone, Serialize)]
pub struct ConsensusWeb3Rpcs {
// TODO: tier should be an option, or we should have consensus be stored as an Option<ConsensusWeb3Rpcs>
pub(super) tier: u64,
pub(super) head_block: Web3ProxyBlock,
// TODO: this should be able to serialize, but it isn't
#[serde(skip_serializing)]
pub(super) rpcs: Vec<Arc<Web3Rpc>>,
pub(super) backups_voted: Option<Web3ProxyBlock>,
pub(super) best_rpcs: Vec<Arc<Web3Rpc>>,
// TODO: functions like "compare_backup_vote()"
// pub(super) backups_voted: Option<Web3ProxyBlock>,
pub(super) backups_needed: bool,
}

impl ConsensusWeb3Rpcs {
pub fn num_conns(&self) -> usize {
self.rpcs.len()
}

pub fn sum_soft_limit(&self) -> u32 {
self.rpcs.iter().fold(0, |sum, rpc| sum + rpc.soft_limit)
self.best_rpcs.len()
}

// TODO: sum_hard_limit?
@@ -44,9 +41,9 @@ impl fmt::Debug for ConsensusWeb3Rpcs {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// TODO: the default formatter takes forever to write. this is too quiet though
// TODO: print the actual conns?
f.debug_struct("ConsensusConnections")
f.debug_struct("ConsensusWeb3Rpcs")
.field("head_block", &self.head_block)
.field("num_conns", &self.rpcs.len())
.field("num_conns", &self.best_rpcs.len())
.finish_non_exhaustive()
}
}
@@ -73,7 +70,7 @@ impl Web3Rpcs {
let consensus = self.watch_consensus_rpcs_sender.borrow();

if let Some(consensus) = consensus.as_ref() {
!consensus.rpcs.is_empty()
!consensus.best_rpcs.is_empty()
} else {
false
}
@@ -83,7 +80,7 @@ impl Web3Rpcs {
let consensus = self.watch_consensus_rpcs_sender.borrow();

if let Some(consensus) = consensus.as_ref() {
consensus.rpcs.len()
consensus.best_rpcs.len()
} else {
0
}
@@ -92,50 +89,51 @@ impl Web3Rpcs {

type FirstSeenCache = Cache<H256, Instant, hashbrown::hash_map::DefaultHashBuilder>;

pub struct ConnectionsGroup {
pub rpc_to_block: HashMap<Arc<Web3Rpc>, Web3ProxyBlock>,
// TODO: what if there are two blocks with the same number?
pub highest_block: Option<Web3ProxyBlock>,
/// A ConsensusConnections builder that tracks all connection heads across multiple groups of servers
pub struct ConsensusFinder {
/// backups for all tiers are only used if necessary
/// tiers[0] = only tier 0.
/// tiers[1] = tier 0 and tier 1
/// tiers[n] = tier 0..=n
/// This is a BTreeMap and not a Vec because sometimes a tier is empty
rpc_heads: HashMap<Arc<Web3Rpc>, Web3ProxyBlock>,
/// never serve blocks that are too old
max_block_age: Option<u64>,
/// tier 0 will be preferred as long as the distance between it and the other tiers is <= max_tier_lag
max_block_lag: Option<U64>,
/// used to track rpc.head_latency. The same cache should be shared between all ConnectionsGroups
first_seen: FirstSeenCache,
}

impl ConnectionsGroup {
pub fn new(first_seen: FirstSeenCache) -> Self {
impl ConsensusFinder {
pub fn new(max_block_age: Option<u64>, max_block_lag: Option<U64>) -> Self {
// TODO: what's a good capacity for this? it shouldn't need to be very large
// TODO: if we change Web3ProxyBlock to store the instance, i think we could use the block_by_hash cache
let first_seen = Cache::builder()
.max_capacity(16)
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());

// TODO: hard coding 0-9 isn't great, but it's easier than refactoring this to be smart about config reloading
let rpc_heads = HashMap::new();

Self {
rpc_to_block: Default::default(),
highest_block: Default::default(),
rpc_heads,
max_block_age,
max_block_lag,
first_seen,
}
}

pub fn len(&self) -> usize {
self.rpc_to_block.len()
self.rpc_heads.len()
}

pub fn is_empty(&self) -> bool {
self.rpc_to_block.is_empty()
self.rpc_heads.is_empty()
}

fn remove(&mut self, rpc: &Arc<Web3Rpc>) -> Option<Web3ProxyBlock> {
if let Some(removed_block) = self.rpc_to_block.remove(rpc) {
match self.highest_block.as_mut() {
None => {}
Some(current_highest_block) => {
if removed_block.hash() == current_highest_block.hash() {
for maybe_highest_block in self.rpc_to_block.values() {
if maybe_highest_block.number() > current_highest_block.number() {
*current_highest_block = maybe_highest_block.clone();
};
}
}
}
}

Some(removed_block)
} else {
None
}
self.rpc_heads.remove(rpc)
}

async fn insert(&mut self, rpc: Arc<Web3Rpc>, block: Web3ProxyBlock) -> Option<Web3ProxyBlock> {
@@ -150,322 +148,7 @@ impl ConnectionsGroup {

rpc.head_latency.write().record(latency);

// TODO: what about a reorg to the same height?
if Some(block.number()) > self.highest_block.as_ref().map(|x| x.number()) {
self.highest_block = Some(block.clone());
}

self.rpc_to_block.insert(rpc, block)
}

/// min_consensus_block_num keeps us from ever going backwards.
/// TODO: think about min_consensus_block_num more. i think this might cause an outage if the chain is doing weird things. but 503s is probably better than broken data.
pub(self) async fn consensus_head_connections(
&self,
authorization: &Arc<Authorization>,
web3_rpcs: &Web3Rpcs,
min_consensus_block_num: Option<U64>,
tier: &u64,
) -> anyhow::Result<ConsensusWeb3Rpcs> {
let mut maybe_head_block = match self.highest_block.clone() {
None => return Err(anyhow::anyhow!("no blocks known")),
Some(x) => x,
};

// TODO: take max_distance_consensus_to_highest as an argument?
// TODO: what if someone's backup node is misconfigured and goes on a really fast forked chain?
let max_lag_consensus_to_highest =
if let Some(min_consensus_block_num) = min_consensus_block_num {
maybe_head_block
.number()
.saturating_add(1.into())
.saturating_sub(min_consensus_block_num)
.as_u64()
} else {
10
};

trace!(
"max_lag_consensus_to_highest: {}",
max_lag_consensus_to_highest
);

let num_known = self.rpc_to_block.len();

if num_known < web3_rpcs.min_head_rpcs {
return Err(anyhow::anyhow!(
"not enough rpcs connected: {}/{}",
num_known,
web3_rpcs.min_head_rpcs,
));
}

let mut primary_rpcs_voted: Option<Web3ProxyBlock> = None;
let mut backup_rpcs_voted: Option<Web3ProxyBlock> = None;

// track rpcs on this heaviest chain so we can build a new ConsensusConnections
let mut primary_consensus_rpcs = HashSet::<&str>::new();
let mut backup_consensus_rpcs = HashSet::<&str>::new();
let mut skip_rpcs = HashSet::<&str>::new();

// a running total of the soft limits covered by the rpcs that agree on the head block
let mut primary_sum_soft_limit: u32 = 0;
let mut backup_sum_soft_limit: u32 = 0;

// TODO: also track the sum of *available* hard_limits. if any servers have no hard limits, use their soft limit or no limit?

// check the highest work block for a set of rpcs that can serve our request load
// if it doesn't have enough rpcs for our request load, check the parent block
// TODO: loop for how many parent blocks? we don't want to serve blocks that are too far behind. probably different per chain
// TODO: this loop is pretty long. any way to clean up this code?
for _ in 0..max_lag_consensus_to_highest {
let maybe_head_hash = maybe_head_block.hash();

// find all rpcs with maybe_head_hash as their current head
for (rpc, rpc_head) in self.rpc_to_block.iter() {
if rpc_head.hash() != maybe_head_hash {
// connection is not on the desired block
continue;
}
let rpc_name = rpc.name.as_str();
if backup_consensus_rpcs.contains(rpc_name) {
// connection is on a later block in this same chain
continue;
}
if primary_consensus_rpcs.contains(rpc_name) {
// connection is on a later block in this same chain
continue;
}
if skip_rpcs.contains(rpc_name) {
// connection is missing or there's some other reason to skip this rpc
continue;
}

if let Some(rpc) = web3_rpcs.by_name.read().get(rpc_name) {
if backup_rpcs_voted.is_some() {
// backups already voted for a head block on another tier. don't change it
} else {
backup_consensus_rpcs.insert(rpc_name);
backup_sum_soft_limit += rpc.soft_limit;
}
if !rpc.backup {
primary_consensus_rpcs.insert(rpc_name);
primary_sum_soft_limit += rpc.soft_limit;
}
} else {
// i don't think this is an error. i think it's just if a reconnect is currently happening
if web3_rpcs.synced() {
warn!("connection missing: {}", rpc_name);
debug!("web3_rpcs.by_name: {:#?}", web3_rpcs.by_name);
} else {
debug!("connection missing: {}", rpc_name);
}
skip_rpcs.insert(rpc_name);
}
}

if primary_sum_soft_limit >= web3_rpcs.min_sum_soft_limit
&& primary_consensus_rpcs.len() >= web3_rpcs.min_head_rpcs
{
// we have enough servers with enough requests! yey!
primary_rpcs_voted = Some(maybe_head_block.clone());
break;
}

if backup_rpcs_voted.is_none()
&& backup_sum_soft_limit >= web3_rpcs.min_sum_soft_limit
&& backup_consensus_rpcs.len() >= web3_rpcs.min_head_rpcs
{
// if we include backup servers, we have enough servers with high enough limits
backup_rpcs_voted = Some(maybe_head_block.clone());
}

// not enough rpcs on this block. check the parent block
match web3_rpcs
.block(authorization, maybe_head_block.parent_hash(), None)
.await
{
Ok(parent_block) => {
// trace!(
// child=%maybe_head_hash, parent=%parent_block.hash.unwrap(), "avoiding thundering herd. checking consensus on parent block",
// );
maybe_head_block = parent_block;
continue;
}
Err(err) => {
let soft_limit_percent = (primary_sum_soft_limit as f32
/ web3_rpcs.min_sum_soft_limit as f32)
* 100.0;

let err_msg = format!("ran out of parents to check. rpcs {}/{}/{}. soft limit: {:.2}% ({}/{}). err: {:#?}",
primary_consensus_rpcs.len(),
num_known,
web3_rpcs.min_head_rpcs,
soft_limit_percent,
primary_sum_soft_limit,
web3_rpcs.min_sum_soft_limit,
err,
);

if backup_rpcs_voted.is_some() {
warn!("{}. using backup vote", err_msg);
break;
} else {
return Err(anyhow::anyhow!(err_msg));
}
}
}
}
// TODO: if consensus_head_rpcs.is_empty, try another method of finding the head block. will need to change the return Err above into breaks.

// we've done all the searching for the heaviest block that we can
if (primary_consensus_rpcs.len() < web3_rpcs.min_head_rpcs
|| primary_sum_soft_limit < web3_rpcs.min_sum_soft_limit)
&& backup_rpcs_voted.is_none()
{
// if we get here, not enough servers are synced. return an error
let soft_limit_percent =
(primary_sum_soft_limit as f32 / web3_rpcs.min_sum_soft_limit as f32) * 100.0;

return Err(anyhow::anyhow!(
"Not enough resources. rpcs {}/{}/{}. soft limit: {:.2}% ({}/{})",
primary_consensus_rpcs.len(),
num_known,
web3_rpcs.min_head_rpcs,
primary_sum_soft_limit,
web3_rpcs.min_sum_soft_limit,
soft_limit_percent,
));
}

// success! a block has enough soft limit and nodes on it (or on later blocks)
let backups_needed;
let (head_block, consensus_rpcs) = if let Some(head_block) = primary_rpcs_voted {
backups_needed = false;
(head_block, primary_consensus_rpcs)
} else if let Some(head_block) = backup_rpcs_voted.clone() {
backups_needed = true;
(head_block, backup_consensus_rpcs)
} else {
return Err(anyhow::anyhow!("No head voted!"));
};

#[cfg(debug_assertions)]
{
let _ = head_block.hash();
let _ = head_block.number();
}

let consensus_rpcs: Vec<Arc<Web3Rpc>> = consensus_rpcs
.into_iter()
.filter_map(|conn_name| web3_rpcs.by_name.read().get(conn_name).cloned())
.collect();

Ok(ConsensusWeb3Rpcs {
tier: *tier,
head_block,
rpcs: consensus_rpcs,
backups_voted: backup_rpcs_voted,
backups_needed,
})
}
}
/// A ConsensusConnections builder that tracks all connection heads across multiple groups of servers
pub struct ConsensusFinder {
/// backups for all tiers are only used if necessary
/// tiers[0] = only tier 0.
/// tiers[1] = tier 0 and tier 1
/// tiers[n] = tier 0..=n
/// This is a BTreeMap and not a Vec because sometimes a tier is empty
tiers: BTreeMap<u64, ConnectionsGroup>,
/// never serve blocks that are too old
max_block_age: Option<u64>,
/// tier 0 will be preferred as long as the distance between it and the other tiers is <= max_tier_lag
max_block_lag: Option<U64>,
}

impl ConsensusFinder {
pub fn new(max_block_age: Option<u64>, max_block_lag: Option<U64>) -> Self {
// TODO: what's a good capacity for this? it shouldn't need to be very large
// TODO: if we change Web3ProxyBlock to store the instance, i think we could use the block_by_hash cache
let first_seen = Cache::builder()
.max_capacity(16)
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());

// TODO: hard coding 0-9 isn't great, but it's easier than refactoring this to be smart about config reloading
let tiers = (0..10)
.map(|x| (x, ConnectionsGroup::new(first_seen.clone())))
.collect();

Self {
tiers,
max_block_age,
max_block_lag,
}
}

pub fn len(&self) -> usize {
self.tiers.len()
}

pub fn is_empty(&self) -> bool {
self.tiers.is_empty()
}

/// get the ConnectionsGroup that contains all rpcs
/// panics if there are no tiers
pub fn all_rpcs_group(&self) -> Option<&ConnectionsGroup> {
self.tiers.values().last()
}

/// get the mutable ConnectionsGroup that contains all rpcs
pub fn all_mut(&mut self) -> Option<&mut ConnectionsGroup> {
self.tiers.values_mut().last()
}
pub fn remove(&mut self, rpc: &Arc<Web3Rpc>) -> Option<Web3ProxyBlock> {
let mut removed = None;

for (i, tier_group) in self.tiers.iter_mut().rev() {
if i < &rpc.tier {
break;
}
let x = tier_group.remove(rpc);

if removed.is_none() && x.is_some() {
removed = x;
}
}

removed
}

/// returns the block that the rpc was on before updating to the new_block
pub async fn insert(
&mut self,
rpc: &Arc<Web3Rpc>,
new_block: Web3ProxyBlock,
) -> Option<Web3ProxyBlock> {
let mut old = None;

// TODO: error if rpc.tier is not in self.tiers

for (i, tier_group) in self.tiers.iter_mut().rev() {
if i < &rpc.tier {
break;
}

// TODO: should new_block be a ref?
let x = tier_group.insert(rpc.clone(), new_block.clone()).await;

if old.is_none() && x.is_some() {
old = x;
}
}

old
self.rpc_heads.insert(rpc, block)
}

/// Update our tracking of the rpc and return true if something changed
@@ -499,8 +182,8 @@ impl ConsensusFinder {
}
}

if let Some(prev_block) = self.insert(&rpc, rpc_head_block.clone()).await {
// false if this block was already sent by this rpc. return early
if let Some(prev_block) = self.insert(rpc, rpc_head_block.clone()).await {
// false if this block was already sent by this rpc
// true if new block for this rpc
prev_block.hash() != rpc_head_block.hash()
} else {
@@ -518,47 +201,155 @@ impl ConsensusFinder {
Ok(changed)
}

pub async fn best_consensus_connections(
pub async fn find_consensus_connections(
&mut self,
authorization: &Arc<Authorization>,
web3_connections: &Web3Rpcs,
) -> anyhow::Result<ConsensusWeb3Rpcs> {
// TODO: attach context to these?
let highest_known_block = self
.all_rpcs_group()
.context("no rpcs")?
.highest_block
.as_ref()
.context("no highest block")?;
web3_rpcs: &Web3Rpcs,
) -> anyhow::Result<Option<ConsensusWeb3Rpcs>> {
let minmax_block = self
.rpc_heads
.values().minmax_by_key(|&x| x.number());

trace!("highest_known_block: {}", highest_known_block);
let (lowest_block, highest_block) = match minmax_block {
MinMaxResult::NoElements => return Ok(None),
MinMaxResult::OneElement(x) => (x, x),
MinMaxResult::MinMax(min, max) => (min, max),
};

let min_block_num = self
.max_block_lag
.map(|x| highest_known_block.number().saturating_sub(x))
// we also want to be sure we don't ever go backwards!
.max(web3_connections.head_block_num());
let highest_block_number = highest_block.number();

trace!("min_block_num: {:#?}", min_block_num);
trace!("highest_block_number: {}", highest_block_number);

// TODO Should this be a Vec<Result<Option<_, _>>>?
// TODO: how should errors be handled?
// TODO: find the best tier with a connectionsgroup. best case, this only queries the first tier
// TODO: do we need to calculate all of them? I think having highest_known_block included as part of min_block_num should make that unnecessary
for (tier, x) in self.tiers.iter() {
trace!("checking tier {}: {:#?}", tier, x.rpc_to_block);
if let Ok(consensus_head_connections) = x
.consensus_head_connections(authorization, web3_connections, min_block_num, tier)
.await
{
trace!("success on tier {}", tier);
// we got one! hopefully it didn't need to use any backups.
// but even if it did need backup servers, that is better than going to a worse tier
return Ok(consensus_head_connections);
trace!("lowest_block_number: {}", lowest_block.number());

let max_lag_block_number = highest_block_number.saturating_sub(self.max_block_lag.unwrap_or_else(|| U64::from(10)));

trace!("max_lag_block_number: {}", max_lag_block_number);

let lowest_block_number = lowest_block.number().max(&max_lag_block_number);

trace!("safe lowest_block_number: {}", lowest_block_number);

let num_known = self.rpc_heads.len();

if num_known < web3_rpcs.min_head_rpcs {
// this keeps us from serving requests when the proxy first starts
return Ok(None);
}

// TODO: also track the sum of *available* hard_limits? if any servers have no hard limits, use their soft limit or no limit?
// TODO: struct for the value of the votes hashmap?
let mut primary_votes: HashMap<Web3ProxyBlock, (HashSet<&str>, u32)> = Default::default();
let mut backup_votes: HashMap<Web3ProxyBlock, (HashSet<&str>, u32)> = Default::default();

let mut backup_consensus = None;

let mut rpc_heads_by_tier: Vec<_> = self.rpc_heads.iter().collect();
rpc_heads_by_tier.sort_by_cached_key(|(rpc, _)| rpc.tier);

let current_tier = rpc_heads_by_tier.first().expect("rpc_heads_by_tier should never be empty").0.tier;

// loop over all the rpc heads (grouped by tier) and their parents to find consensus
// TODO: i'm sure there's a lot of shortcuts that could be taken, but this is simplest to implement
for (rpc, rpc_head) in self.rpc_heads.iter() {
if current_tier != rpc.tier {
// we finished processing a tier. check for primary results
if let Some(consensus) = self.count_votes(&primary_votes, web3_rpcs) {
return Ok(Some(consensus))
}

// only set backup consensus once. we don't want it to keep checking on worse tiers if it already found consensus
if backup_consensus.is_none() {
if let Some(consensus) = self.count_votes(&backup_votes, web3_rpcs) {
backup_consensus = Some(consensus)
}
}
}

let mut block_to_check = rpc_head.clone();

while block_to_check.number() >= lowest_block_number {
if !rpc.backup {
// backup nodes are excluded from the primary voting
let entry = primary_votes.entry(block_to_check.clone()).or_default();

entry.0.insert(&rpc.name);
entry.1 += rpc.soft_limit;
}

// both primary and backup rpcs get included in the backup voting
let backup_entry = backup_votes.entry(block_to_check.clone()).or_default();

backup_entry.0.insert(&rpc.name);
backup_entry.1 += rpc.soft_limit;

match web3_rpcs.block(authorization, block_to_check.parent_hash(), Some(rpc)).await {
Ok(parent_block) => block_to_check = parent_block,
Err(err) => {
warn!("Problem fetching parent block of {:#?} during consensus finding: {:#?}", block_to_check, err);
break;
}
}
}
}

return Err(anyhow::anyhow!("failed finding consensus on all tiers"));
// we finished processing all tiers. check for primary results (if anything but the last tier found consensus, we already returned above)
if let Some(consensus) = self.count_votes(&primary_votes, web3_rpcs) {
return Ok(Some(consensus))
}

// only set backup consensus once. we don't want it to keep checking on worse tiers if it already found consensus
if let Some(consensus) = backup_consensus {
return Ok(Some(consensus));
}

// count votes one last time
Ok(self.count_votes(&backup_votes, web3_rpcs))
}

// TODO: have min_sum_soft_limit and min_head_rpcs on self instead of on Web3Rpcs
fn count_votes(&self, votes: &HashMap<Web3ProxyBlock, (HashSet<&str>, u32)>, web3_rpcs: &Web3Rpcs) -> Option<ConsensusWeb3Rpcs> {
// sort the primary votes ascending by tier and descending by block num
let mut votes: Vec<_> = votes.iter().map(|(block, (rpc_names, sum_soft_limit))| (block, sum_soft_limit, rpc_names)).collect();
votes.sort_by_cached_key(|(block, sum_soft_limit, rpc_names)| (Reverse(*block.number()), Reverse(*sum_soft_limit), Reverse(rpc_names.len())));

// return the first result that exceeds configured minimums (if any)
for (maybe_head_block, sum_soft_limit, rpc_names) in votes {
if *sum_soft_limit < web3_rpcs.min_sum_soft_limit {
continue;
}
// TODO: different mins for backup vs primary
if rpc_names.len() < web3_rpcs.min_head_rpcs {
continue;
}

// consensus likely found! load the rpcs to make sure they all have active connections
let consensus_rpcs: Vec<_> = rpc_names.into_iter().filter_map(|x| web3_rpcs.get(x)).collect();

if consensus_rpcs.len() < web3_rpcs.min_head_rpcs {
continue;
}
// consensus found!

let tier = consensus_rpcs.iter().map(|x| x.tier).max().expect("there should always be a max");

let backups_needed = consensus_rpcs.iter().any(|x| x.backup);

let consensus = ConsensusWeb3Rpcs {
tier,
head_block: maybe_head_block.clone(),
best_rpcs: consensus_rpcs,
backups_needed,
};

return Some(consensus);
}

None
}

pub fn worst_tier(&self) -> Option<u64> {
self.rpc_heads.iter().map(|(x, _)| x.tier).max()
}
}

@@ -27,7 +27,7 @@ use serde::ser::{SerializeStruct, Serializer};
use serde::Serialize;
use serde_json::json;
use serde_json::value::RawValue;
use std::cmp::min_by_key;
use std::cmp::{min_by_key, Reverse};
use std::collections::BTreeMap;
use std::sync::atomic::{self, Ordering};
use std::sync::Arc;
@@ -611,7 +611,7 @@ impl Web3Rpcs {
// they are all at the same block and it is already sized to what we need
let key = (0, None);

for x in synced_connections.rpcs.iter() {
for x in synced_connections.best_rpcs.iter() {
if skip.contains(x) {
trace!("skipping: {}", x);
continue;
@@ -741,7 +741,7 @@ impl Web3Rpcs {
let synced_rpcs = self.watch_consensus_rpcs_sender.borrow();

if let Some(synced_rpcs) = synced_rpcs.as_ref() {
synced_rpcs.rpcs.clone()
synced_rpcs.best_rpcs.clone()
} else {
vec![]
}
@@ -1219,9 +1219,8 @@ impl Serialize for Web3Rpcs {
/// TODO: should this be moved into a `impl Web3Rpc`?
/// TODO: i think we still have sorts scattered around the code that should use this
/// TODO: take AsRef or something like that? We don't need an Arc here
fn rpc_sync_status_sort_key(x: &Arc<Web3Rpc>) -> (U64, u64, bool, OrderedFloat<f64>) {
let reversed_head_block = U64::MAX
- x.head_block
fn rpc_sync_status_sort_key(x: &Arc<Web3Rpc>) -> (Reverse<U64>, u64, bool, OrderedFloat<f64>) {
let head_block = x.head_block
.read()
.as_ref()
.map(|x| *x.number())
@@ -1241,7 +1240,7 @@ fn rpc_sync_status_sort_key(x: &Arc<Web3Rpc>) -> (U64, u64, bool, OrderedFloat<f

let backup = x.backup;

(reversed_head_block, tier, backup, peak_ewma)
(Reverse(head_block), tier, backup, peak_ewma)
}

mod tests {
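A small aside on the sort-key change above: `std::cmp::Reverse` flips the ordering of the wrapped value inside a tuple key, which is what the removed `U64::MAX - head_block` subtraction was emulating. A tiny illustration with plain `u64` values standing in for the real `U64` block numbers:

```rust
use std::cmp::Reverse;

fn main() {
    // (head_block, tier): we want the highest block first, then the lowest tier
    let mut rpcs = vec![(100u64, 2u64), (105, 1), (105, 0), (99, 0)];

    // Reverse on the block number gives descending block order without the
    // old U64::MAX - head_block arithmetic, while tier stays ascending
    rpcs.sort_by_key(|&(head_block, tier)| (Reverse(head_block), tier));

    assert_eq!(rpcs, vec![(105, 0), (105, 1), (100, 2), (99, 0)]);
    println!("{:?}", rpcs);
}
```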
@@ -191,7 +191,7 @@ impl OpenRequestHandle {
// TODO: replace ethers-rs providers with our own that supports streaming the responses
let response = match provider.as_ref() {
#[cfg(test)]
Web3Provider::Mock => unimplemented!(),
Web3Provider::Mock => return Err(ProviderError::CustomError("mock provider can't respond".to_string())),
Web3Provider::Ws(p) => p.request(method, params).await,
Web3Provider::Http(p) | Web3Provider::Both(p, _) => {
// TODO: i keep hearing that http is faster. but ws has always been better for me. investigate more with actual benchmarks