//! Load balanced communication with a group of web3 providers
use anyhow::Context;
use arc_swap::ArcSwap;
use counter::Counter;
use dashmap::DashMap;
use derive_more::From;
use ethers::prelude::{Block, ProviderError, Transaction, TxHash, H256, U256, U64};
use futures::future::{join_all, try_join_all};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use hashbrown::HashMap;
use indexmap::{IndexMap, IndexSet};
use std::cmp::Reverse;
// use parking_lot::RwLock;
// use petgraph::graphmap::DiGraphMap;
use super::SyncedConnections;
use super::{ActiveRequestHandle, HandleResult, Web3Connection};
use crate::app::{flatten_handle, AnyhowJoinHandle, TxState};
use crate::config::Web3ConnectionConfig;
use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest};
use serde::ser::{SerializeStruct, Serializer};
use serde::Serialize;
use serde_json::json;
use serde_json::value::RawValue;
use std::cmp;
use std::fmt;
use std::sync::Arc;
use tokio::sync::{broadcast, watch};
use tokio::task;
use tokio::time::{interval, sleep, sleep_until, MissedTickBehavior};
use tokio::time::{Duration, Instant};
use tracing::{debug, error, info, instrument, trace, warn};

#[derive(Default)]
pub struct BlockChain {
    /// only includes blocks on the main chain.
    chain_map: DashMap<U64, Arc<Block<TxHash>>>,
    /// all blocks, including orphans
    block_map: DashMap<H256, Arc<Block<TxHash>>>,
    // TODO: petgraph?
}

impl BlockChain {
    pub fn add_block(&self, block: Arc<Block<TxHash>>, cannonical: bool) {
        let hash = block.hash.unwrap();

        if cannonical {
            let num = block.number.unwrap();

            let entry = self.chain_map.entry(num);

            let mut is_new = false;

            entry.or_insert_with(|| {
                is_new = true;
                block.clone()
            });

            if !is_new {
                return;
            }
        }

        self.block_map.entry(hash).or_insert(block);
    }

    pub fn cannonical_block(&self, num: &U64) -> Option<Arc<Block<TxHash>>> {
        self.chain_map.get(num).map(|x| x.clone())
    }

    pub fn block(&self, hash: &H256) -> Option<Arc<Block<TxHash>>> {
        self.block_map.get(hash).map(|x| x.clone())
    }
}
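
// A minimal usage sketch of `BlockChain` (illustrative only, not part of the
// original module): `add_block` unwraps `block.hash` (and `block.number` when
// `cannonical` is true), so both must be set before insertion.
//
// let chain = BlockChain::default();
//
// let mut block = Block::<TxHash>::default();
// block.hash = Some(H256::zero());
// block.number = Some(U64::zero());
//
// chain.add_block(Arc::new(block), true);
//
// assert!(chain.cannonical_block(&U64::zero()).is_some());
// assert!(chain.block(&H256::zero()).is_some());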

/// A collection of web3 connections. Sends requests to either the current best server or all servers.
#[derive(From)]
pub struct Web3Connections {
    conns: IndexMap<String, Arc<Web3Connection>>,
    synced_connections: ArcSwap<SyncedConnections>,
    pending_transactions: Arc<DashMap<TxHash, TxState>>,
    // TODO: i think chain is what we want, but i'm not sure how we'll use it yet
    // TODO: this graph is going to grow forever unless we do some sort of pruning. maybe store pruned in redis?
    // chain: Arc<RwLock<DiGraphMap<H256, Block<TxHash>>>>,
    chain: BlockChain,
}

impl Serialize for Web3Connections {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let conns: Vec<&Web3Connection> = self.conns.iter().map(|x| x.1.as_ref()).collect();

        let mut state = serializer.serialize_struct("Web3Connections", 2)?;
        state.serialize_field("conns", &conns)?;
        state.serialize_field("synced_connections", &**self.synced_connections.load())?;
        state.end()
    }
}

impl fmt::Debug for Web3Connections {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // TODO: the default formatter takes forever to write. this is too quiet though
        f.debug_struct("Web3Connections")
            .field("conns", &self.conns)
            .finish_non_exhaustive()
    }
}

impl Web3Connections {
    // #[instrument(name = "spawn_Web3Connections", skip_all)]
    pub async fn spawn(
        chain_id: u64,
        server_configs: HashMap<String, Web3ConnectionConfig>,
        http_client: Option<reqwest::Client>,
        redis_client_pool: Option<redis_rate_limit::RedisPool>,
        head_block_sender: Option<watch::Sender<Arc<Block<TxHash>>>>,
        pending_tx_sender: Option<broadcast::Sender<TxState>>,
        pending_transactions: Arc<DashMap<TxHash, TxState>>,
    ) -> anyhow::Result<(Arc<Self>, AnyhowJoinHandle<()>)> {
        let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
        let (block_sender, block_receiver) =
            flume::unbounded::<(Arc<Block<H256>>, Arc<Web3Connection>)>();

        let http_interval_sender = if http_client.is_some() {
            let (sender, receiver) = broadcast::channel(1);
            drop(receiver);

            // TODO: what interval? follow a websocket also? maybe by watching synced connections with a timeout. will need debounce
            let mut interval = interval(Duration::from_secs(13));
            interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

            let sender = Arc::new(sender);

            let f = {
                let sender = sender.clone();

                async move {
                    loop {
                        // TODO: every time a head_block arrives (maybe with a small delay), or on the interval.
                        interval.tick().await;

                        trace!("http interval ready");

                        // errors are okay. they mean that all receivers have been dropped
                        let _ = sender.send(());
                    }
                }
            };

            // TODO: do something with this handle?
            tokio::spawn(f);

            Some(sender)
        } else {
            None
        };
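
        // A sketch of how the other end of this channel is used (illustrative
        // only): each http-polling `Web3Connection` gets its own receiver by
        // calling `subscribe()` on this shared sender, which is why the
        // initial receiver can be dropped above.
        //
        // let mut http_interval_receiver = http_interval_sender.as_ref().unwrap().subscribe();
        // let _tick = http_interval_receiver.recv().await?;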

        // turn configs into connections (in parallel)
        // TODO: move this into a helper function. then we can use it when configs change (will need a remove function too)
        let spawn_handles: Vec<_> = server_configs
            .into_iter()
            .map(|(server_name, server_config)| {
                let http_client = http_client.clone();
                let redis_client_pool = redis_client_pool.clone();
                let http_interval_sender = http_interval_sender.clone();

                let block_sender = if head_block_sender.is_some() {
                    Some(block_sender.clone())
                } else {
                    None
                };

                let pending_tx_id_sender = Some(pending_tx_id_sender.clone());

                tokio::spawn(async move {
                    server_config
                        .spawn(
                            server_name,
                            redis_client_pool,
                            chain_id,
                            http_client,
                            http_interval_sender,
                            block_sender,
                            pending_tx_id_sender,
                        )
                        .await
                })
            })
            .collect();

        let mut connections = IndexMap::new();
        let mut handles = vec![];

        // TODO: futures unordered?
        for x in join_all(spawn_handles).await {
            // TODO: how should we handle errors here? one rpc being down shouldn't cause the program to exit
            match x {
                Ok(Ok((connection, handle))) => {
                    connections.insert(connection.url.clone(), connection);
                    handles.push(handle);
                }
                Ok(Err(err)) => {
                    // TODO: some of these are probably retry-able
                    error!(?err);
                }
                Err(err) => {
                    return Err(err.into());
                }
            }
        }

        // TODO: less than 3? what should we do here?
        if connections.len() < 2 {
            warn!("Only {} connection(s)!", connections.len());
        }

        let synced_connections = SyncedConnections::default();

        let connections = Arc::new(Self {
            conns: connections,
            synced_connections: ArcSwap::new(Arc::new(synced_connections)),
            pending_transactions,
            chain: Default::default(),
        });

        let handle = {
            let connections = connections.clone();

            tokio::spawn(async move {
                // TODO: try_join_all with the other handles here
                connections
                    .subscribe(
                        pending_tx_id_receiver,
                        block_receiver,
                        head_block_sender,
                        pending_tx_sender,
                    )
                    .await
            })
        };

        Ok((connections, handle))
    }
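
    // A hedged call sketch for `spawn` (the variable names here are
    // illustrative, not from the original code):
    //
    // let (connections, handle) = Web3Connections::spawn(
    //     1,                            // chain_id (1 = mainnet)
    //     server_configs,               // HashMap<String, Web3ConnectionConfig>
    //     Some(reqwest::Client::new()), // enables the http interval above
    //     None,                         // no redis rate limiting
    //     Some(head_block_sender),
    //     Some(pending_tx_sender),
    //     pending_transactions.clone(),
    // )
    // .await?;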

    async fn _funnel_transaction(
        &self,
        rpc: Arc<Web3Connection>,
        pending_tx_id: TxHash,
    ) -> Result<Option<TxState>, ProviderError> {
        // TODO: yearn devs have had better luck with batching these, but i think that's likely just adding a delay itself
        // TODO: there is a race here on geth. sometimes the rpc isn't yet ready to serve the transaction (even though they told us about it!)
        // TODO: maximum wait time
        let pending_transaction: Transaction = match rpc.try_request_handle().await {
            Ok(HandleResult::ActiveRequest(handle)) => {
                handle
                    .request("eth_getTransactionByHash", (pending_tx_id,))
                    .await?
            }
            Ok(_) => {
                // TODO: actually retry?
                return Ok(None);
            }
            Err(err) => {
                trace!(
                    ?pending_tx_id,
                    ?rpc,
                    ?err,
                    "cancelled funneling transaction"
                );
                return Ok(None);
            }
        };

        trace!(?pending_transaction, "pending");

        match &pending_transaction.block_hash {
            Some(_block_hash) => {
                // the transaction is already confirmed. no need to save in the pending_transactions map
                Ok(Some(TxState::Confirmed(pending_transaction)))
            }
            None => Ok(Some(TxState::Pending(pending_transaction))),
        }
    }

    /// dedupe transactions and send them to any listening clients
    async fn funnel_transaction(
        self: Arc<Self>,
        rpc: Arc<Web3Connection>,
        pending_tx_id: TxHash,
        pending_tx_sender: broadcast::Sender<TxState>,
    ) -> anyhow::Result<()> {
        // TODO: how many retries? until some timestamp is hit is probably better. maybe just loop and call this with a timeout
        // TODO: after more investigation, i don't think retries will help. i think this is because chains of transactions get dropped from memory
        // TODO: also check the "confirmed transactions" mapping? maybe one shared mapping with TxState in it?

        if pending_tx_sender.receiver_count() == 0 {
            // no receivers, so no point in querying to get the full transaction
            return Ok(());
        }

        trace!(?pending_tx_id, "checking pending_transactions on {}", rpc);
        if self.pending_transactions.contains_key(&pending_tx_id) {
            // this transaction has already been processed
            return Ok(());
        }

        // query the rpc for this transaction
        // it is possible that another rpc is also being queried. that's fine. we want the fastest response
        match self._funnel_transaction(rpc.clone(), pending_tx_id).await {
            Ok(Some(tx_state)) => {
                let _ = pending_tx_sender.send(tx_state);

                trace!(?pending_tx_id, "sent");

                // we sent the transaction. return now. don't break looping because that gives a warning
                return Ok(());
            }
            Ok(None) => {}
            Err(err) => {
                trace!(?err, ?pending_tx_id, "failed fetching transaction");
                // unable to update the entry. sleep and try again soon
                // TODO: retry with exponential backoff with jitter starting from a much smaller time
                // sleep(Duration::from_millis(100)).await;
            }
        }

        // warn is too loud. this is somewhat common
        // "There is a Pending txn with a lower account nonce. This txn can only be executed after confirmation of the earlier Txn Hash#"
        // sometimes it's been pending for many hours
        // sometimes it's maybe something else?
        debug!(?pending_tx_id, "not found on {}", rpc);

        Ok(())
    }

    /// subscribe to blocks and transactions from all the backend rpcs.
    /// blocks are processed by all the `Web3Connection`s and then sent to the `block_receiver`
    /// transaction ids from all the `Web3Connection`s are deduplicated and forwarded to `pending_tx_sender`
    async fn subscribe(
        self: Arc<Self>,
        pending_tx_id_receiver: flume::Receiver<(TxHash, Arc<Web3Connection>)>,
        block_receiver: flume::Receiver<(Arc<Block<TxHash>>, Arc<Web3Connection>)>,
        head_block_sender: Option<watch::Sender<Arc<Block<TxHash>>>>,
        pending_tx_sender: Option<broadcast::Sender<TxState>>,
    ) -> anyhow::Result<()> {
        let mut futures = vec![];

        // setup the transaction funnel
        // it skips any duplicates (unless they are being orphaned)
        // fetches new transactions from the notifying rpc
        // forwards new transactions to pending_tx_receipt_sender
        if let Some(pending_tx_sender) = pending_tx_sender.clone() {
            // TODO: do something with the handle so we can catch any errors
            let clone = self.clone();
            let handle = task::spawn(async move {
                while let Ok((pending_tx_id, rpc)) = pending_tx_id_receiver.recv_async().await {
                    let f = clone.clone().funnel_transaction(
                        rpc,
                        pending_tx_id,
                        pending_tx_sender.clone(),
                    );

                    tokio::spawn(f);
                }

                Ok(())
            });

            futures.push(flatten_handle(handle));
        }

        // setup the block funnel
        if let Some(head_block_sender) = head_block_sender {
            let connections = Arc::clone(&self);
            let pending_tx_sender = pending_tx_sender.clone();
            let handle = task::Builder::default()
                .name("update_synced_rpcs")
                .spawn(async move {
                    connections
                        .update_synced_rpcs(block_receiver, head_block_sender, pending_tx_sender)
                        .await
                });

            futures.push(flatten_handle(handle));
        }

        if futures.is_empty() {
            // no transaction or block subscriptions.
            // todo!("every second, check that the provider is still connected");?
            let handle = task::Builder::default().name("noop").spawn(async move {
                loop {
                    sleep(Duration::from_secs(600)).await;
                }
            });

            futures.push(flatten_handle(handle));
        }

        if let Err(e) = try_join_all(futures).await {
            error!("subscriptions over: {:?}", self);
            return Err(e);
        }

        info!("subscriptions over: {:?}", self);

        Ok(())
    }

    pub async fn block(&self, hash: &H256) -> anyhow::Result<Arc<Block<TxHash>>> {
        // first, try to get the hash from our cache
        if let Some(block) = self.chain.block(hash) {
            return Ok(block);
        }

        // block not in cache. we need to ask an rpc for it
        // TODO: helper for method+params => JsonRpcRequest
        // TODO: get block with the transactions?
        let request = json!({ "id": "1", "method": "eth_getBlockByHash", "params": (hash, false) });
        let request: JsonRpcRequest = serde_json::from_value(request)?;

        // TODO: if error, retry?
        let response = self.try_send_best_upstream_server(request, None).await?;

        let block = response.result.unwrap();

        let block: Block<TxHash> = serde_json::from_str(block.get())?;

        let block = Arc::new(block);

        self.chain.add_block(block.clone(), false);

        Ok(block)
    }

    /// Get the heaviest chain's block from cache or backend rpc
    pub async fn cannonical_block(&self, num: &U64) -> anyhow::Result<Arc<Block<TxHash>>> {
        // first, try to get the hash from our cache
        if let Some(block) = self.chain.cannonical_block(num) {
            return Ok(block);
        }

        // block not in cache. we need to ask an rpc for it
        // but before we do any queries, be sure the requested block num exists
        let head_block_num = self.head_block_num();
        if num > &head_block_num {
            // TODO: i'm seeing this a lot when using ethspam. i dont know why though. i thought we delayed publishing
            // TODO: instead of error, maybe just sleep and try again?
            return Err(anyhow::anyhow!(
                "Head block is #{}, but #{} was requested",
                head_block_num,
                num
            ));
        }

        // TODO: helper for method+params => JsonRpcRequest
        // TODO: get block with the transactions?
        let request = json!({ "jsonrpc": "2.0", "id": "1", "method": "eth_getBlockByNumber", "params": (num, false) });
        let request: JsonRpcRequest = serde_json::from_value(request)?;

        // TODO: if error, retry?
        let response = self
            .try_send_best_upstream_server(request, Some(num))
            .await?;

        let block = response.result.unwrap();

        let block: Block<TxHash> = serde_json::from_str(block.get())?;

        let block = Arc::new(block);

        self.chain.add_block(block.clone(), true);

        Ok(block)
    }

    /// Convenience method to get the cannonical block at a given block height.
    pub async fn block_hash(&self, num: &U64) -> anyhow::Result<H256> {
        let block = self.cannonical_block(num).await?;

        let hash = block.hash.unwrap();

        Ok(hash)
    }

    pub fn head_block(&self) -> (U64, H256) {
        let synced_connections = self.synced_connections.load();

        (
            synced_connections.head_block_num,
            synced_connections.head_block_hash,
        )
    }

    pub fn head_block_hash(&self) -> H256 {
        self.synced_connections.load().head_block_hash
    }

    pub fn head_block_num(&self) -> U64 {
        self.synced_connections.load().head_block_num
    }

    pub fn synced(&self) -> bool {
        // TODO: require a minimum number of synced rpcs
        // TODO: move this whole function to SyncedConnections
        if self.synced_connections.load().conns.is_empty() {
            return false;
        }

        self.head_block_num() > U64::zero()
    }

    pub fn num_synced_rpcs(&self) -> usize {
        self.synced_connections.load().conns.len()
    }

    /// Send the same request to all the handles, returning the most common success or the most common error.
    #[instrument(skip_all)]
    pub async fn try_send_parallel_requests(
        &self,
        active_request_handles: Vec<ActiveRequestHandle>,
        method: &str,
        // TODO: remove this box once i figure out how to do the options
        params: Option<&serde_json::Value>,
    ) -> Result<Box<RawValue>, ProviderError> {
        // TODO: if only 1 active_request_handles, do self.try_send_request

        let responses = active_request_handles
            .into_iter()
            .map(|active_request_handle| async move {
                let result: Result<Box<RawValue>, _> =
                    active_request_handle.request(method, params).await;
                result
            })
            .collect::<FuturesUnordered<_>>()
            .collect::<Vec<Result<Box<RawValue>, ProviderError>>>()
            .await;

        // TODO: Strings are not great keys, but we can't use RawValue or ProviderError as keys because they don't implement Hash or Eq
        let mut count_map: HashMap<String, Result<Box<RawValue>, ProviderError>> = HashMap::new();
        let mut counts: Counter<String> = Counter::new();
        let mut any_ok = false;
        for response in responses {
            let s = format!("{:?}", response);

            if count_map.get(&s).is_none() {
                if response.is_ok() {
                    any_ok = true;
                }

                count_map.insert(s.clone(), response);
            }

            counts.update([s].into_iter());
        }

        for (most_common, _) in counts.most_common_ordered() {
            let most_common = count_map.remove(&most_common).unwrap();

            if any_ok && most_common.is_err() {
                // errors were more common, but we are going to skip them because we got an okay
                continue;
            } else {
                // return the most common
                return most_common;
            }
        }

        // TODO: what should we do if we get here? i don't think we will
        panic!("i don't think this is possible")
    }
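
    // A worked example of the quorum rule above (illustrative only): given
    // responses [Ok(a), Err(e), Err(e)], the counter ranks the error first,
    // but because `any_ok` is true the error entry is skipped and Ok(a) is
    // returned. Given [Ok(a), Ok(a), Ok(b)], the more common Ok(a) wins.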

    /// TODO: move parts of this onto SyncedConnections? it needs to be simpler
    // we don't instrument here because we put a span inside the while loop
    async fn update_synced_rpcs(
        &self,
        block_receiver: flume::Receiver<(Arc<Block<TxHash>>, Arc<Web3Connection>)>,
        head_block_sender: watch::Sender<Arc<Block<TxHash>>>,
        // TODO: use pending_tx_sender
        pending_tx_sender: Option<broadcast::Sender<TxState>>,
    ) -> anyhow::Result<()> {
        let total_rpcs = self.conns.len();

        // TODO: rpc name instead of url (will do this with config reload revamp)
        // TODO: indexmap or hashmap? what hasher? with_capacity?
        // TODO: this will grow unbounded. prune old heads automatically
        let mut connection_heads = IndexMap::<String, Arc<Block<TxHash>>>::new();

        while let Ok((new_block, rpc)) = block_receiver.recv_async().await {
            let new_block_hash = if let Some(hash) = new_block.hash {
                hash
            } else {
                connection_heads.remove(&rpc.url);
                continue;
            };

            // TODO: dry this with the code above
            let new_block_num = if let Some(num) = new_block.number {
                num
            } else {
                // this seems unlikely, but i'm pretty sure we have seen it
                // maybe when a node is syncing or reconnecting?
                warn!(%rpc, ?new_block, "Block without number!");
                connection_heads.remove(&rpc.url);
                continue;
            };

            // TODO: span with more in it?
            // TODO: make sure i'm doing this span right
            // TODO: show the actual rpc url?
            // TODO: clippy lint to make sure we don't hold this across an awaited future
            // TODO: what level?
            // let _span = info_span!("block_receiver", %rpc, %new_block_num).entered();

            if new_block_num == U64::zero() {
                warn!(%rpc, %new_block_num, "still syncing");

                connection_heads.remove(&rpc.url);
            } else {
                connection_heads.insert(rpc.url.clone(), new_block.clone());

                self.chain.add_block(new_block.clone(), false);
            }

            // iterate connection_heads to find the oldest block
            let lowest_block_num = if let Some(lowest_block) = connection_heads
                .values()
                .min_by(|a, b| a.number.cmp(&b.number))
            {
                lowest_block
                    .number
                    .expect("all blocks here should have a number")
            } else {
                continue;
            };

            // iterate connection_heads to find the consensus block
            let mut rpcs_by_num = IndexMap::<U64, Vec<&str>>::new();
            let mut blocks_by_hash = IndexMap::<H256, Arc<Block<TxHash>>>::new();
            // block_hash => soft_limit, rpcs
            // TODO: proper type for this?
            let mut rpcs_by_hash = IndexMap::<H256, Vec<&str>>::new();
            let mut total_soft_limit = 0;

            for (rpc_url, head_block) in connection_heads.iter() {
                if let Some(rpc) = self.conns.get(rpc_url) {
                    // we need the total soft limit in order to know when it's safe to update the backends
                    total_soft_limit += rpc.soft_limit;

                    let head_hash = head_block.hash.unwrap();

                    // save the block
                    blocks_by_hash
                        .entry(head_hash)
                        .or_insert_with(|| head_block.clone());

                    // add the rpc to all relevant block heights
                    let mut block = head_block.clone();
                    while block.number.unwrap() >= lowest_block_num {
                        let block_hash = block.hash.unwrap();
                        let block_num = block.number.unwrap();

                        // save the rpcs and the sum of their soft limit by their head hash
                        let rpc_urls_by_hash =
                            rpcs_by_hash.entry(block_hash).or_insert_with(Vec::new);

                        rpc_urls_by_hash.push(rpc_url);

                        // save the rpcs by their number
                        let rpc_urls_by_num = rpcs_by_num.entry(block_num).or_insert_with(Vec::new);

                        rpc_urls_by_num.push(rpc_url);

                        if let Some(parent) = self.chain.block(&block.parent_hash) {
                            // save the parent block
                            blocks_by_hash.insert(block.parent_hash, parent.clone());

                            block = parent
                        } else {
                            // log this? eventually we will hit a block we don't have, so it's not an error
                            break;
                        }
                    }
                }
            }

            // TODO: default_min_soft_limit? without it, we start serving traffic at the start too quickly
            // let min_soft_limit = total_soft_limit / 2;
            let min_soft_limit = 1;
            let num_possible_heads = rpcs_by_hash.len();

            trace!(?rpcs_by_hash);

            struct State<'a> {
                block: &'a Arc<Block<TxHash>>,
                sum_soft_limit: u32,
                conns: Vec<&'a str>,
            }

            impl<'a> State<'a> {
                // TODO: there are sortable traits, but this seems simpler
                /// sort the blocks in descending height
                fn sortable_values(&self) -> (&U64, &u32, &U256, &H256) {
                    // trace!(?self.block, ?self.conns);

                    // first we care about the block number
                    let block_num = self.block.number.as_ref().unwrap();

                    // if block_num ties, the block with the highest total difficulty *should* be the winner
                    // TODO: sometimes i see a block with no total difficulty. websocket subscription doesn't get everything
                    // let total_difficulty = self.block.total_difficulty.as_ref().expect("wat");

                    // all the nodes should already be doing this fork priority logic themselves
                    // so, it should be safe to just look at whatever our node majority thinks and go with that
                    let sum_soft_limit = &self.sum_soft_limit;

                    let difficulty = &self.block.difficulty;

                    // if we are still tied (unlikely), this will definitely break the tie
                    // TODO: what does geth do?
                    let block_hash = self.block.hash.as_ref().unwrap();

                    (block_num, sum_soft_limit, difficulty, block_hash)
                }
            }
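
            // A worked example of this ordering (illustrative numbers): between
            // (14, 1000, d, h1) and (15, 10, d, h2), the `max_by` below picks
            // the second state, because the block number dominates the sum of
            // soft limits. See `test_block_num_dominates_sort` at the bottom
            // of this file.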

            // TODO: this needs tests
            if let Some(x) = rpcs_by_hash
                .into_iter()
                .filter_map(|(hash, conns)| {
                    // TODO: move this to a `State::new` function
                    let sum_soft_limit = conns
                        .iter()
                        .map(|rpc_url| {
                            if let Some(rpc) = self.conns.get(*rpc_url) {
                                rpc.soft_limit
                            } else {
                                0
                            }
                        })
                        .sum();

                    if sum_soft_limit < min_soft_limit {
                        trace!(?sum_soft_limit, ?min_soft_limit, "sum_soft_limit too low");
                        None
                    } else {
                        let block = blocks_by_hash.get(&hash).unwrap();

                        Some(State {
                            block,
                            sum_soft_limit,
                            conns,
                        })
                    }
                })
                // sort b to a for descending order? sort a to b for ascending order? neither is needed; "max_by" is smart
                .max_by(|a, b| a.sortable_values().cmp(&b.sortable_values()))
            {
                let best_head_num = x.block.number.unwrap();
                let best_head_hash = x.block.hash.unwrap();
                let best_rpcs = x.conns;

                let synced_rpcs = rpcs_by_num.remove(&best_head_num).unwrap();

                if best_rpcs.len() == synced_rpcs.len() {
                    trace!(
                        "{}/{}/{}/{} rpcs have {}",
                        best_rpcs.len(),
                        synced_rpcs.len(),
                        connection_heads.len(),
                        total_rpcs,
                        best_head_hash
                    );
                } else {
                    // TODO: this isn't necessarily a fork. this might just be an rpc being slow
                    // TODO: log all the heads?
                    warn!(
                        "chain is forked! {} possible heads. {}/{}/{}/{} rpcs have {}",
                        num_possible_heads,
                        best_rpcs.len(),
                        synced_rpcs.len(),
                        connection_heads.len(),
                        total_rpcs,
                        best_head_hash
                    );
                }

                let num_best_rpcs = best_rpcs.len();

                // TODO: do this without clone?
                let conns = best_rpcs
                    .into_iter()
                    .map(|x| self.conns.get(x).unwrap().clone())
                    .collect();

                let pending_synced_connections = SyncedConnections {
                    head_block_num: best_head_num,
                    head_block_hash: best_head_hash,
                    conns,
                };

                let current_head_block = self.head_block_hash();
                let new_head_block =
                    pending_synced_connections.head_block_hash != current_head_block;

                if new_head_block {
                    self.chain.add_block(new_block.clone(), true);

                    debug!(
                        "{}/{} rpcs at {} ({}). head at {:?}",
                        pending_synced_connections.conns.len(),
                        self.conns.len(),
                        pending_synced_connections.head_block_num,
                        pending_synced_connections.head_block_hash,
                        pending_synced_connections
                            .conns
                            .iter()
                            .map(|x| format!("{}", x))
                            .collect::<Vec<_>>(),
                    );

                    // TODO: what if the hashes don't match?
                    if pending_synced_connections.head_block_hash == new_block_hash {
                        // mark all transactions in the block as confirmed
                        if pending_tx_sender.is_some() {
                            for tx_hash in &new_block.transactions {
                                // TODO: should we mark as confirmed via pending_tx_sender?
                                // TODO: possible deadlock here!
                                // trace!("removing {}...", tx_hash);
                                let _ = self.pending_transactions.remove(tx_hash);
                                // trace!("removed {}", tx_hash);
                            }
                        };

                        // TODO: mark any orphaned transactions as unconfirmed
                    }
                } else if num_best_rpcs == self.conns.len() {
                    trace!(
                        "all {} rpcs at {} ({})",
                        num_best_rpcs,
                        pending_synced_connections.head_block_num,
                        pending_synced_connections.head_block_hash,
                    );
                } else {
                    trace!(
                        ?pending_synced_connections,
                        "{}/{} rpcs at {} ({})",
                        num_best_rpcs,
                        self.conns.len(),
                        pending_synced_connections.head_block_num,
                        pending_synced_connections.head_block_hash,
                    );
                }

                // TODO: do this before or after processing all the transactions in this block?
                // TODO: only swap if there is a change?
                trace!(?pending_synced_connections, "swapping");
                self.synced_connections
                    .swap(Arc::new(pending_synced_connections));

                if new_head_block {
                    // TODO: is new_head_block accurate?
                    // TODO: move this onto self.chain?
                    head_block_sender
                        .send(new_block.clone())
                        .context("head_block_sender")?;
                }
            } else {
                // TODO: is this expected when we first start?
                // TODO: make sure self.synced_connections is empty
                warn!("not enough rpcs in sync");
            }
        }

        // TODO: if there was an error, we should return it
        warn!("block_receiver exited!");

        Ok(())
    }

    /// get the best available rpc server
    #[instrument(skip_all)]
    pub async fn next_upstream_server(
        &self,
        skip: &[Arc<Web3Connection>],
        min_block_needed: Option<&U64>,
    ) -> anyhow::Result<HandleResult> {
        let mut earliest_retry_at = None;

        // filter the synced rpcs
        // TODO: we are going to be checking "has_block_data" a lot now. i think we pretty much always have min_block_needed now that we override "latest"
        let mut synced_rpcs: Vec<Arc<Web3Connection>> =
            if let Some(min_block_needed) = min_block_needed {
                // TODO: this includes ALL archive servers. but we only want them if they are on a somewhat recent block
                // TODO: maybe instead of an "archive_needed" bool it should be the minimum height. then even delayed servers might be fine. will need to track all heights then
                self.conns
                    .values()
                    .filter(|x| x.has_block_data(min_block_needed))
                    .filter(|x| !skip.contains(x))
                    .cloned()
                    .collect()
            } else {
                self.synced_connections
                    .load()
                    .conns
                    .iter()
                    .filter(|x| !skip.contains(x))
                    .cloned()
                    .collect()
            };

        if synced_rpcs.is_empty() {
            // TODO: what should happen here? might be nicer to retry in a second
            return Err(anyhow::anyhow!("not synced"));
        }

        let sort_cache: HashMap<_, _> = synced_rpcs
            .iter()
            .map(|rpc| {
                // TODO: get active requests and the soft limit out of redis?
                let weight = rpc.weight;
                let active_requests = rpc.active_requests();
                let soft_limit = rpc.soft_limit;

                let utilization = active_requests as f32 / soft_limit as f32;

                // TODO: double check this sorts how we want
                (rpc.clone(), (weight, utilization, Reverse(soft_limit)))
            })
            .collect();

        synced_rpcs.sort_unstable_by(|a, b| {
            let a_sorts = sort_cache.get(a).unwrap();
            let b_sorts = sort_cache.get(b).unwrap();

            // TODO: i'm comparing floats. crap
            a_sorts.partial_cmp(b_sorts).unwrap_or(cmp::Ordering::Equal)
        });
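
        // A worked example of this sort key (illustrative numbers): with equal
        // weight and utilization, `Reverse(soft_limit)` puts the server with
        // the larger soft limit first in this ascending sort. See
        // `test_reverse_soft_limit_tiebreaker` at the bottom of this file.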

        // now that the rpcs are sorted, try to get an active request handle for one of them
        for rpc in synced_rpcs.into_iter() {
            // increment our connection counter
            match rpc.try_request_handle().await {
                Ok(HandleResult::ActiveRequest(handle)) => {
                    trace!("next server on {:?}: {:?}", self, rpc);
                    return Ok(HandleResult::ActiveRequest(handle));
                }
                Ok(HandleResult::RetryAt(retry_at)) => {
                    // note: `Option`'s `Ord` treats `None` as less than `Some`, so track the minimum explicitly
                    earliest_retry_at = Some(match earliest_retry_at {
                        None => retry_at,
                        Some(earliest) => earliest.min(retry_at),
                    });
                }
                Ok(HandleResult::None) => {
                    // TODO: log a warning?
                }
                Err(err) => {
                    // TODO: log a warning?
                    warn!(?err, "No request handle for {}", rpc)
                }
            }
        }

        warn!("no servers on {:?}! {:?}", self, earliest_retry_at);

        match earliest_retry_at {
            None => todo!(),
            Some(earliest_retry_at) => Ok(HandleResult::RetryAt(earliest_retry_at)),
        }
    }

    /// get all rpc servers that are not rate limited
    /// returns servers even if they aren't in sync. This is useful for broadcasting signed transactions
    // TODO: better type on this that can return an anyhow::Result
    pub async fn upstream_servers(
        &self,
        min_block_needed: Option<&U64>,
    ) -> Result<Vec<ActiveRequestHandle>, Option<Instant>> {
        let mut earliest_retry_at = None;
        // TODO: with capacity?
        let mut selected_rpcs = vec![];

        for connection in self.conns.values() {
            if let Some(min_block_needed) = min_block_needed {
                if !connection.has_block_data(min_block_needed) {
                    continue;
                }
            }

            // check rate limits and increment our connection counter
            match connection.try_request_handle().await {
                Ok(HandleResult::RetryAt(retry_at)) => {
                    // this rpc is not available. skip it
                    // note: `Option`'s `Ord` treats `None` as less than `Some`, so track the minimum explicitly
                    earliest_retry_at = Some(match earliest_retry_at {
                        None => retry_at,
                        Some(earliest) => earliest.min(retry_at),
                    });
                }
                Ok(HandleResult::ActiveRequest(handle)) => selected_rpcs.push(handle),
                Ok(HandleResult::None) => {
                    warn!("no request handle for {}", connection)
                }
                Err(err) => {
                    warn!(?err, "error getting request handle for {}", connection)
                }
            }
        }

        if !selected_rpcs.is_empty() {
            return Ok(selected_rpcs);
        }

        // return the earliest retry_at (if no rpcs are rate limited, this will be None)
        Err(earliest_retry_at)
    }

    /// be sure there is a timeout on this or it might loop forever
    pub async fn try_send_best_upstream_server(
        &self,
        request: JsonRpcRequest,
        min_block_needed: Option<&U64>,
    ) -> anyhow::Result<JsonRpcForwardedResponse> {
        let mut skip_rpcs = vec![];

        // TODO: maximum retries?
        // the loop is labeled so error handling deep inside the match can retry with the next server
        'outer: loop {
            if skip_rpcs.len() == self.conns.len() {
                break;
            }

            match self
                .next_upstream_server(&skip_rpcs, min_block_needed)
                .await
            {
                Ok(HandleResult::ActiveRequest(active_request_handle)) => {
                    // save the rpc in case we get an error and want to retry on another server
                    skip_rpcs.push(active_request_handle.clone_connection());

                    let response_result = active_request_handle
                        .request(&request.method, &request.params)
                        .await;

                    match JsonRpcForwardedResponse::try_from_response_result(
                        response_result,
                        request.id.clone(),
                    ) {
                        Ok(response) => {
                            if let Some(error) = &response.error {
                                trace!(?response, "rpc error");

                                // some errors should be retried on other nodes
                                if error.code == -32000 {
                                    let error_msg = error.message.as_str();

                                    // TODO: regex?
                                    let retry_prefixes = [
                                        "header not found",
                                        "header for hash not found",
                                        "missing trie node",
                                        "node not started",
                                        "RPC timeout",
                                    ];
                                    for retry_prefix in retry_prefixes {
                                        if error_msg.starts_with(retry_prefix) {
                                            // a bare `continue` here would only advance this inner
                                            // `for`, so target the outer loop to try another server
                                            continue 'outer;
                                        }
                                    }
                                }
                            } else {
                                trace!(?response, "rpc success");
                            }

                            return Ok(response);
                        }
                        Err(e) => {
                            warn!(?self, ?e, "Backend server error!");

                            // TODO: sleep how long? until synced_connections changes or rate limits are available
                            sleep(Duration::from_millis(100)).await;

                            // TODO: when we retry, depending on the error, skip using this same server
                            // for example, if rpc isn't available on this server, retrying is bad
                            // but if an http error, retrying on same is probably fine
                            continue;
                        }
                    }
                }
                Ok(HandleResult::RetryAt(retry_at)) => {
                    // TODO: move this to a helper function
                    // sleep (TODO: with a lock?) until our rate limits should be available
                    // TODO: if a server catches up sync while we are waiting, we could stop waiting
                    warn!(?retry_at, "All rate limits exceeded. Sleeping");

                    sleep_until(retry_at).await;

                    continue;
                }
                Ok(HandleResult::None) => {
                    warn!(?self, "No server handles!");

                    // TODO: subscribe to something on synced connections. maybe it should just be a watch channel
                    sleep(Duration::from_millis(200)).await;

                    continue;
                }
                Err(err) => {
                    return Err(err);
                }
            }
        }

        Err(anyhow::anyhow!("all retries exhausted"))
    }

    /// be sure there is a timeout on this or it might loop forever
    pub async fn try_send_all_upstream_servers(
        &self,
        request: JsonRpcRequest,
        min_block_needed: Option<&U64>,
    ) -> anyhow::Result<JsonRpcForwardedResponse> {
        loop {
            match self.upstream_servers(min_block_needed).await {
                Ok(active_request_handles) => {
                    // TODO: benchmark this compared to waiting on unbounded futures
                    // TODO: do something with this handle?
                    // TODO: this is not working right. simplify
                    let quorum_response = self
                        .try_send_parallel_requests(
                            active_request_handles,
                            request.method.as_ref(),
                            request.params.as_ref(),
                        )
                        .await?;

                    let response = JsonRpcForwardedResponse {
                        jsonrpc: "2.0".to_string(),
                        id: request.id,
                        result: Some(quorum_response),
                        error: None,
                    };

                    return Ok(response);
                }
                Err(None) => {
                    warn!(?self, "No servers in sync!");

                    // TODO: i don't think this will ever happen
                    // TODO: return a 502? if it does?
                    // return Err(anyhow::anyhow!("no available rpcs!"));

                    // TODO: sleep how long?
                    // TODO: subscribe to something in SyncedConnections instead
                    sleep(Duration::from_millis(200)).await;

                    continue;
                }
                Err(Some(retry_at)) => {
                    // TODO: move this to a helper function
                    // sleep (TODO: with a lock?) until our rate limits should be available
                    // TODO: if a server catches up sync while we are waiting, we could stop waiting
                    warn!("All rate limits exceeded. Sleeping");

                    sleep_until(retry_at).await;

                    continue;
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    #[test]
    fn test_false_before_true() {
        let mut x = [true, false, true];

        x.sort_unstable();

        assert_eq!(x, [false, true, true])
    }
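
    // These tests are sketches of the sorting logic above (the numbers are
    // illustrative, not taken from real servers).

    #[test]
    fn test_block_num_dominates_sort() {
        use ethers::prelude::U64;

        // mirrors `State::sortable_values`: a higher block number wins even
        // against a much larger sum_soft_limit
        let older_with_more_capacity = (U64::from(14u64), 1_000u32);
        let newer_with_less_capacity = (U64::from(15u64), 10u32);

        assert!(newer_with_less_capacity > older_with_more_capacity);
    }

    #[test]
    fn test_reverse_soft_limit_tiebreaker() {
        use std::cmp::Reverse;

        // mirrors the sort key in `next_upstream_server`: with equal weight
        // (the float utilization is left out so the tuples implement `Ord`),
        // `Reverse(soft_limit)` sorts the server with the larger soft limit first
        let mut keys = vec![(0u32, Reverse(100u32)), (0u32, Reverse(1_000u32))];

        keys.sort_unstable();

        assert_eq!(keys.first().unwrap().1, Reverse(1_000));
    }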
}

impl fmt::Debug for SyncedConnections {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // TODO: the default formatter takes forever to write. this is too quiet though
        // TODO: print the actual conns?
        f.debug_struct("SyncedConnections")
            .field("head_num", &self.head_block_num)
            .field("head_hash", &self.head_block_hash)
            .field("num_conns", &self.conns.len())
            .finish_non_exhaustive()
    }
}