2022-05-12 02:50:52 +03:00
//! Load balanced communication with a group of web3 providers
2022-08-24 02:56:47 +03:00
use super ::SyncedConnections ;
use super ::{ ActiveRequestHandle , HandleResult , Web3Connection } ;
use crate ::app ::{ flatten_handle , AnyhowJoinHandle , TxState } ;
use crate ::config ::Web3ConnectionConfig ;
use crate ::jsonrpc ::{ JsonRpcForwardedResponse , JsonRpcRequest } ;
2022-05-18 23:18:01 +03:00
use arc_swap ::ArcSwap ;
2022-05-28 07:26:24 +03:00
use counter ::Counter ;
2022-06-14 09:42:52 +03:00
use dashmap ::DashMap ;
2022-05-05 22:07:09 +03:00
use derive_more ::From ;
2022-08-24 02:56:47 +03:00
use ethers ::prelude ::{ Block , ProviderError , Transaction , TxHash , H256 , U64 } ;
2022-07-19 07:21:32 +03:00
use futures ::future ::{ join_all , try_join_all } ;
2022-05-05 22:07:09 +03:00
use futures ::stream ::FuturesUnordered ;
use futures ::StreamExt ;
2022-05-20 04:26:02 +03:00
use hashbrown ::HashMap ;
2022-08-24 02:56:47 +03:00
use indexmap ::IndexMap ;
2022-05-21 01:16:15 +03:00
use serde ::ser ::{ SerializeStruct , Serializer } ;
use serde ::Serialize ;
2022-05-05 22:07:09 +03:00
use serde_json ::value ::RawValue ;
use std ::cmp ;
2022-08-24 02:56:47 +03:00
use std ::cmp ::Reverse ;
2022-05-05 22:07:09 +03:00
use std ::fmt ;
use std ::sync ::Arc ;
2022-06-16 05:53:37 +03:00
use tokio ::sync ::{ broadcast , watch } ;
2022-05-17 19:23:27 +03:00
use tokio ::task ;
2022-08-07 09:48:57 +03:00
use tokio ::time ::{ interval , sleep , sleep_until , MissedTickBehavior } ;
use tokio ::time ::{ Duration , Instant } ;
2022-07-25 03:27:00 +03:00
use tracing ::{ debug , error , info , instrument , trace , warn } ;
2022-05-05 22:07:09 +03:00
/// A collection of web3 connections. Sends requests either the current best server or all servers.
#[ derive(From) ]
pub struct Web3Connections {
2022-08-24 02:56:47 +03:00
pub ( super ) conns : IndexMap < String , Arc < Web3Connection > > ,
pub ( super ) synced_connections : ArcSwap < SyncedConnections > ,
pub ( super ) pending_transactions : Arc < DashMap < TxHash , TxState > > ,
/// only includes blocks on the main chain.
/// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis?
pub ( super ) chain_map : DashMap < U64 , Arc < Block < TxHash > > > ,
/// all blocks, including orphans
/// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis?
pub ( super ) block_map : DashMap < H256 , Arc < Block < TxHash > > > ,
// TODO: petgraph? might help with pruning the maps
2022-05-05 22:07:09 +03:00
}
impl Web3Connections {
2022-08-24 02:56:47 +03:00
/// Spawn durable connections to multiple Web3 providers.
2022-06-14 07:04:14 +03:00
pub async fn spawn (
2022-07-19 07:21:32 +03:00
chain_id : u64 ,
2022-08-10 08:56:09 +03:00
server_configs : HashMap < String , Web3ConnectionConfig > ,
2022-07-19 07:21:32 +03:00
http_client : Option < reqwest ::Client > ,
2022-08-16 01:50:56 +03:00
redis_client_pool : Option < redis_rate_limit ::RedisPool > ,
2022-07-22 08:11:26 +03:00
head_block_sender : Option < watch ::Sender < Arc < Block < TxHash > > > > ,
2022-06-16 05:53:37 +03:00
pending_tx_sender : Option < broadcast ::Sender < TxState > > ,
2022-06-16 20:51:49 +03:00
pending_transactions : Arc < DashMap < TxHash , TxState > > ,
2022-06-14 08:43:28 +03:00
) -> anyhow ::Result < ( Arc < Self > , AnyhowJoinHandle < ( ) > ) > {
let ( pending_tx_id_sender , pending_tx_id_receiver ) = flume ::unbounded ( ) ;
2022-07-22 08:11:26 +03:00
let ( block_sender , block_receiver ) =
flume ::unbounded ::< ( Arc < Block < H256 > > , Arc < Web3Connection > ) > ( ) ;
2022-05-05 22:07:09 +03:00
2022-06-29 22:15:05 +03:00
let http_interval_sender = if http_client . is_some ( ) {
let ( sender , receiver ) = broadcast ::channel ( 1 ) ;
drop ( receiver ) ;
2022-07-19 07:21:32 +03:00
// TODO: what interval? follow a websocket also? maybe by watching synced connections with a timeout. will need debounce
2022-06-29 22:15:05 +03:00
let mut interval = interval ( Duration ::from_secs ( 13 ) ) ;
interval . set_missed_tick_behavior ( MissedTickBehavior ::Delay ) ;
let sender = Arc ::new ( sender ) ;
let f = {
let sender = sender . clone ( ) ;
async move {
loop {
// TODO: every time a head_block arrives (maybe with a small delay), or on the interval.
interval . tick ( ) . await ;
2022-07-16 08:25:01 +03:00
trace! ( " http interval ready " ) ;
2022-07-16 08:21:08 +03:00
2022-06-29 22:15:05 +03:00
// errors are okay. they mean that all receivers have been dropped
let _ = sender . send ( ( ) ) ;
}
}
} ;
// TODO: do something with this handle?
tokio ::spawn ( f ) ;
Some ( sender )
} else {
None
} ;
2022-07-19 07:21:32 +03:00
// turn configs into connections (in parallel)
2022-08-12 22:07:14 +03:00
// TODO: move this into a helper function. then we can use it when configs change (will need a remove function too)
2022-07-19 07:21:32 +03:00
let spawn_handles : Vec < _ > = server_configs
. into_iter ( )
2022-08-10 08:56:09 +03:00
. map ( | ( server_name , server_config ) | {
2022-07-19 07:21:32 +03:00
let http_client = http_client . clone ( ) ;
let redis_client_pool = redis_client_pool . clone ( ) ;
let http_interval_sender = http_interval_sender . clone ( ) ;
2022-08-11 00:52:28 +03:00
let block_sender = if head_block_sender . is_some ( ) {
Some ( block_sender . clone ( ) )
} else {
None
} ;
2022-07-19 07:21:32 +03:00
let pending_tx_id_sender = Some ( pending_tx_id_sender . clone ( ) ) ;
tokio ::spawn ( async move {
server_config
. spawn (
2022-08-10 08:56:09 +03:00
server_name ,
2022-07-19 07:21:32 +03:00
redis_client_pool ,
chain_id ,
http_client ,
http_interval_sender ,
block_sender ,
pending_tx_id_sender ,
)
. await
} )
} )
. collect ( ) ;
2022-07-25 03:27:00 +03:00
let mut connections = IndexMap ::new ( ) ;
2022-07-19 07:21:32 +03:00
let mut handles = vec! [ ] ;
// TODO: futures unordered?
for x in join_all ( spawn_handles ) . await {
// TODO: how should we handle errors here? one rpc being down shouldn't cause the program to exit
match x {
Ok ( Ok ( ( connection , handle ) ) ) = > {
2022-08-10 08:56:09 +03:00
connections . insert ( connection . url . clone ( ) , connection ) ;
2022-07-19 07:21:32 +03:00
handles . push ( handle ) ;
}
Ok ( Err ( err ) ) = > {
// TODO: some of these are probably retry-able
error! ( ? err ) ;
}
Err ( err ) = > {
return Err ( err . into ( ) ) ;
2022-06-14 08:43:28 +03:00
}
2022-05-06 00:38:15 +03:00
}
2022-05-05 22:07:09 +03:00
}
2022-05-22 02:49:23 +03:00
// TODO: less than 3? what should we do here?
2022-05-16 01:02:14 +03:00
if connections . len ( ) < 2 {
2022-05-22 03:34:33 +03:00
warn! ( " Only {} connection(s)! " , connections . len ( ) ) ;
2022-05-16 01:02:14 +03:00
}
2022-05-15 22:28:22 +03:00
2022-05-20 05:01:02 +03:00
let synced_connections = SyncedConnections ::default ( ) ;
2022-05-16 01:02:14 +03:00
let connections = Arc ::new ( Self {
2022-07-25 03:27:00 +03:00
conns : connections ,
2022-05-18 23:18:01 +03:00
synced_connections : ArcSwap ::new ( Arc ::new ( synced_connections ) ) ,
2022-06-16 20:51:49 +03:00
pending_transactions ,
2022-08-24 02:56:47 +03:00
chain_map : Default ::default ( ) ,
block_map : Default ::default ( ) ,
2022-05-16 01:02:14 +03:00
} ) ;
2022-06-14 07:04:14 +03:00
let handle = {
let connections = connections . clone ( ) ;
tokio ::spawn ( async move {
2022-06-16 20:51:49 +03:00
// TODO: try_join_all with the other handles here
2022-06-14 07:04:14 +03:00
connections
2022-06-14 08:43:28 +03:00
. subscribe (
pending_tx_id_receiver ,
block_receiver ,
head_block_sender ,
pending_tx_sender ,
)
2022-06-14 07:04:14 +03:00
. await
} )
} ;
2022-06-14 08:43:28 +03:00
Ok ( ( connections , handle ) )
2022-05-18 19:35:06 +03:00
}
2022-06-16 20:51:49 +03:00
async fn _funnel_transaction (
& self ,
rpc : Arc < Web3Connection > ,
pending_tx_id : TxHash ,
2022-06-17 01:23:41 +03:00
) -> Result < Option < TxState > , ProviderError > {
2022-06-16 20:51:49 +03:00
// TODO: yearn devs have had better luck with batching these, but i think that's likely just adding a delay itself
2022-07-16 02:59:34 +03:00
// TODO: there is a race here on geth. sometimes the rpc isn't yet ready to serve the transaction (even though they told us about it!)
2022-06-16 23:57:48 +03:00
// TODO: maximum wait time
2022-06-17 01:23:41 +03:00
let pending_transaction : Transaction = match rpc . try_request_handle ( ) . await {
2022-08-07 09:48:57 +03:00
Ok ( HandleResult ::ActiveRequest ( handle ) ) = > {
handle
2022-06-17 01:23:41 +03:00
. request ( " eth_getTransactionByHash " , ( pending_tx_id , ) )
. await ?
}
2022-08-07 09:48:57 +03:00
Ok ( _ ) = > {
// TODO: actually retry?
return Ok ( None ) ;
}
2022-06-17 01:23:41 +03:00
Err ( err ) = > {
trace! (
? pending_tx_id ,
? rpc ,
? err ,
" cancelled funneling transaction "
) ;
return Ok ( None ) ;
}
} ;
2022-06-16 20:51:49 +03:00
trace! ( ? pending_transaction , " pending " ) ;
match & pending_transaction . block_hash {
Some ( _block_hash ) = > {
// the transaction is already confirmed. no need to save in the pending_transactions map
2022-06-17 01:23:41 +03:00
Ok ( Some ( TxState ::Confirmed ( pending_transaction ) ) )
2022-06-16 20:51:49 +03:00
}
2022-06-17 01:23:41 +03:00
None = > Ok ( Some ( TxState ::Pending ( pending_transaction ) ) ) ,
2022-06-16 20:51:49 +03:00
}
}
2022-07-23 03:19:13 +03:00
/// dedupe transaction and send them to any listening clients
2022-06-16 20:51:49 +03:00
async fn funnel_transaction (
2022-06-16 05:53:37 +03:00
self : Arc < Self > ,
rpc : Arc < Web3Connection > ,
pending_tx_id : TxHash ,
pending_tx_sender : broadcast ::Sender < TxState > ,
) -> anyhow ::Result < ( ) > {
2022-06-16 20:51:49 +03:00
// TODO: how many retries? until some timestamp is hit is probably better. maybe just loop and call this with a timeout
2022-06-16 23:57:48 +03:00
// TODO: after more investigation, i don't think retries will help. i think this is because chains of transactions get dropped from memory
// TODO: also check the "confirmed transactions" mapping? maybe one shared mapping with TxState in it?
2022-06-16 20:51:49 +03:00
2022-06-18 10:06:54 +03:00
if pending_tx_sender . receiver_count ( ) = = 0 {
// no receivers, so no point in querying to get the full transaction
2022-06-16 23:57:48 +03:00
return Ok ( ( ) ) ;
}
2022-06-16 05:53:37 +03:00
2022-07-16 08:21:08 +03:00
trace! ( ? pending_tx_id , " checking pending_transactions on {} " , rpc ) ;
2022-06-18 10:06:54 +03:00
if self . pending_transactions . contains_key ( & pending_tx_id ) {
// this transaction has already been processed
2022-06-16 23:57:48 +03:00
return Ok ( ( ) ) ;
}
// query the rpc for this transaction
// it is possible that another rpc is also being queried. thats fine. we want the fastest response
match self . _funnel_transaction ( rpc . clone ( ) , pending_tx_id ) . await {
2022-06-17 01:23:41 +03:00
Ok ( Some ( tx_state ) ) = > {
2022-06-16 20:51:49 +03:00
let _ = pending_tx_sender . send ( tx_state ) ;
2022-06-16 05:53:37 +03:00
2022-06-16 23:57:48 +03:00
trace! ( ? pending_tx_id , " sent " ) ;
2022-06-16 05:53:37 +03:00
2022-06-16 23:57:48 +03:00
// we sent the transaction. return now. don't break looping because that gives a warning
2022-06-16 20:51:49 +03:00
return Ok ( ( ) ) ;
2022-06-16 05:53:37 +03:00
}
2022-06-17 01:23:41 +03:00
Ok ( None ) = > { }
2022-06-16 23:57:48 +03:00
Err ( err ) = > {
trace! ( ? err , ? pending_tx_id , " failed fetching transaction " ) ;
// unable to update the entry. sleep and try again soon
// TODO: retry with exponential backoff with jitter starting from a much smaller time
// sleep(Duration::from_millis(100)).await;
}
2022-06-16 05:53:37 +03:00
}
2022-06-16 23:57:48 +03:00
// warn is too loud. this is somewhat common
// "There is a Pending txn with a lower account nonce. This txn can only be executed after confirmation of the earlier Txn Hash#"
// sometimes it's been pending for many hours
// sometimes it's maybe something else?
debug! ( ? pending_tx_id , " not found on {} " , rpc ) ;
2022-06-16 05:53:37 +03:00
Ok ( ( ) )
}
2022-07-22 22:30:39 +03:00
/// subscribe to blocks and transactions from all the backend rpcs.
/// blocks are processed by all the `Web3Connection`s and then sent to the `block_receiver`
/// transaction ids from all the `Web3Connection`s are deduplicated and forwarded to `pending_tx_sender`
2022-06-14 07:04:14 +03:00
async fn subscribe (
self : Arc < Self > ,
2022-06-14 08:43:28 +03:00
pending_tx_id_receiver : flume ::Receiver < ( TxHash , Arc < Web3Connection > ) > ,
2022-07-22 08:11:26 +03:00
block_receiver : flume ::Receiver < ( Arc < Block < TxHash > > , Arc < Web3Connection > ) > ,
head_block_sender : Option < watch ::Sender < Arc < Block < TxHash > > > > ,
2022-06-16 05:53:37 +03:00
pending_tx_sender : Option < broadcast ::Sender < TxState > > ,
2022-06-14 07:04:14 +03:00
) -> anyhow ::Result < ( ) > {
2022-06-16 05:53:37 +03:00
let mut futures = vec! [ ] ;
2022-05-18 19:35:06 +03:00
2022-06-14 08:43:28 +03:00
// setup the transaction funnel
2022-06-14 07:04:14 +03:00
// it skips any duplicates (unless they are being orphaned)
// fetches new transactions from the notifying rpc
// forwards new transacitons to pending_tx_receipt_sender
2022-06-15 01:02:18 +03:00
if let Some ( pending_tx_sender ) = pending_tx_sender . clone ( ) {
2022-06-14 07:04:14 +03:00
// TODO: do something with the handle so we can catch any errors
2022-06-14 09:42:52 +03:00
let clone = self . clone ( ) ;
2022-06-14 07:04:14 +03:00
let handle = task ::spawn ( async move {
2022-06-16 05:53:37 +03:00
while let Ok ( ( pending_tx_id , rpc ) ) = pending_tx_id_receiver . recv_async ( ) . await {
2022-06-16 20:51:49 +03:00
let f = clone . clone ( ) . funnel_transaction (
2022-06-16 05:53:37 +03:00
rpc ,
pending_tx_id ,
pending_tx_sender . clone ( ) ,
) ;
tokio ::spawn ( f ) ;
2022-06-14 07:04:14 +03:00
}
Ok ( ( ) )
} ) ;
2022-06-16 05:53:37 +03:00
futures . push ( flatten_handle ( handle ) ) ;
2022-06-14 07:04:14 +03:00
}
2022-06-14 08:43:28 +03:00
// setup the block funnel
2022-06-14 07:04:14 +03:00
if let Some ( head_block_sender ) = head_block_sender {
let connections = Arc ::clone ( & self ) ;
2022-06-15 01:02:18 +03:00
let pending_tx_sender = pending_tx_sender . clone ( ) ;
2022-05-18 19:35:06 +03:00
let handle = task ::Builder ::default ( )
2022-06-14 07:04:14 +03:00
. name ( " update_synced_rpcs " )
2022-05-17 19:23:27 +03:00
. spawn ( async move {
2022-06-14 07:04:14 +03:00
connections
2022-06-15 01:02:18 +03:00
. update_synced_rpcs ( block_receiver , head_block_sender , pending_tx_sender )
2022-05-17 19:23:27 +03:00
. await
} ) ;
2022-05-18 19:35:06 +03:00
2022-06-16 05:53:37 +03:00
futures . push ( flatten_handle ( handle ) ) ;
2022-05-16 01:02:14 +03:00
}
2022-06-14 08:43:28 +03:00
if futures . is_empty ( ) {
// no transaction or block subscriptions.
2022-08-11 00:29:50 +03:00
// todo!("every second, check that the provider is still connected");?
let handle = task ::Builder ::default ( ) . name ( " noop " ) . spawn ( async move {
loop {
sleep ( Duration ::from_secs ( 600 ) ) . await ;
}
} ) ;
futures . push ( flatten_handle ( handle ) ) ;
2022-06-14 08:43:28 +03:00
}
2022-06-16 05:53:37 +03:00
if let Err ( e ) = try_join_all ( futures ) . await {
2022-06-16 20:51:49 +03:00
error! ( " subscriptions over: {:?} " , self ) ;
2022-06-16 05:53:37 +03:00
return Err ( e ) ;
2022-06-14 07:04:14 +03:00
}
2022-05-18 19:35:06 +03:00
2022-06-14 08:43:28 +03:00
info! ( " subscriptions over: {:?} " , self ) ;
2022-06-14 07:04:14 +03:00
Ok ( ( ) )
2022-05-05 22:07:09 +03:00
}
2022-05-28 07:26:24 +03:00
/// Send the same request to all the handles. Returning the most common success or most common error.
2022-05-17 03:56:56 +03:00
#[ instrument(skip_all) ]
2022-05-12 02:50:52 +03:00
pub async fn try_send_parallel_requests (
2022-05-28 07:26:24 +03:00
& self ,
2022-05-12 02:50:52 +03:00
active_request_handles : Vec < ActiveRequestHandle > ,
2022-05-28 07:26:24 +03:00
method : & str ,
// TODO: remove this box once i figure out how to do the options
2022-07-08 22:01:11 +03:00
params : Option < & serde_json ::Value > ,
2022-05-28 07:26:24 +03:00
) -> Result < Box < RawValue > , ProviderError > {
2022-05-12 06:54:42 +03:00
// TODO: if only 1 active_request_handles, do self.try_send_request
2022-05-05 22:07:09 +03:00
2022-05-28 07:26:24 +03:00
let responses = active_request_handles
. into_iter ( )
. map ( | active_request_handle | async move {
let result : Result < Box < RawValue > , _ > =
active_request_handle . request ( method , params ) . await ;
result
} )
. collect ::< FuturesUnordered < _ > > ( )
. collect ::< Vec < Result < Box < RawValue > , ProviderError > > > ( )
. await ;
2022-05-28 21:45:45 +03:00
// TODO: Strings are not great keys, but we can't use RawValue or ProviderError as keys because they don't implement Hash or Eq
2022-05-28 07:26:24 +03:00
let mut count_map : HashMap < String , Result < Box < RawValue > , ProviderError > > = HashMap ::new ( ) ;
let mut counts : Counter < String > = Counter ::new ( ) ;
let mut any_ok = false ;
for response in responses {
let s = format! ( " {:?} " , response ) ;
if count_map . get ( & s ) . is_none ( ) {
if response . is_ok ( ) {
any_ok = true ;
}
2022-05-17 19:23:27 +03:00
2022-05-28 07:26:24 +03:00
count_map . insert ( s . clone ( ) , response ) ;
}
2022-05-05 22:07:09 +03:00
2022-05-28 07:26:24 +03:00
counts . update ( [ s ] . into_iter ( ) ) ;
2022-05-05 22:07:09 +03:00
}
2022-05-28 07:26:24 +03:00
for ( most_common , _ ) in counts . most_common_ordered ( ) {
let most_common = count_map . remove ( & most_common ) . unwrap ( ) ;
if any_ok & & most_common . is_err ( ) {
// errors were more common, but we are going to skip them because we got an okay
continue ;
} else {
// return the most common
return most_common ;
2022-05-05 22:07:09 +03:00
}
}
2022-05-28 07:26:24 +03:00
// TODO: what should we do if we get here? i don't think we will
panic! ( " i don't think this is possible " )
2022-05-05 22:07:09 +03:00
}
2022-08-11 00:29:50 +03:00
/// TODO: move parts of this onto SyncedConnections? it needs to be simpler
2022-05-20 05:16:48 +03:00
// we don't instrument here because we put a span inside the while loop
2022-05-16 01:02:14 +03:00
async fn update_synced_rpcs (
2022-05-15 09:27:13 +03:00
& self ,
2022-07-22 08:11:26 +03:00
block_receiver : flume ::Receiver < ( Arc < Block < TxHash > > , Arc < Web3Connection > ) > ,
2022-08-24 02:56:47 +03:00
// TODO: head_block_sender should be a broadcast_sender like pending_tx_sender
2022-07-22 08:11:26 +03:00
head_block_sender : watch ::Sender < Arc < Block < TxHash > > > ,
2022-06-16 05:53:37 +03:00
pending_tx_sender : Option < broadcast ::Sender < TxState > > ,
2022-05-15 09:27:13 +03:00
) -> anyhow ::Result < ( ) > {
2022-07-25 03:27:00 +03:00
// TODO: indexmap or hashmap? what hasher? with_capacity?
2022-08-11 00:29:50 +03:00
// TODO: this will grow unbounded. prune old heads automatically
2022-07-25 03:27:00 +03:00
let mut connection_heads = IndexMap ::< String , Arc < Block < TxHash > > > ::new ( ) ;
2022-05-18 23:18:01 +03:00
2022-06-14 07:04:14 +03:00
while let Ok ( ( new_block , rpc ) ) = block_receiver . recv_async ( ) . await {
2022-08-24 02:56:47 +03:00
self . recv_block_from_rpc (
& mut connection_heads ,
new_block ,
rpc ,
& head_block_sender ,
& pending_tx_sender ,
)
. await ? ;
2022-05-06 08:44:30 +03:00
}
2022-05-05 22:07:09 +03:00
2022-05-18 19:35:06 +03:00
// TODO: if there was an error, we should return it
warn! ( " block_receiver exited! " ) ;
2022-05-05 22:07:09 +03:00
Ok ( ( ) )
}
/// get the best available rpc server
2022-05-17 03:56:56 +03:00
#[ instrument(skip_all) ]
2022-07-02 04:20:28 +03:00
pub async fn next_upstream_server (
& self ,
skip : & [ Arc < Web3Connection > ] ,
2022-07-22 22:30:39 +03:00
min_block_needed : Option < & U64 > ,
2022-08-07 09:48:57 +03:00
) -> anyhow ::Result < HandleResult > {
let mut earliest_retry_at = None ;
2022-05-05 22:07:09 +03:00
2022-07-16 02:59:34 +03:00
// filter the synced rpcs
2022-07-19 04:31:12 +03:00
// TODO: we are going to be checking "has_block_data" a lot now. i think we pretty much always have min_block_needed now that we override "latest"
let mut synced_rpcs : Vec < Arc < Web3Connection > > =
if let Some ( min_block_needed ) = min_block_needed {
// TODO: this includes ALL archive servers. but we only want them if they are on a somewhat recent block
// TODO: maybe instead of "archive_needed" bool it should be the minimum height. then even delayed servers might be fine. will need to track all heights then
2022-07-25 03:27:00 +03:00
self . conns
. values ( )
2022-07-19 04:31:12 +03:00
. filter ( | x | x . has_block_data ( min_block_needed ) )
. filter ( | x | ! skip . contains ( x ) )
. cloned ( )
. collect ( )
} else {
self . synced_connections
. load ( )
2022-07-25 03:27:00 +03:00
. conns
2022-07-19 04:31:12 +03:00
. iter ( )
. filter ( | x | ! skip . contains ( x ) )
. cloned ( )
. collect ( )
} ;
2022-05-05 22:07:09 +03:00
2022-07-02 04:20:28 +03:00
if synced_rpcs . is_empty ( ) {
2022-08-07 09:48:57 +03:00
// TODO: what should happen here? might be nicer to retry in a second
return Err ( anyhow ::anyhow! ( " not synced " ) ) ;
2022-07-02 04:20:28 +03:00
}
2022-07-16 07:13:02 +03:00
let sort_cache : HashMap < _ , _ > = synced_rpcs
2022-05-05 22:07:09 +03:00
. iter ( )
2022-06-14 07:04:14 +03:00
. map ( | rpc | {
2022-07-19 04:31:12 +03:00
// TODO: get active requests and the soft limit out of redis?
2022-08-10 08:56:09 +03:00
let weight = rpc . weight ;
2022-05-18 23:18:01 +03:00
let active_requests = rpc . active_requests ( ) ;
2022-08-10 08:56:09 +03:00
let soft_limit = rpc . soft_limit ;
2022-05-06 23:44:12 +03:00
let utilization = active_requests as f32 / soft_limit as f32 ;
2022-07-19 04:31:12 +03:00
// TODO: double check this sorts how we want
2022-08-08 22:57:54 +03:00
( rpc . clone ( ) , ( weight , utilization , Reverse ( soft_limit ) ) )
2022-05-05 22:07:09 +03:00
} )
. collect ( ) ;
2022-06-14 07:04:14 +03:00
synced_rpcs . sort_unstable_by ( | a , b | {
2022-07-16 07:13:02 +03:00
let a_sorts = sort_cache . get ( a ) . unwrap ( ) ;
let b_sorts = sort_cache . get ( b ) . unwrap ( ) ;
2022-05-06 23:44:12 +03:00
// TODO: i'm comparing floats. crap
2022-07-16 07:13:02 +03:00
a_sorts . partial_cmp ( b_sorts ) . unwrap_or ( cmp ::Ordering ::Equal )
2022-05-05 22:07:09 +03:00
} ) ;
2022-05-19 06:00:54 +03:00
// now that the rpcs are sorted, try to get an active request handle for one of them
2022-06-14 07:04:14 +03:00
for rpc in synced_rpcs . into_iter ( ) {
2022-05-05 22:07:09 +03:00
// increment our connection counter
2022-05-22 02:34:05 +03:00
match rpc . try_request_handle ( ) . await {
2022-08-07 09:48:57 +03:00
Ok ( HandleResult ::ActiveRequest ( handle ) ) = > {
2022-06-14 07:04:14 +03:00
trace! ( " next server on {:?}: {:?} " , self , rpc ) ;
2022-08-07 09:48:57 +03:00
return Ok ( HandleResult ::ActiveRequest ( handle ) ) ;
}
Ok ( HandleResult ::RetryAt ( retry_at ) ) = > {
earliest_retry_at = earliest_retry_at . min ( Some ( retry_at ) ) ;
}
Ok ( HandleResult ::None ) = > {
// TODO: log a warning?
}
Err ( err ) = > {
// TODO: log a warning?
warn! ( ? err , " No request handle for {} " , rpc )
2022-05-06 23:44:12 +03:00
}
2022-05-05 22:07:09 +03:00
}
}
2022-08-07 09:48:57 +03:00
warn! ( " no servers on {:?}! {:?} " , self , earliest_retry_at ) ;
2022-05-06 23:44:12 +03:00
2022-08-07 09:48:57 +03:00
match earliest_retry_at {
None = > todo! ( ) ,
Some ( earliest_retry_at ) = > Ok ( HandleResult ::RetryAt ( earliest_retry_at ) ) ,
}
2022-05-05 22:07:09 +03:00
}
/// get all rpc servers that are not rate limited
2022-05-13 23:50:11 +03:00
/// returns servers even if they aren't in sync. This is useful for broadcasting signed transactions
2022-08-07 09:48:57 +03:00
// TODO: better type on this that can return an anyhow::Result
2022-08-04 01:23:10 +03:00
pub async fn upstream_servers (
2022-07-09 06:34:39 +03:00
& self ,
2022-07-22 22:30:39 +03:00
min_block_needed : Option < & U64 > ,
2022-08-07 09:48:57 +03:00
) -> Result < Vec < ActiveRequestHandle > , Option < Instant > > {
let mut earliest_retry_at = None ;
2022-05-05 22:07:09 +03:00
// TODO: with capacity?
let mut selected_rpcs = vec! [ ] ;
2022-07-25 03:27:00 +03:00
for connection in self . conns . values ( ) {
2022-07-19 04:31:12 +03:00
if let Some ( min_block_needed ) = min_block_needed {
if ! connection . has_block_data ( min_block_needed ) {
continue ;
}
2022-07-09 07:25:59 +03:00
}
2022-05-05 22:07:09 +03:00
// check rate limits and increment our connection counter
2022-05-22 02:34:05 +03:00
match connection . try_request_handle ( ) . await {
2022-08-07 09:48:57 +03:00
Ok ( HandleResult ::RetryAt ( retry_at ) ) = > {
2022-05-06 07:29:25 +03:00
// this rpc is not available. skip it
2022-08-07 09:48:57 +03:00
earliest_retry_at = earliest_retry_at . min ( Some ( retry_at ) ) ;
}
Ok ( HandleResult ::ActiveRequest ( handle ) ) = > selected_rpcs . push ( handle ) ,
Ok ( HandleResult ::None ) = > {
warn! ( " no request handle for {} " , connection )
}
Err ( err ) = > {
warn! ( ? err , " error getting request handle for {} " , connection )
2022-05-06 07:29:25 +03:00
}
2022-05-05 22:07:09 +03:00
}
}
if ! selected_rpcs . is_empty ( ) {
return Ok ( selected_rpcs ) ;
}
2022-05-22 02:34:05 +03:00
// return the earliest retry_after (if no rpcs are synced, this will be None)
2022-08-07 09:48:57 +03:00
Err ( earliest_retry_at )
2022-05-05 22:07:09 +03:00
}
2022-05-28 07:26:24 +03:00
2022-05-29 04:23:58 +03:00
/// be sure there is a timeout on this or it might loop forever
pub async fn try_send_best_upstream_server (
& self ,
request : JsonRpcRequest ,
2022-07-22 22:30:39 +03:00
min_block_needed : Option < & U64 > ,
2022-05-29 04:23:58 +03:00
) -> anyhow ::Result < JsonRpcForwardedResponse > {
2022-07-02 04:20:28 +03:00
let mut skip_rpcs = vec! [ ] ;
// TODO: maximum retries?
2022-05-29 04:23:58 +03:00
loop {
2022-07-25 03:27:00 +03:00
if skip_rpcs . len ( ) = = self . conns . len ( ) {
2022-07-02 04:20:28 +03:00
break ;
}
2022-07-19 04:31:12 +03:00
match self
. next_upstream_server ( & skip_rpcs , min_block_needed )
. await
{
2022-08-07 09:48:57 +03:00
Ok ( HandleResult ::ActiveRequest ( active_request_handle ) ) = > {
2022-07-02 04:20:28 +03:00
// save the rpc in case we get an error and want to retry on another server
skip_rpcs . push ( active_request_handle . clone_connection ( ) ) ;
2022-05-29 04:23:58 +03:00
let response_result = active_request_handle
. request ( & request . method , & request . params )
. await ;
2022-07-02 04:20:28 +03:00
match JsonRpcForwardedResponse ::try_from_response_result (
2022-06-04 00:45:44 +03:00
response_result ,
request . id . clone ( ) ,
) {
Ok ( response ) = > {
2022-07-02 04:20:28 +03:00
if let Some ( error ) = & response . error {
trace! ( ? response , " rpc error " ) ;
2022-07-09 07:25:59 +03:00
// some errors should be retried on other nodes
if error . code = = - 32000 {
let error_msg = error . message . as_str ( ) ;
// TODO: regex?
let retry_prefixes = [
2022-07-02 04:20:28 +03:00
" header not found " ,
" header for hash not found " ,
2022-07-09 07:25:59 +03:00
" missing trie node " ,
2022-07-02 04:20:28 +03:00
" node not started " ,
" RPC timeout " ,
2022-07-09 07:25:59 +03:00
] ;
for retry_prefix in retry_prefixes {
if error_msg . starts_with ( retry_prefix ) {
continue ;
}
}
2022-07-02 04:20:28 +03:00
}
2022-06-04 00:45:44 +03:00
} else {
2022-07-02 04:20:28 +03:00
trace! ( ? response , " rpc success " ) ;
2022-06-04 00:45:44 +03:00
}
2022-05-29 04:23:58 +03:00
2022-06-04 00:45:44 +03:00
return Ok ( response ) ;
}
Err ( e ) = > {
warn! ( ? self , ? e , " Backend server error! " ) ;
2022-05-29 04:23:58 +03:00
2022-06-04 00:45:44 +03:00
// TODO: sleep how long? until synced_connections changes or rate limits are available
sleep ( Duration ::from_millis ( 100 ) ) . await ;
// TODO: when we retry, depending on the error, skip using this same server
// for example, if rpc isn't available on this server, retrying is bad
// but if an http error, retrying on same is probably fine
continue ;
}
}
2022-05-29 04:23:58 +03:00
}
2022-08-07 09:48:57 +03:00
Ok ( HandleResult ::RetryAt ( retry_at ) ) = > {
// TODO: move this to a helper function
// sleep (TODO: with a lock?) until our rate limits should be available
// TODO: if a server catches up sync while we are waiting, we could stop waiting
warn! ( ? retry_at , " All rate limits exceeded. Sleeping " ) ;
2022-05-29 04:23:58 +03:00
2022-08-07 09:48:57 +03:00
sleep_until ( retry_at ) . await ;
2022-06-03 00:47:43 +03:00
continue ;
2022-05-29 04:23:58 +03:00
}
2022-08-07 09:48:57 +03:00
Ok ( HandleResult ::None ) = > {
warn! ( ? self , " No server handles! " ) ;
2022-05-29 04:23:58 +03:00
2022-08-07 09:48:57 +03:00
// TODO: subscribe to something on synced connections. maybe it should just be a watch channel
sleep ( Duration ::from_millis ( 200 ) ) . await ;
2022-05-29 04:23:58 +03:00
continue ;
}
2022-08-07 09:48:57 +03:00
Err ( err ) = > {
return Err ( err ) ;
}
2022-05-29 04:23:58 +03:00
}
}
2022-07-02 04:20:28 +03:00
Err ( anyhow ::anyhow! ( " all retries exhausted " ) )
2022-05-29 04:23:58 +03:00
}
2022-07-02 04:20:28 +03:00
/// be sure there is a timeout on this or it might loop forever
2022-05-28 07:26:24 +03:00
pub async fn try_send_all_upstream_servers (
& self ,
request : JsonRpcRequest ,
2022-07-22 22:30:39 +03:00
min_block_needed : Option < & U64 > ,
2022-05-28 07:26:24 +03:00
) -> anyhow ::Result < JsonRpcForwardedResponse > {
loop {
2022-08-04 01:23:10 +03:00
match self . upstream_servers ( min_block_needed ) . await {
2022-05-28 07:26:24 +03:00
Ok ( active_request_handles ) = > {
// TODO: benchmark this compared to waiting on unbounded futures
// TODO: do something with this handle?
// TODO: this is not working right. simplify
let quorum_response = self
. try_send_parallel_requests (
active_request_handles ,
request . method . as_ref ( ) ,
2022-07-08 22:01:11 +03:00
request . params . as_ref ( ) ,
2022-05-28 07:26:24 +03:00
)
. await ? ;
let response = JsonRpcForwardedResponse {
jsonrpc : " 2.0 " . to_string ( ) ,
id : request . id ,
result : Some ( quorum_response ) ,
error : None ,
} ;
return Ok ( response ) ;
}
Err ( None ) = > {
2022-05-29 04:23:58 +03:00
warn! ( ? self , " No servers in sync! " ) ;
2022-05-28 07:26:24 +03:00
// TODO: i don't think this will ever happen
2022-06-03 00:47:43 +03:00
// TODO: return a 502? if it does?
// return Err(anyhow::anyhow!("no available rpcs!"));
// TODO: sleep how long?
2022-07-16 03:08:22 +03:00
// TODO: subscribe to something in SyncedConnections instead
2022-06-03 00:47:43 +03:00
sleep ( Duration ::from_millis ( 200 ) ) . await ;
continue ;
2022-05-28 07:26:24 +03:00
}
2022-08-07 09:48:57 +03:00
Err ( Some ( retry_at ) ) = > {
2022-05-28 07:26:24 +03:00
// TODO: move this to a helper function
// sleep (TODO: with a lock?) until our rate limits should be available
// TODO: if a server catches up sync while we are waiting, we could stop waiting
2022-06-03 00:47:43 +03:00
warn! ( " All rate limits exceeded. Sleeping " ) ;
2022-08-07 09:48:57 +03:00
sleep_until ( retry_at ) . await ;
2022-05-28 07:26:24 +03:00
2022-06-03 00:47:43 +03:00
continue ;
2022-05-28 07:26:24 +03:00
}
}
}
}
2022-05-05 22:07:09 +03:00
}
2022-07-16 07:13:02 +03:00
2022-08-24 02:56:47 +03:00
impl fmt ::Debug for Web3Connections {
2022-08-24 02:13:56 +03:00
fn fmt ( & self , f : & mut fmt ::Formatter < '_ > ) -> fmt ::Result {
// TODO: the default formatter takes forever to write. this is too quiet though
2022-08-24 02:56:47 +03:00
f . debug_struct ( " Web3Connections " )
. field ( " conns " , & self . conns )
2022-08-24 02:13:56 +03:00
. finish_non_exhaustive ( )
}
}
2022-08-24 02:56:47 +03:00
/// Serialized as a two-field struct: every connection (in `conns` order) and the current
/// `synced_connections` snapshot.
impl Serialize for Web3Connections {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let conns: Vec<&Web3Connection> = self.conns.values().map(|conn| conn.as_ref()).collect();

        let mut state = serializer.serialize_struct("Web3Connections", 2)?;
        state.serialize_field("conns", &conns)?;

        // load the snapshot at the moment it is written, just like a direct field access would
        let synced = self.synced_connections.load();
        state.serialize_field("synced_connections", &**synced)?;

        state.end()
    }
}