use super::blockchain::Web3ProxyBlock;
use super::many::Web3Rpcs;
use super::one::Web3Rpc;
use crate::errors::{Web3ProxyErrorContext, Web3ProxyResult};
use crate::frontend::authorization::Authorization;
use anyhow::Context;
use base64::engine::general_purpose;
use derive_more::Constructor;
use ethers::prelude::{H256, U64};
use hashbrown::{HashMap, HashSet};
use hdrhistogram::serialization::{Serializer, V2DeflateSerializer};
use hdrhistogram::Histogram;
use itertools::{Itertools, MinMaxResult};
use log::{log_enabled, trace, warn, Level};
use moka::future::Cache;
use serde::Serialize;
use std::cmp::{Ordering, Reverse};
use std::collections::BTreeMap;
use std::fmt;
use std::sync::{atomic, Arc};
use tokio::time::Instant;

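/// The block range that a single backend rpc can currently serve:
/// its head block and the oldest block it still has data for.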
#[derive(Clone, Serialize)]
struct ConsensusRpcData {
    head_block_num: U64,
    // TODO: this is too simple. erigon has 4 prune levels (hrct)
    oldest_block_num: U64,
}

impl ConsensusRpcData {
    fn new(rpc: &Web3Rpc, head: &Web3ProxyBlock) -> Self {
        let head_block_num = *head.number();

        let block_data_limit = rpc.block_data_limit();

        let oldest_block_num = head_block_num.saturating_sub(block_data_limit);

        Self {
            head_block_num,
            oldest_block_num,
        }
    }

    // TODO: take an enum for the type of data (hrtc)
    fn data_available(&self, block_num: &U64) -> bool {
        *block_num >= self.oldest_block_num && *block_num <= self.head_block_num
    }
}

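/// Sort order used to group rpcs in a `RankedRpcMap`.
/// Ordering comes from `sort_key`: the backup flag, then the tier, then the head block number (reversed).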
#[derive(Constructor, Clone, Copy, Debug, Default, Eq, PartialEq, Serialize)]
pub struct RpcRanking {
    tier: u32,
    backup: bool,
    head_num: Option<U64>,
}

impl RpcRanking {
    pub fn add_offset(&self, offset: u32) -> Self {
        Self {
            tier: self.tier.saturating_add(offset),
            backup: self.backup,
            head_num: self.head_num,
        }
    }

    pub fn default_with_backup(backup: bool) -> Self {
        Self {
            backup,
            ..Default::default()
        }
    }

    fn sort_key(&self) -> (bool, u32, Reverse<Option<U64>>) {
        // TODO: add soft_limit here? add peak_ewma here?
        // TODO: should backup or tier be checked first? now that tiers are automated, backups
        // TODO: should we include a random number in here?
        // TODO: should we include peak_ewma_latency or weighted_peak_ewma_latency?
        (!self.backup, self.tier, Reverse(self.head_num))
    }
}

impl Ord for RpcRanking {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.sort_key().cmp(&other.sort_key())
    }
}

impl PartialOrd for RpcRanking {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

pub type RankedRpcMap = BTreeMap<RpcRanking, Vec<Arc<Web3Rpc>>>;

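/// The answer from `ConsensusWeb3Rpcs::should_wait_for_block`: the needed block is available now,
/// might become available if we wait, or will never be served by the currently ranked rpcs.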
pub enum ShouldWaitForBlock {
    Ready,
    Wait { current: Option<U64> },
    NeverReady,
}

/// A collection of Web3Rpcs that are on the same block.
/// Serialize is so we can print it on our /status endpoint
/// TODO: one data structure of head_rpcs and other_rpcs that is sorted best first
#[derive(Clone, Serialize)]
pub struct ConsensusWeb3Rpcs {
    pub(crate) tier: u32,
    pub(crate) backups_needed: bool,

    // TODO: this is already inside best_rpcs. Don't skip, instead make a shorter serialize
    #[serde(skip_serializing)]
    pub(crate) head_block: Web3ProxyBlock,

    // TODO: smaller serialize
    pub(crate) head_rpcs: Vec<Arc<Web3Rpc>>,

    // TODO: make this work. the key needs to be a string. I think we need `serialize_with`
    #[serde(skip_serializing)]
    pub(crate) other_rpcs: RankedRpcMap,

    // TODO: make this work. the key needs to be a string. I think we need `serialize_with`
    #[serde(skip_serializing)]
    rpc_data: HashMap<Arc<Web3Rpc>, ConsensusRpcData>,
}

impl ConsensusWeb3Rpcs {
    #[inline]
    pub fn num_consensus_rpcs(&self) -> usize {
        self.head_rpcs.len()
    }

    /// will tell you if you should wait for a block
    /// TODO: also include method (or maybe an enum representing the different prune types)
    pub fn should_wait_for_block(
        &self,
        needed_block_num: Option<&U64>,
        skip_rpcs: &[Arc<Web3Rpc>],
    ) -> ShouldWaitForBlock {
        // TODO: i think checking synced is always a waste of time. though i guess there could be a race
        if self
            .head_rpcs
            .iter()
            .any(|rpc| self.rpc_will_work_eventually(rpc, needed_block_num, skip_rpcs))
        {
            let head_num = self.head_block.number();

            if Some(head_num) >= needed_block_num {
                trace!("best (head) block: {}", head_num);
                return ShouldWaitForBlock::Ready;
            }
        }

        // all of the head rpcs are skipped
        let mut best_num = None;

        // iterate the other rpc tiers to find the next best block
        for (next_ranking, next_rpcs) in self.other_rpcs.iter() {
            if !next_rpcs
                .iter()
                .any(|rpc| self.rpc_will_work_eventually(rpc, needed_block_num, skip_rpcs))
            {
                trace!("everything in this ranking ({:?}) is skipped", next_ranking);
                continue;
            }

            let next_head_num = next_ranking.head_num.as_ref();

            if next_head_num >= needed_block_num {
                trace!("best (head) block: {:?}", next_head_num);
                return ShouldWaitForBlock::Ready;
            }

            best_num = next_head_num;
        }

        // TODO: this seems wrong
        if best_num.is_some() {
            trace!("best (old) block: {:?}", best_num);
            ShouldWaitForBlock::Wait {
                current: best_num.copied(),
            }
        } else {
            trace!("never ready");
            ShouldWaitForBlock::NeverReady
        }
    }
    pub fn has_block_data(&self, rpc: &Web3Rpc, block_num: &U64) -> bool {
        self.rpc_data
            .get(rpc)
            .map(|x| x.data_available(block_num))
            .unwrap_or(false)
    }

    // TODO: take method as a param, too. mark nodes with supported methods (maybe do it optimistically? on)
    pub fn rpc_will_work_eventually(
        &self,
        rpc: &Arc<Web3Rpc>,
        needed_block_num: Option<&U64>,
        skip_rpcs: &[Arc<Web3Rpc>],
    ) -> bool {
        if skip_rpcs.contains(rpc) {
            // if rpc is skipped, it must have already been determined it is unable to serve the request
            return false;
        }

        if let Some(needed_block_num) = needed_block_num {
            if let Some(rpc_data) = self.rpc_data.get(rpc) {
                match rpc_data.head_block_num.cmp(needed_block_num) {
                    Ordering::Less => {
                        trace!("{} is behind. let it catch up", rpc);
                        // TODO: what if this is a pruned rpc that is behind by a lot, and the block is old, too?
                        return true;
                    }
                    Ordering::Greater | Ordering::Equal => {
                        // rpc is synced past the needed block. make sure the block isn't too old for it
                        if self.has_block_data(rpc, needed_block_num) {
                            trace!("{} has {}", rpc, needed_block_num);
                            return true;
                        } else {
                            trace!("{} does not have {}", rpc, needed_block_num);
                            return false;
                        }
                    }
                }
            }

            // no rpc data for this rpc. that's not promising
            false
        } else {
            // if no needed_block_num was specified, then this should work
            true
        }
    }
    // TODO: better name for this
    // TODO: this should probably be on the rpcs as "can_serve_request"
    // TODO: this should probably take the method, too
    pub fn rpc_will_work_now(
        &self,
        skip: &[Arc<Web3Rpc>],
        min_block_needed: Option<&U64>,
        max_block_needed: Option<&U64>,
        rpc: &Arc<Web3Rpc>,
    ) -> bool {
        if skip.contains(rpc) {
            trace!("skipping {}", rpc);
            return false;
        }

        if let Some(min_block_needed) = min_block_needed {
            if !self.has_block_data(rpc, min_block_needed) {
                trace!(
                    "{} is missing min_block_needed ({}). skipping",
                    rpc,
                    min_block_needed,
                );
                return false;
            }
        }

        if let Some(max_block_needed) = max_block_needed {
            if !self.has_block_data(rpc, max_block_needed) {
                trace!(
                    "{} is missing max_block_needed ({}). skipping",
                    rpc,
                    max_block_needed,
                );
                return false;
            }
        }

        // TODO: this might be a big perf hit. benchmark
        if let Some(x) = rpc.hard_limit_until.as_ref() {
            if *x.borrow() > Instant::now() {
                trace!("{} is rate limited. skipping", rpc);
                return false;
            }
        }

        true
    }

    // TODO: sum_hard_limit?
}

impl fmt::Debug for ConsensusWeb3Rpcs {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // TODO: the default formatter takes forever to write. this is too quiet though
        // TODO: print the actual conns?
        f.debug_struct("ConsensusWeb3Rpcs")
            .field("head_block", &self.head_block)
            .field("num_conns", &self.head_rpcs.len())
            .finish_non_exhaustive()
    }
}

// TODO: refs for all of these. borrow on a Sender is cheap enough
impl Web3Rpcs {
    // TODO: return a ref?
    pub fn head_block(&self) -> Option<Web3ProxyBlock> {
        self.watch_consensus_head_sender
            .as_ref()
            .and_then(|x| x.borrow().clone())
    }

    // TODO: return a ref?
    pub fn head_block_hash(&self) -> Option<H256> {
        self.head_block().map(|x| *x.hash())
    }

    // TODO: return a ref?
    pub fn head_block_num(&self) -> Option<U64> {
        self.head_block().map(|x| *x.number())
    }

    pub fn synced(&self) -> bool {
        let consensus = self.watch_consensus_rpcs_sender.borrow();

        if let Some(consensus) = consensus.as_ref() {
            !consensus.head_rpcs.is_empty()
        } else {
            false
        }
    }

    pub fn num_synced_rpcs(&self) -> usize {
        let consensus = self.watch_consensus_rpcs_sender.borrow();

        if let Some(consensus) = consensus.as_ref() {
            consensus.head_rpcs.len()
        } else {
            0
        }
    }
}

type FirstSeenCache = Cache<H256, Instant>;

/// A ConsensusConnections builder that tracks all connection heads across multiple groups of servers
pub struct ConsensusFinder {
    rpc_heads: HashMap<Arc<Web3Rpc>, Web3ProxyBlock>,
    /// never serve blocks that are too old
    max_block_age: Option<u64>,
    /// tier 0 will be preferred as long as the distance between it and the other tiers is <= max_block_lag
    max_block_lag: Option<U64>,
    /// Block Hash -> First Seen Instant. used to track rpc.head_latency. The same cache should be shared between all ConnectionsGroups
    first_seen: FirstSeenCache,
}

impl ConsensusFinder {
    pub fn new(max_block_age: Option<u64>, max_block_lag: Option<U64>) -> Self {
        // TODO: what's a good capacity for this? it shouldn't need to be very large
        let first_seen = Cache::new(16);

        let rpc_heads = HashMap::new();

        Self {
            rpc_heads,
            max_block_age,
            max_block_lag,
            first_seen,
        }
    }

    pub fn len(&self) -> usize {
        self.rpc_heads.len()
    }

    pub fn is_empty(&self) -> bool {
        self.rpc_heads.is_empty()
    }

    fn remove(&mut self, rpc: &Arc<Web3Rpc>) -> Option<Web3ProxyBlock> {
        self.rpc_heads.remove(rpc)
    }

    async fn insert(&mut self, rpc: Arc<Web3Rpc>, block: Web3ProxyBlock) -> Option<Web3ProxyBlock> {
        let first_seen = self
            .first_seen
            .get_with_by_ref(block.hash(), async { Instant::now() })
            .await;

        // calculate elapsed time before trying to lock
        let latency = first_seen.elapsed();

        // record the time behind the fastest node
        rpc.head_latency.write().record(latency);

        // update the local mapping of rpc -> block
        self.rpc_heads.insert(rpc, block)
    }

    /// Update our tracking of the rpc and return true if something changed
    pub(crate) async fn update_rpc(
        &mut self,
        rpc_head_block: Option<Web3ProxyBlock>,
        rpc: Arc<Web3Rpc>,
        // we need this so we can save the block to caches. i don't like it though. maybe we should use a lazy_static Cache wrapper that has a "save_block" method? i generally dislike globals but i also dislike all the types having to pass each other around
        web3_connections: &Web3Rpcs,
    ) -> Web3ProxyResult<bool> {
        // add the rpc's block to connection_heads, or remove the rpc from connection_heads
        let changed = match rpc_head_block {
            Some(mut rpc_head_block) => {
                // we don't know if it's on the heaviest chain yet
                rpc_head_block = web3_connections
                    .try_cache_block(rpc_head_block, false)
                    .await
                    .web3_context("failed caching block")?;

                if let Some(max_age) = self.max_block_age {
                    if rpc_head_block.age() > max_age {
                        trace!("rpc_head_block from {} is too old! {}", rpc, rpc_head_block);
                        return Ok(self.remove(&rpc).is_some());
                    }
                }

                if let Some(prev_block) = self.insert(rpc, rpc_head_block.clone()).await {
                    // false if this block was already sent by this rpc
                    // true if new block for this rpc
                    prev_block.hash() != rpc_head_block.hash()
                } else {
                    // first block for this rpc
                    true
                }
            }
            None => {
                // false if this rpc was already removed
                // true if rpc head changed from being synced to not
                self.remove(&rpc).is_some()
            }
        };

        Ok(changed)
    }

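    /// Assign a tier to every tracked rpc based on its weighted peak-EWMA latency.
    /// With a single rpc the tier is 0. Otherwise the fastest rpc gets tier 1 and slower rpcs
    /// move up one tier for every `divisor` milliseconds they are behind the fastest.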
    pub async fn update_tiers(&mut self) -> Web3ProxyResult<()> {
        match self.rpc_heads.len() {
            0 => {}
            1 => {
                for rpc in self.rpc_heads.keys() {
                    rpc.tier.store(0, atomic::Ordering::Relaxed)
                }
            }
            _ => {
                // iterate first to find bounds
                let mut min_latency = u64::MAX;
                let mut max_latency = u64::MIN;
                let mut weighted_latencies = HashMap::new();
                for rpc in self.rpc_heads.keys() {
                    let weighted_latency_seconds = rpc.weighted_peak_ewma_seconds();

                    let weighted_latency_ms = (weighted_latency_seconds * 1000.0).round() as i64;

                    let weighted_latency_ms: u64 = weighted_latency_ms
                        .try_into()
                        .context("weighted_latency_ms does not fit in a u64")?;

                    min_latency = min_latency.min(weighted_latency_ms);
                    max_latency = max_latency.max(weighted_latency_ms);

                    weighted_latencies.insert(rpc, weighted_latency_ms);
                }

                // histogram requires high to be at least 2 x low
                // using min_latency for low does not work how we want it though
                max_latency = max_latency.max(1000);

                // create the histogram
                let mut hist = Histogram::<u32>::new_with_bounds(1, max_latency, 3).unwrap();

                // TODO: resize shouldn't be necessary, but i've seen it error
                hist.auto(true);

                for weighted_latency_ms in weighted_latencies.values() {
                    hist.record(*weighted_latency_ms)?;
                }

                // dev logging
                if log_enabled!(Level::Trace) {
                    // print the histogram. see docs/histograms.txt for more info
                    let mut encoder =
                        base64::write::EncoderWriter::new(Vec::new(), &general_purpose::STANDARD);

                    V2DeflateSerializer::new()
                        .serialize(&hist, &mut encoder)
                        .unwrap();

                    let encoded = encoder.finish().unwrap();
                    let encoded = String::from_utf8(encoded).unwrap();

                    trace!("weighted_latencies: {}", encoded);
                }

                // TODO: get someone who is better at math to do something smarter. maybe involving stddev?
                let divisor = 30f64.max(min_latency as f64 / 2.0);

                for (rpc, weighted_latency_ms) in weighted_latencies.into_iter() {
                    let tier = (weighted_latency_ms - min_latency) as f64 / divisor;

                    let tier = (tier.floor() as u32).saturating_add(1);

                    trace!(
                        "{} - weighted_latency: {}ms, tier {}",
                        rpc,
                        weighted_latency_ms,
                        tier
                    );

                    rpc.tier.store(tier, atomic::Ordering::Relaxed);
                }
            }
        }

        Ok(())
    }

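    /// Walk backwards from every rpc's head block and pick the highest block that enough rpcs
    /// (by count and by summed soft limit) agree on. Primary rpcs are counted first; backup rpcs
    /// are only used if the primaries cannot reach consensus.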
    pub async fn find_consensus_connections(
        &mut self,
        authorization: &Arc<Authorization>,
        web3_rpcs: &Web3Rpcs,
    ) -> Web3ProxyResult<Option<ConsensusWeb3Rpcs>> {
        self.update_tiers().await?;

        let minmax_block = self.rpc_heads.values().minmax_by_key(|&x| x.number());

        let (lowest_block, highest_block) = match minmax_block {
            MinMaxResult::NoElements => return Ok(None),
            MinMaxResult::OneElement(x) => (x, x),
            MinMaxResult::MinMax(min, max) => (min, max),
        };

        let highest_block_number = highest_block.number();

        trace!("highest_block_number: {}", highest_block_number);
        trace!("lowest_block_number: {}", lowest_block.number());

        let max_lag_block_number =
            highest_block_number.saturating_sub(self.max_block_lag.unwrap_or_else(|| U64::from(5)));

        trace!("max_lag_block_number: {}", max_lag_block_number);

        let lowest_block_number = lowest_block.number().max(&max_lag_block_number);

        // TODO: should lowest block number be set such that the rpc won't ever go backwards?
        trace!("safe lowest_block_number: {}", lowest_block_number);

        let num_known = self.rpc_heads.len();

        if num_known < web3_rpcs.min_synced_rpcs {
            // this keeps us from serving requests when the proxy first starts
            trace!("not enough servers known");
            return Ok(None);
        }

        // TODO: also track the sum of *available* hard_limits? if any servers have no hard limits, use their soft limit or no limit?
        // TODO: struct for the value of the votes hashmap?
        let mut primary_votes: HashMap<Web3ProxyBlock, (HashSet<&Arc<Web3Rpc>>, u32)> =
            Default::default();
        let mut backup_votes: HashMap<Web3ProxyBlock, (HashSet<&Arc<Web3Rpc>>, u32)> =
            Default::default();

        for (rpc, rpc_head) in self.rpc_heads.iter() {
            let mut block_to_check = rpc_head.clone();

            while block_to_check.number() >= lowest_block_number {
                if !rpc.backup {
                    // backup nodes are excluded from the primary voting
                    let entry = primary_votes.entry(block_to_check.clone()).or_default();

                    entry.0.insert(rpc);
                    entry.1 += rpc.soft_limit;
                }

                // both primary and backup rpcs get included in the backup voting
                let backup_entry = backup_votes.entry(block_to_check.clone()).or_default();

                backup_entry.0.insert(rpc);
                backup_entry.1 += rpc.soft_limit;

                match web3_rpcs
                    .block(authorization, block_to_check.parent_hash(), Some(rpc))
                    .await
                {
                    Ok(parent_block) => block_to_check = parent_block,
                    Err(err) => {
                        warn!(
                            "Problem fetching parent block of {:?} during consensus finding: {:#?}",
                            block_to_check.hash(),
                            err
                        );
                        break;
                    }
                }
            }
        }

        // we finished processing all of the rpc heads. check the primary votes first
        if let Some(consensus) = self.count_votes(&primary_votes, web3_rpcs) {
            return Ok(Some(consensus));
        }

        // primary votes didn't reach consensus. hopefully the backup rpcs are synced
        Ok(self.count_votes(&backup_votes, web3_rpcs))
    }

    // TODO: have min_sum_soft_limit and min_head_rpcs on self instead of on Web3Rpcs
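    /// From the votes gathered by `find_consensus_connections`, return the highest-block candidate
    /// that meets `min_sum_soft_limit` and `min_synced_rpcs`, along with rankings for the rpcs
    /// that are not part of that consensus.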
    fn count_votes(
        &self,
        votes: &HashMap<Web3ProxyBlock, (HashSet<&Arc<Web3Rpc>>, u32)>,
        web3_rpcs: &Web3Rpcs,
    ) -> Option<ConsensusWeb3Rpcs> {
        // sort the votes by descending block number, then sum of soft limits, then number of rpcs
        let mut votes: Vec<_> = votes
            .into_iter()
            .map(|(block, (rpc_names, sum_soft_limit))| (block, sum_soft_limit, rpc_names))
            .collect();
        votes.sort_by_cached_key(|(block, sum_soft_limit, rpc_names)| {
            (
                Reverse(*block.number()),
                Reverse(*sum_soft_limit),
                Reverse(rpc_names.len()),
            )
        });

        // return the first result that exceeds the configured minimums (if any)
        for (maybe_head_block, sum_soft_limit, rpc_names) in votes {
            if *sum_soft_limit < web3_rpcs.min_sum_soft_limit {
                continue;
            }

            // TODO: different mins for backup vs primary
            if rpc_names.len() < web3_rpcs.min_synced_rpcs {
                continue;
            }

            trace!("rpc_names: {:#?}", rpc_names);

            // consensus found!
            let consensus_rpcs: Vec<Arc<_>> = rpc_names.iter().map(|x| Arc::clone(x)).collect();

            let tier = consensus_rpcs
                .iter()
                .map(|x| x.tier.load(atomic::Ordering::Relaxed))
                .max()
                .expect("there should always be a max");

            let backups_needed = consensus_rpcs.iter().any(|x| x.backup);

            let mut other_rpcs = BTreeMap::new();

            for (x, x_head) in self
                .rpc_heads
                .iter()
                .filter(|(k, _)| !consensus_rpcs.contains(k))
            {
                let x_head_num = *x_head.number();

                let key: RpcRanking = RpcRanking::new(
                    x.tier.load(atomic::Ordering::Relaxed),
                    x.backup,
                    Some(x_head_num),
                );

                other_rpcs
                    .entry(key)
                    .or_insert_with(Vec::new)
                    .push(x.clone());
            }

            // TODO: how should we populate this?
            let mut rpc_data = HashMap::with_capacity(self.rpc_heads.len());

            for (x, x_head) in self.rpc_heads.iter() {
                let y = ConsensusRpcData::new(x, x_head);

                rpc_data.insert(x.clone(), y);
            }

            let consensus = ConsensusWeb3Rpcs {
                tier,
                head_block: maybe_head_block.clone(),
                head_rpcs: consensus_rpcs,
                other_rpcs,
                backups_needed,
                rpc_data,
            };

            return Some(consensus);
        }

        None
    }

    pub fn worst_tier(&self) -> Option<u32> {
        self.rpc_heads
            .iter()
            .map(|(x, _)| x.tier.load(atomic::Ordering::Relaxed))
            .max()
    }
}

#[cfg(test)]
mod test {
    // #[test]
    // fn test_simplest_case_consensus_head_connections() {
    //     todo!();
    // }
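
    // A minimal sketch of tests for the pure logic in this module: they only exercise
    // `RpcRanking`'s derived constructor/ordering and `ConsensusRpcData::data_available`,
    // so no connected Web3Rpc or async runtime is needed.
    use super::{ConsensusRpcData, RpcRanking};
    use ethers::prelude::U64;

    #[test]
    fn test_rpc_ranking_sort_order() {
        // with the same backup flag and head block, a lower tier sorts first
        let fast = RpcRanking::new(1, false, Some(U64::from(100)));
        let slow = RpcRanking::new(2, false, Some(U64::from(100)));
        assert!(fast < slow);

        // with the same backup flag and tier, a higher head block sorts first (head_num is reversed)
        let ahead = RpcRanking::new(1, false, Some(U64::from(101)));
        let behind = RpcRanking::new(1, false, Some(U64::from(100)));
        assert!(ahead < behind);
    }

    #[test]
    fn test_data_available_range() {
        let data = ConsensusRpcData {
            head_block_num: U64::from(100),
            oldest_block_num: U64::from(90),
        };

        // blocks inside [oldest, head] are available
        assert!(data.data_available(&U64::from(90)));
        assert!(data.data_available(&U64::from(100)));

        // blocks outside that range are not
        assert!(!data.data_available(&U64::from(89)));
        assert!(!data.data_available(&U64::from(101)));
    }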
}