//! Load balanced communication with a group of web3 providers
use anyhow::Context;
use arc_swap::ArcSwap;
use counter::Counter;
use dashmap::DashMap;
use derive_more::From;
use ethers::prelude::{Block, ProviderError, Transaction, TxHash, H256};
use futures::future::try_join_all;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use hashbrown::HashMap;
use serde::ser::{SerializeStruct, Serializer};
use serde::Serialize;
use serde_json::value::RawValue;
use std::cmp;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{broadcast, watch};
use tokio::task;
use tokio::time::{interval, sleep, MissedTickBehavior};
use tracing::{debug, error, info, info_span, instrument, trace, warn};

use crate::app::{flatten_handle, AnyhowJoinHandle, TxState};
use crate::config::Web3ConnectionConfig;
use crate::connection::{ActiveRequestHandle, Web3Connection};
use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest};

// Serialize so we can print it on our debug endpoint
#[derive(Clone, Default, Serialize)]
struct SyncedConnections {
    head_block_num: u64,
    head_block_hash: H256,
    // TODO: this should be able to serialize, but it isn't
    #[serde(skip_serializing)]
    inner: BTreeSet<Arc<Web3Connection>>,
    // TODO: use petgraph for keeping track of the chain so we can do better fork handling
}

impl fmt::Debug for SyncedConnections {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // TODO: the default formatter takes forever to write. this is too quiet though
        f.debug_struct("SyncedConnections")
            .field("head_block_num", &self.head_block_num)
            .field("head_block_hash", &self.head_block_hash)
            .finish_non_exhaustive()
    }
}

impl SyncedConnections {
    pub fn get_head_block_hash(&self) -> &H256 {
        &self.head_block_hash
    }

    pub fn get_head_block_num(&self) -> u64 {
        self.head_block_num
    }
}

/// A collection of web3 connections. Sends requests to either the current best server or to all servers.
#[derive(From)]
pub struct Web3Connections {
    inner: Vec<Arc<Web3Connection>>,
    synced_connections: ArcSwap<SyncedConnections>,
    pending_transactions: Arc<DashMap<TxHash, TxState>>,
}

impl Serialize for Web3Connections {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let inner: Vec<&Web3Connection> = self.inner.iter().map(|x| x.as_ref()).collect();

        // 2 is the number of fields in the struct.
        let mut state = serializer.serialize_struct("Web3Connections", 2)?;
        state.serialize_field("rpcs", &inner)?;
        state.serialize_field("synced_connections", &**self.synced_connections.load())?;
        state.end()
    }
}
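
// With the Serialize impl above, the debug endpoint should emit JSON shaped
// roughly like this (the values are illustrative, not captured output; the
// `inner` set is skipped by `#[serde(skip_serializing)]`):
//
// {"rpcs": [...], "synced_connections": {"head_block_num": 15000000, "head_block_hash": "0x..."}}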

impl fmt::Debug for Web3Connections {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // TODO: the default formatter takes forever to write. this is too quiet though
        f.debug_struct("Web3Connections")
            .field("inner", &self.inner)
            .finish_non_exhaustive()
    }
}

impl Web3Connections {
    // #[instrument(name = "spawn_Web3Connections", skip_all)]
    pub async fn spawn(
        chain_id: usize,
        server_configs: Vec<Web3ConnectionConfig>,
        http_client: Option<&reqwest::Client>,
        redis_client_pool: Option<&redis_cell_client::RedisClientPool>,
        head_block_sender: Option<watch::Sender<Block<TxHash>>>,
        pending_tx_sender: Option<broadcast::Sender<TxState>>,
        pending_transactions: Arc<DashMap<TxHash, TxState>>,
    ) -> anyhow::Result<(Arc<Self>, AnyhowJoinHandle<()>)> {
        let num_connections = server_configs.len();

        // TODO: try_join_all
        let mut handles = vec![];

        // TODO: only create these if head_block_sender and pending_tx_sender are set
        let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
        let (block_sender, block_receiver) = flume::unbounded();

        let http_interval_sender = if http_client.is_some() {
            let (sender, receiver) = broadcast::channel(1);
            drop(receiver);

            // TODO: what interval?
            let mut interval = interval(Duration::from_secs(13));
            interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

            let sender = Arc::new(sender);

            let f = {
                let sender = sender.clone();

                async move {
                    loop {
                        // TODO: every time a head_block arrives (maybe with a small delay), or on the interval.
                        interval.tick().await;

                        info!("http interval ready");

                        // errors are okay. they mean that all receivers have been dropped
                        let _ = sender.send(());
                    }
                }
            };

            // TODO: do something with this handle?
            tokio::spawn(f);

            Some(sender)
        } else {
            None
        };
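
        // Note on the interval above: HTTP providers can't push new heads to us
        // the way websocket subscriptions can, so this broadcast tick tells each
        // HTTP connection when to poll. The 13 second period appears chosen to
        // roughly match Ethereum's average block time, though the TODO above
        // suggests it was still an open question.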

        // turn configs into connections
        let mut connections = Vec::with_capacity(num_connections);
        for server_config in server_configs.into_iter() {
            match server_config
                .spawn(
                    redis_client_pool,
                    chain_id,
                    http_client,
                    http_interval_sender.clone(),
                    Some(block_sender.clone()),
                    Some(pending_tx_id_sender.clone()),
                )
                .await
            {
                Ok((connection, connection_handle)) => {
                    handles.push(flatten_handle(connection_handle));
                    connections.push(connection)
                }
                // TODO: include the server url in this
                Err(e) => warn!("Unable to connect to a server! {:?}", e),
            }
        }

        // TODO: less than 3? what should we do here?
        if connections.len() < 2 {
            warn!("Only {} connection(s)!", connections.len());
        }

        let synced_connections = SyncedConnections::default();

        let connections = Arc::new(Self {
            inner: connections,
            synced_connections: ArcSwap::new(Arc::new(synced_connections)),
            pending_transactions,
        });

        let handle = {
            let connections = connections.clone();

            tokio::spawn(async move {
                // TODO: try_join_all with the other handles here
                connections
                    .subscribe(
                        pending_tx_id_receiver,
                        block_receiver,
                        head_block_sender,
                        pending_tx_sender,
                    )
                    .await
            })
        };

        Ok((connections, handle))
    }
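
    // A sketch of how a caller might wire this up (`configs`, `http_client`, and
    // the channel capacity here are assumptions for illustration, not code from
    // this crate):
    //
    // let (head_block_sender, _head_block_receiver) = watch::channel(Block::default());
    // let (pending_tx_sender, _) = broadcast::channel(256);
    // let (connections, handle) = Web3Connections::spawn(
    //     1, // chain_id for mainnet
    //     configs,
    //     Some(&http_client),
    //     None, // no redis rate limiting
    //     Some(head_block_sender),
    //     Some(pending_tx_sender),
    //     Arc::new(DashMap::new()),
    // )
    // .await?;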

    async fn _funnel_transaction(
        &self,
        rpc: Arc<Web3Connection>,
        pending_tx_id: TxHash,
    ) -> Result<Option<TxState>, ProviderError> {
        // TODO: yearn devs have had better luck with batching these, but i think that's likely just adding a delay itself
        // TODO: there is a race here on geth. sometimes the rpc isn't yet ready to serve the transaction (even though they told us about it!)
        // TODO: maximum wait time
        let pending_transaction: Transaction = match rpc.try_request_handle().await {
            Ok(request_handle) => {
                request_handle
                    .request("eth_getTransactionByHash", (pending_tx_id,))
                    .await?
            }
            Err(err) => {
                trace!(
                    ?pending_tx_id,
                    ?rpc,
                    ?err,
                    "cancelled funneling transaction"
                );
                return Ok(None);
            }
        };

        trace!(?pending_transaction, "pending");

        match &pending_transaction.block_hash {
            Some(_block_hash) => {
                // the transaction is already confirmed. no need to save in the pending_transactions map
                Ok(Some(TxState::Confirmed(pending_transaction)))
            }
            None => Ok(Some(TxState::Pending(pending_transaction))),
        }
    }

    async fn funnel_transaction(
        self: Arc<Self>,
        rpc: Arc<Web3Connection>,
        pending_tx_id: TxHash,
        pending_tx_sender: broadcast::Sender<TxState>,
    ) -> anyhow::Result<()> {
        // TODO: how many retries? until some timestamp is hit is probably better. maybe just loop and call this with a timeout
        // TODO: after more investigation, i don't think retries will help. i think this is because chains of transactions get dropped from memory
        // TODO: also check the "confirmed transactions" mapping? maybe one shared mapping with TxState in it?

        if pending_tx_sender.receiver_count() == 0 {
            // no receivers, so no point in querying to get the full transaction
            return Ok(());
        }

        trace!(?pending_tx_id, "checking pending_transactions on {}", rpc);
        if self.pending_transactions.contains_key(&pending_tx_id) {
            // this transaction has already been processed
            return Ok(());
        }

        // query the rpc for this transaction
        // it is possible that another rpc is also being queried. that's fine. we want the fastest response
        match self._funnel_transaction(rpc.clone(), pending_tx_id).await {
            Ok(Some(tx_state)) => {
                let _ = pending_tx_sender.send(tx_state);

                trace!(?pending_tx_id, "sent");

                // we sent the transaction. return now. don't break looping because that gives a warning
                return Ok(());
            }
            Ok(None) => {}
            Err(err) => {
                trace!(?err, ?pending_tx_id, "failed fetching transaction");
                // unable to update the entry. sleep and try again soon
                // TODO: retry with exponential backoff with jitter starting from a much smaller time
                // sleep(Duration::from_millis(100)).await;
            }
        }

        // warn is too loud. this is somewhat common
        // "There is a Pending txn with a lower account nonce. This txn can only be executed after confirmation of the earlier Txn Hash#"
        // sometimes it's been pending for many hours
        // sometimes it's maybe something else?
        debug!(?pending_tx_id, "not found on {}", rpc);

        Ok(())
    }

    /// subscribe to all the backend rpcs
    async fn subscribe(
        self: Arc<Self>,
        pending_tx_id_receiver: flume::Receiver<(TxHash, Arc<Web3Connection>)>,
        block_receiver: flume::Receiver<(Block<TxHash>, Arc<Web3Connection>)>,
        head_block_sender: Option<watch::Sender<Block<TxHash>>>,
        pending_tx_sender: Option<broadcast::Sender<TxState>>,
    ) -> anyhow::Result<()> {
        let mut futures = vec![];

        // setup the transaction funnel
        // it skips any duplicates (unless they are being orphaned)
        // fetches new transactions from the notifying rpc
        // forwards new transactions to pending_tx_receipt_sender
        if let Some(pending_tx_sender) = pending_tx_sender.clone() {
            // TODO: do something with the handle so we can catch any errors
            let clone = self.clone();
            let handle = task::spawn(async move {
                while let Ok((pending_tx_id, rpc)) = pending_tx_id_receiver.recv_async().await {
                    // TODO: spawn this
                    let f = clone.clone().funnel_transaction(
                        rpc,
                        pending_tx_id,
                        pending_tx_sender.clone(),
                    );

                    tokio::spawn(f);
                }

                Ok(())
            });

            futures.push(flatten_handle(handle));
        } else {
            unimplemented!();
        }

        // setup the block funnel
        if let Some(head_block_sender) = head_block_sender {
            let connections = Arc::clone(&self);
            let pending_tx_sender = pending_tx_sender.clone();
            let handle = task::Builder::default()
                .name("update_synced_rpcs")
                .spawn(async move {
                    connections
                        .update_synced_rpcs(block_receiver, head_block_sender, pending_tx_sender)
                        .await
                });

            futures.push(flatten_handle(handle));
        }

        if futures.is_empty() {
            // no transaction or block subscriptions.
            unimplemented!("every second, check that the provider is still connected");
        }

        if let Err(e) = try_join_all(futures).await {
            error!("subscriptions over: {:?}", self);
            return Err(e);
        }

        info!("subscriptions over: {:?}", self);

        Ok(())
    }

    pub fn get_head_block_num(&self) -> u64 {
        self.synced_connections.load().get_head_block_num()
    }

    pub fn get_head_block_hash(&self) -> H256 {
        *self.synced_connections.load().get_head_block_hash()
    }

    pub fn has_synced_rpcs(&self) -> bool {
        !self.synced_connections.load().inner.is_empty()
    }

    pub fn num_synced_rpcs(&self) -> usize {
        self.synced_connections.load().inner.len()
    }

    /// Send the same request to all the handles, returning the most common success or the most common error.
    #[instrument(skip_all)]
    pub async fn try_send_parallel_requests(
        &self,
        active_request_handles: Vec<ActiveRequestHandle>,
        method: &str,
        // TODO: remove this box once i figure out how to do the options
        params: Option<&serde_json::Value>,
    ) -> Result<Box<RawValue>, ProviderError> {
        // TODO: if only 1 active_request_handles, do self.try_send_request

        let responses = active_request_handles
            .into_iter()
            .map(|active_request_handle| async move {
                let result: Result<Box<RawValue>, _> =
                    active_request_handle.request(method, params).await;
                result
            })
            .collect::<FuturesUnordered<_>>()
            .collect::<Vec<Result<Box<RawValue>, ProviderError>>>()
            .await;
// TODO: Strings are not great keys, but we can't use RawValue or ProviderError as keys because they don't implement Hash or Eq
2022-05-28 07:26:24 +03:00
let mut count_map : HashMap < String , Result < Box < RawValue > , ProviderError > > = HashMap ::new ( ) ;
let mut counts : Counter < String > = Counter ::new ( ) ;
let mut any_ok = false ;
for response in responses {
let s = format! ( " {:?} " , response ) ;
if count_map . get ( & s ) . is_none ( ) {
if response . is_ok ( ) {
any_ok = true ;
}
2022-05-17 19:23:27 +03:00
2022-05-28 07:26:24 +03:00
count_map . insert ( s . clone ( ) , response ) ;
}
2022-05-05 22:07:09 +03:00
2022-05-28 07:26:24 +03:00
counts . update ( [ s ] . into_iter ( ) ) ;
2022-05-05 22:07:09 +03:00
}
2022-05-28 07:26:24 +03:00
for ( most_common , _ ) in counts . most_common_ordered ( ) {
let most_common = count_map . remove ( & most_common ) . unwrap ( ) ;
if any_ok & & most_common . is_err ( ) {
// errors were more common, but we are going to skip them because we got an okay
continue ;
} else {
// return the most common
return most_common ;
2022-05-05 22:07:09 +03:00
}
}
2022-05-28 07:26:24 +03:00
// TODO: what should we do if we get here? i don't think we will
panic! ( " i don't think this is possible " )
2022-05-05 22:07:09 +03:00
}
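
    // A worked example of the counting above (hypothetical responses, shown only
    // to illustrate the quorum logic): given three servers answering
    // [Ok("0x1"), Ok("0x1"), Err(timeout)], `counts` holds {Ok("0x1"): 2, Err(timeout): 1},
    // so Ok("0x1") is returned. Given [Err(x), Err(x), Ok("0x1")], the error is
    // more common, but `any_ok` makes the loop skip it and return the single
    // success instead.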

    /// TODO: possible deadlock here. investigate more. probably refactor
    /// TODO: move parts of this onto SyncedConnections?
    // we don't instrument here because we put a span inside the while loop
    async fn update_synced_rpcs(
        &self,
        block_receiver: flume::Receiver<(Block<TxHash>, Arc<Web3Connection>)>,
        head_block_sender: watch::Sender<Block<TxHash>>,
        // TODO: use pending_tx_sender
        pending_tx_sender: Option<broadcast::Sender<TxState>>,
    ) -> anyhow::Result<()> {
        let total_rpcs = self.inner.len();

        let mut connection_states: HashMap<Arc<Web3Connection>, _> =
            HashMap::with_capacity(total_rpcs);

        let mut pending_synced_connections = SyncedConnections::default();

        while let Ok((new_block, rpc)) = block_receiver.recv_async().await {
            let new_block_num = match new_block.number {
                Some(x) => x.as_u64(),
                None => {
                    // a block without a number is expected when a node is still syncing
                    if new_block.hash.is_some() {
                        // this seems unlikely, but i'm pretty sure we see it
                        warn!(?new_block, "Block without number!");
                    }
                    continue;
                }
            };

            let new_block_hash = new_block.hash.unwrap();

            // TODO: span with more in it?
            // TODO: make sure i'm doing this span right
            // TODO: show the actual rpc url?
            let span = info_span!("block_receiver", ?rpc, new_block_num);

            // TODO: clippy lint to make sure we don't hold this across an awaited future
            let _enter = span.enter();

            if new_block_num == 0 {
                warn!("still syncing");
            }

            connection_states.insert(rpc.clone(), (new_block_num, new_block_hash));

            // TODO: do something to update the synced blocks
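            // The match below compares the incoming block against the current
            // pending head and handles the three cases: Greater means this rpc
            // announced a new best head, Equal means it caught up to (or forked
            // from) the current head, and Less means it is lagging behind.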
            match new_block_num.cmp(&pending_synced_connections.head_block_num) {
                cmp::Ordering::Greater => {
                    // the rpc's newest block is the new overall best block
                    // TODO: if trace, do the full block hash?
                    // TODO: only accept this block if it is a child of the current head_block
                    info!("new head: {}", new_block_hash);

                    pending_synced_connections.inner.clear();
                    pending_synced_connections.inner.insert(rpc);

                    pending_synced_connections.head_block_num = new_block_num;

                    // TODO: if the parent hash isn't our previous best block, ignore it
                    pending_synced_connections.head_block_hash = new_block_hash;

                    head_block_sender
                        .send(new_block.clone())
                        .context("head_block_sender")?;

                    // TODO: mark all transactions as confirmed
                    // TODO: mark any orphaned transactions as unconfirmed
                }
                cmp::Ordering::Equal => {
                    if new_block_hash == pending_synced_connections.head_block_hash {
                        // this rpc has caught up with the best known head
                        // do not clear synced_connections.
                        // we just want to add this rpc to the end
                        // TODO: HashSet here? i think we get dupes if we don't
                        pending_synced_connections.inner.insert(rpc);
                    } else {
                        // same height, but different chain

                        // check connection_states to see which head block is more popular!
                        let mut rpc_ids_by_block: BTreeMap<H256, Vec<Arc<Web3Connection>>> =
                            BTreeMap::new();

                        let mut counted_rpcs = 0;

                        for (rpc, (block_num, block_hash)) in connection_states.iter() {
                            if *block_num != new_block_num {
                                // this connection isn't synced. we don't care what hash it has
                                continue;
                            }

                            counted_rpcs += 1;

                            let count = rpc_ids_by_block
                                .entry(*block_hash)
                                .or_insert_with(|| Vec::with_capacity(total_rpcs - 1));

                            count.push(rpc.clone());
                        }

                        let most_common_head_hash = *rpc_ids_by_block
                            .iter()
                            .max_by(|a, b| a.1.len().cmp(&b.1.len()))
                            .map(|(k, _v)| k)
                            .unwrap();

                        let synced_rpcs = rpc_ids_by_block.remove(&most_common_head_hash).unwrap();

                        warn!(
                            "chain is forked! {} possible heads. {}/{}/{} rpcs have {}",
                            rpc_ids_by_block.len() + 1,
                            synced_rpcs.len(),
                            counted_rpcs,
                            total_rpcs,
                            most_common_head_hash
                        );

                        // TODO: do this more efficiently?
                        if pending_synced_connections.head_block_hash != most_common_head_hash {
                            head_block_sender
                                .send(new_block.clone())
                                .context("head_block_sender")?;

                            pending_synced_connections.head_block_hash = most_common_head_hash;
                        }

                        pending_synced_connections.inner = synced_rpcs.into_iter().collect();
                    }
                }
                cmp::Ordering::Less => {
                    // this isn't the best block in the tier. don't do anything
                    if !pending_synced_connections.inner.remove(&rpc) {
                        // we didn't remove anything. nothing more to do
                        continue;
                    }
                    // we removed. don't continue so that we update self.synced_connections
                }
            }

            // the synced connections have changed
            let synced_connections = Arc::new(pending_synced_connections.clone());

            if synced_connections.inner.len() == total_rpcs {
                // TODO: more metrics
                trace!("all head: {}", new_block_hash);
            }

            trace!(
                "rpcs at {}: {:?}",
                synced_connections.head_block_hash,
                synced_connections.inner
            );

            // TODO: what if the hashes don't match?
            if pending_synced_connections.head_block_hash == new_block_hash {
                // mark all transactions in the block as confirmed
                if pending_tx_sender.is_some() {
                    for tx_hash in &new_block.transactions {
                        // TODO: should we mark as confirmed via pending_tx_sender?
                        // TODO: possible deadlock here!
                        // trace!("removing {}...", tx_hash);
                        let _ = self.pending_transactions.remove(tx_hash);
                        // trace!("removed {}", tx_hash);
                    }
                };

                // TODO: mark any orphaned transactions as unconfirmed
            }

            // TODO: only publish if there are x (default 2) nodes synced to this block?
            // TODO: do this before or after processing all the transactions in this block?
            self.synced_connections.swap(synced_connections);
        }

        // TODO: if there was an error, we should return it
        warn!("block_receiver exited!");

        Ok(())
    }

    /// get the best available rpc server
    #[instrument(skip_all)]
    pub async fn next_upstream_server(
        &self,
        skip: &[Arc<Web3Connection>],
        archive_needed: bool,
    ) -> Result<ActiveRequestHandle, Option<Duration>> {
        let mut earliest_retry_after = None;

        // filter the synced rpcs
        let mut synced_rpcs: Vec<Arc<Web3Connection>> = if archive_needed {
            // TODO: this includes ALL archive servers. but we only want them if they are on a somewhat recent block
            // TODO: maybe instead of "archive_needed" bool it should be the minimum height. then even delayed servers might be fine. will need to track all heights then
            self.inner
                .iter()
                .filter(|x| x.is_archive())
                .filter(|x| !skip.contains(x))
                .cloned()
                .collect()
        } else {
            self.synced_connections
                .load()
                .inner
                .iter()
                .filter(|x| !skip.contains(x))
                .cloned()
                .collect()
        };

        if synced_rpcs.is_empty() {
            return Err(None);
        }

        let sort_cache: HashMap<_, _> = synced_rpcs
            .iter()
            .map(|rpc| {
                let active_requests = rpc.active_requests();
                let soft_limit = rpc.soft_limit();
                let is_archive = rpc.is_archive();

                let utilization = active_requests as f32 / soft_limit as f32;

                (rpc.clone(), (is_archive, utilization, soft_limit))
            })
            .collect();
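
        // Sorting below is ascending on the (is_archive, utilization, soft_limit)
        // tuple: `false < true`, so non-archive servers are preferred first, and
        // ties break toward the lowest utilization. (The `test_false_before_true`
        // test at the bottom of this file pins down that bool ordering.)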
        synced_rpcs.sort_unstable_by(|a, b| {
            let a_sorts = sort_cache.get(a).unwrap();
            let b_sorts = sort_cache.get(b).unwrap();

            // TODO: i'm comparing floats. crap
            a_sorts.partial_cmp(b_sorts).unwrap_or(cmp::Ordering::Equal)
        });

        // now that the rpcs are sorted, try to get an active request handle for one of them
        for rpc in synced_rpcs.into_iter() {
            // increment our connection counter
            match rpc.try_request_handle().await {
                Err(retry_after) => {
                    earliest_retry_after = earliest_retry_after.min(Some(retry_after));
                }
                Ok(handle) => {
                    trace!("next server on {:?}: {:?}", self, rpc);
                    return Ok(handle);
                }
            }
        }

        warn!("no servers on {:?}! {:?}", self, earliest_retry_after);

        // this might be None
        Err(earliest_retry_after)
    }

    /// get all rpc servers that are not rate limited
    /// returns servers even if they aren't in sync. This is useful for broadcasting signed transactions
    pub async fn get_upstream_servers(
        &self,
        archive_needed: bool,
    ) -> Result<Vec<ActiveRequestHandle>, Option<Duration>> {
        let mut earliest_retry_after = None;
        // TODO: with capacity?
        let mut selected_rpcs = vec![];
        for connection in self.inner.iter() {
            if archive_needed && !connection.is_archive() {
                continue;
            }

            // check rate limits and increment our connection counter
            match connection.try_request_handle().await {
                Err(retry_after) => {
                    // this rpc is not available. skip it
                    earliest_retry_after = earliest_retry_after.min(Some(retry_after));
                }
                Ok(handle) => selected_rpcs.push(handle),
            }
        }

        if !selected_rpcs.is_empty() {
            return Ok(selected_rpcs);
        }

        // return the earliest retry_after (if no rpcs are synced, this will be None)
        Err(earliest_retry_after)
    }

    /// be sure there is a timeout on this or it might loop forever
    pub async fn try_send_best_upstream_server(
        &self,
        request: JsonRpcRequest,
        archive_needed: bool,
    ) -> anyhow::Result<JsonRpcForwardedResponse> {
        let mut skip_rpcs = vec![];

        // TODO: maximum retries?
        loop {
            if skip_rpcs.len() == self.inner.len() {
                break;
            }
            match self.next_upstream_server(&skip_rpcs, archive_needed).await {
                Ok(active_request_handle) => {
                    // save the rpc in case we get an error and want to retry on another server
                    skip_rpcs.push(active_request_handle.clone_connection());

                    let response_result = active_request_handle
                        .request(&request.method, &request.params)
                        .await;

                    match JsonRpcForwardedResponse::try_from_response_result(
                        response_result,
                        request.id.clone(),
                    ) {
                        Ok(response) => {
                            if let Some(error) = &response.error {
                                trace!(?response, "rpc error");

                                // some errors should be retried on other nodes
                                if error.code == -32000 {
                                    let error_msg = error.message.as_str();

                                    // TODO: regex?
                                    let retry_prefixes = [
                                        "header not found",
                                        "header for hash not found",
                                        "missing trie node",
                                        "node not started",
                                        "RPC timeout",
                                    ];
                                    if retry_prefixes
                                        .iter()
                                        .any(|prefix| error_msg.starts_with(prefix))
                                    {
                                        // retry the request on the next server
                                        continue;
                                    }
                                }
                            } else {
                                trace!(?response, "rpc success");
                            }

                            return Ok(response);
                        }
                        Err(e) => {
                            warn!(?self, ?e, "Backend server error!");

                            // TODO: sleep how long? until synced_connections changes or rate limits are available
                            sleep(Duration::from_millis(100)).await;

                            // TODO: when we retry, depending on the error, skip using this same server
                            // for example, if rpc isn't available on this server, retrying is bad
                            // but if an http error, retrying on same is probably fine
                            continue;
                        }
                    }
                }
                Err(None) => {
                    warn!(?self, "No servers in sync!");

                    // TODO: subscribe to something on synced connections. maybe it should just be a watch channel
                    sleep(Duration::from_millis(200)).await;
                    continue;
                    // return Err(anyhow::anyhow!("no servers in sync"));
                }
                Err(Some(retry_after)) => {
                    // TODO: move this to a helper function
                    // sleep (TODO: with a lock?) until our rate limits should be available
                    // TODO: if a server catches up sync while we are waiting, we could stop waiting
                    warn!(?retry_after, "All rate limits exceeded. Sleeping");

                    sleep(retry_after).await;

                    continue;
                }
            }
        }

        Err(anyhow::anyhow!("all retries exhausted"))
    }
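
    // Since the doc comment above warns this can loop forever, a caller would
    // typically bound it; a sketch (the 60 second budget is an assumption):
    //
    // let response = tokio::time::timeout(
    //     Duration::from_secs(60),
    //     connections.try_send_best_upstream_server(request, false),
    // )
    // .await??;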

    /// be sure there is a timeout on this or it might loop forever
    pub async fn try_send_all_upstream_servers(
        &self,
        request: JsonRpcRequest,
        archive_needed: bool,
    ) -> anyhow::Result<JsonRpcForwardedResponse> {
        loop {
            match self.get_upstream_servers(archive_needed).await {
                Ok(active_request_handles) => {
                    // TODO: benchmark this compared to waiting on unbounded futures
                    // TODO: do something with this handle?
                    // TODO: this is not working right. simplify
                    let quorum_response = self
                        .try_send_parallel_requests(
                            active_request_handles,
                            request.method.as_ref(),
                            request.params.as_ref(),
                        )
                        .await?;

                    let response = JsonRpcForwardedResponse {
                        jsonrpc: "2.0".to_string(),
                        id: request.id,
                        result: Some(quorum_response),
                        error: None,
                    };

                    return Ok(response);
                }
                Err(None) => {
                    warn!(?self, "No servers in sync!");

                    // TODO: i don't think this will ever happen
                    // TODO: return a 502? if it does?
                    // return Err(anyhow::anyhow!("no available rpcs!"));

                    // TODO: sleep how long?
                    // TODO: subscribe to something in SyncedConnections instead
                    sleep(Duration::from_millis(200)).await;

                    continue;
                }
                Err(Some(retry_after)) => {
                    // TODO: move this to a helper function
                    // sleep (TODO: with a lock?) until our rate limits should be available
                    // TODO: if a server catches up sync while we are waiting, we could stop waiting
                    warn!("All rate limits exceeded. Sleeping");

                    sleep(retry_after).await;

                    continue;
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    #[test]
    fn test_false_before_true() {
        let mut x = [true, false, true];

        x.sort();

        assert_eq!(x, [false, true, true])
    }
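
    // A sketch of a companion check for the tuple ordering that
    // next_upstream_server's sort key relies on:
    #[test]
    fn test_tuple_sort_order() {
        // non-archive (false) sorts before archive (true); ties break on the float
        let mut x = [(true, 0.1_f32), (false, 0.5), (false, 0.2)];

        x.sort_unstable_by(|a, b| a.partial_cmp(b).unwrap());

        assert_eq!(x, [(false, 0.2), (false, 0.5), (true, 0.1)]);
    }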
}