diff --git a/Cargo.lock b/Cargo.lock index 89548b54..48d61556 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -282,13 +282,13 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "axum" -version = "0.6.5" +version = "0.6.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3114e77b361ec716aa429ae5c04243abe00cf7548e870b9370affcc5c491a7d0" +checksum = "4e246206a63c9830e118d12c894f56a82033da1a2361f5544deeee3df85c99d9" dependencies = [ "async-trait", "axum-core", - "base64 0.20.0", + "base64 0.21.0", "bitflags", "bytes", "futures-util", @@ -347,9 +347,9 @@ dependencies = [ [[package]] name = "axum-macros" -version = "0.3.3" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb6bee4e05a5e0a5a67515ab24978efa7a80575a7a41a9fae35bb27fed6645d2" +checksum = "5fbf955307ff8addb48d2399393c9e2740dd491537ec562b66ab364fc4a38841" dependencies = [ "heck 0.4.0", "proc-macro2", @@ -419,12 +419,6 @@ version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" -[[package]] -name = "base64" -version = "0.20.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ea22880d78093b0cbe17c89f64a7d457941e65759157ec6cb31a31d652b05e5" - [[package]] name = "base64" version = "0.21.0" @@ -3093,9 +3087,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.17.0" +version = "1.17.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6f61fba1741ea2b3d6a1e3178721804bb716a68a6aeba1149b5d52e3d464ea66" +checksum = "b7e5500299e16ebb147ae15a00a942af264cf3688f47923b8fc2cd5858f23ad3" [[package]] name = "opaque-debug" @@ -4520,9 +4514,9 @@ dependencies = [ [[package]] name = "serde_prometheus" -version = "0.2.0" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1bfb6048d9e4ebc41f7d1a42c79b04c5b460633be307620a0e34a8f81970ea47" +checksum = "9c1a4ca38f4e746460d1dbd3711b8ca8ae314d1b21247edeff61dd20325b5a6f" dependencies = [ "heapless", "nom", @@ -5793,6 +5787,7 @@ dependencies = [ "notify", "num", "num-traits", + "once_cell", "pagerduty-rs", "parking_lot 0.12.1", "prettytable", diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index e4a9b93c..7d8ab888 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -27,9 +27,9 @@ thread-fast-rng = { path = "../thread-fast-rng" } anyhow = { version = "1.0.69", features = ["backtrace"] } argh = "0.1.10" -axum = { version = "0.6.5", features = ["headers", "ws"] } +axum = { version = "0.6.6", features = ["headers", "ws"] } axum-client-ip = "0.4.0" -axum-macros = "0.3.3" +axum-macros = "0.3.4" chrono = "0.4.23" counter = "0.5.7" derive_more = "0.99.17" @@ -52,6 +52,7 @@ moka = { version = "0.10.0", default-features = false, features = ["future"] } notify = "5.1.0" num = "0.4.0" num-traits = "0.2.15" +once_cell = { version = "1.17.1" } pagerduty-rs = { version = "0.1.6", default-features = false, features = ["async", "rustls", "sync"] } parking_lot = { version = "0.12.1", features = ["arc_lock"] } prettytable = "*" @@ -62,7 +63,7 @@ rustc-hash = "1.1.0" sentry = { version = "0.29.3", default-features = false, features = ["backtrace", "contexts", "panic", "anyhow", "reqwest", "rustls", "log", "sentry-log"] } serde = { version = "1.0.152", features = [] } serde_json = { version = "1.0.93", default-features = false, features = ["alloc", "raw_value"] } -serde_prometheus = "0.2.0" +serde_prometheus = "0.2.1" siwe = "0.5.0" time = "0.3.17" tokio = { version = "1.25.0", features = ["full"] } diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 91e9c95d..68ef6bb4 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -10,7 +10,7 @@ use crate::frontend::rpc_proxy_ws::ProxyMode; use crate::jsonrpc::{ JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest, JsonRpcRequestEnum, }; -use crate::rpcs::blockchain::{ArcBlock, SavedBlock}; +use crate::rpcs::blockchain::{BlockHashesCache, Web3ProxyBlock}; use crate::rpcs::many::Web3Rpcs; use crate::rpcs::one::Web3Rpc; use crate::rpcs::transactions::TxStatus; @@ -23,7 +23,7 @@ use derive_more::From; use entities::sea_orm_active_enums::LogLevel; use entities::user; use ethers::core::utils::keccak256; -use ethers::prelude::{Address, Block, Bytes, Transaction, TxHash, H256, U64}; +use ethers::prelude::{Address, Bytes, Transaction, TxHash, H256, U64}; use ethers::types::U256; use ethers::utils::rlp::{Decodable, Rlp}; use futures::future::join_all; @@ -69,9 +69,9 @@ pub static REQUEST_PERIOD: u64 = 60; #[derive(From)] struct ResponseCacheKey { // if none, this is cached until evicted - from_block: Option, + from_block: Option, // to_block is only set when ranges of blocks are requested (like with eth_getLogs) - to_block: Option, + to_block: Option, method: String, // TODO: better type for this params: Option, @@ -204,7 +204,7 @@ pub struct Web3ProxyApp { response_cache: ResponseCache, // don't drop this or the sender will stop working // TODO: broadcast channel instead? - watch_consensus_head_receiver: watch::Receiver, + watch_consensus_head_receiver: watch::Receiver, pending_tx_sender: broadcast::Sender, pub config: AppConfig, pub db_conn: Option, @@ -542,7 +542,7 @@ impl Web3ProxyApp { // TODO: i don't like doing Block::default here! Change this to "None"? let (watch_consensus_head_sender, watch_consensus_head_receiver) = - watch::channel(Arc::new(Block::default())); + watch::channel(Web3ProxyBlock::default()); // TODO: will one receiver lagging be okay? how big should this be? let (pending_tx_sender, pending_tx_receiver) = broadcast::channel(256); @@ -563,11 +563,11 @@ impl Web3ProxyApp { // TODO: limits from config // these blocks don't have full transactions, but they do have rather variable amounts of transaction hashes // TODO: how can we do the weigher better? - let block_map = Cache::builder() + let block_map: BlockHashesCache = Cache::builder() .max_capacity(1024 * 1024 * 1024) - .weigher(|_k, v: &ArcBlock| { + .weigher(|_k, v: &Web3ProxyBlock| { // TODO: is this good enough? - 1 + v.transactions.len().try_into().unwrap_or(u32::MAX) + 1 + v.block.transactions.len().try_into().unwrap_or(u32::MAX) }) .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); @@ -577,6 +577,8 @@ impl Web3ProxyApp { top_config.app.chain_id, db_conn.clone(), http_client.clone(), + top_config.app.max_block_age, + top_config.app.max_block_lag, top_config.app.min_synced_rpcs, top_config.app.min_sum_soft_limit, pending_transactions.clone(), @@ -603,6 +605,9 @@ impl Web3ProxyApp { top_config.app.chain_id, db_conn.clone(), http_client.clone(), + // private rpcs don't get subscriptions, so no need for max_block_age or max_block_lag + None, + None, 0, 0, pending_transactions.clone(), @@ -735,7 +740,7 @@ impl Web3ProxyApp { Ok((app, cancellable_handles, important_background_handles).into()) } - pub fn head_block_receiver(&self) -> watch::Receiver { + pub fn head_block_receiver(&self) -> watch::Receiver { self.watch_consensus_head_receiver.clone() } @@ -1481,7 +1486,7 @@ impl Web3ProxyApp { .await?; Some(ResponseCacheKey { - from_block: Some(SavedBlock::new(request_block)), + from_block: Some(request_block), to_block: None, method: method.to_string(), // TODO: hash here? @@ -1521,8 +1526,8 @@ impl Web3ProxyApp { .await?; Some(ResponseCacheKey { - from_block: Some(SavedBlock::new(from_block)), - to_block: Some(SavedBlock::new(to_block)), + from_block: Some(from_block), + to_block: Some(to_block), method: method.to_string(), // TODO: hash here? params: request.params.clone(), @@ -1537,8 +1542,8 @@ impl Web3ProxyApp { let authorization = authorization.clone(); if let Some(cache_key) = cache_key { - let from_block_num = cache_key.from_block.as_ref().map(|x| x.number()); - let to_block_num = cache_key.to_block.as_ref().map(|x| x.number()); + let from_block_num = cache_key.from_block.as_ref().map(|x| *x.number()); + let to_block_num = cache_key.to_block.as_ref().map(|x| *x.number()); self.response_cache .try_get_with(cache_key, async move { diff --git a/web3_proxy/src/app/ws.rs b/web3_proxy/src/app/ws.rs index 582ea814..e61db2c2 100644 --- a/web3_proxy/src/app/ws.rs +++ b/web3_proxy/src/app/ws.rs @@ -72,7 +72,7 @@ impl Web3ProxyApp { "params": { "subscription": subscription_id, // TODO: option to include full transaction objects instead of just the hashes? - "result": new_head.as_ref(), + "result": new_head.block, }, }); diff --git a/web3_proxy/src/block_number.rs b/web3_proxy/src/block_number.rs index da708286..4b92d1e7 100644 --- a/web3_proxy/src/block_number.rs +++ b/web3_proxy/src/block_number.rs @@ -80,12 +80,7 @@ pub async fn clean_block_number( .context("fetching block number from hash")?; // TODO: set change to true? i think not we should probably use hashes for everything. - ( - block - .number - .expect("blocks here should always have numbers"), - false, - ) + (*block.number(), false) } else { return Err(anyhow::anyhow!("blockHash missing")); } diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index 2bec1bd0..54456bb4 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -1,9 +1,9 @@ -use crate::rpcs::blockchain::BlockHashesCache; +use crate::app::AnyhowJoinHandle; +use crate::rpcs::blockchain::{BlockHashesCache, Web3ProxyBlock}; use crate::rpcs::one::Web3Rpc; -use crate::{app::AnyhowJoinHandle, rpcs::blockchain::ArcBlock}; use argh::FromArgs; use ethers::prelude::TxHash; -use ethers::types::U256; +use ethers::types::{U256, U64}; use hashbrown::HashMap; use log::warn; use migration::sea_orm::DatabaseConnection; @@ -11,7 +11,7 @@ use serde::Deserialize; use std::sync::Arc; use tokio::sync::broadcast; -pub type BlockAndRpc = (Option, Arc); +pub type BlockAndRpc = (Option, Arc); pub type TxHashAndRpc = (TxHash, Arc); #[derive(Debug, FromArgs)] @@ -105,6 +105,12 @@ pub struct AppConfig { pub invite_code: Option, pub login_domain: Option, + /// do not serve any requests if the best known block is older than this many seconds. + pub max_block_age: Option, + + /// do not serve any requests if the best known block is behind the best known block by more than this many blocks. + pub max_block_lag: Option, + /// Rate limit for bearer token authenticated entrypoints. /// This is separate from the rpc limits. #[serde(default = "default_bearer_token_max_concurrent_requests")] diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index b70663f1..b6bfd01e 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -1,16 +1,14 @@ +use super::consensus::ConsensusFinder; use super::many::Web3Rpcs; ///! Keep track of the blockchain as seen by a Web3Rpcs. use super::one::Web3Rpc; use super::transactions::TxStatus; use crate::frontend::authorization::Authorization; -use crate::{ - config::BlockAndRpc, jsonrpc::JsonRpcRequest, rpcs::synced_connections::ConsensusWeb3Rpcs, -}; +use crate::{config::BlockAndRpc, jsonrpc::JsonRpcRequest}; use anyhow::Context; use derive_more::From; use ethers::prelude::{Block, TxHash, H256, U64}; -use hashbrown::{HashMap, HashSet}; -use log::{debug, error, warn, Level}; +use log::{debug, error, trace, warn, Level}; use moka::future::Cache; use serde::Serialize; use serde_json::json; @@ -22,17 +20,18 @@ use tokio::time::Duration; // TODO: type for Hydrated Blocks with their full transactions? pub type ArcBlock = Arc>; -pub type BlockHashesCache = Cache; +pub type BlockHashesCache = Cache; /// A block and its age. #[derive(Clone, Debug, Default, From, Serialize)] -pub struct SavedBlock { +pub struct Web3ProxyBlock { pub block: ArcBlock, /// number of seconds this block was behind the current time when received - pub age: u64, + /// this is only set if the block is from a subscription + pub received_age: Option, } -impl PartialEq for SavedBlock { +impl PartialEq for Web3ProxyBlock { fn eq(&self, other: &Self) -> bool { match (self.block.hash, other.block.hash) { (None, None) => true, @@ -43,18 +42,23 @@ impl PartialEq for SavedBlock { } } -impl SavedBlock { +impl Web3ProxyBlock { + /// A new block has arrived over a subscription pub fn new(block: ArcBlock) -> Self { - let mut x = Self { block, age: 0 }; + let mut x = Self { + block, + received_age: None, + }; // no need to recalulate lag every time // if the head block gets too old, a health check restarts this connection - x.age = x.lag(); + // TODO: emit a stat for received_age + x.received_age = Some(x.age()); x } - pub fn lag(&self) -> u64 { + pub fn age(&self) -> u64 { let now = SystemTime::now() .duration_since(UNIX_EPOCH) .expect("there should always be time"); @@ -70,37 +74,58 @@ impl SavedBlock { } } - pub fn hash(&self) -> H256 { - self.block.hash.expect("saved blocks must have a hash") + #[inline(always)] + pub fn parent_hash(&self) -> &H256 { + &self.block.parent_hash } - // TODO: return as U64 or u64? - pub fn number(&self) -> U64 { - self.block.number.expect("saved blocks must have a number") + #[inline(always)] + pub fn hash(&self) -> &H256 { + self.block + .hash + .as_ref() + .expect("saved blocks must have a hash") + } + + #[inline(always)] + pub fn number(&self) -> &U64 { + self.block + .number + .as_ref() + .expect("saved blocks must have a number") } } -impl From for SavedBlock { +impl From for Web3ProxyBlock { fn from(x: ArcBlock) -> Self { - SavedBlock::new(x) + Web3ProxyBlock { + block: x, + received_age: None, + } } } -impl Display for SavedBlock { +impl Display for Web3ProxyBlock { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{} ({}, {}s old)", self.number(), self.hash(), self.age) + write!( + f, + "{} ({}, {}s old)", + self.number(), + self.hash(), + self.age() + ) } } impl Web3Rpcs { /// add a block to our mappings and track the heaviest chain - pub async fn save_block( + pub async fn try_cache_block( &self, - block: ArcBlock, + block: Web3ProxyBlock, heaviest_chain: bool, - ) -> anyhow::Result { + ) -> anyhow::Result { // TODO: i think we can rearrange this function to make it faster on the hot path - let block_hash = block.hash.as_ref().context("no block hash")?; + let block_hash = block.hash(); // skip Block::default() if block_hash.is_zero() { @@ -108,7 +133,7 @@ impl Web3Rpcs { return Ok(block); } - let block_num = block.number.as_ref().context("no block num")?; + let block_num = block.number(); // TODO: think more about heaviest_chain. would be better to do the check inside this function if heaviest_chain { @@ -136,7 +161,7 @@ impl Web3Rpcs { authorization: &Arc, hash: &H256, rpc: Option<&Arc>, - ) -> anyhow::Result { + ) -> anyhow::Result { // first, try to get the hash from our cache // the cache is set last, so if its here, its everywhere // TODO: use try_get_with @@ -147,17 +172,18 @@ impl Web3Rpcs { // block not in cache. we need to ask an rpc for it let get_block_params = (*hash, false); // TODO: if error, retry? - let block: ArcBlock = match rpc { + let block: Web3ProxyBlock = match rpc { Some(rpc) => rpc .wait_for_request_handle(authorization, Some(Duration::from_secs(30)), None) .await? - .request::<_, Option<_>>( + .request::<_, Option>( "eth_getBlockByHash", &json!(get_block_params), Level::Error.into(), None, ) .await? + .map(Into::into) .context("no block!")?, None => { // TODO: helper for method+params => JsonRpcRequest @@ -181,13 +207,14 @@ impl Web3Rpcs { let block: Option = serde_json::from_str(block.get())?; - block.context("no block!")? + // TODO: from isn't great here. received time is going to be weird + block.map(Into::into).context("no block!")? } }; // the block was fetched using eth_getBlockByHash, so it should have all fields // TODO: fill in heaviest_chain! if the block is old enough, is this definitely true? - let block = self.save_block(block, false).await?; + let block = self.try_cache_block(block, false).await?; Ok(block) } @@ -200,7 +227,7 @@ impl Web3Rpcs { ) -> anyhow::Result<(H256, u64)> { let (block, block_depth) = self.cannonical_block(authorization, num).await?; - let hash = block.hash.expect("Saved blocks should always have hashes"); + let hash = *block.hash(); Ok((hash, block_depth)) } @@ -211,7 +238,7 @@ impl Web3Rpcs { &self, authorization: &Arc, num: &U64, - ) -> anyhow::Result<(ArcBlock, u64)> { + ) -> anyhow::Result<(Web3ProxyBlock, u64)> { // we only have blocks by hash now // maybe save them during save_block in a blocks_by_number Cache> // if theres multiple, use petgraph to find the one on the main chain (and remove the others if they have enough confirmations) @@ -223,28 +250,21 @@ impl Web3Rpcs { .clone(); // be sure the requested block num exists - let mut head_block_num = consensus_head_receiver.borrow_and_update().number; + // TODO: is this okay? what if we aren't synced?! + let mut head_block_num = *consensus_head_receiver.borrow_and_update().number(); loop { - if let Some(head_block_num) = head_block_num { - if num <= &head_block_num { - break; - } + if num <= &head_block_num { + break; } + trace!("waiting for future block {} > {}", num, head_block_num); consensus_head_receiver.changed().await?; - head_block_num = consensus_head_receiver.borrow_and_update().number; + head_block_num = *consensus_head_receiver.borrow_and_update().number(); } - let head_block_num = - head_block_num.expect("we should only get here if we have a head block"); - - let block_depth = if num >= &head_block_num { - 0 - } else { - (head_block_num - num).as_u64() - }; + let block_depth = (head_block_num - num).as_u64(); // try to get the hash from our cache // deref to not keep the lock open @@ -276,8 +296,10 @@ impl Web3Rpcs { let block: ArcBlock = serde_json::from_str(raw_block.get())?; + let block = Web3ProxyBlock::from(block); + // the block was fetched using eth_getBlockByNumber, so it should have all fields and be on the heaviest chain - let block = self.save_block(block, true).await?; + let block = self.try_cache_block(block, true).await?; Ok((block, block_depth)) } @@ -288,18 +310,16 @@ impl Web3Rpcs { block_receiver: flume::Receiver, // TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed // Geth's subscriptions have the same potential for skipping blocks. - head_block_sender: watch::Sender, + head_block_sender: watch::Sender, pending_tx_sender: Option>, ) -> anyhow::Result<()> { // TODO: indexmap or hashmap? what hasher? with_capacity? // TODO: this will grow unbounded. prune old heads on this at the same time we prune the graph? - let mut connection_heads = ConsensusFinder::default(); + let mut connection_heads = ConsensusFinder::new(self.max_block_age, self.max_block_lag); loop { match block_receiver.recv_async().await { Ok((new_block, rpc)) => { - let new_block = new_block.map(Into::into); - let rpc_name = rpc.name.clone(); if let Err(err) = self @@ -313,7 +333,7 @@ impl Web3Rpcs { ) .await { - warn!("unable to process block from rpc {}: {:?}", rpc_name, err); + warn!("unable to process block from rpc {}: {:#?}", rpc_name, err); } } Err(err) => { @@ -331,60 +351,72 @@ impl Web3Rpcs { &self, authorization: &Arc, consensus_finder: &mut ConsensusFinder, - rpc_head_block: Option, + rpc_head_block: Option, rpc: Arc, - head_block_sender: &watch::Sender, + head_block_sender: &watch::Sender, pending_tx_sender: &Option>, ) -> anyhow::Result<()> { // TODO: how should we handle an error here? if !consensus_finder .update_rpc(rpc_head_block.clone(), rpc.clone(), self) - .await? + .await + .context("failed to update rpc")? { - // nothing changed. no need + // nothing changed. no need to scan for a new consensus head return Ok(()); } let new_synced_connections = consensus_finder .best_consensus_connections(authorization, self) - .await; + .await + .context("no consensus head block!") + .map_err(|err| { + self.watch_consensus_rpcs_sender + .send_replace(Arc::new(Default::default())); + + err + })?; // TODO: what should we do if the block number of new_synced_connections is < old_synced_connections? wait? - let includes_backups = new_synced_connections.includes_backups; + let backups_needed = new_synced_connections.backups_needed; let consensus_head_block = new_synced_connections.head_block.clone(); let num_consensus_rpcs = new_synced_connections.num_conns(); - let num_checked_rpcs = new_synced_connections.num_checked_conns; - let num_active_rpcs = consensus_finder.all.rpc_name_to_hash.len(); + let num_checked_rpcs = 0; // TODO: figure this out + let num_active_rpcs = consensus_finder + .all_rpcs_group() + .map(|x| x.len()) + .unwrap_or_default(); let total_rpcs = self.conns.len(); let old_consensus_head_connections = self - .watch_consensus_connections_sender + .watch_consensus_rpcs_sender .send_replace(Arc::new(new_synced_connections)); - let includes_backups_str = if includes_backups { "B " } else { "" }; + let backups_voted_str = if backups_needed { "B " } else { "" }; - if let Some(consensus_saved_block) = consensus_head_block { + if let Some(consensus_head_block) = consensus_head_block { match &old_consensus_head_connections.head_block { None => { debug!( "first {}{}/{}/{}/{} block={}, rpc={}", - includes_backups_str, + backups_voted_str, num_consensus_rpcs, num_checked_rpcs, num_active_rpcs, total_rpcs, - consensus_saved_block, + consensus_head_block, rpc, ); - if includes_backups { + if backups_needed { // TODO: what else should be in this error? warn!("Backup RPCs are in use!"); } + // this should already be cached let consensus_head_block = - self.save_block(consensus_saved_block.block, true).await?; + self.try_cache_block(consensus_head_block, true).await?; head_block_sender .send(consensus_head_block) @@ -396,46 +428,45 @@ impl Web3Rpcs { .map(|x| x.to_string()) .unwrap_or_else(|| "None".to_string()); - match consensus_saved_block.number().cmp(&old_head_block.number()) { + match consensus_head_block.number().cmp(&old_head_block.number()) { Ordering::Equal => { // multiple blocks with the same fork! - if consensus_saved_block.hash() == old_head_block.hash() { + if consensus_head_block.hash() == old_head_block.hash() { // no change in hash. no need to use head_block_sender // TODO: trace level if rpc is backup debug!( "con {}{}/{}/{}/{} con={} rpc={}@{}", - includes_backups_str, + backups_voted_str, num_consensus_rpcs, num_checked_rpcs, num_active_rpcs, total_rpcs, - consensus_saved_block, + consensus_head_block, rpc, rpc_head_str, ) } else { // hash changed - - if includes_backups { + if backups_needed { // TODO: what else should be in this error? warn!("Backup RPCs are in use!"); } debug!( "unc {}{}/{}/{}/{} con_head={} old={} rpc={}@{}", - includes_backups_str, + backups_voted_str, num_consensus_rpcs, num_checked_rpcs, num_active_rpcs, total_rpcs, - consensus_saved_block, + consensus_head_block, old_head_block, rpc, rpc_head_str, ); let consensus_head_block = self - .save_block(consensus_saved_block.block, true) + .try_cache_block(consensus_head_block, true) .await .context("save consensus_head_block as heaviest chain")?; @@ -449,25 +480,25 @@ impl Web3Rpcs { // TODO: better log warn!( "chain rolled back {}{}/{}/{}/{} con={} old={} rpc={}@{}", - includes_backups_str, + backups_voted_str, num_consensus_rpcs, num_checked_rpcs, num_active_rpcs, total_rpcs, - consensus_saved_block, + consensus_head_block, old_head_block, rpc, rpc_head_str, ); - if includes_backups { + if backups_needed { // TODO: what else should be in this error? warn!("Backup RPCs are in use!"); } // TODO: tell save_block to remove any higher block numbers from the cache. not needed because we have other checks on requested blocks being > head, but still seems like a good idea let consensus_head_block = self - .save_block(consensus_saved_block.block, true) + .try_cache_block(consensus_head_block, true) .await .context( "save_block sending consensus_head_block as heaviest chain", @@ -480,23 +511,23 @@ impl Web3Rpcs { Ordering::Greater => { debug!( "new {}{}/{}/{}/{} con={} rpc={}@{}", - includes_backups_str, + backups_voted_str, num_consensus_rpcs, num_checked_rpcs, num_active_rpcs, total_rpcs, - consensus_saved_block, + consensus_head_block, rpc, rpc_head_str, ); - if includes_backups { + if backups_needed { // TODO: what else should be in this error? warn!("Backup RPCs are in use!"); } let consensus_head_block = - self.save_block(consensus_saved_block.block, true).await?; + self.try_cache_block(consensus_head_block, true).await?; head_block_sender.send(consensus_head_block)?; } @@ -512,7 +543,7 @@ impl Web3Rpcs { if num_checked_rpcs >= self.min_head_rpcs { error!( "non {}{}/{}/{}/{} rpc={}@{}", - includes_backups_str, + backups_voted_str, num_consensus_rpcs, num_checked_rpcs, num_active_rpcs, @@ -523,7 +554,7 @@ impl Web3Rpcs { } else { debug!( "non {}{}/{}/{}/{} rpc={}@{}", - includes_backups_str, + backups_voted_str, num_consensus_rpcs, num_checked_rpcs, num_active_rpcs, @@ -537,403 +568,3 @@ impl Web3Rpcs { Ok(()) } } - -struct ConnectionsGroup { - /// TODO: this group might not actually include backups, but they were at leastchecked - includes_backups: bool, - rpc_name_to_hash: HashMap, -} - -impl ConnectionsGroup { - fn new(with_backups: bool) -> Self { - Self { - includes_backups: with_backups, - rpc_name_to_hash: Default::default(), - } - } - - fn without_backups() -> Self { - Self::new(false) - } - - fn with_backups() -> Self { - Self::new(true) - } - - fn remove(&mut self, rpc: &Web3Rpc) -> Option { - self.rpc_name_to_hash.remove(rpc.name.as_str()) - } - - fn insert(&mut self, rpc: &Web3Rpc, block_hash: H256) -> Option { - self.rpc_name_to_hash.insert(rpc.name.clone(), block_hash) - } - - // TODO: i don't love having this here. move to web3_connections? - async fn get_block_from_rpc( - &self, - rpc_name: &str, - hash: &H256, - authorization: &Arc, - web3_rpcs: &Web3Rpcs, - ) -> anyhow::Result { - // // TODO: why does this happen?!?! seems to only happen with uncled blocks - // // TODO: maybe we should do try_get_with? - // // TODO: maybe we should just continue. this only seems to happen when an older block is received - // warn!( - // "Missing connection_head_block in block_hashes. Fetching now. hash={}. other={}", - // connection_head_hash, conn_name - // ); - - // this option should almost always be populated. if the connection reconnects at a bad time it might not be available though - // TODO: if this is None, I think we should error. - let rpc = web3_rpcs.conns.get(rpc_name); - - web3_rpcs.block(authorization, hash, rpc).await - } - - // TODO: do this during insert/remove? - pub(self) async fn highest_block( - &self, - authorization: &Arc, - web3_rpcs: &Web3Rpcs, - ) -> Option { - let mut checked_heads = HashSet::with_capacity(self.rpc_name_to_hash.len()); - let mut highest_block = None::; - - for (rpc_name, rpc_head_hash) in self.rpc_name_to_hash.iter() { - // don't waste time checking the same hash multiple times - if checked_heads.contains(rpc_head_hash) { - continue; - } - - let rpc_block = match self - .get_block_from_rpc(rpc_name, rpc_head_hash, authorization, web3_rpcs) - .await - { - Ok(x) => x, - Err(err) => { - warn!( - "failed getting block {} from {} while finding highest block number: {:?}", - rpc_head_hash, rpc_name, err, - ); - continue; - } - }; - - checked_heads.insert(rpc_head_hash); - - // if this is the first block we've tried - // or if this rpc's newest block has a higher number - // we used to check total difficulty, but that isn't a thing anymore on ETH - // TODO: we still need total difficulty on some other PoW chains. whats annoying is it isn't considered part of the "block header" just the block. so websockets don't return it - let highest_num = highest_block - .as_ref() - .map(|x| x.number.expect("blocks here should always have a number")); - let rpc_num = rpc_block.as_ref().number; - - if rpc_num > highest_num { - highest_block = Some(rpc_block); - } - } - - highest_block - } - - pub(self) async fn consensus_head_connections( - &self, - authorization: &Arc, - web3_rpcs: &Web3Rpcs, - ) -> anyhow::Result { - let mut maybe_head_block = match self.highest_block(authorization, web3_rpcs).await { - None => return Err(anyhow::anyhow!("No blocks known")), - Some(x) => x, - }; - - let num_known = self.rpc_name_to_hash.len(); - - // track rpcs on this heaviest chain so we can build a new ConsensusConnections - let mut highest_rpcs = HashSet::<&str>::new(); - // a running total of the soft limits covered by the rpcs that agree on the head block - let mut highest_rpcs_sum_soft_limit: u32 = 0; - // TODO: also track highest_rpcs_sum_hard_limit? llama doesn't need this, so it can wait - - // check the highest work block for a set of rpcs that can serve our request load - // if it doesn't have enough rpcs for our request load, check the parent block - // TODO: loop for how many parent blocks? we don't want to serve blocks that are too far behind. probably different per chain - // TODO: this loop is pretty long. any way to clean up this code? - for _ in 0..6 { - let maybe_head_hash = maybe_head_block - .hash - .as_ref() - .expect("blocks here always need hashes"); - - // find all rpcs with maybe_head_block as their current head - for (rpc_name, rpc_head_hash) in self.rpc_name_to_hash.iter() { - if rpc_head_hash != maybe_head_hash { - // connection is not on the desired block - continue; - } - if highest_rpcs.contains(rpc_name.as_str()) { - // connection is on a child block - continue; - } - - if let Some(rpc) = web3_rpcs.conns.get(rpc_name.as_str()) { - highest_rpcs.insert(rpc_name); - highest_rpcs_sum_soft_limit += rpc.soft_limit; - } else { - // i don't think this is an error. i think its just if a reconnect is currently happening - warn!("connection missing: {}", rpc_name); - debug!("web3_rpcs.conns: {:#?}", web3_rpcs.conns); - } - } - - if highest_rpcs_sum_soft_limit >= web3_rpcs.min_sum_soft_limit - && highest_rpcs.len() >= web3_rpcs.min_head_rpcs - { - // we have enough servers with enough requests - break; - } - - // not enough rpcs yet. check the parent block - if let Some(parent_block) = web3_rpcs.block_hashes.get(&maybe_head_block.parent_hash) { - // trace!( - // child=%maybe_head_hash, parent=%parent_block.hash.unwrap(), "avoiding thundering herd", - // ); - - maybe_head_block = parent_block; - continue; - } else { - if num_known < web3_rpcs.min_head_rpcs { - return Err(anyhow::anyhow!( - "not enough rpcs connected: {}/{}/{}", - highest_rpcs.len(), - num_known, - web3_rpcs.min_head_rpcs, - )); - } else { - let soft_limit_percent = (highest_rpcs_sum_soft_limit as f32 - / web3_rpcs.min_sum_soft_limit as f32) - * 100.0; - - return Err(anyhow::anyhow!( - "ran out of parents to check. rpcs {}/{}/{}. soft limit: {:.2}% ({}/{})", - highest_rpcs.len(), - num_known, - web3_rpcs.min_head_rpcs, - highest_rpcs_sum_soft_limit, - web3_rpcs.min_sum_soft_limit, - soft_limit_percent, - )); - } - } - } - - // TODO: if consensus_head_rpcs.is_empty, try another method of finding the head block. will need to change the return Err above into breaks. - - // we've done all the searching for the heaviest block that we can - if highest_rpcs.len() < web3_rpcs.min_head_rpcs - || highest_rpcs_sum_soft_limit < web3_rpcs.min_sum_soft_limit - { - // if we get here, not enough servers are synced. return an error - let soft_limit_percent = - (highest_rpcs_sum_soft_limit as f32 / web3_rpcs.min_sum_soft_limit as f32) * 100.0; - - return Err(anyhow::anyhow!( - "Not enough resources. rpcs {}/{}/{}. soft limit: {:.2}% ({}/{})", - highest_rpcs.len(), - num_known, - web3_rpcs.min_head_rpcs, - highest_rpcs_sum_soft_limit, - web3_rpcs.min_sum_soft_limit, - soft_limit_percent, - )); - } - - // success! this block has enough soft limit and nodes on it (or on later blocks) - let conns: Vec> = highest_rpcs - .into_iter() - .filter_map(|conn_name| web3_rpcs.conns.get(conn_name).cloned()) - .collect(); - - // TODO: DEBUG only check - let _ = maybe_head_block - .hash - .expect("head blocks always have hashes"); - let _ = maybe_head_block - .number - .expect("head blocks always have numbers"); - - let consensus_head_block: SavedBlock = maybe_head_block.into(); - - Ok(ConsensusWeb3Rpcs { - head_block: Some(consensus_head_block), - conns, - num_checked_conns: self.rpc_name_to_hash.len(), - includes_backups: self.includes_backups, - }) - } -} - -/// A ConsensusConnections builder that tracks all connection heads across multiple groups of servers -pub struct ConsensusFinder { - /// only main servers - main: ConnectionsGroup, - /// main and backup servers - all: ConnectionsGroup, -} - -impl Default for ConsensusFinder { - fn default() -> Self { - Self { - main: ConnectionsGroup::without_backups(), - all: ConnectionsGroup::with_backups(), - } - } -} - -impl ConsensusFinder { - fn remove(&mut self, rpc: &Web3Rpc) -> Option { - // TODO: should we have multiple backup tiers? (remote datacenters vs third party) - if !rpc.backup { - self.main.remove(rpc); - } - self.all.remove(rpc) - } - - fn insert(&mut self, rpc: &Web3Rpc, new_hash: H256) -> Option { - // TODO: should we have multiple backup tiers? (remote datacenters vs third party) - if !rpc.backup { - self.main.insert(rpc, new_hash); - } - self.all.insert(rpc, new_hash) - } - - /// Update our tracking of the rpc and return true if something changed - async fn update_rpc( - &mut self, - rpc_head_block: Option, - rpc: Arc, - // we need this so we can save the block to caches. i don't like it though. maybe we should use a lazy_static Cache wrapper that has a "save_block" method?. i generally dislike globals but i also dislike all the types having to pass eachother around - web3_connections: &Web3Rpcs, - ) -> anyhow::Result { - // add the rpc's block to connection_heads, or remove the rpc from connection_heads - let changed = match rpc_head_block { - Some(mut rpc_head_block) => { - // we don't know if its on the heaviest chain yet - rpc_head_block.block = web3_connections - .save_block(rpc_head_block.block, false) - .await?; - - // we used to remove here if the block was too far behind. but it just made things more complicated - - let rpc_head_hash = rpc_head_block.hash(); - - if let Some(prev_hash) = self.insert(&rpc, rpc_head_hash) { - if prev_hash == rpc_head_hash { - // this block was already sent by this rpc. return early - false - } else { - // new block for this rpc - true - } - } else { - // first block for this rpc - true - } - } - None => { - if self.remove(&rpc).is_none() { - // this rpc was already removed - false - } else { - // rpc head changed from being synced to not - true - } - } - }; - - Ok(changed) - } - - // TODO: this could definitely be cleaner. i don't like the error handling/unwrapping - async fn best_consensus_connections( - &mut self, - authorization: &Arc, - web3_connections: &Web3Rpcs, - ) -> ConsensusWeb3Rpcs { - let highest_block_num = match self - .all - .highest_block(authorization, web3_connections) - .await - { - None => { - return ConsensusWeb3Rpcs::default(); - } - Some(x) => x.number.expect("blocks here should always have a number"), - }; - - // TODO: also needs to be not less than our current head - let mut min_block_num = highest_block_num.saturating_sub(U64::from(5)); - - // we also want to be sure we don't ever go backwards! - if let Some(current_consensus_head_num) = web3_connections.head_block_num() { - min_block_num = min_block_num.max(current_consensus_head_num); - } - - // TODO: pass `min_block_num` to consensus_head_connections? - let consensus_head_for_main = self - .main - .consensus_head_connections(authorization, web3_connections) - .await - .map_err(|err| err.context("cannot use main group")); - - let consensus_num_for_main = consensus_head_for_main - .as_ref() - .ok() - .map(|x| x.head_block.as_ref().unwrap().number()); - - if let Some(consensus_num_for_main) = consensus_num_for_main { - if consensus_num_for_main >= min_block_num { - return consensus_head_for_main.unwrap(); - } - } - - // TODO: pass `min_block_num` to consensus_head_connections? - let consensus_connections_for_all = match self - .all - .consensus_head_connections(authorization, web3_connections) - .await - { - Err(err) => { - if self.all.rpc_name_to_hash.len() < web3_connections.min_head_rpcs { - debug!("No consensus head yet: {}", err); - } - return ConsensusWeb3Rpcs::default(); - } - Ok(x) => x, - }; - - let consensus_num_for_all = consensus_connections_for_all - .head_block - .as_ref() - .map(|x| x.number()); - - if consensus_num_for_all > consensus_num_for_main { - if consensus_num_for_all < Some(min_block_num) { - // TODO: this should have an alarm in sentry - error!("CONSENSUS HEAD w/ BACKUP NODES IS VERY OLD!"); - } - consensus_connections_for_all - } else { - if let Ok(x) = consensus_head_for_main { - error!("CONSENSUS HEAD IS VERY OLD! Backup RPCs did not improve this situation"); - x - } else { - // TODO: i don't think we need this error. and i doublt we'll ever even get here - error!("NO CONSENSUS HEAD!"); - ConsensusWeb3Rpcs::default() - } - } - } -} diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs new file mode 100644 index 00000000..289be536 --- /dev/null +++ b/web3_proxy/src/rpcs/consensus.rs @@ -0,0 +1,527 @@ +use crate::frontend::authorization::Authorization; + +use super::blockchain::Web3ProxyBlock; +use super::many::Web3Rpcs; +use super::one::Web3Rpc; +use ethers::prelude::{H256, U64}; +use hashbrown::{HashMap, HashSet}; +use log::{debug, trace, warn}; +use serde::Serialize; +use std::collections::BTreeMap; +use std::fmt; +use std::sync::Arc; + +/// A collection of Web3Rpcs that are on the same block. +/// Serialize is so we can print it on our debug endpoint +#[derive(Clone, Default, Serialize)] +pub struct ConsensusWeb3Rpcs { + pub(super) head_block: Option, + // TODO: this should be able to serialize, but it isn't + #[serde(skip_serializing)] + pub(super) conns: Vec>, + pub(super) backups_voted: Option, + pub(super) backups_needed: bool, +} + +impl ConsensusWeb3Rpcs { + pub fn num_conns(&self) -> usize { + self.conns.len() + } + + pub fn sum_soft_limit(&self) -> u32 { + self.conns.iter().fold(0, |sum, rpc| sum + rpc.soft_limit) + } + + // TODO: sum_hard_limit? +} + +impl fmt::Debug for ConsensusWeb3Rpcs { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + // TODO: the default formatter takes forever to write. this is too quiet though + // TODO: print the actual conns? + f.debug_struct("ConsensusConnections") + .field("head_block", &self.head_block) + .field("num_conns", &self.conns.len()) + .finish_non_exhaustive() + } +} + +impl Web3Rpcs { + // TODO: return a ref? + pub fn head_block(&self) -> Option { + self.watch_consensus_head_receiver + .as_ref() + .map(|x| x.borrow().clone()) + } + + // TODO: return a ref? + pub fn head_block_hash(&self) -> Option { + self.head_block().map(|x| *x.hash()) + } + + // TODO: return a ref? + pub fn head_block_num(&self) -> Option { + self.head_block().map(|x| *x.number()) + } + + pub fn synced(&self) -> bool { + !self.watch_consensus_rpcs_sender.borrow().conns.is_empty() + } + + pub fn num_synced_rpcs(&self) -> usize { + self.watch_consensus_rpcs_sender.borrow().conns.len() + } +} + +pub struct ConnectionsGroup { + rpc_name_to_block: HashMap, + // TODO: what if there are two blocks with the same number? + highest_block: Option, +} + +impl Default for ConnectionsGroup { + fn default() -> Self { + Self { + rpc_name_to_block: Default::default(), + highest_block: Default::default(), + } + } +} + +impl ConnectionsGroup { + pub fn len(&self) -> usize { + self.rpc_name_to_block.len() + } + + fn remove(&mut self, rpc_name: &str) -> Option { + if let Some(removed_block) = self.rpc_name_to_block.remove(rpc_name) { + match self.highest_block.as_mut() { + None => {} + Some(current_highest_block) => { + if removed_block.hash() == current_highest_block.hash() { + for maybe_highest_block in self.rpc_name_to_block.values() { + if maybe_highest_block.number() > current_highest_block.number() { + *current_highest_block = maybe_highest_block.clone(); + }; + } + } + } + } + + Some(removed_block) + } else { + None + } + } + + fn insert(&mut self, rpc: &Web3Rpc, block: Web3ProxyBlock) -> Option { + // TODO: what about a reorg to the same height? + if Some(block.number()) > self.highest_block.as_ref().map(|x| x.number()) { + self.highest_block = Some(block.clone()); + } + + self.rpc_name_to_block.insert(rpc.name.clone(), block) + } + + // // TODO: do this during insert/remove? + // pub(self) async fn highest_block( + // &self, + // authorization: &Arc, + // web3_rpcs: &Web3Rpcs, + // ) -> Option { + // let mut checked_heads = HashSet::with_capacity(self.rpc_name_to_hash.len()); + // let mut highest_block = None::; + + // for (rpc_name, rpc_head_hash) in self.rpc_name_to_hash.iter() { + // // don't waste time checking the same hash multiple times + // if checked_heads.contains(rpc_head_hash) { + // continue; + // } + + // let rpc_block = match web3_rpcs + // .get_block_from_rpc(rpc_name, rpc_head_hash, authorization) + // .await + // { + // Ok(x) => x, + // Err(err) => { + // warn!( + // "failed getting block {} from {} while finding highest block number: {:?}", + // rpc_head_hash, rpc_name, err, + // ); + // continue; + // } + // }; + + // checked_heads.insert(rpc_head_hash); + + // // if this is the first block we've tried + // // or if this rpc's newest block has a higher number + // // we used to check total difficulty, but that isn't a thing anymore on ETH + // // TODO: we still need total difficulty on some other PoW chains. whats annoying is it isn't considered part of the "block header" just the block. so websockets don't return it + // let highest_num = highest_block + // .as_ref() + // .map(|x| x.number.expect("blocks here should always have a number")); + // let rpc_num = rpc_block.as_ref().number; + + // if rpc_num > highest_num { + // highest_block = Some(rpc_block); + // } + // } + + // highest_block + // } + + /// min_consensus_block_num keeps us from ever going backwards. + /// TODO: think about min_consensus_block_num more. i think this might cause an outage if the chain is doing weird things. but 503s is probably better than broken data. + pub(self) async fn consensus_head_connections( + &self, + authorization: &Arc, + web3_rpcs: &Web3Rpcs, + min_consensus_block_num: Option, + ) -> anyhow::Result { + let mut maybe_head_block = match self.highest_block.clone() { + None => return Err(anyhow::anyhow!("no blocks known")), + Some(x) => x, + }; + + // TODO: take max_distance_consensus_to_highest as an argument? + // TODO: what if someone's backup node is misconfigured and goes on a really fast forked chain? + let max_lag_consensus_to_highest = + if let Some(min_consensus_block_num) = min_consensus_block_num { + maybe_head_block + .number() + .saturating_sub(min_consensus_block_num) + .as_u64() + } else { + // TODO: get from app config? different chains probably should have different values. 10 is probably too much + 10 + }; + + let num_known = self.rpc_name_to_block.len(); + + if num_known < web3_rpcs.min_head_rpcs { + return Err(anyhow::anyhow!( + "not enough rpcs connected: {}/{}", + num_known, + web3_rpcs.min_head_rpcs, + )); + } + + let mut primary_rpcs_voted: Option = None; + let mut backup_rpcs_voted: Option = None; + + // track rpcs on this heaviest chain so we can build a new ConsensusConnections + let mut primary_consensus_rpcs = HashSet::<&str>::new(); + let mut backup_consensus_rpcs = HashSet::<&str>::new(); + + // a running total of the soft limits covered by the rpcs that agree on the head block + let mut primary_sum_soft_limit: u32 = 0; + let mut backup_sum_soft_limit: u32 = 0; + + // TODO: also track the sum of *available* hard_limits. if any servers have no hard limits, use their soft limit or no limit? + + // check the highest work block for a set of rpcs that can serve our request load + // if it doesn't have enough rpcs for our request load, check the parent block + // TODO: loop for how many parent blocks? we don't want to serve blocks that are too far behind. probably different per chain + // TODO: this loop is pretty long. any way to clean up this code? + for _ in 0..max_lag_consensus_to_highest { + let maybe_head_hash = maybe_head_block.hash(); + + // find all rpcs with maybe_head_hash as their current head + for (rpc_name, rpc_head) in self.rpc_name_to_block.iter() { + if rpc_head.hash() != maybe_head_hash { + // connection is not on the desired block + continue; + } + if backup_consensus_rpcs.contains(rpc_name.as_str()) { + // connection is on a later block in this same chain + continue; + } + if primary_consensus_rpcs.contains(rpc_name.as_str()) { + // connection is on a later block in this same chain + continue; + } + + if let Some(rpc) = web3_rpcs.conns.get(rpc_name.as_str()) { + if backup_rpcs_voted.is_some() { + // backups already voted for a head block. don't change it + } else { + backup_consensus_rpcs.insert(rpc_name); + backup_sum_soft_limit += rpc.soft_limit; + } + if !rpc.backup { + primary_consensus_rpcs.insert(rpc_name); + primary_sum_soft_limit += rpc.soft_limit; + } + } else { + // i don't think this is an error. i think its just if a reconnect is currently happening + warn!("connection missing: {}", rpc_name); + debug!("web3_rpcs.conns: {:#?}", web3_rpcs.conns); + } + } + + if primary_sum_soft_limit >= web3_rpcs.min_sum_soft_limit + && primary_consensus_rpcs.len() >= web3_rpcs.min_head_rpcs + { + // we have enough servers with enough requests! yey! + primary_rpcs_voted = Some(maybe_head_block.clone()); + break; + } + + if backup_rpcs_voted.is_none() + && backup_consensus_rpcs != primary_consensus_rpcs + && backup_sum_soft_limit >= web3_rpcs.min_sum_soft_limit + && backup_consensus_rpcs.len() >= web3_rpcs.min_head_rpcs + { + // if we include backup servers, we have enough servers with high enough limits + backup_rpcs_voted = Some(maybe_head_block.clone()); + } + + // not enough rpcs on this block. check the parent block + match web3_rpcs + .block(authorization, &maybe_head_block.parent_hash(), None) + .await + { + Ok(parent_block) => { + // trace!( + // child=%maybe_head_hash, parent=%parent_block.hash.unwrap(), "avoiding thundering herd. checking consensus on parent block", + // ); + maybe_head_block = parent_block.into(); + continue; + } + Err(err) => { + let soft_limit_percent = (primary_sum_soft_limit as f32 + / web3_rpcs.min_sum_soft_limit as f32) + * 100.0; + + let err_msg = format!("ran out of parents to check. rpcs {}/{}/{}. soft limit: {:.2}% ({}/{}). err: {:#?}", + primary_consensus_rpcs.len(), + num_known, + web3_rpcs.min_head_rpcs, + primary_sum_soft_limit, + web3_rpcs.min_sum_soft_limit, + soft_limit_percent, + err, + ); + + if backup_rpcs_voted.is_some() { + warn!("{}", err_msg); + break; + } else { + return Err(anyhow::anyhow!(err_msg)); + } + } + } + } + + // TODO: if consensus_head_rpcs.is_empty, try another method of finding the head block. will need to change the return Err above into breaks. + + // we've done all the searching for the heaviest block that we can + if (primary_consensus_rpcs.len() < web3_rpcs.min_head_rpcs + || primary_sum_soft_limit < web3_rpcs.min_sum_soft_limit) + && backup_rpcs_voted.is_none() + { + // if we get here, not enough servers are synced. return an error + let soft_limit_percent = + (primary_sum_soft_limit as f32 / web3_rpcs.min_sum_soft_limit as f32) * 100.0; + + return Err(anyhow::anyhow!( + "Not enough resources. rpcs {}/{}/{}. soft limit: {:.2}% ({}/{})", + primary_consensus_rpcs.len(), + num_known, + web3_rpcs.min_head_rpcs, + primary_sum_soft_limit, + web3_rpcs.min_sum_soft_limit, + soft_limit_percent, + )); + } + + // success! this block has enough soft limit and nodes on it (or on later blocks) + let conns: Vec> = primary_consensus_rpcs + .into_iter() + .filter_map(|conn_name| web3_rpcs.conns.get(conn_name).cloned()) + .collect(); + + #[cfg(debug_assertions)] + let _ = maybe_head_block.hash(); + #[cfg(debug_assertions)] + let _ = maybe_head_block.number(); + + Ok(ConsensusWeb3Rpcs { + head_block: Some(maybe_head_block), + conns, + backups_voted: backup_rpcs_voted, + backups_needed: primary_rpcs_voted.is_none(), + }) + } +} + +/// A ConsensusConnections builder that tracks all connection heads across multiple groups of servers +pub struct ConsensusFinder { + /// backups for all tiers are only used if necessary + /// tiers[0] = only tier 0. + /// tiers[1] = tier 0 and tier 1 + /// tiers[n] = tier 0..=n + /// This is a BTreeMap and not a Vec because sometimes a tier is empty + tiers: BTreeMap, + /// never serve blocks that are too old + max_block_age: Option, + /// tier 0 will be prefered as long as the distance between it and the other tiers is <= max_tier_lag + max_block_lag: Option, +} + +impl ConsensusFinder { + pub fn new(max_block_age: Option, max_block_lag: Option) -> Self { + Self { + tiers: Default::default(), + max_block_age, + max_block_lag, + } + } +} + +impl ConsensusFinder { + /// get the ConnectionsGroup that contains all rpcs + /// panics if there are no tiers + pub fn all_rpcs_group(&self) -> Option<&ConnectionsGroup> { + self.tiers.values().last() + } + + /// get the mutable ConnectionsGroup that contains all rpcs + pub fn all_mut(&mut self) -> Option<&mut ConnectionsGroup> { + self.tiers.values_mut().last() + } + + pub fn remove(&mut self, rpc: &Web3Rpc) -> Option { + let mut removed = None; + + for (i, tier_group) in self.tiers.iter_mut().rev() { + if i < &rpc.tier { + break; + } + let x = tier_group.remove(rpc.name.as_str()); + + if removed.is_none() && x.is_some() { + removed = x; + } + } + + removed + } + + /// returns the block that the rpc was on before updating to the new_block + pub fn insert(&mut self, rpc: &Web3Rpc, new_block: Web3ProxyBlock) -> Option { + let mut old = None; + + for (i, tier_group) in self.tiers.iter_mut().rev() { + if i > &rpc.tier { + break; + } + + // TODO: should new_block be a ref? + let x = tier_group.insert(rpc, new_block.clone()); + + if old.is_none() && x.is_some() { + old = x; + } + } + + old + } + + /// Update our tracking of the rpc and return true if something changed + pub(crate) async fn update_rpc( + &mut self, + rpc_head_block: Option, + rpc: Arc, + // we need this so we can save the block to caches. i don't like it though. maybe we should use a lazy_static Cache wrapper that has a "save_block" method?. i generally dislike globals but i also dislike all the types having to pass eachother around + web3_connections: &Web3Rpcs, + ) -> anyhow::Result { + // add the rpc's block to connection_heads, or remove the rpc from connection_heads + let changed = match rpc_head_block { + Some(mut rpc_head_block) => { + // we don't know if its on the heaviest chain yet + rpc_head_block = web3_connections + .try_cache_block(rpc_head_block, false) + .await?; + + // if let Some(max_block_lag) = max_block_lag { + // if rpc_head_block.number() < ??? { + // trace!("rpc_head_block from {} is too far behind! {}", rpc, rpc_head_block); + // return Ok(self.remove(&rpc).is_some()); + // } + // } + + if let Some(max_age) = self.max_block_age { + if rpc_head_block.age() > max_age { + trace!("rpc_head_block from {} is too old! {}", rpc, rpc_head_block); + return Ok(self.remove(&rpc).is_some()); + } + } + + if let Some(prev_block) = self.insert(&rpc, rpc_head_block.clone()) { + if prev_block.hash() == rpc_head_block.hash() { + // this block was already sent by this rpc. return early + false + } else { + // new block for this rpc + true + } + } else { + // first block for this rpc + true + } + } + None => { + if self.remove(&rpc).is_none() { + // this rpc was already removed + false + } else { + // rpc head changed from being synced to not + true + } + } + }; + + Ok(changed) + } + + // TODO: this could definitely be cleaner. i don't like the error handling/unwrapping + pub async fn best_consensus_connections( + &mut self, + authorization: &Arc, + web3_connections: &Web3Rpcs, + ) -> Option { + // TODO: attach context to these? + let highest_known_block = self.all_rpcs_group()?.highest_block.as_ref()?; + + trace!("highest_known_block: {}", highest_known_block); + + let min_block_num = self + .max_block_lag + .map(|x| highest_known_block.number().saturating_sub(x)) + // we also want to be sure we don't ever go backwards! + .max(web3_connections.head_block_num()); + + trace!("min_block_num: {:#?}", min_block_num); + + // TODO Should this be a Vec>>? + // TODO: how should errors be handled? + // TODO: find the best tier with a connectionsgroup. best case, this only queries the first tier + // TODO: do we need to calculate all of them? I think having highest_known_block included as part of min_block_num should make that unnecessary + for (i, x) in self.tiers.iter() { + trace!("checking tier {}", i); + if let Ok(consensus_head_connections) = x + .consensus_head_connections(authorization, web3_connections, min_block_num) + .await + { + trace!("success on tier {}", i); + // we got one! hopefully it didn't need to use any backups. + // but even if it did need backup servers, that is better than going to a worse tier + return Some(consensus_head_connections); + } + } + + return None; + } +} diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 83eb3922..e5293d2d 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -1,8 +1,8 @@ ///! Load balanced communication with a group of web3 rpc providers -use super::blockchain::{ArcBlock, BlockHashesCache}; +use super::blockchain::{BlockHashesCache, Web3ProxyBlock}; +use super::consensus::ConsensusWeb3Rpcs; use super::one::Web3Rpc; use super::request::{OpenRequestHandle, OpenRequestResult, RequestRevertHandler}; -use super::synced_connections::ConsensusWeb3Rpcs; use crate::app::{flatten_handle, AnyhowJoinHandle}; use crate::config::{BlockAndRpc, TxHashAndRpc, Web3RpcConfig}; use crate::frontend::authorization::{Authorization, RequestMetadata}; @@ -38,9 +38,9 @@ pub struct Web3Rpcs { /// any requests will be forwarded to one (or more) of these connections pub(crate) conns: HashMap>, /// all providers with the same consensus head block. won't update if there is no `self.watch_consensus_head_sender` - pub(super) watch_consensus_connections_sender: watch::Sender>, + pub(super) watch_consensus_rpcs_sender: watch::Sender>, /// this head receiver makes it easy to wait until there is a new block - pub(super) watch_consensus_head_receiver: Option>, + pub(super) watch_consensus_head_receiver: Option>, pub(super) pending_transactions: Cache, /// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis? @@ -48,8 +48,14 @@ pub struct Web3Rpcs { pub(super) block_hashes: BlockHashesCache, /// blocks on the heaviest chain pub(super) block_numbers: Cache, + /// the number of rpcs required to agree on consensus for the head block (thundering herd protection) pub(super) min_head_rpcs: usize, + /// the soft limit required to agree on consensus for the head block. (thundering herd protection) pub(super) min_sum_soft_limit: u32, + /// how far behind the highest known block height we can be before we stop serving requests + pub(super) max_block_lag: Option, + /// how old our consensus head block we can be before we stop serving requests + pub(super) max_block_age: Option, } impl Web3Rpcs { @@ -60,13 +66,15 @@ impl Web3Rpcs { chain_id: u64, db_conn: Option, http_client: Option, + max_block_age: Option, + max_block_lag: Option, min_head_rpcs: usize, min_sum_soft_limit: u32, pending_transactions: Cache, pending_tx_sender: Option>, redis_pool: Option, server_configs: HashMap, - watch_consensus_head_sender: Option>, + watch_consensus_head_sender: Option>, ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded(); let (block_sender, block_receiver) = flume::unbounded::(); @@ -212,13 +220,15 @@ impl Web3Rpcs { let connections = Arc::new(Self { conns: connections, - watch_consensus_connections_sender, + watch_consensus_rpcs_sender: watch_consensus_connections_sender, watch_consensus_head_receiver, pending_transactions, block_hashes, block_numbers, min_sum_soft_limit, min_head_rpcs, + max_block_age, + max_block_lag, }); let authorization = Arc::new(Authorization::internal(db_conn.clone())?); @@ -254,7 +264,7 @@ impl Web3Rpcs { authorization: Arc, pending_tx_id_receiver: flume::Receiver, block_receiver: flume::Receiver, - head_block_sender: Option>, + head_block_sender: Option>, pending_tx_sender: Option>, ) -> anyhow::Result<()> { let mut futures = vec![]; @@ -455,7 +465,7 @@ impl Web3Rpcs { max_block_needed: Option<&U64>, ) -> anyhow::Result { let usable_rpcs_by_head_num_and_weight: BTreeMap<(Option, u64), Vec>> = { - let synced_connections = self.watch_consensus_connections_sender.borrow().clone(); + let synced_connections = self.watch_consensus_rpcs_sender.borrow().clone(); let head_block_num = if let Some(head_block) = synced_connections.head_block.as_ref() { head_block.number() @@ -499,7 +509,7 @@ impl Web3Rpcs { match x_head_block { None => continue, Some(x_head) => { - let key = (Some(x_head.number()), u64::MAX - x.tier); + let key = (Some(*x_head.number()), u64::MAX - x.tier); m.entry(key).or_insert_with(Vec::new).push(x); } @@ -508,6 +518,7 @@ impl Web3Rpcs { } cmp::Ordering::Equal => { // need the consensus head block. filter the synced rpcs + // TODO: this doesn't properly check the allow_backups variable! for x in synced_connections .conns .iter() @@ -519,7 +530,7 @@ impl Web3Rpcs { } } cmp::Ordering::Greater => { - // TODO? if the blocks is close and wait_for_sync and allow_backups, wait for change on a watch_consensus_connections_receiver().subscribe() + // TODO? if the blocks is close, wait for change on a watch_consensus_connections_receiver().subscribe() return Ok(OpenRequestResult::NotReady(allow_backups)); } } @@ -670,11 +681,7 @@ impl Web3Rpcs { let mut tried = HashSet::new(); - let mut synced_conns = self - .watch_consensus_connections_sender - .borrow() - .conns - .clone(); + let mut synced_conns = self.watch_consensus_rpcs_sender.borrow().conns.clone(); // synced connections are all on the same block. sort them by tier with higher soft limits first synced_conns.sort_by_cached_key(rpc_sync_status_sort_key); @@ -754,7 +761,7 @@ impl Web3Rpcs { let mut skip_rpcs = vec![]; let mut method_not_available_response = None; - let mut watch_consensus_connections = self.watch_consensus_connections_sender.subscribe(); + let mut watch_consensus_connections = self.watch_consensus_rpcs_sender.subscribe(); // TODO: maximum retries? right now its the total number of servers loop { @@ -1144,7 +1151,7 @@ impl Serialize for Web3Rpcs { state.serialize_field("conns", &conns)?; { - let consensus_connections = self.watch_consensus_connections_sender.borrow().clone(); + let consensus_connections = self.watch_consensus_rpcs_sender.borrow().clone(); // TODO: rename synced_connections to consensus_connections? state.serialize_field("synced_connections", &consensus_connections)?; } @@ -1181,10 +1188,8 @@ mod tests { // TODO: why is this allow needed? does tokio::test get in the way somehow? #![allow(unused_imports)] use super::*; - use crate::rpcs::{ - blockchain::{ConsensusFinder, SavedBlock}, - provider::Web3Provider, - }; + use crate::rpcs::consensus::ConsensusFinder; + use crate::rpcs::{blockchain::Web3ProxyBlock, provider::Web3Provider}; use ethers::types::{Block, U256}; use log::{trace, LevelFilter}; use parking_lot::RwLock; @@ -1213,7 +1218,7 @@ mod tests { let blocks: Vec<_> = [block_0, block_1, block_2] .into_iter() - .map(|x| SavedBlock::new(Arc::new(x))) + .map(|x| Web3ProxyBlock::new(Arc::new(x))) .collect(); let mut rpcs: Vec<_> = [ @@ -1298,9 +1303,8 @@ mod tests { let lagged_block = Arc::new(lagged_block); let head_block = Arc::new(head_block); - // TODO: write a impl From for Block -> BlockId? - let mut lagged_block: SavedBlock = lagged_block.into(); - let mut head_block: SavedBlock = head_block.into(); + let mut lagged_block: Web3ProxyBlock = lagged_block.into(); + let mut head_block: Web3ProxyBlock = head_block.into(); let block_data_limit = u64::MAX; @@ -1312,6 +1316,7 @@ mod tests { block_data_limit: block_data_limit.into(), tier: 0, head_block: RwLock::new(Some(head_block.clone())), + provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))), ..Default::default() }; @@ -1323,6 +1328,7 @@ mod tests { block_data_limit: block_data_limit.into(), tier: 0, head_block: RwLock::new(Some(lagged_block.clone())), + provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))), ..Default::default() }; @@ -1340,13 +1346,13 @@ mod tests { (lagged_rpc.name.clone(), lagged_rpc.clone()), ]); - let (watch_consensus_connections_sender, _) = watch::channel(Default::default()); + let (watch_consensus_rpcs_sender, _) = watch::channel(Default::default()); // TODO: make a Web3Rpcs::new let conns = Web3Rpcs { conns, watch_consensus_head_receiver: None, - watch_consensus_connections_sender, + watch_consensus_rpcs_sender, pending_transactions: Cache::builder() .max_capacity(10_000) .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()), @@ -1356,32 +1362,37 @@ mod tests { block_numbers: Cache::builder() .max_capacity(10_000) .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()), + // TODO: test max_block_age? + max_block_age: None, + // TODO: test max_block_lag? + max_block_lag: None, min_head_rpcs: 1, min_sum_soft_limit: 1, }; let authorization = Arc::new(Authorization::internal(None).unwrap()); - let (head_block_sender, _head_block_receiver) = - watch::channel::(Default::default()); - let mut connection_heads = ConsensusFinder::default(); + let (head_block_sender, _head_block_receiver) = watch::channel(Default::default()); + let mut consensus_finder = ConsensusFinder::new(None, None); // process None so that conns .process_block_from_rpc( &authorization, - &mut connection_heads, + &mut consensus_finder, None, lagged_rpc.clone(), &head_block_sender, &None, ) .await - .unwrap(); + .expect( + "its lagged, but it should still be seen as consensus if its the first to report", + ); conns .process_block_from_rpc( &authorization, - &mut connection_heads, + &mut consensus_finder, None, head_rpc.clone(), &head_block_sender, @@ -1414,12 +1425,12 @@ mod tests { assert!(matches!(x, OpenRequestResult::NotReady(true))); // add lagged blocks to the conns. both servers should be allowed - lagged_block.block = conns.save_block(lagged_block.block, true).await.unwrap(); + lagged_block = conns.try_cache_block(lagged_block, true).await.unwrap(); conns .process_block_from_rpc( &authorization, - &mut connection_heads, + &mut consensus_finder, Some(lagged_block.clone()), lagged_rpc, &head_block_sender, @@ -1430,7 +1441,7 @@ mod tests { conns .process_block_from_rpc( &authorization, - &mut connection_heads, + &mut consensus_finder, Some(lagged_block.clone()), head_rpc.clone(), &head_block_sender, @@ -1442,12 +1453,12 @@ mod tests { assert_eq!(conns.num_synced_rpcs(), 2); // add head block to the conns. lagged_rpc should not be available - head_block.block = conns.save_block(head_block.block, true).await.unwrap(); + head_block = conns.try_cache_block(head_block, true).await.unwrap(); conns .process_block_from_rpc( &authorization, - &mut connection_heads, + &mut consensus_finder, Some(head_block.clone()), head_rpc, &head_block_sender, @@ -1511,7 +1522,7 @@ mod tests { ..Default::default() }; - let head_block: SavedBlock = Arc::new(head_block).into(); + let head_block: Web3ProxyBlock = Arc::new(head_block).into(); let pruned_rpc = Web3Rpc { name: "pruned".to_string(), @@ -1548,13 +1559,13 @@ mod tests { (archive_rpc.name.clone(), archive_rpc.clone()), ]); - let (watch_consensus_connections_sender, _) = watch::channel(Default::default()); + let (watch_consensus_rpcs_sender, _) = watch::channel(Default::default()); // TODO: make a Web3Rpcs::new let conns = Web3Rpcs { conns, watch_consensus_head_receiver: None, - watch_consensus_connections_sender, + watch_consensus_rpcs_sender, pending_transactions: Cache::builder() .max_capacity(10) .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()), @@ -1566,13 +1577,14 @@ mod tests { .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()), min_head_rpcs: 1, min_sum_soft_limit: 3_000, + max_block_age: None, + max_block_lag: None, }; let authorization = Arc::new(Authorization::internal(None).unwrap()); - let (head_block_sender, _head_block_receiver) = - watch::channel::(Default::default()); - let mut connection_heads = ConsensusFinder::default(); + let (head_block_sender, _head_block_receiver) = watch::channel(Default::default()); + let mut connection_heads = ConsensusFinder::new(None, None); conns .process_block_from_rpc( diff --git a/web3_proxy/src/rpcs/mod.rs b/web3_proxy/src/rpcs/mod.rs index 44ea5afe..41b7a6ea 100644 --- a/web3_proxy/src/rpcs/mod.rs +++ b/web3_proxy/src/rpcs/mod.rs @@ -1,8 +1,8 @@ // TODO: all pub, or export useful things here instead? pub mod blockchain; +pub mod consensus; pub mod many; pub mod one; pub mod provider; pub mod request; -pub mod synced_connections; pub mod transactions; diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index dfa08a4f..8b4decc4 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -1,5 +1,5 @@ ///! Rate-limited communication with a web3 provider. -use super::blockchain::{ArcBlock, BlockHashesCache, SavedBlock}; +use super::blockchain::{ArcBlock, BlockHashesCache, Web3ProxyBlock}; use super::provider::Web3Provider; use super::request::{OpenRequestHandle, OpenRequestResult}; use crate::app::{flatten_handle, AnyhowJoinHandle}; @@ -81,7 +81,7 @@ pub struct Web3Rpc { /// Lower tiers are higher priority when sending requests pub(super) tier: u64, /// TODO: change this to a watch channel so that http providers can subscribe and take action on change. - pub(super) head_block: RwLock>, + pub(super) head_block: RwLock>, /// Track how fast this RPC is pub(super) latency: Web3RpcLatencies, } @@ -308,9 +308,9 @@ impl Web3Rpc { } pub fn has_block_data(&self, needed_block_num: &U64) -> bool { - let head_block_num = match self.head_block.read().clone() { + let head_block_num = match self.head_block.read().as_ref() { None => return false, - Some(x) => x.number(), + Some(x) => *x.number(), }; // this rpc doesn't have that block yet. still syncing @@ -525,9 +525,9 @@ impl Web3Rpc { None } Ok(Some(new_head_block)) => { - let new_hash = new_head_block - .hash - .context("sending block to connections")?; + let new_head_block = Web3ProxyBlock::new(new_head_block); + + let new_hash = *new_head_block.hash(); // if we already have this block saved, set new_head_block to that arc. otherwise store this copy let new_head_block = block_map @@ -628,6 +628,7 @@ impl Web3Rpc { if let Some(client) = &*conn.provider.read().await { // trace!("health check unlocked with error on {}", conn); // returning error will trigger a reconnect + // also, do the health check as a way of keeping this rpc's request_ewma accurate // TODO: do a query of some kind } @@ -1164,7 +1165,7 @@ mod tests { let random_block = Arc::new(random_block); - let head_block = SavedBlock::new(random_block); + let head_block = Web3ProxyBlock::new(random_block); let block_data_limit = u64::MAX; let x = Web3Rpc { @@ -1194,7 +1195,7 @@ mod tests { .as_secs() .into(); - let head_block: SavedBlock = Arc::new(Block { + let head_block: Web3ProxyBlock = Arc::new(Block { hash: Some(H256::random()), number: Some(1_000_000.into()), timestamp: now, diff --git a/web3_proxy/src/rpcs/synced_connections.rs b/web3_proxy/src/rpcs/synced_connections.rs deleted file mode 100644 index e285c307..00000000 --- a/web3_proxy/src/rpcs/synced_connections.rs +++ /dev/null @@ -1,71 +0,0 @@ -use super::blockchain::{ArcBlock, SavedBlock}; -use super::many::Web3Rpcs; -use super::one::Web3Rpc; -use ethers::prelude::{H256, U64}; -use serde::Serialize; -use std::fmt; -use std::sync::Arc; - -/// A collection of Web3Rpcs that are on the same block. -/// Serialize is so we can print it on our debug endpoint -#[derive(Clone, Default, Serialize)] -pub struct ConsensusWeb3Rpcs { - // TODO: store ArcBlock instead? - pub(super) head_block: Option, - // TODO: this should be able to serialize, but it isn't - #[serde(skip_serializing)] - pub(super) conns: Vec>, - pub(super) num_checked_conns: usize, - pub(super) includes_backups: bool, -} - -impl ConsensusWeb3Rpcs { - pub fn num_conns(&self) -> usize { - self.conns.len() - } - - pub fn sum_soft_limit(&self) -> u32 { - self.conns.iter().fold(0, |sum, rpc| sum + rpc.soft_limit) - } - - // TODO: sum_hard_limit? -} - -impl fmt::Debug for ConsensusWeb3Rpcs { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - // TODO: the default formatter takes forever to write. this is too quiet though - // TODO: print the actual conns? - f.debug_struct("ConsensusConnections") - .field("head_block", &self.head_block) - .field("num_conns", &self.conns.len()) - .finish_non_exhaustive() - } -} - -impl Web3Rpcs { - pub fn head_block(&self) -> Option { - self.watch_consensus_head_receiver - .as_ref() - .map(|x| x.borrow().clone()) - } - - pub fn head_block_hash(&self) -> Option { - self.head_block().and_then(|x| x.hash) - } - - pub fn head_block_num(&self) -> Option { - self.head_block().and_then(|x| x.number) - } - - pub fn synced(&self) -> bool { - !self - .watch_consensus_connections_sender - .borrow() - .conns - .is_empty() - } - - pub fn num_synced_rpcs(&self) -> usize { - self.watch_consensus_connections_sender.borrow().conns.len() - } -}