well it compiles. doesnt work yet. but it compiles

This commit is contained in:
Bryan Stitt 2023-02-14 12:14:50 -08:00
parent 70105bc7bb
commit 824a6fa1f0
12 changed files with 759 additions and 657 deletions

25
Cargo.lock generated

@ -282,13 +282,13 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "axum"
version = "0.6.5"
version = "0.6.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3114e77b361ec716aa429ae5c04243abe00cf7548e870b9370affcc5c491a7d0"
checksum = "4e246206a63c9830e118d12c894f56a82033da1a2361f5544deeee3df85c99d9"
dependencies = [
"async-trait",
"axum-core",
"base64 0.20.0",
"base64 0.21.0",
"bitflags",
"bytes",
"futures-util",
@ -347,9 +347,9 @@ dependencies = [
[[package]]
name = "axum-macros"
version = "0.3.3"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb6bee4e05a5e0a5a67515ab24978efa7a80575a7a41a9fae35bb27fed6645d2"
checksum = "5fbf955307ff8addb48d2399393c9e2740dd491537ec562b66ab364fc4a38841"
dependencies = [
"heck 0.4.0",
"proc-macro2",
@ -419,12 +419,6 @@ version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8"
[[package]]
name = "base64"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ea22880d78093b0cbe17c89f64a7d457941e65759157ec6cb31a31d652b05e5"
[[package]]
name = "base64"
version = "0.21.0"
@ -3093,9 +3087,9 @@ dependencies = [
[[package]]
name = "once_cell"
version = "1.17.0"
version = "1.17.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6f61fba1741ea2b3d6a1e3178721804bb716a68a6aeba1149b5d52e3d464ea66"
checksum = "b7e5500299e16ebb147ae15a00a942af264cf3688f47923b8fc2cd5858f23ad3"
[[package]]
name = "opaque-debug"
@ -4520,9 +4514,9 @@ dependencies = [
[[package]]
name = "serde_prometheus"
version = "0.2.0"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1bfb6048d9e4ebc41f7d1a42c79b04c5b460633be307620a0e34a8f81970ea47"
checksum = "9c1a4ca38f4e746460d1dbd3711b8ca8ae314d1b21247edeff61dd20325b5a6f"
dependencies = [
"heapless",
"nom",
@ -5793,6 +5787,7 @@ dependencies = [
"notify",
"num",
"num-traits",
"once_cell",
"pagerduty-rs",
"parking_lot 0.12.1",
"prettytable",

@ -27,9 +27,9 @@ thread-fast-rng = { path = "../thread-fast-rng" }
anyhow = { version = "1.0.69", features = ["backtrace"] }
argh = "0.1.10"
axum = { version = "0.6.5", features = ["headers", "ws"] }
axum = { version = "0.6.6", features = ["headers", "ws"] }
axum-client-ip = "0.4.0"
axum-macros = "0.3.3"
axum-macros = "0.3.4"
chrono = "0.4.23"
counter = "0.5.7"
derive_more = "0.99.17"
@ -52,6 +52,7 @@ moka = { version = "0.10.0", default-features = false, features = ["future"] }
notify = "5.1.0"
num = "0.4.0"
num-traits = "0.2.15"
once_cell = { version = "1.17.1" }
pagerduty-rs = { version = "0.1.6", default-features = false, features = ["async", "rustls", "sync"] }
parking_lot = { version = "0.12.1", features = ["arc_lock"] }
prettytable = "*"
@ -62,7 +63,7 @@ rustc-hash = "1.1.0"
sentry = { version = "0.29.3", default-features = false, features = ["backtrace", "contexts", "panic", "anyhow", "reqwest", "rustls", "log", "sentry-log"] }
serde = { version = "1.0.152", features = [] }
serde_json = { version = "1.0.93", default-features = false, features = ["alloc", "raw_value"] }
serde_prometheus = "0.2.0"
serde_prometheus = "0.2.1"
siwe = "0.5.0"
time = "0.3.17"
tokio = { version = "1.25.0", features = ["full"] }

@ -10,7 +10,7 @@ use crate::frontend::rpc_proxy_ws::ProxyMode;
use crate::jsonrpc::{
JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest, JsonRpcRequestEnum,
};
use crate::rpcs::blockchain::{ArcBlock, SavedBlock};
use crate::rpcs::blockchain::{BlockHashesCache, Web3ProxyBlock};
use crate::rpcs::many::Web3Rpcs;
use crate::rpcs::one::Web3Rpc;
use crate::rpcs::transactions::TxStatus;
@ -23,7 +23,7 @@ use derive_more::From;
use entities::sea_orm_active_enums::LogLevel;
use entities::user;
use ethers::core::utils::keccak256;
use ethers::prelude::{Address, Block, Bytes, Transaction, TxHash, H256, U64};
use ethers::prelude::{Address, Bytes, Transaction, TxHash, H256, U64};
use ethers::types::U256;
use ethers::utils::rlp::{Decodable, Rlp};
use futures::future::join_all;
@ -69,9 +69,9 @@ pub static REQUEST_PERIOD: u64 = 60;
#[derive(From)]
struct ResponseCacheKey {
// if none, this is cached until evicted
from_block: Option<SavedBlock>,
from_block: Option<Web3ProxyBlock>,
// to_block is only set when ranges of blocks are requested (like with eth_getLogs)
to_block: Option<SavedBlock>,
to_block: Option<Web3ProxyBlock>,
method: String,
// TODO: better type for this
params: Option<serde_json::Value>,
@ -204,7 +204,7 @@ pub struct Web3ProxyApp {
response_cache: ResponseCache,
// don't drop this or the sender will stop working
// TODO: broadcast channel instead?
watch_consensus_head_receiver: watch::Receiver<ArcBlock>,
watch_consensus_head_receiver: watch::Receiver<Web3ProxyBlock>,
pending_tx_sender: broadcast::Sender<TxStatus>,
pub config: AppConfig,
pub db_conn: Option<sea_orm::DatabaseConnection>,
@ -542,7 +542,7 @@ impl Web3ProxyApp {
// TODO: i don't like doing Block::default here! Change this to "None"?
let (watch_consensus_head_sender, watch_consensus_head_receiver) =
watch::channel(Arc::new(Block::default()));
watch::channel(Web3ProxyBlock::default());
// TODO: will one receiver lagging be okay? how big should this be?
let (pending_tx_sender, pending_tx_receiver) = broadcast::channel(256);
@ -563,11 +563,11 @@ impl Web3ProxyApp {
// TODO: limits from config
// these blocks don't have full transactions, but they do have rather variable amounts of transaction hashes
// TODO: how can we do the weigher better?
let block_map = Cache::builder()
let block_map: BlockHashesCache = Cache::builder()
.max_capacity(1024 * 1024 * 1024)
.weigher(|_k, v: &ArcBlock| {
.weigher(|_k, v: &Web3ProxyBlock| {
// TODO: is this good enough?
1 + v.transactions.len().try_into().unwrap_or(u32::MAX)
1 + v.block.transactions.len().try_into().unwrap_or(u32::MAX)
})
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
@ -577,6 +577,8 @@ impl Web3ProxyApp {
top_config.app.chain_id,
db_conn.clone(),
http_client.clone(),
top_config.app.max_block_age,
top_config.app.max_block_lag,
top_config.app.min_synced_rpcs,
top_config.app.min_sum_soft_limit,
pending_transactions.clone(),
@ -603,6 +605,9 @@ impl Web3ProxyApp {
top_config.app.chain_id,
db_conn.clone(),
http_client.clone(),
// private rpcs don't get subscriptions, so no need for max_block_age or max_block_lag
None,
None,
0,
0,
pending_transactions.clone(),
@ -735,7 +740,7 @@ impl Web3ProxyApp {
Ok((app, cancellable_handles, important_background_handles).into())
}
pub fn head_block_receiver(&self) -> watch::Receiver<ArcBlock> {
pub fn head_block_receiver(&self) -> watch::Receiver<Web3ProxyBlock> {
self.watch_consensus_head_receiver.clone()
}
@ -1481,7 +1486,7 @@ impl Web3ProxyApp {
.await?;
Some(ResponseCacheKey {
from_block: Some(SavedBlock::new(request_block)),
from_block: Some(request_block),
to_block: None,
method: method.to_string(),
// TODO: hash here?
@ -1521,8 +1526,8 @@ impl Web3ProxyApp {
.await?;
Some(ResponseCacheKey {
from_block: Some(SavedBlock::new(from_block)),
to_block: Some(SavedBlock::new(to_block)),
from_block: Some(from_block),
to_block: Some(to_block),
method: method.to_string(),
// TODO: hash here?
params: request.params.clone(),
@ -1537,8 +1542,8 @@ impl Web3ProxyApp {
let authorization = authorization.clone();
if let Some(cache_key) = cache_key {
let from_block_num = cache_key.from_block.as_ref().map(|x| x.number());
let to_block_num = cache_key.to_block.as_ref().map(|x| x.number());
let from_block_num = cache_key.from_block.as_ref().map(|x| *x.number());
let to_block_num = cache_key.to_block.as_ref().map(|x| *x.number());
self.response_cache
.try_get_with(cache_key, async move {

@ -72,7 +72,7 @@ impl Web3ProxyApp {
"params": {
"subscription": subscription_id,
// TODO: option to include full transaction objects instead of just the hashes?
"result": new_head.as_ref(),
"result": new_head.block,
},
});

@ -80,12 +80,7 @@ pub async fn clean_block_number(
.context("fetching block number from hash")?;
// TODO: set change to true? i think not we should probably use hashes for everything.
(
block
.number
.expect("blocks here should always have numbers"),
false,
)
(*block.number(), false)
} else {
return Err(anyhow::anyhow!("blockHash missing"));
}

@ -1,9 +1,9 @@
use crate::rpcs::blockchain::BlockHashesCache;
use crate::app::AnyhowJoinHandle;
use crate::rpcs::blockchain::{BlockHashesCache, Web3ProxyBlock};
use crate::rpcs::one::Web3Rpc;
use crate::{app::AnyhowJoinHandle, rpcs::blockchain::ArcBlock};
use argh::FromArgs;
use ethers::prelude::TxHash;
use ethers::types::U256;
use ethers::types::{U256, U64};
use hashbrown::HashMap;
use log::warn;
use migration::sea_orm::DatabaseConnection;
@ -11,7 +11,7 @@ use serde::Deserialize;
use std::sync::Arc;
use tokio::sync::broadcast;
pub type BlockAndRpc = (Option<ArcBlock>, Arc<Web3Rpc>);
pub type BlockAndRpc = (Option<Web3ProxyBlock>, Arc<Web3Rpc>);
pub type TxHashAndRpc = (TxHash, Arc<Web3Rpc>);
#[derive(Debug, FromArgs)]
@ -105,6 +105,12 @@ pub struct AppConfig {
pub invite_code: Option<String>,
pub login_domain: Option<String>,
/// do not serve any requests if the best known block is older than this many seconds.
pub max_block_age: Option<u64>,
/// do not serve any requests if the best known block is behind the best known block by more than this many blocks.
pub max_block_lag: Option<U64>,
/// Rate limit for bearer token authenticated entrypoints.
/// This is separate from the rpc limits.
#[serde(default = "default_bearer_token_max_concurrent_requests")]

@ -1,16 +1,14 @@
use super::consensus::ConsensusFinder;
use super::many::Web3Rpcs;
///! Keep track of the blockchain as seen by a Web3Rpcs.
use super::one::Web3Rpc;
use super::transactions::TxStatus;
use crate::frontend::authorization::Authorization;
use crate::{
config::BlockAndRpc, jsonrpc::JsonRpcRequest, rpcs::synced_connections::ConsensusWeb3Rpcs,
};
use crate::{config::BlockAndRpc, jsonrpc::JsonRpcRequest};
use anyhow::Context;
use derive_more::From;
use ethers::prelude::{Block, TxHash, H256, U64};
use hashbrown::{HashMap, HashSet};
use log::{debug, error, warn, Level};
use log::{debug, error, trace, warn, Level};
use moka::future::Cache;
use serde::Serialize;
use serde_json::json;
@ -22,17 +20,18 @@ use tokio::time::Duration;
// TODO: type for Hydrated Blocks with their full transactions?
pub type ArcBlock = Arc<Block<TxHash>>;
pub type BlockHashesCache = Cache<H256, ArcBlock, hashbrown::hash_map::DefaultHashBuilder>;
pub type BlockHashesCache = Cache<H256, Web3ProxyBlock, hashbrown::hash_map::DefaultHashBuilder>;
/// A block and its age.
#[derive(Clone, Debug, Default, From, Serialize)]
pub struct SavedBlock {
pub struct Web3ProxyBlock {
pub block: ArcBlock,
/// number of seconds this block was behind the current time when received
pub age: u64,
/// this is only set if the block is from a subscription
pub received_age: Option<u64>,
}
impl PartialEq for SavedBlock {
impl PartialEq for Web3ProxyBlock {
fn eq(&self, other: &Self) -> bool {
match (self.block.hash, other.block.hash) {
(None, None) => true,
@ -43,18 +42,23 @@ impl PartialEq for SavedBlock {
}
}
impl SavedBlock {
impl Web3ProxyBlock {
/// A new block has arrived over a subscription
pub fn new(block: ArcBlock) -> Self {
let mut x = Self { block, age: 0 };
let mut x = Self {
block,
received_age: None,
};
// no need to recalulate lag every time
// if the head block gets too old, a health check restarts this connection
x.age = x.lag();
// TODO: emit a stat for received_age
x.received_age = Some(x.age());
x
}
pub fn lag(&self) -> u64 {
pub fn age(&self) -> u64 {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("there should always be time");
@ -70,37 +74,58 @@ impl SavedBlock {
}
}
pub fn hash(&self) -> H256 {
self.block.hash.expect("saved blocks must have a hash")
#[inline(always)]
pub fn parent_hash(&self) -> &H256 {
&self.block.parent_hash
}
// TODO: return as U64 or u64?
pub fn number(&self) -> U64 {
self.block.number.expect("saved blocks must have a number")
#[inline(always)]
pub fn hash(&self) -> &H256 {
self.block
.hash
.as_ref()
.expect("saved blocks must have a hash")
}
#[inline(always)]
pub fn number(&self) -> &U64 {
self.block
.number
.as_ref()
.expect("saved blocks must have a number")
}
}
impl From<ArcBlock> for SavedBlock {
impl From<ArcBlock> for Web3ProxyBlock {
fn from(x: ArcBlock) -> Self {
SavedBlock::new(x)
Web3ProxyBlock {
block: x,
received_age: None,
}
}
}
impl Display for SavedBlock {
impl Display for Web3ProxyBlock {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{} ({}, {}s old)", self.number(), self.hash(), self.age)
write!(
f,
"{} ({}, {}s old)",
self.number(),
self.hash(),
self.age()
)
}
}
impl Web3Rpcs {
/// add a block to our mappings and track the heaviest chain
pub async fn save_block(
pub async fn try_cache_block(
&self,
block: ArcBlock,
block: Web3ProxyBlock,
heaviest_chain: bool,
) -> anyhow::Result<ArcBlock> {
) -> anyhow::Result<Web3ProxyBlock> {
// TODO: i think we can rearrange this function to make it faster on the hot path
let block_hash = block.hash.as_ref().context("no block hash")?;
let block_hash = block.hash();
// skip Block::default()
if block_hash.is_zero() {
@ -108,7 +133,7 @@ impl Web3Rpcs {
return Ok(block);
}
let block_num = block.number.as_ref().context("no block num")?;
let block_num = block.number();
// TODO: think more about heaviest_chain. would be better to do the check inside this function
if heaviest_chain {
@ -136,7 +161,7 @@ impl Web3Rpcs {
authorization: &Arc<Authorization>,
hash: &H256,
rpc: Option<&Arc<Web3Rpc>>,
) -> anyhow::Result<ArcBlock> {
) -> anyhow::Result<Web3ProxyBlock> {
// first, try to get the hash from our cache
// the cache is set last, so if its here, its everywhere
// TODO: use try_get_with
@ -147,17 +172,18 @@ impl Web3Rpcs {
// block not in cache. we need to ask an rpc for it
let get_block_params = (*hash, false);
// TODO: if error, retry?
let block: ArcBlock = match rpc {
let block: Web3ProxyBlock = match rpc {
Some(rpc) => rpc
.wait_for_request_handle(authorization, Some(Duration::from_secs(30)), None)
.await?
.request::<_, Option<_>>(
.request::<_, Option<ArcBlock>>(
"eth_getBlockByHash",
&json!(get_block_params),
Level::Error.into(),
None,
)
.await?
.map(Into::into)
.context("no block!")?,
None => {
// TODO: helper for method+params => JsonRpcRequest
@ -181,13 +207,14 @@ impl Web3Rpcs {
let block: Option<ArcBlock> = serde_json::from_str(block.get())?;
block.context("no block!")?
// TODO: from isn't great here. received time is going to be weird
block.map(Into::into).context("no block!")?
}
};
// the block was fetched using eth_getBlockByHash, so it should have all fields
// TODO: fill in heaviest_chain! if the block is old enough, is this definitely true?
let block = self.save_block(block, false).await?;
let block = self.try_cache_block(block, false).await?;
Ok(block)
}
@ -200,7 +227,7 @@ impl Web3Rpcs {
) -> anyhow::Result<(H256, u64)> {
let (block, block_depth) = self.cannonical_block(authorization, num).await?;
let hash = block.hash.expect("Saved blocks should always have hashes");
let hash = *block.hash();
Ok((hash, block_depth))
}
@ -211,7 +238,7 @@ impl Web3Rpcs {
&self,
authorization: &Arc<Authorization>,
num: &U64,
) -> anyhow::Result<(ArcBlock, u64)> {
) -> anyhow::Result<(Web3ProxyBlock, u64)> {
// we only have blocks by hash now
// maybe save them during save_block in a blocks_by_number Cache<U64, Vec<ArcBlock>>
// if theres multiple, use petgraph to find the one on the main chain (and remove the others if they have enough confirmations)
@ -223,28 +250,21 @@ impl Web3Rpcs {
.clone();
// be sure the requested block num exists
let mut head_block_num = consensus_head_receiver.borrow_and_update().number;
// TODO: is this okay? what if we aren't synced?!
let mut head_block_num = *consensus_head_receiver.borrow_and_update().number();
loop {
if let Some(head_block_num) = head_block_num {
if num <= &head_block_num {
break;
}
if num <= &head_block_num {
break;
}
trace!("waiting for future block {} > {}", num, head_block_num);
consensus_head_receiver.changed().await?;
head_block_num = consensus_head_receiver.borrow_and_update().number;
head_block_num = *consensus_head_receiver.borrow_and_update().number();
}
let head_block_num =
head_block_num.expect("we should only get here if we have a head block");
let block_depth = if num >= &head_block_num {
0
} else {
(head_block_num - num).as_u64()
};
let block_depth = (head_block_num - num).as_u64();
// try to get the hash from our cache
// deref to not keep the lock open
@ -276,8 +296,10 @@ impl Web3Rpcs {
let block: ArcBlock = serde_json::from_str(raw_block.get())?;
let block = Web3ProxyBlock::from(block);
// the block was fetched using eth_getBlockByNumber, so it should have all fields and be on the heaviest chain
let block = self.save_block(block, true).await?;
let block = self.try_cache_block(block, true).await?;
Ok((block, block_depth))
}
@ -288,18 +310,16 @@ impl Web3Rpcs {
block_receiver: flume::Receiver<BlockAndRpc>,
// TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed
// Geth's subscriptions have the same potential for skipping blocks.
head_block_sender: watch::Sender<ArcBlock>,
head_block_sender: watch::Sender<Web3ProxyBlock>,
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
) -> anyhow::Result<()> {
// TODO: indexmap or hashmap? what hasher? with_capacity?
// TODO: this will grow unbounded. prune old heads on this at the same time we prune the graph?
let mut connection_heads = ConsensusFinder::default();
let mut connection_heads = ConsensusFinder::new(self.max_block_age, self.max_block_lag);
loop {
match block_receiver.recv_async().await {
Ok((new_block, rpc)) => {
let new_block = new_block.map(Into::into);
let rpc_name = rpc.name.clone();
if let Err(err) = self
@ -313,7 +333,7 @@ impl Web3Rpcs {
)
.await
{
warn!("unable to process block from rpc {}: {:?}", rpc_name, err);
warn!("unable to process block from rpc {}: {:#?}", rpc_name, err);
}
}
Err(err) => {
@ -331,60 +351,72 @@ impl Web3Rpcs {
&self,
authorization: &Arc<Authorization>,
consensus_finder: &mut ConsensusFinder,
rpc_head_block: Option<SavedBlock>,
rpc_head_block: Option<Web3ProxyBlock>,
rpc: Arc<Web3Rpc>,
head_block_sender: &watch::Sender<ArcBlock>,
head_block_sender: &watch::Sender<Web3ProxyBlock>,
pending_tx_sender: &Option<broadcast::Sender<TxStatus>>,
) -> anyhow::Result<()> {
// TODO: how should we handle an error here?
if !consensus_finder
.update_rpc(rpc_head_block.clone(), rpc.clone(), self)
.await?
.await
.context("failed to update rpc")?
{
// nothing changed. no need
// nothing changed. no need to scan for a new consensus head
return Ok(());
}
let new_synced_connections = consensus_finder
.best_consensus_connections(authorization, self)
.await;
.await
.context("no consensus head block!")
.map_err(|err| {
self.watch_consensus_rpcs_sender
.send_replace(Arc::new(Default::default()));
err
})?;
// TODO: what should we do if the block number of new_synced_connections is < old_synced_connections? wait?
let includes_backups = new_synced_connections.includes_backups;
let backups_needed = new_synced_connections.backups_needed;
let consensus_head_block = new_synced_connections.head_block.clone();
let num_consensus_rpcs = new_synced_connections.num_conns();
let num_checked_rpcs = new_synced_connections.num_checked_conns;
let num_active_rpcs = consensus_finder.all.rpc_name_to_hash.len();
let num_checked_rpcs = 0; // TODO: figure this out
let num_active_rpcs = consensus_finder
.all_rpcs_group()
.map(|x| x.len())
.unwrap_or_default();
let total_rpcs = self.conns.len();
let old_consensus_head_connections = self
.watch_consensus_connections_sender
.watch_consensus_rpcs_sender
.send_replace(Arc::new(new_synced_connections));
let includes_backups_str = if includes_backups { "B " } else { "" };
let backups_voted_str = if backups_needed { "B " } else { "" };
if let Some(consensus_saved_block) = consensus_head_block {
if let Some(consensus_head_block) = consensus_head_block {
match &old_consensus_head_connections.head_block {
None => {
debug!(
"first {}{}/{}/{}/{} block={}, rpc={}",
includes_backups_str,
backups_voted_str,
num_consensus_rpcs,
num_checked_rpcs,
num_active_rpcs,
total_rpcs,
consensus_saved_block,
consensus_head_block,
rpc,
);
if includes_backups {
if backups_needed {
// TODO: what else should be in this error?
warn!("Backup RPCs are in use!");
}
// this should already be cached
let consensus_head_block =
self.save_block(consensus_saved_block.block, true).await?;
self.try_cache_block(consensus_head_block, true).await?;
head_block_sender
.send(consensus_head_block)
@ -396,46 +428,45 @@ impl Web3Rpcs {
.map(|x| x.to_string())
.unwrap_or_else(|| "None".to_string());
match consensus_saved_block.number().cmp(&old_head_block.number()) {
match consensus_head_block.number().cmp(&old_head_block.number()) {
Ordering::Equal => {
// multiple blocks with the same fork!
if consensus_saved_block.hash() == old_head_block.hash() {
if consensus_head_block.hash() == old_head_block.hash() {
// no change in hash. no need to use head_block_sender
// TODO: trace level if rpc is backup
debug!(
"con {}{}/{}/{}/{} con={} rpc={}@{}",
includes_backups_str,
backups_voted_str,
num_consensus_rpcs,
num_checked_rpcs,
num_active_rpcs,
total_rpcs,
consensus_saved_block,
consensus_head_block,
rpc,
rpc_head_str,
)
} else {
// hash changed
if includes_backups {
if backups_needed {
// TODO: what else should be in this error?
warn!("Backup RPCs are in use!");
}
debug!(
"unc {}{}/{}/{}/{} con_head={} old={} rpc={}@{}",
includes_backups_str,
backups_voted_str,
num_consensus_rpcs,
num_checked_rpcs,
num_active_rpcs,
total_rpcs,
consensus_saved_block,
consensus_head_block,
old_head_block,
rpc,
rpc_head_str,
);
let consensus_head_block = self
.save_block(consensus_saved_block.block, true)
.try_cache_block(consensus_head_block, true)
.await
.context("save consensus_head_block as heaviest chain")?;
@ -449,25 +480,25 @@ impl Web3Rpcs {
// TODO: better log
warn!(
"chain rolled back {}{}/{}/{}/{} con={} old={} rpc={}@{}",
includes_backups_str,
backups_voted_str,
num_consensus_rpcs,
num_checked_rpcs,
num_active_rpcs,
total_rpcs,
consensus_saved_block,
consensus_head_block,
old_head_block,
rpc,
rpc_head_str,
);
if includes_backups {
if backups_needed {
// TODO: what else should be in this error?
warn!("Backup RPCs are in use!");
}
// TODO: tell save_block to remove any higher block numbers from the cache. not needed because we have other checks on requested blocks being > head, but still seems like a good idea
let consensus_head_block = self
.save_block(consensus_saved_block.block, true)
.try_cache_block(consensus_head_block, true)
.await
.context(
"save_block sending consensus_head_block as heaviest chain",
@ -480,23 +511,23 @@ impl Web3Rpcs {
Ordering::Greater => {
debug!(
"new {}{}/{}/{}/{} con={} rpc={}@{}",
includes_backups_str,
backups_voted_str,
num_consensus_rpcs,
num_checked_rpcs,
num_active_rpcs,
total_rpcs,
consensus_saved_block,
consensus_head_block,
rpc,
rpc_head_str,
);
if includes_backups {
if backups_needed {
// TODO: what else should be in this error?
warn!("Backup RPCs are in use!");
}
let consensus_head_block =
self.save_block(consensus_saved_block.block, true).await?;
self.try_cache_block(consensus_head_block, true).await?;
head_block_sender.send(consensus_head_block)?;
}
@ -512,7 +543,7 @@ impl Web3Rpcs {
if num_checked_rpcs >= self.min_head_rpcs {
error!(
"non {}{}/{}/{}/{} rpc={}@{}",
includes_backups_str,
backups_voted_str,
num_consensus_rpcs,
num_checked_rpcs,
num_active_rpcs,
@ -523,7 +554,7 @@ impl Web3Rpcs {
} else {
debug!(
"non {}{}/{}/{}/{} rpc={}@{}",
includes_backups_str,
backups_voted_str,
num_consensus_rpcs,
num_checked_rpcs,
num_active_rpcs,
@ -537,403 +568,3 @@ impl Web3Rpcs {
Ok(())
}
}
struct ConnectionsGroup {
/// TODO: this group might not actually include backups, but they were at leastchecked
includes_backups: bool,
rpc_name_to_hash: HashMap<String, H256>,
}
impl ConnectionsGroup {
fn new(with_backups: bool) -> Self {
Self {
includes_backups: with_backups,
rpc_name_to_hash: Default::default(),
}
}
fn without_backups() -> Self {
Self::new(false)
}
fn with_backups() -> Self {
Self::new(true)
}
fn remove(&mut self, rpc: &Web3Rpc) -> Option<H256> {
self.rpc_name_to_hash.remove(rpc.name.as_str())
}
fn insert(&mut self, rpc: &Web3Rpc, block_hash: H256) -> Option<H256> {
self.rpc_name_to_hash.insert(rpc.name.clone(), block_hash)
}
// TODO: i don't love having this here. move to web3_connections?
async fn get_block_from_rpc(
&self,
rpc_name: &str,
hash: &H256,
authorization: &Arc<Authorization>,
web3_rpcs: &Web3Rpcs,
) -> anyhow::Result<ArcBlock> {
// // TODO: why does this happen?!?! seems to only happen with uncled blocks
// // TODO: maybe we should do try_get_with?
// // TODO: maybe we should just continue. this only seems to happen when an older block is received
// warn!(
// "Missing connection_head_block in block_hashes. Fetching now. hash={}. other={}",
// connection_head_hash, conn_name
// );
// this option should almost always be populated. if the connection reconnects at a bad time it might not be available though
// TODO: if this is None, I think we should error.
let rpc = web3_rpcs.conns.get(rpc_name);
web3_rpcs.block(authorization, hash, rpc).await
}
// TODO: do this during insert/remove?
pub(self) async fn highest_block(
&self,
authorization: &Arc<Authorization>,
web3_rpcs: &Web3Rpcs,
) -> Option<ArcBlock> {
let mut checked_heads = HashSet::with_capacity(self.rpc_name_to_hash.len());
let mut highest_block = None::<ArcBlock>;
for (rpc_name, rpc_head_hash) in self.rpc_name_to_hash.iter() {
// don't waste time checking the same hash multiple times
if checked_heads.contains(rpc_head_hash) {
continue;
}
let rpc_block = match self
.get_block_from_rpc(rpc_name, rpc_head_hash, authorization, web3_rpcs)
.await
{
Ok(x) => x,
Err(err) => {
warn!(
"failed getting block {} from {} while finding highest block number: {:?}",
rpc_head_hash, rpc_name, err,
);
continue;
}
};
checked_heads.insert(rpc_head_hash);
// if this is the first block we've tried
// or if this rpc's newest block has a higher number
// we used to check total difficulty, but that isn't a thing anymore on ETH
// TODO: we still need total difficulty on some other PoW chains. whats annoying is it isn't considered part of the "block header" just the block. so websockets don't return it
let highest_num = highest_block
.as_ref()
.map(|x| x.number.expect("blocks here should always have a number"));
let rpc_num = rpc_block.as_ref().number;
if rpc_num > highest_num {
highest_block = Some(rpc_block);
}
}
highest_block
}
pub(self) async fn consensus_head_connections(
&self,
authorization: &Arc<Authorization>,
web3_rpcs: &Web3Rpcs,
) -> anyhow::Result<ConsensusWeb3Rpcs> {
let mut maybe_head_block = match self.highest_block(authorization, web3_rpcs).await {
None => return Err(anyhow::anyhow!("No blocks known")),
Some(x) => x,
};
let num_known = self.rpc_name_to_hash.len();
// track rpcs on this heaviest chain so we can build a new ConsensusConnections
let mut highest_rpcs = HashSet::<&str>::new();
// a running total of the soft limits covered by the rpcs that agree on the head block
let mut highest_rpcs_sum_soft_limit: u32 = 0;
// TODO: also track highest_rpcs_sum_hard_limit? llama doesn't need this, so it can wait
// check the highest work block for a set of rpcs that can serve our request load
// if it doesn't have enough rpcs for our request load, check the parent block
// TODO: loop for how many parent blocks? we don't want to serve blocks that are too far behind. probably different per chain
// TODO: this loop is pretty long. any way to clean up this code?
for _ in 0..6 {
let maybe_head_hash = maybe_head_block
.hash
.as_ref()
.expect("blocks here always need hashes");
// find all rpcs with maybe_head_block as their current head
for (rpc_name, rpc_head_hash) in self.rpc_name_to_hash.iter() {
if rpc_head_hash != maybe_head_hash {
// connection is not on the desired block
continue;
}
if highest_rpcs.contains(rpc_name.as_str()) {
// connection is on a child block
continue;
}
if let Some(rpc) = web3_rpcs.conns.get(rpc_name.as_str()) {
highest_rpcs.insert(rpc_name);
highest_rpcs_sum_soft_limit += rpc.soft_limit;
} else {
// i don't think this is an error. i think its just if a reconnect is currently happening
warn!("connection missing: {}", rpc_name);
debug!("web3_rpcs.conns: {:#?}", web3_rpcs.conns);
}
}
if highest_rpcs_sum_soft_limit >= web3_rpcs.min_sum_soft_limit
&& highest_rpcs.len() >= web3_rpcs.min_head_rpcs
{
// we have enough servers with enough requests
break;
}
// not enough rpcs yet. check the parent block
if let Some(parent_block) = web3_rpcs.block_hashes.get(&maybe_head_block.parent_hash) {
// trace!(
// child=%maybe_head_hash, parent=%parent_block.hash.unwrap(), "avoiding thundering herd",
// );
maybe_head_block = parent_block;
continue;
} else {
if num_known < web3_rpcs.min_head_rpcs {
return Err(anyhow::anyhow!(
"not enough rpcs connected: {}/{}/{}",
highest_rpcs.len(),
num_known,
web3_rpcs.min_head_rpcs,
));
} else {
let soft_limit_percent = (highest_rpcs_sum_soft_limit as f32
/ web3_rpcs.min_sum_soft_limit as f32)
* 100.0;
return Err(anyhow::anyhow!(
"ran out of parents to check. rpcs {}/{}/{}. soft limit: {:.2}% ({}/{})",
highest_rpcs.len(),
num_known,
web3_rpcs.min_head_rpcs,
highest_rpcs_sum_soft_limit,
web3_rpcs.min_sum_soft_limit,
soft_limit_percent,
));
}
}
}
// TODO: if consensus_head_rpcs.is_empty, try another method of finding the head block. will need to change the return Err above into breaks.
// we've done all the searching for the heaviest block that we can
if highest_rpcs.len() < web3_rpcs.min_head_rpcs
|| highest_rpcs_sum_soft_limit < web3_rpcs.min_sum_soft_limit
{
// if we get here, not enough servers are synced. return an error
let soft_limit_percent =
(highest_rpcs_sum_soft_limit as f32 / web3_rpcs.min_sum_soft_limit as f32) * 100.0;
return Err(anyhow::anyhow!(
"Not enough resources. rpcs {}/{}/{}. soft limit: {:.2}% ({}/{})",
highest_rpcs.len(),
num_known,
web3_rpcs.min_head_rpcs,
highest_rpcs_sum_soft_limit,
web3_rpcs.min_sum_soft_limit,
soft_limit_percent,
));
}
// success! this block has enough soft limit and nodes on it (or on later blocks)
let conns: Vec<Arc<Web3Rpc>> = highest_rpcs
.into_iter()
.filter_map(|conn_name| web3_rpcs.conns.get(conn_name).cloned())
.collect();
// TODO: DEBUG only check
let _ = maybe_head_block
.hash
.expect("head blocks always have hashes");
let _ = maybe_head_block
.number
.expect("head blocks always have numbers");
let consensus_head_block: SavedBlock = maybe_head_block.into();
Ok(ConsensusWeb3Rpcs {
head_block: Some(consensus_head_block),
conns,
num_checked_conns: self.rpc_name_to_hash.len(),
includes_backups: self.includes_backups,
})
}
}
/// A ConsensusConnections builder that tracks all connection heads across multiple groups of servers
pub struct ConsensusFinder {
/// only main servers
main: ConnectionsGroup,
/// main and backup servers
all: ConnectionsGroup,
}
impl Default for ConsensusFinder {
fn default() -> Self {
Self {
main: ConnectionsGroup::without_backups(),
all: ConnectionsGroup::with_backups(),
}
}
}
impl ConsensusFinder {
fn remove(&mut self, rpc: &Web3Rpc) -> Option<H256> {
// TODO: should we have multiple backup tiers? (remote datacenters vs third party)
if !rpc.backup {
self.main.remove(rpc);
}
self.all.remove(rpc)
}
fn insert(&mut self, rpc: &Web3Rpc, new_hash: H256) -> Option<H256> {
// TODO: should we have multiple backup tiers? (remote datacenters vs third party)
if !rpc.backup {
self.main.insert(rpc, new_hash);
}
self.all.insert(rpc, new_hash)
}
/// Update our tracking of the rpc and return true if something changed
async fn update_rpc(
&mut self,
rpc_head_block: Option<SavedBlock>,
rpc: Arc<Web3Rpc>,
// we need this so we can save the block to caches. i don't like it though. maybe we should use a lazy_static Cache wrapper that has a "save_block" method?. i generally dislike globals but i also dislike all the types having to pass eachother around
web3_connections: &Web3Rpcs,
) -> anyhow::Result<bool> {
// add the rpc's block to connection_heads, or remove the rpc from connection_heads
let changed = match rpc_head_block {
Some(mut rpc_head_block) => {
// we don't know if its on the heaviest chain yet
rpc_head_block.block = web3_connections
.save_block(rpc_head_block.block, false)
.await?;
// we used to remove here if the block was too far behind. but it just made things more complicated
let rpc_head_hash = rpc_head_block.hash();
if let Some(prev_hash) = self.insert(&rpc, rpc_head_hash) {
if prev_hash == rpc_head_hash {
// this block was already sent by this rpc. return early
false
} else {
// new block for this rpc
true
}
} else {
// first block for this rpc
true
}
}
None => {
if self.remove(&rpc).is_none() {
// this rpc was already removed
false
} else {
// rpc head changed from being synced to not
true
}
}
};
Ok(changed)
}
// TODO: this could definitely be cleaner. i don't like the error handling/unwrapping
async fn best_consensus_connections(
&mut self,
authorization: &Arc<Authorization>,
web3_connections: &Web3Rpcs,
) -> ConsensusWeb3Rpcs {
let highest_block_num = match self
.all
.highest_block(authorization, web3_connections)
.await
{
None => {
return ConsensusWeb3Rpcs::default();
}
Some(x) => x.number.expect("blocks here should always have a number"),
};
// TODO: also needs to be not less than our current head
let mut min_block_num = highest_block_num.saturating_sub(U64::from(5));
// we also want to be sure we don't ever go backwards!
if let Some(current_consensus_head_num) = web3_connections.head_block_num() {
min_block_num = min_block_num.max(current_consensus_head_num);
}
// TODO: pass `min_block_num` to consensus_head_connections?
let consensus_head_for_main = self
.main
.consensus_head_connections(authorization, web3_connections)
.await
.map_err(|err| err.context("cannot use main group"));
let consensus_num_for_main = consensus_head_for_main
.as_ref()
.ok()
.map(|x| x.head_block.as_ref().unwrap().number());
if let Some(consensus_num_for_main) = consensus_num_for_main {
if consensus_num_for_main >= min_block_num {
return consensus_head_for_main.unwrap();
}
}
// TODO: pass `min_block_num` to consensus_head_connections?
let consensus_connections_for_all = match self
.all
.consensus_head_connections(authorization, web3_connections)
.await
{
Err(err) => {
if self.all.rpc_name_to_hash.len() < web3_connections.min_head_rpcs {
debug!("No consensus head yet: {}", err);
}
return ConsensusWeb3Rpcs::default();
}
Ok(x) => x,
};
let consensus_num_for_all = consensus_connections_for_all
.head_block
.as_ref()
.map(|x| x.number());
if consensus_num_for_all > consensus_num_for_main {
if consensus_num_for_all < Some(min_block_num) {
// TODO: this should have an alarm in sentry
error!("CONSENSUS HEAD w/ BACKUP NODES IS VERY OLD!");
}
consensus_connections_for_all
} else {
if let Ok(x) = consensus_head_for_main {
error!("CONSENSUS HEAD IS VERY OLD! Backup RPCs did not improve this situation");
x
} else {
// TODO: i don't think we need this error. and i doublt we'll ever even get here
error!("NO CONSENSUS HEAD!");
ConsensusWeb3Rpcs::default()
}
}
}
}

@ -0,0 +1,527 @@
use crate::frontend::authorization::Authorization;
use super::blockchain::Web3ProxyBlock;
use super::many::Web3Rpcs;
use super::one::Web3Rpc;
use ethers::prelude::{H256, U64};
use hashbrown::{HashMap, HashSet};
use log::{debug, trace, warn};
use serde::Serialize;
use std::collections::BTreeMap;
use std::fmt;
use std::sync::Arc;
/// A collection of Web3Rpcs that are on the same block.
/// Serialize is so we can print it on our debug endpoint
#[derive(Clone, Default, Serialize)]
pub struct ConsensusWeb3Rpcs {
pub(super) head_block: Option<Web3ProxyBlock>,
// TODO: this should be able to serialize, but it isn't
#[serde(skip_serializing)]
pub(super) conns: Vec<Arc<Web3Rpc>>,
pub(super) backups_voted: Option<Web3ProxyBlock>,
pub(super) backups_needed: bool,
}
impl ConsensusWeb3Rpcs {
pub fn num_conns(&self) -> usize {
self.conns.len()
}
pub fn sum_soft_limit(&self) -> u32 {
self.conns.iter().fold(0, |sum, rpc| sum + rpc.soft_limit)
}
// TODO: sum_hard_limit?
}
impl fmt::Debug for ConsensusWeb3Rpcs {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// TODO: the default formatter takes forever to write. this is too quiet though
// TODO: print the actual conns?
f.debug_struct("ConsensusConnections")
.field("head_block", &self.head_block)
.field("num_conns", &self.conns.len())
.finish_non_exhaustive()
}
}
impl Web3Rpcs {
// TODO: return a ref?
pub fn head_block(&self) -> Option<Web3ProxyBlock> {
self.watch_consensus_head_receiver
.as_ref()
.map(|x| x.borrow().clone())
}
// TODO: return a ref?
pub fn head_block_hash(&self) -> Option<H256> {
self.head_block().map(|x| *x.hash())
}
// TODO: return a ref?
pub fn head_block_num(&self) -> Option<U64> {
self.head_block().map(|x| *x.number())
}
pub fn synced(&self) -> bool {
!self.watch_consensus_rpcs_sender.borrow().conns.is_empty()
}
pub fn num_synced_rpcs(&self) -> usize {
self.watch_consensus_rpcs_sender.borrow().conns.len()
}
}
pub struct ConnectionsGroup {
rpc_name_to_block: HashMap<String, Web3ProxyBlock>,
// TODO: what if there are two blocks with the same number?
highest_block: Option<Web3ProxyBlock>,
}
impl Default for ConnectionsGroup {
fn default() -> Self {
Self {
rpc_name_to_block: Default::default(),
highest_block: Default::default(),
}
}
}
impl ConnectionsGroup {
pub fn len(&self) -> usize {
self.rpc_name_to_block.len()
}
fn remove(&mut self, rpc_name: &str) -> Option<Web3ProxyBlock> {
if let Some(removed_block) = self.rpc_name_to_block.remove(rpc_name) {
match self.highest_block.as_mut() {
None => {}
Some(current_highest_block) => {
if removed_block.hash() == current_highest_block.hash() {
for maybe_highest_block in self.rpc_name_to_block.values() {
if maybe_highest_block.number() > current_highest_block.number() {
*current_highest_block = maybe_highest_block.clone();
};
}
}
}
}
Some(removed_block)
} else {
None
}
}
fn insert(&mut self, rpc: &Web3Rpc, block: Web3ProxyBlock) -> Option<Web3ProxyBlock> {
// TODO: what about a reorg to the same height?
if Some(block.number()) > self.highest_block.as_ref().map(|x| x.number()) {
self.highest_block = Some(block.clone());
}
self.rpc_name_to_block.insert(rpc.name.clone(), block)
}
// // TODO: do this during insert/remove?
// pub(self) async fn highest_block(
// &self,
// authorization: &Arc<Authorization>,
// web3_rpcs: &Web3Rpcs,
// ) -> Option<ArcBlock> {
// let mut checked_heads = HashSet::with_capacity(self.rpc_name_to_hash.len());
// let mut highest_block = None::<ArcBlock>;
// for (rpc_name, rpc_head_hash) in self.rpc_name_to_hash.iter() {
// // don't waste time checking the same hash multiple times
// if checked_heads.contains(rpc_head_hash) {
// continue;
// }
// let rpc_block = match web3_rpcs
// .get_block_from_rpc(rpc_name, rpc_head_hash, authorization)
// .await
// {
// Ok(x) => x,
// Err(err) => {
// warn!(
// "failed getting block {} from {} while finding highest block number: {:?}",
// rpc_head_hash, rpc_name, err,
// );
// continue;
// }
// };
// checked_heads.insert(rpc_head_hash);
// // if this is the first block we've tried
// // or if this rpc's newest block has a higher number
// // we used to check total difficulty, but that isn't a thing anymore on ETH
// // TODO: we still need total difficulty on some other PoW chains. whats annoying is it isn't considered part of the "block header" just the block. so websockets don't return it
// let highest_num = highest_block
// .as_ref()
// .map(|x| x.number.expect("blocks here should always have a number"));
// let rpc_num = rpc_block.as_ref().number;
// if rpc_num > highest_num {
// highest_block = Some(rpc_block);
// }
// }
// highest_block
// }
/// min_consensus_block_num keeps us from ever going backwards.
/// TODO: think about min_consensus_block_num more. i think this might cause an outage if the chain is doing weird things. but 503s is probably better than broken data.
pub(self) async fn consensus_head_connections(
&self,
authorization: &Arc<Authorization>,
web3_rpcs: &Web3Rpcs,
min_consensus_block_num: Option<U64>,
) -> anyhow::Result<ConsensusWeb3Rpcs> {
let mut maybe_head_block = match self.highest_block.clone() {
None => return Err(anyhow::anyhow!("no blocks known")),
Some(x) => x,
};
// TODO: take max_distance_consensus_to_highest as an argument?
// TODO: what if someone's backup node is misconfigured and goes on a really fast forked chain?
let max_lag_consensus_to_highest =
if let Some(min_consensus_block_num) = min_consensus_block_num {
maybe_head_block
.number()
.saturating_sub(min_consensus_block_num)
.as_u64()
} else {
// TODO: get from app config? different chains probably should have different values. 10 is probably too much
10
};
let num_known = self.rpc_name_to_block.len();
if num_known < web3_rpcs.min_head_rpcs {
return Err(anyhow::anyhow!(
"not enough rpcs connected: {}/{}",
num_known,
web3_rpcs.min_head_rpcs,
));
}
let mut primary_rpcs_voted: Option<Web3ProxyBlock> = None;
let mut backup_rpcs_voted: Option<Web3ProxyBlock> = None;
// track rpcs on this heaviest chain so we can build a new ConsensusConnections
let mut primary_consensus_rpcs = HashSet::<&str>::new();
let mut backup_consensus_rpcs = HashSet::<&str>::new();
// a running total of the soft limits covered by the rpcs that agree on the head block
let mut primary_sum_soft_limit: u32 = 0;
let mut backup_sum_soft_limit: u32 = 0;
// TODO: also track the sum of *available* hard_limits. if any servers have no hard limits, use their soft limit or no limit?
// check the highest work block for a set of rpcs that can serve our request load
// if it doesn't have enough rpcs for our request load, check the parent block
// TODO: loop for how many parent blocks? we don't want to serve blocks that are too far behind. probably different per chain
// TODO: this loop is pretty long. any way to clean up this code?
for _ in 0..max_lag_consensus_to_highest {
let maybe_head_hash = maybe_head_block.hash();
// find all rpcs with maybe_head_hash as their current head
for (rpc_name, rpc_head) in self.rpc_name_to_block.iter() {
if rpc_head.hash() != maybe_head_hash {
// connection is not on the desired block
continue;
}
if backup_consensus_rpcs.contains(rpc_name.as_str()) {
// connection is on a later block in this same chain
continue;
}
if primary_consensus_rpcs.contains(rpc_name.as_str()) {
// connection is on a later block in this same chain
continue;
}
if let Some(rpc) = web3_rpcs.conns.get(rpc_name.as_str()) {
if backup_rpcs_voted.is_some() {
// backups already voted for a head block. don't change it
} else {
backup_consensus_rpcs.insert(rpc_name);
backup_sum_soft_limit += rpc.soft_limit;
}
if !rpc.backup {
primary_consensus_rpcs.insert(rpc_name);
primary_sum_soft_limit += rpc.soft_limit;
}
} else {
// i don't think this is an error. i think its just if a reconnect is currently happening
warn!("connection missing: {}", rpc_name);
debug!("web3_rpcs.conns: {:#?}", web3_rpcs.conns);
}
}
if primary_sum_soft_limit >= web3_rpcs.min_sum_soft_limit
&& primary_consensus_rpcs.len() >= web3_rpcs.min_head_rpcs
{
// we have enough servers with enough requests! yey!
primary_rpcs_voted = Some(maybe_head_block.clone());
break;
}
if backup_rpcs_voted.is_none()
&& backup_consensus_rpcs != primary_consensus_rpcs
&& backup_sum_soft_limit >= web3_rpcs.min_sum_soft_limit
&& backup_consensus_rpcs.len() >= web3_rpcs.min_head_rpcs
{
// if we include backup servers, we have enough servers with high enough limits
backup_rpcs_voted = Some(maybe_head_block.clone());
}
// not enough rpcs on this block. check the parent block
match web3_rpcs
.block(authorization, &maybe_head_block.parent_hash(), None)
.await
{
Ok(parent_block) => {
// trace!(
// child=%maybe_head_hash, parent=%parent_block.hash.unwrap(), "avoiding thundering herd. checking consensus on parent block",
// );
maybe_head_block = parent_block.into();
continue;
}
Err(err) => {
let soft_limit_percent = (primary_sum_soft_limit as f32
/ web3_rpcs.min_sum_soft_limit as f32)
* 100.0;
let err_msg = format!("ran out of parents to check. rpcs {}/{}/{}. soft limit: {:.2}% ({}/{}). err: {:#?}",
primary_consensus_rpcs.len(),
num_known,
web3_rpcs.min_head_rpcs,
primary_sum_soft_limit,
web3_rpcs.min_sum_soft_limit,
soft_limit_percent,
err,
);
if backup_rpcs_voted.is_some() {
warn!("{}", err_msg);
break;
} else {
return Err(anyhow::anyhow!(err_msg));
}
}
}
}
// TODO: if consensus_head_rpcs.is_empty, try another method of finding the head block. will need to change the return Err above into breaks.
// we've done all the searching for the heaviest block that we can
if (primary_consensus_rpcs.len() < web3_rpcs.min_head_rpcs
|| primary_sum_soft_limit < web3_rpcs.min_sum_soft_limit)
&& backup_rpcs_voted.is_none()
{
// if we get here, not enough servers are synced. return an error
let soft_limit_percent =
(primary_sum_soft_limit as f32 / web3_rpcs.min_sum_soft_limit as f32) * 100.0;
return Err(anyhow::anyhow!(
"Not enough resources. rpcs {}/{}/{}. soft limit: {:.2}% ({}/{})",
primary_consensus_rpcs.len(),
num_known,
web3_rpcs.min_head_rpcs,
primary_sum_soft_limit,
web3_rpcs.min_sum_soft_limit,
soft_limit_percent,
));
}
// success! this block has enough soft limit and nodes on it (or on later blocks)
let conns: Vec<Arc<Web3Rpc>> = primary_consensus_rpcs
.into_iter()
.filter_map(|conn_name| web3_rpcs.conns.get(conn_name).cloned())
.collect();
#[cfg(debug_assertions)]
let _ = maybe_head_block.hash();
#[cfg(debug_assertions)]
let _ = maybe_head_block.number();
Ok(ConsensusWeb3Rpcs {
head_block: Some(maybe_head_block),
conns,
backups_voted: backup_rpcs_voted,
backups_needed: primary_rpcs_voted.is_none(),
})
}
}
/// A ConsensusConnections builder that tracks all connection heads across multiple groups of servers
pub struct ConsensusFinder {
/// backups for all tiers are only used if necessary
/// tiers[0] = only tier 0.
/// tiers[1] = tier 0 and tier 1
/// tiers[n] = tier 0..=n
/// This is a BTreeMap and not a Vec because sometimes a tier is empty
tiers: BTreeMap<u64, ConnectionsGroup>,
/// never serve blocks that are too old
max_block_age: Option<u64>,
/// tier 0 will be prefered as long as the distance between it and the other tiers is <= max_tier_lag
max_block_lag: Option<U64>,
}
impl ConsensusFinder {
pub fn new(max_block_age: Option<u64>, max_block_lag: Option<U64>) -> Self {
Self {
tiers: Default::default(),
max_block_age,
max_block_lag,
}
}
}
impl ConsensusFinder {
/// get the ConnectionsGroup that contains all rpcs
/// panics if there are no tiers
pub fn all_rpcs_group(&self) -> Option<&ConnectionsGroup> {
self.tiers.values().last()
}
/// get the mutable ConnectionsGroup that contains all rpcs
pub fn all_mut(&mut self) -> Option<&mut ConnectionsGroup> {
self.tiers.values_mut().last()
}
pub fn remove(&mut self, rpc: &Web3Rpc) -> Option<Web3ProxyBlock> {
let mut removed = None;
for (i, tier_group) in self.tiers.iter_mut().rev() {
if i < &rpc.tier {
break;
}
let x = tier_group.remove(rpc.name.as_str());
if removed.is_none() && x.is_some() {
removed = x;
}
}
removed
}
/// returns the block that the rpc was on before updating to the new_block
pub fn insert(&mut self, rpc: &Web3Rpc, new_block: Web3ProxyBlock) -> Option<Web3ProxyBlock> {
let mut old = None;
for (i, tier_group) in self.tiers.iter_mut().rev() {
if i > &rpc.tier {
break;
}
// TODO: should new_block be a ref?
let x = tier_group.insert(rpc, new_block.clone());
if old.is_none() && x.is_some() {
old = x;
}
}
old
}
/// Update our tracking of the rpc and return true if something changed
pub(crate) async fn update_rpc(
&mut self,
rpc_head_block: Option<Web3ProxyBlock>,
rpc: Arc<Web3Rpc>,
// we need this so we can save the block to caches. i don't like it though. maybe we should use a lazy_static Cache wrapper that has a "save_block" method?. i generally dislike globals but i also dislike all the types having to pass eachother around
web3_connections: &Web3Rpcs,
) -> anyhow::Result<bool> {
// add the rpc's block to connection_heads, or remove the rpc from connection_heads
let changed = match rpc_head_block {
Some(mut rpc_head_block) => {
// we don't know if its on the heaviest chain yet
rpc_head_block = web3_connections
.try_cache_block(rpc_head_block, false)
.await?;
// if let Some(max_block_lag) = max_block_lag {
// if rpc_head_block.number() < ??? {
// trace!("rpc_head_block from {} is too far behind! {}", rpc, rpc_head_block);
// return Ok(self.remove(&rpc).is_some());
// }
// }
if let Some(max_age) = self.max_block_age {
if rpc_head_block.age() > max_age {
trace!("rpc_head_block from {} is too old! {}", rpc, rpc_head_block);
return Ok(self.remove(&rpc).is_some());
}
}
if let Some(prev_block) = self.insert(&rpc, rpc_head_block.clone()) {
if prev_block.hash() == rpc_head_block.hash() {
// this block was already sent by this rpc. return early
false
} else {
// new block for this rpc
true
}
} else {
// first block for this rpc
true
}
}
None => {
if self.remove(&rpc).is_none() {
// this rpc was already removed
false
} else {
// rpc head changed from being synced to not
true
}
}
};
Ok(changed)
}
// TODO: this could definitely be cleaner. i don't like the error handling/unwrapping
pub async fn best_consensus_connections(
&mut self,
authorization: &Arc<Authorization>,
web3_connections: &Web3Rpcs,
) -> Option<ConsensusWeb3Rpcs> {
// TODO: attach context to these?
let highest_known_block = self.all_rpcs_group()?.highest_block.as_ref()?;
trace!("highest_known_block: {}", highest_known_block);
let min_block_num = self
.max_block_lag
.map(|x| highest_known_block.number().saturating_sub(x))
// we also want to be sure we don't ever go backwards!
.max(web3_connections.head_block_num());
trace!("min_block_num: {:#?}", min_block_num);
// TODO Should this be a Vec<Result<Option<_, _>>>?
// TODO: how should errors be handled?
// TODO: find the best tier with a connectionsgroup. best case, this only queries the first tier
// TODO: do we need to calculate all of them? I think having highest_known_block included as part of min_block_num should make that unnecessary
for (i, x) in self.tiers.iter() {
trace!("checking tier {}", i);
if let Ok(consensus_head_connections) = x
.consensus_head_connections(authorization, web3_connections, min_block_num)
.await
{
trace!("success on tier {}", i);
// we got one! hopefully it didn't need to use any backups.
// but even if it did need backup servers, that is better than going to a worse tier
return Some(consensus_head_connections);
}
}
return None;
}
}

@ -1,8 +1,8 @@
///! Load balanced communication with a group of web3 rpc providers
use super::blockchain::{ArcBlock, BlockHashesCache};
use super::blockchain::{BlockHashesCache, Web3ProxyBlock};
use super::consensus::ConsensusWeb3Rpcs;
use super::one::Web3Rpc;
use super::request::{OpenRequestHandle, OpenRequestResult, RequestRevertHandler};
use super::synced_connections::ConsensusWeb3Rpcs;
use crate::app::{flatten_handle, AnyhowJoinHandle};
use crate::config::{BlockAndRpc, TxHashAndRpc, Web3RpcConfig};
use crate::frontend::authorization::{Authorization, RequestMetadata};
@ -38,9 +38,9 @@ pub struct Web3Rpcs {
/// any requests will be forwarded to one (or more) of these connections
pub(crate) conns: HashMap<String, Arc<Web3Rpc>>,
/// all providers with the same consensus head block. won't update if there is no `self.watch_consensus_head_sender`
pub(super) watch_consensus_connections_sender: watch::Sender<Arc<ConsensusWeb3Rpcs>>,
pub(super) watch_consensus_rpcs_sender: watch::Sender<Arc<ConsensusWeb3Rpcs>>,
/// this head receiver makes it easy to wait until there is a new block
pub(super) watch_consensus_head_receiver: Option<watch::Receiver<ArcBlock>>,
pub(super) watch_consensus_head_receiver: Option<watch::Receiver<Web3ProxyBlock>>,
pub(super) pending_transactions:
Cache<TxHash, TxStatus, hashbrown::hash_map::DefaultHashBuilder>,
/// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis?
@ -48,8 +48,14 @@ pub struct Web3Rpcs {
pub(super) block_hashes: BlockHashesCache,
/// blocks on the heaviest chain
pub(super) block_numbers: Cache<U64, H256, hashbrown::hash_map::DefaultHashBuilder>,
/// the number of rpcs required to agree on consensus for the head block (thundering herd protection)
pub(super) min_head_rpcs: usize,
/// the soft limit required to agree on consensus for the head block. (thundering herd protection)
pub(super) min_sum_soft_limit: u32,
/// how far behind the highest known block height we can be before we stop serving requests
pub(super) max_block_lag: Option<U64>,
/// how old our consensus head block we can be before we stop serving requests
pub(super) max_block_age: Option<u64>,
}
impl Web3Rpcs {
@ -60,13 +66,15 @@ impl Web3Rpcs {
chain_id: u64,
db_conn: Option<DatabaseConnection>,
http_client: Option<reqwest::Client>,
max_block_age: Option<u64>,
max_block_lag: Option<U64>,
min_head_rpcs: usize,
min_sum_soft_limit: u32,
pending_transactions: Cache<TxHash, TxStatus, hashbrown::hash_map::DefaultHashBuilder>,
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
redis_pool: Option<redis_rate_limiter::RedisPool>,
server_configs: HashMap<String, Web3RpcConfig>,
watch_consensus_head_sender: Option<watch::Sender<ArcBlock>>,
watch_consensus_head_sender: Option<watch::Sender<Web3ProxyBlock>>,
) -> anyhow::Result<(Arc<Self>, AnyhowJoinHandle<()>)> {
let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
let (block_sender, block_receiver) = flume::unbounded::<BlockAndRpc>();
@ -212,13 +220,15 @@ impl Web3Rpcs {
let connections = Arc::new(Self {
conns: connections,
watch_consensus_connections_sender,
watch_consensus_rpcs_sender: watch_consensus_connections_sender,
watch_consensus_head_receiver,
pending_transactions,
block_hashes,
block_numbers,
min_sum_soft_limit,
min_head_rpcs,
max_block_age,
max_block_lag,
});
let authorization = Arc::new(Authorization::internal(db_conn.clone())?);
@ -254,7 +264,7 @@ impl Web3Rpcs {
authorization: Arc<Authorization>,
pending_tx_id_receiver: flume::Receiver<TxHashAndRpc>,
block_receiver: flume::Receiver<BlockAndRpc>,
head_block_sender: Option<watch::Sender<ArcBlock>>,
head_block_sender: Option<watch::Sender<Web3ProxyBlock>>,
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
) -> anyhow::Result<()> {
let mut futures = vec![];
@ -455,7 +465,7 @@ impl Web3Rpcs {
max_block_needed: Option<&U64>,
) -> anyhow::Result<OpenRequestResult> {
let usable_rpcs_by_head_num_and_weight: BTreeMap<(Option<U64>, u64), Vec<Arc<Web3Rpc>>> = {
let synced_connections = self.watch_consensus_connections_sender.borrow().clone();
let synced_connections = self.watch_consensus_rpcs_sender.borrow().clone();
let head_block_num = if let Some(head_block) = synced_connections.head_block.as_ref() {
head_block.number()
@ -499,7 +509,7 @@ impl Web3Rpcs {
match x_head_block {
None => continue,
Some(x_head) => {
let key = (Some(x_head.number()), u64::MAX - x.tier);
let key = (Some(*x_head.number()), u64::MAX - x.tier);
m.entry(key).or_insert_with(Vec::new).push(x);
}
@ -508,6 +518,7 @@ impl Web3Rpcs {
}
cmp::Ordering::Equal => {
// need the consensus head block. filter the synced rpcs
// TODO: this doesn't properly check the allow_backups variable!
for x in synced_connections
.conns
.iter()
@ -519,7 +530,7 @@ impl Web3Rpcs {
}
}
cmp::Ordering::Greater => {
// TODO? if the blocks is close and wait_for_sync and allow_backups, wait for change on a watch_consensus_connections_receiver().subscribe()
// TODO? if the blocks is close, wait for change on a watch_consensus_connections_receiver().subscribe()
return Ok(OpenRequestResult::NotReady(allow_backups));
}
}
@ -670,11 +681,7 @@ impl Web3Rpcs {
let mut tried = HashSet::new();
let mut synced_conns = self
.watch_consensus_connections_sender
.borrow()
.conns
.clone();
let mut synced_conns = self.watch_consensus_rpcs_sender.borrow().conns.clone();
// synced connections are all on the same block. sort them by tier with higher soft limits first
synced_conns.sort_by_cached_key(rpc_sync_status_sort_key);
@ -754,7 +761,7 @@ impl Web3Rpcs {
let mut skip_rpcs = vec![];
let mut method_not_available_response = None;
let mut watch_consensus_connections = self.watch_consensus_connections_sender.subscribe();
let mut watch_consensus_connections = self.watch_consensus_rpcs_sender.subscribe();
// TODO: maximum retries? right now its the total number of servers
loop {
@ -1144,7 +1151,7 @@ impl Serialize for Web3Rpcs {
state.serialize_field("conns", &conns)?;
{
let consensus_connections = self.watch_consensus_connections_sender.borrow().clone();
let consensus_connections = self.watch_consensus_rpcs_sender.borrow().clone();
// TODO: rename synced_connections to consensus_connections?
state.serialize_field("synced_connections", &consensus_connections)?;
}
@ -1181,10 +1188,8 @@ mod tests {
// TODO: why is this allow needed? does tokio::test get in the way somehow?
#![allow(unused_imports)]
use super::*;
use crate::rpcs::{
blockchain::{ConsensusFinder, SavedBlock},
provider::Web3Provider,
};
use crate::rpcs::consensus::ConsensusFinder;
use crate::rpcs::{blockchain::Web3ProxyBlock, provider::Web3Provider};
use ethers::types::{Block, U256};
use log::{trace, LevelFilter};
use parking_lot::RwLock;
@ -1213,7 +1218,7 @@ mod tests {
let blocks: Vec<_> = [block_0, block_1, block_2]
.into_iter()
.map(|x| SavedBlock::new(Arc::new(x)))
.map(|x| Web3ProxyBlock::new(Arc::new(x)))
.collect();
let mut rpcs: Vec<_> = [
@ -1298,9 +1303,8 @@ mod tests {
let lagged_block = Arc::new(lagged_block);
let head_block = Arc::new(head_block);
// TODO: write a impl From for Block -> BlockId?
let mut lagged_block: SavedBlock = lagged_block.into();
let mut head_block: SavedBlock = head_block.into();
let mut lagged_block: Web3ProxyBlock = lagged_block.into();
let mut head_block: Web3ProxyBlock = head_block.into();
let block_data_limit = u64::MAX;
@ -1312,6 +1316,7 @@ mod tests {
block_data_limit: block_data_limit.into(),
tier: 0,
head_block: RwLock::new(Some(head_block.clone())),
provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))),
..Default::default()
};
@ -1323,6 +1328,7 @@ mod tests {
block_data_limit: block_data_limit.into(),
tier: 0,
head_block: RwLock::new(Some(lagged_block.clone())),
provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))),
..Default::default()
};
@ -1340,13 +1346,13 @@ mod tests {
(lagged_rpc.name.clone(), lagged_rpc.clone()),
]);
let (watch_consensus_connections_sender, _) = watch::channel(Default::default());
let (watch_consensus_rpcs_sender, _) = watch::channel(Default::default());
// TODO: make a Web3Rpcs::new
let conns = Web3Rpcs {
conns,
watch_consensus_head_receiver: None,
watch_consensus_connections_sender,
watch_consensus_rpcs_sender,
pending_transactions: Cache::builder()
.max_capacity(10_000)
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()),
@ -1356,32 +1362,37 @@ mod tests {
block_numbers: Cache::builder()
.max_capacity(10_000)
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()),
// TODO: test max_block_age?
max_block_age: None,
// TODO: test max_block_lag?
max_block_lag: None,
min_head_rpcs: 1,
min_sum_soft_limit: 1,
};
let authorization = Arc::new(Authorization::internal(None).unwrap());
let (head_block_sender, _head_block_receiver) =
watch::channel::<ArcBlock>(Default::default());
let mut connection_heads = ConsensusFinder::default();
let (head_block_sender, _head_block_receiver) = watch::channel(Default::default());
let mut consensus_finder = ConsensusFinder::new(None, None);
// process None so that
conns
.process_block_from_rpc(
&authorization,
&mut connection_heads,
&mut consensus_finder,
None,
lagged_rpc.clone(),
&head_block_sender,
&None,
)
.await
.unwrap();
.expect(
"its lagged, but it should still be seen as consensus if its the first to report",
);
conns
.process_block_from_rpc(
&authorization,
&mut connection_heads,
&mut consensus_finder,
None,
head_rpc.clone(),
&head_block_sender,
@ -1414,12 +1425,12 @@ mod tests {
assert!(matches!(x, OpenRequestResult::NotReady(true)));
// add lagged blocks to the conns. both servers should be allowed
lagged_block.block = conns.save_block(lagged_block.block, true).await.unwrap();
lagged_block = conns.try_cache_block(lagged_block, true).await.unwrap();
conns
.process_block_from_rpc(
&authorization,
&mut connection_heads,
&mut consensus_finder,
Some(lagged_block.clone()),
lagged_rpc,
&head_block_sender,
@ -1430,7 +1441,7 @@ mod tests {
conns
.process_block_from_rpc(
&authorization,
&mut connection_heads,
&mut consensus_finder,
Some(lagged_block.clone()),
head_rpc.clone(),
&head_block_sender,
@ -1442,12 +1453,12 @@ mod tests {
assert_eq!(conns.num_synced_rpcs(), 2);
// add head block to the conns. lagged_rpc should not be available
head_block.block = conns.save_block(head_block.block, true).await.unwrap();
head_block = conns.try_cache_block(head_block, true).await.unwrap();
conns
.process_block_from_rpc(
&authorization,
&mut connection_heads,
&mut consensus_finder,
Some(head_block.clone()),
head_rpc,
&head_block_sender,
@ -1511,7 +1522,7 @@ mod tests {
..Default::default()
};
let head_block: SavedBlock = Arc::new(head_block).into();
let head_block: Web3ProxyBlock = Arc::new(head_block).into();
let pruned_rpc = Web3Rpc {
name: "pruned".to_string(),
@ -1548,13 +1559,13 @@ mod tests {
(archive_rpc.name.clone(), archive_rpc.clone()),
]);
let (watch_consensus_connections_sender, _) = watch::channel(Default::default());
let (watch_consensus_rpcs_sender, _) = watch::channel(Default::default());
// TODO: make a Web3Rpcs::new
let conns = Web3Rpcs {
conns,
watch_consensus_head_receiver: None,
watch_consensus_connections_sender,
watch_consensus_rpcs_sender,
pending_transactions: Cache::builder()
.max_capacity(10)
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()),
@ -1566,13 +1577,14 @@ mod tests {
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()),
min_head_rpcs: 1,
min_sum_soft_limit: 3_000,
max_block_age: None,
max_block_lag: None,
};
let authorization = Arc::new(Authorization::internal(None).unwrap());
let (head_block_sender, _head_block_receiver) =
watch::channel::<ArcBlock>(Default::default());
let mut connection_heads = ConsensusFinder::default();
let (head_block_sender, _head_block_receiver) = watch::channel(Default::default());
let mut connection_heads = ConsensusFinder::new(None, None);
conns
.process_block_from_rpc(

@ -1,8 +1,8 @@
// TODO: all pub, or export useful things here instead?
pub mod blockchain;
pub mod consensus;
pub mod many;
pub mod one;
pub mod provider;
pub mod request;
pub mod synced_connections;
pub mod transactions;

@ -1,5 +1,5 @@
///! Rate-limited communication with a web3 provider.
use super::blockchain::{ArcBlock, BlockHashesCache, SavedBlock};
use super::blockchain::{ArcBlock, BlockHashesCache, Web3ProxyBlock};
use super::provider::Web3Provider;
use super::request::{OpenRequestHandle, OpenRequestResult};
use crate::app::{flatten_handle, AnyhowJoinHandle};
@ -81,7 +81,7 @@ pub struct Web3Rpc {
/// Lower tiers are higher priority when sending requests
pub(super) tier: u64,
/// TODO: change this to a watch channel so that http providers can subscribe and take action on change.
pub(super) head_block: RwLock<Option<SavedBlock>>,
pub(super) head_block: RwLock<Option<Web3ProxyBlock>>,
/// Track how fast this RPC is
pub(super) latency: Web3RpcLatencies,
}
@ -308,9 +308,9 @@ impl Web3Rpc {
}
pub fn has_block_data(&self, needed_block_num: &U64) -> bool {
let head_block_num = match self.head_block.read().clone() {
let head_block_num = match self.head_block.read().as_ref() {
None => return false,
Some(x) => x.number(),
Some(x) => *x.number(),
};
// this rpc doesn't have that block yet. still syncing
@ -525,9 +525,9 @@ impl Web3Rpc {
None
}
Ok(Some(new_head_block)) => {
let new_hash = new_head_block
.hash
.context("sending block to connections")?;
let new_head_block = Web3ProxyBlock::new(new_head_block);
let new_hash = *new_head_block.hash();
// if we already have this block saved, set new_head_block to that arc. otherwise store this copy
let new_head_block = block_map
@ -628,6 +628,7 @@ impl Web3Rpc {
if let Some(client) = &*conn.provider.read().await {
// trace!("health check unlocked with error on {}", conn);
// returning error will trigger a reconnect
// also, do the health check as a way of keeping this rpc's request_ewma accurate
// TODO: do a query of some kind
}
@ -1164,7 +1165,7 @@ mod tests {
let random_block = Arc::new(random_block);
let head_block = SavedBlock::new(random_block);
let head_block = Web3ProxyBlock::new(random_block);
let block_data_limit = u64::MAX;
let x = Web3Rpc {
@ -1194,7 +1195,7 @@ mod tests {
.as_secs()
.into();
let head_block: SavedBlock = Arc::new(Block {
let head_block: Web3ProxyBlock = Arc::new(Block {
hash: Some(H256::random()),
number: Some(1_000_000.into()),
timestamp: now,

@ -1,71 +0,0 @@
use super::blockchain::{ArcBlock, SavedBlock};
use super::many::Web3Rpcs;
use super::one::Web3Rpc;
use ethers::prelude::{H256, U64};
use serde::Serialize;
use std::fmt;
use std::sync::Arc;
/// A collection of Web3Rpcs that are on the same block.
/// Serialize is so we can print it on our debug endpoint
#[derive(Clone, Default, Serialize)]
pub struct ConsensusWeb3Rpcs {
// TODO: store ArcBlock instead?
pub(super) head_block: Option<SavedBlock>,
// TODO: this should be able to serialize, but it isn't
#[serde(skip_serializing)]
pub(super) conns: Vec<Arc<Web3Rpc>>,
pub(super) num_checked_conns: usize,
pub(super) includes_backups: bool,
}
impl ConsensusWeb3Rpcs {
pub fn num_conns(&self) -> usize {
self.conns.len()
}
pub fn sum_soft_limit(&self) -> u32 {
self.conns.iter().fold(0, |sum, rpc| sum + rpc.soft_limit)
}
// TODO: sum_hard_limit?
}
impl fmt::Debug for ConsensusWeb3Rpcs {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// TODO: the default formatter takes forever to write. this is too quiet though
// TODO: print the actual conns?
f.debug_struct("ConsensusConnections")
.field("head_block", &self.head_block)
.field("num_conns", &self.conns.len())
.finish_non_exhaustive()
}
}
impl Web3Rpcs {
pub fn head_block(&self) -> Option<ArcBlock> {
self.watch_consensus_head_receiver
.as_ref()
.map(|x| x.borrow().clone())
}
pub fn head_block_hash(&self) -> Option<H256> {
self.head_block().and_then(|x| x.hash)
}
pub fn head_block_num(&self) -> Option<U64> {
self.head_block().and_then(|x| x.number)
}
pub fn synced(&self) -> bool {
!self
.watch_consensus_connections_sender
.borrow()
.conns
.is_empty()
}
pub fn num_synced_rpcs(&self) -> usize {
self.watch_consensus_connections_sender.borrow().conns.len()
}
}