diff --git a/config/development_polygon.toml b/config/development_polygon.toml index cbae4186..5f6d80e7 100644 --- a/config/development_polygon.toml +++ b/config/development_polygon.toml @@ -23,8 +23,6 @@ min_sum_soft_limit = 1_000 # only mark a block as the head block if the number of servers with it is great than or equal to min_synced_rpcs min_synced_rpcs = 1 -max_head_block_age = 30 - # redis is optional. it is used for rate limits set by `hard_limit` # TODO: how do we find the optimal redis_max_connections? too high actually ends up being slower volatile_redis_max_connections = 20 diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index a8b4225b..6c0cd669 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -196,10 +196,6 @@ impl Web3ProxyApp { ); } - if top_config.app.max_head_block_age.is_none() { - warn!("no max_head_block_age. stale data could be served!"); - } - if !top_config.extra.is_empty() { warn!( "unknown TopConfig fields!: {:?}", @@ -503,10 +499,12 @@ impl Web3ProxyApp { let ip_semaphores = Cache::new(max_users); let user_semaphores = Cache::new(max_users); + let chain_id = top_config.app.chain_id; + let (balanced_rpcs, balanced_handle, consensus_connections_watcher) = Web3Rpcs::spawn( + chain_id, db_conn.clone(), - top_config.app.max_head_block_age, - top_config.app.max_block_lag, + top_config.app.max_head_block_lag, top_config.app.min_synced_rpcs, top_config.app.min_sum_soft_limit, "balanced rpcs".to_string(), @@ -529,9 +527,9 @@ impl Web3ProxyApp { // TODO: Merge // let (private_rpcs, private_rpcs_handle) = Web3Rpcs::spawn( let (private_rpcs, private_handle, _) = Web3Rpcs::spawn( + chain_id, db_conn.clone(), - // private rpcs don't get subscriptions, so no need for max_head_block_age or max_block_lag - None, + // private rpcs don't get subscriptions, so no need for max_head_block_lag None, 0, 0, @@ -561,9 +559,9 @@ impl Web3ProxyApp { } else { // TODO: do something with the spawn handle let (bundler_4337_rpcs, bundler_4337_rpcs_handle, _) = Web3Rpcs::spawn( + chain_id, db_conn.clone(), - // bundler_4337_rpcs don't get subscriptions, so no need for max_head_block_age or max_block_lag - None, + // bundler_4337_rpcs don't get subscriptions, so no need for max_head_block_lag None, 0, 0, diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index 80d86aa6..155b8b8d 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -120,11 +120,8 @@ pub struct AppConfig { /// domain in sign-in-with-ethereum messages pub login_domain: Option, - /// do not serve any requests if the best known block is older than this many seconds. - pub max_head_block_age: Option, - /// do not serve any requests if the best known block is behind the best known block by more than this many blocks. - pub max_block_lag: Option, + pub max_head_block_lag: Option, /// Rate limit for bearer token authenticated entrypoints. /// This is separate from the rpc limits. @@ -233,6 +230,33 @@ fn default_response_cache_max_bytes() -> u64 { 10u64.pow(8) } +/// TODO: we can't query a provider because we need this to create a provider +pub fn average_block_interval(chain_id: u64) -> Duration { + match chain_id { + // ethereum + 1 => Duration::from_secs(12), + // ethereum-goerli + 5 => Duration::from_secs(12), + // binance + 56 => Duration::from_secs(3), + // polygon + 137 => Duration::from_secs(2), + // fantom + 250 => Duration::from_secs(1), + // arbitrum + 42161 => Duration::from_millis(500), + // anything else + _ => { + let default = 10; + warn!( + "unknown chain_id ({}). defaulting average_block_interval to {} seconds", + chain_id, default + ); + Duration::from_secs(default) + } + } +} + /// Configuration for a backend web3 RPC server #[derive(Clone, Debug, Default, Deserialize, PartialEq, Eq)] pub struct Web3RpcConfig { @@ -275,6 +299,7 @@ impl Web3RpcConfig { db_conn: Option, redis_pool: Option, chain_id: u64, + block_interval: Duration, http_client: Option, blocks_by_hash_cache: BlocksByHashCache, block_sender: Option>, @@ -284,31 +309,6 @@ impl Web3RpcConfig { warn!("unknown Web3RpcConfig fields!: {:?}", self.extra.keys()); } - // TODO: get this from config? a helper function? where does this belong? - let block_interval = match chain_id { - // ethereum - 1 => Duration::from_secs(12), - // ethereum-goerli - 5 => Duration::from_secs(12), - // binance - 56 => Duration::from_secs(3), - // polygon - 137 => Duration::from_secs(2), - // fantom - 250 => Duration::from_secs(1), - // arbitrum - 42161 => Duration::from_millis(500), - // anything else - _ => { - let default = 10; - warn!( - "unknown chain_id ({}). defaulting polling every {} seconds", - chain_id, default - ); - Duration::from_secs(default) - } - }; - Web3Rpc::spawn( self, name, diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index b6de5ebd..09b76c87 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -14,6 +14,7 @@ use serde::ser::SerializeStruct; use serde::Serialize; use serde_json::json; use std::hash::Hash; +use std::time::Duration; use std::{cmp::Ordering, fmt::Display, sync::Arc}; use tokio::sync::broadcast; @@ -89,23 +90,25 @@ impl Web3ProxyBlock { // no need to recalulate lag every time // if the head block gets too old, a health check restarts this connection // TODO: emit a stat for received_age - x.received_age = Some(x.age()); + x.received_age = Some(x.age().as_secs()); Some(x) } - pub fn age(&self) -> u64 { + pub fn age(&self) -> Duration { let now = chrono::Utc::now().timestamp(); let block_timestamp = self.block.timestamp.as_u32() as i64; - if block_timestamp < now { + let x = if block_timestamp < now { // this server is still syncing from too far away to serve requests // u64 is safe because we checked equality above (now - block_timestamp) as u64 } else { 0 - } + }; + + Duration::from_secs(x) } #[inline(always)] @@ -158,7 +161,7 @@ impl Display for Web3ProxyBlock { "{} ({}, {}s old)", self.number(), self.hash(), - self.age() + self.age().as_secs() ) } } @@ -402,7 +405,7 @@ impl Web3Rpcs { pending_tx_sender: Option>, ) -> Web3ProxyResult<()> { let mut connection_heads = - ConsensusFinder::new(self.max_head_block_age, self.max_block_lag); + ConsensusFinder::new(Some(self.max_head_block_age), Some(self.max_head_block_lag)); loop { match block_receiver.recv_async().await { diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index 1890b33c..abb5e769 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -18,6 +18,7 @@ use std::cmp::{Ordering, Reverse}; use std::collections::BTreeMap; use std::fmt; use std::sync::{atomic, Arc}; +use std::time::Duration; use tokio::time::Instant; #[derive(Clone, Serialize)] @@ -338,16 +339,16 @@ type FirstSeenCache = Cache; /// A ConsensusConnections builder that tracks all connection heads across multiple groups of servers pub struct ConsensusFinder { rpc_heads: HashMap, Web3ProxyBlock>, - /// never serve blocks that are too old - max_head_block_age: Option, + /// no consensus if the best known block is too old + max_head_block_age: Option, /// tier 0 will be prefered as long as the distance between it and the other tiers is <= max_tier_lag - max_block_lag: Option, + max_head_block_lag: Option, /// Block Hash -> First Seen Instant. used to track rpc.head_latency. The same cache should be shared between all ConnectionsGroups first_seen: FirstSeenCache, } impl ConsensusFinder { - pub fn new(max_head_block_age: Option, max_block_lag: Option) -> Self { + pub fn new(max_head_block_age: Option, max_head_block_lag: Option) -> Self { // TODO: what's a good capacity for this? it shouldn't need to be very large let first_seen = Cache::new(16); @@ -356,7 +357,7 @@ impl ConsensusFinder { Self { rpc_heads, max_head_block_age, - max_block_lag, + max_head_block_lag, first_seen, } } @@ -537,8 +538,8 @@ impl ConsensusFinder { trace!("lowest_block_number: {}", lowest_block.number()); // TODO: move this default. should be in config, not here - let max_lag_block_number = - highest_block_number.saturating_sub(self.max_block_lag.unwrap_or_else(|| U64::from(5))); + let max_lag_block_number = highest_block_number + .saturating_sub(self.max_head_block_lag.unwrap_or_else(|| U64::from(5))); trace!("max_lag_block_number: {}", max_lag_block_number); diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 043fb7dd..8ca269e3 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -4,7 +4,7 @@ use super::consensus::{ConsensusWeb3Rpcs, ShouldWaitForBlock}; use super::one::Web3Rpc; use super::request::{OpenRequestHandle, OpenRequestResult, RequestErrorHandler}; use crate::app::{flatten_handle, Web3ProxyApp, Web3ProxyJoinHandle}; -use crate::config::{BlockAndRpc, TxHashAndRpc, Web3RpcConfig}; +use crate::config::{average_block_interval, BlockAndRpc, TxHashAndRpc, Web3RpcConfig}; use crate::errors::{Web3ProxyError, Web3ProxyResult}; use crate::frontend::authorization::{Authorization, RequestMetadata}; use crate::frontend::rpc_proxy_ws::ProxyMode; @@ -66,18 +66,19 @@ pub struct Web3Rpcs { /// the soft limit required to agree on consensus for the head block. (thundering herd protection) pub(super) min_sum_soft_limit: u32, /// how far behind the highest known block height we can be before we stop serving requests - pub(super) max_block_lag: Option, + pub(super) max_head_block_lag: U64, /// how old our consensus head block we can be before we stop serving requests - pub(super) max_head_block_age: Option, + /// calculated based on max_head_block_lag and averge block times + pub(super) max_head_block_age: Duration, } impl Web3Rpcs { /// Spawn durable connections to multiple Web3 providers. #[allow(clippy::too_many_arguments)] pub async fn spawn( + chain_id: u64, db_conn: Option, - max_head_block_age: Option, - max_block_lag: Option, + max_head_block_lag: Option, min_head_rpcs: usize, min_sum_soft_limit: u32, name: String, @@ -115,13 +116,20 @@ impl Web3Rpcs { // by_name starts empty. self.apply_server_configs will add to it let by_name = Default::default(); + let max_head_block_lag = max_head_block_lag.unwrap_or(5.into()); + + let max_head_block_age = Duration::from_secs_f32( + (max_head_block_lag.as_u64() * 10) as f32 + * average_block_interval(chain_id).as_secs_f32(), + ); + let connections = Arc::new(Self { block_sender, blocks_by_hash, blocks_by_number, by_name, max_head_block_age, - max_block_lag, + max_head_block_lag, min_synced_rpcs: min_head_rpcs, min_sum_soft_limit, name, @@ -173,6 +181,10 @@ impl Web3Rpcs { }); } + let chain_id = app.config.chain_id; + + let block_interval = average_block_interval(chain_id); + // turn configs into connections (in parallel) let mut spawn_handles: FuturesUnordered<_> = rpc_configs .into_iter() @@ -194,7 +206,6 @@ impl Web3Rpcs { let pending_tx_id_sender = Some(self.pending_tx_id_sender.clone()); let blocks_by_hash_cache = self.blocks_by_hash.clone(); - let chain_id = app.config.chain_id; debug!("spawning {}", server_name); @@ -203,6 +214,7 @@ impl Web3Rpcs { db_conn, vredis_pool, chain_id, + block_interval, http_client, blocks_by_hash_cache, block_sender, @@ -1527,9 +1539,9 @@ mod tests { .time_to_live(Duration::from_secs(60)) .build(), // TODO: test max_head_block_age? - max_head_block_age: None, - // TODO: test max_block_lag? - max_block_lag: None, + max_head_block_age: Duration::from_secs(60), + // TODO: test max_head_block_lag? + max_head_block_lag: 5.into(), min_synced_rpcs: 1, min_sum_soft_limit: 1, }; @@ -1808,8 +1820,8 @@ mod tests { .build(), min_synced_rpcs: 1, min_sum_soft_limit: 4_000, - max_head_block_age: None, - max_block_lag: None, + max_head_block_age: Duration::from_secs(60), + max_head_block_lag: 5.into(), }; let authorization = Arc::new(Authorization::internal(None).unwrap()); @@ -1988,8 +2000,8 @@ mod tests { blocks_by_number: Cache::new(10_000), min_synced_rpcs: 1, min_sum_soft_limit: 1_000, - max_head_block_age: None, - max_block_lag: None, + max_head_block_age: Duration::from_secs(60), + max_head_block_lag: 5.into(), }; let authorization = Arc::new(Authorization::internal(None).unwrap());