2022-08-24 02:56:47 +03:00
///! Keep track of the blockchain as seen by a Web3Connections.
2022-08-24 03:11:49 +03:00
use super ::connection ::Web3Connection ;
use super ::connections ::Web3Connections ;
2022-08-24 03:59:05 +03:00
use super ::transactions ::TxStatus ;
2022-11-08 22:58:11 +03:00
use crate ::frontend ::authorization ::Authorization ;
2022-08-27 02:44:25 +03:00
use crate ::{
2023-01-19 13:13:00 +03:00
config ::BlockAndRpc , jsonrpc ::JsonRpcRequest , rpcs ::synced_connections ::ConsensusConnections ,
2022-08-27 02:44:25 +03:00
} ;
2022-09-01 08:58:55 +03:00
use anyhow ::Context ;
2022-08-26 20:26:17 +03:00
use derive_more ::From ;
2022-08-27 06:11:58 +03:00
use ethers ::prelude ::{ Block , TxHash , H256 , U64 } ;
2022-08-27 02:44:25 +03:00
use hashbrown ::{ HashMap , HashSet } ;
2023-01-19 13:13:00 +03:00
use log ::{ debug , error , warn , Level } ;
2022-09-05 08:53:58 +03:00
use moka ::future ::Cache ;
2022-09-01 08:58:55 +03:00
use serde ::Serialize ;
2022-08-24 02:56:47 +03:00
use serde_json ::json ;
2022-12-01 01:11:14 +03:00
use std ::time ::{ SystemTime , UNIX_EPOCH } ;
2022-09-01 08:58:55 +03:00
use std ::{ cmp ::Ordering , fmt ::Display , sync ::Arc } ;
2022-08-24 02:56:47 +03:00
use tokio ::sync ::{ broadcast , watch } ;
2022-09-20 09:00:27 +03:00
use tokio ::time ::Duration ;
2022-08-24 02:56:47 +03:00
2022-09-05 08:53:58 +03:00
// TODO: type for Hydrated Blocks with their full transactions?
2022-08-30 23:01:42 +03:00
pub type ArcBlock = Arc < Block < TxHash > > ;
2022-09-20 04:33:39 +03:00
pub type BlockHashesCache = Cache < H256 , ArcBlock , hashbrown ::hash_map ::DefaultHashBuilder > ;
2022-08-30 23:01:42 +03:00
2023-01-19 13:13:00 +03:00
/// A block and its age.
2022-09-01 08:58:55 +03:00
#[ derive(Clone, Debug, Default, From, Serialize) ]
2022-12-03 08:31:03 +03:00
pub struct SavedBlock {
pub block : ArcBlock ,
/// number of seconds this block was behind the current time when received
2023-01-04 23:07:53 +03:00
pub age : u64 ,
2022-08-26 20:26:17 +03:00
}
2022-12-17 07:05:01 +03:00
impl PartialEq for SavedBlock {
fn eq ( & self , other : & Self ) -> bool {
match ( self . block . hash , other . block . hash ) {
( None , None ) = > true ,
( Some ( _ ) , None ) = > false ,
( None , Some ( _ ) ) = > false ,
( Some ( s ) , Some ( o ) ) = > s = = o ,
}
}
}
2022-12-03 08:31:03 +03:00
impl SavedBlock {
pub fn new ( block : ArcBlock ) -> Self {
2023-01-04 23:07:53 +03:00
let mut x = Self { block , age : 0 } ;
2022-12-06 00:13:36 +03:00
// no need to recalulate lag every time
// if the head block gets too old, a health check restarts this connection
2023-01-04 23:07:53 +03:00
x . age = x . lag ( ) ;
2022-12-06 00:13:36 +03:00
x
}
pub fn lag ( & self ) -> u64 {
2022-12-03 08:31:03 +03:00
let now = SystemTime ::now ( )
. duration_since ( UNIX_EPOCH )
. expect ( " there should always be time " ) ;
2022-12-06 00:13:36 +03:00
let block_timestamp = Duration ::from_secs ( self . block . timestamp . as_u64 ( ) ) ;
2022-12-03 08:31:03 +03:00
2023-01-03 19:33:49 +03:00
if block_timestamp < now {
2022-12-03 08:31:03 +03:00
// this server is still syncing from too far away to serve requests
// u64 is safe because ew checked equality above
2023-01-03 19:33:49 +03:00
( now - block_timestamp ) . as_secs ( ) as u64
2022-12-03 08:31:03 +03:00
} else {
0
2022-12-06 00:13:36 +03:00
}
2022-12-03 08:31:03 +03:00
}
pub fn hash ( & self ) -> H256 {
2022-12-17 07:05:01 +03:00
self . block . hash . expect ( " saved blocks must have a hash " )
2022-12-03 08:31:03 +03:00
}
// TODO: return as U64 or u64?
pub fn number ( & self ) -> U64 {
2022-12-17 07:05:01 +03:00
self . block . number . expect ( " saved blocks must have a number " )
2022-12-03 08:31:03 +03:00
}
}
impl From < ArcBlock > for SavedBlock {
fn from ( x : ArcBlock ) -> Self {
SavedBlock ::new ( x )
}
}
impl Display for SavedBlock {
2022-09-01 08:58:55 +03:00
fn fmt ( & self , f : & mut std ::fmt ::Formatter < '_ > ) -> std ::fmt ::Result {
2023-01-04 23:07:53 +03:00
write! ( f , " {} ({}, {}s old) " , self . number ( ) , self . hash ( ) , self . age )
2022-09-01 08:58:55 +03:00
}
}
2022-08-24 02:56:47 +03:00
impl Web3Connections {
2022-12-03 08:31:03 +03:00
/// add a block to our mappings and track the heaviest chain
2023-01-19 13:13:00 +03:00
pub async fn save_block (
& self ,
block : ArcBlock ,
heaviest_chain : bool ,
) -> anyhow ::Result < ArcBlock > {
2022-09-03 00:35:03 +03:00
// TODO: i think we can rearrange this function to make it faster on the hot path
2022-09-01 08:58:55 +03:00
let block_hash = block . hash . as_ref ( ) . context ( " no block hash " ) ? ;
2022-09-06 15:29:37 +03:00
2022-09-14 08:26:46 +03:00
// skip Block::default()
2022-09-06 15:29:37 +03:00
if block_hash . is_zero ( ) {
2022-09-14 08:26:46 +03:00
debug! ( " Skipping block without hash! " ) ;
2023-01-19 13:13:00 +03:00
return Ok ( block ) ;
2022-09-06 15:29:37 +03:00
}
2022-10-27 00:39:26 +03:00
let block_num = block . number . as_ref ( ) . context ( " no block num " ) ? ;
2022-09-30 07:18:18 +03:00
// TODO: think more about heaviest_chain. would be better to do the check inside this function
2022-09-05 09:13:36 +03:00
if heaviest_chain {
// this is the only place that writes to block_numbers
2022-09-14 08:26:46 +03:00
// multiple inserts should be okay though
2022-12-03 08:31:03 +03:00
// TODO: info that there was a fork?
2022-09-05 08:53:58 +03:00
self . block_numbers . insert ( * block_num , * block_hash ) . await ;
2022-09-02 08:40:56 +03:00
}
2022-12-03 08:31:03 +03:00
// this block is very likely already in block_hashes
// TODO: use their get_with
2023-01-19 13:13:00 +03:00
let block = self
. block_hashes
2022-12-03 08:31:03 +03:00
. get_with ( * block_hash , async move { block . clone ( ) } )
2022-09-05 09:13:36 +03:00
. await ;
2023-01-19 13:13:00 +03:00
Ok ( block )
2022-08-24 02:56:47 +03:00
}
2022-08-27 05:13:36 +03:00
/// Get a block from caches with fallback.
/// Will query a specific node or the best available.
2023-01-19 13:13:00 +03:00
/// TODO: return anyhow::Result<Option<ArcBlock>>?
2022-08-26 20:26:17 +03:00
pub async fn block (
& self ,
2022-11-08 22:58:11 +03:00
authorization : & Arc < Authorization > ,
2022-08-26 20:26:17 +03:00
hash : & H256 ,
2022-08-27 05:13:36 +03:00
rpc : Option < & Arc < Web3Connection > > ,
2022-08-30 23:01:42 +03:00
) -> anyhow ::Result < ArcBlock > {
2022-08-24 02:56:47 +03:00
// first, try to get the hash from our cache
2022-09-03 00:35:03 +03:00
// the cache is set last, so if its here, its everywhere
2023-01-19 13:13:00 +03:00
// TODO: use try_get_with
2022-08-28 02:49:41 +03:00
if let Some ( block ) = self . block_hashes . get ( hash ) {
2022-09-05 08:53:58 +03:00
return Ok ( block ) ;
2022-08-24 02:56:47 +03:00
}
// block not in cache. we need to ask an rpc for it
2022-09-23 00:51:52 +03:00
let get_block_params = ( * hash , false ) ;
2022-08-24 02:56:47 +03:00
// TODO: if error, retry?
2022-11-21 01:52:08 +03:00
let block : ArcBlock = match rpc {
2022-12-06 03:18:31 +03:00
Some ( rpc ) = > rpc
2023-01-25 09:45:20 +03:00
. wait_for_request_handle ( authorization , Some ( Duration ::from_secs ( 30 ) ) , false )
2022-12-06 03:18:31 +03:00
. await ?
. request ::< _ , Option < _ > > (
" eth_getBlockByHash " ,
& json! ( get_block_params ) ,
Level ::Error . into ( ) ,
)
. await ?
. context ( " no block! " ) ? ,
2022-08-27 05:13:36 +03:00
None = > {
2022-09-02 08:40:56 +03:00
// TODO: helper for method+params => JsonRpcRequest
// TODO: does this id matter?
2022-09-03 00:35:03 +03:00
let request = json! ( { " id " : " 1 " , " method " : " eth_getBlockByHash " , " params " : get_block_params } ) ;
2022-08-27 05:13:36 +03:00
let request : JsonRpcRequest = serde_json ::from_value ( request ) ? ;
2022-08-24 02:56:47 +03:00
2022-11-08 22:58:11 +03:00
// TODO: request_metadata? maybe we should put it in the authorization?
2023-01-23 09:02:08 +03:00
// TODO: think more about this wait_for_sync
2022-09-22 23:27:14 +03:00
let response = self
2023-01-25 07:44:50 +03:00
. try_send_best_consensus_head_connection ( authorization , request , None , None )
2022-09-22 23:27:14 +03:00
. await ? ;
2022-08-24 02:56:47 +03:00
2022-12-01 05:01:41 +03:00
let block = response . result . context ( " failed fetching block " ) ? ;
2022-08-27 05:13:36 +03:00
2022-12-06 03:18:31 +03:00
let block : Option < ArcBlock > = serde_json ::from_str ( block . get ( ) ) ? ;
block . context ( " no block! " ) ?
2022-08-27 05:13:36 +03:00
}
} ;
2022-08-24 02:56:47 +03:00
2022-08-30 23:01:42 +03:00
// the block was fetched using eth_getBlockByHash, so it should have all fields
2022-09-14 08:26:46 +03:00
// TODO: fill in heaviest_chain! if the block is old enough, is this definitely true?
2023-01-19 13:13:00 +03:00
let block = self . save_block ( block , false ) . await ? ;
2022-08-24 02:56:47 +03:00
Ok ( block )
}
/// Convenience method to get the cannonical block at a given block height.
2022-11-08 22:58:11 +03:00
pub async fn block_hash (
& self ,
authorization : & Arc < Authorization > ,
num : & U64 ,
) -> anyhow ::Result < ( H256 , bool ) > {
let ( block , is_archive_block ) = self . cannonical_block ( authorization , num ) . await ? ;
2022-08-24 02:56:47 +03:00
2022-11-03 02:14:16 +03:00
let hash = block . hash . expect ( " Saved blocks should always have hashes " ) ;
2022-08-24 02:56:47 +03:00
2022-11-03 02:14:16 +03:00
Ok ( ( hash , is_archive_block ) )
2022-08-24 02:56:47 +03:00
}
/// Get the heaviest chain's block from cache or backend rpc
2023-01-23 09:02:08 +03:00
/// Caution! If a future block is requested, this might wait forever. Be sure to have a timeout outside of this!
2022-11-08 22:58:11 +03:00
pub async fn cannonical_block (
& self ,
authorization : & Arc < Authorization > ,
num : & U64 ,
) -> anyhow ::Result < ( ArcBlock , bool ) > {
2022-08-28 02:49:41 +03:00
// we only have blocks by hash now
2022-09-05 08:53:58 +03:00
// maybe save them during save_block in a blocks_by_number Cache<U64, Vec<ArcBlock>>
2022-08-28 02:49:41 +03:00
// if theres multiple, use petgraph to find the one on the main chain (and remove the others if they have enough confirmations)
2022-08-26 20:26:17 +03:00
2023-01-23 09:02:08 +03:00
let mut consensus_head_receiver = self
. watch_consensus_head_receiver
. as_ref ( )
. context ( " need new head subscriptions to fetch cannonical_block " ) ?
. clone ( ) ;
2022-09-03 00:35:03 +03:00
// be sure the requested block num exists
2023-01-23 09:02:08 +03:00
let mut head_block_num = consensus_head_receiver . borrow_and_update ( ) . number ;
2022-11-03 02:14:16 +03:00
2023-01-23 09:02:08 +03:00
loop {
if let Some ( head_block_num ) = head_block_num {
if num < = & head_block_num {
break ;
}
}
2022-11-03 02:14:16 +03:00
2023-01-23 09:02:08 +03:00
consensus_head_receiver . changed ( ) . await ? ;
head_block_num = consensus_head_receiver . borrow_and_update ( ) . number ;
2022-08-24 02:56:47 +03:00
}
2023-01-23 09:02:08 +03:00
let head_block_num =
head_block_num . expect ( " we should only get here if we have a head block " ) ;
// TODO: geth does 64, erigon does 90k. sometimes we run a mix
2023-01-31 19:30:24 +03:00
// TODO: do this dynamically based on balanced_rpcs block_data_limit
2023-01-23 09:02:08 +03:00
let archive_needed = num < & ( head_block_num - U64 ::from ( 64 ) ) ;
2022-09-03 00:35:03 +03:00
// try to get the hash from our cache
// deref to not keep the lock open
2022-09-05 08:53:58 +03:00
if let Some ( block_hash ) = self . block_numbers . get ( num ) {
2022-09-03 00:35:03 +03:00
// TODO: sometimes this needs to fetch the block. why? i thought block_numbers would only be set if the block hash was set
2022-11-08 22:58:11 +03:00
// TODO: pass authorization through here?
let block = self . block ( authorization , & block_hash , None ) . await ? ;
2022-11-03 02:14:16 +03:00
return Ok ( ( block , archive_needed ) ) ;
2022-09-03 00:35:03 +03:00
}
// block number not in cache. we need to ask an rpc for it
2022-08-24 02:56:47 +03:00
// TODO: helper for method+params => JsonRpcRequest
let request = json! ( { " jsonrpc " : " 2.0 " , " id " : " 1 " , " method " : " eth_getBlockByNumber " , " params " : ( num , false ) } ) ;
let request : JsonRpcRequest = serde_json ::from_value ( request ) ? ;
// TODO: if error, retry?
2022-11-08 22:58:11 +03:00
// TODO: request_metadata or authorization?
2023-01-25 09:45:20 +03:00
// we don't actually set min_block_needed here because all nodes have all blocks
2022-08-24 02:56:47 +03:00
let response = self
2023-01-25 09:45:20 +03:00
. try_send_best_consensus_head_connection ( authorization , request , None , None )
2022-08-24 02:56:47 +03:00
. await ? ;
2023-01-25 09:45:20 +03:00
if let Some ( err ) = response . error {
debug! ( " could not find canonical block {}: {:?} " , num , err ) ;
}
let raw_block = response . result . context ( " no cannonical block result " ) ? ;
2022-08-24 02:56:47 +03:00
2022-11-21 01:52:08 +03:00
let block : ArcBlock = serde_json ::from_str ( raw_block . get ( ) ) ? ;
2022-08-24 02:56:47 +03:00
2022-09-02 08:40:56 +03:00
// the block was fetched using eth_getBlockByNumber, so it should have all fields and be on the heaviest chain
2023-01-19 13:13:00 +03:00
let block = self . save_block ( block , true ) . await ? ;
2022-08-24 02:56:47 +03:00
2022-12-03 08:31:03 +03:00
Ok ( ( block , archive_needed ) )
2022-08-24 02:56:47 +03:00
}
2022-08-26 20:26:17 +03:00
pub ( super ) async fn process_incoming_blocks (
2022-08-24 03:59:05 +03:00
& self ,
2022-11-08 22:58:11 +03:00
authorization : & Arc < Authorization > ,
2022-08-26 20:26:17 +03:00
block_receiver : flume ::Receiver < BlockAndRpc > ,
2022-09-05 08:53:58 +03:00
// TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed
// Geth's subscriptions have the same potential for skipping blocks.
2022-08-30 23:01:42 +03:00
head_block_sender : watch ::Sender < ArcBlock > ,
2022-08-24 03:59:05 +03:00
pending_tx_sender : Option < broadcast ::Sender < TxStatus > > ,
) -> anyhow ::Result < ( ) > {
// TODO: indexmap or hashmap? what hasher? with_capacity?
2022-08-26 20:26:17 +03:00
// TODO: this will grow unbounded. prune old heads on this at the same time we prune the graph?
2023-01-19 13:13:00 +03:00
let mut connection_heads = ConsensusFinder ::default ( ) ;
2022-08-24 03:59:05 +03:00
while let Ok ( ( new_block , rpc ) ) = block_receiver . recv_async ( ) . await {
2022-12-03 08:31:03 +03:00
let new_block = new_block . map ( Into ::into ) ;
2022-09-14 08:26:46 +03:00
let rpc_name = rpc . name . clone ( ) ;
2022-12-03 08:31:03 +03:00
2022-09-14 08:26:46 +03:00
if let Err ( err ) = self
. process_block_from_rpc (
2022-11-08 22:58:11 +03:00
authorization ,
2022-09-14 08:26:46 +03:00
& mut connection_heads ,
new_block ,
rpc ,
& head_block_sender ,
& pending_tx_sender ,
)
. await
{
2022-11-12 11:24:32 +03:00
warn! ( " unable to process block from rpc {}: {:?} " , rpc_name , err ) ;
2022-09-14 08:26:46 +03:00
}
2022-08-24 03:59:05 +03:00
}
2023-01-19 13:13:00 +03:00
// TODO: if there was an error, should we return it instead of an Ok?
2022-08-24 03:59:05 +03:00
warn! ( " block_receiver exited! " ) ;
Ok ( ( ) )
}
2022-08-26 20:26:17 +03:00
/// `connection_heads` is a mapping of rpc_names to head block hashes.
2022-11-21 01:52:08 +03:00
/// self.blockchain_map is a mapping of hashes to the complete ArcBlock.
2022-08-27 02:44:25 +03:00
/// TODO: return something?
2022-11-23 01:45:22 +03:00
pub ( crate ) async fn process_block_from_rpc (
2022-08-24 02:56:47 +03:00
& self ,
2022-11-08 22:58:11 +03:00
authorization : & Arc < Authorization > ,
2023-01-19 13:13:00 +03:00
consensus_finder : & mut ConsensusFinder ,
2022-12-03 08:31:03 +03:00
rpc_head_block : Option < SavedBlock > ,
2022-08-24 02:56:47 +03:00
rpc : Arc < Web3Connection > ,
2022-08-30 23:01:42 +03:00
head_block_sender : & watch ::Sender < ArcBlock > ,
2022-08-24 03:59:05 +03:00
pending_tx_sender : & Option < broadcast ::Sender < TxStatus > > ,
2022-08-24 02:56:47 +03:00
) -> anyhow ::Result < ( ) > {
2023-01-19 13:13:00 +03:00
// TODO: how should we handle an error here?
if ! consensus_finder
. update_rpc ( rpc_head_block . clone ( ) , rpc . clone ( ) , self )
. await ?
{
// nothing changed. no need
return Ok ( ( ) ) ;
}
2022-12-01 01:11:14 +03:00
2023-01-19 13:13:00 +03:00
let new_synced_connections = consensus_finder
. best_consensus_connections ( authorization , self )
. await ;
2022-12-01 01:11:14 +03:00
2023-01-23 09:02:08 +03:00
// TODO: what should we do if the block number of new_synced_connections is < old_synced_connections? wait?
2023-01-19 13:13:00 +03:00
let includes_backups = new_synced_connections . includes_backups ;
let consensus_head_block = new_synced_connections . head_block . clone ( ) ;
let num_consensus_rpcs = new_synced_connections . num_conns ( ) ;
let num_checked_rpcs = new_synced_connections . num_checked_conns ;
let num_active_rpcs = consensus_finder . all . rpc_name_to_hash . len ( ) ;
let total_rpcs = self . conns . len ( ) ;
2023-01-23 09:02:08 +03:00
let old_consensus_head_connections = self
. watch_consensus_connections_sender
. send_replace ( Arc ::new ( new_synced_connections ) ) ;
2023-01-19 13:13:00 +03:00
2023-01-20 05:30:30 +03:00
let includes_backups_str = if includes_backups { " B " } else { " " } ;
2023-01-20 05:14:47 +03:00
2023-01-19 13:13:00 +03:00
if let Some ( consensus_saved_block ) = consensus_head_block {
2023-01-23 09:02:08 +03:00
match & old_consensus_head_connections . head_block {
2023-01-19 13:13:00 +03:00
None = > {
debug! (
2023-01-20 05:30:30 +03:00
" first {}{}/{}/{}/{} block={}, rpc={} " ,
includes_backups_str ,
2023-01-19 13:13:00 +03:00
num_consensus_rpcs ,
num_checked_rpcs ,
num_active_rpcs ,
total_rpcs ,
consensus_saved_block ,
2023-01-20 05:14:47 +03:00
rpc ,
2023-01-19 13:13:00 +03:00
) ;
if includes_backups {
// TODO: what else should be in this error?
warn! ( " Backup RPCs are in use! " ) ;
2022-12-03 08:31:03 +03:00
}
2022-12-01 01:11:14 +03:00
2023-01-19 13:13:00 +03:00
let consensus_head_block =
self . save_block ( consensus_saved_block . block , true ) . await ? ;
2022-08-24 02:56:47 +03:00
2023-01-19 13:13:00 +03:00
head_block_sender
. send ( consensus_head_block )
. context ( " head_block_sender sending consensus_head_block " ) ? ;
2022-12-03 08:31:03 +03:00
}
2023-01-19 13:13:00 +03:00
Some ( old_head_block ) = > {
// TODO: do this log item better
let rpc_head_str = rpc_head_block
. map ( | x | x . to_string ( ) )
. unwrap_or_else ( | | " None " . to_string ( ) ) ;
match consensus_saved_block . number ( ) . cmp ( & old_head_block . number ( ) ) {
Ordering ::Equal = > {
// multiple blocks with the same fork!
if consensus_saved_block . hash ( ) = = old_head_block . hash ( ) {
// no change in hash. no need to use head_block_sender
debug! (
2023-01-20 05:30:30 +03:00
" con {}{}/{}/{}/{} con={} rpc={}@{} " ,
includes_backups_str ,
2023-01-19 13:13:00 +03:00
num_consensus_rpcs ,
num_checked_rpcs ,
num_active_rpcs ,
total_rpcs ,
consensus_saved_block ,
rpc ,
rpc_head_str ,
)
} else {
// hash changed
2022-09-06 19:49:07 +03:00
2023-01-19 13:13:00 +03:00
if includes_backups {
// TODO: what else should be in this error?
warn! ( " Backup RPCs are in use! " ) ;
}
debug! (
2023-01-20 05:30:30 +03:00
" unc {}{}/{}/{}/{} con_head={} old={} rpc={}@{} " ,
includes_backups_str ,
2023-01-19 13:13:00 +03:00
num_consensus_rpcs ,
num_checked_rpcs ,
num_active_rpcs ,
total_rpcs ,
consensus_saved_block ,
old_head_block ,
rpc ,
rpc_head_str ,
) ;
let consensus_head_block = self
. save_block ( consensus_saved_block . block , true )
. await
. context ( " save consensus_head_block as heaviest chain " ) ? ;
head_block_sender
. send ( consensus_head_block )
. context ( " head_block_sender sending consensus_head_block " ) ? ;
}
}
Ordering ::Less = > {
// this is unlikely but possible
// TODO: better log
warn! (
2023-01-20 05:30:30 +03:00
" chain rolled back {}{}/{}/{}/{} con={} old={} rpc={}@{} " ,
includes_backups_str ,
2023-01-19 13:13:00 +03:00
num_consensus_rpcs ,
num_checked_rpcs ,
num_active_rpcs ,
total_rpcs ,
consensus_saved_block ,
old_head_block ,
rpc ,
rpc_head_str ,
) ;
if includes_backups {
// TODO: what else should be in this error?
warn! ( " Backup RPCs are in use! " ) ;
}
// TODO: tell save_block to remove any higher block numbers from the cache. not needed because we have other checks on requested blocks being > head, but still seems like a good idea
let consensus_head_block = self
. save_block ( consensus_saved_block . block , true )
. await
. context (
" save_block sending consensus_head_block as heaviest chain " ,
) ? ;
head_block_sender
. send ( consensus_head_block )
. context ( " head_block_sender sending consensus_head_block " ) ? ;
}
Ordering ::Greater = > {
debug! (
2023-01-20 05:30:30 +03:00
" new {}{}/{}/{}/{} con={} rpc={}@{} " ,
includes_backups_str ,
2023-01-19 13:13:00 +03:00
num_consensus_rpcs ,
num_checked_rpcs ,
num_active_rpcs ,
total_rpcs ,
consensus_saved_block ,
rpc ,
rpc_head_str ,
) ;
if includes_backups {
// TODO: what else should be in this error?
warn! ( " Backup RPCs are in use! " ) ;
}
let consensus_head_block =
self . save_block ( consensus_saved_block . block , true ) . await ? ;
head_block_sender . send ( consensus_head_block ) ? ;
}
}
}
2022-08-24 02:56:47 +03:00
}
2023-01-19 13:13:00 +03:00
} else {
// TODO: do this log item better
let rpc_head_str = rpc_head_block
. map ( | x | x . to_string ( ) )
. unwrap_or_else ( | | " None " . to_string ( ) ) ;
if num_checked_rpcs > = self . min_head_rpcs {
error! (
2023-01-20 05:30:30 +03:00
" non {}{}/{}/{}/{} rpc={}@{} " ,
includes_backups_str ,
2023-01-19 13:13:00 +03:00
num_consensus_rpcs ,
num_checked_rpcs ,
num_active_rpcs ,
total_rpcs ,
rpc ,
rpc_head_str ,
) ;
} else {
debug! (
2023-01-20 05:30:30 +03:00
" non {}{}/{}/{}/{} rpc={}@{} " ,
includes_backups_str ,
2023-01-19 13:13:00 +03:00
num_consensus_rpcs ,
num_checked_rpcs ,
num_active_rpcs ,
total_rpcs ,
rpc ,
rpc_head_str ,
) ;
}
}
Ok ( ( ) )
}
}
struct ConnectionsGroup {
2023-01-20 05:14:47 +03:00
/// TODO: this group might not actually include backups, but they were at leastchecked
2023-01-19 13:13:00 +03:00
includes_backups : bool ,
rpc_name_to_hash : HashMap < String , H256 > ,
}
impl ConnectionsGroup {
fn new ( with_backups : bool ) -> Self {
Self {
includes_backups : with_backups ,
rpc_name_to_hash : Default ::default ( ) ,
}
}
fn without_backups ( ) -> Self {
Self ::new ( false )
}
fn with_backups ( ) -> Self {
Self ::new ( true )
}
fn remove ( & mut self , rpc : & Web3Connection ) -> Option < H256 > {
self . rpc_name_to_hash . remove ( rpc . name . as_str ( ) )
}
2022-08-24 02:56:47 +03:00
2023-01-19 13:13:00 +03:00
fn insert ( & mut self , rpc : & Web3Connection , block_hash : H256 ) -> Option < H256 > {
self . rpc_name_to_hash . insert ( rpc . name . clone ( ) , block_hash )
}
// TODO: i don't love having this here. move to web3_connections?
async fn get_block_from_rpc (
& self ,
rpc_name : & str ,
hash : & H256 ,
authorization : & Arc < Authorization > ,
web3_connections : & Web3Connections ,
) -> anyhow ::Result < ArcBlock > {
// // TODO: why does this happen?!?! seems to only happen with uncled blocks
// // TODO: maybe we should do try_get_with?
// // TODO: maybe we should just continue. this only seems to happen when an older block is received
// warn!(
// "Missing connection_head_block in block_hashes. Fetching now. hash={}. other={}",
// connection_head_hash, conn_name
// );
// this option should almost always be populated. if the connection reconnects at a bad time it might not be available though
let rpc = web3_connections . conns . get ( rpc_name ) ;
web3_connections . block ( authorization , hash , rpc ) . await
}
// TODO: do this during insert/remove?
pub ( self ) async fn highest_block (
& self ,
authorization : & Arc < Authorization > ,
web3_connections : & Web3Connections ,
) -> Option < ArcBlock > {
let mut checked_heads = HashSet ::with_capacity ( self . rpc_name_to_hash . len ( ) ) ;
let mut highest_block = None ::< ArcBlock > ;
for ( rpc_name , rpc_head_hash ) in self . rpc_name_to_hash . iter ( ) {
// don't waste time checking the same hash multiple times
if checked_heads . contains ( rpc_head_hash ) {
2022-08-27 02:44:25 +03:00
continue ;
}
2022-08-26 20:26:17 +03:00
2023-01-19 13:13:00 +03:00
let rpc_block = match self
. get_block_from_rpc ( rpc_name , rpc_head_hash , authorization , web3_connections )
. await
{
Ok ( x ) = > x ,
Err ( err ) = > {
warn! (
" failed getting block {} from {} while finding highest block number: {:?} " ,
rpc_head_hash , rpc_name , err ,
) ;
continue ;
2022-09-14 08:26:46 +03:00
}
2022-09-02 23:46:39 +03:00
} ;
2022-08-24 02:56:47 +03:00
2023-01-19 13:13:00 +03:00
checked_heads . insert ( rpc_head_hash ) ;
// if this is the first block we've tried
// or if this rpc's newest block has a higher number
// we used to check total difficulty, but that isn't a thing anymore on ETH
// TODO: we still need total difficulty on some other PoW chains. whats annoying is it isn't considered part of the "block header" just the block. so websockets don't return it
let highest_num = highest_block
. as_ref ( )
. map ( | x | x . number . expect ( " blocks here should always have a number " ) ) ;
let rpc_num = rpc_block . as_ref ( ) . number ;
if rpc_num > highest_num {
highest_block = Some ( rpc_block ) ;
2022-08-27 02:44:25 +03:00
}
}
2022-08-24 02:56:47 +03:00
2023-01-19 13:13:00 +03:00
highest_block
}
2022-08-27 02:44:25 +03:00
2023-01-19 13:13:00 +03:00
pub ( self ) async fn consensus_head_connections (
& self ,
authorization : & Arc < Authorization > ,
web3_connections : & Web3Connections ,
) -> anyhow ::Result < ConsensusConnections > {
let mut maybe_head_block = match self . highest_block ( authorization , web3_connections ) . await {
None = > return Err ( anyhow ::anyhow! ( " No blocks known " ) ) ,
Some ( x ) = > x ,
} ;
let num_known = self . rpc_name_to_hash . len ( ) ;
// track rpcs on this heaviest chain so we can build a new ConsensusConnections
let mut highest_rpcs = HashSet ::< & str > ::new ( ) ;
// a running total of the soft limits covered by the rpcs that agree on the head block
let mut highest_rpcs_sum_soft_limit : u32 = 0 ;
// TODO: also track highest_rpcs_sum_hard_limit? llama doesn't need this, so it can wait
// check the highest work block for a set of rpcs that can serve our request load
// if it doesn't have enough rpcs for our request load, check the parent block
// TODO: loop for how many parent blocks? we don't want to serve blocks that are too far behind. probably different per chain
// TODO: this loop is pretty long. any way to clean up this code?
for _ in 0 .. 6 {
let maybe_head_hash = maybe_head_block
. hash
. as_ref ( )
. expect ( " blocks here always need hashes " ) ;
// find all rpcs with maybe_head_block as their current head
for ( rpc_name , rpc_head_hash ) in self . rpc_name_to_hash . iter ( ) {
if rpc_head_hash ! = maybe_head_hash {
// connection is not on the desired block
continue ;
}
if highest_rpcs . contains ( rpc_name . as_str ( ) ) {
// connection is on a child block
continue ;
2022-08-27 02:44:25 +03:00
}
2023-01-19 13:13:00 +03:00
if let Some ( rpc ) = web3_connections . conns . get ( rpc_name . as_str ( ) ) {
highest_rpcs . insert ( rpc_name ) ;
highest_rpcs_sum_soft_limit + = rpc . soft_limit ;
} else {
// i don't think this is an error. i think its just if a reconnect is currently happening
warn! ( " connection missing: {} " , rpc_name ) ;
2022-08-24 02:56:47 +03:00
}
2022-09-05 19:25:21 +03:00
}
2023-01-19 13:13:00 +03:00
if highest_rpcs_sum_soft_limit > = web3_connections . min_sum_soft_limit
& & highest_rpcs . len ( ) > = web3_connections . min_head_rpcs
{
// we have enough servers with enough requests
break ;
}
2022-09-05 19:25:21 +03:00
2023-01-19 13:13:00 +03:00
// not enough rpcs yet. check the parent block
if let Some ( parent_block ) = web3_connections
. block_hashes
. get ( & maybe_head_block . parent_hash )
{
// trace!(
// child=%maybe_head_hash, parent=%parent_block.hash.unwrap(), "avoiding thundering herd",
// );
2022-09-06 15:29:37 +03:00
2023-01-19 13:13:00 +03:00
maybe_head_block = parent_block ;
continue ;
} else {
if num_known < web3_connections . min_head_rpcs {
return Err ( anyhow ::anyhow! (
" not enough rpcs connected: {}/{}/{} " ,
highest_rpcs . len ( ) ,
num_known ,
web3_connections . min_head_rpcs ,
) ) ;
} else {
let soft_limit_percent = ( highest_rpcs_sum_soft_limit as f32
/ web3_connections . min_sum_soft_limit as f32 )
* 100.0 ;
return Err ( anyhow ::anyhow! (
" ran out of parents to check. rpcs {}/{}/{}. soft limit: {:.2}% ({}/{}) " ,
highest_rpcs . len ( ) ,
num_known ,
web3_connections . min_head_rpcs ,
highest_rpcs_sum_soft_limit ,
web3_connections . min_sum_soft_limit ,
soft_limit_percent ,
) ) ;
}
}
}
2022-09-05 19:25:21 +03:00
2023-01-19 13:13:00 +03:00
// TODO: if consensus_head_rpcs.is_empty, try another method of finding the head block. will need to change the return Err above into breaks.
2022-08-24 02:56:47 +03:00
2023-01-19 13:13:00 +03:00
// we've done all the searching for the heaviest block that we can
if highest_rpcs . len ( ) < web3_connections . min_head_rpcs
| | highest_rpcs_sum_soft_limit < web3_connections . min_sum_soft_limit
{
// if we get here, not enough servers are synced. return an error
let soft_limit_percent = ( highest_rpcs_sum_soft_limit as f32
/ web3_connections . min_sum_soft_limit as f32 )
* 100.0 ;
2022-09-02 23:16:20 +03:00
2023-01-19 13:13:00 +03:00
return Err ( anyhow ::anyhow! (
" Not enough resources. rpcs {}/{}/{}. soft limit: {:.2}% ({}/{}) " ,
highest_rpcs . len ( ) ,
num_known ,
web3_connections . min_head_rpcs ,
highest_rpcs_sum_soft_limit ,
web3_connections . min_sum_soft_limit ,
soft_limit_percent ,
) ) ;
}
2022-09-02 23:16:20 +03:00
2023-01-19 13:13:00 +03:00
// success! this block has enough soft limit and nodes on it (or on later blocks)
let conns : Vec < Arc < Web3Connection > > = highest_rpcs
. into_iter ( )
. filter_map ( | conn_name | web3_connections . conns . get ( conn_name ) . cloned ( ) )
. collect ( ) ;
// TODO: DEBUG only check
let _ = maybe_head_block
. hash
. expect ( " head blocks always have hashes " ) ;
let _ = maybe_head_block
. number
. expect ( " head blocks always have numbers " ) ;
let consensus_head_block : SavedBlock = maybe_head_block . into ( ) ;
Ok ( ConsensusConnections {
head_block : Some ( consensus_head_block ) ,
conns ,
num_checked_conns : self . rpc_name_to_hash . len ( ) ,
includes_backups : self . includes_backups ,
} )
}
}
2022-09-02 08:40:56 +03:00
2023-01-19 13:13:00 +03:00
/// A ConsensusConnections builder that tracks all connection heads across multiple groups of servers
pub struct ConsensusFinder {
/// only main servers
main : ConnectionsGroup ,
/// main and backup servers
all : ConnectionsGroup ,
}
2022-09-02 23:16:20 +03:00
2023-01-19 13:13:00 +03:00
impl Default for ConsensusFinder {
fn default ( ) -> Self {
Self {
main : ConnectionsGroup ::without_backups ( ) ,
all : ConnectionsGroup ::with_backups ( ) ,
}
}
}
impl ConsensusFinder {
fn remove ( & mut self , rpc : & Web3Connection ) -> Option < H256 > {
// TODO: should we have multiple backup tiers? (remote datacenters vs third party)
if ! rpc . backup {
self . main . remove ( rpc ) ;
}
self . all . remove ( rpc )
}
fn insert ( & mut self , rpc : & Web3Connection , new_hash : H256 ) -> Option < H256 > {
// TODO: should we have multiple backup tiers? (remote datacenters vs third party)
if ! rpc . backup {
self . main . insert ( rpc , new_hash ) ;
}
self . all . insert ( rpc , new_hash )
}
/// Update our tracking of the rpc and return true if something changed
async fn update_rpc (
& mut self ,
rpc_head_block : Option < SavedBlock > ,
rpc : Arc < Web3Connection > ,
// we need this so we can save the block to caches. i don't like it though. maybe we should use a lazy_static Cache wrapper that has a "save_block" method?. i generally dislike globals but i also dislike all the types having to pass eachother around
web3_connections : & Web3Connections ,
) -> anyhow ::Result < bool > {
// add the rpc's block to connection_heads, or remove the rpc from connection_heads
let changed = match rpc_head_block {
Some ( mut rpc_head_block ) = > {
// we don't know if its on the heaviest chain yet
rpc_head_block . block = web3_connections
. save_block ( rpc_head_block . block , false )
. await ? ;
// we used to remove here if the block was too far behind. but it just made things more complicated
let rpc_head_hash = rpc_head_block . hash ( ) ;
if let Some ( prev_hash ) = self . insert ( & rpc , rpc_head_hash ) {
if prev_hash = = rpc_head_hash {
// this block was already sent by this rpc. return early
false
} else {
// new block for this rpc
true
2022-09-02 08:40:56 +03:00
}
2023-01-19 13:13:00 +03:00
} else {
// first block for this rpc
true
2022-08-24 02:56:47 +03:00
}
}
2023-01-19 13:13:00 +03:00
None = > {
if self . remove ( & rpc ) . is_none ( ) {
// this rpc was already removed
false
} else {
// rpc head changed from being synced to not
true
}
}
} ;
Ok ( changed )
}
// TODO: this could definitely be cleaner. i don't like the error handling/unwrapping
async fn best_consensus_connections (
& mut self ,
authorization : & Arc < Authorization > ,
web3_connections : & Web3Connections ,
) -> ConsensusConnections {
let highest_block_num = match self
. all
. highest_block ( authorization , web3_connections )
. await
{
None = > {
return ConsensusConnections ::default ( ) ;
}
Some ( x ) = > x . number . expect ( " blocks here should always have a number " ) ,
} ;
2023-01-23 09:02:08 +03:00
// TODO: also needs to be not less than our current head
let mut min_block_num = highest_block_num . saturating_sub ( U64 ::from ( 5 ) ) ;
// we also want to be sure we don't ever go backwards!
if let Some ( current_consensus_head_num ) = web3_connections . head_block_num ( ) {
min_block_num = min_block_num . max ( current_consensus_head_num ) ;
}
2023-01-19 13:13:00 +03:00
// TODO: pass `min_block_num` to consensus_head_connections?
let consensus_head_for_main = self
. main
. consensus_head_connections ( authorization , web3_connections )
. await
. map_err ( | err | err . context ( " cannot use main group " ) ) ;
let consensus_num_for_main = consensus_head_for_main
. as_ref ( )
. ok ( )
. map ( | x | x . head_block . as_ref ( ) . unwrap ( ) . number ( ) ) ;
if let Some ( consensus_num_for_main ) = consensus_num_for_main {
if consensus_num_for_main > = min_block_num {
return consensus_head_for_main . unwrap ( ) ;
}
2022-08-27 02:44:25 +03:00
}
2023-01-19 13:13:00 +03:00
// TODO: pass `min_block_num` to consensus_head_connections?
let consensus_connections_for_all = match self
. all
. consensus_head_connections ( authorization , web3_connections )
. await
{
Err ( err ) = > {
2023-01-19 13:26:54 +03:00
if self . all . rpc_name_to_hash . len ( ) < web3_connections . min_head_rpcs {
debug! ( " No consensus head yet: {} " , err ) ;
}
2023-01-19 13:13:00 +03:00
return ConsensusConnections ::default ( ) ;
}
Ok ( x ) = > x ,
} ;
let consensus_num_for_all = consensus_connections_for_all
. head_block
. as_ref ( )
. map ( | x | x . number ( ) ) ;
if consensus_num_for_all > consensus_num_for_main {
if consensus_num_for_all < Some ( min_block_num ) {
// TODO: this should have an alarm in sentry
error! ( " CONSENSUS HEAD w/ BACKUP NODES IS VERY OLD! " ) ;
}
consensus_connections_for_all
} else {
if let Ok ( x ) = consensus_head_for_main {
error! ( " CONSENSUS HEAD IS VERY OLD! Backup RPCs did not improve this situation " ) ;
x
} else {
2023-01-19 13:26:54 +03:00
// TODO: i don't think we need this error. and i doublt we'll ever even get here
2023-01-19 13:13:00 +03:00
error! ( " NO CONSENSUS HEAD! " ) ;
ConsensusConnections ::default ( )
}
}
2022-08-24 02:56:47 +03:00
}
}