//! Rate-limited communication with a web3 provider.

use super::blockchain::{ArcBlock, BlocksByHashCache, Web3ProxyBlock};
use super::provider::{connect_http, connect_ws, EthersHttpProvider, EthersWsProvider};
use super::request::{OpenRequestHandle, OpenRequestResult};
use crate::app::{flatten_handle, Web3ProxyJoinHandle};
use crate::config::{BlockAndRpc, Web3RpcConfig};
use crate::errors::{Web3ProxyError, Web3ProxyResult};
use crate::frontend::authorization::Authorization;
use crate::jsonrpc::{JsonRpcParams, JsonRpcResultData};
use crate::rpcs::request::RequestErrorHandler;
use anyhow::{anyhow, Context};
use arc_swap::ArcSwapOption;
use ethers::prelude::{Bytes, Middleware, TxHash, U64};
use ethers::types::{Address, Transaction, U256};
use futures::future::try_join_all;
use futures::StreamExt;
use latency::{EwmaLatency, PeakEwmaLatency};
use log::{debug, info, trace, warn, Level};
use migration::sea_orm::DatabaseConnection;
use nanorand::Rng;
use ordered_float::OrderedFloat;
use parking_lot::RwLock;
use redis_rate_limiter::{RedisPool, RedisRateLimitResult, RedisRateLimiter};
use serde::ser::{SerializeStruct, Serializer};
use serde::Serialize;
use serde_json::json;
use std::cmp::Reverse;
use std::fmt;
use std::hash::{Hash, Hasher};
use std::sync::atomic::{self, AtomicU32, AtomicU64, AtomicUsize};
use std::{cmp::Ordering, sync::Arc};
use tokio::sync::watch;
use tokio::time::{interval, sleep, sleep_until, timeout, Duration, Instant, MissedTickBehavior};
use url::Url;
/// An active connection to a Web3 RPC server like geth or erigon.
#[derive(Default)]
pub struct Web3Rpc {
    pub name: String,
    pub block_interval: Duration,
    pub display_name: Option<String>,
    pub db_conn: Option<DatabaseConnection>,
    /// most requests prefer the http_provider
    pub(super) http_provider: Option<EthersHttpProvider>,
    /// the websocket url is only used for subscriptions
    pub(super) ws_url: Option<Url>,
    /// the websocket provider is only used for subscriptions
    pub(super) ws_provider: ArcSwapOption<EthersWsProvider>,
    /// keep track of hard limits
    /// hard_limit_until is only inside an Option so that the "Default" derive works. it will always be set.
    pub(super) hard_limit_until: Option<watch::Sender<Instant>>,
    /// rate limits are stored in a central redis so that multiple proxies can share their rate limits
    /// We do not use the deferred rate limiter because going over limits would cause errors
    pub(super) hard_limit: Option<RedisRateLimiter>,
    /// used for ensuring enough requests are available before advancing the head block
    pub(super) soft_limit: u32,
    /// use web3 queries to find the block data limit for archive/pruned nodes
    pub(super) automatic_block_limit: bool,
    /// only use this rpc if everything else is lagging too far. this allows us to ignore fast but very low limit rpcs
    pub backup: bool,
    /// TODO: have an enum for this so that "no limit" prints pretty?
    pub(super) block_data_limit: AtomicU64,
    /// head_block is only inside an Option so that the "Default" derive works. it will always be set.
    pub(super) head_block: Option<watch::Sender<Option<Web3ProxyBlock>>>,
    /// Track head block latency
    pub(super) head_latency: RwLock<EwmaLatency>,
    /// Track peak request latency
    /// peak_latency is only inside an Option so that the "Default" derive works. it will always be set.
    pub(super) peak_latency: Option<PeakEwmaLatency>,
    /// Automatically set priority
    pub(super) tier: AtomicU32,
    /// Track total internal requests served
    pub(super) internal_requests: AtomicUsize,
    /// Track total external requests served
    pub(super) external_requests: AtomicUsize,
    /// Track in-flight requests
    pub(super) active_requests: AtomicUsize,
    /// disconnect_watch is only inside an Option so that the "Default" derive works. it will always be set.
    pub(super) disconnect_watch: Option<watch::Sender<bool>>,
    /// created_at is only inside an Option so that the "Default" derive works. it will always be set.
    pub(super) created_at: Option<Instant>,
}
impl Web3Rpc {
    /// Connect to a web3 rpc
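    ///
    /// A minimal usage sketch (kept as `ignore` because it needs the surrounding app state):
    /// the config values and the `block_map` / `block_and_rpc_sender` wiring shown here are
    /// assumptions for illustration, not the proxy's exact setup code.
    ///
    /// ```ignore
    /// let config = Web3RpcConfig {
    ///     http_url: Some("http://localhost:8545".into()),
    ///     soft_limit: 1_000,
    ///     ..Default::default()
    /// };
    ///
    /// let (rpc, handle) = Web3Rpc::spawn(
    ///     config,
    ///     "local_geth".to_string(),
    ///     1,                        // chain_id
    ///     None,                     // db_conn
    ///     None,                     // http_client (only needed for http providers)
    ///     None,                     // redis_pool: without redis there is no hard limit
    ///     Duration::from_secs(12),  // block_interval
    ///     block_map,
    ///     Some(block_and_rpc_sender),
    ///     None,                     // tx_id_sender: skip the pending tx firehose
    /// )
    /// .await?;
    ///
    /// // `handle` drives the subscription loops until a disconnect is requested
    /// ```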
    // TODO: have this take a builder (which will have channels attached). or maybe just take the config and give the config public fields
    #[allow(clippy::too_many_arguments)]
    pub async fn spawn(
        mut config: Web3RpcConfig,
        name: String,
        chain_id: u64,
        db_conn: Option<DatabaseConnection>,
        // optional because this is only used for http providers. websocket providers don't use it
        http_client: Option<reqwest::Client>,
        redis_pool: Option<RedisPool>,
        block_interval: Duration,
        block_map: BlocksByHashCache,
        block_and_rpc_sender: Option<flume::Sender<BlockAndRpc>>,
        tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
    ) -> anyhow::Result<(Arc<Web3Rpc>, Web3ProxyJoinHandle<()>)> {
        let created_at = Instant::now();

        let hard_limit = match (config.hard_limit, redis_pool) {
            (None, None) => None,
            (Some(hard_limit), Some(redis_pool)) => {
                // TODO: in-process rate limiter instead? or is deferred good enough?
                let rrl = RedisRateLimiter::new(
                    "web3_proxy",
                    &format!("{}:{}", chain_id, name),
                    hard_limit,
                    60.0,
                    redis_pool,
                );

                Some(rrl)
            }
            (None, Some(_)) => None,
            (Some(_hard_limit), None) => {
                return Err(anyhow::anyhow!(
                    "no redis client pool! needed for hard limit"
                ))
            }
        };

        let tx_id_sender = if config.subscribe_txs {
            tx_id_sender
        } else {
            None
        };

        let backup = config.backup;

        let block_data_limit: AtomicU64 = config.block_data_limit.unwrap_or_default().into();
        let automatic_block_limit = (block_data_limit.load(atomic::Ordering::Acquire) == 0)
            && block_and_rpc_sender.is_some();

        // have a sender for tracking the hard limit. we set this even on servers that have a
        // configured hard limit so that rate limit responses can push the retry time forward
        let (hard_limit_until, _) = watch::channel(Instant::now());

        if config.ws_url.is_none() && config.http_url.is_none() {
            if let Some(url) = config.url {
                if url.starts_with("ws") {
                    config.ws_url = Some(url);
                } else if url.starts_with("http") {
                    config.http_url = Some(url);
                } else {
                    return Err(anyhow!("only ws or http urls are supported"));
                }
            } else {
                return Err(anyhow!(
                    "either ws_url or http_url are required. it is best to set both"
                ));
            }
        }

        let (head_block, _) = watch::channel(None);

        // Spawn the task for calculating average peak latency
        // TODO: should these defaults be in config?
        let peak_latency = PeakEwmaLatency::spawn(
            // Decay over 15s
            Duration::from_secs(15),
            // Peak requests so far around 5k, we will use an order of magnitude
            // more to be safe. Should only use about 50mb RAM
            50_000,
            // Start latency at 1 second
            Duration::from_secs(1),
        );

        let http_provider = if let Some(http_url) = config.http_url {
            let http_url = http_url.parse::<Url>()?;

            Some(connect_http(http_url, http_client, block_interval)?)

            // TODO: check the provider is on the right chain
        } else {
            None
        };

        let ws_url = if let Some(ws_url) = config.ws_url {
            let ws_url = ws_url.parse::<Url>()?;

            Some(ws_url)
        } else {
            None
        };

        let (disconnect_watch, _) = watch::channel(false);

        let new_rpc = Self {
            automatic_block_limit,
            backup,
            block_data_limit,
            block_interval,
            created_at: Some(created_at),
            db_conn,
            display_name: config.display_name,
            hard_limit,
            hard_limit_until: Some(hard_limit_until),
            head_block: Some(head_block),
            http_provider,
            name,
            peak_latency: Some(peak_latency),
            soft_limit: config.soft_limit,
            ws_url,
            disconnect_watch: Some(disconnect_watch),
            ..Default::default()
        };

        let new_connection = Arc::new(new_rpc);

        // subscribe to new blocks and new transactions
        // subscribing starts the connection (with retries)
        // TODO: make transaction subscription optional (just pass None for tx_id_sender)
        let handle = {
            let new_connection = new_connection.clone();
            tokio::spawn(async move {
                // TODO: subscribe_with_reconnect should retry with jitter and exponential backoff
                new_connection
                    .subscribe_with_reconnect(
                        block_map,
                        block_and_rpc_sender,
                        chain_id,
                        tx_id_sender,
                    )
                    .await
            })
        };

        Ok((new_connection, handle))
    }
    /// sort by...
    /// - backups last
    /// - block number (descending)
    /// - tier (ascending)
    /// TODO: tests on this!
    /// TODO: should tier or block number take priority? it depends on the request. once filtering is right, tier before block_number should work
    /// TODO: should this return a struct that implements sorting traits?
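    ///
    /// As an illustration of the key shape (values made up): a non-backup rpc at block
    /// 1_000_000 in tier 2 yields `(true, Reverse(1_000_000.into()), 2)`, and passing
    /// `max_block = Some(999_990.into())` clamps the block component to `Reverse(999_990.into())`.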
    fn sort_on(&self, max_block: Option<U64>) -> (bool, Reverse<U64>, u32) {
        let mut head_block = self
            .head_block
            .as_ref()
            .and_then(|x| x.borrow().as_ref().map(|x| *x.number()))
            .unwrap_or_default();

        if let Some(max_block) = max_block {
            head_block = head_block.min(max_block);
        }

        let tier = self.tier.load(atomic::Ordering::Relaxed);

        let backup = self.backup;

        (!backup, Reverse(head_block), tier)
    }
    pub fn sort_for_load_balancing_on(
        &self,
        max_block: Option<U64>,
    ) -> ((bool, Reverse<U64>, u32), OrderedFloat<f64>) {
        let sort_on = self.sort_on(max_block);

        let weighted_peak_ewma_seconds = self.weighted_peak_ewma_seconds();

        let x = (sort_on, weighted_peak_ewma_seconds);

        trace!("sort_for_load_balancing {}: {:?}", self, x);

        x
    }

    /// like sort_for_load_balancing, but shuffles tiers randomly instead of sorting by weighted_peak_ewma_seconds
    pub fn shuffle_for_load_balancing_on(
        &self,
        max_block: Option<U64>,
    ) -> ((bool, Reverse<U64>, u32), u8) {
        let sort_on = self.sort_on(max_block);

        let mut rng = nanorand::tls_rng();

        let r = rng.generate::<u8>();

        (sort_on, r)
    }
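    /// Latency score used for load balancing: the peak EWMA latency (in seconds) scaled by
    /// the number of in-flight requests plus one, so a fast but already-busy server ranks
    /// worse than an idle one. A worked example with made-up numbers, assuming a measured
    /// peak latency of 0.2s:
    ///
    /// ```ignore
    /// // 3 requests already in flight -> 0.2 * (3 + 1) = 0.8
    /// // idle server                  -> 0.2 * (0 + 1) = 0.2
    /// ```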
    pub fn weighted_peak_ewma_seconds(&self) -> OrderedFloat<f64> {
        let peak_latency = if let Some(peak_latency) = self.peak_latency.as_ref() {
            peak_latency.latency().as_secs_f64()
        } else {
            1.0
        };

        // TODO: what ordering?
        let active_requests = self.active_requests.load(atomic::Ordering::Acquire) as f64 + 1.0;

        OrderedFloat(peak_latency * active_requests)
    }

    // TODO: would be great if rpcs exposed this. see https://github.com/ledgerwatch/erigon/issues/6391
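    /// Probe increasing depths behind the head block until `eth_getCode` starts failing.
    /// For example (hypothetical node): if the call succeeds at `head - 128` but errors at
    /// `head - 256`, the stored limit ends up as 128; a node that still answers at the
    /// `u64::MAX` probe is treated as a full archive node.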
    async fn check_block_data_limit(self: &Arc<Self>) -> anyhow::Result<Option<u64>> {
        if !self.automatic_block_limit {
            // TODO: is this a good thing to return?
            return Ok(None);
        }

        // TODO: check eth_syncing. if it is not false, return Ok(None)

        let mut limit = None;

        // TODO: binary search between 90k and max?
        // TODO: start at 0 or 1?
        for block_data_limit in [0, 32, 64, 128, 256, 512, 1024, 90_000, u64::MAX] {
            let head_block_num_future = self.internal_request::<_, U256>(
                "eth_blockNumber",
                &(),
                // errors here are expected, so keep the level low
                Some(Level::Debug.into()),
            );

            let head_block_num = timeout(Duration::from_secs(5), head_block_num_future)
                .await
                .context("timeout fetching eth_blockNumber")?
                .context("provider error")?;

            let maybe_archive_block = head_block_num.saturating_sub((block_data_limit).into());

            trace!(
                "checking maybe_archive_block on {}: {}",
                self,
                maybe_archive_block
            );

            // TODO: wait for the handle BEFORE we check the current block number. it might be delayed too!
            // TODO: what should the request be?
            let archive_result: Result<Bytes, _> = self
                .internal_request(
                    "eth_getCode",
                    &json!((
                        "0xdead00000000000000000000000000000000beef",
                        maybe_archive_block,
                    )),
                    // errors here are expected, so keep the level low
                    Some(Level::Trace.into()),
                )
                .await;

            trace!(
                "archive_result on {} for {} ({}): {:?}",
                self,
                block_data_limit,
                maybe_archive_block,
                archive_result
            );

            if archive_result.is_err() {
                break;
            }

            limit = Some(block_data_limit);
        }

        if let Some(limit) = limit {
            if limit == 0 {
                warn!("{} is unable to serve requests", self);
            }

            self.block_data_limit
                .store(limit, atomic::Ordering::Release);
        }

        if limit == Some(u64::MAX) {
            info!("block data limit on {}: archive", self);
        } else {
            info!("block data limit on {}: {:?}", self, limit);
        }

        Ok(limit)
    }
    /// TODO: this might be too simple. different nodes can prune differently. it's possible we will have a block range
    pub fn block_data_limit(&self) -> U64 {
        self.block_data_limit.load(atomic::Ordering::Acquire).into()
    }

    /// TODO: get rid of this now that consensus rpcs does it
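    ///
    /// For example (see the tests below): a pruned node at head 1_000_000 with a
    /// block_data_limit of 64 reports `true` only for blocks 999_936..=1_000_000.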
    pub fn has_block_data(&self, needed_block_num: &U64) -> bool {
        let head_block_num = match self.head_block.as_ref().unwrap().borrow().as_ref() {
            None => return false,
            Some(x) => *x.number(),
        };

        // this rpc doesn't have that block yet. still syncing
        if needed_block_num > &head_block_num {
            trace!(
                "{} has head {} but needs {}",
                self,
                head_block_num,
                needed_block_num,
            );
            return false;
        }

        // if this is a pruning node, we might not actually have the block
        let block_data_limit: U64 = self.block_data_limit();

        let oldest_block_num = head_block_num.saturating_sub(block_data_limit);

        if needed_block_num < &oldest_block_num {
            trace!(
                "{} needs {} but the oldest available is {}",
                self,
                needed_block_num,
                oldest_block_num
            );
            return false;
        }

        true
    }
    /// query the web3 provider to confirm it is on the expected chain with the expected data available
    /// TODO: this currently checks only the http if both http and ws are set. it should check both and make sure they match
    async fn check_provider(self: &Arc<Self>, chain_id: u64) -> Web3ProxyResult<()> {
        // check the server's chain_id here
        // TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error
        // TODO: what should the timeout be? should there be a request timeout?
        // trace!("waiting on chain id for {}", self);
        let found_chain_id: U64 = self
            .internal_request("eth_chainId", &(), Some(Level::Trace.into()))
            .await?;

        trace!("found_chain_id: {:#?}", found_chain_id);

        if chain_id != found_chain_id.as_u64() {
            return Err(anyhow::anyhow!(
                "incorrect chain id! Config has {}, but RPC has {}",
                chain_id,
                found_chain_id
            )
            .context(format!("failed @ {}", self))
            .into());
        }

        self.check_block_data_limit()
            .await
            .context(format!("unable to check_block_data_limit of {}", self))?;

        info!("successfully connected to {}", self);

        Ok(())
    }
    pub(crate) async fn send_head_block_result(
        self: &Arc<Self>,
        new_head_block: Web3ProxyResult<Option<ArcBlock>>,
        block_and_rpc_sender: &flume::Sender<BlockAndRpc>,
        block_map: &BlocksByHashCache,
    ) -> Web3ProxyResult<()> {
        let head_block_sender = self.head_block.as_ref().unwrap();

        let new_head_block = match new_head_block {
            Ok(x) => {
                let x = x.and_then(Web3ProxyBlock::try_new);

                match x {
                    None => {
                        if head_block_sender.borrow().is_none() {
                            // we previously sent a None. return early
                            return Ok(());
                        }

                        let age = self.created_at.unwrap().elapsed().as_millis();

                        debug!("clearing head block on {} ({}ms old)!", self, age);

                        // send an empty block to take this server out of rotation
                        head_block_sender.send_replace(None);

                        // TODO: clear self.block_data_limit?

                        None
                    }
                    Some(new_head_block) => {
                        let new_hash = *new_head_block.hash();

                        // if we already have this block saved, set new_head_block to that arc. otherwise store this copy
                        let new_head_block = block_map
                            .get_with_by_ref(&new_hash, async move { new_head_block })
                            .await;

                        // we are synced! yey!
                        head_block_sender.send_replace(Some(new_head_block.clone()));

                        if self.block_data_limit() == U64::zero() {
                            if let Err(err) = self.check_block_data_limit().await {
                                warn!(
                                    "failed checking block limit after {} finished syncing. {:?}",
                                    self, err
                                );
                            }
                        }

                        Some(new_head_block)
                    }
                }
            }
            Err(err) => {
                warn!("unable to get block from {}. err={:?}", self, err);

                // send an empty block to take this server out of rotation
                head_block_sender.send_replace(None);

                // TODO: clear self.block_data_limit?

                None
            }
        };

        // tell web3rpcs about this rpc having this block
        block_and_rpc_sender
            .send_async((new_head_block, self.clone()))
            .await
            .context("block_and_rpc_sender failed sending")?;

        Ok(())
    }
    fn should_disconnect(&self) -> bool {
        *self.disconnect_watch.as_ref().unwrap().borrow()
    }

    async fn healthcheck(
        self: &Arc<Self>,
        error_handler: Option<RequestErrorHandler>,
    ) -> Web3ProxyResult<()> {
        let head_block = self.head_block.as_ref().unwrap().borrow().clone();

        if let Some(head_block) = head_block {
            let head_block = head_block.block;

            // TODO: if head block is very old and not expected to be syncing, emit warning
            let block_number = head_block.number.context("no block number")?;

            let to = if let Some(txid) = head_block.transactions.last().cloned() {
                let tx = self
                    .internal_request::<_, Option<Transaction>>(
                        "eth_getTransactionByHash",
                        &(txid,),
                        error_handler,
                    )
                    .await?
                    .context("no transaction")?;

                // TODO: what default? something real?
                tx.to.unwrap_or_else(|| {
                    "0xdead00000000000000000000000000000000beef"
                        .parse::<Address>()
                        .expect("deafbeef")
                })
            } else {
                "0xdead00000000000000000000000000000000beef"
                    .parse::<Address>()
                    .expect("deafbeef")
            };

            let _code = self
                .internal_request::<_, Option<Bytes>>(
                    "eth_getCode",
                    &(to, block_number),
                    error_handler,
                )
                .await?;
        } else {
            // TODO: if head block is none for too long, give an error
        }

        Ok(())
    }
    #[allow(clippy::too_many_arguments)]
    async fn subscribe_with_reconnect(
        self: Arc<Self>,
        block_map: BlocksByHashCache,
        block_and_rpc_sender: Option<flume::Sender<BlockAndRpc>>,
        chain_id: u64,
        tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
    ) -> Web3ProxyResult<()> {
        loop {
            if let Err(err) = self
                .clone()
                .subscribe(
                    block_map.clone(),
                    block_and_rpc_sender.clone(),
                    chain_id,
                    tx_id_sender.clone(),
                )
                .await
            {
                if self.should_disconnect() {
                    break;
                }

                warn!("{} subscribe err: {:#?}", self, err)
            } else if self.should_disconnect() {
                break;
            }

            if self.backup {
                debug!("reconnecting to {} in 30 seconds", self);
            } else {
                info!("reconnecting to {} in 30 seconds", self);
            }

            // TODO: exponential backoff with jitter
            sleep(Duration::from_secs(30)).await;
        }

        Ok(())
    }
    /// subscribe to blocks and transactions
    /// This should only exit when the program is exiting.
    /// TODO: should more of these args be on self? chain_id for sure
    async fn subscribe(
        self: Arc<Self>,
        block_map: BlocksByHashCache,
        block_and_rpc_sender: Option<flume::Sender<BlockAndRpc>>,
        chain_id: u64,
        tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
    ) -> Web3ProxyResult<()> {
        let error_handler = if self.backup {
            Some(RequestErrorHandler::DebugLevel)
        } else {
            Some(RequestErrorHandler::ErrorLevel)
        };

        if self.should_disconnect() {
            return Ok(());
        }

        if let Some(url) = self.ws_url.clone() {
            trace!("starting websocket provider on {}", self);

            let x = connect_ws(url, usize::MAX).await?;

            let x = Arc::new(x);

            self.ws_provider.store(Some(x));
        }

        if self.should_disconnect() {
            return Ok(());
        }

        trace!("starting subscriptions on {}", self);

        self.check_provider(chain_id).await?;

        let mut futures = vec![];

        // TODO: use this channel instead of self.disconnect_watch
        let (subscribe_stop_tx, subscribe_stop_rx) = watch::channel(false);

        // subscribe to the disconnect watch. the app uses this when shutting down or when configs change
        if let Some(disconnect_watch_tx) = self.disconnect_watch.as_ref() {
            let clone = self.clone();
            let mut disconnect_watch_rx = disconnect_watch_tx.subscribe();

            let f = async move {
                loop {
                    if *disconnect_watch_rx.borrow_and_update() {
                        info!("disconnect triggered on {}", clone);
                        break;
                    }

                    disconnect_watch_rx.changed().await?;
                }
                Ok(())
            };

            futures.push(flatten_handle(tokio::spawn(f)));
        }

        // health check that runs if there haven't been any recent requests
        {
            // TODO: move this into a proper function
            let rpc = self.clone();

            // TODO: how often? different depending on the chain?
            // TODO: reset this timeout when a new block is seen? we need to keep request_latency updated though
            let health_sleep_seconds = 5;

            // health check loop
            let f = async move {
                // TODO: benchmark this and lock contention
                let mut old_total_requests = 0;
                let mut new_total_requests;

                // errors here should not cause the loop to exit!
                while !(*subscribe_stop_rx.borrow()) {
                    new_total_requests = rpc.internal_requests.load(atomic::Ordering::Relaxed)
                        + rpc.external_requests.load(atomic::Ordering::Relaxed);

                    if new_total_requests - old_total_requests < 5 {
                        // TODO: if this fails too many times, reset the connection
                        // TODO: move this into a function and the chaining should be easier
                        if let Err(err) = rpc.healthcheck(error_handler).await {
                            // TODO: different level depending on the error handler
                            warn!("health checking {} failed: {:?}", rpc, err);
                        }
                    }

                    // TODO: should we count the requests done inside this health check
                    old_total_requests = new_total_requests;

                    sleep(Duration::from_secs(health_sleep_seconds)).await;
                }

                debug!("healthcheck loop on {} exited", rpc);

                Ok(())
            };

            futures.push(flatten_handle(tokio::spawn(f)));
        }

        // subscribe to new heads
        if let Some(block_and_rpc_sender) = block_and_rpc_sender.clone() {
            let clone = self.clone();
            let subscribe_stop_rx = subscribe_stop_tx.subscribe();

            let f = async move {
                let x = clone
                    .subscribe_new_heads(
                        block_and_rpc_sender.clone(),
                        block_map.clone(),
                        subscribe_stop_rx,
                    )
                    .await;

                // error or success, we clear the block when subscribe_new_heads exits
                clone
                    .send_head_block_result(Ok(None), &block_and_rpc_sender, &block_map)
                    .await?;

                x
            };

            // TODO: if

            futures.push(flatten_handle(tokio::spawn(f)));
        }

        // subscribe pending transactions
        // TODO: make this opt-in. it's a lot of bandwidth
        if let Some(tx_id_sender) = tx_id_sender {
            let subscribe_stop_rx = subscribe_stop_tx.subscribe();

            let f = self
                .clone()
                .subscribe_pending_transactions(tx_id_sender, subscribe_stop_rx);

            futures.push(flatten_handle(tokio::spawn(f)));
        }

        // try_join on the futures
        if let Err(err) = try_join_all(futures).await {
            warn!("subscription erred: {:?}", err);
        }

        debug!("subscriptions on {} exited", self);

        subscribe_stop_tx.send_replace(true);

        // TODO: wait for all of the futures to exit?

        // TODO: tell ethers to disconnect?
        self.ws_provider.store(None);

        Ok(())
    }
    /// Subscribe to new blocks.
    async fn subscribe_new_heads(
        self: &Arc<Self>,
        block_sender: flume::Sender<BlockAndRpc>,
        block_map: BlocksByHashCache,
        subscribe_stop_rx: watch::Receiver<bool>,
    ) -> Web3ProxyResult<()> {
        trace!("subscribing to new heads on {}", self);

        // TODO: different handler depending on backup or not
        let error_handler = None;
        let authorization = Default::default();

        if let Some(ws_provider) = self.ws_provider.load().as_ref() {
            // todo: move subscribe_blocks onto the request handle
            let active_request_handle = self
                .wait_for_request_handle(&authorization, None, error_handler)
                .await;

            let mut blocks = ws_provider.subscribe_blocks().await?;

            drop(active_request_handle);

            // query the block once since the subscription doesn't send the current block
            // there is a very small race condition here where the stream could send us a new block right now
            // but seeing the same block twice won't break anything
            // TODO: how does this get wrapped in an arc? does ethers handle that?
            // TODO: send this request to the ws_provider instead of the http_provider
            let latest_block: Result<Option<ArcBlock>, _> = self
                .authorized_request(
                    "eth_getBlockByNumber",
                    &("latest", false),
                    &authorization,
                    Some(Level::Warn.into()),
                )
                .await;

            self.send_head_block_result(latest_block, &block_sender, &block_map)
                .await?;

            while let Some(block) = blocks.next().await {
                if *subscribe_stop_rx.borrow() {
                    trace!("stopping ws block subscription on {}", self);
                    break;
                }

                let block = Arc::new(block);

                self.send_head_block_result(Ok(Some(block)), &block_sender, &block_map)
                    .await?;
            }
        } else if self.http_provider.is_some() {
            // there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints
            // TODO: is 1/2 the block time okay?
            let mut i = interval(self.block_interval / 2);
            i.set_missed_tick_behavior(MissedTickBehavior::Delay);

            loop {
                if *subscribe_stop_rx.borrow() {
                    trace!("stopping http block subscription on {}", self);
                    break;
                }

                let block_result = self
                    .authorized_request::<_, Option<ArcBlock>>(
                        "eth_getBlockByNumber",
                        &("latest", false),
                        &authorization,
                        Some(Level::Warn.into()),
                    )
                    .await;

                self.send_head_block_result(block_result, &block_sender, &block_map)
                    .await?;

                i.tick().await;
            }
        } else {
            unimplemented!("no ws or http provider!")
        }

        // clear the head block. this might not be needed, but it won't hurt
        self.send_head_block_result(Ok(None), &block_sender, &block_map)
            .await?;

        if *subscribe_stop_rx.borrow() {
            debug!("new heads subscription exited");
            Ok(())
        } else {
            Err(anyhow!("new_heads subscription exited. reconnect needed").into())
        }
    }
    /// Turn on the firehose of pending transactions
    async fn subscribe_pending_transactions(
        self: Arc<Self>,
        tx_id_sender: flume::Sender<(TxHash, Arc<Self>)>,
        mut subscribe_stop_rx: watch::Receiver<bool>,
    ) -> Web3ProxyResult<()> {
        // TODO: check that it actually changed to true
        loop {
            if *subscribe_stop_rx.borrow_and_update() {
                break;
            }

            subscribe_stop_rx.changed().await?;
        }

        /*
        trace!("watching pending transactions on {}", self);
        // TODO: does this keep the lock open for too long?
        match provider.as_ref() {
            Web3Provider::Http(_provider) => {
                // there is a "watch_pending_transactions" function, but a lot of public nodes do not support the necessary rpc endpoints
                self.wait_for_disconnect().await?;
            }
            Web3Provider::Both(_, client) | Web3Provider::Ws(client) => {
                // TODO: maybe the subscribe_pending_txs function should be on the active_request_handle
                let active_request_handle = self
                    .wait_for_request_handle(&authorization, None, Some(provider.clone()))
                    .await?;

                let mut stream = client.subscribe_pending_txs().await?;

                drop(active_request_handle);

                while let Some(pending_tx_id) = stream.next().await {
                    tx_id_sender
                        .send_async((pending_tx_id, self.clone()))
                        .await
                        .context("tx_id_sender")?;

                    // TODO: periodically check for listeners. if no one is subscribed, unsubscribe and wait for a subscription
                    // TODO: select on this instead of checking every loop
                    if self.should_disconnect() {
                        break;
                    }
                }

                // TODO: is this always an error?
                // TODO: we probably don't want a warn and to return error
                debug!("pending_transactions subscription ended on {}", self);
            }
            #[cfg(test)]
            Web3Provider::Mock => {
                self.wait_for_disconnect().await?;
            }
        }
        */

        if *subscribe_stop_rx.borrow() {
            Ok(())
        } else {
            Err(anyhow!("pending_transactions subscription exited. reconnect needed").into())
        }
    }
    pub async fn wait_for_request_handle(
        self: &Arc<Self>,
        authorization: &Arc<Authorization>,
        max_wait: Option<Duration>,
        error_handler: Option<RequestErrorHandler>,
    ) -> Web3ProxyResult<OpenRequestHandle> {
        let max_wait_until = max_wait.map(|x| Instant::now() + x);

        loop {
            match self.try_request_handle(authorization, error_handler).await {
                Ok(OpenRequestResult::Handle(handle)) => return Ok(handle),
                Ok(OpenRequestResult::RetryAt(retry_at)) => {
                    // TODO: emit a stat?
                    let wait = retry_at.duration_since(Instant::now());

                    trace!(
                        "waiting {} millis for request handle on {}",
                        wait.as_millis(),
                        self
                    );

                    if let Some(max_wait_until) = max_wait_until {
                        if retry_at > max_wait_until {
                            // break now since we will wait past our maximum wait time
                            return Err(Web3ProxyError::Timeout(None));
                        }
                    }

                    sleep_until(retry_at).await;
                }
                Ok(OpenRequestResult::NotReady) => {
                    // TODO: when can this happen? log? emit a stat?
                    trace!("{} has no handle ready", self);

                    if let Some(max_wait_until) = max_wait_until {
                        if Instant::now() > max_wait_until {
                            return Err(Web3ProxyError::NoHandleReady);
                        }
                    }

                    // TODO: sleep how long? maybe just error?
                    // TODO: instead of an arbitrary sleep, subscribe to the head block on this?
                    sleep(Duration::from_millis(10)).await;
                }
                Err(err) => return Err(err),
            }
        }
    }
    pub async fn try_request_handle(
        self: &Arc<Self>,
        authorization: &Arc<Authorization>,
        error_handler: Option<RequestErrorHandler>,
    ) -> Web3ProxyResult<OpenRequestResult> {
        // TODO: if websocket is reconnecting, return an error?

        // check cached rate limits
        if let Some(hard_limit_until) = self.hard_limit_until.as_ref() {
            let hard_limit_ready = *hard_limit_until.borrow();
            let now = Instant::now();
            if now < hard_limit_ready {
                return Ok(OpenRequestResult::RetryAt(hard_limit_ready));
            }
        }

        // check shared rate limits
        if let Some(ratelimiter) = self.hard_limit.as_ref() {
            // TODO: how should we know if we should set expire or not?
            match ratelimiter
                .throttle()
                .await
                .context(format!("attempting to throttle {}", self))?
            {
                RedisRateLimitResult::Allowed(_) => {
                    // trace!("rate limit succeeded")
                }
                RedisRateLimitResult::RetryAt(retry_at, _) => {
                    // rate limit gave us a wait time
                    // if not a backup server, warn. backups hit rate limits often
                    if !self.backup {
                        let when = retry_at.duration_since(Instant::now());
                        warn!(
                            "Exhausted rate limit on {}. Retry in {}ms",
                            self,
                            when.as_millis()
                        );
                    }

                    if let Some(hard_limit_until) = self.hard_limit_until.as_ref() {
                        hard_limit_until.send_replace(retry_at);
                    }

                    return Ok(OpenRequestResult::RetryAt(retry_at));
                }
                RedisRateLimitResult::RetryNever => {
                    warn!("how did retry never on {} happen?", self);
                    return Ok(OpenRequestResult::NotReady);
                }
            }
        };

        let handle =
            OpenRequestHandle::new(authorization.clone(), self.clone(), error_handler).await;

        Ok(handle.into())
    }
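    /// Make a JSON-RPC request using a default (internal) authorization.
    ///
    /// A small sketch of a typed call, mirroring how `check_provider` uses it (the variable
    /// name is illustrative):
    ///
    /// ```ignore
    /// let chain_id: U64 = rpc
    ///     .internal_request("eth_chainId", &(), Some(Level::Trace.into()))
    ///     .await?;
    /// ```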
    pub async fn internal_request<P: JsonRpcParams, R: JsonRpcResultData>(
        self: &Arc<Self>,
        method: &str,
        params: &P,
        error_handler: Option<RequestErrorHandler>,
    ) -> Web3ProxyResult<R> {
        let authorization = Default::default();

        self.authorized_request(method, params, &authorization, error_handler)
            .await
    }

    pub async fn authorized_request<P: JsonRpcParams, R: JsonRpcResultData>(
        self: &Arc<Self>,
        method: &str,
        params: &P,
        authorization: &Arc<Authorization>,
        error_handler: Option<RequestErrorHandler>,
    ) -> Web3ProxyResult<R> {
        // TODO: take max_wait as a function argument?
        let x = self
            .wait_for_request_handle(authorization, None, error_handler)
            .await?
            .request::<P, R>(method, params)
            .await?;

        Ok(x)
    }
}
impl Hash for Web3Rpc {
    fn hash<H: Hasher>(&self, state: &mut H) {
        // do not include automatic block limit because it can change
        // do not include tier because it can change
        self.backup.hash(state);
        self.created_at.hash(state);
        self.display_name.hash(state);
        self.name.hash(state);

        // TODO: url does NOT include the authorization data. i think created_at should protect us if auth changes without anything else
        self.http_provider.as_ref().map(|x| x.url()).hash(state);

        // TODO: figure out how to get the url for the ws provider
        // self.ws_provider.map(|x| x.url()).hash(state);

        // TODO: don't include soft_limit if we change them to be dynamic
        self.soft_limit.hash(state);
    }
}
impl Eq for Web3Rpc {}

impl Ord for Web3Rpc {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.name.cmp(&other.name)
    }
}

impl PartialOrd for Web3Rpc {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl PartialEq for Web3Rpc {
    fn eq(&self, other: &Self) -> bool {
        self.name == other.name
    }
}
impl Serialize for Web3Rpc {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        // 13 is the number of fields in the struct.
        let mut state = serializer.serialize_struct("Web3Rpc", 13)?;

        // the url is excluded because it likely includes private information. just show the name that we use in keys
        state.serialize_field("name", &self.name)?;
        // a longer name for display to users
        state.serialize_field("display_name", &self.display_name)?;

        state.serialize_field("backup", &self.backup)?;

        match self.block_data_limit.load(atomic::Ordering::Acquire) {
            u64::MAX => {
                state.serialize_field("block_data_limit", &None::<()>)?;
            }
            block_data_limit => {
                state.serialize_field("block_data_limit", &block_data_limit)?;
            }
        }

        state.serialize_field("tier", &self.tier)?;

        state.serialize_field("soft_limit", &self.soft_limit)?;

        // TODO: maybe this is too much data. serialize less?
        {
            let head_block = self.head_block.as_ref().unwrap();
            let head_block = head_block.borrow();
            let head_block = head_block.as_ref();
            state.serialize_field("head_block", &head_block)?;
        }

        state.serialize_field(
            "external_requests",
            &self.external_requests.load(atomic::Ordering::Relaxed),
        )?;

        state.serialize_field(
            "internal_requests",
            &self.internal_requests.load(atomic::Ordering::Relaxed),
        )?;

        state.serialize_field(
            "active_requests",
            &self.active_requests.load(atomic::Ordering::Relaxed),
        )?;

        state.serialize_field("head_latency_ms", &self.head_latency.read().value())?;

        state.serialize_field(
            "peak_latency_ms",
            &self.peak_latency.as_ref().unwrap().latency().as_millis(),
        )?;

        {
            let weighted_latency_ms = self.weighted_peak_ewma_seconds() * 1000.0;
            state.serialize_field("weighted_latency_ms", weighted_latency_ms.as_ref())?;
        }

        state.end()
    }
}
impl fmt::Debug for Web3Rpc {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut f = f.debug_struct("Web3Rpc");

        f.field("name", &self.name);

        let block_data_limit = self.block_data_limit.load(atomic::Ordering::Acquire);
        if block_data_limit == u64::MAX {
            f.field("blocks", &"all");
        } else {
            f.field("blocks", &block_data_limit);
        }

        f.finish_non_exhaustive()
    }
}

impl fmt::Display for Web3Rpc {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", &self.name)
    }
}
#[cfg(test)]
mod tests {
    #![allow(unused_imports)]
    use super::*;
    use ethers::types::{Block, H256, U256};

    #[test]
    fn test_archive_node_has_block_data() {
        let now = chrono::Utc::now().timestamp().into();

        let random_block = Block {
            hash: Some(H256::random()),
            number: Some(1_000_000.into()),
            timestamp: now,
            ..Default::default()
        };

        let random_block = Arc::new(random_block);

        let head_block = Web3ProxyBlock::try_new(random_block).unwrap();
        let block_data_limit = u64::MAX;

        let (tx, _) = watch::channel(Some(head_block.clone()));

        let x = Web3Rpc {
            name: "name".to_string(),
            soft_limit: 1_000,
            automatic_block_limit: false,
            backup: false,
            block_data_limit: block_data_limit.into(),
            head_block: Some(tx),
            ..Default::default()
        };

        assert!(x.has_block_data(&0.into()));
        assert!(x.has_block_data(&1.into()));
        assert!(x.has_block_data(head_block.number()));
        assert!(!x.has_block_data(&(head_block.number() + 1)));
        assert!(!x.has_block_data(&(head_block.number() + 1000)));
    }

    #[test]
    fn test_pruned_node_has_block_data() {
        let now = chrono::Utc::now().timestamp().into();

        let head_block: Web3ProxyBlock = Arc::new(Block {
            hash: Some(H256::random()),
            number: Some(1_000_000.into()),
            timestamp: now,
            ..Default::default()
        })
        .try_into()
        .unwrap();

        let block_data_limit = 64;

        let (tx, _rx) = watch::channel(Some(head_block.clone()));

        let x = Web3Rpc {
            name: "name".to_string(),
            soft_limit: 1_000,
            automatic_block_limit: false,
            backup: false,
            block_data_limit: block_data_limit.into(),
            head_block: Some(tx),
            ..Default::default()
        };

        assert!(!x.has_block_data(&0.into()));
        assert!(!x.has_block_data(&1.into()));
        assert!(!x.has_block_data(&(head_block.number() - block_data_limit - 1)));
        assert!(x.has_block_data(&(head_block.number() - block_data_limit)));
        assert!(x.has_block_data(head_block.number()));
        assert!(!x.has_block_data(&(head_block.number() + 1)));
        assert!(!x.has_block_data(&(head_block.number() + 1000)));
    }
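    /// A sketch added for illustration: the private `sort_on` key is
    /// `(!backup, Reverse(head_block), tier)`, and `max_block` caps the block component.
    #[test]
    fn test_sort_on_max_block_cap() {
        let now = chrono::Utc::now().timestamp().into();

        let head_block = Web3ProxyBlock::try_new(Arc::new(Block {
            hash: Some(H256::random()),
            number: Some(1_000_000.into()),
            timestamp: now,
            ..Default::default()
        }))
        .unwrap();

        let (tx, _rx) = watch::channel(Some(head_block));

        let x = Web3Rpc {
            name: "name".to_string(),
            head_block: Some(tx),
            ..Default::default()
        };

        // a non-backup rpc in the default tier (0) at block 1_000_000
        assert_eq!(x.sort_on(None), (true, Reverse(1_000_000.into()), 0));

        // max_block clamps the block number used for sorting
        assert_eq!(x.sort_on(Some(42.into())), (true, Reverse(42.into()), 0));
    }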
    /*
    // TODO: think about how to bring the concept of a "lagged" node back
    #[test]
    fn test_lagged_node_not_has_block_data() {
        let now = chrono::Utc::now().timestamp().into();

        // head block is an hour old
        let head_block = Block {
            hash: Some(H256::random()),
            number: Some(1_000_000.into()),
            timestamp: now - 3600,
            ..Default::default()
        };

        let head_block = Arc::new(head_block);

        let head_block = Web3ProxyBlock::new(head_block);
        let block_data_limit = u64::MAX;

        let metrics = OpenRequestHandleMetrics::default();

        let x = Web3Rpc {
            name: "name".to_string(),
            db_conn: None,
            display_name: None,
            url: "ws://example.com".to_string(),
            http_client: None,
            active_requests: 0.into(),
            frontend_requests: 0.into(),
            internal_requests: 0.into(),
            provider_state: AsyncRwLock::new(ProviderState::None),
            hard_limit: None,
            soft_limit: 1_000,
            automatic_block_limit: false,
            backup: false,
            block_data_limit: block_data_limit.into(),
            tier: 0,
            head_block: RwLock::new(Some(head_block.clone())),
        };

        assert!(!x.has_block_data(&0.into()));
        assert!(!x.has_block_data(&1.into()));
        assert!(!x.has_block_data(&head_block.number()));
        assert!(!x.has_block_data(&(head_block.number() + 1)));
        assert!(!x.has_block_data(&(head_block.number() + 1000)));
    }
    */
}