2022-08-27 02:44:25 +03:00
///! Rate-limited communication with a web3 provider.
2023-02-26 10:52:33 +03:00
use super ::blockchain ::{ ArcBlock , BlocksByHashCache , Web3ProxyBlock } ;
2022-08-24 02:56:47 +03:00
use super ::provider ::Web3Provider ;
2023-02-06 05:16:09 +03:00
use super ::request ::{ OpenRequestHandle , OpenRequestResult } ;
2022-08-24 02:56:47 +03:00
use crate ::app ::{ flatten_handle , AnyhowJoinHandle } ;
2023-02-12 12:22:53 +03:00
use crate ::config ::{ BlockAndRpc , Web3RpcConfig } ;
2022-11-08 22:58:11 +03:00
use crate ::frontend ::authorization ::Authorization ;
2023-03-20 04:52:28 +03:00
use crate ::frontend ::errors ::{ Web3ProxyError , Web3ProxyResult } ;
2023-04-06 01:34:28 +03:00
use crate ::rpcs ::request ::RequestErrorHandler ;
2023-02-12 12:22:53 +03:00
use anyhow ::{ anyhow , Context } ;
2022-11-07 00:05:03 +03:00
use ethers ::prelude ::{ Bytes , Middleware , ProviderError , TxHash , H256 , U64 } ;
2023-02-16 11:26:58 +03:00
use ethers ::types ::{ Address , Transaction , U256 } ;
2023-03-23 04:43:13 +03:00
use futures ::future ::try_join_all ;
2023-03-31 14:43:41 +03:00
use futures ::StreamExt ;
2022-11-25 03:45:13 +03:00
use log ::{ debug , error , info , trace , warn , Level } ;
2022-11-14 21:24:52 +03:00
use migration ::sea_orm ::DatabaseConnection ;
2023-02-16 11:26:58 +03:00
use ordered_float ::OrderedFloat ;
2022-08-10 05:37:34 +03:00
use parking_lot ::RwLock ;
2022-09-15 20:57:24 +03:00
use redis_rate_limiter ::{ RedisPool , RedisRateLimitResult , RedisRateLimiter } ;
2022-05-21 01:16:15 +03:00
use serde ::ser ::{ SerializeStruct , Serializer } ;
use serde ::Serialize ;
2022-09-24 05:47:44 +03:00
use serde_json ::json ;
2022-09-14 04:43:09 +03:00
use std ::cmp ::min ;
2022-05-05 22:07:09 +03:00
use std ::fmt ;
2022-06-14 07:04:14 +03:00
use std ::hash ::{ Hash , Hasher } ;
2023-02-27 10:52:37 +03:00
use std ::sync ::atomic ::{ self , AtomicBool , AtomicU64 , AtomicUsize } ;
2022-05-05 22:07:09 +03:00
use std ::{ cmp ::Ordering , sync ::Arc } ;
2022-11-12 09:11:58 +03:00
use thread_fast_rng ::rand ::Rng ;
use thread_fast_rng ::thread_fast_rng ;
2023-01-25 07:44:50 +03:00
use tokio ::sync ::{ broadcast , oneshot , watch , RwLock as AsyncRwLock } ;
2023-02-06 05:16:09 +03:00
use tokio ::time ::{ sleep , sleep_until , timeout , Duration , Instant } ;
2022-12-06 00:13:36 +03:00
2023-02-15 23:33:43 +03:00
pub struct Latency {
/// exponentially weighted moving average of how many milliseconds behind the fastest node we are
2023-02-16 11:26:58 +03:00
ewma : ewma ::EWMA ,
2023-02-06 20:55:27 +03:00
}
2023-02-15 23:33:43 +03:00
impl Serialize for Latency {
fn serialize < S > ( & self , serializer : S ) -> Result < S ::Ok , S ::Error >
where
S : Serializer ,
{
2023-02-16 11:26:58 +03:00
serializer . serialize_f64 ( self . ewma . value ( ) )
2023-02-15 23:33:43 +03:00
}
}
impl Latency {
2023-02-16 11:26:58 +03:00
#[ inline(always) ]
pub fn record ( & mut self , duration : Duration ) {
self . record_ms ( duration . as_secs_f64 ( ) * 1000.0 ) ;
}
#[ inline(always) ]
pub fn record_ms ( & mut self , milliseconds : f64 ) {
2023-02-15 23:33:43 +03:00
self . ewma . add ( milliseconds ) ;
2023-02-16 11:26:58 +03:00
}
2023-02-15 23:33:43 +03:00
2023-02-16 11:26:58 +03:00
#[ inline(always) ]
pub fn value ( & self ) -> f64 {
self . ewma . value ( )
2023-02-15 23:33:43 +03:00
}
}
impl Default for Latency {
2023-02-06 20:55:27 +03:00
fn default ( ) -> Self {
2023-02-15 23:33:43 +03:00
// TODO: what should the default span be? 25 requests? have a "new"
let span = 25.0 ;
2023-02-16 11:26:58 +03:00
let start = 1000.0 ;
Self ::new ( span , start )
2023-02-15 23:33:43 +03:00
}
}
impl Latency {
2023-02-16 11:26:58 +03:00
// depending on the span, start might not be perfect
pub fn new ( span : f64 , start : f64 ) -> Self {
2023-02-15 23:33:43 +03:00
let alpha = Self ::span_to_alpha ( span ) ;
2023-02-16 11:26:58 +03:00
let mut ewma = ewma ::EWMA ::new ( alpha ) ;
2023-02-15 23:33:43 +03:00
2023-02-16 11:26:58 +03:00
if start > 0.0 {
for _ in 0 .. ( span as u64 ) {
ewma . add ( start ) ;
}
}
Self { ewma }
2023-02-15 23:33:43 +03:00
}
fn span_to_alpha ( span : f64 ) -> f64 {
2.0 / ( span + 1.0 )
2023-02-06 20:55:27 +03:00
}
}
2022-09-15 20:57:24 +03:00
/// An active connection to a Web3 RPC server like geth or erigon.
2023-02-03 01:48:23 +03:00
#[ derive(Default) ]
2023-02-06 20:55:27 +03:00
pub struct Web3Rpc {
2022-08-24 03:11:49 +03:00
pub name : String ,
2022-11-14 00:05:37 +03:00
pub display_name : Option < String > ,
2023-01-03 22:54:24 +03:00
pub db_conn : Option < DatabaseConnection > ,
2023-02-12 12:22:53 +03:00
pub ( super ) ws_url : Option < String > ,
pub ( super ) http_url : Option < String > ,
2022-09-15 20:57:24 +03:00
/// Some connections use an http_client. we keep a clone for reconnecting
2022-11-23 01:45:22 +03:00
pub ( super ) http_client : Option < reqwest ::Client > ,
2022-08-24 02:13:56 +03:00
/// provider is in a RwLock so that we can replace it if re-connecting
/// it is an async lock because we hold it open across awaits
2023-02-12 12:22:53 +03:00
/// this provider is only used for new heads subscriptions
2023-03-02 21:14:17 +03:00
/// TODO: watch channel instead of a lock
2023-01-26 08:24:09 +03:00
/// TODO: is this only used for new heads subscriptions? if so, rename
2023-02-12 21:22:20 +03:00
pub ( super ) provider : AsyncRwLock < Option < Arc < Web3Provider > > > ,
2023-01-26 08:24:09 +03:00
/// keep track of hard limits. Optional because we skip this code for our own servers.
2023-01-25 07:44:50 +03:00
pub ( super ) hard_limit_until : Option < watch ::Sender < Instant > > ,
2022-08-24 02:13:56 +03:00
/// rate limits are stored in a central redis so that multiple proxies can share their rate limits
2022-09-15 20:57:24 +03:00
/// We do not use the deferred rate limiter because going over limits would cause errors
2022-11-23 01:45:22 +03:00
pub ( super ) hard_limit : Option < RedisRateLimiter > ,
2022-08-24 02:13:56 +03:00
/// used for load balancing to the least loaded server
2022-08-26 20:26:17 +03:00
pub ( super ) soft_limit : u32 ,
2022-12-06 00:13:36 +03:00
/// use web3 queries to find the block data limit for archive/pruned nodes
pub ( super ) automatic_block_limit : bool ,
2023-01-19 13:13:00 +03:00
/// only use this rpc if everything else is lagging too far. this allows us to ignore fast but very low limit rpcs
2023-03-01 22:23:59 +03:00
pub backup : bool ,
2022-09-06 23:12:45 +03:00
/// TODO: have an enum for this so that "no limit" prints pretty?
2022-11-23 01:45:22 +03:00
pub ( super ) block_data_limit : AtomicU64 ,
2023-01-04 09:37:51 +03:00
/// Lower tiers are higher priority when sending requests
pub ( super ) tier : u64 ,
2023-02-12 12:22:53 +03:00
/// TODO: change this to a watch channel so that http providers can subscribe and take action on change.
2023-02-14 23:14:50 +03:00
pub ( super ) head_block : RwLock < Option < Web3ProxyBlock > > ,
2023-02-15 23:33:43 +03:00
/// Track head block latency
pub ( super ) head_latency : RwLock < Latency > ,
2023-02-16 11:26:58 +03:00
// /// Track request latency
// /// TODO: refactor this. this lock kills perf. for now just use head_latency
// pub(super) request_latency: RwLock<Latency>,
2023-02-15 23:33:43 +03:00
/// Track total requests served
/// TODO: maybe move this to graphana
pub ( super ) total_requests : AtomicUsize ,
2023-02-16 11:26:58 +03:00
pub ( super ) active_requests : AtomicUsize ,
2023-02-27 10:52:37 +03:00
pub ( super ) reconnect : AtomicBool ,
2023-03-02 21:27:32 +03:00
/// this is only inside an Option so that the "Default" derive works. it will always be set.
2023-02-27 10:52:37 +03:00
pub ( super ) disconnect_watch : Option < watch ::Sender < bool > > ,
2023-02-28 22:01:34 +03:00
pub ( super ) created_at : Option < Instant > ,
2022-08-24 02:13:56 +03:00
}
2023-02-06 20:55:27 +03:00
impl Web3Rpc {
2022-06-14 07:04:14 +03:00
/// Connect to a web3 rpc
2023-01-04 09:37:51 +03:00
// TODO: have this take a builder (which will have channels attached). or maybe just take the config and give the config public fields
2022-06-14 08:43:28 +03:00
#[ allow(clippy::too_many_arguments) ]
2022-06-14 07:04:14 +03:00
pub async fn spawn (
2023-02-12 12:22:53 +03:00
mut config : Web3RpcConfig ,
2022-08-10 08:56:09 +03:00
name : String ,
2022-07-19 07:21:32 +03:00
chain_id : u64 ,
2022-11-08 22:58:11 +03:00
db_conn : Option < DatabaseConnection > ,
2022-05-15 04:51:24 +03:00
// optional because this is only used for http providers. websocket providers don't use it
2022-07-19 07:21:32 +03:00
http_client : Option < reqwest ::Client > ,
2023-02-12 12:22:53 +03:00
// TODO: rename to http_new_head_interval_sender?
2022-06-29 22:15:05 +03:00
http_interval_sender : Option < Arc < broadcast ::Sender < ( ) > > > ,
2023-02-12 12:22:53 +03:00
redis_pool : Option < RedisPool > ,
// TODO: think more about soft limit. watching ewma of requests is probably better. but what should the random sort be on? maybe group on tier is enough
// soft_limit: u32,
2023-02-26 10:52:33 +03:00
block_map : BlocksByHashCache ,
2022-07-22 08:11:26 +03:00
block_sender : Option < flume ::Sender < BlockAndRpc > > ,
2022-06-14 08:43:28 +03:00
tx_id_sender : Option < flume ::Sender < ( TxHash , Arc < Self > ) > > ,
reconnect : bool ,
2023-02-06 20:55:27 +03:00
) -> anyhow ::Result < ( Arc < Web3Rpc > , AnyhowJoinHandle < ( ) > ) > {
2023-02-28 22:01:34 +03:00
let created_at = Instant ::now ( ) ;
2023-02-12 12:22:53 +03:00
let hard_limit = match ( config . hard_limit , redis_pool ) {
( None , None ) = > None ,
( Some ( hard_limit ) , Some ( redis_pool ) ) = > {
// TODO: in process rate limiter instead? or is deffered good enough?
let rrl = RedisRateLimiter ::new (
" web3_proxy " ,
& format! ( " {} : {} " , chain_id , name ) ,
hard_limit ,
60.0 ,
redis_pool ,
) ;
Some ( rrl )
}
( None , Some ( _ ) ) = > None ,
( Some ( _hard_limit ) , None ) = > {
return Err ( anyhow ::anyhow! (
" no redis client pool! needed for hard limit "
) )
}
} ;
let tx_id_sender = if config . subscribe_txs {
// TODO: warn if tx_id_sender is None?
tx_id_sender
} else {
None
} ;
let backup = config . backup ;
let block_data_limit : AtomicU64 = config . block_data_limit . unwrap_or_default ( ) . into ( ) ;
2022-12-06 00:13:36 +03:00
let automatic_block_limit =
( block_data_limit . load ( atomic ::Ordering ::Acquire ) = = 0 ) & & block_sender . is_some ( ) ;
2022-11-25 03:45:13 +03:00
2023-01-25 07:44:50 +03:00
// track hard limit until on backup servers (which might surprise us with rate limit changes)
// and track on servers that have a configured hard limit
let hard_limit_until = if backup | | hard_limit . is_some ( ) {
let ( sender , _ ) = watch ::channel ( Instant ::now ( ) ) ;
Some ( sender )
} else {
None
} ;
2023-02-12 12:22:53 +03:00
if config . ws_url . is_none ( ) & & config . http_url . is_none ( ) {
if let Some ( url ) = config . url {
if url . starts_with ( " ws " ) {
config . ws_url = Some ( url ) ;
} else if url . starts_with ( " http " ) {
config . http_url = Some ( url ) ;
} else {
return Err ( anyhow! ( " only ws or http urls are supported " ) ) ;
}
} else {
return Err ( anyhow! (
" either ws_url or http_url are required. it is best to set both "
) ) ;
}
}
2023-02-27 10:52:37 +03:00
let ( disconnect_sender , disconnect_receiver ) = watch ::channel ( false ) ;
let reconnect = reconnect . into ( ) ;
2022-07-09 07:25:59 +03:00
let new_connection = Self {
2022-08-10 08:56:09 +03:00
name ,
2023-01-03 22:54:24 +03:00
db_conn : db_conn . clone ( ) ,
2023-02-12 12:22:53 +03:00
display_name : config . display_name ,
2022-09-14 05:11:48 +03:00
http_client ,
2023-02-12 12:22:53 +03:00
ws_url : config . ws_url ,
http_url : config . http_url ,
2022-05-22 02:34:05 +03:00
hard_limit ,
2023-01-25 07:44:50 +03:00
hard_limit_until ,
2023-02-12 12:22:53 +03:00
soft_limit : config . soft_limit ,
2022-12-06 00:13:36 +03:00
automatic_block_limit ,
2023-01-19 13:13:00 +03:00
backup ,
2022-11-25 03:45:13 +03:00
block_data_limit ,
2023-02-27 10:52:37 +03:00
reconnect ,
2023-02-12 12:22:53 +03:00
tier : config . tier ,
2023-02-27 10:52:37 +03:00
disconnect_watch : Some ( disconnect_sender ) ,
2023-02-28 22:01:34 +03:00
created_at : Some ( created_at ) ,
2023-01-26 08:24:09 +03:00
head_block : RwLock ::new ( Default ::default ( ) ) ,
2023-02-06 20:55:27 +03:00
.. Default ::default ( )
2022-05-12 21:49:57 +03:00
} ;
2022-07-09 07:25:59 +03:00
let new_connection = Arc ::new ( new_connection ) ;
2022-05-12 21:49:57 +03:00
2022-07-19 04:31:12 +03:00
// subscribe to new blocks and new transactions
2022-12-06 00:13:36 +03:00
// subscribing starts the connection (with retries)
2022-07-19 04:31:12 +03:00
// TODO: make transaction subscription optional (just pass None for tx_id_sender)
2022-06-14 08:43:28 +03:00
let handle = {
2022-07-09 07:25:59 +03:00
let new_connection = new_connection . clone ( ) ;
2022-12-06 00:13:36 +03:00
let authorization = Arc ::new ( Authorization ::internal ( db_conn ) ? ) ;
2022-06-14 08:43:28 +03:00
tokio ::spawn ( async move {
2022-07-09 07:25:59 +03:00
new_connection
2022-08-26 20:26:17 +03:00
. subscribe (
2022-11-08 22:58:11 +03:00
& authorization ,
2022-08-26 20:26:17 +03:00
block_map ,
block_sender ,
2022-12-06 00:13:36 +03:00
chain_id ,
2023-02-27 10:52:37 +03:00
disconnect_receiver ,
2022-12-06 00:13:36 +03:00
http_interval_sender ,
tx_id_sender ,
2022-08-26 20:26:17 +03:00
)
2022-06-14 08:43:28 +03:00
. await
} )
} ;
2022-08-27 05:13:36 +03:00
Ok ( ( new_connection , handle ) )
}
2022-07-19 04:31:12 +03:00
2023-01-26 08:24:09 +03:00
pub fn peak_ewma ( & self ) -> OrderedFloat < f64 > {
2023-02-16 11:26:58 +03:00
// TODO: use request instead of head latency? that was killing perf though
let head_ewma = self . head_latency . read ( ) . value ( ) ;
// TODO: what ordering?
let active_requests = self . active_requests . load ( atomic ::Ordering ::Relaxed ) as f64 ;
// TODO: i'm not sure head * active is exactly right. but we'll see
// TODO: i don't think this actually counts as peak. investigate with atomics.rs and peak_ewma.rs
OrderedFloat ( head_ewma * active_requests )
}
2022-12-28 05:17:11 +03:00
// TODO: would be great if rpcs exposed this. see https://github.com/ledgerwatch/erigon/issues/6391
2022-11-08 22:58:11 +03:00
async fn check_block_data_limit (
self : & Arc < Self > ,
authorization : & Arc < Authorization > ,
2023-02-12 12:22:53 +03:00
unlocked_provider : Option < Arc < Web3Provider > > ,
2022-11-08 22:58:11 +03:00
) -> anyhow ::Result < Option < u64 > > {
2022-12-06 00:13:36 +03:00
if ! self . automatic_block_limit {
2022-12-28 05:17:11 +03:00
// TODO: is this a good thing to return?
2022-12-06 00:13:36 +03:00
return Ok ( None ) ;
}
2023-01-19 14:05:39 +03:00
// TODO: check eth_syncing. if it is not false, return Ok(None)
2022-12-06 00:13:36 +03:00
2022-12-28 05:17:11 +03:00
let mut limit = None ;
2022-11-25 03:45:13 +03:00
// TODO: binary search between 90k and max?
2022-12-06 00:13:36 +03:00
// TODO: start at 0 or 1?
2022-11-25 03:45:13 +03:00
for block_data_limit in [ 0 , 32 , 64 , 128 , 256 , 512 , 1024 , 90_000 , u64 ::MAX ] {
2022-12-06 00:13:36 +03:00
let handle = self
2023-02-12 12:22:53 +03:00
. wait_for_request_handle ( authorization , None , unlocked_provider . clone ( ) )
2022-12-06 00:13:36 +03:00
. await ? ;
2022-07-19 04:31:12 +03:00
2022-12-06 00:13:36 +03:00
let head_block_num_future = handle . request ::< Option < ( ) > , U256 > (
" eth_blockNumber " ,
& None ,
// error here are expected, so keep the level low
Level ::Debug . into ( ) ,
2023-02-12 12:22:53 +03:00
unlocked_provider . clone ( ) ,
2022-12-06 00:13:36 +03:00
) ;
2022-08-11 00:52:28 +03:00
2022-12-06 00:13:36 +03:00
let head_block_num = timeout ( Duration ::from_secs ( 5 ) , head_block_num_future )
. await
. context ( " timeout fetching eth_blockNumber " ) ?
. context ( " provider error " ) ? ;
2022-09-06 06:26:23 +03:00
2022-11-25 03:45:13 +03:00
let maybe_archive_block = head_block_num . saturating_sub ( ( block_data_limit ) . into ( ) ) ;
2022-08-27 05:13:36 +03:00
2022-12-06 00:13:36 +03:00
trace! (
" checking maybe_archive_block on {}: {} " ,
self ,
maybe_archive_block
) ;
2022-09-06 06:26:23 +03:00
// TODO: wait for the handle BEFORE we check the current block number. it might be delayed too!
2022-09-20 09:00:27 +03:00
// TODO: what should the request be?
2022-12-06 00:13:36 +03:00
let handle = self
2023-02-12 12:22:53 +03:00
. wait_for_request_handle ( authorization , None , unlocked_provider . clone ( ) )
2022-12-06 00:13:36 +03:00
. await ? ;
let archive_result : Result < Bytes , _ > = handle
2022-08-27 05:13:36 +03:00
. request (
" eth_getCode " ,
2022-09-24 05:47:44 +03:00
& json! ( (
2022-08-27 05:13:36 +03:00
" 0xdead00000000000000000000000000000000beef " ,
maybe_archive_block ,
2022-09-24 05:47:44 +03:00
) ) ,
2022-09-21 07:48:21 +03:00
// error here are expected, so keep the level low
2022-11-25 03:45:13 +03:00
Level ::Trace . into ( ) ,
2023-02-12 12:22:53 +03:00
unlocked_provider . clone ( ) ,
2022-08-27 05:13:36 +03:00
)
. await ;
2022-11-25 03:45:13 +03:00
trace! (
2022-12-06 00:13:36 +03:00
" archive_result on {} for {} ({}): {:?} " ,
self ,
2022-11-25 03:45:13 +03:00
block_data_limit ,
2022-12-06 00:13:36 +03:00
maybe_archive_block ,
2022-11-25 03:45:13 +03:00
archive_result
) ;
2022-08-27 05:13:36 +03:00
2022-11-25 03:45:13 +03:00
if archive_result . is_err ( ) {
2022-08-27 05:13:36 +03:00
break ;
2022-07-19 04:31:12 +03:00
}
2022-11-25 03:45:13 +03:00
limit = Some ( block_data_limit ) ;
2022-07-19 04:31:12 +03:00
}
2022-08-27 05:13:36 +03:00
if let Some ( limit ) = limit {
2023-01-03 22:54:24 +03:00
if limit = = 0 {
warn! ( " {} is unable to serve requests " , self ) ;
}
2022-08-27 05:13:36 +03:00
self . block_data_limit
. store ( limit , atomic ::Ordering ::Release ) ;
}
2022-07-19 04:31:12 +03:00
2023-02-22 08:14:49 +03:00
if limit = = Some ( u64 ::MAX ) {
info! ( " block data limit on {}: archive " , self ) ;
} else {
info! ( " block data limit on {}: {:?} " , self , limit ) ;
}
2022-11-25 03:45:13 +03:00
2022-08-27 05:13:36 +03:00
Ok ( limit )
2022-07-09 07:25:59 +03:00
}
2022-11-04 01:16:27 +03:00
/// TODO: this might be too simple. different nodes can prune differently. its possible we will have a block range
2022-07-25 03:27:00 +03:00
pub fn block_data_limit ( & self ) -> U64 {
2023-01-03 22:54:24 +03:00
self . block_data_limit . load ( atomic ::Ordering ::Acquire ) . into ( )
2022-07-19 04:31:12 +03:00
}
2022-07-22 22:30:39 +03:00
pub fn has_block_data ( & self , needed_block_num : & U64 ) -> bool {
2023-02-14 23:14:50 +03:00
let head_block_num = match self . head_block . read ( ) . as_ref ( ) {
2022-09-06 06:26:23 +03:00
None = > return false ,
2023-02-14 23:14:50 +03:00
Some ( x ) = > * x . number ( ) ,
2022-09-06 06:26:23 +03:00
} ;
2022-07-19 04:31:12 +03:00
2022-11-20 01:05:51 +03:00
// this rpc doesn't have that block yet. still syncing
2022-11-22 23:23:08 +03:00
if needed_block_num > & head_block_num {
2023-01-26 08:24:09 +03:00
trace! (
" {} has head {} but needs {} " ,
self ,
head_block_num ,
needed_block_num ,
) ;
2022-11-04 01:16:27 +03:00
return false ;
}
// if this is a pruning node, we might not actually have the block
let block_data_limit : U64 = self . block_data_limit ( ) ;
2022-11-22 23:23:08 +03:00
let oldest_block_num = head_block_num . saturating_sub ( block_data_limit ) ;
2022-07-19 04:31:12 +03:00
2023-01-26 08:24:09 +03:00
if needed_block_num < & oldest_block_num {
trace! (
" {} needs {} but the oldest available is {} " ,
self ,
needed_block_num ,
oldest_block_num
) ;
return false ;
}
true
2022-05-05 22:07:09 +03:00
}
2022-09-14 04:43:09 +03:00
/// reconnect to the provider. errors are retried forever with exponential backoff with jitter.
2022-10-25 07:12:24 +03:00
/// We use the "Decorrelated" jitter from <https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/>
2022-09-14 04:43:09 +03:00
/// TODO: maybe it would be better to use "Full Jitter". The "Full Jitter" approach uses less work, but slightly more time.
2022-12-06 00:13:36 +03:00
pub async fn retrying_connect (
2022-09-14 04:43:09 +03:00
self : & Arc < Self > ,
block_sender : Option < & flume ::Sender < BlockAndRpc > > ,
2022-12-06 00:13:36 +03:00
chain_id : u64 ,
db_conn : Option < & DatabaseConnection > ,
delay_start : bool ,
2022-09-14 04:43:09 +03:00
) -> anyhow ::Result < ( ) > {
// there are several crates that have retry helpers, but they all seem more complex than necessary
2022-11-14 21:24:52 +03:00
// TODO: move this backoff logic into a helper function so we can use it when doing database locking
2022-09-14 04:43:09 +03:00
let base_ms = 500 ;
let cap_ms = 30_000 ;
let range_multiplier = 3 ;
// sleep once before the initial retry attempt
2022-09-14 05:11:48 +03:00
// TODO: now that we use this method for our initial connection, do we still want this sleep?
2022-12-06 00:13:36 +03:00
let mut sleep_ms = if delay_start {
2022-09-14 05:11:48 +03:00
let first_sleep_ms = min (
cap_ms ,
2022-11-12 09:11:58 +03:00
thread_fast_rng ( ) . gen_range ( base_ms .. ( base_ms * range_multiplier ) ) ,
2022-09-14 05:11:48 +03:00
) ;
2022-09-14 09:38:53 +03:00
let reconnect_in = Duration ::from_millis ( first_sleep_ms ) ;
2023-01-10 05:23:27 +03:00
info! ( " Reconnect to {} in {}ms " , self , reconnect_in . as_millis ( ) ) ;
2022-09-14 09:38:53 +03:00
sleep ( reconnect_in ) . await ;
2022-09-14 05:11:48 +03:00
first_sleep_ms
} else {
base_ms
} ;
2022-09-14 04:43:09 +03:00
// retry until we succeed
2022-12-06 00:13:36 +03:00
while let Err ( err ) = self . connect ( block_sender , chain_id , db_conn ) . await {
2023-01-26 08:24:09 +03:00
// thread_rng is crytographically secure. we don't need that here. use thread_fast_rng instead
// TODO: min of 1 second? sleep longer if rate limited?
2022-09-14 04:43:09 +03:00
sleep_ms = min (
cap_ms ,
2022-11-12 09:11:58 +03:00
thread_fast_rng ( ) . gen_range ( base_ms .. ( sleep_ms * range_multiplier ) ) ,
2022-09-14 04:43:09 +03:00
) ;
let retry_in = Duration ::from_millis ( sleep_ms ) ;
2023-01-24 08:37:23 +03:00
let error_level = if self . backup {
log ::Level ::Debug
} else {
log ::Level ::Info
} ;
log ::log! (
error_level ,
2023-01-26 08:24:09 +03:00
" Failed (re)connect to {}! Retry in {}ms. err={:?} " ,
2022-11-12 11:24:32 +03:00
self ,
retry_in . as_millis ( ) ,
err ,
) ;
2022-09-14 04:43:09 +03:00
sleep ( retry_in ) . await ;
}
Ok ( ( ) )
}
2022-12-06 00:13:36 +03:00
/// connect to the web3 provider
async fn connect (
2022-06-14 07:04:14 +03:00
self : & Arc < Self > ,
2022-09-14 04:43:09 +03:00
block_sender : Option < & flume ::Sender < BlockAndRpc > > ,
2022-12-06 00:13:36 +03:00
chain_id : u64 ,
db_conn : Option < & DatabaseConnection > ,
2022-06-14 07:04:14 +03:00
) -> anyhow ::Result < ( ) > {
2023-02-12 21:22:20 +03:00
if let Ok ( mut unlocked_provider ) = self . provider . try_write ( ) {
2023-02-12 12:22:53 +03:00
#[ cfg(test) ]
if let Some ( Web3Provider ::Mock ) = unlocked_provider . as_deref ( ) {
return Ok ( ( ) ) ;
2022-12-06 00:13:36 +03:00
}
2022-09-13 02:00:10 +03:00
2023-02-12 12:22:53 +03:00
* unlocked_provider = if let Some ( ws_url ) = self . ws_url . as_ref ( ) {
// set up ws client
match & * unlocked_provider {
None = > {
info! ( " connecting to {} " , self ) ;
}
Some ( _ ) = > {
debug! ( " reconnecting to {} " , self ) ;
// tell the block subscriber that this rpc doesn't have any blocks
if let Some ( block_sender ) = block_sender {
block_sender
. send_async ( ( None , self . clone ( ) ) )
. await
. context ( " block_sender during connect " ) ? ;
}
2022-06-14 07:04:14 +03:00
2023-02-12 12:22:53 +03:00
// reset sync status
let mut head_block = self . head_block . write ( ) ;
* head_block = None ;
2022-09-14 07:27:18 +03:00
2023-02-12 12:22:53 +03:00
// disconnect the current provider
// TODO: what until the block_sender's receiver finishes updating this item?
* unlocked_provider = None ;
}
2022-12-06 00:13:36 +03:00
}
2023-02-12 12:22:53 +03:00
let p = Web3Provider ::from_str ( ws_url . as_str ( ) , None )
. await
. context ( format! ( " failed connecting to {} " , ws_url ) ) ? ;
assert! ( p . ws ( ) . is_some ( ) ) ;
Some ( Arc ::new ( p ) )
} else {
// http client
if let Some ( url ) = & self . http_url {
let p = Web3Provider ::from_str ( url , self . http_client . clone ( ) )
2022-12-06 00:13:36 +03:00
. await
2023-02-12 12:22:53 +03:00
. context ( format! ( " failed connecting to {} " , url ) ) ? ;
assert! ( p . http ( ) . is_some ( ) ) ;
Some ( Arc ::new ( p ) )
} else {
None
2022-12-06 00:13:36 +03:00
}
2023-02-12 12:22:53 +03:00
} ;
2022-06-14 07:04:14 +03:00
2023-02-12 12:22:53 +03:00
let authorization = Arc ::new ( Authorization ::internal ( db_conn . cloned ( ) ) ? ) ;
// check the server's chain_id here
// TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error
// TODO: what should the timeout be? should there be a request timeout?
// trace!("waiting on chain id for {}", self);
let found_chain_id : Result < U64 , _ > = self
. wait_for_request_handle ( & authorization , None , unlocked_provider . clone ( ) )
. await ?
. request (
" eth_chainId " ,
& json! ( Option ::None ::< ( ) > ) ,
Level ::Trace . into ( ) ,
unlocked_provider . clone ( ) ,
)
. await ;
// trace!("found_chain_id: {:?}", found_chain_id);
match found_chain_id {
Ok ( found_chain_id ) = > {
// TODO: there has to be a cleaner way to do this
if chain_id ! = found_chain_id . as_u64 ( ) {
return Err ( anyhow ::anyhow! (
" incorrect chain id! Config has {}, but RPC has {} " ,
chain_id ,
found_chain_id
)
. context ( format! ( " failed @ {} " , self ) ) ) ;
}
}
Err ( e ) = > {
return Err ( anyhow ::Error ::from ( e ) ) ;
2022-12-06 00:13:36 +03:00
}
}
2023-02-12 12:22:53 +03:00
self . check_block_data_limit ( & authorization , unlocked_provider . clone ( ) )
. await ? ;
2022-12-06 00:13:36 +03:00
2023-02-12 12:22:53 +03:00
drop ( unlocked_provider ) ;
2022-12-06 00:13:36 +03:00
2023-02-12 12:22:53 +03:00
info! ( " successfully connected to {} " , self ) ;
2023-02-27 09:44:09 +03:00
} else if self . provider . read ( ) . await . is_none ( ) {
return Err ( anyhow! ( " failed waiting for client " ) ) ;
2023-02-12 12:22:53 +03:00
} ;
2022-09-14 04:43:09 +03:00
2022-06-14 07:04:14 +03:00
Ok ( ( ) )
}
2023-02-27 10:52:37 +03:00
pub async fn disconnect ( & self ) -> anyhow ::Result < ( ) > {
2023-02-28 22:09:49 +03:00
let age = self . created_at . unwrap ( ) . elapsed ( ) . as_secs ( ) ;
2023-02-28 22:01:34 +03:00
2023-02-28 22:09:49 +03:00
info! ( " disconnecting {} ({}s old) " , self , age ) ;
2023-02-27 10:52:37 +03:00
2023-02-28 00:29:07 +03:00
self . reconnect . store ( false , atomic ::Ordering ::Release ) ;
2023-02-27 10:52:37 +03:00
if let Err ( err ) = self . disconnect_watch . as_ref ( ) . unwrap ( ) . send ( true ) {
warn! ( " failed sending disconnect watch: {:?} " , err ) ;
} ;
2023-02-28 22:09:49 +03:00
trace! ( " disconnecting (locking) {} ({}s old) " , self , age ) ;
2023-02-28 00:29:07 +03:00
let mut provider = self . provider . write ( ) . await ;
2023-02-28 22:09:49 +03:00
trace! ( " disconnecting (clearing provider) {} ({}s old) " , self , age ) ;
2023-02-28 00:29:07 +03:00
* provider = None ;
2023-02-27 10:52:37 +03:00
Ok ( ( ) )
}
2022-08-26 20:26:17 +03:00
async fn send_head_block_result (
self : & Arc < Self > ,
2022-09-06 15:29:37 +03:00
new_head_block : Result < Option < ArcBlock > , ProviderError > ,
2022-07-22 08:11:26 +03:00
block_sender : & flume ::Sender < BlockAndRpc > ,
2023-02-26 10:52:33 +03:00
block_map : BlocksByHashCache ,
2022-05-30 07:30:13 +03:00
) -> anyhow ::Result < ( ) > {
2022-12-03 08:31:03 +03:00
let new_head_block = match new_head_block {
2022-09-06 15:29:37 +03:00
Ok ( None ) = > {
2022-11-06 23:52:11 +03:00
{
2022-12-06 01:38:54 +03:00
let mut head_block = self . head_block . write ( ) ;
2022-12-03 08:31:03 +03:00
2022-12-06 01:38:54 +03:00
if head_block . is_none ( ) {
2022-12-03 08:31:03 +03:00
// we previously sent a None. return early
return Ok ( ( ) ) ;
}
2023-02-28 22:01:34 +03:00
let age = self . created_at . unwrap ( ) . elapsed ( ) . as_millis ( ) ;
2023-02-28 22:09:49 +03:00
debug! ( " clearing head block on {} ({}ms old)! " , self , age ) ;
2022-11-06 23:52:11 +03:00
2022-12-06 01:38:54 +03:00
* head_block = None ;
2022-11-06 23:52:11 +03:00
}
2022-12-03 08:31:03 +03:00
None
2022-09-06 15:29:37 +03:00
}
2022-11-06 23:52:11 +03:00
Ok ( Some ( new_head_block ) ) = > {
2023-02-27 10:52:37 +03:00
let new_head_block = Web3ProxyBlock ::try_new ( new_head_block )
. expect ( " blocks from newHeads subscriptions should also convert " ) ;
2023-02-14 23:14:50 +03:00
let new_hash = * new_head_block . hash ( ) ;
2022-08-26 20:26:17 +03:00
2022-09-14 22:39:08 +03:00
// if we already have this block saved, set new_head_block to that arc. otherwise store this copy
2022-11-06 23:52:11 +03:00
let new_head_block = block_map
2022-11-03 02:14:16 +03:00
. get_with ( new_hash , async move { new_head_block } )
2022-09-14 22:39:08 +03:00
. await ;
2022-08-30 23:01:42 +03:00
2022-08-26 20:26:17 +03:00
// save the block so we don't send the same one multiple times
// also save so that archive checks can know how far back to query
{
2022-12-03 08:31:03 +03:00
let mut head_block = self . head_block . write ( ) ;
2022-09-06 06:26:23 +03:00
2023-02-27 09:44:09 +03:00
let _ = head_block . insert ( new_head_block . clone ( ) ) ;
2022-07-19 04:31:12 +03:00
}
2023-01-19 14:05:39 +03:00
if self . block_data_limit ( ) = = U64 ::zero ( ) {
2023-01-03 22:54:24 +03:00
let authorization = Arc ::new ( Authorization ::internal ( self . db_conn . clone ( ) ) ? ) ;
2023-02-12 12:22:53 +03:00
if let Err ( err ) = self . check_block_data_limit ( & authorization , None ) . await {
2023-01-03 22:54:24 +03:00
warn! (
" failed checking block limit after {} finished syncing. {:?} " ,
self , err
) ;
}
}
2022-12-03 08:31:03 +03:00
Some ( new_head_block )
2022-05-15 09:27:13 +03:00
}
2022-11-06 23:52:11 +03:00
Err ( err ) = > {
2022-11-12 11:24:32 +03:00
warn! ( " unable to get block from {}. err={:?} " , self , err ) ;
2022-11-06 23:52:11 +03:00
{
2022-12-06 01:38:54 +03:00
let mut head_block = self . head_block . write ( ) ;
2022-11-06 23:52:11 +03:00
2022-12-06 01:38:54 +03:00
* head_block = None ;
2022-11-06 23:52:11 +03:00
}
2022-08-07 09:48:57 +03:00
2022-12-03 08:31:03 +03:00
None
2022-05-15 09:27:13 +03:00
}
2022-12-03 08:31:03 +03:00
} ;
// send an empty block to take this server out of rotation
block_sender
. send_async ( ( new_head_block , self . clone ( ) ) )
. await
. context ( " block_sender " ) ? ;
2022-05-30 07:30:13 +03:00
Ok ( ( ) )
2022-05-15 09:27:13 +03:00
}
2023-03-02 21:27:32 +03:00
fn should_disconnect ( & self ) -> bool {
* self . disconnect_watch . as_ref ( ) . unwrap ( ) . borrow ( )
}
2022-09-14 04:43:09 +03:00
/// subscribe to blocks and transactions with automatic reconnects
2022-12-06 00:13:36 +03:00
/// This should only exit when the program is exiting.
/// TODO: should more of these args be on self?
#[ allow(clippy::too_many_arguments) ]
2022-06-14 08:43:28 +03:00
async fn subscribe (
2022-06-14 07:04:14 +03:00
self : Arc < Self > ,
2022-11-08 22:58:11 +03:00
authorization : & Arc < Authorization > ,
2023-02-26 10:52:33 +03:00
block_map : BlocksByHashCache ,
2022-07-22 08:11:26 +03:00
block_sender : Option < flume ::Sender < BlockAndRpc > > ,
2022-12-06 00:13:36 +03:00
chain_id : u64 ,
2023-02-27 10:52:37 +03:00
disconnect_receiver : watch ::Receiver < bool > ,
2022-12-06 00:13:36 +03:00
http_interval_sender : Option < Arc < broadcast ::Sender < ( ) > > > ,
tx_id_sender : Option < flume ::Sender < ( TxHash , Arc < Self > ) > > ,
2022-06-14 07:04:14 +03:00
) -> anyhow ::Result < ( ) > {
2023-01-26 08:24:09 +03:00
let error_handler = if self . backup {
2023-04-06 01:34:28 +03:00
RequestErrorHandler ::DebugLevel
2023-02-16 02:31:59 +03:00
} else {
2023-04-06 01:34:28 +03:00
RequestErrorHandler ::ErrorLevel
2023-02-16 02:31:59 +03:00
} ;
2023-03-31 14:43:41 +03:00
2023-03-23 04:43:13 +03:00
let mut delay_start = false ;
2023-02-16 02:31:59 +03:00
2023-03-23 02:16:15 +03:00
// this does loop. just only when reconnect is enabled
#[ allow(clippy::never_loop) ]
2022-06-16 05:53:37 +03:00
loop {
2023-03-23 02:16:15 +03:00
debug! ( " subscription loop started " ) ;
2022-06-16 05:53:37 +03:00
let mut futures = vec! [ ] ;
2023-03-01 23:56:00 +03:00
let http_interval_receiver = http_interval_sender . as_ref ( ) . map ( | x | x . subscribe ( ) ) ;
2022-12-06 00:13:36 +03:00
{
// TODO: move this into a proper function
let authorization = authorization . clone ( ) ;
let block_sender = block_sender . clone ( ) ;
2023-02-27 10:52:37 +03:00
let rpc = self . clone ( ) ;
2022-12-06 00:13:36 +03:00
let ( ready_tx , ready_rx ) = oneshot ::channel ( ) ;
let f = async move {
// initial sleep to allow for the initial connection
2023-02-27 10:52:37 +03:00
rpc . retrying_connect (
2022-12-06 00:13:36 +03:00
block_sender . as_ref ( ) ,
chain_id ,
authorization . db_conn . as_ref ( ) ,
2023-03-23 04:43:13 +03:00
delay_start ,
2022-12-06 00:13:36 +03:00
)
. await ? ;
// provider is ready
ready_tx . send ( ( ) ) . unwrap ( ) ;
2023-02-16 02:31:59 +03:00
// TODO: how often? different depending on the chain?
// TODO: reset this timeout when a new block is seen? we need to keep request_latency updated though
2023-02-16 11:26:58 +03:00
let health_sleep_seconds = 10 ;
2023-02-16 08:54:07 +03:00
// TODO: benchmark this and lock contention
2023-02-16 02:31:59 +03:00
let mut old_total_requests = 0 ;
let mut new_total_requests ;
2022-12-06 00:13:36 +03:00
2023-02-28 00:29:07 +03:00
// health check loop
2022-12-06 00:13:36 +03:00
loop {
2023-03-23 02:16:15 +03:00
// TODO: do we need this to be abortable?
2023-02-28 00:29:07 +03:00
if rpc . should_disconnect ( ) {
break ;
}
2023-02-16 02:31:59 +03:00
sleep ( Duration ::from_secs ( health_sleep_seconds ) ) . await ;
2023-02-28 00:29:07 +03:00
trace! ( " health check on {} " , rpc ) ;
2023-02-27 10:52:37 +03:00
2022-12-06 00:13:36 +03:00
// TODO: what if we just happened to have this check line up with another restart?
// TODO: think more about this
2023-02-27 10:52:37 +03:00
if let Some ( client ) = rpc . provider . read ( ) . await . clone ( ) {
2023-02-16 02:31:59 +03:00
// health check as a way of keeping this rpc's request_ewma accurate
// TODO: do something different if this is a backup server?
2023-02-27 10:52:37 +03:00
new_total_requests = rpc . total_requests . load ( atomic ::Ordering ::Relaxed ) ;
2023-02-16 02:31:59 +03:00
2023-02-22 07:25:02 +03:00
// TODO: how many requests should we require in order to skip a health check?
2023-02-16 02:31:59 +03:00
if new_total_requests - old_total_requests < 10 {
// TODO: if this fails too many times, reset the connection
2023-02-16 11:26:58 +03:00
// TODO: move this into a function and the chaining should be easier
2023-02-27 10:52:37 +03:00
let head_block = rpc . head_block . read ( ) . clone ( ) ;
2023-02-16 02:52:42 +03:00
2023-02-22 07:25:54 +03:00
if let Some ( ( block_number , txid ) ) = head_block . and_then ( | x | {
2023-04-06 01:15:20 +03:00
let block = x . block ;
2023-02-16 02:52:42 +03:00
2023-02-22 07:25:54 +03:00
let block_number = block . number ? ;
2023-02-16 02:52:42 +03:00
let txid = block . transactions . last ( ) . cloned ( ) ? ;
2023-02-22 07:25:54 +03:00
Some ( ( block_number , txid ) )
2023-02-16 02:52:42 +03:00
} ) {
2023-02-27 10:52:37 +03:00
let to = rpc
2023-02-16 11:26:58 +03:00
. wait_for_query ::< _ , Option < Transaction > > (
" eth_getTransactionByHash " ,
& ( txid , ) ,
2023-01-26 08:24:09 +03:00
error_handler ,
2023-02-16 11:26:58 +03:00
authorization . clone ( ) ,
Some ( client . clone ( ) ) ,
)
. await
. and_then ( | tx | {
let tx = tx . context ( " no transaction found " ) ? ;
// TODO: what default? something real?
let to = tx . to . unwrap_or_else ( | | {
" 0xdead00000000000000000000000000000000beef "
. parse ::< Address > ( )
. expect ( " deafbeef " )
} ) ;
Ok ( to )
} ) ;
let code = match to {
Err ( err ) = > {
2023-02-27 10:52:37 +03:00
if rpc . backup {
2023-02-16 11:26:58 +03:00
debug! (
" {} failed health check query! {:#?} " ,
2023-02-27 10:52:37 +03:00
rpc , err
2023-02-16 11:26:58 +03:00
) ;
} else {
warn! (
" {} failed health check query! {:#?} " ,
2023-02-27 10:52:37 +03:00
rpc , err
2023-02-16 11:26:58 +03:00
) ;
}
continue ;
}
Ok ( to ) = > {
2023-02-27 10:52:37 +03:00
rpc . wait_for_query ::< _ , Option < Bytes > > (
2023-02-16 11:26:58 +03:00
" eth_getCode " ,
2023-02-22 07:25:54 +03:00
& ( to , block_number ) ,
2023-01-26 08:24:09 +03:00
error_handler ,
2023-02-16 11:26:58 +03:00
authorization . clone ( ) ,
Some ( client ) ,
2023-02-16 02:52:42 +03:00
)
. await
}
2023-02-16 11:26:58 +03:00
} ;
if let Err ( err ) = code {
2023-02-27 10:52:37 +03:00
if rpc . backup {
debug! ( " {} failed health check query! {:#?} " , rpc , err ) ;
2023-02-16 11:26:58 +03:00
} else {
2023-02-27 10:52:37 +03:00
warn! ( " {} failed health check query! {:#?} " , rpc , err ) ;
2023-02-16 11:26:58 +03:00
}
continue ;
2023-02-16 02:52:42 +03:00
}
}
2023-02-16 02:31:59 +03:00
}
old_total_requests = new_total_requests ;
2022-12-06 00:13:36 +03:00
}
}
2023-02-28 00:29:07 +03:00
debug! ( " health checks for {} exited " , rpc ) ;
Ok ( ( ) )
2022-12-06 00:13:36 +03:00
} ;
futures . push ( flatten_handle ( tokio ::spawn ( f ) ) ) ;
// wait on the initial connection
ready_rx . await ? ;
}
2022-06-16 05:53:37 +03:00
if let Some ( block_sender ) = & block_sender {
2023-03-23 02:16:15 +03:00
// TODO: do we need this to be abortable?
2022-08-26 20:26:17 +03:00
let f = self . clone ( ) . subscribe_new_heads (
2022-11-08 22:58:11 +03:00
authorization . clone ( ) ,
2022-08-26 20:26:17 +03:00
http_interval_receiver ,
block_sender . clone ( ) ,
block_map . clone ( ) ,
) ;
2022-06-16 05:53:37 +03:00
futures . push ( flatten_handle ( tokio ::spawn ( f ) ) ) ;
}
if let Some ( tx_id_sender ) = & tx_id_sender {
2023-03-23 02:16:15 +03:00
// TODO: do we need this to be abortable?
2022-06-16 05:53:37 +03:00
let f = self
. clone ( )
2022-11-08 22:58:11 +03:00
. subscribe_pending_transactions ( authorization . clone ( ) , tx_id_sender . clone ( ) ) ;
2022-06-16 05:53:37 +03:00
futures . push ( flatten_handle ( tokio ::spawn ( f ) ) ) ;
}
match try_join_all ( futures ) . await {
2022-09-14 04:43:09 +03:00
Ok ( _ ) = > {
2023-03-23 04:43:13 +03:00
// future exited without error
// TODO: think about this more. we never set it to false. this can't be right
2022-09-14 04:43:09 +03:00
break ;
}
2022-06-16 05:53:37 +03:00
Err ( err ) = > {
2023-03-23 04:43:13 +03:00
let disconnect_sender = self . disconnect_watch . as_ref ( ) . unwrap ( ) ;
2023-02-27 10:52:37 +03:00
if self . reconnect . load ( atomic ::Ordering ::Acquire ) {
2023-03-23 04:43:13 +03:00
warn! ( " {} connection ended. reconnecting. err={:?} " , self , err ) ;
// TODO: i'm not sure if this is necessary, but telling everything to disconnect seems like a better idea than relying on timeouts and dropped futures.
disconnect_sender . send_replace ( true ) ;
disconnect_sender . send_replace ( false ) ;
// we call retrying_connect here with initial_delay=true. above, initial_delay=false
delay_start = true ;
continue ;
2023-03-23 02:16:15 +03:00
}
2023-03-31 14:43:41 +03:00
2023-03-23 04:43:13 +03:00
// reconnect is not enabled.
if * disconnect_receiver . borrow ( ) {
2023-02-27 10:52:37 +03:00
info! ( " {} is disconnecting " , self ) ;
break ;
2022-06-16 05:53:37 +03:00
} else {
2022-11-12 11:24:32 +03:00
error! ( " {} subscription exited. err={:?} " , self , err ) ;
2023-03-23 04:43:13 +03:00
disconnect_sender . send_replace ( true ) ;
break ;
2022-06-16 05:53:37 +03:00
}
2022-06-14 08:43:28 +03:00
}
2022-06-14 07:04:14 +03:00
}
}
2022-11-12 11:24:32 +03:00
info! ( " all subscriptions on {} completed " , self ) ;
2022-09-06 16:14:15 +03:00
2022-06-14 07:04:14 +03:00
Ok ( ( ) )
}
2023-03-02 21:14:17 +03:00
/// Subscribe to new blocks.
async fn subscribe_new_heads (
self : Arc < Self > ,
authorization : Arc < Authorization > ,
http_interval_receiver : Option < broadcast ::Receiver < ( ) > > ,
block_sender : flume ::Sender < BlockAndRpc > ,
block_map : BlocksByHashCache ,
) -> anyhow ::Result < ( ) > {
trace! ( " watching new heads on {} " , self ) ;
let provider = self . wait_for_provider ( ) . await ;
match provider . as_ref ( ) {
Web3Provider ::Http ( _client ) = > {
2023-02-12 12:22:53 +03:00
// there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints
// TODO: try watch_blocks and fall back to this?
2022-06-29 22:15:05 +03:00
2023-02-12 12:22:53 +03:00
let mut http_interval_receiver = http_interval_receiver . unwrap ( ) ;
2022-06-14 07:04:14 +03:00
2023-02-12 12:22:53 +03:00
let mut last_hash = H256 ::zero ( ) ;
2022-06-14 07:04:14 +03:00
2023-02-27 10:52:37 +03:00
while ! self . should_disconnect ( ) {
2023-02-12 12:22:53 +03:00
// TODO: what should the max_wait be?
2023-03-02 21:14:17 +03:00
// we do not pass unlocked_provider because we want to get a new one each call. otherwise we might re-use an old one
2023-02-12 12:22:53 +03:00
match self
2023-03-02 21:14:17 +03:00
. wait_for_request_handle ( & authorization , None , None )
2023-02-12 12:22:53 +03:00
. await
{
Ok ( active_request_handle ) = > {
let block : Result < Option < ArcBlock > , _ > = active_request_handle
. request (
" eth_getBlockByNumber " ,
& json! ( ( " latest " , false ) ) ,
Level ::Warn . into ( ) ,
None ,
)
. await ;
match block {
Ok ( None ) = > {
warn! ( " no head block on {} " , self ) ;
self . send_head_block_result (
Ok ( None ) ,
& block_sender ,
block_map . clone ( ) ,
2022-09-21 07:48:21 +03:00
)
2023-02-12 12:22:53 +03:00
. await ? ;
}
Ok ( Some ( block ) ) = > {
// don't send repeat blocks
let new_hash =
block . hash . expect ( " blocks here should always have hashes " ) ;
2022-06-14 07:04:14 +03:00
2023-02-12 12:22:53 +03:00
if new_hash ! = last_hash {
// new hash!
last_hash = new_hash ;
2022-11-06 23:52:11 +03:00
self . send_head_block_result (
2023-02-12 12:22:53 +03:00
Ok ( Some ( block ) ) ,
2022-08-26 20:26:17 +03:00
& block_sender ,
block_map . clone ( ) ,
)
. await ? ;
2022-07-19 07:31:30 +03:00
}
2022-05-16 22:15:40 +03:00
}
2023-02-12 12:22:53 +03:00
Err ( err ) = > {
// we did not get a block back. something is up with the server. take it out of rotation
self . send_head_block_result (
Err ( err ) ,
& block_sender ,
block_map . clone ( ) ,
)
. await ? ;
}
2022-06-14 07:04:14 +03:00
}
2023-02-12 12:22:53 +03:00
}
Err ( err ) = > {
warn! ( " Internal error on latest block from {}. {:?} " , self , err ) ;
2022-11-06 23:52:11 +03:00
2023-02-12 12:22:53 +03:00
self . send_head_block_result ( Ok ( None ) , & block_sender , block_map . clone ( ) )
2022-11-06 23:52:11 +03:00
. await ? ;
2023-02-12 12:22:53 +03:00
// TODO: what should we do? sleep? extra time?
2022-05-16 22:15:40 +03:00
}
2023-02-12 12:22:53 +03:00
}
2022-07-19 04:31:12 +03:00
2023-02-12 12:22:53 +03:00
// wait for the next interval
// TODO: if error or rate limit, increase interval?
while let Err ( err ) = http_interval_receiver . recv ( ) . await {
match err {
broadcast ::error ::RecvError ::Closed = > {
// channel is closed! that's not good. bubble the error up
return Err ( err . into ( ) ) ;
}
broadcast ::error ::RecvError ::Lagged ( lagged ) = > {
// querying the block was delayed
// this can happen if tokio is very busy or waiting for requests limits took too long
2023-02-16 08:30:42 +03:00
if self . backup {
debug! ( " http interval on {} lagging by {}! " , self , lagged ) ;
} else {
warn! ( " http interval on {} lagging by {}! " , self , lagged ) ;
}
2022-07-19 04:31:12 +03:00
}
}
2022-05-15 22:28:22 +03:00
}
2022-06-14 07:04:14 +03:00
}
2023-02-12 12:22:53 +03:00
}
2023-03-02 21:14:17 +03:00
Web3Provider ::Both ( _ , client ) | Web3Provider ::Ws ( client ) = > {
2023-02-12 12:22:53 +03:00
// todo: move subscribe_blocks onto the request handle?
let active_request_handle = self
2023-03-02 21:14:17 +03:00
. wait_for_request_handle ( & authorization , None , Some ( provider . clone ( ) ) )
2023-02-12 12:22:53 +03:00
. await ;
let mut stream = client . subscribe_blocks ( ) . await ? ;
drop ( active_request_handle ) ;
// query the block once since the subscription doesn't send the current block
// there is a very small race condition here where the stream could send us a new block right now
// but all that does is print "new block" for the same block as current block
// TODO: how does this get wrapped in an arc? does ethers handle that?
// TODO: do this part over http?
let block : Result < Option < ArcBlock > , _ > = self
2023-03-02 21:14:17 +03:00
. wait_for_request_handle ( & authorization , None , Some ( provider . clone ( ) ) )
2023-02-12 12:22:53 +03:00
. await ?
. request (
" eth_getBlockByNumber " ,
& json! ( ( " latest " , false ) ) ,
Level ::Warn . into ( ) ,
2023-03-02 21:14:17 +03:00
Some ( provider . clone ( ) ) ,
2023-02-12 12:22:53 +03:00
)
. await ;
2023-01-03 18:51:18 +03:00
2023-02-12 12:22:53 +03:00
let mut last_hash = match & block {
Ok ( Some ( new_block ) ) = > new_block
. hash
. expect ( " blocks should always have a hash here " ) ,
_ = > H256 ::zero ( ) ,
} ;
2022-07-09 01:14:45 +03:00
2023-02-12 12:22:53 +03:00
self . send_head_block_result ( block , & block_sender , block_map . clone ( ) )
. await ? ;
while let Some ( new_block ) = stream . next ( ) . await {
2023-02-27 10:52:37 +03:00
// TODO: select on disconnect_watch instead of waiting for a block to arrive
if self . should_disconnect ( ) {
break ;
}
2023-02-12 12:22:53 +03:00
// TODO: check the new block's hash to be sure we don't send dupes
let new_hash = new_block
. hash
. expect ( " blocks should always have a hash here " ) ;
2022-11-06 23:52:11 +03:00
2023-02-12 12:22:53 +03:00
if new_hash = = last_hash {
// some rpcs like to give us duplicates. don't waste our time on them
continue ;
} else {
last_hash = new_hash ;
}
self . send_head_block_result (
Ok ( Some ( Arc ::new ( new_block ) ) ) ,
& block_sender ,
block_map . clone ( ) ,
)
. await ? ;
2022-05-05 22:07:09 +03:00
}
2023-02-12 12:22:53 +03:00
// TODO: is this always an error?
// TODO: we probably don't want a warn and to return error
2023-02-27 10:52:37 +03:00
debug! ( " new_heads subscription to {} ended " , self ) ;
2022-05-17 05:26:47 +03:00
}
2023-02-12 12:22:53 +03:00
#[ cfg(test) ]
2023-03-02 21:14:17 +03:00
Web3Provider ::Mock = > unimplemented! ( ) ,
2022-06-14 07:04:14 +03:00
}
2023-02-27 10:52:37 +03:00
// clear the head block. this might not be needed, but it won't hurt
self . send_head_block_result ( Ok ( None ) , & block_sender , block_map )
. await ? ;
2023-03-23 02:16:15 +03:00
if self . should_disconnect ( ) {
Ok ( ( ) )
} else {
Err ( anyhow! ( " new_heads subscription exited. reconnect needed " ) )
}
2022-06-14 07:04:14 +03:00
}
2022-05-17 05:26:47 +03:00
2023-02-12 12:22:53 +03:00
/// Turn on the firehose of pending transactions
2022-06-14 07:04:14 +03:00
async fn subscribe_pending_transactions (
self : Arc < Self > ,
2022-11-08 22:58:11 +03:00
authorization : Arc < Authorization > ,
2022-06-14 07:04:14 +03:00
tx_id_sender : flume ::Sender < ( TxHash , Arc < Self > ) > ,
) -> anyhow ::Result < ( ) > {
2023-02-12 12:22:53 +03:00
// TODO: give this a separate client. don't use new_head_client for everything. especially a firehose this big
// TODO: timeout
2023-03-02 21:27:32 +03:00
let provider = self . wait_for_provider ( ) . await ;
2023-03-02 19:52:28 +03:00
2023-02-12 12:22:53 +03:00
trace! ( " watching pending transactions on {} " , self ) ;
// TODO: does this keep the lock open for too long?
2023-03-02 21:27:32 +03:00
match provider . as_ref ( ) {
2023-03-17 05:38:11 +03:00
Web3Provider ::Http ( _provider ) = > {
2023-02-12 12:22:53 +03:00
// there is a "watch_pending_transactions" function, but a lot of public nodes do not support the necessary rpc endpoints
2023-03-02 21:27:32 +03:00
self . wait_for_disconnect ( ) . await ? ;
2023-02-12 12:22:53 +03:00
}
2023-03-02 21:27:32 +03:00
Web3Provider ::Both ( _ , client ) | Web3Provider ::Ws ( client ) = > {
2023-02-12 12:22:53 +03:00
// TODO: maybe the subscribe_pending_txs function should be on the active_request_handle
let active_request_handle = self
2023-03-02 21:27:32 +03:00
. wait_for_request_handle ( & authorization , None , Some ( provider . clone ( ) ) )
2023-02-12 12:22:53 +03:00
. await ? ;
2022-06-14 07:04:14 +03:00
2023-02-12 12:22:53 +03:00
let mut stream = client . subscribe_pending_txs ( ) . await ? ;
2022-06-14 07:04:14 +03:00
2023-02-12 12:22:53 +03:00
drop ( active_request_handle ) ;
2022-08-11 00:29:50 +03:00
2023-02-12 12:22:53 +03:00
while let Some ( pending_tx_id ) = stream . next ( ) . await {
tx_id_sender
. send_async ( ( pending_tx_id , self . clone ( ) ) )
. await
. context ( " tx_id_sender " ) ? ;
2022-07-08 21:27:06 +03:00
2023-02-12 12:22:53 +03:00
// TODO: periodically check for listeners. if no one is subscribed, unsubscribe and wait for a subscription
2023-02-27 10:52:37 +03:00
// TODO: select on this instead of checking every loop
if self . should_disconnect ( ) {
break ;
}
2022-06-14 07:04:14 +03:00
}
2023-02-12 12:22:53 +03:00
// TODO: is this always an error?
// TODO: we probably don't want a warn and to return error
2023-02-27 10:52:37 +03:00
debug! ( " pending_transactions subscription ended on {} " , self ) ;
2022-05-05 22:07:09 +03:00
}
2023-02-12 12:22:53 +03:00
#[ cfg(test) ]
2023-03-02 21:27:32 +03:00
Web3Provider ::Mock = > {
self . wait_for_disconnect ( ) . await ? ;
2023-02-27 10:52:37 +03:00
}
2022-05-05 22:07:09 +03:00
}
2023-03-23 02:16:15 +03:00
if self . should_disconnect ( ) {
Ok ( ( ) )
} else {
2023-03-31 14:43:41 +03:00
Err ( anyhow! (
" pending_transactions subscription exited. reconnect needed "
) )
2023-03-23 02:16:15 +03:00
}
2022-05-05 22:07:09 +03:00
}
2022-08-30 23:01:42 +03:00
/// be careful with this; it might wait forever!
2022-12-06 00:13:36 +03:00
/// `allow_not_ready` is only for use by health checks while starting the provider
2023-01-25 07:44:50 +03:00
/// TODO: don't use anyhow. use specific error type
2023-02-12 12:22:53 +03:00
pub async fn wait_for_request_handle < ' a > (
self : & ' a Arc < Self > ,
authorization : & ' a Arc < Authorization > ,
2023-01-25 09:45:20 +03:00
max_wait : Option < Duration > ,
2023-02-12 12:22:53 +03:00
unlocked_provider : Option < Arc < Web3Provider > > ,
2023-03-20 04:52:28 +03:00
) -> Web3ProxyResult < OpenRequestHandle > {
2023-01-25 09:45:20 +03:00
let max_wait = max_wait . map ( | x | Instant ::now ( ) + x ) ;
2022-05-16 22:15:40 +03:00
2022-06-17 01:23:41 +03:00
loop {
2022-12-06 00:13:36 +03:00
match self
2023-02-12 12:22:53 +03:00
. try_request_handle ( authorization , unlocked_provider . clone ( ) )
2022-12-06 00:13:36 +03:00
. await
{
2022-08-24 03:59:05 +03:00
Ok ( OpenRequestResult ::Handle ( handle ) ) = > return Ok ( handle ) ,
2022-08-24 03:14:49 +03:00
Ok ( OpenRequestResult ::RetryAt ( retry_at ) ) = > {
2022-08-07 09:48:57 +03:00
// TODO: emit a stat?
2023-01-25 09:45:20 +03:00
let wait = retry_at . duration_since ( Instant ::now ( ) ) ;
trace! (
" waiting {} millis for request handle on {} " ,
wait . as_millis ( ) ,
self
) ;
if let Some ( max_wait ) = max_wait {
if retry_at > max_wait {
// break now since we will wait past our maximum wait time
2023-03-20 04:52:28 +03:00
return Err ( Web3ProxyError ::Timeout ( None ) ) ;
2023-01-25 09:45:20 +03:00
}
2022-09-20 09:00:27 +03:00
}
2023-01-25 07:44:50 +03:00
2022-08-07 09:48:57 +03:00
sleep_until ( retry_at ) . await ;
}
2023-02-15 04:41:40 +03:00
Ok ( OpenRequestResult ::NotReady ) = > {
2022-08-24 03:59:05 +03:00
// TODO: when can this happen? log? emit a stat?
2023-01-25 07:44:50 +03:00
trace! ( " {} has no handle ready " , self ) ;
2023-01-25 09:45:20 +03:00
if let Some ( max_wait ) = max_wait {
let now = Instant ::now ( ) ;
2023-01-25 07:44:50 +03:00
2023-01-25 09:45:20 +03:00
if now > max_wait {
2023-03-20 04:52:28 +03:00
return Err ( Web3ProxyError ::NoHandleReady ) ;
2023-01-25 09:45:20 +03:00
}
2023-01-25 07:44:50 +03:00
}
2022-08-07 09:48:57 +03:00
// TODO: sleep how long? maybe just error?
2023-01-25 07:44:50 +03:00
// TODO: instead of an arbitrary sleep, subscribe to the head block on this
sleep ( Duration ::from_millis ( 10 ) ) . await ;
2022-05-06 07:29:25 +03:00
}
2022-08-07 09:48:57 +03:00
Err ( err ) = > return Err ( err ) ,
2022-05-06 07:29:25 +03:00
}
2022-05-05 22:07:09 +03:00
}
}
2022-09-22 23:27:14 +03:00
pub async fn try_request_handle (
self : & Arc < Self > ,
2022-11-08 22:58:11 +03:00
authorization : & Arc < Authorization > ,
2023-02-12 12:22:53 +03:00
// TODO: borrow on this instead of needing to clone the Arc?
unlocked_provider : Option < Arc < Web3Provider > > ,
2023-03-20 04:52:28 +03:00
) -> Web3ProxyResult < OpenRequestResult > {
2022-12-08 09:54:38 +03:00
// TODO: think more about this read block
2023-02-12 12:22:53 +03:00
// TODO: this should *not* be new_head_client. this should be a separate object
2023-02-12 21:22:20 +03:00
if unlocked_provider . is_some ( ) | | self . provider . read ( ) . await . is_some ( ) {
2023-02-12 12:22:53 +03:00
// we already have an unlocked provider. no need to lock
} else {
2023-02-15 04:41:40 +03:00
return Ok ( OpenRequestResult ::NotReady ) ;
2022-12-08 09:54:38 +03:00
}
2023-01-25 07:44:50 +03:00
if let Some ( hard_limit_until ) = self . hard_limit_until . as_ref ( ) {
2023-02-27 09:44:09 +03:00
let hard_limit_ready = * hard_limit_until . borrow ( ) ;
2023-01-25 07:44:50 +03:00
let now = Instant ::now ( ) ;
if now < hard_limit_ready {
return Ok ( OpenRequestResult ::RetryAt ( hard_limit_ready ) ) ;
}
}
2022-05-05 22:07:09 +03:00
// check rate limits
2022-05-22 02:34:05 +03:00
if let Some ( ratelimiter ) = self . hard_limit . as_ref ( ) {
2022-09-20 09:56:24 +03:00
// TODO: how should we know if we should set expire or not?
2023-01-25 09:45:20 +03:00
match ratelimiter
. throttle ( )
. await
. context ( format! ( " attempting to throttle {} " , self ) ) ?
{
2022-09-15 20:57:24 +03:00
RedisRateLimitResult ::Allowed ( _ ) = > {
2023-01-25 07:44:50 +03:00
// trace!("rate limit succeeded")
2022-05-05 22:07:09 +03:00
}
2022-09-15 20:57:24 +03:00
RedisRateLimitResult ::RetryAt ( retry_at , _ ) = > {
2023-01-25 09:45:20 +03:00
// rate limit gave us a wait time
if ! self . backup {
let when = retry_at . duration_since ( Instant ::now ( ) ) ;
warn! (
" Exhausted rate limit on {}. Retry in {}ms " ,
self ,
when . as_millis ( )
) ;
}
2022-05-05 22:07:09 +03:00
2023-01-25 07:44:50 +03:00
if let Some ( hard_limit_until ) = self . hard_limit_until . as_ref ( ) {
2023-02-27 09:44:09 +03:00
hard_limit_until . send_replace ( retry_at ) ;
2023-01-25 07:44:50 +03:00
}
2022-09-10 03:12:14 +03:00
return Ok ( OpenRequestResult ::RetryAt ( retry_at ) ) ;
2022-08-07 09:48:57 +03:00
}
2022-09-15 20:57:24 +03:00
RedisRateLimitResult ::RetryNever = > {
2023-02-15 04:41:40 +03:00
return Ok ( OpenRequestResult ::NotReady ) ;
2022-05-05 22:07:09 +03:00
}
}
} ;
2022-12-06 03:06:28 +03:00
let handle = OpenRequestHandle ::new ( authorization . clone ( ) , self . clone ( ) ) . await ;
2022-08-24 03:11:49 +03:00
2022-08-24 03:59:05 +03:00
Ok ( OpenRequestResult ::Handle ( handle ) )
2022-08-24 03:11:49 +03:00
}
2023-02-16 11:26:58 +03:00
2023-03-02 21:27:32 +03:00
async fn wait_for_disconnect ( & self ) -> Result < ( ) , tokio ::sync ::watch ::error ::RecvError > {
let mut disconnect_watch = self . disconnect_watch . as_ref ( ) . unwrap ( ) . subscribe ( ) ;
loop {
if * disconnect_watch . borrow_and_update ( ) {
// disconnect watch is set to "true"
return Ok ( ( ) ) ;
}
// wait for disconnect_watch to change
disconnect_watch . changed ( ) . await ? ;
}
}
async fn wait_for_provider ( & self ) -> Arc < Web3Provider > {
let mut provider = self . provider . read ( ) . await . clone ( ) ;
let mut logged = false ;
while provider . is_none ( ) {
// trace!("waiting on unlocked_provider: locking...");
sleep ( Duration ::from_millis ( 100 ) ) . await ;
if ! logged {
debug! ( " waiting for provider on {} " , self ) ;
logged = true ;
}
provider = self . provider . read ( ) . await . clone ( ) ;
}
provider . unwrap ( )
}
2023-02-16 11:26:58 +03:00
pub async fn wait_for_query < P , R > (
self : & Arc < Self > ,
method : & str ,
params : & P ,
2023-04-06 01:34:28 +03:00
revert_handler : RequestErrorHandler ,
2023-02-16 11:26:58 +03:00
authorization : Arc < Authorization > ,
unlocked_provider : Option < Arc < Web3Provider > > ,
) -> anyhow ::Result < R >
where
// TODO: not sure about this type. would be better to not need clones, but measure and spawns combine to need it
P : Clone + fmt ::Debug + serde ::Serialize + Send + Sync + 'static ,
R : serde ::Serialize + serde ::de ::DeserializeOwned + fmt ::Debug ,
{
self . wait_for_request_handle ( & authorization , None , None )
. await ?
. request ::< P , R > ( method , params , revert_handler , unlocked_provider )
. await
. context ( " ProviderError from the backend " )
}
2022-08-24 03:11:49 +03:00
}
2022-08-07 09:48:57 +03:00
2022-08-24 03:11:49 +03:00
impl fmt ::Debug for Web3Provider {
fn fmt ( & self , f : & mut fmt ::Formatter < '_ > ) -> fmt ::Result {
// TODO: the default Debug takes forever to write. this is too quiet though. we at least need the url
f . debug_struct ( " Web3Provider " ) . finish_non_exhaustive ( )
2022-05-06 07:29:25 +03:00
}
}
2023-02-06 20:55:27 +03:00
impl Hash for Web3Rpc {
2022-06-14 07:04:14 +03:00
fn hash < H : Hasher > ( & self , state : & mut H ) {
2022-08-24 03:32:16 +03:00
self . name . hash ( state ) ;
2023-02-28 22:01:34 +03:00
self . display_name . hash ( state ) ;
self . http_url . hash ( state ) ;
self . ws_url . hash ( state ) ;
self . automatic_block_limit . hash ( state ) ;
self . backup . hash ( state ) ;
// TODO: including soft_limit might need to change if we change them to be dynamic
self . soft_limit . hash ( state ) ;
self . tier . hash ( state ) ;
self . created_at . hash ( state ) ;
2022-06-14 07:04:14 +03:00
}
}
2023-02-06 20:55:27 +03:00
impl Eq for Web3Rpc { }
2022-05-05 22:07:09 +03:00
2023-02-06 20:55:27 +03:00
impl Ord for Web3Rpc {
2022-05-05 22:07:09 +03:00
fn cmp ( & self , other : & Self ) -> std ::cmp ::Ordering {
2022-08-24 03:32:16 +03:00
self . name . cmp ( & other . name )
2022-05-05 22:07:09 +03:00
}
}
2023-02-06 20:55:27 +03:00
impl PartialOrd for Web3Rpc {
2022-05-05 22:07:09 +03:00
fn partial_cmp ( & self , other : & Self ) -> Option < Ordering > {
Some ( self . cmp ( other ) )
}
}
2023-02-06 20:55:27 +03:00
impl PartialEq for Web3Rpc {
2022-05-05 22:07:09 +03:00
fn eq ( & self , other : & Self ) -> bool {
2022-08-24 03:32:16 +03:00
self . name = = other . name
2022-05-05 22:07:09 +03:00
}
}
2022-08-10 08:56:09 +03:00
2023-02-06 20:55:27 +03:00
impl Serialize for Web3Rpc {
2022-08-10 08:56:09 +03:00
fn serialize < S > ( & self , serializer : S ) -> Result < S ::Ok , S ::Error >
where
S : Serializer ,
{
// 3 is the number of fields in the struct.
2023-01-26 08:24:09 +03:00
let mut state = serializer . serialize_struct ( " Web3Rpc " , 9 ) ? ;
2022-08-10 08:56:09 +03:00
2022-11-14 00:05:37 +03:00
// the url is excluded because it likely includes private information. just show the name that we use in keys
2022-08-10 08:56:09 +03:00
state . serialize_field ( " name " , & self . name ) ? ;
2022-11-14 00:05:37 +03:00
// a longer name for display to users
state . serialize_field ( " display_name " , & self . display_name ) ? ;
2022-08-10 08:56:09 +03:00
2023-02-02 19:00:59 +03:00
state . serialize_field ( " backup " , & self . backup ) ? ;
2022-12-28 05:17:11 +03:00
match self . block_data_limit . load ( atomic ::Ordering ::Relaxed ) {
u64 ::MAX = > {
state . serialize_field ( " block_data_limit " , & None ::< ( ) > ) ? ;
}
block_data_limit = > {
state . serialize_field ( " block_data_limit " , & block_data_limit ) ? ;
}
2022-09-06 23:12:45 +03:00
}
2022-08-10 08:56:09 +03:00
2023-01-04 09:37:51 +03:00
state . serialize_field ( " tier " , & self . tier ) ? ;
2023-01-05 01:33:39 +03:00
2022-08-10 08:56:09 +03:00
state . serialize_field ( " soft_limit " , & self . soft_limit ) ? ;
2023-02-15 23:33:43 +03:00
// TODO: maybe this is too much data. serialize less?
state . serialize_field ( " head_block " , & * self . head_block . read ( ) ) ? ;
2022-09-06 23:12:45 +03:00
2023-02-16 11:26:58 +03:00
state . serialize_field ( " head_latency " , & self . head_latency . read ( ) . value ( ) ) ? ;
2023-02-15 23:33:43 +03:00
state . serialize_field (
" total_requests " ,
& self . total_requests . load ( atomic ::Ordering ::Relaxed ) ,
) ? ;
2022-09-05 19:39:46 +03:00
2022-08-10 08:56:09 +03:00
state . end ( )
}
}
2023-02-06 20:55:27 +03:00
impl fmt ::Debug for Web3Rpc {
2022-08-10 08:56:09 +03:00
fn fmt ( & self , f : & mut fmt ::Formatter < '_ > ) -> fmt ::Result {
2023-02-06 20:55:27 +03:00
let mut f = f . debug_struct ( " Web3Rpc " ) ;
2022-08-10 08:56:09 +03:00
2022-08-24 03:32:16 +03:00
f . field ( " name " , & self . name ) ;
2022-08-10 08:56:09 +03:00
let block_data_limit = self . block_data_limit . load ( atomic ::Ordering ::Relaxed ) ;
if block_data_limit = = u64 ::MAX {
2022-09-07 06:54:16 +03:00
f . field ( " blocks " , & " all " ) ;
2022-08-10 08:56:09 +03:00
} else {
2022-09-07 06:54:16 +03:00
f . field ( " blocks " , & block_data_limit ) ;
2022-08-10 08:56:09 +03:00
}
f . finish_non_exhaustive ( )
}
}
2023-02-06 20:55:27 +03:00
impl fmt ::Display for Web3Rpc {
2022-08-10 08:56:09 +03:00
fn fmt ( & self , f : & mut fmt ::Formatter < '_ > ) -> fmt ::Result {
// TODO: filter basic auth and api keys
2022-08-24 03:32:16 +03:00
write! ( f , " {} " , & self . name )
2022-08-10 08:56:09 +03:00
}
}
2022-11-22 23:23:08 +03:00
mod tests {
2022-11-30 00:34:42 +03:00
#![ allow(unused_imports) ]
2022-11-22 23:23:08 +03:00
use super ::* ;
2022-12-05 04:10:20 +03:00
use ethers ::types ::{ Block , U256 } ;
2022-11-22 23:23:08 +03:00
#[ test ]
fn test_archive_node_has_block_data ( ) {
2023-01-26 08:24:09 +03:00
let now = chrono ::Utc ::now ( ) . timestamp ( ) . into ( ) ;
2022-12-05 04:10:20 +03:00
2022-12-03 08:31:03 +03:00
let random_block = Block {
hash : Some ( H256 ::random ( ) ) ,
number : Some ( 1_000_000. into ( ) ) ,
2022-12-05 04:10:20 +03:00
timestamp : now ,
2022-12-03 08:31:03 +03:00
.. Default ::default ( )
2022-11-22 23:23:08 +03:00
} ;
2022-12-03 08:31:03 +03:00
let random_block = Arc ::new ( random_block ) ;
2023-02-15 04:41:40 +03:00
let head_block = Web3ProxyBlock ::try_new ( random_block ) . unwrap ( ) ;
2022-11-22 23:44:23 +03:00
let block_data_limit = u64 ::MAX ;
2022-11-22 23:23:08 +03:00
2023-02-06 20:55:27 +03:00
let x = Web3Rpc {
2022-11-22 23:23:08 +03:00
name : " name " . to_string ( ) ,
2023-02-12 12:22:53 +03:00
ws_url : Some ( " ws://example.com " . to_string ( ) ) ,
2022-11-22 23:23:08 +03:00
soft_limit : 1_000 ,
2022-12-06 00:13:36 +03:00
automatic_block_limit : false ,
2023-01-19 13:13:00 +03:00
backup : false ,
2022-11-22 23:44:23 +03:00
block_data_limit : block_data_limit . into ( ) ,
2023-01-04 09:37:51 +03:00
tier : 0 ,
2022-12-03 08:31:03 +03:00
head_block : RwLock ::new ( Some ( head_block . clone ( ) ) ) ,
2023-02-06 20:55:27 +03:00
.. Default ::default ( )
2022-11-22 23:23:08 +03:00
} ;
assert! ( x . has_block_data ( & 0. into ( ) ) ) ;
assert! ( x . has_block_data ( & 1. into ( ) ) ) ;
2023-02-27 09:44:09 +03:00
assert! ( x . has_block_data ( head_block . number ( ) ) ) ;
2022-12-03 08:31:03 +03:00
assert! ( ! x . has_block_data ( & ( head_block . number ( ) + 1 ) ) ) ;
assert! ( ! x . has_block_data ( & ( head_block . number ( ) + 1000 ) ) ) ;
2022-11-22 23:23:08 +03:00
}
2022-11-22 23:44:23 +03:00
#[ test ]
fn test_pruned_node_has_block_data ( ) {
2023-01-26 08:24:09 +03:00
let now = chrono ::Utc ::now ( ) . timestamp ( ) . into ( ) ;
2022-12-05 04:10:20 +03:00
2023-02-14 23:14:50 +03:00
let head_block : Web3ProxyBlock = Arc ::new ( Block {
2022-12-03 08:31:03 +03:00
hash : Some ( H256 ::random ( ) ) ,
number : Some ( 1_000_000. into ( ) ) ,
2022-12-05 04:10:20 +03:00
timestamp : now ,
2022-12-03 08:31:03 +03:00
.. Default ::default ( )
} )
2023-02-15 04:41:40 +03:00
. try_into ( )
. unwrap ( ) ;
2022-11-22 23:44:23 +03:00
let block_data_limit = 64 ;
2023-02-06 20:55:27 +03:00
let x = Web3Rpc {
2022-11-22 23:44:23 +03:00
name : " name " . to_string ( ) ,
soft_limit : 1_000 ,
2022-12-06 00:13:36 +03:00
automatic_block_limit : false ,
2023-01-19 13:13:00 +03:00
backup : false ,
2022-11-22 23:44:23 +03:00
block_data_limit : block_data_limit . into ( ) ,
2023-01-04 09:37:51 +03:00
tier : 0 ,
2022-12-03 08:31:03 +03:00
head_block : RwLock ::new ( Some ( head_block . clone ( ) ) ) ,
2023-02-06 20:55:27 +03:00
.. Default ::default ( )
2022-11-22 23:44:23 +03:00
} ;
assert! ( ! x . has_block_data ( & 0. into ( ) ) ) ;
assert! ( ! x . has_block_data ( & 1. into ( ) ) ) ;
2022-12-03 08:31:03 +03:00
assert! ( ! x . has_block_data ( & ( head_block . number ( ) - block_data_limit - 1 ) ) ) ;
assert! ( x . has_block_data ( & ( head_block . number ( ) - block_data_limit ) ) ) ;
2023-02-27 09:44:09 +03:00
assert! ( x . has_block_data ( head_block . number ( ) ) ) ;
2022-12-03 08:31:03 +03:00
assert! ( ! x . has_block_data ( & ( head_block . number ( ) + 1 ) ) ) ;
assert! ( ! x . has_block_data ( & ( head_block . number ( ) + 1000 ) ) ) ;
2022-11-22 23:44:23 +03:00
}
2022-12-05 04:10:20 +03:00
2023-01-19 14:05:39 +03:00
/*
// TODO: think about how to bring the concept of a "lagged" node back
2022-12-05 04:10:20 +03:00
#[ test ]
fn test_lagged_node_not_has_block_data ( ) {
2023-01-26 08:24:09 +03:00
let now = chrono ::Utc ::now ( ) . timestamp ( ) . into ( ) ;
2022-12-05 04:10:20 +03:00
// head block is an hour old
let head_block = Block {
hash : Some ( H256 ::random ( ) ) ,
number : Some ( 1_000_000. into ( ) ) ,
timestamp : now - 3600 ,
.. Default ::default ( )
} ;
let head_block = Arc ::new ( head_block ) ;
2023-01-26 08:24:09 +03:00
let head_block = Web3ProxyBlock ::new ( head_block ) ;
2022-12-05 04:10:20 +03:00
let block_data_limit = u64 ::MAX ;
let metrics = OpenRequestHandleMetrics ::default ( ) ;
2023-02-06 20:55:27 +03:00
let x = Web3Rpc {
2022-12-05 04:10:20 +03:00
name : " name " . to_string ( ) ,
2023-01-03 22:54:24 +03:00
db_conn : None ,
2022-12-05 04:10:20 +03:00
display_name : None ,
url : " ws://example.com " . to_string ( ) ,
http_client : None ,
active_requests : 0. into ( ) ,
frontend_requests : 0. into ( ) ,
internal_requests : 0. into ( ) ,
2022-12-06 00:13:36 +03:00
provider_state : AsyncRwLock ::new ( ProviderState ::None ) ,
2022-12-05 04:10:20 +03:00
hard_limit : None ,
soft_limit : 1_000 ,
2022-12-06 00:13:36 +03:00
automatic_block_limit : false ,
2023-01-19 13:13:00 +03:00
backup : false ,
2022-12-05 04:10:20 +03:00
block_data_limit : block_data_limit . into ( ) ,
2023-01-04 09:37:51 +03:00
tier : 0 ,
2022-12-05 04:10:20 +03:00
head_block : RwLock ::new ( Some ( head_block . clone ( ) ) ) ,
} ;
assert! ( ! x . has_block_data ( & 0. into ( ) ) ) ;
assert! ( ! x . has_block_data ( & 1. into ( ) ) ) ;
assert! ( ! x . has_block_data ( & head_block . number ( ) ) ) ;
assert! ( ! x . has_block_data ( & ( head_block . number ( ) + 1 ) ) ) ;
assert! ( ! x . has_block_data ( & ( head_block . number ( ) + 1000 ) ) ) ;
}
2023-01-19 14:05:39 +03:00
* /
2022-11-22 23:23:08 +03:00
}