2022-08-27 02:44:25 +03:00
//! Rate-limited communication with a web3 provider.
2022-12-03 08:31:03 +03:00
use super ::blockchain ::{ ArcBlock , BlockHashesCache , SavedBlock } ;
2022-08-24 02:56:47 +03:00
use super ::provider ::Web3Provider ;
2022-09-09 06:53:16 +03:00
use super ::request ::{ OpenRequestHandle , OpenRequestHandleMetrics , OpenRequestResult } ;
2022-08-24 02:56:47 +03:00
use crate ::app ::{ flatten_handle , AnyhowJoinHandle } ;
use crate ::config ::BlockAndRpc ;
2022-11-08 22:58:11 +03:00
use crate ::frontend ::authorization ::Authorization ;
2022-06-16 05:53:37 +03:00
use anyhow ::Context ;
2022-11-07 00:05:03 +03:00
use ethers ::prelude ::{ Bytes , Middleware , ProviderError , TxHash , H256 , U64 } ;
2022-12-06 01:38:54 +03:00
use ethers ::types ::U256 ;
2022-06-16 05:53:37 +03:00
use futures ::future ::try_join_all ;
2022-05-05 22:07:09 +03:00
use futures ::StreamExt ;
2022-11-25 03:45:13 +03:00
use log ::{ debug , error , info , trace , warn , Level } ;
2022-11-14 21:24:52 +03:00
use migration ::sea_orm ::DatabaseConnection ;
2022-08-10 05:37:34 +03:00
use parking_lot ::RwLock ;
2022-09-15 20:57:24 +03:00
use redis_rate_limiter ::{ RedisPool , RedisRateLimitResult , RedisRateLimiter } ;
2022-05-21 01:16:15 +03:00
use serde ::ser ::{ SerializeStruct , Serializer } ;
use serde ::Serialize ;
2022-09-24 05:47:44 +03:00
use serde_json ::json ;
2022-09-14 04:43:09 +03:00
use std ::cmp ::min ;
2022-05-05 22:07:09 +03:00
use std ::fmt ;
2022-06-14 07:04:14 +03:00
use std ::hash ::{ Hash , Hasher } ;
2022-07-19 04:31:12 +03:00
use std ::sync ::atomic ::{ self , AtomicU32 , AtomicU64 } ;
2022-05-05 22:07:09 +03:00
use std ::{ cmp ::Ordering , sync ::Arc } ;
2022-11-12 09:11:58 +03:00
use thread_fast_rng ::rand ::Rng ;
use thread_fast_rng ::thread_fast_rng ;
2022-12-06 00:13:36 +03:00
use tokio ::sync ::{ broadcast , oneshot , RwLock as AsyncRwLock } ;
use tokio ::time ::{ interval , sleep , sleep_until , timeout , Duration , Instant , MissedTickBehavior } ;
2022-12-28 05:17:11 +03:00
// TODO: maybe provider state should have the block data limit in it. but it is inside an async lock and we can't Serialize then
2022-12-06 03:06:28 +03:00
#[ derive(Clone, Debug) ]
2022-12-06 00:13:36 +03:00
/// Connection state of the provider held by a `Web3Connection`.
///
/// `connect` moves a connection through `None` -> `NotReady` -> `Ready`.
pub enum ProviderState {
/// No provider is set (initial state, and the state while reconnecting).
None ,
/// A provider has been created but not yet marked ready.
/// `provider()` only hands it out when `allow_not_ready` is true.
NotReady ( Arc < Web3Provider > ) ,
/// A provider that has been marked ready.
/// `provider()` still checks `ready()` before handing it out.
Ready ( Arc < Web3Provider > ) ,
}
impl ProviderState {
pub async fn provider ( & self , allow_not_ready : bool ) -> Option < & Arc < Web3Provider > > {
match self {
ProviderState ::None = > None ,
ProviderState ::NotReady ( x ) = > {
if allow_not_ready {
Some ( x )
} else {
// TODO: do a ready check here?
None
}
}
ProviderState ::Ready ( x ) = > {
if x . ready ( ) {
Some ( x )
} else {
None
}
}
}
}
}
2022-06-14 08:43:28 +03:00
2022-09-15 20:57:24 +03:00
/// An active connection to a Web3 RPC server like geth or erigon.
///
/// Created by `spawn`, which also starts the background task that connects
/// and subscribes to new blocks and pending transactions.
2022-08-24 02:13:56 +03:00
pub struct Web3Connection {
2022-08-24 03:11:49 +03:00
/// Name of this connection. `spawn` combines it with the chain id to build the rate limiter key.
pub name : String ,
2022-11-14 00:05:37 +03:00
/// Optional alternate name.
/// NOTE(review): presumably what gets shown to users instead of `name` - confirm against callers.
pub display_name : Option < String > ,
2022-08-24 02:13:56 +03:00
/// URL handed to `Web3Provider::from_str` every time we (re)connect.
/// TODO: can we get this from the provider? do we even need it?
2022-11-23 01:45:22 +03:00
pub ( super ) url : String ,
2022-09-15 20:57:24 +03:00
/// Some connections use an http_client. We keep a clone for reconnecting.
2022-11-23 01:45:22 +03:00
pub ( super ) http_client : Option < reqwest ::Client > ,
2022-08-24 02:13:56 +03:00
/// Keep track of currently open requests. We sort on this.
2022-08-24 03:11:49 +03:00
pub ( super ) active_requests : AtomicU32 ,
2022-11-25 03:45:13 +03:00
/// Keep track of total requests from the frontend.
pub ( super ) frontend_requests : AtomicU64 ,
/// Keep track of total requests from web3-proxy itself.
pub ( super ) internal_requests : AtomicU64 ,
2022-08-24 02:13:56 +03:00
/// The provider is in a RwLock so that we can replace it when re-connecting.
/// It is an async lock because we hold it open across awaits.
2022-12-06 00:13:36 +03:00
pub ( super ) provider_state : AsyncRwLock < ProviderState > ,
2022-08-24 02:13:56 +03:00
/// Rate limits are stored in a central redis so that multiple proxies can share their rate limits.
2022-09-15 20:57:24 +03:00
/// We do not use the deferred rate limiter because going over limits would cause errors.
2022-11-23 01:45:22 +03:00
pub ( super ) hard_limit : Option < RedisRateLimiter > ,
2022-08-24 02:13:56 +03:00
/// Used for load balancing to the least loaded server.
2022-08-26 20:26:17 +03:00
pub ( super ) soft_limit : u32 ,
2022-12-06 00:13:36 +03:00
/// Use web3 queries to find the block data limit for archive/pruned nodes.
/// `spawn` enables this when no limit was configured (0) and a block sender was given.
pub ( super ) automatic_block_limit : bool ,
2022-09-06 23:12:45 +03:00
/// How many blocks behind the head this server is expected to have data for.
/// `has_block_data` subtracts this from the head block number to find the oldest usable block.
/// TODO: have an enum for this so that "no limit" prints pretty?
2022-11-23 01:45:22 +03:00
pub ( super ) block_data_limit : AtomicU64 ,
2022-11-12 09:11:58 +03:00
/// Priority as a fraction. `spawn` takes a configured weight of 0 to 100 (lower is higher priority)
/// and stores `(100 - weight) / 100`, so a configured 0 is stored as 1.0 and 100 as 0.0.
pub ( super ) weight : f64 ,
2022-11-22 23:23:08 +03:00
/// Most recent head block seen from this server. `None` until the first block arrives,
/// and reset to `None` on reconnect or when the server fails to return a block.
/// TODO: should this be an AsyncRwLock?
2022-12-03 08:31:03 +03:00
pub ( super ) head_block : RwLock < Option < SavedBlock > > ,
2022-09-09 06:53:16 +03:00
/// Shared metrics handle passed in by the caller of `spawn`.
/// NOTE(review): presumably recorded by `OpenRequestHandle` for each request - confirm in `request.rs`.
pub ( super ) open_request_handle_metrics : Arc < OpenRequestHandleMetrics > ,
2022-08-24 02:13:56 +03:00
}
2022-05-05 22:07:09 +03:00
impl Web3Connection {
2022-06-14 07:04:14 +03:00
/// Connect to a web3 rpc
2022-07-19 07:21:32 +03:00
// TODO: have this take a builder (which will have channels attached)
2022-06-14 08:43:28 +03:00
#[ allow(clippy::too_many_arguments) ]
2022-06-14 07:04:14 +03:00
pub async fn spawn (
2022-08-10 08:56:09 +03:00
name : String ,
2022-11-14 00:05:37 +03:00
display_name : Option < String > ,
2022-07-19 07:21:32 +03:00
chain_id : u64 ,
2022-11-08 22:58:11 +03:00
db_conn : Option < DatabaseConnection > ,
2022-05-05 22:07:09 +03:00
url_str : String ,
2022-05-15 04:51:24 +03:00
// optional because this is only used for http providers. websocket providers don't use it
2022-07-19 07:21:32 +03:00
http_client : Option < reqwest ::Client > ,
2022-06-29 22:15:05 +03:00
http_interval_sender : Option < Arc < broadcast ::Sender < ( ) > > > ,
2022-08-16 01:50:56 +03:00
// TODO: have a builder struct for this.
hard_limit : Option < ( u64 , RedisPool ) > ,
2022-05-05 22:07:09 +03:00
// TODO: think more about this type
soft_limit : u32 ,
2022-11-25 03:45:13 +03:00
block_data_limit : Option < u64 > ,
2022-09-17 05:17:20 +03:00
block_map : BlockHashesCache ,
2022-07-22 08:11:26 +03:00
block_sender : Option < flume ::Sender < BlockAndRpc > > ,
2022-06-14 08:43:28 +03:00
tx_id_sender : Option < flume ::Sender < ( TxHash , Arc < Self > ) > > ,
reconnect : bool ,
2022-08-08 22:57:54 +03:00
weight : u32 ,
2022-09-09 06:53:16 +03:00
open_request_handle_metrics : Arc < OpenRequestHandleMetrics > ,
2022-06-14 08:43:28 +03:00
) -> anyhow ::Result < ( Arc < Web3Connection > , AnyhowJoinHandle < ( ) > ) > {
2022-09-15 20:57:24 +03:00
let hard_limit = hard_limit . map ( | ( hard_rate_limit , redis_pool ) | {
// TODO: is cache size 1 okay? i think we need
RedisRateLimiter ::new (
2022-08-06 08:46:33 +03:00
" web3_proxy " ,
2022-11-12 09:11:58 +03:00
& format! ( " {} : {} " , chain_id , name ) ,
2022-05-22 02:34:05 +03:00
hard_rate_limit ,
2022-08-30 23:01:42 +03:00
60.0 ,
2022-09-15 20:57:24 +03:00
redis_pool ,
2022-05-22 02:34:05 +03:00
)
} ) ;
2022-05-05 22:07:09 +03:00
2022-11-12 09:11:58 +03:00
// turn weight 0 into 100% and weight 100 into 0%
let weight = ( 100 - weight ) as f64 / 100.0 ;
2022-12-06 00:13:36 +03:00
// TODO: should we do this even if block_sender is None? then we would know limits on private relays
let block_data_limit : AtomicU64 = block_data_limit . unwrap_or_default ( ) . into ( ) ;
let automatic_block_limit =
( block_data_limit . load ( atomic ::Ordering ::Acquire ) = = 0 ) & & block_sender . is_some ( ) ;
2022-11-25 03:45:13 +03:00
2022-07-09 07:25:59 +03:00
let new_connection = Self {
2022-08-10 08:56:09 +03:00
name ,
2022-11-14 00:05:37 +03:00
display_name ,
2022-09-14 05:11:48 +03:00
http_client ,
2022-11-12 09:11:58 +03:00
url : url_str ,
2022-05-15 04:51:24 +03:00
active_requests : 0. into ( ) ,
2022-11-25 03:45:13 +03:00
frontend_requests : 0. into ( ) ,
internal_requests : 0. into ( ) ,
2022-12-06 00:13:36 +03:00
provider_state : AsyncRwLock ::new ( ProviderState ::None ) ,
2022-05-22 02:34:05 +03:00
hard_limit ,
2022-05-05 22:07:09 +03:00
soft_limit ,
2022-12-06 00:13:36 +03:00
automatic_block_limit ,
2022-11-25 03:45:13 +03:00
block_data_limit ,
2022-12-03 08:31:03 +03:00
head_block : RwLock ::new ( Default ::default ( ) ) ,
2022-08-08 22:57:54 +03:00
weight ,
2022-09-09 06:53:16 +03:00
open_request_handle_metrics ,
2022-05-12 21:49:57 +03:00
} ;
2022-07-09 07:25:59 +03:00
let new_connection = Arc ::new ( new_connection ) ;
2022-05-12 21:49:57 +03:00
2022-07-19 04:31:12 +03:00
// subscribe to new blocks and new transactions
2022-12-06 00:13:36 +03:00
// subscribing starts the connection (with retries)
2022-07-19 04:31:12 +03:00
// TODO: make transaction subscription optional (just pass None for tx_id_sender)
2022-06-14 08:43:28 +03:00
let handle = {
2022-07-09 07:25:59 +03:00
let new_connection = new_connection . clone ( ) ;
2022-12-06 00:13:36 +03:00
let authorization = Arc ::new ( Authorization ::internal ( db_conn ) ? ) ;
2022-06-14 08:43:28 +03:00
tokio ::spawn ( async move {
2022-07-09 07:25:59 +03:00
new_connection
2022-08-26 20:26:17 +03:00
. subscribe (
2022-11-08 22:58:11 +03:00
& authorization ,
2022-08-26 20:26:17 +03:00
block_map ,
block_sender ,
2022-12-06 00:13:36 +03:00
chain_id ,
http_interval_sender ,
2022-08-26 20:26:17 +03:00
reconnect ,
2022-12-06 00:13:36 +03:00
tx_id_sender ,
2022-08-26 20:26:17 +03:00
)
2022-06-14 08:43:28 +03:00
. await
} )
} ;
2022-08-27 05:13:36 +03:00
Ok ( ( new_connection , handle ) )
}
2022-07-19 04:31:12 +03:00
2022-12-28 05:17:11 +03:00
// TODO: would be great if rpcs exposed this. see https://github.com/ledgerwatch/erigon/issues/6391
2022-11-08 22:58:11 +03:00
async fn check_block_data_limit (
self : & Arc < Self > ,
authorization : & Arc < Authorization > ,
) -> anyhow ::Result < Option < u64 > > {
2022-12-06 00:13:36 +03:00
if ! self . automatic_block_limit {
2022-12-28 05:17:11 +03:00
// TODO: is this a good thing to return?
2022-12-06 00:13:36 +03:00
return Ok ( None ) ;
}
// check if we are synced
let head_block : ArcBlock = self
. wait_for_request_handle ( authorization , Duration ::from_secs ( 30 ) , true )
. await ?
2022-12-06 03:18:31 +03:00
. request ::< _ , Option < _ > > (
2022-12-06 00:13:36 +03:00
" eth_getBlockByNumber " ,
& json! ( ( " latest " , false ) ) ,
// error here are expected, so keep the level low
2022-12-21 09:01:35 +03:00
Level ::Warn . into ( ) ,
2022-12-06 00:13:36 +03:00
)
2022-12-06 03:18:31 +03:00
. await ?
2022-12-06 03:35:51 +03:00
. context ( " no block during check_block_data_limit! " ) ? ;
2022-12-06 00:13:36 +03:00
if SavedBlock ::from ( head_block ) . syncing ( ) {
// if the node is syncing, we can't check its block data limit
2022-12-28 05:17:11 +03:00
return Ok ( None ) ;
2022-12-06 00:13:36 +03:00
}
// TODO: add SavedBlock to self? probably best not to. we might not get marked Ready
2022-12-28 05:17:11 +03:00
let mut limit = None ;
2022-11-25 03:45:13 +03:00
// TODO: binary search between 90k and max?
2022-12-06 00:13:36 +03:00
// TODO: start at 0 or 1?
2022-11-25 03:45:13 +03:00
for block_data_limit in [ 0 , 32 , 64 , 128 , 256 , 512 , 1024 , 90_000 , u64 ::MAX ] {
2022-12-06 00:13:36 +03:00
let handle = self
. wait_for_request_handle ( authorization , Duration ::from_secs ( 30 ) , true )
. await ? ;
2022-07-19 04:31:12 +03:00
2022-12-06 00:13:36 +03:00
let head_block_num_future = handle . request ::< Option < ( ) > , U256 > (
" eth_blockNumber " ,
& None ,
// error here are expected, so keep the level low
Level ::Debug . into ( ) ,
) ;
2022-08-11 00:52:28 +03:00
2022-12-06 00:13:36 +03:00
let head_block_num = timeout ( Duration ::from_secs ( 5 ) , head_block_num_future )
. await
. context ( " timeout fetching eth_blockNumber " ) ?
. context ( " provider error " ) ? ;
2022-09-06 06:26:23 +03:00
2022-11-25 03:45:13 +03:00
let maybe_archive_block = head_block_num . saturating_sub ( ( block_data_limit ) . into ( ) ) ;
2022-08-27 05:13:36 +03:00
2022-12-06 00:13:36 +03:00
trace! (
" checking maybe_archive_block on {}: {} " ,
self ,
maybe_archive_block
) ;
2022-09-06 06:26:23 +03:00
// TODO: wait for the handle BEFORE we check the current block number. it might be delayed too!
2022-09-20 09:00:27 +03:00
// TODO: what should the request be?
2022-12-06 00:13:36 +03:00
let handle = self
. wait_for_request_handle ( authorization , Duration ::from_secs ( 30 ) , true )
. await ? ;
let archive_result : Result < Bytes , _ > = handle
2022-08-27 05:13:36 +03:00
. request (
" eth_getCode " ,
2022-09-24 05:47:44 +03:00
& json! ( (
2022-08-27 05:13:36 +03:00
" 0xdead00000000000000000000000000000000beef " ,
maybe_archive_block ,
2022-09-24 05:47:44 +03:00
) ) ,
2022-09-21 07:48:21 +03:00
// error here are expected, so keep the level low
2022-11-25 03:45:13 +03:00
Level ::Trace . into ( ) ,
2022-08-27 05:13:36 +03:00
)
. await ;
2022-11-25 03:45:13 +03:00
trace! (
2022-12-06 00:13:36 +03:00
" archive_result on {} for {} ({}): {:?} " ,
self ,
2022-11-25 03:45:13 +03:00
block_data_limit ,
2022-12-06 00:13:36 +03:00
maybe_archive_block ,
2022-11-25 03:45:13 +03:00
archive_result
) ;
2022-08-27 05:13:36 +03:00
2022-11-25 03:45:13 +03:00
if archive_result . is_err ( ) {
2022-08-27 05:13:36 +03:00
break ;
2022-07-19 04:31:12 +03:00
}
2022-11-25 03:45:13 +03:00
limit = Some ( block_data_limit ) ;
2022-07-19 04:31:12 +03:00
}
2022-08-27 05:13:36 +03:00
if let Some ( limit ) = limit {
self . block_data_limit
. store ( limit , atomic ::Ordering ::Release ) ;
}
2022-07-19 04:31:12 +03:00
2022-12-06 03:47:27 +03:00
info! ( " block data limit on {}: {:?} " , self , limit ) ;
2022-11-25 03:45:13 +03:00
2022-08-27 05:13:36 +03:00
Ok ( limit )
2022-07-09 07:25:59 +03:00
}
2022-11-04 01:16:27 +03:00
/// TODO: this might be too simple. different nodes can prune differently. its possible we will have a block range
2022-07-25 03:27:00 +03:00
pub fn block_data_limit ( & self ) -> U64 {
2022-11-04 01:16:27 +03:00
self . block_data_limit . load ( atomic ::Ordering ::Relaxed ) . into ( )
2022-07-19 04:31:12 +03:00
}
2023-01-02 21:34:16 +03:00
pub fn syncing ( & self ) -> bool {
match self . head_block . read ( ) . clone ( ) {
None = > true ,
Some ( x ) = > x . syncing ( ) ,
}
}
2022-07-22 22:30:39 +03:00
/// Check whether this server should be able to answer queries about `needed_block_num`.
///
/// Returns false when there is no head block, when the head block is syncing, when the
/// needed block is newer than the head, or when it is older than `head - block_data_limit`.
pub fn has_block_data(&self, needed_block_num: &U64) -> bool {
    let saved_head = self.head_block.read().clone();

    let head_block_num = match saved_head {
        // skip syncing nodes. even though they might be able to serve a query,
        // latency will be poor and it will get in the way of them syncing further
        Some(head) if !head.syncing() => head.number(),
        _ => return false,
    };

    // this rpc doesn't have that block yet. still syncing
    if needed_block_num > &head_block_num {
        return false;
    }

    // if this is a pruning node, we might not actually have the block
    let oldest_block_num = head_block_num.saturating_sub(self.block_data_limit());

    oldest_block_num <= *needed_block_num
}
2022-09-14 04:43:09 +03:00
/// reconnect to the provider. errors are retried forever with exponential backoff with jitter.
2022-10-25 07:12:24 +03:00
/// We use the "Decorrelated" jitter from <https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/>
2022-09-14 04:43:09 +03:00
/// TODO: maybe it would be better to use "Full Jitter". The "Full Jitter" approach uses less work, but slightly more time.
2022-12-06 00:13:36 +03:00
pub async fn retrying_connect (
2022-09-14 04:43:09 +03:00
self : & Arc < Self > ,
block_sender : Option < & flume ::Sender < BlockAndRpc > > ,
2022-12-06 00:13:36 +03:00
chain_id : u64 ,
db_conn : Option < & DatabaseConnection > ,
delay_start : bool ,
2022-09-14 04:43:09 +03:00
) -> anyhow ::Result < ( ) > {
// there are several crates that have retry helpers, but they all seem more complex than necessary
2022-11-14 21:24:52 +03:00
// TODO: move this backoff logic into a helper function so we can use it when doing database locking
2022-09-14 04:43:09 +03:00
let base_ms = 500 ;
let cap_ms = 30_000 ;
let range_multiplier = 3 ;
// sleep once before the initial retry attempt
2022-09-14 05:11:48 +03:00
// TODO: now that we use this method for our initial connection, do we still want this sleep?
2022-12-06 00:13:36 +03:00
let mut sleep_ms = if delay_start {
2022-09-14 05:11:48 +03:00
let first_sleep_ms = min (
cap_ms ,
2022-11-12 09:11:58 +03:00
thread_fast_rng ( ) . gen_range ( base_ms .. ( base_ms * range_multiplier ) ) ,
2022-09-14 05:11:48 +03:00
) ;
2022-09-14 09:38:53 +03:00
let reconnect_in = Duration ::from_millis ( first_sleep_ms ) ;
2022-11-12 11:24:32 +03:00
warn! ( " Reconnect to {} in {}ms " , self , reconnect_in . as_millis ( ) ) ;
2022-09-14 09:38:53 +03:00
sleep ( reconnect_in ) . await ;
2022-09-14 05:11:48 +03:00
first_sleep_ms
} else {
base_ms
} ;
2022-09-14 04:43:09 +03:00
// retry until we succeed
2022-12-06 00:13:36 +03:00
while let Err ( err ) = self . connect ( block_sender , chain_id , db_conn ) . await {
// thread_rng is crytographically secure. we don't need that here
2022-09-14 04:43:09 +03:00
sleep_ms = min (
cap_ms ,
2022-11-12 09:11:58 +03:00
thread_fast_rng ( ) . gen_range ( base_ms .. ( sleep_ms * range_multiplier ) ) ,
2022-09-14 04:43:09 +03:00
) ;
let retry_in = Duration ::from_millis ( sleep_ms ) ;
2022-11-12 11:24:32 +03:00
warn! (
" Failed reconnect to {}! Retry in {}ms. err={:?} " ,
self ,
retry_in . as_millis ( ) ,
err ,
) ;
2022-09-14 04:43:09 +03:00
sleep ( retry_in ) . await ;
}
Ok ( ( ) )
}
2022-12-06 00:13:36 +03:00
/// connect to the web3 provider
async fn connect (
2022-06-14 07:04:14 +03:00
self : & Arc < Self > ,
2022-09-14 04:43:09 +03:00
block_sender : Option < & flume ::Sender < BlockAndRpc > > ,
2022-12-06 00:13:36 +03:00
chain_id : u64 ,
db_conn : Option < & DatabaseConnection > ,
2022-06-14 07:04:14 +03:00
) -> anyhow ::Result < ( ) > {
2022-12-06 03:06:28 +03:00
// trace!("provider_state {} locking...", self);
let mut provider_state = self
. provider_state
. try_write ( )
. context ( " locking provider for write " ) ? ;
// trace!("provider_state {} locked: {:?}", self, provider_state);
2022-09-14 09:38:53 +03:00
2022-12-06 00:13:36 +03:00
match & * provider_state {
ProviderState ::None = > {
info! ( " connecting to {} " , self ) ;
}
ProviderState ::NotReady ( provider ) | ProviderState ::Ready ( provider ) = > {
// disconnect the current provider
if let Web3Provider ::Mock = provider . as_ref ( ) {
2022-09-14 09:38:53 +03:00
return Ok ( ( ) ) ;
}
2022-09-13 02:00:10 +03:00
2022-12-06 03:06:28 +03:00
debug! ( " reconnecting to {} " , self ) ;
2022-06-14 07:04:14 +03:00
2022-12-06 00:13:36 +03:00
// disconnect the current provider
* provider_state = ProviderState ::None ;
2022-09-14 07:27:18 +03:00
2022-12-06 00:13:36 +03:00
// reset sync status
2022-12-06 03:06:28 +03:00
// trace!("locking head block on {}", self);
2022-12-06 00:13:36 +03:00
{
let mut head_block = self . head_block . write ( ) ;
* head_block = None ;
}
2022-12-06 03:06:28 +03:00
// trace!("done with head block on {}", self);
2022-12-06 00:13:36 +03:00
// tell the block subscriber that we don't have any blocks
if let Some ( block_sender ) = block_sender {
block_sender
. send_async ( ( None , self . clone ( ) ) )
. await
. context ( " block_sender during connect " ) ? ;
}
2022-09-14 07:27:18 +03:00
}
2022-06-16 05:53:37 +03:00
}
2022-06-14 07:04:14 +03:00
2022-12-06 03:06:28 +03:00
// trace!("Creating new Web3Provider on {}", self);
2022-09-14 04:43:09 +03:00
// TODO: if this fails, keep retrying! otherwise it crashes and doesn't try again!
2022-09-14 05:11:48 +03:00
let new_provider = Web3Provider ::from_str ( & self . url , self . http_client . clone ( ) ) . await ? ;
2022-09-14 04:43:09 +03:00
2022-12-06 03:06:28 +03:00
// trace!("saving provider state as NotReady on {}", self);
2022-12-06 00:13:36 +03:00
* provider_state = ProviderState ::NotReady ( Arc ::new ( new_provider ) ) ;
// drop the lock so that we can get a request handle
2022-12-06 03:06:28 +03:00
// trace!("provider_state {} unlocked", self);
2022-12-06 00:13:36 +03:00
drop ( provider_state ) ;
let authorization = Arc ::new ( Authorization ::internal ( db_conn . cloned ( ) ) ? ) ;
// check the server's chain_id here
// TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error
// TODO: what should the timeout be? should there be a request timeout?
2022-12-06 03:06:28 +03:00
// trace!("waiting on chain id for {}", self);
2022-12-06 00:13:36 +03:00
let found_chain_id : Result < U64 , _ > = self
. wait_for_request_handle ( & authorization , Duration ::from_secs ( 30 ) , true )
. await ?
. request (
" eth_chainId " ,
& json! ( Option ::None ::< ( ) > ) ,
Level ::Trace . into ( ) ,
)
. await ;
2022-12-06 03:06:28 +03:00
// trace!("found_chain_id: {:?}", found_chain_id);
2022-12-06 00:13:36 +03:00
match found_chain_id {
Ok ( found_chain_id ) = > {
// TODO: there has to be a cleaner way to do this
if chain_id ! = found_chain_id . as_u64 ( ) {
return Err ( anyhow ::anyhow! (
" incorrect chain id! Config has {}, but RPC has {} " ,
chain_id ,
found_chain_id
)
. context ( format! ( " failed @ {} " , self ) ) ) ;
}
}
Err ( e ) = > {
return Err ( anyhow ::Error ::from ( e ) ) ;
}
}
self . check_block_data_limit ( & authorization ) . await ? ;
{
2022-12-06 03:06:28 +03:00
// trace!("locking for ready...");
2022-12-06 00:13:36 +03:00
let mut provider_state = self . provider_state . write ( ) . await ;
2022-12-06 03:06:28 +03:00
// trace!("locked for ready...");
2022-12-06 00:13:36 +03:00
// TODO: do this without a clone
let ready_provider = provider_state
. provider ( true )
. await
. context ( " provider missing " ) ?
. clone ( ) ;
* provider_state = ProviderState ::Ready ( ready_provider ) ;
2022-12-06 03:06:28 +03:00
// trace!("unlocked for ready...");
2022-12-06 00:13:36 +03:00
}
2022-09-14 04:43:09 +03:00
2022-11-12 11:24:32 +03:00
info! ( " successfully connected to {} " , self ) ;
2022-09-14 04:43:09 +03:00
2022-06-14 07:04:14 +03:00
Ok ( ( ) )
}
2022-05-06 09:07:01 +03:00
#[ inline ]
2022-05-05 22:07:09 +03:00
pub fn active_requests ( & self ) -> u32 {
self . active_requests . load ( atomic ::Ordering ::Acquire )
}
2022-08-26 20:26:17 +03:00
async fn send_head_block_result (
self : & Arc < Self > ,
2022-09-06 15:29:37 +03:00
new_head_block : Result < Option < ArcBlock > , ProviderError > ,
2022-07-22 08:11:26 +03:00
block_sender : & flume ::Sender < BlockAndRpc > ,
2022-09-17 05:17:20 +03:00
block_map : BlockHashesCache ,
2022-05-30 07:30:13 +03:00
) -> anyhow ::Result < ( ) > {
2022-12-03 08:31:03 +03:00
let new_head_block = match new_head_block {
2022-09-06 15:29:37 +03:00
Ok ( None ) = > {
2022-11-06 23:52:11 +03:00
{
2022-12-06 01:38:54 +03:00
let mut head_block = self . head_block . write ( ) ;
2022-12-03 08:31:03 +03:00
2022-12-06 01:38:54 +03:00
if head_block . is_none ( ) {
2022-12-03 08:31:03 +03:00
// we previously sent a None. return early
return Ok ( ( ) ) ;
}
warn! ( " {} is not synced! " , self ) ;
2022-11-06 23:52:11 +03:00
2022-12-06 01:38:54 +03:00
* head_block = None ;
2022-11-06 23:52:11 +03:00
}
2022-12-03 08:31:03 +03:00
None
2022-09-06 15:29:37 +03:00
}
2022-11-06 23:52:11 +03:00
Ok ( Some ( new_head_block ) ) = > {
2022-12-03 08:31:03 +03:00
let new_hash = new_head_block
. hash
. context ( " sending block to connections " ) ? ;
2022-08-26 20:26:17 +03:00
2022-09-14 22:39:08 +03:00
// if we already have this block saved, set new_head_block to that arc. otherwise store this copy
2022-11-06 23:52:11 +03:00
let new_head_block = block_map
2022-11-03 02:14:16 +03:00
. get_with ( new_hash , async move { new_head_block } )
2022-09-14 22:39:08 +03:00
. await ;
2022-08-30 23:01:42 +03:00
2022-08-26 20:26:17 +03:00
// save the block so we don't send the same one multiple times
// also save so that archive checks can know how far back to query
{
2022-12-03 08:31:03 +03:00
let mut head_block = self . head_block . write ( ) ;
2022-09-06 06:26:23 +03:00
2022-12-03 08:31:03 +03:00
let _ = head_block . insert ( new_head_block . clone ( ) . into ( ) ) ;
2022-07-19 04:31:12 +03:00
}
2022-12-03 08:31:03 +03:00
Some ( new_head_block )
2022-05-15 09:27:13 +03:00
}
2022-11-06 23:52:11 +03:00
Err ( err ) = > {
2022-11-12 11:24:32 +03:00
warn! ( " unable to get block from {}. err={:?} " , self , err ) ;
2022-11-06 23:52:11 +03:00
{
2022-12-06 01:38:54 +03:00
let mut head_block = self . head_block . write ( ) ;
2022-11-06 23:52:11 +03:00
2022-12-06 01:38:54 +03:00
* head_block = None ;
2022-11-06 23:52:11 +03:00
}
2022-08-07 09:48:57 +03:00
2022-12-03 08:31:03 +03:00
None
2022-05-15 09:27:13 +03:00
}
2022-12-03 08:31:03 +03:00
} ;
// send an empty block to take this server out of rotation
block_sender
. send_async ( ( new_head_block , self . clone ( ) ) )
. await
. context ( " block_sender " ) ? ;
2022-05-30 07:30:13 +03:00
Ok ( ( ) )
2022-05-15 09:27:13 +03:00
}
2022-09-14 04:43:09 +03:00
/// subscribe to blocks and transactions with automatic reconnects
2022-12-06 00:13:36 +03:00
/// This should only exit when the program is exiting.
/// TODO: should more of these args be on self?
#[ allow(clippy::too_many_arguments) ]
2022-06-14 08:43:28 +03:00
async fn subscribe (
2022-06-14 07:04:14 +03:00
self : Arc < Self > ,
2022-11-08 22:58:11 +03:00
authorization : & Arc < Authorization > ,
2022-09-17 05:17:20 +03:00
block_map : BlockHashesCache ,
2022-07-22 08:11:26 +03:00
block_sender : Option < flume ::Sender < BlockAndRpc > > ,
2022-12-06 00:13:36 +03:00
chain_id : u64 ,
http_interval_sender : Option < Arc < broadcast ::Sender < ( ) > > > ,
2022-06-14 07:04:14 +03:00
reconnect : bool ,
2022-12-06 00:13:36 +03:00
tx_id_sender : Option < flume ::Sender < ( TxHash , Arc < Self > ) > > ,
2022-06-14 07:04:14 +03:00
) -> anyhow ::Result < ( ) > {
2022-06-16 05:53:37 +03:00
loop {
2022-07-19 07:24:16 +03:00
let http_interval_receiver = http_interval_sender . as_ref ( ) . map ( | x | x . subscribe ( ) ) ;
2022-06-29 22:15:05 +03:00
2022-06-16 05:53:37 +03:00
let mut futures = vec! [ ] ;
2022-12-06 00:13:36 +03:00
{
// health check
// TODO: move this into a proper function
let authorization = authorization . clone ( ) ;
let block_sender = block_sender . clone ( ) ;
let conn = self . clone ( ) ;
let ( ready_tx , ready_rx ) = oneshot ::channel ( ) ;
let f = async move {
// initial sleep to allow for the initial connection
conn . retrying_connect (
block_sender . as_ref ( ) ,
chain_id ,
authorization . db_conn . as_ref ( ) ,
false ,
)
. await ? ;
// provider is ready
ready_tx . send ( ( ) ) . unwrap ( ) ;
// wait before doing the initial health check
// TODO: how often?
2022-12-06 03:06:28 +03:00
// TODO: subscribe to self.head_block
2022-12-06 00:13:36 +03:00
let health_sleep_seconds = 10 ;
sleep ( Duration ::from_secs ( health_sleep_seconds ) ) . await ;
2022-12-06 03:06:28 +03:00
let mut warned = 0 ;
2022-12-06 00:13:36 +03:00
loop {
// TODO: what if we just happened to have this check line up with another restart?
// TODO: think more about this
2022-12-06 03:06:28 +03:00
// trace!("health check on {}. locking...", conn);
2022-12-06 00:13:36 +03:00
if conn
. provider_state
. read ( )
. await
. provider ( false )
. await
. is_none ( )
{
2022-12-06 03:06:28 +03:00
// trace!("health check unlocked with error on {}", conn);
2022-12-06 00:13:36 +03:00
// returning error will trigger a reconnect
return Err ( anyhow ::anyhow! ( " {} is not ready " , conn ) ) ;
}
2022-12-06 03:06:28 +03:00
// trace!("health check on {}. unlocked", conn);
2022-12-06 00:13:36 +03:00
if let Some ( x ) = & * conn . head_block . read ( ) {
// if this block is too old, return an error so we reconnect
2022-12-06 03:06:28 +03:00
let current_lag = x . lag ( ) ;
if current_lag > 0 {
let level = if warned = = 0 {
log ::Level ::Warn
} else if current_lag % 1000 = = 0 {
log ::Level ::Debug
} else {
log ::Level ::Trace
} ;
log ::log! (
level ,
" {} is lagged {} secs: {} {} " ,
conn ,
current_lag ,
x . number ( ) ,
x . hash ( ) ,
) ;
warned + = 1 ;
} else {
// reset warnings now that we are connected
warned = 0 ;
2022-12-06 00:13:36 +03:00
}
}
sleep ( Duration ::from_secs ( health_sleep_seconds ) ) . await ;
}
} ;
futures . push ( flatten_handle ( tokio ::spawn ( f ) ) ) ;
// wait on the initial connection
ready_rx . await ? ;
}
2022-06-16 05:53:37 +03:00
if let Some ( block_sender ) = & block_sender {
2022-08-26 20:26:17 +03:00
let f = self . clone ( ) . subscribe_new_heads (
2022-11-08 22:58:11 +03:00
authorization . clone ( ) ,
2022-08-26 20:26:17 +03:00
http_interval_receiver ,
block_sender . clone ( ) ,
block_map . clone ( ) ,
) ;
2022-06-16 05:53:37 +03:00
futures . push ( flatten_handle ( tokio ::spawn ( f ) ) ) ;
}
if let Some ( tx_id_sender ) = & tx_id_sender {
let f = self
. clone ( )
2022-11-08 22:58:11 +03:00
. subscribe_pending_transactions ( authorization . clone ( ) , tx_id_sender . clone ( ) ) ;
2022-06-16 05:53:37 +03:00
futures . push ( flatten_handle ( tokio ::spawn ( f ) ) ) ;
}
match try_join_all ( futures ) . await {
2022-09-14 04:43:09 +03:00
Ok ( _ ) = > {
// futures all exited without error. break instead of restarting subscriptions
break ;
}
2022-06-16 05:53:37 +03:00
Err ( err ) = > {
if reconnect {
2022-12-06 03:06:28 +03:00
warn! ( " {} connection ended. err={:?} " , self , err ) ;
2022-12-06 00:13:36 +03:00
self . clone ( )
. retrying_connect (
block_sender . as_ref ( ) ,
chain_id ,
authorization . db_conn . as_ref ( ) ,
true ,
)
. await ? ;
2022-06-16 05:53:37 +03:00
} else {
2022-11-12 11:24:32 +03:00
error! ( " {} subscription exited. err={:?} " , self , err ) ;
2022-06-16 05:53:37 +03:00
return Err ( err ) ;
}
2022-06-14 08:43:28 +03:00
}
2022-06-14 07:04:14 +03:00
}
}
2022-11-12 11:24:32 +03:00
info! ( " all subscriptions on {} completed " , self ) ;
2022-09-06 16:14:15 +03:00
2022-06-14 07:04:14 +03:00
Ok ( ( ) )
}
2022-05-17 05:26:47 +03:00
/// Subscribe to new blocks. If `reconnect` is true, this runs forever.
2022-06-14 07:04:14 +03:00
async fn subscribe_new_heads (
2022-05-05 22:07:09 +03:00
self : Arc < Self > ,
2022-11-08 22:58:11 +03:00
authorization : Arc < Authorization > ,
2022-06-29 22:15:05 +03:00
http_interval_receiver : Option < broadcast ::Receiver < ( ) > > ,
2022-07-22 08:11:26 +03:00
block_sender : flume ::Sender < BlockAndRpc > ,
2022-09-17 05:17:20 +03:00
block_map : BlockHashesCache ,
2022-05-05 22:07:09 +03:00
) -> anyhow ::Result < ( ) > {
2022-12-06 00:13:36 +03:00
trace! ( " watching new heads on {} " , self ) ;
2022-06-14 07:04:14 +03:00
2022-12-06 03:06:28 +03:00
// trace!("locking on new heads");
let provider_state = self
. provider_state
. try_read ( )
. context ( " subscribe_new_heads " ) ?
. clone ( ) ;
// trace!("unlocked on new heads");
// TODO: need a timeout
if let ProviderState ::Ready ( provider ) = provider_state {
2022-12-06 00:13:36 +03:00
match provider . as_ref ( ) {
2022-11-23 01:45:22 +03:00
Web3Provider ::Mock = > unimplemented! ( ) ,
2022-06-14 07:04:14 +03:00
Web3Provider ::Http ( _provider ) = > {
// there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints
2022-06-29 22:15:05 +03:00
// TODO: try watch_blocks and fall back to this?
let mut http_interval_receiver = http_interval_receiver . unwrap ( ) ;
2022-06-14 07:04:14 +03:00
2022-07-19 04:31:12 +03:00
let mut last_hash = H256 ::zero ( ) ;
2022-06-14 07:04:14 +03:00
2023-01-03 17:08:40 +03:00
// TODO: default to true?
let mut was_syncing = self . syncing ( ) ;
2022-06-14 07:04:14 +03:00
loop {
2022-09-20 09:00:27 +03:00
// TODO: what should the max_wait be?
2022-09-23 00:03:37 +03:00
match self
2022-12-06 00:13:36 +03:00
. wait_for_request_handle ( & authorization , Duration ::from_secs ( 30 ) , false )
2022-09-23 00:03:37 +03:00
. await
{
2022-08-26 20:26:17 +03:00
Ok ( active_request_handle ) = > {
2022-11-06 23:52:11 +03:00
let block : Result < Option < ArcBlock > , _ > = active_request_handle
2022-09-21 07:48:21 +03:00
. request (
" eth_getBlockByNumber " ,
2022-09-24 05:47:44 +03:00
& json! ( ( " latest " , false ) ) ,
2022-12-21 09:01:35 +03:00
Level ::Warn . into ( ) ,
2022-09-21 07:48:21 +03:00
)
2022-06-14 07:04:14 +03:00
. await ;
2022-08-26 20:26:17 +03:00
match block {
2022-11-06 23:52:11 +03:00
Ok ( None ) = > {
warn! ( " no head block on {} " , self ) ;
2023-01-03 17:08:40 +03:00
was_syncing = true ;
2022-11-06 23:52:11 +03:00
self . send_head_block_result (
Ok ( None ) ,
& block_sender ,
block_map . clone ( ) ,
)
. await ? ;
}
Ok ( Some ( block ) ) = > {
2022-08-26 20:26:17 +03:00
// don't send repeat blocks
let new_hash = block
. hash
. expect ( " blocks here should always have hashes " ) ;
if new_hash ! = last_hash {
// new hash!
last_hash = new_hash ;
self . send_head_block_result (
2022-11-06 23:52:11 +03:00
Ok ( Some ( block ) ) ,
2022-08-26 20:26:17 +03:00
& block_sender ,
block_map . clone ( ) ,
)
2022-08-07 09:48:57 +03:00
. await ? ;
2023-01-03 17:08:40 +03:00
if was_syncing {
was_syncing = self . syncing ( ) ;
if ! was_syncing {
// we were syncing, but we aren't anymore
if let Err ( err ) = self
. check_block_data_limit ( & authorization )
. await
{
warn! ( " unable to check block data limit after syncing ended. {:?} " , err ) ;
}
}
} else {
// TODO: it wasn't syncing, what if it is now?
was_syncing = true ;
}
2022-08-26 20:26:17 +03:00
}
}
Err ( err ) = > {
// we did not get a block back. something is up with the server. take it out of rotation
self . send_head_block_result (
Err ( err ) ,
& block_sender ,
block_map . clone ( ) ,
)
. await ? ;
2023-01-03 17:08:40 +03:00
was_syncing = true ;
2022-07-19 07:31:30 +03:00
}
2022-05-16 22:15:40 +03:00
}
2022-06-14 07:04:14 +03:00
}
2022-07-09 01:14:45 +03:00
Err ( err ) = > {
2022-11-12 11:24:32 +03:00
warn! ( " Internal error on latest block from {}. {:?} " , self , err ) ;
2022-11-06 23:52:11 +03:00
self . send_head_block_result (
Ok ( None ) ,
& block_sender ,
block_map . clone ( ) ,
)
. await ? ;
2023-01-03 17:08:40 +03:00
was_syncing = true ;
2022-08-11 00:29:50 +03:00
// TODO: what should we do? sleep? extra time?
2022-05-16 22:15:40 +03:00
}
}
2022-07-19 04:31:12 +03:00
2022-08-26 20:26:17 +03:00
// wait for the next interval
2022-07-19 04:31:12 +03:00
// TODO: if error or rate limit, increase interval?
while let Err ( err ) = http_interval_receiver . recv ( ) . await {
match err {
broadcast ::error ::RecvError ::Closed = > {
2022-08-26 20:26:17 +03:00
// channel is closed! that's not good. bubble the error up
2022-07-19 04:31:12 +03:00
return Err ( err . into ( ) ) ;
}
broadcast ::error ::RecvError ::Lagged ( lagged ) = > {
2022-08-26 20:26:17 +03:00
// querying the block was delayed
// this can happen if tokio is very busy or waiting for requests limits took too long
2022-11-12 11:24:32 +03:00
warn! ( " http interval on {} lagging by {}! " , self , lagged ) ;
2022-07-19 04:31:12 +03:00
}
}
}
2022-05-15 22:28:22 +03:00
}
2022-06-14 07:04:14 +03:00
}
Web3Provider ::Ws ( provider ) = > {
2022-08-30 23:01:42 +03:00
// todo: move subscribe_blocks onto the request handle?
2022-09-23 00:03:37 +03:00
let active_request_handle = self
2022-12-06 00:13:36 +03:00
. wait_for_request_handle ( & authorization , Duration ::from_secs ( 30 ) , false )
2022-09-23 00:03:37 +03:00
. await ;
2022-06-14 07:04:14 +03:00
let mut stream = provider . subscribe_blocks ( ) . await ? ;
drop ( active_request_handle ) ;
// query the block once since the subscription doesn't send the current block
// there is a very small race condition here where the stream could send us a new block right now
// all it does is print "new block" for the same block as current block
2022-11-06 23:52:11 +03:00
// TODO: how does this get wrapped in an arc? does ethers handle that?
2022-09-06 15:29:37 +03:00
let block : Result < Option < ArcBlock > , _ > = self
2022-12-06 00:13:36 +03:00
. wait_for_request_handle ( & authorization , Duration ::from_secs ( 30 ) , false )
2022-08-06 08:26:43 +03:00
. await ?
2022-09-21 07:48:21 +03:00
. request (
" eth_getBlockByNumber " ,
2022-09-24 05:47:44 +03:00
& json! ( ( " latest " , false ) ) ,
2022-12-21 09:01:35 +03:00
Level ::Warn . into ( ) ,
2022-09-21 07:48:21 +03:00
)
2022-11-06 23:52:11 +03:00
. await ;
2022-06-14 07:04:14 +03:00
2022-09-06 19:49:07 +03:00
let mut last_hash = match & block {
Ok ( Some ( new_block ) ) = > new_block
. hash
. expect ( " blocks should always have a hash here " ) ,
_ = > H256 ::zero ( ) ,
} ;
2022-08-26 20:26:17 +03:00
self . send_head_block_result ( block , & block_sender , block_map . clone ( ) )
. await ? ;
2022-06-14 07:04:14 +03:00
2023-01-03 17:08:40 +03:00
let mut was_syncing = self . syncing ( ) ;
2023-01-03 08:01:04 +03:00
2023-01-03 17:08:40 +03:00
loop {
// TODO: timeout should be based on block time
// this timeout is here because
match timeout ( Duration ::from_secs ( 60 ) , stream . next ( ) ) . await {
Ok ( Some ( new_block ) ) = > {
// TODO: check the new block's hash to be sure we don't send dupes
let new_hash = new_block
. hash
. expect ( " blocks should always have a hash here " ) ;
if new_hash = = last_hash {
// some rpcs like to give us duplicates. don't waste our time on them
continue ;
} else {
last_hash = new_hash ;
}
2023-01-03 16:26:21 +03:00
2023-01-03 17:08:40 +03:00
self . send_head_block_result (
Ok ( Some ( Arc ::new ( new_block ) ) ) ,
& block_sender ,
block_map . clone ( ) ,
)
. await ? ;
if was_syncing {
was_syncing = self . syncing ( ) ;
if ! was_syncing {
// we were syncing, but we aren't anymore
if let Err ( err ) =
self . check_block_data_limit ( & authorization ) . await
{
warn! ( " unable to check block data limit after syncing ended. {:?} " , err ) ;
break ;
}
}
} else {
// TODO: it wasn't syncing, what if it is now?
}
}
Ok ( None ) = > {
warn! ( " new_heads subscription to {} ended " , self ) ;
break ;
}
Err ( err ) = > {
warn! ( " {} timed out waiting for block! {:?} " , self , err ) ;
break ;
}
}
2022-05-17 05:26:47 +03:00
}
2022-07-09 01:14:45 +03:00
2022-11-06 23:52:11 +03:00
// clear the head block. this might not be needed, but it won't hurt
self . send_head_block_result ( Ok ( None ) , & block_sender , block_map )
. await ? ;
2022-09-06 19:49:07 +03:00
// TODO: is this always an error?
// TODO: we probably don't want a warn and to return error
2022-12-06 00:13:36 +03:00
Err ( anyhow ::anyhow! ( " new_heads subscription ended " ) )
2022-05-05 22:07:09 +03:00
}
2022-05-17 05:26:47 +03:00
}
2022-12-06 00:13:36 +03:00
} else {
Err ( anyhow ::anyhow! (
" Provider not ready! Unable to subscribe to heads "
) )
2022-06-14 07:04:14 +03:00
}
}
2022-05-17 05:26:47 +03:00
2022-06-14 07:04:14 +03:00
async fn subscribe_pending_transactions (
self : Arc < Self > ,
2022-11-08 22:58:11 +03:00
authorization : Arc < Authorization > ,
2022-06-14 07:04:14 +03:00
tx_id_sender : flume ::Sender < ( TxHash , Arc < Self > ) > ,
) -> anyhow ::Result < ( ) > {
2022-12-06 03:06:28 +03:00
if let ProviderState ::Ready ( provider ) = self
. provider_state
. try_read ( )
. context ( " subscribe_pending_transactions " ) ?
. clone ( )
{
2022-12-06 00:13:36 +03:00
trace! ( " watching pending transactions on {} " , self ) ;
match provider . as_ref ( ) {
2022-11-23 01:45:22 +03:00
Web3Provider ::Mock = > unimplemented! ( ) ,
2022-06-18 10:06:54 +03:00
Web3Provider ::Http ( provider ) = > {
2022-06-14 07:04:14 +03:00
// there is a "watch_pending_transactions" function, but a lot of public nodes do not support the necessary rpc endpoints
// TODO: what should this interval be? probably automatically set to some fraction of block time
// TODO: maybe it would be better to have one interval for all of the http providers, but this works for now
// TODO: if there are some websocket providers, maybe have a longer interval and a channel that tells the https to update when a websocket gets a new head? if they are slow this wouldn't work well though
2022-06-14 08:43:28 +03:00
let mut interval = interval ( Duration ::from_secs ( 60 ) ) ;
2022-06-14 07:04:14 +03:00
interval . set_missed_tick_behavior ( MissedTickBehavior ::Delay ) ;
loop {
2022-06-14 08:43:28 +03:00
// TODO: actually do something here
/*
2022-06-14 07:04:14 +03:00
match self . try_request_handle ( ) . await {
Ok ( active_request_handle ) = > {
// TODO: check the filter
2022-07-22 22:30:39 +03:00
todo! ( " actually send a request " ) ;
2022-06-14 07:04:14 +03:00
}
Err ( e ) = > {
warn! ( " Failed getting latest block from {}: {:?} " , self , e ) ;
}
}
2022-06-14 08:43:28 +03:00
* /
2022-07-07 06:22:09 +03:00
// wait for the interval
// TODO: if error or rate limit, increase interval?
interval . tick ( ) . await ;
2022-06-14 07:04:14 +03:00
}
}
Web3Provider ::Ws ( provider ) = > {
2022-07-09 02:02:32 +03:00
// TODO: maybe the subscribe_pending_txs function should be on the active_request_handle
2022-09-23 00:03:37 +03:00
let active_request_handle = self
2022-12-06 00:13:36 +03:00
. wait_for_request_handle ( & authorization , Duration ::from_secs ( 30 ) , false )
2022-09-23 00:03:37 +03:00
. await ;
2022-06-14 07:04:14 +03:00
let mut stream = provider . subscribe_pending_txs ( ) . await ? ;
drop ( active_request_handle ) ;
2022-07-08 21:27:06 +03:00
while let Some ( pending_tx_id ) = stream . next ( ) . await {
tx_id_sender
. send_async ( ( pending_tx_id , self . clone ( ) ) )
. await
. context ( " tx_id_sender " ) ? ;
2022-08-11 00:29:50 +03:00
// TODO: periodically check for listeners. if no one is subscribed, unsubscribe and wait for a subscription
2022-06-14 07:04:14 +03:00
}
2022-07-08 21:27:06 +03:00
2022-09-06 19:49:07 +03:00
// TODO: is this always an error?
// TODO: we probably don't want a warn and to return error
2022-11-14 00:05:37 +03:00
warn! ( " pending_transactions subscription ended on {} " , self ) ;
2022-09-06 19:49:07 +03:00
return Err ( anyhow ::anyhow! ( " pending_transactions subscription ended " ) ) ;
2022-06-14 07:04:14 +03:00
}
2022-05-05 22:07:09 +03:00
}
2022-12-06 00:13:36 +03:00
} else {
warn! (
" Provider not ready! Unable to watch pending transactions on {} " ,
self
) ;
2022-05-05 22:07:09 +03:00
}
Ok ( ( ) )
}
2022-08-30 23:01:42 +03:00
/// be careful with this; it might wait forever!
2022-12-06 00:13:36 +03:00
/// `allow_not_ready` is only for use by health checks while starting the provider
2022-09-20 09:00:27 +03:00
pub async fn wait_for_request_handle (
self : & Arc < Self > ,
2022-11-08 22:58:11 +03:00
authorization : & Arc < Authorization > ,
2022-09-20 09:00:27 +03:00
max_wait : Duration ,
2022-12-06 00:13:36 +03:00
allow_not_ready : bool ,
2022-09-20 09:00:27 +03:00
) -> anyhow ::Result < OpenRequestHandle > {
let max_wait = Instant ::now ( ) + max_wait ;
2022-05-16 22:15:40 +03:00
2022-06-17 01:23:41 +03:00
loop {
2022-12-06 00:13:36 +03:00
match self
. try_request_handle ( authorization , allow_not_ready )
. await
{
2022-08-24 03:59:05 +03:00
Ok ( OpenRequestResult ::Handle ( handle ) ) = > return Ok ( handle ) ,
2022-08-24 03:14:49 +03:00
Ok ( OpenRequestResult ::RetryAt ( retry_at ) ) = > {
2022-08-07 09:48:57 +03:00
// TODO: emit a stat?
2022-11-12 11:24:32 +03:00
// // trace!(?retry_at);
2022-09-20 09:00:27 +03:00
if retry_at > max_wait {
// break now since we will wait past our maximum wait time
2022-11-25 10:41:53 +03:00
// TODO: don't use anyhow. use specific error type
2022-09-20 09:00:27 +03:00
return Err ( anyhow ::anyhow! ( " timeout waiting for request handle " ) ) ;
}
2022-08-07 09:48:57 +03:00
sleep_until ( retry_at ) . await ;
}
2022-12-06 00:13:36 +03:00
Ok ( OpenRequestResult ::NotReady ) = > {
2022-08-24 03:59:05 +03:00
// TODO: when can this happen? log? emit a stat?
// TODO: subscribe to the head block on this
2022-08-07 09:48:57 +03:00
// TODO: sleep how long? maybe just error?
2022-11-25 10:41:53 +03:00
// TODO: don't use anyhow. use specific error type
2022-09-20 09:00:27 +03:00
return Err ( anyhow ::anyhow! ( " unable to retry for request handle " ) ) ;
2022-05-06 07:29:25 +03:00
}
2022-08-07 09:48:57 +03:00
Err ( err ) = > return Err ( err ) ,
2022-05-06 07:29:25 +03:00
}
2022-05-05 22:07:09 +03:00
}
}
2022-09-22 23:27:14 +03:00
pub async fn try_request_handle (
self : & Arc < Self > ,
2022-11-08 22:58:11 +03:00
authorization : & Arc < Authorization > ,
2022-12-06 00:13:36 +03:00
// TODO? ready_provider: Option<&Arc<Web3Provider>>,
allow_not_ready : bool ,
2022-09-22 23:27:14 +03:00
) -> anyhow ::Result < OpenRequestResult > {
2022-12-08 09:54:38 +03:00
// TODO: think more about this read block
if ! allow_not_ready
& & self
. provider_state
. read ( )
. await
. provider ( allow_not_ready )
. await
. is_none ( )
{
return Ok ( OpenRequestResult ::NotReady ) ;
}
2022-05-05 22:07:09 +03:00
// check rate limits
2022-05-22 02:34:05 +03:00
if let Some ( ratelimiter ) = self . hard_limit . as_ref ( ) {
2022-09-20 09:56:24 +03:00
// TODO: how should we know if we should set expire or not?
2022-09-24 06:59:21 +03:00
match ratelimiter . throttle ( ) . await ? {
2022-09-15 20:57:24 +03:00
RedisRateLimitResult ::Allowed ( _ ) = > {
2022-11-12 11:24:32 +03:00
// // trace!("rate limit succeeded")
2022-05-05 22:07:09 +03:00
}
2022-09-15 20:57:24 +03:00
RedisRateLimitResult ::RetryAt ( retry_at , _ ) = > {
2022-05-05 22:07:09 +03:00
// rate limit failed
2022-05-22 02:34:05 +03:00
// save the smallest retry_after. if nothing succeeds, return an Err with retry_after in it
2022-05-05 22:07:09 +03:00
// TODO: use tracing better
2022-06-25 06:33:49 +03:00
// TODO: i'm seeing "Exhausted rate limit on moralis: 0ns". How is it getting 0?
2022-11-14 00:05:37 +03:00
warn! ( " Exhausted rate limit on {}. Retry at {:?} " , self , retry_at ) ;
2022-05-05 22:07:09 +03:00
2022-09-10 03:12:14 +03:00
return Ok ( OpenRequestResult ::RetryAt ( retry_at ) ) ;
2022-08-07 09:48:57 +03:00
}
2022-09-15 20:57:24 +03:00
RedisRateLimitResult ::RetryNever = > {
2022-12-06 00:13:36 +03:00
return Ok ( OpenRequestResult ::NotReady ) ;
2022-05-05 22:07:09 +03:00
}
}
} ;
2022-12-06 03:06:28 +03:00
let handle = OpenRequestHandle ::new ( authorization . clone ( ) , self . clone ( ) ) . await ;
2022-08-24 03:11:49 +03:00
2022-08-24 03:59:05 +03:00
Ok ( OpenRequestResult ::Handle ( handle ) )
2022-08-24 03:11:49 +03:00
}
}
2022-08-07 09:48:57 +03:00
2022-08-24 03:11:49 +03:00
impl fmt ::Debug for Web3Provider {
fn fmt ( & self , f : & mut fmt ::Formatter < '_ > ) -> fmt ::Result {
// TODO: the default Debug takes forever to write. this is too quiet though. we at least need the url
f . debug_struct ( " Web3Provider " ) . finish_non_exhaustive ( )
2022-05-06 07:29:25 +03:00
}
}
2022-06-14 07:04:14 +03:00
impl Hash for Web3Connection {
fn hash < H : Hasher > ( & self , state : & mut H ) {
2022-08-24 03:32:16 +03:00
// TODO: is this enough?
self . name . hash ( state ) ;
2022-06-14 07:04:14 +03:00
}
}
2022-05-05 22:07:09 +03:00
// Marker impl: equality is fully defined by the `PartialEq` impl for this type.
impl Eq for Web3Connection {}
impl Ord for Web3Connection {
fn cmp ( & self , other : & Self ) -> std ::cmp ::Ordering {
2022-08-24 03:32:16 +03:00
self . name . cmp ( & other . name )
2022-05-05 22:07:09 +03:00
}
}
/// Partial ordering always succeeds and defers to the total ordering from `Ord`.
impl PartialOrd for Web3Connection {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        let ordering = Ord::cmp(self, other);

        Some(ordering)
    }
}
impl PartialEq for Web3Connection {
fn eq ( & self , other : & Self ) -> bool {
2022-08-24 03:32:16 +03:00
self . name = = other . name
2022-05-05 22:07:09 +03:00
}
}
2022-08-10 08:56:09 +03:00
impl Serialize for Web3Connection {
fn serialize < S > ( & self , serializer : S ) -> Result < S ::Ok , S ::Error >
where
S : Serializer ,
{
// 3 is the number of fields in the struct.
2022-11-14 00:05:37 +03:00
let mut state = serializer . serialize_struct ( " Web3Connection " , 8 ) ? ;
2022-08-10 08:56:09 +03:00
2022-11-14 00:05:37 +03:00
// the url is excluded because it likely includes private information. just show the name that we use in keys
2022-08-10 08:56:09 +03:00
state . serialize_field ( " name " , & self . name ) ? ;
2022-11-14 00:05:37 +03:00
// a longer name for display to users
state . serialize_field ( " display_name " , & self . display_name ) ? ;
2022-08-10 08:56:09 +03:00
2022-12-28 05:17:11 +03:00
match self . block_data_limit . load ( atomic ::Ordering ::Relaxed ) {
u64 ::MAX = > {
state . serialize_field ( " block_data_limit " , & None ::< ( ) > ) ? ;
}
block_data_limit = > {
state . serialize_field ( " block_data_limit " , & block_data_limit ) ? ;
}
2022-09-06 23:12:45 +03:00
}
2022-08-10 08:56:09 +03:00
2022-11-08 01:10:19 +03:00
state . serialize_field ( " weight " , & self . weight ) ? ;
2022-08-10 08:56:09 +03:00
state . serialize_field ( " soft_limit " , & self . soft_limit ) ? ;
state . serialize_field (
" active_requests " ,
& self . active_requests . load ( atomic ::Ordering ::Relaxed ) ,
) ? ;
2022-09-06 23:12:45 +03:00
state . serialize_field (
" total_requests " ,
2022-11-25 03:45:13 +03:00
& self . frontend_requests . load ( atomic ::Ordering ::Relaxed ) ,
2022-09-06 23:12:45 +03:00
) ? ;
2022-12-28 05:17:11 +03:00
{
// TODO: maybe this is too much data. serialize less?
let head_block = & * self . head_block . read ( ) ;
state . serialize_field ( " head_block " , head_block ) ? ;
}
2022-09-05 19:39:46 +03:00
2022-08-10 08:56:09 +03:00
state . end ( )
}
}
impl fmt ::Debug for Web3Connection {
fn fmt ( & self , f : & mut fmt ::Formatter < '_ > ) -> fmt ::Result {
let mut f = f . debug_struct ( " Web3Connection " ) ;
2022-08-24 03:32:16 +03:00
f . field ( " name " , & self . name ) ;
2022-08-10 08:56:09 +03:00
let block_data_limit = self . block_data_limit . load ( atomic ::Ordering ::Relaxed ) ;
if block_data_limit = = u64 ::MAX {
2022-09-07 06:54:16 +03:00
f . field ( " blocks " , & " all " ) ;
2022-08-10 08:56:09 +03:00
} else {
2022-09-07 06:54:16 +03:00
f . field ( " blocks " , & block_data_limit ) ;
2022-08-10 08:56:09 +03:00
}
f . finish_non_exhaustive ( )
}
}
impl fmt ::Display for Web3Connection {
fn fmt ( & self , f : & mut fmt ::Formatter < '_ > ) -> fmt ::Result {
// TODO: filter basic auth and api keys
2022-08-24 03:32:16 +03:00
write! ( f , " {} " , & self . name )
2022-08-10 08:56:09 +03:00
}
}
2022-11-22 23:23:08 +03:00
/// Unit tests for `Web3Connection::has_block_data`.
///
/// The module is only compiled for test builds, so its imports need no lint suppression.
#[cfg(test)]
mod tests {
    use super::*;
    use ethers::types::{Block, U256};
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Current time as seconds since the unix epoch, in the type used for block timestamps.
    fn now() -> U256 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect(" cannot tell the time ")
            .as_secs()
            .into()
    }

    /// Build a head block at height 1,000,000 with a random hash and the given timestamp.
    fn saved_block(timestamp: U256) -> SavedBlock {
        let block = Block {
            hash: Some(H256::random()),
            number: Some(1_000_000.into()),
            timestamp,
            ..Default::default()
        };

        SavedBlock::new(Arc::new(block))
    }

    /// Build a connection with no provider whose head is `head_block`.
    // TODO: replace this with an `impl Default` once the struct has one
    fn connection(head_block: &SavedBlock, block_data_limit: u64) -> Web3Connection {
        Web3Connection {
            name: " name ".to_string(),
            display_name: None,
            url: " ws://example.com ".to_string(),
            http_client: None,
            active_requests: 0.into(),
            frontend_requests: 0.into(),
            internal_requests: 0.into(),
            provider_state: AsyncRwLock::new(ProviderState::None),
            hard_limit: None,
            soft_limit: 1_000,
            automatic_block_limit: false,
            block_data_limit: block_data_limit.into(),
            weight: 100.0,
            head_block: RwLock::new(Some(head_block.clone())),
            open_request_handle_metrics: Arc::new(OpenRequestHandleMetrics::default()),
        }
    }

    #[test]
    fn test_archive_node_has_block_data() {
        let head_block = saved_block(now());

        // a limit of u64::MAX means no blocks are pruned
        let x = connection(&head_block, u64::MAX);

        assert!(x.has_block_data(&0.into()));
        assert!(x.has_block_data(&1.into()));
        assert!(x.has_block_data(&head_block.number()));
        assert!(!x.has_block_data(&(head_block.number() + 1)));
        assert!(!x.has_block_data(&(head_block.number() + 1000)));
    }

    #[test]
    fn test_pruned_node_has_block_data() {
        let head_block = saved_block(now());

        let block_data_limit: u64 = 64;

        let x = connection(&head_block, block_data_limit);

        assert!(!x.has_block_data(&0.into()));
        assert!(!x.has_block_data(&1.into()));
        assert!(!x.has_block_data(&(head_block.number() - block_data_limit - 1)));
        assert!(x.has_block_data(&(head_block.number() - block_data_limit)));
        assert!(x.has_block_data(&head_block.number()));
        assert!(!x.has_block_data(&(head_block.number() + 1)));
        assert!(!x.has_block_data(&(head_block.number() + 1000)));
    }

    #[test]
    fn test_lagged_node_not_has_block_data() {
        // head block is an hour old
        let head_block = saved_block(now() - 3600);

        let x = connection(&head_block, u64::MAX);

        assert!(!x.has_block_data(&0.into()));
        assert!(!x.has_block_data(&1.into()));
        assert!(!x.has_block_data(&head_block.number()));
        assert!(!x.has_block_data(&(head_block.number() + 1)));
        assert!(!x.has_block_data(&(head_block.number() + 1000)));
    }
}