2022-08-27 02:44:25 +03:00
//! Rate-limited communication with a web3 provider.
2022-08-30 23:01:42 +03:00
use super ::blockchain ::{ ArcBlock , BlockHashesMap , BlockId } ;
2022-08-24 02:56:47 +03:00
use super ::provider ::Web3Provider ;
2022-09-09 06:53:16 +03:00
use super ::request ::{ OpenRequestHandle , OpenRequestHandleMetrics , OpenRequestResult } ;
2022-08-24 02:56:47 +03:00
use crate ::app ::{ flatten_handle , AnyhowJoinHandle } ;
use crate ::config ::BlockAndRpc ;
2022-06-16 05:53:37 +03:00
use anyhow ::Context ;
2022-07-19 04:31:12 +03:00
use ethers ::prelude ::{ Block , Bytes , Middleware , ProviderError , TxHash , H256 , U64 } ;
2022-06-16 05:53:37 +03:00
use futures ::future ::try_join_all ;
2022-05-05 22:07:09 +03:00
use futures ::StreamExt ;
2022-08-10 05:37:34 +03:00
use parking_lot ::RwLock ;
2022-09-14 04:43:09 +03:00
use rand ::Rng ;
2022-09-15 20:57:24 +03:00
use redis_rate_limiter ::{ RedisPool , RedisRateLimitResult , RedisRateLimiter } ;
2022-05-21 01:16:15 +03:00
use serde ::ser ::{ SerializeStruct , Serializer } ;
use serde ::Serialize ;
2022-09-14 04:43:09 +03:00
use std ::cmp ::min ;
2022-05-05 22:07:09 +03:00
use std ::fmt ;
2022-06-14 07:04:14 +03:00
use std ::hash ::{ Hash , Hasher } ;
2022-07-19 04:31:12 +03:00
use std ::sync ::atomic ::{ self , AtomicU32 , AtomicU64 } ;
2022-05-05 22:07:09 +03:00
use std ::{ cmp ::Ordering , sync ::Arc } ;
2022-09-06 19:49:07 +03:00
use tokio ::sync ::broadcast ;
2022-08-10 05:37:34 +03:00
use tokio ::sync ::RwLock as AsyncRwLock ;
2022-08-24 03:14:49 +03:00
use tokio ::time ::{ interval , sleep , sleep_until , Duration , MissedTickBehavior } ;
2022-09-06 16:14:15 +03:00
use tracing ::{ debug , error , info , instrument , trace , warn } ;
2022-06-14 08:43:28 +03:00
2022-09-15 20:57:24 +03:00
/// An active connection to a Web3 RPC server like geth or erigon.
2022-08-24 02:13:56 +03:00
pub struct Web3Connection {
2022-08-24 03:11:49 +03:00
pub name : String ,
2022-08-24 02:13:56 +03:00
/// TODO: can we get this from the provider? do we even need it?
2022-08-26 20:26:17 +03:00
url : String ,
2022-09-15 20:57:24 +03:00
/// Some connections use an http_client. we keep a clone for reconnecting
2022-09-14 05:11:48 +03:00
http_client : Option < reqwest ::Client > ,
2022-08-24 02:13:56 +03:00
/// keep track of currently open requests. We sort on this
2022-08-24 03:11:49 +03:00
pub ( super ) active_requests : AtomicU32 ,
2022-09-06 23:12:45 +03:00
/// keep track of total requests
/// TODO: is this type okay?
2022-09-09 06:53:16 +03:00
/// TODO: replace this with something in metered?
2022-09-06 23:12:45 +03:00
pub ( super ) total_requests : AtomicU64 ,
2022-08-24 02:13:56 +03:00
/// provider is in a RwLock so that we can replace it if re-connecting
/// it is an async lock because we hold it open across awaits
2022-08-24 03:11:49 +03:00
pub ( super ) provider : AsyncRwLock < Option < Arc < Web3Provider > > > ,
2022-08-24 02:13:56 +03:00
/// rate limits are stored in a central redis so that multiple proxies can share their rate limits
2022-09-15 20:57:24 +03:00
/// We do not use the deferred rate limiter because going over limits would cause errors
hard_limit : Option < RedisRateLimiter > ,
2022-08-24 02:13:56 +03:00
/// used for load balancing to the least loaded server
2022-08-26 20:26:17 +03:00
pub ( super ) soft_limit : u32 ,
2022-09-06 23:12:45 +03:00
/// TODO: have an enum for this so that "no limit" prints pretty?
2022-08-24 02:13:56 +03:00
block_data_limit : AtomicU64 ,
2022-08-26 20:26:17 +03:00
/// Lower weight are higher priority when sending requests
pub ( super ) weight : u32 ,
// TODO: async lock?
2022-09-06 06:26:23 +03:00
pub ( super ) head_block_id : RwLock < Option < BlockId > > ,
2022-09-09 06:53:16 +03:00
pub ( super ) open_request_handle_metrics : Arc < OpenRequestHandleMetrics > ,
2022-08-24 02:13:56 +03:00
}
2022-05-05 22:07:09 +03:00
impl Web3Connection {
2022-06-14 07:04:14 +03:00
/// Connect to a web3 rpc
2022-06-14 08:43:28 +03:00
// #[instrument(name = "spawn_Web3Connection", skip(hard_limit, http_client))]
2022-07-19 07:21:32 +03:00
// TODO: have this take a builder (which will have channels attached)
2022-06-14 08:43:28 +03:00
#[ allow(clippy::too_many_arguments) ]
2022-06-14 07:04:14 +03:00
pub async fn spawn (
2022-08-10 08:56:09 +03:00
name : String ,
2022-07-19 07:21:32 +03:00
chain_id : u64 ,
2022-05-05 22:07:09 +03:00
url_str : String ,
2022-05-15 04:51:24 +03:00
// optional because this is only used for http providers. websocket providers don't use it
2022-07-19 07:21:32 +03:00
http_client : Option < reqwest ::Client > ,
2022-06-29 22:15:05 +03:00
http_interval_sender : Option < Arc < broadcast ::Sender < ( ) > > > ,
2022-08-16 01:50:56 +03:00
// TODO: have a builder struct for this.
hard_limit : Option < ( u64 , RedisPool ) > ,
2022-05-05 22:07:09 +03:00
// TODO: think more about this type
soft_limit : u32 ,
2022-08-28 02:49:41 +03:00
block_map : BlockHashesMap ,
2022-07-22 08:11:26 +03:00
block_sender : Option < flume ::Sender < BlockAndRpc > > ,
2022-06-14 08:43:28 +03:00
tx_id_sender : Option < flume ::Sender < ( TxHash , Arc < Self > ) > > ,
reconnect : bool ,
2022-08-08 22:57:54 +03:00
weight : u32 ,
2022-09-09 06:53:16 +03:00
open_request_handle_metrics : Arc < OpenRequestHandleMetrics > ,
2022-06-14 08:43:28 +03:00
) -> anyhow ::Result < ( Arc < Web3Connection > , AnyhowJoinHandle < ( ) > ) > {
2022-09-15 20:57:24 +03:00
let hard_limit = hard_limit . map ( | ( hard_rate_limit , redis_pool ) | {
// TODO: is cache size 1 okay? i think we need
RedisRateLimiter ::new (
2022-08-06 08:46:33 +03:00
" web3_proxy " ,
2022-08-06 05:29:55 +03:00
& format! ( " {} : {} " , chain_id , url_str ) ,
2022-05-22 02:34:05 +03:00
hard_rate_limit ,
2022-08-30 23:01:42 +03:00
60.0 ,
2022-09-15 20:57:24 +03:00
redis_pool ,
2022-05-22 02:34:05 +03:00
)
} ) ;
2022-05-05 22:07:09 +03:00
2022-07-09 07:25:59 +03:00
let new_connection = Self {
2022-08-10 08:56:09 +03:00
name ,
2022-09-14 05:11:48 +03:00
http_client ,
2022-05-05 22:07:09 +03:00
url : url_str . clone ( ) ,
2022-05-15 04:51:24 +03:00
active_requests : 0. into ( ) ,
2022-09-06 23:12:45 +03:00
total_requests : 0. into ( ) ,
2022-09-14 04:43:09 +03:00
provider : AsyncRwLock ::new ( None ) ,
2022-05-22 02:34:05 +03:00
hard_limit ,
2022-05-05 22:07:09 +03:00
soft_limit ,
2022-07-19 04:31:12 +03:00
block_data_limit : Default ::default ( ) ,
2022-09-05 19:39:46 +03:00
head_block_id : RwLock ::new ( Default ::default ( ) ) ,
2022-08-08 22:57:54 +03:00
weight ,
2022-09-09 06:53:16 +03:00
open_request_handle_metrics ,
2022-05-12 21:49:57 +03:00
} ;
2022-07-09 07:25:59 +03:00
let new_connection = Arc ::new ( new_connection ) ;
2022-05-12 21:49:57 +03:00
2022-09-14 04:43:09 +03:00
// connect to the server (with retries)
new_connection
2022-09-14 05:11:48 +03:00
. retrying_reconnect ( block_sender . as_ref ( ) , false )
2022-09-14 04:43:09 +03:00
. await ? ;
2022-05-15 04:51:24 +03:00
// check the server's chain_id here
2022-07-19 04:31:12 +03:00
// TODO: move this outside the `new` function and into a `start` function or something. that way we can do retries from there
2022-05-15 04:51:24 +03:00
// TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error
2022-06-17 01:23:41 +03:00
// TODO: this will wait forever. do we want that?
2022-07-19 07:21:32 +03:00
let found_chain_id : Result < U64 , _ > = new_connection
2022-06-14 07:04:14 +03:00
. wait_for_request_handle ( )
2022-08-06 08:26:43 +03:00
. await ?
2022-09-14 07:27:18 +03:00
. request ( " eth_chainId " , Option ::None ::< ( ) > , false )
2022-05-13 00:20:33 +03:00
. await ;
match found_chain_id {
Ok ( found_chain_id ) = > {
2022-06-29 22:15:05 +03:00
// TODO: there has to be a cleaner way to do this
2022-07-19 07:21:32 +03:00
if chain_id ! = found_chain_id . as_u64 ( ) {
2022-05-13 00:20:33 +03:00
return Err ( anyhow ::anyhow! (
" incorrect chain id! Expected {}. Found {} " ,
chain_id ,
found_chain_id
2022-07-19 07:21:32 +03:00
)
2022-07-19 09:41:04 +03:00
. context ( format! ( " failed @ {} " , new_connection ) ) ) ;
2022-05-13 00:20:33 +03:00
}
}
Err ( e ) = > {
2022-07-19 09:41:04 +03:00
let e = anyhow ::Error ::from ( e ) . context ( format! ( " failed @ {} " , new_connection ) ) ;
2022-05-13 00:20:33 +03:00
return Err ( e ) ;
}
2022-05-12 21:49:57 +03:00
}
2022-08-11 00:52:28 +03:00
let will_subscribe_to_blocks = block_sender . is_some ( ) ;
2022-07-19 04:31:12 +03:00
// subscribe to new blocks and new transactions
// TODO: make transaction subscription optional (just pass None for tx_id_sender)
2022-06-14 08:43:28 +03:00
let handle = {
2022-07-09 07:25:59 +03:00
let new_connection = new_connection . clone ( ) ;
2022-06-14 08:43:28 +03:00
tokio ::spawn ( async move {
2022-07-09 07:25:59 +03:00
new_connection
2022-08-26 20:26:17 +03:00
. subscribe (
http_interval_sender ,
block_map ,
block_sender ,
tx_id_sender ,
reconnect ,
)
2022-06-14 08:43:28 +03:00
. await
} )
} ;
2022-07-19 04:31:12 +03:00
// we could take "archive" as a parameter, but we would want a safety check on it regardless
2022-07-19 07:22:02 +03:00
// check common archive thresholds
2022-07-23 03:19:13 +03:00
// TODO: would be great if rpcs exposed this
// TODO: move this to a helper function so we can recheck on errors or as the chain grows
2022-08-03 03:27:26 +03:00
// TODO: move this to a helper function that checks
2022-08-11 00:52:28 +03:00
if will_subscribe_to_blocks {
// TODO: make sure the server isn't still syncing
2022-07-19 04:31:12 +03:00
2022-08-11 00:52:28 +03:00
// TODO: don't sleep. wait for new heads subscription instead
// TODO: i think instead of atomics, we could maybe use a watch channel
sleep ( Duration ::from_millis ( 250 ) ) . await ;
2022-07-19 04:31:12 +03:00
2022-08-27 05:13:36 +03:00
new_connection . check_block_data_limit ( ) . await ? ;
}
2022-07-19 04:31:12 +03:00
2022-08-27 05:13:36 +03:00
Ok ( ( new_connection , handle ) )
}
2022-07-19 04:31:12 +03:00
2022-09-05 19:25:21 +03:00
#[ instrument(skip_all) ]
2022-08-27 05:13:36 +03:00
async fn check_block_data_limit ( self : & Arc < Self > ) -> anyhow ::Result < Option < u64 > > {
let mut limit = None ;
2022-08-11 00:52:28 +03:00
2022-08-27 05:13:36 +03:00
for block_data_limit in [ u64 ::MAX , 90_000 , 128 , 64 , 32 ] {
2022-09-06 06:26:23 +03:00
let mut head_block_id = self . head_block_id . read ( ) . clone ( ) ;
2022-08-11 00:52:28 +03:00
2022-09-14 05:11:48 +03:00
// TODO: subscribe to a channel instead of polling. subscribe to http_interval_sender?
2022-09-06 06:26:23 +03:00
while head_block_id . is_none ( ) {
2022-09-05 19:25:21 +03:00
warn! ( rpc = % self , " no head block yet. retrying " ) ;
2022-07-19 04:31:12 +03:00
2022-09-14 05:11:48 +03:00
sleep ( Duration ::from_secs ( 13 ) ) . await ;
2022-08-11 00:52:28 +03:00
2022-09-06 06:26:23 +03:00
head_block_id = self . head_block_id . read ( ) . clone ( ) ;
2022-08-27 05:13:36 +03:00
}
2022-09-06 06:26:23 +03:00
let head_block_num = head_block_id . expect ( " is_none was checked above " ) . num ;
debug_assert_ne! ( head_block_num , U64 ::zero ( ) ) ;
2022-08-11 00:52:28 +03:00
2022-08-27 05:13:36 +03:00
// TODO: subtract 1 from block_data_limit for safety?
let maybe_archive_block = head_block_num
. saturating_sub ( ( block_data_limit ) . into ( ) )
. max ( U64 ::one ( ) ) ;
2022-09-06 06:26:23 +03:00
// TODO: wait for the handle BEFORE we check the current block number. it might be delayed too!
2022-08-27 05:13:36 +03:00
let archive_result : Result < Bytes , _ > = self
. wait_for_request_handle ( )
. await ?
. request (
" eth_getCode " ,
(
" 0xdead00000000000000000000000000000000beef " ,
maybe_archive_block ,
) ,
2022-09-14 07:27:18 +03:00
true ,
2022-08-27 05:13:36 +03:00
)
. await ;
2022-09-05 19:25:21 +03:00
trace! ( ? archive_result , rpc = % self ) ;
2022-08-27 05:13:36 +03:00
if archive_result . is_ok ( ) {
limit = Some ( block_data_limit ) ;
break ;
2022-07-19 04:31:12 +03:00
}
}
2022-08-27 05:13:36 +03:00
if let Some ( limit ) = limit {
self . block_data_limit
. store ( limit , atomic ::Ordering ::Release ) ;
}
2022-07-19 04:31:12 +03:00
2022-08-27 05:13:36 +03:00
Ok ( limit )
2022-07-09 07:25:59 +03:00
}
2022-07-19 04:31:12 +03:00
/// TODO: this might be too simple. different nodes can prune differently
2022-07-25 03:27:00 +03:00
pub fn block_data_limit ( & self ) -> U64 {
2022-07-19 04:31:12 +03:00
self . block_data_limit . load ( atomic ::Ordering ::Acquire ) . into ( )
}
2022-07-22 22:30:39 +03:00
pub fn has_block_data ( & self , needed_block_num : & U64 ) -> bool {
2022-07-25 03:27:00 +03:00
let block_data_limit : U64 = self . block_data_limit ( ) ;
2022-07-19 04:31:12 +03:00
2022-09-06 06:26:23 +03:00
let head_block_id = self . head_block_id . read ( ) . clone ( ) ;
let newest_block_num = match head_block_id {
None = > return false ,
Some ( x ) = > x . num ,
} ;
2022-07-19 04:31:12 +03:00
let oldest_block_num = newest_block_num
. saturating_sub ( block_data_limit )
. max ( U64 ::one ( ) ) ;
2022-07-22 22:30:39 +03:00
needed_block_num > = & oldest_block_num & & needed_block_num < = & newest_block_num
2022-05-05 22:07:09 +03:00
}
2022-09-14 04:43:09 +03:00
/// reconnect to the provider. errors are retried forever with exponential backoff with jitter.
/// We use the "Decorrelated" jitter from https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
/// TODO: maybe it would be better to use "Full Jitter". The "Full Jitter" approach uses less work, but slightly more time.
pub async fn retrying_reconnect (
self : & Arc < Self > ,
block_sender : Option < & flume ::Sender < BlockAndRpc > > ,
2022-09-14 05:11:48 +03:00
initial_sleep : bool ,
2022-09-14 04:43:09 +03:00
) -> anyhow ::Result < ( ) > {
// there are several crates that have retry helpers, but they all seem more complex than necessary
let base_ms = 500 ;
let cap_ms = 30_000 ;
let range_multiplier = 3 ;
// sleep once before the initial retry attempt
2022-09-14 05:11:48 +03:00
// TODO: now that we use this method for our initial connection, do we still want this sleep?
let mut sleep_ms = if initial_sleep {
let first_sleep_ms = min (
cap_ms ,
rand ::thread_rng ( ) . gen_range ( base_ms .. ( base_ms * range_multiplier ) ) ,
) ;
2022-09-14 09:38:53 +03:00
let reconnect_in = Duration ::from_millis ( first_sleep_ms ) ;
warn! ( rpc = % self , ? reconnect_in , " Reconnect in " ) ;
sleep ( reconnect_in ) . await ;
2022-09-14 05:11:48 +03:00
first_sleep_ms
} else {
base_ms
} ;
2022-09-14 04:43:09 +03:00
// retry until we succeed
while let Err ( err ) = self . reconnect ( block_sender ) . await {
sleep_ms = min (
cap_ms ,
rand ::thread_rng ( ) . gen_range ( base_ms .. ( sleep_ms * range_multiplier ) ) ,
) ;
let retry_in = Duration ::from_millis ( sleep_ms ) ;
warn! ( rpc = % self , ? retry_in , ? err , " Failed to reconnect! " ) ;
sleep ( retry_in ) . await ;
}
Ok ( ( ) )
}
2022-09-06 15:29:37 +03:00
/// reconnect a websocket provider
2022-06-14 07:04:14 +03:00
#[ instrument(skip_all) ]
pub async fn reconnect (
self : & Arc < Self > ,
2022-09-14 07:27:18 +03:00
// websocket doesn't need the http client
2022-09-14 04:43:09 +03:00
block_sender : Option < & flume ::Sender < BlockAndRpc > > ,
2022-06-14 07:04:14 +03:00
) -> anyhow ::Result < ( ) > {
// since this lock is held open over an await, we use tokio's locking
2022-07-19 04:31:12 +03:00
// TODO: timeout on this lock. if its slow, something is wrong
2022-09-14 09:38:53 +03:00
let mut provider_option = self . provider . write ( ) . await ;
if let Some ( provider ) = & * provider_option {
match provider . as_ref ( ) {
Web3Provider ::Http ( _ ) = > {
// http clients don't need to do anything for reconnecting
// they *do* need to run this function to setup the first
return Ok ( ( ) ) ;
}
Web3Provider ::Ws ( _ ) = > { }
2022-09-14 07:27:18 +03:00
}
info! ( rpc = % self , " reconnecting " ) ;
2022-09-13 02:00:10 +03:00
2022-09-14 07:27:18 +03:00
// disconnect the current provider
2022-09-14 09:38:53 +03:00
* provider_option = None ;
2022-06-14 07:04:14 +03:00
2022-09-14 07:27:18 +03:00
// reset sync status
{
let mut head_block_id = self . head_block_id . write ( ) ;
* head_block_id = None ;
}
// tell the block subscriber that we don't have any blocks
if let Some ( block_sender ) = & block_sender {
block_sender
. send_async ( ( None , self . clone ( ) ) )
. await
. context ( " block_sender during connect " ) ? ;
}
} else {
info! ( rpc = % self , " connecting " ) ;
2022-06-16 05:53:37 +03:00
}
2022-06-14 07:04:14 +03:00
2022-09-14 04:43:09 +03:00
// TODO: if this fails, keep retrying! otherwise it crashes and doesn't try again!
2022-09-14 05:11:48 +03:00
let new_provider = Web3Provider ::from_str ( & self . url , self . http_client . clone ( ) ) . await ? ;
2022-09-14 04:43:09 +03:00
2022-09-14 09:38:53 +03:00
* provider_option = Some ( Arc ::new ( new_provider ) ) ;
2022-09-14 04:43:09 +03:00
2022-09-14 06:32:38 +03:00
info! ( rpc = % self , " successfully connected " ) ;
2022-09-14 04:43:09 +03:00
2022-06-14 07:04:14 +03:00
Ok ( ( ) )
}
2022-05-06 09:07:01 +03:00
#[ inline ]
2022-05-05 22:07:09 +03:00
pub fn active_requests ( & self ) -> u32 {
self . active_requests . load ( atomic ::Ordering ::Acquire )
}
2022-06-04 01:22:55 +03:00
/// Whether a provider is currently set (false before connecting and mid-reconnect).
#[inline]
pub async fn has_provider(&self) -> bool {
    let provider_guard = self.provider.read().await;
    provider_guard.is_some()
}
2022-05-17 03:56:56 +03:00
#[ instrument(skip_all) ]
2022-08-26 20:26:17 +03:00
async fn send_head_block_result (
self : & Arc < Self > ,
2022-09-06 15:29:37 +03:00
new_head_block : Result < Option < ArcBlock > , ProviderError > ,
2022-07-22 08:11:26 +03:00
block_sender : & flume ::Sender < BlockAndRpc > ,
2022-08-28 02:49:41 +03:00
block_map : BlockHashesMap ,
2022-05-30 07:30:13 +03:00
) -> anyhow ::Result < ( ) > {
2022-08-26 20:26:17 +03:00
match new_head_block {
2022-09-06 15:29:37 +03:00
Ok ( None ) = > {
2022-09-13 02:00:10 +03:00
// TODO: i think this should clear the local block and then update over the block sender
2022-09-06 15:29:37 +03:00
todo! ( " handle no block " )
}
Ok ( Some ( mut new_head_block ) ) = > {
2022-08-26 20:26:17 +03:00
// TODO: is unwrap_or_default ok? we might have an empty block
let new_hash = new_head_block . hash . unwrap_or_default ( ) ;
2022-09-14 22:39:08 +03:00
// if we already have this block saved, set new_head_block to that arc. otherwise store this copy
new_head_block = block_map
. get_with ( new_hash , async move { new_head_block } )
. await ;
2022-08-30 23:01:42 +03:00
2022-08-26 20:26:17 +03:00
let new_num = new_head_block . number . unwrap_or_default ( ) ;
// save the block so we don't send the same one multiple times
// also save so that archive checks can know how far back to query
{
2022-09-06 06:26:23 +03:00
let mut head_block_id = self . head_block_id . write ( ) ;
if head_block_id . is_none ( ) {
* head_block_id = Some ( BlockId {
hash : new_hash ,
num : new_num ,
} ) ;
} else {
head_block_id . as_mut ( ) . map ( | x | {
x . hash = new_hash ;
x . num = new_num ;
x
} ) ;
}
2022-07-19 04:31:12 +03:00
}
2022-06-16 05:53:37 +03:00
block_sender
2022-09-06 15:29:37 +03:00
. send_async ( ( Some ( new_head_block ) , self . clone ( ) ) )
2022-06-16 05:53:37 +03:00
. await
. context ( " block_sender " ) ? ;
2022-05-15 09:27:13 +03:00
}
Err ( e ) = > {
warn! ( " unable to get block from {}: {} " , self , e ) ;
2022-08-26 20:26:17 +03:00
// TODO: do something to rpc_chain?
2022-08-07 09:48:57 +03:00
// send an empty block to take this server out of rotation
block_sender
2022-09-06 15:29:37 +03:00
. send_async ( ( None , self . clone ( ) ) )
2022-08-07 09:48:57 +03:00
. await
. context ( " block_sender " ) ? ;
2022-05-15 09:27:13 +03:00
}
}
2022-05-30 07:30:13 +03:00
Ok ( ( ) )
2022-05-15 09:27:13 +03:00
}
2022-09-14 04:43:09 +03:00
/// subscribe to blocks and transactions with automatic reconnects
2022-09-02 23:46:39 +03:00
#[ instrument(skip_all) ]
2022-06-14 08:43:28 +03:00
async fn subscribe (
2022-06-14 07:04:14 +03:00
self : Arc < Self > ,
2022-06-29 22:15:05 +03:00
http_interval_sender : Option < Arc < broadcast ::Sender < ( ) > > > ,
2022-08-28 02:49:41 +03:00
block_map : BlockHashesMap ,
2022-07-22 08:11:26 +03:00
block_sender : Option < flume ::Sender < BlockAndRpc > > ,
2022-06-14 08:43:28 +03:00
tx_id_sender : Option < flume ::Sender < ( TxHash , Arc < Self > ) > > ,
2022-06-14 07:04:14 +03:00
reconnect : bool ,
) -> anyhow ::Result < ( ) > {
2022-06-16 05:53:37 +03:00
loop {
2022-09-06 16:14:15 +03:00
debug! ( rpc = % self , " subscribing " ) ;
2022-07-19 07:24:16 +03:00
let http_interval_receiver = http_interval_sender . as_ref ( ) . map ( | x | x . subscribe ( ) ) ;
2022-06-29 22:15:05 +03:00
2022-06-16 05:53:37 +03:00
let mut futures = vec! [ ] ;
if let Some ( block_sender ) = & block_sender {
2022-08-26 20:26:17 +03:00
let f = self . clone ( ) . subscribe_new_heads (
http_interval_receiver ,
block_sender . clone ( ) ,
block_map . clone ( ) ,
) ;
2022-06-16 05:53:37 +03:00
futures . push ( flatten_handle ( tokio ::spawn ( f ) ) ) ;
}
if let Some ( tx_id_sender ) = & tx_id_sender {
let f = self
. clone ( )
. subscribe_pending_transactions ( tx_id_sender . clone ( ) ) ;
futures . push ( flatten_handle ( tokio ::spawn ( f ) ) ) ;
}
2022-09-06 19:49:07 +03:00
{
// TODO: move this into a proper function
let conn = self . clone ( ) ;
// health check
let f = async move {
loop {
if let Some ( provider ) = conn . provider . read ( ) . await . as_ref ( ) {
if provider . ready ( ) {
trace! ( rpc = % conn , " provider is ready " ) ;
} else {
warn! ( rpc = % conn , " provider is NOT ready " ) ;
return Err ( anyhow ::anyhow! ( " provider is not ready " ) ) ;
}
}
2022-09-06 16:14:15 +03:00
2022-09-06 19:49:07 +03:00
// TODO: how often?
// TODO: should we also check that the head block has changed recently?
// TODO: maybe instead we should do a simple subscription and follow that instead
sleep ( Duration ::from_secs ( 10 ) ) . await ;
}
} ;
2022-09-06 16:14:15 +03:00
futures . push ( flatten_handle ( tokio ::spawn ( f ) ) ) ;
2022-06-14 07:04:14 +03:00
}
2022-06-16 05:53:37 +03:00
match try_join_all ( futures ) . await {
2022-09-14 04:43:09 +03:00
Ok ( _ ) = > {
// futures all exited without error. break instead of restarting subscriptions
break ;
}
2022-06-16 05:53:37 +03:00
Err ( err ) = > {
if reconnect {
warn! (
2022-09-05 19:25:21 +03:00
rpc = % self ,
2022-09-06 19:49:07 +03:00
? err ,
" subscription exited " ,
2022-06-16 05:53:37 +03:00
) ;
2022-09-14 05:11:48 +03:00
self . retrying_reconnect ( block_sender . as_ref ( ) , true ) . await ? ;
2022-06-16 05:53:37 +03:00
} else {
2022-09-05 19:25:21 +03:00
error! ( rpc = % self , ? err , " subscription exited " ) ;
2022-06-16 05:53:37 +03:00
return Err ( err ) ;
}
2022-06-14 08:43:28 +03:00
}
2022-06-14 07:04:14 +03:00
}
}
2022-09-06 19:49:07 +03:00
info! ( rpc = % self , " all subscriptions complete " ) ;
2022-09-06 16:14:15 +03:00
2022-06-14 07:04:14 +03:00
Ok ( ( ) )
}
2022-05-17 05:26:47 +03:00
/// Subscribe to new blocks. If `reconnect` is true, this runs forever.
2022-05-17 03:56:56 +03:00
#[ instrument(skip_all) ]
2022-06-14 07:04:14 +03:00
async fn subscribe_new_heads (
2022-05-05 22:07:09 +03:00
self : Arc < Self > ,
2022-06-29 22:15:05 +03:00
http_interval_receiver : Option < broadcast ::Receiver < ( ) > > ,
2022-07-22 08:11:26 +03:00
block_sender : flume ::Sender < BlockAndRpc > ,
2022-08-28 02:49:41 +03:00
block_map : BlockHashesMap ,
2022-05-05 22:07:09 +03:00
) -> anyhow ::Result < ( ) > {
2022-09-03 05:59:30 +03:00
info! ( % self , " watching new heads " ) ;
2022-06-14 07:04:14 +03:00
// TODO: is a RwLock of an Option<Arc> the right thing here?
if let Some ( provider ) = self . provider . read ( ) . await . clone ( ) {
match & * provider {
Web3Provider ::Http ( _provider ) = > {
// there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints
2022-06-29 22:15:05 +03:00
// TODO: try watch_blocks and fall back to this?
let mut http_interval_receiver = http_interval_receiver . unwrap ( ) ;
2022-06-14 07:04:14 +03:00
2022-07-19 04:31:12 +03:00
let mut last_hash = H256 ::zero ( ) ;
2022-06-14 07:04:14 +03:00
loop {
2022-08-26 20:26:17 +03:00
// TODO: try, or wait_for?
match self . wait_for_request_handle ( ) . await {
Ok ( active_request_handle ) = > {
2022-06-14 07:04:14 +03:00
let block : Result < Block < TxHash > , _ > = active_request_handle
2022-09-14 07:27:18 +03:00
. request ( " eth_getBlockByNumber " , ( " latest " , false ) , false )
2022-06-14 07:04:14 +03:00
. await ;
2022-08-26 20:26:17 +03:00
match block {
Ok ( block ) = > {
// don't send repeat blocks
let new_hash = block
. hash
. expect ( " blocks here should always have hashes " ) ;
if new_hash ! = last_hash {
// new hash!
last_hash = new_hash ;
self . send_head_block_result (
2022-09-06 15:29:37 +03:00
Ok ( Some ( Arc ::new ( block ) ) ) ,
2022-08-26 20:26:17 +03:00
& block_sender ,
block_map . clone ( ) ,
)
2022-08-07 09:48:57 +03:00
. await ? ;
2022-08-26 20:26:17 +03:00
}
}
Err ( err ) = > {
// we did not get a block back. something is up with the server. take it out of rotation
self . send_head_block_result (
Err ( err ) ,
& block_sender ,
block_map . clone ( ) ,
)
. await ? ;
2022-07-19 07:31:30 +03:00
}
2022-05-16 22:15:40 +03:00
}
2022-06-14 07:04:14 +03:00
}
2022-07-09 01:14:45 +03:00
Err ( err ) = > {
2022-08-11 00:29:50 +03:00
warn! ( ? err , " Internal error on latest block from {} " , self ) ;
// TODO: what should we do? sleep? extra time?
2022-05-16 22:15:40 +03:00
}
}
2022-07-19 04:31:12 +03:00
2022-08-26 20:26:17 +03:00
// wait for the next interval
2022-07-19 04:31:12 +03:00
// TODO: if error or rate limit, increase interval?
while let Err ( err ) = http_interval_receiver . recv ( ) . await {
match err {
broadcast ::error ::RecvError ::Closed = > {
2022-08-26 20:26:17 +03:00
// channel is closed! that's not good. bubble the error up
2022-07-19 04:31:12 +03:00
return Err ( err . into ( ) ) ;
}
broadcast ::error ::RecvError ::Lagged ( lagged ) = > {
2022-08-26 20:26:17 +03:00
// querying the block was delayed
// this can happen if tokio is very busy or waiting for requests limits took too long
2022-09-05 19:25:21 +03:00
warn! ( ? err , rpc = % self , " http interval lagging by {}! " , lagged ) ;
2022-07-19 04:31:12 +03:00
}
}
}
2022-09-05 19:25:21 +03:00
trace! ( rpc = % self , " ok http interval " ) ;
2022-05-15 22:28:22 +03:00
}
2022-06-14 07:04:14 +03:00
}
Web3Provider ::Ws ( provider ) = > {
2022-08-30 23:01:42 +03:00
// todo: move subscribe_blocks onto the request handle?
2022-06-14 07:04:14 +03:00
let active_request_handle = self . wait_for_request_handle ( ) . await ;
let mut stream = provider . subscribe_blocks ( ) . await ? ;
drop ( active_request_handle ) ;
// query the block once since the subscription doesn't send the current block
// there is a very small race condition here where the stream could send us a new block right now
// all it does is print "new block" for the same block as current block
2022-09-06 15:29:37 +03:00
let block : Result < Option < ArcBlock > , _ > = self
2022-06-14 07:04:14 +03:00
. wait_for_request_handle ( )
2022-08-06 08:26:43 +03:00
. await ?
2022-09-14 07:27:18 +03:00
. request ( " eth_getBlockByNumber " , ( " latest " , false ) , false )
2022-08-26 20:26:17 +03:00
. await
2022-09-06 15:29:37 +03:00
. map ( | x | Some ( Arc ::new ( x ) ) ) ;
2022-06-14 07:04:14 +03:00
2022-09-06 19:49:07 +03:00
let mut last_hash = match & block {
Ok ( Some ( new_block ) ) = > new_block
. hash
. expect ( " blocks should always have a hash here " ) ,
_ = > H256 ::zero ( ) ,
} ;
2022-08-26 20:26:17 +03:00
self . send_head_block_result ( block , & block_sender , block_map . clone ( ) )
. await ? ;
2022-06-14 07:04:14 +03:00
2022-07-09 01:14:45 +03:00
while let Some ( new_block ) = stream . next ( ) . await {
2022-09-06 19:49:07 +03:00
// TODO: check the new block's hash to be sure we don't send dupes
let new_hash = new_block
. hash
. expect ( " blocks should always have a hash here " ) ;
if new_hash = = last_hash {
// some rpcs like to give us duplicates. don't waste our time on them
continue ;
} else {
last_hash = new_hash ;
}
2022-08-26 20:26:17 +03:00
self . send_head_block_result (
2022-09-06 15:29:37 +03:00
Ok ( Some ( Arc ::new ( new_block ) ) ) ,
2022-08-26 20:26:17 +03:00
& block_sender ,
block_map . clone ( ) ,
)
. await ? ;
2022-05-17 05:26:47 +03:00
}
2022-07-09 01:14:45 +03:00
2022-09-06 19:49:07 +03:00
// TODO: is this always an error?
// TODO: we probably don't want a warn and to return error
warn! ( rpc = % self , " new_heads subscription ended " ) ;
return Err ( anyhow ::anyhow! ( " new_heads subscription ended " ) ) ;
2022-05-05 22:07:09 +03:00
}
2022-05-17 05:26:47 +03:00
}
2022-06-14 07:04:14 +03:00
}
2022-05-17 03:56:56 +03:00
2022-06-14 07:04:14 +03:00
Ok ( ( ) )
}
2022-05-17 05:26:47 +03:00
2022-06-14 07:04:14 +03:00
#[ instrument(skip_all) ]
async fn subscribe_pending_transactions (
self : Arc < Self > ,
tx_id_sender : flume ::Sender < ( TxHash , Arc < Self > ) > ,
) -> anyhow ::Result < ( ) > {
2022-09-03 05:59:30 +03:00
info! ( % self , " watching pending transactions " ) ;
2022-06-14 07:04:14 +03:00
// TODO: is a RwLock of an Option<Arc> the right thing here?
if let Some ( provider ) = self . provider . read ( ) . await . clone ( ) {
match & * provider {
2022-06-18 10:06:54 +03:00
Web3Provider ::Http ( provider ) = > {
2022-06-14 07:04:14 +03:00
// there is a "watch_pending_transactions" function, but a lot of public nodes do not support the necessary rpc endpoints
// TODO: what should this interval be? probably automatically set to some fraction of block time
// TODO: maybe it would be better to have one interval for all of the http providers, but this works for now
// TODO: if there are some websocket providers, maybe have a longer interval and a channel that tells the https to update when a websocket gets a new head? if they are slow this wouldn't work well though
2022-06-14 08:43:28 +03:00
let mut interval = interval ( Duration ::from_secs ( 60 ) ) ;
2022-06-14 07:04:14 +03:00
interval . set_missed_tick_behavior ( MissedTickBehavior ::Delay ) ;
loop {
2022-06-14 08:43:28 +03:00
// TODO: actually do something here
/*
2022-06-14 07:04:14 +03:00
match self . try_request_handle ( ) . await {
Ok ( active_request_handle ) = > {
// TODO: check the filter
2022-07-22 22:30:39 +03:00
todo! ( " actually send a request " ) ;
2022-06-14 07:04:14 +03:00
}
Err ( e ) = > {
warn! ( " Failed getting latest block from {}: {:?} " , self , e ) ;
}
}
2022-06-14 08:43:28 +03:00
* /
2022-07-07 06:22:09 +03:00
// wait for the interval
// TODO: if error or rate limit, increase interval?
interval . tick ( ) . await ;
2022-06-14 07:04:14 +03:00
}
}
Web3Provider ::Ws ( provider ) = > {
2022-07-09 02:02:32 +03:00
// TODO: maybe the subscribe_pending_txs function should be on the active_request_handle
2022-06-14 07:04:14 +03:00
let active_request_handle = self . wait_for_request_handle ( ) . await ;
let mut stream = provider . subscribe_pending_txs ( ) . await ? ;
drop ( active_request_handle ) ;
2022-07-08 21:27:06 +03:00
while let Some ( pending_tx_id ) = stream . next ( ) . await {
tx_id_sender
. send_async ( ( pending_tx_id , self . clone ( ) ) )
. await
. context ( " tx_id_sender " ) ? ;
2022-08-11 00:29:50 +03:00
// TODO: periodically check for listeners. if no one is subscribed, unsubscribe and wait for a subscription
2022-06-14 07:04:14 +03:00
}
2022-07-08 21:27:06 +03:00
2022-09-06 19:49:07 +03:00
// TODO: is this always an error?
// TODO: we probably don't want a warn and to return error
warn! ( rpc = % self , " pending_transactions subscription ended " ) ;
return Err ( anyhow ::anyhow! ( " pending_transactions subscription ended " ) ) ;
2022-06-14 07:04:14 +03:00
}
2022-05-05 22:07:09 +03:00
}
}
Ok ( ( ) )
}
2022-08-30 23:01:42 +03:00
/// be careful with this; it might wait forever!
2022-08-24 03:59:05 +03:00
// TODO: maximum wait time?
2022-08-30 23:01:42 +03:00
#[ instrument ]
2022-08-24 03:14:49 +03:00
pub async fn wait_for_request_handle ( self : & Arc < Self > ) -> anyhow ::Result < OpenRequestHandle > {
2022-07-09 02:02:32 +03:00
// TODO: maximum wait time? i think timeouts in other parts of the code are probably best
2022-05-16 22:15:40 +03:00
2022-06-17 01:23:41 +03:00
loop {
2022-08-30 23:01:42 +03:00
let x = self . try_request_handle ( ) . await ;
trace! ( ? x , " try_request_handle " ) ;
match x {
2022-08-24 03:59:05 +03:00
Ok ( OpenRequestResult ::Handle ( handle ) ) = > return Ok ( handle ) ,
2022-08-24 03:14:49 +03:00
Ok ( OpenRequestResult ::RetryAt ( retry_at ) ) = > {
2022-08-07 09:48:57 +03:00
// TODO: emit a stat?
2022-08-30 23:01:42 +03:00
trace! ( ? retry_at ) ;
2022-08-07 09:48:57 +03:00
sleep_until ( retry_at ) . await ;
}
2022-08-30 23:01:42 +03:00
Ok ( OpenRequestResult ::RetryNever ) = > {
2022-08-24 03:59:05 +03:00
// TODO: when can this happen? log? emit a stat?
// TODO: subscribe to the head block on this
2022-08-07 09:48:57 +03:00
// TODO: sleep how long? maybe just error?
2022-08-30 23:01:42 +03:00
return Err ( anyhow ::anyhow! ( " unable to retry " ) ) ;
2022-05-06 07:29:25 +03:00
}
2022-08-07 09:48:57 +03:00
Err ( err ) = > return Err ( err ) ,
2022-05-06 07:29:25 +03:00
}
2022-05-05 22:07:09 +03:00
}
}
2022-08-30 23:01:42 +03:00
#[ instrument ]
2022-08-26 20:26:17 +03:00
pub async fn try_request_handle ( self : & Arc < Self > ) -> anyhow ::Result < OpenRequestResult > {
2022-06-04 01:22:55 +03:00
// check that we are connected
if ! self . has_provider ( ) . await {
2022-08-07 09:48:57 +03:00
// TODO: emit a stat?
2022-08-30 23:01:42 +03:00
return Ok ( OpenRequestResult ::RetryNever ) ;
2022-06-04 01:22:55 +03:00
}
2022-05-05 22:07:09 +03:00
// check rate limits
2022-05-22 02:34:05 +03:00
if let Some ( ratelimiter ) = self . hard_limit . as_ref ( ) {
2022-09-10 03:12:14 +03:00
match ratelimiter . throttle ( ) . await ? {
2022-09-15 20:57:24 +03:00
RedisRateLimitResult ::Allowed ( _ ) = > {
2022-08-30 23:01:42 +03:00
trace! ( " rate limit succeeded " )
2022-05-05 22:07:09 +03:00
}
2022-09-15 20:57:24 +03:00
RedisRateLimitResult ::RetryAt ( retry_at , _ ) = > {
2022-05-05 22:07:09 +03:00
// rate limit failed
2022-05-22 02:34:05 +03:00
// save the smallest retry_after. if nothing succeeds, return an Err with retry_after in it
2022-05-05 22:07:09 +03:00
// TODO: use tracing better
2022-06-25 06:33:49 +03:00
// TODO: i'm seeing "Exhausted rate limit on moralis: 0ns". How is it getting 0?
2022-09-05 19:25:21 +03:00
warn! ( ? retry_at , rpc = % self , " Exhausted rate limit " ) ;
2022-05-05 22:07:09 +03:00
2022-09-10 03:12:14 +03:00
return Ok ( OpenRequestResult ::RetryAt ( retry_at ) ) ;
2022-08-07 09:48:57 +03:00
}
2022-09-15 20:57:24 +03:00
RedisRateLimitResult ::RetryNever = > {
2022-09-10 03:12:14 +03:00
return Ok ( OpenRequestResult ::RetryNever ) ;
2022-05-05 22:07:09 +03:00
}
}
} ;
2022-08-24 03:14:49 +03:00
let handle = OpenRequestHandle ::new ( self . clone ( ) ) ;
2022-08-24 03:11:49 +03:00
2022-08-24 03:59:05 +03:00
Ok ( OpenRequestResult ::Handle ( handle ) )
2022-08-24 03:11:49 +03:00
}
}
2022-08-07 09:48:57 +03:00
2022-08-24 03:11:49 +03:00
impl fmt ::Debug for Web3Provider {
fn fmt ( & self , f : & mut fmt ::Formatter < '_ > ) -> fmt ::Result {
// TODO: the default Debug takes forever to write. this is too quiet though. we at least need the url
f . debug_struct ( " Web3Provider " ) . finish_non_exhaustive ( )
2022-05-06 07:29:25 +03:00
}
}
2022-06-14 07:04:14 +03:00
impl Hash for Web3Connection {
fn hash < H : Hasher > ( & self , state : & mut H ) {
2022-08-24 03:32:16 +03:00
// TODO: is this enough?
self . name . hash ( state ) ;
2022-06-14 07:04:14 +03:00
}
}
2022-05-05 22:07:09 +03:00
/// Marker impl: equality compares only `name` (a `String`), which is a total equivalence.
impl std::cmp::Eq for Web3Connection {}
impl Ord for Web3Connection {
fn cmp ( & self , other : & Self ) -> std ::cmp ::Ordering {
2022-08-24 03:32:16 +03:00
self . name . cmp ( & other . name )
2022-05-05 22:07:09 +03:00
}
}
impl PartialOrd for Web3Connection {
    /// Delegates to the total `Ord` impl, so every pair of connections is comparable.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        let ordering = Ord::cmp(self, other);
        Some(ordering)
    }
}
impl PartialEq for Web3Connection {
fn eq ( & self , other : & Self ) -> bool {
2022-08-24 03:32:16 +03:00
self . name = = other . name
2022-05-05 22:07:09 +03:00
}
}
2022-08-10 08:56:09 +03:00
impl Serialize for Web3Connection {
fn serialize < S > ( & self , serializer : S ) -> Result < S ::Ok , S ::Error >
where
S : Serializer ,
{
// 3 is the number of fields in the struct.
2022-09-06 23:12:45 +03:00
let mut state = serializer . serialize_struct ( " Web3Connection " , 6 ) ? ;
2022-08-10 08:56:09 +03:00
// the url is excluded because it likely includes private information. just show the name
state . serialize_field ( " name " , & self . name ) ? ;
let block_data_limit = self . block_data_limit . load ( atomic ::Ordering ::Relaxed ) ;
2022-09-06 23:12:45 +03:00
if block_data_limit = = u64 ::MAX {
2022-09-07 06:54:16 +03:00
state . serialize_field ( " block_data_limit " , & None ::< ( ) > ) ? ;
2022-09-06 23:12:45 +03:00
} else {
state . serialize_field ( " block_data_limit " , & block_data_limit ) ? ;
}
2022-08-10 08:56:09 +03:00
state . serialize_field ( " soft_limit " , & self . soft_limit ) ? ;
state . serialize_field (
" active_requests " ,
& self . active_requests . load ( atomic ::Ordering ::Relaxed ) ,
) ? ;
2022-09-06 23:12:45 +03:00
state . serialize_field (
" total_requests " ,
& self . total_requests . load ( atomic ::Ordering ::Relaxed ) ,
) ? ;
2022-09-05 19:39:46 +03:00
let head_block_id = & * self . head_block_id . read ( ) ;
state . serialize_field ( " head_block_id " , head_block_id ) ? ;
2022-08-10 08:56:09 +03:00
state . end ( )
}
}
impl fmt ::Debug for Web3Connection {
fn fmt ( & self , f : & mut fmt ::Formatter < '_ > ) -> fmt ::Result {
let mut f = f . debug_struct ( " Web3Connection " ) ;
2022-08-24 03:32:16 +03:00
f . field ( " name " , & self . name ) ;
2022-08-10 08:56:09 +03:00
let block_data_limit = self . block_data_limit . load ( atomic ::Ordering ::Relaxed ) ;
if block_data_limit = = u64 ::MAX {
2022-09-07 06:54:16 +03:00
f . field ( " blocks " , & " all " ) ;
2022-08-10 08:56:09 +03:00
} else {
2022-09-07 06:54:16 +03:00
f . field ( " blocks " , & block_data_limit ) ;
2022-08-10 08:56:09 +03:00
}
f . finish_non_exhaustive ( )
}
}
impl fmt ::Display for Web3Connection {
fn fmt ( & self , f : & mut fmt ::Formatter < '_ > ) -> fmt ::Result {
// TODO: filter basic auth and api keys
2022-08-24 03:32:16 +03:00
write! ( f , " {} " , & self . name )
2022-08-10 08:56:09 +03:00
}
}