well it compiles. doesnt work yet. but it compiles
This commit is contained in:
parent
53757621ef
commit
da33ec32eb
25
Cargo.lock
generated
25
Cargo.lock
generated
@ -282,13 +282,13 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
|
||||
|
||||
[[package]]
|
||||
name = "axum"
|
||||
version = "0.6.5"
|
||||
version = "0.6.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3114e77b361ec716aa429ae5c04243abe00cf7548e870b9370affcc5c491a7d0"
|
||||
checksum = "4e246206a63c9830e118d12c894f56a82033da1a2361f5544deeee3df85c99d9"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"axum-core",
|
||||
"base64 0.20.0",
|
||||
"base64 0.21.0",
|
||||
"bitflags",
|
||||
"bytes",
|
||||
"futures-util",
|
||||
@ -347,9 +347,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "axum-macros"
|
||||
version = "0.3.3"
|
||||
version = "0.3.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cb6bee4e05a5e0a5a67515ab24978efa7a80575a7a41a9fae35bb27fed6645d2"
|
||||
checksum = "5fbf955307ff8addb48d2399393c9e2740dd491537ec562b66ab364fc4a38841"
|
||||
dependencies = [
|
||||
"heck 0.4.0",
|
||||
"proc-macro2",
|
||||
@ -419,12 +419,6 @@ version = "0.13.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8"
|
||||
|
||||
[[package]]
|
||||
name = "base64"
|
||||
version = "0.20.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0ea22880d78093b0cbe17c89f64a7d457941e65759157ec6cb31a31d652b05e5"
|
||||
|
||||
[[package]]
|
||||
name = "base64"
|
||||
version = "0.21.0"
|
||||
@ -3093,9 +3087,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "once_cell"
|
||||
version = "1.17.0"
|
||||
version = "1.17.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6f61fba1741ea2b3d6a1e3178721804bb716a68a6aeba1149b5d52e3d464ea66"
|
||||
checksum = "b7e5500299e16ebb147ae15a00a942af264cf3688f47923b8fc2cd5858f23ad3"
|
||||
|
||||
[[package]]
|
||||
name = "opaque-debug"
|
||||
@ -4520,9 +4514,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "serde_prometheus"
|
||||
version = "0.2.0"
|
||||
version = "0.2.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1bfb6048d9e4ebc41f7d1a42c79b04c5b460633be307620a0e34a8f81970ea47"
|
||||
checksum = "9c1a4ca38f4e746460d1dbd3711b8ca8ae314d1b21247edeff61dd20325b5a6f"
|
||||
dependencies = [
|
||||
"heapless",
|
||||
"nom",
|
||||
@ -5793,6 +5787,7 @@ dependencies = [
|
||||
"notify",
|
||||
"num",
|
||||
"num-traits",
|
||||
"once_cell",
|
||||
"pagerduty-rs",
|
||||
"parking_lot 0.12.1",
|
||||
"prettytable",
|
||||
|
@ -27,9 +27,9 @@ thread-fast-rng = { path = "../thread-fast-rng" }
|
||||
|
||||
anyhow = { version = "1.0.69", features = ["backtrace"] }
|
||||
argh = "0.1.10"
|
||||
axum = { version = "0.6.5", features = ["headers", "ws"] }
|
||||
axum = { version = "0.6.6", features = ["headers", "ws"] }
|
||||
axum-client-ip = "0.4.0"
|
||||
axum-macros = "0.3.3"
|
||||
axum-macros = "0.3.4"
|
||||
chrono = "0.4.23"
|
||||
counter = "0.5.7"
|
||||
derive_more = "0.99.17"
|
||||
@ -52,6 +52,7 @@ moka = { version = "0.10.0", default-features = false, features = ["future"] }
|
||||
notify = "5.1.0"
|
||||
num = "0.4.0"
|
||||
num-traits = "0.2.15"
|
||||
once_cell = { version = "1.17.1" }
|
||||
pagerduty-rs = { version = "0.1.6", default-features = false, features = ["async", "rustls", "sync"] }
|
||||
parking_lot = { version = "0.12.1", features = ["arc_lock"] }
|
||||
prettytable = "*"
|
||||
@ -62,7 +63,7 @@ rustc-hash = "1.1.0"
|
||||
sentry = { version = "0.29.3", default-features = false, features = ["backtrace", "contexts", "panic", "anyhow", "reqwest", "rustls", "log", "sentry-log"] }
|
||||
serde = { version = "1.0.152", features = [] }
|
||||
serde_json = { version = "1.0.93", default-features = false, features = ["alloc", "raw_value"] }
|
||||
serde_prometheus = "0.2.0"
|
||||
serde_prometheus = "0.2.1"
|
||||
siwe = "0.5.0"
|
||||
time = "0.3.17"
|
||||
tokio = { version = "1.25.0", features = ["full"] }
|
||||
|
@ -10,7 +10,7 @@ use crate::frontend::rpc_proxy_ws::ProxyMode;
|
||||
use crate::jsonrpc::{
|
||||
JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest, JsonRpcRequestEnum,
|
||||
};
|
||||
use crate::rpcs::blockchain::{ArcBlock, SavedBlock};
|
||||
use crate::rpcs::blockchain::{BlockHashesCache, Web3ProxyBlock};
|
||||
use crate::rpcs::many::Web3Rpcs;
|
||||
use crate::rpcs::one::Web3Rpc;
|
||||
use crate::rpcs::transactions::TxStatus;
|
||||
@ -23,7 +23,7 @@ use derive_more::From;
|
||||
use entities::sea_orm_active_enums::LogLevel;
|
||||
use entities::user;
|
||||
use ethers::core::utils::keccak256;
|
||||
use ethers::prelude::{Address, Block, Bytes, Transaction, TxHash, H256, U64};
|
||||
use ethers::prelude::{Address, Bytes, Transaction, TxHash, H256, U64};
|
||||
use ethers::types::U256;
|
||||
use ethers::utils::rlp::{Decodable, Rlp};
|
||||
use futures::future::join_all;
|
||||
@ -69,9 +69,9 @@ pub static REQUEST_PERIOD: u64 = 60;
|
||||
#[derive(From)]
|
||||
struct ResponseCacheKey {
|
||||
// if none, this is cached until evicted
|
||||
from_block: Option<SavedBlock>,
|
||||
from_block: Option<Web3ProxyBlock>,
|
||||
// to_block is only set when ranges of blocks are requested (like with eth_getLogs)
|
||||
to_block: Option<SavedBlock>,
|
||||
to_block: Option<Web3ProxyBlock>,
|
||||
method: String,
|
||||
// TODO: better type for this
|
||||
params: Option<serde_json::Value>,
|
||||
@ -204,7 +204,7 @@ pub struct Web3ProxyApp {
|
||||
response_cache: ResponseCache,
|
||||
// don't drop this or the sender will stop working
|
||||
// TODO: broadcast channel instead?
|
||||
watch_consensus_head_receiver: watch::Receiver<ArcBlock>,
|
||||
watch_consensus_head_receiver: watch::Receiver<Web3ProxyBlock>,
|
||||
pending_tx_sender: broadcast::Sender<TxStatus>,
|
||||
pub config: AppConfig,
|
||||
pub db_conn: Option<sea_orm::DatabaseConnection>,
|
||||
@ -542,7 +542,7 @@ impl Web3ProxyApp {
|
||||
|
||||
// TODO: i don't like doing Block::default here! Change this to "None"?
|
||||
let (watch_consensus_head_sender, watch_consensus_head_receiver) =
|
||||
watch::channel(Arc::new(Block::default()));
|
||||
watch::channel(Web3ProxyBlock::default());
|
||||
// TODO: will one receiver lagging be okay? how big should this be?
|
||||
let (pending_tx_sender, pending_tx_receiver) = broadcast::channel(256);
|
||||
|
||||
@ -563,11 +563,11 @@ impl Web3ProxyApp {
|
||||
// TODO: limits from config
|
||||
// these blocks don't have full transactions, but they do have rather variable amounts of transaction hashes
|
||||
// TODO: how can we do the weigher better?
|
||||
let block_map = Cache::builder()
|
||||
let block_map: BlockHashesCache = Cache::builder()
|
||||
.max_capacity(1024 * 1024 * 1024)
|
||||
.weigher(|_k, v: &ArcBlock| {
|
||||
.weigher(|_k, v: &Web3ProxyBlock| {
|
||||
// TODO: is this good enough?
|
||||
1 + v.transactions.len().try_into().unwrap_or(u32::MAX)
|
||||
1 + v.block.transactions.len().try_into().unwrap_or(u32::MAX)
|
||||
})
|
||||
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
|
||||
|
||||
@ -577,6 +577,8 @@ impl Web3ProxyApp {
|
||||
top_config.app.chain_id,
|
||||
db_conn.clone(),
|
||||
http_client.clone(),
|
||||
top_config.app.max_block_age,
|
||||
top_config.app.max_block_lag,
|
||||
top_config.app.min_synced_rpcs,
|
||||
top_config.app.min_sum_soft_limit,
|
||||
pending_transactions.clone(),
|
||||
@ -603,6 +605,9 @@ impl Web3ProxyApp {
|
||||
top_config.app.chain_id,
|
||||
db_conn.clone(),
|
||||
http_client.clone(),
|
||||
// private rpcs don't get subscriptions, so no need for max_block_age or max_block_lag
|
||||
None,
|
||||
None,
|
||||
0,
|
||||
0,
|
||||
pending_transactions.clone(),
|
||||
@ -735,7 +740,7 @@ impl Web3ProxyApp {
|
||||
Ok((app, cancellable_handles, important_background_handles).into())
|
||||
}
|
||||
|
||||
pub fn head_block_receiver(&self) -> watch::Receiver<ArcBlock> {
|
||||
pub fn head_block_receiver(&self) -> watch::Receiver<Web3ProxyBlock> {
|
||||
self.watch_consensus_head_receiver.clone()
|
||||
}
|
||||
|
||||
@ -1481,7 +1486,7 @@ impl Web3ProxyApp {
|
||||
.await?;
|
||||
|
||||
Some(ResponseCacheKey {
|
||||
from_block: Some(SavedBlock::new(request_block)),
|
||||
from_block: Some(request_block),
|
||||
to_block: None,
|
||||
method: method.to_string(),
|
||||
// TODO: hash here?
|
||||
@ -1521,8 +1526,8 @@ impl Web3ProxyApp {
|
||||
.await?;
|
||||
|
||||
Some(ResponseCacheKey {
|
||||
from_block: Some(SavedBlock::new(from_block)),
|
||||
to_block: Some(SavedBlock::new(to_block)),
|
||||
from_block: Some(from_block),
|
||||
to_block: Some(to_block),
|
||||
method: method.to_string(),
|
||||
// TODO: hash here?
|
||||
params: request.params.clone(),
|
||||
@ -1537,8 +1542,8 @@ impl Web3ProxyApp {
|
||||
let authorization = authorization.clone();
|
||||
|
||||
if let Some(cache_key) = cache_key {
|
||||
let from_block_num = cache_key.from_block.as_ref().map(|x| x.number());
|
||||
let to_block_num = cache_key.to_block.as_ref().map(|x| x.number());
|
||||
let from_block_num = cache_key.from_block.as_ref().map(|x| *x.number());
|
||||
let to_block_num = cache_key.to_block.as_ref().map(|x| *x.number());
|
||||
|
||||
self.response_cache
|
||||
.try_get_with(cache_key, async move {
|
||||
|
@ -72,7 +72,7 @@ impl Web3ProxyApp {
|
||||
"params": {
|
||||
"subscription": subscription_id,
|
||||
// TODO: option to include full transaction objects instead of just the hashes?
|
||||
"result": new_head.as_ref(),
|
||||
"result": new_head.block,
|
||||
},
|
||||
});
|
||||
|
||||
|
@ -80,12 +80,7 @@ pub async fn clean_block_number(
|
||||
.context("fetching block number from hash")?;
|
||||
|
||||
// TODO: set change to true? i think not we should probably use hashes for everything.
|
||||
(
|
||||
block
|
||||
.number
|
||||
.expect("blocks here should always have numbers"),
|
||||
false,
|
||||
)
|
||||
(*block.number(), false)
|
||||
} else {
|
||||
return Err(anyhow::anyhow!("blockHash missing"));
|
||||
}
|
||||
|
@ -1,9 +1,9 @@
|
||||
use crate::rpcs::blockchain::BlockHashesCache;
|
||||
use crate::app::AnyhowJoinHandle;
|
||||
use crate::rpcs::blockchain::{BlockHashesCache, Web3ProxyBlock};
|
||||
use crate::rpcs::one::Web3Rpc;
|
||||
use crate::{app::AnyhowJoinHandle, rpcs::blockchain::ArcBlock};
|
||||
use argh::FromArgs;
|
||||
use ethers::prelude::TxHash;
|
||||
use ethers::types::U256;
|
||||
use ethers::types::{U256, U64};
|
||||
use hashbrown::HashMap;
|
||||
use log::warn;
|
||||
use migration::sea_orm::DatabaseConnection;
|
||||
@ -11,7 +11,7 @@ use serde::Deserialize;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::broadcast;
|
||||
|
||||
pub type BlockAndRpc = (Option<ArcBlock>, Arc<Web3Rpc>);
|
||||
pub type BlockAndRpc = (Option<Web3ProxyBlock>, Arc<Web3Rpc>);
|
||||
pub type TxHashAndRpc = (TxHash, Arc<Web3Rpc>);
|
||||
|
||||
#[derive(Debug, FromArgs)]
|
||||
@ -105,6 +105,12 @@ pub struct AppConfig {
|
||||
pub invite_code: Option<String>,
|
||||
pub login_domain: Option<String>,
|
||||
|
||||
/// do not serve any requests if the best known block is older than this many seconds.
|
||||
pub max_block_age: Option<u64>,
|
||||
|
||||
/// do not serve any requests if the best known block is behind the best known block by more than this many blocks.
|
||||
pub max_block_lag: Option<U64>,
|
||||
|
||||
/// Rate limit for bearer token authenticated entrypoints.
|
||||
/// This is separate from the rpc limits.
|
||||
#[serde(default = "default_bearer_token_max_concurrent_requests")]
|
||||
|
@ -1,16 +1,14 @@
|
||||
use super::consensus::ConsensusFinder;
|
||||
use super::many::Web3Rpcs;
|
||||
///! Keep track of the blockchain as seen by a Web3Rpcs.
|
||||
use super::one::Web3Rpc;
|
||||
use super::transactions::TxStatus;
|
||||
use crate::frontend::authorization::Authorization;
|
||||
use crate::{
|
||||
config::BlockAndRpc, jsonrpc::JsonRpcRequest, rpcs::synced_connections::ConsensusWeb3Rpcs,
|
||||
};
|
||||
use crate::{config::BlockAndRpc, jsonrpc::JsonRpcRequest};
|
||||
use anyhow::Context;
|
||||
use derive_more::From;
|
||||
use ethers::prelude::{Block, TxHash, H256, U64};
|
||||
use hashbrown::{HashMap, HashSet};
|
||||
use log::{debug, error, warn, Level};
|
||||
use log::{debug, error, trace, warn, Level};
|
||||
use moka::future::Cache;
|
||||
use serde::Serialize;
|
||||
use serde_json::json;
|
||||
@ -22,17 +20,18 @@ use tokio::time::Duration;
|
||||
// TODO: type for Hydrated Blocks with their full transactions?
|
||||
pub type ArcBlock = Arc<Block<TxHash>>;
|
||||
|
||||
pub type BlockHashesCache = Cache<H256, ArcBlock, hashbrown::hash_map::DefaultHashBuilder>;
|
||||
pub type BlockHashesCache = Cache<H256, Web3ProxyBlock, hashbrown::hash_map::DefaultHashBuilder>;
|
||||
|
||||
/// A block and its age.
|
||||
#[derive(Clone, Debug, Default, From, Serialize)]
|
||||
pub struct SavedBlock {
|
||||
pub struct Web3ProxyBlock {
|
||||
pub block: ArcBlock,
|
||||
/// number of seconds this block was behind the current time when received
|
||||
pub age: u64,
|
||||
/// this is only set if the block is from a subscription
|
||||
pub received_age: Option<u64>,
|
||||
}
|
||||
|
||||
impl PartialEq for SavedBlock {
|
||||
impl PartialEq for Web3ProxyBlock {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
match (self.block.hash, other.block.hash) {
|
||||
(None, None) => true,
|
||||
@ -43,18 +42,23 @@ impl PartialEq for SavedBlock {
|
||||
}
|
||||
}
|
||||
|
||||
impl SavedBlock {
|
||||
impl Web3ProxyBlock {
|
||||
/// A new block has arrived over a subscription
|
||||
pub fn new(block: ArcBlock) -> Self {
|
||||
let mut x = Self { block, age: 0 };
|
||||
let mut x = Self {
|
||||
block,
|
||||
received_age: None,
|
||||
};
|
||||
|
||||
// no need to recalulate lag every time
|
||||
// if the head block gets too old, a health check restarts this connection
|
||||
x.age = x.lag();
|
||||
// TODO: emit a stat for received_age
|
||||
x.received_age = Some(x.age());
|
||||
|
||||
x
|
||||
}
|
||||
|
||||
pub fn lag(&self) -> u64 {
|
||||
pub fn age(&self) -> u64 {
|
||||
let now = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.expect("there should always be time");
|
||||
@ -70,37 +74,58 @@ impl SavedBlock {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn hash(&self) -> H256 {
|
||||
self.block.hash.expect("saved blocks must have a hash")
|
||||
#[inline(always)]
|
||||
pub fn parent_hash(&self) -> &H256 {
|
||||
&self.block.parent_hash
|
||||
}
|
||||
|
||||
// TODO: return as U64 or u64?
|
||||
pub fn number(&self) -> U64 {
|
||||
self.block.number.expect("saved blocks must have a number")
|
||||
#[inline(always)]
|
||||
pub fn hash(&self) -> &H256 {
|
||||
self.block
|
||||
.hash
|
||||
.as_ref()
|
||||
.expect("saved blocks must have a hash")
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub fn number(&self) -> &U64 {
|
||||
self.block
|
||||
.number
|
||||
.as_ref()
|
||||
.expect("saved blocks must have a number")
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ArcBlock> for SavedBlock {
|
||||
impl From<ArcBlock> for Web3ProxyBlock {
|
||||
fn from(x: ArcBlock) -> Self {
|
||||
SavedBlock::new(x)
|
||||
Web3ProxyBlock {
|
||||
block: x,
|
||||
received_age: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for SavedBlock {
|
||||
impl Display for Web3ProxyBlock {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{} ({}, {}s old)", self.number(), self.hash(), self.age)
|
||||
write!(
|
||||
f,
|
||||
"{} ({}, {}s old)",
|
||||
self.number(),
|
||||
self.hash(),
|
||||
self.age()
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl Web3Rpcs {
|
||||
/// add a block to our mappings and track the heaviest chain
|
||||
pub async fn save_block(
|
||||
pub async fn try_cache_block(
|
||||
&self,
|
||||
block: ArcBlock,
|
||||
block: Web3ProxyBlock,
|
||||
heaviest_chain: bool,
|
||||
) -> anyhow::Result<ArcBlock> {
|
||||
) -> anyhow::Result<Web3ProxyBlock> {
|
||||
// TODO: i think we can rearrange this function to make it faster on the hot path
|
||||
let block_hash = block.hash.as_ref().context("no block hash")?;
|
||||
let block_hash = block.hash();
|
||||
|
||||
// skip Block::default()
|
||||
if block_hash.is_zero() {
|
||||
@ -108,7 +133,7 @@ impl Web3Rpcs {
|
||||
return Ok(block);
|
||||
}
|
||||
|
||||
let block_num = block.number.as_ref().context("no block num")?;
|
||||
let block_num = block.number();
|
||||
|
||||
// TODO: think more about heaviest_chain. would be better to do the check inside this function
|
||||
if heaviest_chain {
|
||||
@ -136,7 +161,7 @@ impl Web3Rpcs {
|
||||
authorization: &Arc<Authorization>,
|
||||
hash: &H256,
|
||||
rpc: Option<&Arc<Web3Rpc>>,
|
||||
) -> anyhow::Result<ArcBlock> {
|
||||
) -> anyhow::Result<Web3ProxyBlock> {
|
||||
// first, try to get the hash from our cache
|
||||
// the cache is set last, so if its here, its everywhere
|
||||
// TODO: use try_get_with
|
||||
@ -147,17 +172,18 @@ impl Web3Rpcs {
|
||||
// block not in cache. we need to ask an rpc for it
|
||||
let get_block_params = (*hash, false);
|
||||
// TODO: if error, retry?
|
||||
let block: ArcBlock = match rpc {
|
||||
let block: Web3ProxyBlock = match rpc {
|
||||
Some(rpc) => rpc
|
||||
.wait_for_request_handle(authorization, Some(Duration::from_secs(30)), None)
|
||||
.await?
|
||||
.request::<_, Option<_>>(
|
||||
.request::<_, Option<ArcBlock>>(
|
||||
"eth_getBlockByHash",
|
||||
&json!(get_block_params),
|
||||
Level::Error.into(),
|
||||
None,
|
||||
)
|
||||
.await?
|
||||
.map(Into::into)
|
||||
.context("no block!")?,
|
||||
None => {
|
||||
// TODO: helper for method+params => JsonRpcRequest
|
||||
@ -181,13 +207,14 @@ impl Web3Rpcs {
|
||||
|
||||
let block: Option<ArcBlock> = serde_json::from_str(block.get())?;
|
||||
|
||||
block.context("no block!")?
|
||||
// TODO: from isn't great here. received time is going to be weird
|
||||
block.map(Into::into).context("no block!")?
|
||||
}
|
||||
};
|
||||
|
||||
// the block was fetched using eth_getBlockByHash, so it should have all fields
|
||||
// TODO: fill in heaviest_chain! if the block is old enough, is this definitely true?
|
||||
let block = self.save_block(block, false).await?;
|
||||
let block = self.try_cache_block(block, false).await?;
|
||||
|
||||
Ok(block)
|
||||
}
|
||||
@ -200,7 +227,7 @@ impl Web3Rpcs {
|
||||
) -> anyhow::Result<(H256, u64)> {
|
||||
let (block, block_depth) = self.cannonical_block(authorization, num).await?;
|
||||
|
||||
let hash = block.hash.expect("Saved blocks should always have hashes");
|
||||
let hash = *block.hash();
|
||||
|
||||
Ok((hash, block_depth))
|
||||
}
|
||||
@ -211,7 +238,7 @@ impl Web3Rpcs {
|
||||
&self,
|
||||
authorization: &Arc<Authorization>,
|
||||
num: &U64,
|
||||
) -> anyhow::Result<(ArcBlock, u64)> {
|
||||
) -> anyhow::Result<(Web3ProxyBlock, u64)> {
|
||||
// we only have blocks by hash now
|
||||
// maybe save them during save_block in a blocks_by_number Cache<U64, Vec<ArcBlock>>
|
||||
// if theres multiple, use petgraph to find the one on the main chain (and remove the others if they have enough confirmations)
|
||||
@ -223,28 +250,21 @@ impl Web3Rpcs {
|
||||
.clone();
|
||||
|
||||
// be sure the requested block num exists
|
||||
let mut head_block_num = consensus_head_receiver.borrow_and_update().number;
|
||||
// TODO: is this okay? what if we aren't synced?!
|
||||
let mut head_block_num = *consensus_head_receiver.borrow_and_update().number();
|
||||
|
||||
loop {
|
||||
if let Some(head_block_num) = head_block_num {
|
||||
if num <= &head_block_num {
|
||||
break;
|
||||
}
|
||||
if num <= &head_block_num {
|
||||
break;
|
||||
}
|
||||
|
||||
trace!("waiting for future block {} > {}", num, head_block_num);
|
||||
consensus_head_receiver.changed().await?;
|
||||
|
||||
head_block_num = consensus_head_receiver.borrow_and_update().number;
|
||||
head_block_num = *consensus_head_receiver.borrow_and_update().number();
|
||||
}
|
||||
|
||||
let head_block_num =
|
||||
head_block_num.expect("we should only get here if we have a head block");
|
||||
|
||||
let block_depth = if num >= &head_block_num {
|
||||
0
|
||||
} else {
|
||||
(head_block_num - num).as_u64()
|
||||
};
|
||||
let block_depth = (head_block_num - num).as_u64();
|
||||
|
||||
// try to get the hash from our cache
|
||||
// deref to not keep the lock open
|
||||
@ -276,8 +296,10 @@ impl Web3Rpcs {
|
||||
|
||||
let block: ArcBlock = serde_json::from_str(raw_block.get())?;
|
||||
|
||||
let block = Web3ProxyBlock::from(block);
|
||||
|
||||
// the block was fetched using eth_getBlockByNumber, so it should have all fields and be on the heaviest chain
|
||||
let block = self.save_block(block, true).await?;
|
||||
let block = self.try_cache_block(block, true).await?;
|
||||
|
||||
Ok((block, block_depth))
|
||||
}
|
||||
@ -288,18 +310,16 @@ impl Web3Rpcs {
|
||||
block_receiver: flume::Receiver<BlockAndRpc>,
|
||||
// TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed
|
||||
// Geth's subscriptions have the same potential for skipping blocks.
|
||||
head_block_sender: watch::Sender<ArcBlock>,
|
||||
head_block_sender: watch::Sender<Web3ProxyBlock>,
|
||||
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
|
||||
) -> anyhow::Result<()> {
|
||||
// TODO: indexmap or hashmap? what hasher? with_capacity?
|
||||
// TODO: this will grow unbounded. prune old heads on this at the same time we prune the graph?
|
||||
let mut connection_heads = ConsensusFinder::default();
|
||||
let mut connection_heads = ConsensusFinder::new(self.max_block_age, self.max_block_lag);
|
||||
|
||||
loop {
|
||||
match block_receiver.recv_async().await {
|
||||
Ok((new_block, rpc)) => {
|
||||
let new_block = new_block.map(Into::into);
|
||||
|
||||
let rpc_name = rpc.name.clone();
|
||||
|
||||
if let Err(err) = self
|
||||
@ -313,7 +333,7 @@ impl Web3Rpcs {
|
||||
)
|
||||
.await
|
||||
{
|
||||
warn!("unable to process block from rpc {}: {:?}", rpc_name, err);
|
||||
warn!("unable to process block from rpc {}: {:#?}", rpc_name, err);
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
@ -331,60 +351,72 @@ impl Web3Rpcs {
|
||||
&self,
|
||||
authorization: &Arc<Authorization>,
|
||||
consensus_finder: &mut ConsensusFinder,
|
||||
rpc_head_block: Option<SavedBlock>,
|
||||
rpc_head_block: Option<Web3ProxyBlock>,
|
||||
rpc: Arc<Web3Rpc>,
|
||||
head_block_sender: &watch::Sender<ArcBlock>,
|
||||
head_block_sender: &watch::Sender<Web3ProxyBlock>,
|
||||
pending_tx_sender: &Option<broadcast::Sender<TxStatus>>,
|
||||
) -> anyhow::Result<()> {
|
||||
// TODO: how should we handle an error here?
|
||||
if !consensus_finder
|
||||
.update_rpc(rpc_head_block.clone(), rpc.clone(), self)
|
||||
.await?
|
||||
.await
|
||||
.context("failed to update rpc")?
|
||||
{
|
||||
// nothing changed. no need
|
||||
// nothing changed. no need to scan for a new consensus head
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let new_synced_connections = consensus_finder
|
||||
.best_consensus_connections(authorization, self)
|
||||
.await;
|
||||
.await
|
||||
.context("no consensus head block!")
|
||||
.map_err(|err| {
|
||||
self.watch_consensus_rpcs_sender
|
||||
.send_replace(Arc::new(Default::default()));
|
||||
|
||||
err
|
||||
})?;
|
||||
|
||||
// TODO: what should we do if the block number of new_synced_connections is < old_synced_connections? wait?
|
||||
|
||||
let includes_backups = new_synced_connections.includes_backups;
|
||||
let backups_needed = new_synced_connections.backups_needed;
|
||||
let consensus_head_block = new_synced_connections.head_block.clone();
|
||||
let num_consensus_rpcs = new_synced_connections.num_conns();
|
||||
let num_checked_rpcs = new_synced_connections.num_checked_conns;
|
||||
let num_active_rpcs = consensus_finder.all.rpc_name_to_hash.len();
|
||||
let num_checked_rpcs = 0; // TODO: figure this out
|
||||
let num_active_rpcs = consensus_finder
|
||||
.all_rpcs_group()
|
||||
.map(|x| x.len())
|
||||
.unwrap_or_default();
|
||||
let total_rpcs = self.conns.len();
|
||||
|
||||
let old_consensus_head_connections = self
|
||||
.watch_consensus_connections_sender
|
||||
.watch_consensus_rpcs_sender
|
||||
.send_replace(Arc::new(new_synced_connections));
|
||||
|
||||
let includes_backups_str = if includes_backups { "B " } else { "" };
|
||||
let backups_voted_str = if backups_needed { "B " } else { "" };
|
||||
|
||||
if let Some(consensus_saved_block) = consensus_head_block {
|
||||
if let Some(consensus_head_block) = consensus_head_block {
|
||||
match &old_consensus_head_connections.head_block {
|
||||
None => {
|
||||
debug!(
|
||||
"first {}{}/{}/{}/{} block={}, rpc={}",
|
||||
includes_backups_str,
|
||||
backups_voted_str,
|
||||
num_consensus_rpcs,
|
||||
num_checked_rpcs,
|
||||
num_active_rpcs,
|
||||
total_rpcs,
|
||||
consensus_saved_block,
|
||||
consensus_head_block,
|
||||
rpc,
|
||||
);
|
||||
|
||||
if includes_backups {
|
||||
if backups_needed {
|
||||
// TODO: what else should be in this error?
|
||||
warn!("Backup RPCs are in use!");
|
||||
}
|
||||
|
||||
// this should already be cached
|
||||
let consensus_head_block =
|
||||
self.save_block(consensus_saved_block.block, true).await?;
|
||||
self.try_cache_block(consensus_head_block, true).await?;
|
||||
|
||||
head_block_sender
|
||||
.send(consensus_head_block)
|
||||
@ -396,46 +428,45 @@ impl Web3Rpcs {
|
||||
.map(|x| x.to_string())
|
||||
.unwrap_or_else(|| "None".to_string());
|
||||
|
||||
match consensus_saved_block.number().cmp(&old_head_block.number()) {
|
||||
match consensus_head_block.number().cmp(&old_head_block.number()) {
|
||||
Ordering::Equal => {
|
||||
// multiple blocks with the same fork!
|
||||
if consensus_saved_block.hash() == old_head_block.hash() {
|
||||
if consensus_head_block.hash() == old_head_block.hash() {
|
||||
// no change in hash. no need to use head_block_sender
|
||||
// TODO: trace level if rpc is backup
|
||||
debug!(
|
||||
"con {}{}/{}/{}/{} con={} rpc={}@{}",
|
||||
includes_backups_str,
|
||||
backups_voted_str,
|
||||
num_consensus_rpcs,
|
||||
num_checked_rpcs,
|
||||
num_active_rpcs,
|
||||
total_rpcs,
|
||||
consensus_saved_block,
|
||||
consensus_head_block,
|
||||
rpc,
|
||||
rpc_head_str,
|
||||
)
|
||||
} else {
|
||||
// hash changed
|
||||
|
||||
if includes_backups {
|
||||
if backups_needed {
|
||||
// TODO: what else should be in this error?
|
||||
warn!("Backup RPCs are in use!");
|
||||
}
|
||||
|
||||
debug!(
|
||||
"unc {}{}/{}/{}/{} con_head={} old={} rpc={}@{}",
|
||||
includes_backups_str,
|
||||
backups_voted_str,
|
||||
num_consensus_rpcs,
|
||||
num_checked_rpcs,
|
||||
num_active_rpcs,
|
||||
total_rpcs,
|
||||
consensus_saved_block,
|
||||
consensus_head_block,
|
||||
old_head_block,
|
||||
rpc,
|
||||
rpc_head_str,
|
||||
);
|
||||
|
||||
let consensus_head_block = self
|
||||
.save_block(consensus_saved_block.block, true)
|
||||
.try_cache_block(consensus_head_block, true)
|
||||
.await
|
||||
.context("save consensus_head_block as heaviest chain")?;
|
||||
|
||||
@ -449,25 +480,25 @@ impl Web3Rpcs {
|
||||
// TODO: better log
|
||||
warn!(
|
||||
"chain rolled back {}{}/{}/{}/{} con={} old={} rpc={}@{}",
|
||||
includes_backups_str,
|
||||
backups_voted_str,
|
||||
num_consensus_rpcs,
|
||||
num_checked_rpcs,
|
||||
num_active_rpcs,
|
||||
total_rpcs,
|
||||
consensus_saved_block,
|
||||
consensus_head_block,
|
||||
old_head_block,
|
||||
rpc,
|
||||
rpc_head_str,
|
||||
);
|
||||
|
||||
if includes_backups {
|
||||
if backups_needed {
|
||||
// TODO: what else should be in this error?
|
||||
warn!("Backup RPCs are in use!");
|
||||
}
|
||||
|
||||
// TODO: tell save_block to remove any higher block numbers from the cache. not needed because we have other checks on requested blocks being > head, but still seems like a good idea
|
||||
let consensus_head_block = self
|
||||
.save_block(consensus_saved_block.block, true)
|
||||
.try_cache_block(consensus_head_block, true)
|
||||
.await
|
||||
.context(
|
||||
"save_block sending consensus_head_block as heaviest chain",
|
||||
@ -480,23 +511,23 @@ impl Web3Rpcs {
|
||||
Ordering::Greater => {
|
||||
debug!(
|
||||
"new {}{}/{}/{}/{} con={} rpc={}@{}",
|
||||
includes_backups_str,
|
||||
backups_voted_str,
|
||||
num_consensus_rpcs,
|
||||
num_checked_rpcs,
|
||||
num_active_rpcs,
|
||||
total_rpcs,
|
||||
consensus_saved_block,
|
||||
consensus_head_block,
|
||||
rpc,
|
||||
rpc_head_str,
|
||||
);
|
||||
|
||||
if includes_backups {
|
||||
if backups_needed {
|
||||
// TODO: what else should be in this error?
|
||||
warn!("Backup RPCs are in use!");
|
||||
}
|
||||
|
||||
let consensus_head_block =
|
||||
self.save_block(consensus_saved_block.block, true).await?;
|
||||
self.try_cache_block(consensus_head_block, true).await?;
|
||||
|
||||
head_block_sender.send(consensus_head_block)?;
|
||||
}
|
||||
@ -512,7 +543,7 @@ impl Web3Rpcs {
|
||||
if num_checked_rpcs >= self.min_head_rpcs {
|
||||
error!(
|
||||
"non {}{}/{}/{}/{} rpc={}@{}",
|
||||
includes_backups_str,
|
||||
backups_voted_str,
|
||||
num_consensus_rpcs,
|
||||
num_checked_rpcs,
|
||||
num_active_rpcs,
|
||||
@ -523,7 +554,7 @@ impl Web3Rpcs {
|
||||
} else {
|
||||
debug!(
|
||||
"non {}{}/{}/{}/{} rpc={}@{}",
|
||||
includes_backups_str,
|
||||
backups_voted_str,
|
||||
num_consensus_rpcs,
|
||||
num_checked_rpcs,
|
||||
num_active_rpcs,
|
||||
@ -537,403 +568,3 @@ impl Web3Rpcs {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
struct ConnectionsGroup {
|
||||
/// TODO: this group might not actually include backups, but they were at leastchecked
|
||||
includes_backups: bool,
|
||||
rpc_name_to_hash: HashMap<String, H256>,
|
||||
}
|
||||
|
||||
impl ConnectionsGroup {
|
||||
fn new(with_backups: bool) -> Self {
|
||||
Self {
|
||||
includes_backups: with_backups,
|
||||
rpc_name_to_hash: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
fn without_backups() -> Self {
|
||||
Self::new(false)
|
||||
}
|
||||
|
||||
fn with_backups() -> Self {
|
||||
Self::new(true)
|
||||
}
|
||||
|
||||
fn remove(&mut self, rpc: &Web3Rpc) -> Option<H256> {
|
||||
self.rpc_name_to_hash.remove(rpc.name.as_str())
|
||||
}
|
||||
|
||||
fn insert(&mut self, rpc: &Web3Rpc, block_hash: H256) -> Option<H256> {
|
||||
self.rpc_name_to_hash.insert(rpc.name.clone(), block_hash)
|
||||
}
|
||||
|
||||
// TODO: i don't love having this here. move to web3_connections?
|
||||
async fn get_block_from_rpc(
|
||||
&self,
|
||||
rpc_name: &str,
|
||||
hash: &H256,
|
||||
authorization: &Arc<Authorization>,
|
||||
web3_rpcs: &Web3Rpcs,
|
||||
) -> anyhow::Result<ArcBlock> {
|
||||
// // TODO: why does this happen?!?! seems to only happen with uncled blocks
|
||||
// // TODO: maybe we should do try_get_with?
|
||||
// // TODO: maybe we should just continue. this only seems to happen when an older block is received
|
||||
// warn!(
|
||||
// "Missing connection_head_block in block_hashes. Fetching now. hash={}. other={}",
|
||||
// connection_head_hash, conn_name
|
||||
// );
|
||||
|
||||
// this option should almost always be populated. if the connection reconnects at a bad time it might not be available though
|
||||
// TODO: if this is None, I think we should error.
|
||||
let rpc = web3_rpcs.conns.get(rpc_name);
|
||||
|
||||
web3_rpcs.block(authorization, hash, rpc).await
|
||||
}
|
||||
|
||||
// TODO: do this during insert/remove?
|
||||
pub(self) async fn highest_block(
|
||||
&self,
|
||||
authorization: &Arc<Authorization>,
|
||||
web3_rpcs: &Web3Rpcs,
|
||||
) -> Option<ArcBlock> {
|
||||
let mut checked_heads = HashSet::with_capacity(self.rpc_name_to_hash.len());
|
||||
let mut highest_block = None::<ArcBlock>;
|
||||
|
||||
for (rpc_name, rpc_head_hash) in self.rpc_name_to_hash.iter() {
|
||||
// don't waste time checking the same hash multiple times
|
||||
if checked_heads.contains(rpc_head_hash) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let rpc_block = match self
|
||||
.get_block_from_rpc(rpc_name, rpc_head_hash, authorization, web3_rpcs)
|
||||
.await
|
||||
{
|
||||
Ok(x) => x,
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"failed getting block {} from {} while finding highest block number: {:?}",
|
||||
rpc_head_hash, rpc_name, err,
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
checked_heads.insert(rpc_head_hash);
|
||||
|
||||
// if this is the first block we've tried
|
||||
// or if this rpc's newest block has a higher number
|
||||
// we used to check total difficulty, but that isn't a thing anymore on ETH
|
||||
// TODO: we still need total difficulty on some other PoW chains. whats annoying is it isn't considered part of the "block header" just the block. so websockets don't return it
|
||||
let highest_num = highest_block
|
||||
.as_ref()
|
||||
.map(|x| x.number.expect("blocks here should always have a number"));
|
||||
let rpc_num = rpc_block.as_ref().number;
|
||||
|
||||
if rpc_num > highest_num {
|
||||
highest_block = Some(rpc_block);
|
||||
}
|
||||
}
|
||||
|
||||
highest_block
|
||||
}
|
||||
|
||||
pub(self) async fn consensus_head_connections(
|
||||
&self,
|
||||
authorization: &Arc<Authorization>,
|
||||
web3_rpcs: &Web3Rpcs,
|
||||
) -> anyhow::Result<ConsensusWeb3Rpcs> {
|
||||
let mut maybe_head_block = match self.highest_block(authorization, web3_rpcs).await {
|
||||
None => return Err(anyhow::anyhow!("No blocks known")),
|
||||
Some(x) => x,
|
||||
};
|
||||
|
||||
let num_known = self.rpc_name_to_hash.len();
|
||||
|
||||
// track rpcs on this heaviest chain so we can build a new ConsensusConnections
|
||||
let mut highest_rpcs = HashSet::<&str>::new();
|
||||
// a running total of the soft limits covered by the rpcs that agree on the head block
|
||||
let mut highest_rpcs_sum_soft_limit: u32 = 0;
|
||||
// TODO: also track highest_rpcs_sum_hard_limit? llama doesn't need this, so it can wait
|
||||
|
||||
// check the highest work block for a set of rpcs that can serve our request load
|
||||
// if it doesn't have enough rpcs for our request load, check the parent block
|
||||
// TODO: loop for how many parent blocks? we don't want to serve blocks that are too far behind. probably different per chain
|
||||
// TODO: this loop is pretty long. any way to clean up this code?
|
||||
for _ in 0..6 {
|
||||
let maybe_head_hash = maybe_head_block
|
||||
.hash
|
||||
.as_ref()
|
||||
.expect("blocks here always need hashes");
|
||||
|
||||
// find all rpcs with maybe_head_block as their current head
|
||||
for (rpc_name, rpc_head_hash) in self.rpc_name_to_hash.iter() {
|
||||
if rpc_head_hash != maybe_head_hash {
|
||||
// connection is not on the desired block
|
||||
continue;
|
||||
}
|
||||
if highest_rpcs.contains(rpc_name.as_str()) {
|
||||
// connection is on a child block
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(rpc) = web3_rpcs.conns.get(rpc_name.as_str()) {
|
||||
highest_rpcs.insert(rpc_name);
|
||||
highest_rpcs_sum_soft_limit += rpc.soft_limit;
|
||||
} else {
|
||||
// i don't think this is an error. i think its just if a reconnect is currently happening
|
||||
warn!("connection missing: {}", rpc_name);
|
||||
debug!("web3_rpcs.conns: {:#?}", web3_rpcs.conns);
|
||||
}
|
||||
}
|
||||
|
||||
if highest_rpcs_sum_soft_limit >= web3_rpcs.min_sum_soft_limit
|
||||
&& highest_rpcs.len() >= web3_rpcs.min_head_rpcs
|
||||
{
|
||||
// we have enough servers with enough requests
|
||||
break;
|
||||
}
|
||||
|
||||
// not enough rpcs yet. check the parent block
|
||||
if let Some(parent_block) = web3_rpcs.block_hashes.get(&maybe_head_block.parent_hash) {
|
||||
// trace!(
|
||||
// child=%maybe_head_hash, parent=%parent_block.hash.unwrap(), "avoiding thundering herd",
|
||||
// );
|
||||
|
||||
maybe_head_block = parent_block;
|
||||
continue;
|
||||
} else {
|
||||
if num_known < web3_rpcs.min_head_rpcs {
|
||||
return Err(anyhow::anyhow!(
|
||||
"not enough rpcs connected: {}/{}/{}",
|
||||
highest_rpcs.len(),
|
||||
num_known,
|
||||
web3_rpcs.min_head_rpcs,
|
||||
));
|
||||
} else {
|
||||
let soft_limit_percent = (highest_rpcs_sum_soft_limit as f32
|
||||
/ web3_rpcs.min_sum_soft_limit as f32)
|
||||
* 100.0;
|
||||
|
||||
return Err(anyhow::anyhow!(
|
||||
"ran out of parents to check. rpcs {}/{}/{}. soft limit: {:.2}% ({}/{})",
|
||||
highest_rpcs.len(),
|
||||
num_known,
|
||||
web3_rpcs.min_head_rpcs,
|
||||
highest_rpcs_sum_soft_limit,
|
||||
web3_rpcs.min_sum_soft_limit,
|
||||
soft_limit_percent,
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: if consensus_head_rpcs.is_empty, try another method of finding the head block. will need to change the return Err above into breaks.
|
||||
|
||||
// we've done all the searching for the heaviest block that we can
|
||||
if highest_rpcs.len() < web3_rpcs.min_head_rpcs
|
||||
|| highest_rpcs_sum_soft_limit < web3_rpcs.min_sum_soft_limit
|
||||
{
|
||||
// if we get here, not enough servers are synced. return an error
|
||||
let soft_limit_percent =
|
||||
(highest_rpcs_sum_soft_limit as f32 / web3_rpcs.min_sum_soft_limit as f32) * 100.0;
|
||||
|
||||
return Err(anyhow::anyhow!(
|
||||
"Not enough resources. rpcs {}/{}/{}. soft limit: {:.2}% ({}/{})",
|
||||
highest_rpcs.len(),
|
||||
num_known,
|
||||
web3_rpcs.min_head_rpcs,
|
||||
highest_rpcs_sum_soft_limit,
|
||||
web3_rpcs.min_sum_soft_limit,
|
||||
soft_limit_percent,
|
||||
));
|
||||
}
|
||||
|
||||
// success! this block has enough soft limit and nodes on it (or on later blocks)
|
||||
let conns: Vec<Arc<Web3Rpc>> = highest_rpcs
|
||||
.into_iter()
|
||||
.filter_map(|conn_name| web3_rpcs.conns.get(conn_name).cloned())
|
||||
.collect();
|
||||
|
||||
// TODO: DEBUG only check
|
||||
let _ = maybe_head_block
|
||||
.hash
|
||||
.expect("head blocks always have hashes");
|
||||
let _ = maybe_head_block
|
||||
.number
|
||||
.expect("head blocks always have numbers");
|
||||
|
||||
let consensus_head_block: SavedBlock = maybe_head_block.into();
|
||||
|
||||
Ok(ConsensusWeb3Rpcs {
|
||||
head_block: Some(consensus_head_block),
|
||||
conns,
|
||||
num_checked_conns: self.rpc_name_to_hash.len(),
|
||||
includes_backups: self.includes_backups,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// A ConsensusConnections builder that tracks all connection heads across multiple groups of servers
|
||||
pub struct ConsensusFinder {
|
||||
/// only main servers
|
||||
main: ConnectionsGroup,
|
||||
/// main and backup servers
|
||||
all: ConnectionsGroup,
|
||||
}
|
||||
|
||||
impl Default for ConsensusFinder {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
main: ConnectionsGroup::without_backups(),
|
||||
all: ConnectionsGroup::with_backups(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ConsensusFinder {
|
||||
fn remove(&mut self, rpc: &Web3Rpc) -> Option<H256> {
|
||||
// TODO: should we have multiple backup tiers? (remote datacenters vs third party)
|
||||
if !rpc.backup {
|
||||
self.main.remove(rpc);
|
||||
}
|
||||
self.all.remove(rpc)
|
||||
}
|
||||
|
||||
fn insert(&mut self, rpc: &Web3Rpc, new_hash: H256) -> Option<H256> {
|
||||
// TODO: should we have multiple backup tiers? (remote datacenters vs third party)
|
||||
if !rpc.backup {
|
||||
self.main.insert(rpc, new_hash);
|
||||
}
|
||||
self.all.insert(rpc, new_hash)
|
||||
}
|
||||
|
||||
/// Update our tracking of the rpc and return true if something changed
|
||||
async fn update_rpc(
|
||||
&mut self,
|
||||
rpc_head_block: Option<SavedBlock>,
|
||||
rpc: Arc<Web3Rpc>,
|
||||
// we need this so we can save the block to caches. i don't like it though. maybe we should use a lazy_static Cache wrapper that has a "save_block" method?. i generally dislike globals but i also dislike all the types having to pass eachother around
|
||||
web3_connections: &Web3Rpcs,
|
||||
) -> anyhow::Result<bool> {
|
||||
// add the rpc's block to connection_heads, or remove the rpc from connection_heads
|
||||
let changed = match rpc_head_block {
|
||||
Some(mut rpc_head_block) => {
|
||||
// we don't know if its on the heaviest chain yet
|
||||
rpc_head_block.block = web3_connections
|
||||
.save_block(rpc_head_block.block, false)
|
||||
.await?;
|
||||
|
||||
// we used to remove here if the block was too far behind. but it just made things more complicated
|
||||
|
||||
let rpc_head_hash = rpc_head_block.hash();
|
||||
|
||||
if let Some(prev_hash) = self.insert(&rpc, rpc_head_hash) {
|
||||
if prev_hash == rpc_head_hash {
|
||||
// this block was already sent by this rpc. return early
|
||||
false
|
||||
} else {
|
||||
// new block for this rpc
|
||||
true
|
||||
}
|
||||
} else {
|
||||
// first block for this rpc
|
||||
true
|
||||
}
|
||||
}
|
||||
None => {
|
||||
if self.remove(&rpc).is_none() {
|
||||
// this rpc was already removed
|
||||
false
|
||||
} else {
|
||||
// rpc head changed from being synced to not
|
||||
true
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
Ok(changed)
|
||||
}
|
||||
|
||||
// TODO: this could definitely be cleaner. i don't like the error handling/unwrapping
|
||||
async fn best_consensus_connections(
|
||||
&mut self,
|
||||
authorization: &Arc<Authorization>,
|
||||
web3_connections: &Web3Rpcs,
|
||||
) -> ConsensusWeb3Rpcs {
|
||||
let highest_block_num = match self
|
||||
.all
|
||||
.highest_block(authorization, web3_connections)
|
||||
.await
|
||||
{
|
||||
None => {
|
||||
return ConsensusWeb3Rpcs::default();
|
||||
}
|
||||
Some(x) => x.number.expect("blocks here should always have a number"),
|
||||
};
|
||||
|
||||
// TODO: also needs to be not less than our current head
|
||||
let mut min_block_num = highest_block_num.saturating_sub(U64::from(5));
|
||||
|
||||
// we also want to be sure we don't ever go backwards!
|
||||
if let Some(current_consensus_head_num) = web3_connections.head_block_num() {
|
||||
min_block_num = min_block_num.max(current_consensus_head_num);
|
||||
}
|
||||
|
||||
// TODO: pass `min_block_num` to consensus_head_connections?
|
||||
let consensus_head_for_main = self
|
||||
.main
|
||||
.consensus_head_connections(authorization, web3_connections)
|
||||
.await
|
||||
.map_err(|err| err.context("cannot use main group"));
|
||||
|
||||
let consensus_num_for_main = consensus_head_for_main
|
||||
.as_ref()
|
||||
.ok()
|
||||
.map(|x| x.head_block.as_ref().unwrap().number());
|
||||
|
||||
if let Some(consensus_num_for_main) = consensus_num_for_main {
|
||||
if consensus_num_for_main >= min_block_num {
|
||||
return consensus_head_for_main.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: pass `min_block_num` to consensus_head_connections?
|
||||
let consensus_connections_for_all = match self
|
||||
.all
|
||||
.consensus_head_connections(authorization, web3_connections)
|
||||
.await
|
||||
{
|
||||
Err(err) => {
|
||||
if self.all.rpc_name_to_hash.len() < web3_connections.min_head_rpcs {
|
||||
debug!("No consensus head yet: {}", err);
|
||||
}
|
||||
return ConsensusWeb3Rpcs::default();
|
||||
}
|
||||
Ok(x) => x,
|
||||
};
|
||||
|
||||
let consensus_num_for_all = consensus_connections_for_all
|
||||
.head_block
|
||||
.as_ref()
|
||||
.map(|x| x.number());
|
||||
|
||||
if consensus_num_for_all > consensus_num_for_main {
|
||||
if consensus_num_for_all < Some(min_block_num) {
|
||||
// TODO: this should have an alarm in sentry
|
||||
error!("CONSENSUS HEAD w/ BACKUP NODES IS VERY OLD!");
|
||||
}
|
||||
consensus_connections_for_all
|
||||
} else {
|
||||
if let Ok(x) = consensus_head_for_main {
|
||||
error!("CONSENSUS HEAD IS VERY OLD! Backup RPCs did not improve this situation");
|
||||
x
|
||||
} else {
|
||||
// TODO: i don't think we need this error. and i doublt we'll ever even get here
|
||||
error!("NO CONSENSUS HEAD!");
|
||||
ConsensusWeb3Rpcs::default()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
527
web3_proxy/src/rpcs/consensus.rs
Normal file
527
web3_proxy/src/rpcs/consensus.rs
Normal file
@ -0,0 +1,527 @@
|
||||
use crate::frontend::authorization::Authorization;
|
||||
|
||||
use super::blockchain::Web3ProxyBlock;
|
||||
use super::many::Web3Rpcs;
|
||||
use super::one::Web3Rpc;
|
||||
use ethers::prelude::{H256, U64};
|
||||
use hashbrown::{HashMap, HashSet};
|
||||
use log::{debug, trace, warn};
|
||||
use serde::Serialize;
|
||||
use std::collections::BTreeMap;
|
||||
use std::fmt;
|
||||
use std::sync::Arc;
|
||||
|
||||
/// A collection of Web3Rpcs that are on the same block.
|
||||
/// Serialize is so we can print it on our debug endpoint
|
||||
#[derive(Clone, Default, Serialize)]
|
||||
pub struct ConsensusWeb3Rpcs {
|
||||
pub(super) head_block: Option<Web3ProxyBlock>,
|
||||
// TODO: this should be able to serialize, but it isn't
|
||||
#[serde(skip_serializing)]
|
||||
pub(super) conns: Vec<Arc<Web3Rpc>>,
|
||||
pub(super) backups_voted: Option<Web3ProxyBlock>,
|
||||
pub(super) backups_needed: bool,
|
||||
}
|
||||
|
||||
impl ConsensusWeb3Rpcs {
|
||||
pub fn num_conns(&self) -> usize {
|
||||
self.conns.len()
|
||||
}
|
||||
|
||||
pub fn sum_soft_limit(&self) -> u32 {
|
||||
self.conns.iter().fold(0, |sum, rpc| sum + rpc.soft_limit)
|
||||
}
|
||||
|
||||
// TODO: sum_hard_limit?
|
||||
}
|
||||
|
||||
impl fmt::Debug for ConsensusWeb3Rpcs {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
// TODO: the default formatter takes forever to write. this is too quiet though
|
||||
// TODO: print the actual conns?
|
||||
f.debug_struct("ConsensusConnections")
|
||||
.field("head_block", &self.head_block)
|
||||
.field("num_conns", &self.conns.len())
|
||||
.finish_non_exhaustive()
|
||||
}
|
||||
}
|
||||
|
||||
impl Web3Rpcs {
|
||||
// TODO: return a ref?
|
||||
pub fn head_block(&self) -> Option<Web3ProxyBlock> {
|
||||
self.watch_consensus_head_receiver
|
||||
.as_ref()
|
||||
.map(|x| x.borrow().clone())
|
||||
}
|
||||
|
||||
// TODO: return a ref?
|
||||
pub fn head_block_hash(&self) -> Option<H256> {
|
||||
self.head_block().map(|x| *x.hash())
|
||||
}
|
||||
|
||||
// TODO: return a ref?
|
||||
pub fn head_block_num(&self) -> Option<U64> {
|
||||
self.head_block().map(|x| *x.number())
|
||||
}
|
||||
|
||||
pub fn synced(&self) -> bool {
|
||||
!self.watch_consensus_rpcs_sender.borrow().conns.is_empty()
|
||||
}
|
||||
|
||||
pub fn num_synced_rpcs(&self) -> usize {
|
||||
self.watch_consensus_rpcs_sender.borrow().conns.len()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ConnectionsGroup {
|
||||
rpc_name_to_block: HashMap<String, Web3ProxyBlock>,
|
||||
// TODO: what if there are two blocks with the same number?
|
||||
highest_block: Option<Web3ProxyBlock>,
|
||||
}
|
||||
|
||||
impl Default for ConnectionsGroup {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
rpc_name_to_block: Default::default(),
|
||||
highest_block: Default::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ConnectionsGroup {
|
||||
pub fn len(&self) -> usize {
|
||||
self.rpc_name_to_block.len()
|
||||
}
|
||||
|
||||
fn remove(&mut self, rpc_name: &str) -> Option<Web3ProxyBlock> {
|
||||
if let Some(removed_block) = self.rpc_name_to_block.remove(rpc_name) {
|
||||
match self.highest_block.as_mut() {
|
||||
None => {}
|
||||
Some(current_highest_block) => {
|
||||
if removed_block.hash() == current_highest_block.hash() {
|
||||
for maybe_highest_block in self.rpc_name_to_block.values() {
|
||||
if maybe_highest_block.number() > current_highest_block.number() {
|
||||
*current_highest_block = maybe_highest_block.clone();
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Some(removed_block)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
fn insert(&mut self, rpc: &Web3Rpc, block: Web3ProxyBlock) -> Option<Web3ProxyBlock> {
|
||||
// TODO: what about a reorg to the same height?
|
||||
if Some(block.number()) > self.highest_block.as_ref().map(|x| x.number()) {
|
||||
self.highest_block = Some(block.clone());
|
||||
}
|
||||
|
||||
self.rpc_name_to_block.insert(rpc.name.clone(), block)
|
||||
}
|
||||
|
||||
// // TODO: do this during insert/remove?
|
||||
// pub(self) async fn highest_block(
|
||||
// &self,
|
||||
// authorization: &Arc<Authorization>,
|
||||
// web3_rpcs: &Web3Rpcs,
|
||||
// ) -> Option<ArcBlock> {
|
||||
// let mut checked_heads = HashSet::with_capacity(self.rpc_name_to_hash.len());
|
||||
// let mut highest_block = None::<ArcBlock>;
|
||||
|
||||
// for (rpc_name, rpc_head_hash) in self.rpc_name_to_hash.iter() {
|
||||
// // don't waste time checking the same hash multiple times
|
||||
// if checked_heads.contains(rpc_head_hash) {
|
||||
// continue;
|
||||
// }
|
||||
|
||||
// let rpc_block = match web3_rpcs
|
||||
// .get_block_from_rpc(rpc_name, rpc_head_hash, authorization)
|
||||
// .await
|
||||
// {
|
||||
// Ok(x) => x,
|
||||
// Err(err) => {
|
||||
// warn!(
|
||||
// "failed getting block {} from {} while finding highest block number: {:?}",
|
||||
// rpc_head_hash, rpc_name, err,
|
||||
// );
|
||||
// continue;
|
||||
// }
|
||||
// };
|
||||
|
||||
// checked_heads.insert(rpc_head_hash);
|
||||
|
||||
// // if this is the first block we've tried
|
||||
// // or if this rpc's newest block has a higher number
|
||||
// // we used to check total difficulty, but that isn't a thing anymore on ETH
|
||||
// // TODO: we still need total difficulty on some other PoW chains. whats annoying is it isn't considered part of the "block header" just the block. so websockets don't return it
|
||||
// let highest_num = highest_block
|
||||
// .as_ref()
|
||||
// .map(|x| x.number.expect("blocks here should always have a number"));
|
||||
// let rpc_num = rpc_block.as_ref().number;
|
||||
|
||||
// if rpc_num > highest_num {
|
||||
// highest_block = Some(rpc_block);
|
||||
// }
|
||||
// }
|
||||
|
||||
// highest_block
|
||||
// }
|
||||
|
||||
/// min_consensus_block_num keeps us from ever going backwards.
|
||||
/// TODO: think about min_consensus_block_num more. i think this might cause an outage if the chain is doing weird things. but 503s is probably better than broken data.
|
||||
pub(self) async fn consensus_head_connections(
|
||||
&self,
|
||||
authorization: &Arc<Authorization>,
|
||||
web3_rpcs: &Web3Rpcs,
|
||||
min_consensus_block_num: Option<U64>,
|
||||
) -> anyhow::Result<ConsensusWeb3Rpcs> {
|
||||
let mut maybe_head_block = match self.highest_block.clone() {
|
||||
None => return Err(anyhow::anyhow!("no blocks known")),
|
||||
Some(x) => x,
|
||||
};
|
||||
|
||||
// TODO: take max_distance_consensus_to_highest as an argument?
|
||||
// TODO: what if someone's backup node is misconfigured and goes on a really fast forked chain?
|
||||
let max_lag_consensus_to_highest =
|
||||
if let Some(min_consensus_block_num) = min_consensus_block_num {
|
||||
maybe_head_block
|
||||
.number()
|
||||
.saturating_sub(min_consensus_block_num)
|
||||
.as_u64()
|
||||
} else {
|
||||
// TODO: get from app config? different chains probably should have different values. 10 is probably too much
|
||||
10
|
||||
};
|
||||
|
||||
let num_known = self.rpc_name_to_block.len();
|
||||
|
||||
if num_known < web3_rpcs.min_head_rpcs {
|
||||
return Err(anyhow::anyhow!(
|
||||
"not enough rpcs connected: {}/{}",
|
||||
num_known,
|
||||
web3_rpcs.min_head_rpcs,
|
||||
));
|
||||
}
|
||||
|
||||
let mut primary_rpcs_voted: Option<Web3ProxyBlock> = None;
|
||||
let mut backup_rpcs_voted: Option<Web3ProxyBlock> = None;
|
||||
|
||||
// track rpcs on this heaviest chain so we can build a new ConsensusConnections
|
||||
let mut primary_consensus_rpcs = HashSet::<&str>::new();
|
||||
let mut backup_consensus_rpcs = HashSet::<&str>::new();
|
||||
|
||||
// a running total of the soft limits covered by the rpcs that agree on the head block
|
||||
let mut primary_sum_soft_limit: u32 = 0;
|
||||
let mut backup_sum_soft_limit: u32 = 0;
|
||||
|
||||
// TODO: also track the sum of *available* hard_limits. if any servers have no hard limits, use their soft limit or no limit?
|
||||
|
||||
// check the highest work block for a set of rpcs that can serve our request load
|
||||
// if it doesn't have enough rpcs for our request load, check the parent block
|
||||
// TODO: loop for how many parent blocks? we don't want to serve blocks that are too far behind. probably different per chain
|
||||
// TODO: this loop is pretty long. any way to clean up this code?
|
||||
for _ in 0..max_lag_consensus_to_highest {
|
||||
let maybe_head_hash = maybe_head_block.hash();
|
||||
|
||||
// find all rpcs with maybe_head_hash as their current head
|
||||
for (rpc_name, rpc_head) in self.rpc_name_to_block.iter() {
|
||||
if rpc_head.hash() != maybe_head_hash {
|
||||
// connection is not on the desired block
|
||||
continue;
|
||||
}
|
||||
if backup_consensus_rpcs.contains(rpc_name.as_str()) {
|
||||
// connection is on a later block in this same chain
|
||||
continue;
|
||||
}
|
||||
if primary_consensus_rpcs.contains(rpc_name.as_str()) {
|
||||
// connection is on a later block in this same chain
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(rpc) = web3_rpcs.conns.get(rpc_name.as_str()) {
|
||||
if backup_rpcs_voted.is_some() {
|
||||
// backups already voted for a head block. don't change it
|
||||
} else {
|
||||
backup_consensus_rpcs.insert(rpc_name);
|
||||
backup_sum_soft_limit += rpc.soft_limit;
|
||||
}
|
||||
if !rpc.backup {
|
||||
primary_consensus_rpcs.insert(rpc_name);
|
||||
primary_sum_soft_limit += rpc.soft_limit;
|
||||
}
|
||||
} else {
|
||||
// i don't think this is an error. i think its just if a reconnect is currently happening
|
||||
warn!("connection missing: {}", rpc_name);
|
||||
debug!("web3_rpcs.conns: {:#?}", web3_rpcs.conns);
|
||||
}
|
||||
}
|
||||
|
||||
if primary_sum_soft_limit >= web3_rpcs.min_sum_soft_limit
|
||||
&& primary_consensus_rpcs.len() >= web3_rpcs.min_head_rpcs
|
||||
{
|
||||
// we have enough servers with enough requests! yey!
|
||||
primary_rpcs_voted = Some(maybe_head_block.clone());
|
||||
break;
|
||||
}
|
||||
|
||||
if backup_rpcs_voted.is_none()
|
||||
&& backup_consensus_rpcs != primary_consensus_rpcs
|
||||
&& backup_sum_soft_limit >= web3_rpcs.min_sum_soft_limit
|
||||
&& backup_consensus_rpcs.len() >= web3_rpcs.min_head_rpcs
|
||||
{
|
||||
// if we include backup servers, we have enough servers with high enough limits
|
||||
backup_rpcs_voted = Some(maybe_head_block.clone());
|
||||
}
|
||||
|
||||
// not enough rpcs on this block. check the parent block
|
||||
match web3_rpcs
|
||||
.block(authorization, &maybe_head_block.parent_hash(), None)
|
||||
.await
|
||||
{
|
||||
Ok(parent_block) => {
|
||||
// trace!(
|
||||
// child=%maybe_head_hash, parent=%parent_block.hash.unwrap(), "avoiding thundering herd. checking consensus on parent block",
|
||||
// );
|
||||
maybe_head_block = parent_block.into();
|
||||
continue;
|
||||
}
|
||||
Err(err) => {
|
||||
let soft_limit_percent = (primary_sum_soft_limit as f32
|
||||
/ web3_rpcs.min_sum_soft_limit as f32)
|
||||
* 100.0;
|
||||
|
||||
let err_msg = format!("ran out of parents to check. rpcs {}/{}/{}. soft limit: {:.2}% ({}/{}). err: {:#?}",
|
||||
primary_consensus_rpcs.len(),
|
||||
num_known,
|
||||
web3_rpcs.min_head_rpcs,
|
||||
primary_sum_soft_limit,
|
||||
web3_rpcs.min_sum_soft_limit,
|
||||
soft_limit_percent,
|
||||
err,
|
||||
);
|
||||
|
||||
if backup_rpcs_voted.is_some() {
|
||||
warn!("{}", err_msg);
|
||||
break;
|
||||
} else {
|
||||
return Err(anyhow::anyhow!(err_msg));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: if consensus_head_rpcs.is_empty, try another method of finding the head block. will need to change the return Err above into breaks.
|
||||
|
||||
// we've done all the searching for the heaviest block that we can
|
||||
if (primary_consensus_rpcs.len() < web3_rpcs.min_head_rpcs
|
||||
|| primary_sum_soft_limit < web3_rpcs.min_sum_soft_limit)
|
||||
&& backup_rpcs_voted.is_none()
|
||||
{
|
||||
// if we get here, not enough servers are synced. return an error
|
||||
let soft_limit_percent =
|
||||
(primary_sum_soft_limit as f32 / web3_rpcs.min_sum_soft_limit as f32) * 100.0;
|
||||
|
||||
return Err(anyhow::anyhow!(
|
||||
"Not enough resources. rpcs {}/{}/{}. soft limit: {:.2}% ({}/{})",
|
||||
primary_consensus_rpcs.len(),
|
||||
num_known,
|
||||
web3_rpcs.min_head_rpcs,
|
||||
primary_sum_soft_limit,
|
||||
web3_rpcs.min_sum_soft_limit,
|
||||
soft_limit_percent,
|
||||
));
|
||||
}
|
||||
|
||||
// success! this block has enough soft limit and nodes on it (or on later blocks)
|
||||
let conns: Vec<Arc<Web3Rpc>> = primary_consensus_rpcs
|
||||
.into_iter()
|
||||
.filter_map(|conn_name| web3_rpcs.conns.get(conn_name).cloned())
|
||||
.collect();
|
||||
|
||||
#[cfg(debug_assertions)]
|
||||
let _ = maybe_head_block.hash();
|
||||
#[cfg(debug_assertions)]
|
||||
let _ = maybe_head_block.number();
|
||||
|
||||
Ok(ConsensusWeb3Rpcs {
|
||||
head_block: Some(maybe_head_block),
|
||||
conns,
|
||||
backups_voted: backup_rpcs_voted,
|
||||
backups_needed: primary_rpcs_voted.is_none(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// A ConsensusConnections builder that tracks all connection heads across multiple groups of servers
|
||||
pub struct ConsensusFinder {
|
||||
/// backups for all tiers are only used if necessary
|
||||
/// tiers[0] = only tier 0.
|
||||
/// tiers[1] = tier 0 and tier 1
|
||||
/// tiers[n] = tier 0..=n
|
||||
/// This is a BTreeMap and not a Vec because sometimes a tier is empty
|
||||
tiers: BTreeMap<u64, ConnectionsGroup>,
|
||||
/// never serve blocks that are too old
|
||||
max_block_age: Option<u64>,
|
||||
/// tier 0 will be prefered as long as the distance between it and the other tiers is <= max_tier_lag
|
||||
max_block_lag: Option<U64>,
|
||||
}
|
||||
|
||||
impl ConsensusFinder {
|
||||
pub fn new(max_block_age: Option<u64>, max_block_lag: Option<U64>) -> Self {
|
||||
Self {
|
||||
tiers: Default::default(),
|
||||
max_block_age,
|
||||
max_block_lag,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ConsensusFinder {
|
||||
/// get the ConnectionsGroup that contains all rpcs
|
||||
/// panics if there are no tiers
|
||||
pub fn all_rpcs_group(&self) -> Option<&ConnectionsGroup> {
|
||||
self.tiers.values().last()
|
||||
}
|
||||
|
||||
/// get the mutable ConnectionsGroup that contains all rpcs
|
||||
pub fn all_mut(&mut self) -> Option<&mut ConnectionsGroup> {
|
||||
self.tiers.values_mut().last()
|
||||
}
|
||||
|
||||
pub fn remove(&mut self, rpc: &Web3Rpc) -> Option<Web3ProxyBlock> {
|
||||
let mut removed = None;
|
||||
|
||||
for (i, tier_group) in self.tiers.iter_mut().rev() {
|
||||
if i < &rpc.tier {
|
||||
break;
|
||||
}
|
||||
let x = tier_group.remove(rpc.name.as_str());
|
||||
|
||||
if removed.is_none() && x.is_some() {
|
||||
removed = x;
|
||||
}
|
||||
}
|
||||
|
||||
removed
|
||||
}
|
||||
|
||||
/// returns the block that the rpc was on before updating to the new_block
|
||||
pub fn insert(&mut self, rpc: &Web3Rpc, new_block: Web3ProxyBlock) -> Option<Web3ProxyBlock> {
|
||||
let mut old = None;
|
||||
|
||||
for (i, tier_group) in self.tiers.iter_mut().rev() {
|
||||
if i > &rpc.tier {
|
||||
break;
|
||||
}
|
||||
|
||||
// TODO: should new_block be a ref?
|
||||
let x = tier_group.insert(rpc, new_block.clone());
|
||||
|
||||
if old.is_none() && x.is_some() {
|
||||
old = x;
|
||||
}
|
||||
}
|
||||
|
||||
old
|
||||
}
|
||||
|
||||
/// Update our tracking of the rpc and return true if something changed
|
||||
pub(crate) async fn update_rpc(
|
||||
&mut self,
|
||||
rpc_head_block: Option<Web3ProxyBlock>,
|
||||
rpc: Arc<Web3Rpc>,
|
||||
// we need this so we can save the block to caches. i don't like it though. maybe we should use a lazy_static Cache wrapper that has a "save_block" method?. i generally dislike globals but i also dislike all the types having to pass eachother around
|
||||
web3_connections: &Web3Rpcs,
|
||||
) -> anyhow::Result<bool> {
|
||||
// add the rpc's block to connection_heads, or remove the rpc from connection_heads
|
||||
let changed = match rpc_head_block {
|
||||
Some(mut rpc_head_block) => {
|
||||
// we don't know if its on the heaviest chain yet
|
||||
rpc_head_block = web3_connections
|
||||
.try_cache_block(rpc_head_block, false)
|
||||
.await?;
|
||||
|
||||
// if let Some(max_block_lag) = max_block_lag {
|
||||
// if rpc_head_block.number() < ??? {
|
||||
// trace!("rpc_head_block from {} is too far behind! {}", rpc, rpc_head_block);
|
||||
// return Ok(self.remove(&rpc).is_some());
|
||||
// }
|
||||
// }
|
||||
|
||||
if let Some(max_age) = self.max_block_age {
|
||||
if rpc_head_block.age() > max_age {
|
||||
trace!("rpc_head_block from {} is too old! {}", rpc, rpc_head_block);
|
||||
return Ok(self.remove(&rpc).is_some());
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(prev_block) = self.insert(&rpc, rpc_head_block.clone()) {
|
||||
if prev_block.hash() == rpc_head_block.hash() {
|
||||
// this block was already sent by this rpc. return early
|
||||
false
|
||||
} else {
|
||||
// new block for this rpc
|
||||
true
|
||||
}
|
||||
} else {
|
||||
// first block for this rpc
|
||||
true
|
||||
}
|
||||
}
|
||||
None => {
|
||||
if self.remove(&rpc).is_none() {
|
||||
// this rpc was already removed
|
||||
false
|
||||
} else {
|
||||
// rpc head changed from being synced to not
|
||||
true
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
Ok(changed)
|
||||
}
|
||||
|
||||
// TODO: this could definitely be cleaner. i don't like the error handling/unwrapping
|
||||
pub async fn best_consensus_connections(
|
||||
&mut self,
|
||||
authorization: &Arc<Authorization>,
|
||||
web3_connections: &Web3Rpcs,
|
||||
) -> Option<ConsensusWeb3Rpcs> {
|
||||
// TODO: attach context to these?
|
||||
let highest_known_block = self.all_rpcs_group()?.highest_block.as_ref()?;
|
||||
|
||||
trace!("highest_known_block: {}", highest_known_block);
|
||||
|
||||
let min_block_num = self
|
||||
.max_block_lag
|
||||
.map(|x| highest_known_block.number().saturating_sub(x))
|
||||
// we also want to be sure we don't ever go backwards!
|
||||
.max(web3_connections.head_block_num());
|
||||
|
||||
trace!("min_block_num: {:#?}", min_block_num);
|
||||
|
||||
// TODO Should this be a Vec<Result<Option<_, _>>>?
|
||||
// TODO: how should errors be handled?
|
||||
// TODO: find the best tier with a connectionsgroup. best case, this only queries the first tier
|
||||
// TODO: do we need to calculate all of them? I think having highest_known_block included as part of min_block_num should make that unnecessary
|
||||
for (i, x) in self.tiers.iter() {
|
||||
trace!("checking tier {}", i);
|
||||
if let Ok(consensus_head_connections) = x
|
||||
.consensus_head_connections(authorization, web3_connections, min_block_num)
|
||||
.await
|
||||
{
|
||||
trace!("success on tier {}", i);
|
||||
// we got one! hopefully it didn't need to use any backups.
|
||||
// but even if it did need backup servers, that is better than going to a worse tier
|
||||
return Some(consensus_head_connections);
|
||||
}
|
||||
}
|
||||
|
||||
return None;
|
||||
}
|
||||
}
|
@ -1,8 +1,8 @@
|
||||
///! Load balanced communication with a group of web3 rpc providers
|
||||
use super::blockchain::{ArcBlock, BlockHashesCache};
|
||||
use super::blockchain::{BlockHashesCache, Web3ProxyBlock};
|
||||
use super::consensus::ConsensusWeb3Rpcs;
|
||||
use super::one::Web3Rpc;
|
||||
use super::request::{OpenRequestHandle, OpenRequestResult, RequestRevertHandler};
|
||||
use super::synced_connections::ConsensusWeb3Rpcs;
|
||||
use crate::app::{flatten_handle, AnyhowJoinHandle};
|
||||
use crate::config::{BlockAndRpc, TxHashAndRpc, Web3RpcConfig};
|
||||
use crate::frontend::authorization::{Authorization, RequestMetadata};
|
||||
@ -38,9 +38,9 @@ pub struct Web3Rpcs {
|
||||
/// any requests will be forwarded to one (or more) of these connections
|
||||
pub(crate) conns: HashMap<String, Arc<Web3Rpc>>,
|
||||
/// all providers with the same consensus head block. won't update if there is no `self.watch_consensus_head_sender`
|
||||
pub(super) watch_consensus_connections_sender: watch::Sender<Arc<ConsensusWeb3Rpcs>>,
|
||||
pub(super) watch_consensus_rpcs_sender: watch::Sender<Arc<ConsensusWeb3Rpcs>>,
|
||||
/// this head receiver makes it easy to wait until there is a new block
|
||||
pub(super) watch_consensus_head_receiver: Option<watch::Receiver<ArcBlock>>,
|
||||
pub(super) watch_consensus_head_receiver: Option<watch::Receiver<Web3ProxyBlock>>,
|
||||
pub(super) pending_transactions:
|
||||
Cache<TxHash, TxStatus, hashbrown::hash_map::DefaultHashBuilder>,
|
||||
/// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis?
|
||||
@ -48,8 +48,14 @@ pub struct Web3Rpcs {
|
||||
pub(super) block_hashes: BlockHashesCache,
|
||||
/// blocks on the heaviest chain
|
||||
pub(super) block_numbers: Cache<U64, H256, hashbrown::hash_map::DefaultHashBuilder>,
|
||||
/// the number of rpcs required to agree on consensus for the head block (thundering herd protection)
|
||||
pub(super) min_head_rpcs: usize,
|
||||
/// the soft limit required to agree on consensus for the head block. (thundering herd protection)
|
||||
pub(super) min_sum_soft_limit: u32,
|
||||
/// how far behind the highest known block height we can be before we stop serving requests
|
||||
pub(super) max_block_lag: Option<U64>,
|
||||
/// how old our consensus head block we can be before we stop serving requests
|
||||
pub(super) max_block_age: Option<u64>,
|
||||
}
|
||||
|
||||
impl Web3Rpcs {
|
||||
@ -60,13 +66,15 @@ impl Web3Rpcs {
|
||||
chain_id: u64,
|
||||
db_conn: Option<DatabaseConnection>,
|
||||
http_client: Option<reqwest::Client>,
|
||||
max_block_age: Option<u64>,
|
||||
max_block_lag: Option<U64>,
|
||||
min_head_rpcs: usize,
|
||||
min_sum_soft_limit: u32,
|
||||
pending_transactions: Cache<TxHash, TxStatus, hashbrown::hash_map::DefaultHashBuilder>,
|
||||
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
|
||||
redis_pool: Option<redis_rate_limiter::RedisPool>,
|
||||
server_configs: HashMap<String, Web3RpcConfig>,
|
||||
watch_consensus_head_sender: Option<watch::Sender<ArcBlock>>,
|
||||
watch_consensus_head_sender: Option<watch::Sender<Web3ProxyBlock>>,
|
||||
) -> anyhow::Result<(Arc<Self>, AnyhowJoinHandle<()>)> {
|
||||
let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
|
||||
let (block_sender, block_receiver) = flume::unbounded::<BlockAndRpc>();
|
||||
@ -212,13 +220,15 @@ impl Web3Rpcs {
|
||||
|
||||
let connections = Arc::new(Self {
|
||||
conns: connections,
|
||||
watch_consensus_connections_sender,
|
||||
watch_consensus_rpcs_sender: watch_consensus_connections_sender,
|
||||
watch_consensus_head_receiver,
|
||||
pending_transactions,
|
||||
block_hashes,
|
||||
block_numbers,
|
||||
min_sum_soft_limit,
|
||||
min_head_rpcs,
|
||||
max_block_age,
|
||||
max_block_lag,
|
||||
});
|
||||
|
||||
let authorization = Arc::new(Authorization::internal(db_conn.clone())?);
|
||||
@ -254,7 +264,7 @@ impl Web3Rpcs {
|
||||
authorization: Arc<Authorization>,
|
||||
pending_tx_id_receiver: flume::Receiver<TxHashAndRpc>,
|
||||
block_receiver: flume::Receiver<BlockAndRpc>,
|
||||
head_block_sender: Option<watch::Sender<ArcBlock>>,
|
||||
head_block_sender: Option<watch::Sender<Web3ProxyBlock>>,
|
||||
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut futures = vec![];
|
||||
@ -455,7 +465,7 @@ impl Web3Rpcs {
|
||||
max_block_needed: Option<&U64>,
|
||||
) -> anyhow::Result<OpenRequestResult> {
|
||||
let usable_rpcs_by_head_num_and_weight: BTreeMap<(Option<U64>, u64), Vec<Arc<Web3Rpc>>> = {
|
||||
let synced_connections = self.watch_consensus_connections_sender.borrow().clone();
|
||||
let synced_connections = self.watch_consensus_rpcs_sender.borrow().clone();
|
||||
|
||||
let head_block_num = if let Some(head_block) = synced_connections.head_block.as_ref() {
|
||||
head_block.number()
|
||||
@ -499,7 +509,7 @@ impl Web3Rpcs {
|
||||
match x_head_block {
|
||||
None => continue,
|
||||
Some(x_head) => {
|
||||
let key = (Some(x_head.number()), u64::MAX - x.tier);
|
||||
let key = (Some(*x_head.number()), u64::MAX - x.tier);
|
||||
|
||||
m.entry(key).or_insert_with(Vec::new).push(x);
|
||||
}
|
||||
@ -508,6 +518,7 @@ impl Web3Rpcs {
|
||||
}
|
||||
cmp::Ordering::Equal => {
|
||||
// need the consensus head block. filter the synced rpcs
|
||||
// TODO: this doesn't properly check the allow_backups variable!
|
||||
for x in synced_connections
|
||||
.conns
|
||||
.iter()
|
||||
@ -519,7 +530,7 @@ impl Web3Rpcs {
|
||||
}
|
||||
}
|
||||
cmp::Ordering::Greater => {
|
||||
// TODO? if the blocks is close and wait_for_sync and allow_backups, wait for change on a watch_consensus_connections_receiver().subscribe()
|
||||
// TODO? if the blocks is close, wait for change on a watch_consensus_connections_receiver().subscribe()
|
||||
return Ok(OpenRequestResult::NotReady(allow_backups));
|
||||
}
|
||||
}
|
||||
@ -670,11 +681,7 @@ impl Web3Rpcs {
|
||||
|
||||
let mut tried = HashSet::new();
|
||||
|
||||
let mut synced_conns = self
|
||||
.watch_consensus_connections_sender
|
||||
.borrow()
|
||||
.conns
|
||||
.clone();
|
||||
let mut synced_conns = self.watch_consensus_rpcs_sender.borrow().conns.clone();
|
||||
|
||||
// synced connections are all on the same block. sort them by tier with higher soft limits first
|
||||
synced_conns.sort_by_cached_key(rpc_sync_status_sort_key);
|
||||
@ -754,7 +761,7 @@ impl Web3Rpcs {
|
||||
let mut skip_rpcs = vec![];
|
||||
let mut method_not_available_response = None;
|
||||
|
||||
let mut watch_consensus_connections = self.watch_consensus_connections_sender.subscribe();
|
||||
let mut watch_consensus_connections = self.watch_consensus_rpcs_sender.subscribe();
|
||||
|
||||
// TODO: maximum retries? right now its the total number of servers
|
||||
loop {
|
||||
@ -1144,7 +1151,7 @@ impl Serialize for Web3Rpcs {
|
||||
state.serialize_field("conns", &conns)?;
|
||||
|
||||
{
|
||||
let consensus_connections = self.watch_consensus_connections_sender.borrow().clone();
|
||||
let consensus_connections = self.watch_consensus_rpcs_sender.borrow().clone();
|
||||
// TODO: rename synced_connections to consensus_connections?
|
||||
state.serialize_field("synced_connections", &consensus_connections)?;
|
||||
}
|
||||
@ -1181,10 +1188,8 @@ mod tests {
|
||||
// TODO: why is this allow needed? does tokio::test get in the way somehow?
|
||||
#![allow(unused_imports)]
|
||||
use super::*;
|
||||
use crate::rpcs::{
|
||||
blockchain::{ConsensusFinder, SavedBlock},
|
||||
provider::Web3Provider,
|
||||
};
|
||||
use crate::rpcs::consensus::ConsensusFinder;
|
||||
use crate::rpcs::{blockchain::Web3ProxyBlock, provider::Web3Provider};
|
||||
use ethers::types::{Block, U256};
|
||||
use log::{trace, LevelFilter};
|
||||
use parking_lot::RwLock;
|
||||
@ -1213,7 +1218,7 @@ mod tests {
|
||||
|
||||
let blocks: Vec<_> = [block_0, block_1, block_2]
|
||||
.into_iter()
|
||||
.map(|x| SavedBlock::new(Arc::new(x)))
|
||||
.map(|x| Web3ProxyBlock::new(Arc::new(x)))
|
||||
.collect();
|
||||
|
||||
let mut rpcs: Vec<_> = [
|
||||
@ -1298,9 +1303,8 @@ mod tests {
|
||||
let lagged_block = Arc::new(lagged_block);
|
||||
let head_block = Arc::new(head_block);
|
||||
|
||||
// TODO: write a impl From for Block -> BlockId?
|
||||
let mut lagged_block: SavedBlock = lagged_block.into();
|
||||
let mut head_block: SavedBlock = head_block.into();
|
||||
let mut lagged_block: Web3ProxyBlock = lagged_block.into();
|
||||
let mut head_block: Web3ProxyBlock = head_block.into();
|
||||
|
||||
let block_data_limit = u64::MAX;
|
||||
|
||||
@ -1312,6 +1316,7 @@ mod tests {
|
||||
block_data_limit: block_data_limit.into(),
|
||||
tier: 0,
|
||||
head_block: RwLock::new(Some(head_block.clone())),
|
||||
provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
@ -1323,6 +1328,7 @@ mod tests {
|
||||
block_data_limit: block_data_limit.into(),
|
||||
tier: 0,
|
||||
head_block: RwLock::new(Some(lagged_block.clone())),
|
||||
provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
@ -1340,13 +1346,13 @@ mod tests {
|
||||
(lagged_rpc.name.clone(), lagged_rpc.clone()),
|
||||
]);
|
||||
|
||||
let (watch_consensus_connections_sender, _) = watch::channel(Default::default());
|
||||
let (watch_consensus_rpcs_sender, _) = watch::channel(Default::default());
|
||||
|
||||
// TODO: make a Web3Rpcs::new
|
||||
let conns = Web3Rpcs {
|
||||
conns,
|
||||
watch_consensus_head_receiver: None,
|
||||
watch_consensus_connections_sender,
|
||||
watch_consensus_rpcs_sender,
|
||||
pending_transactions: Cache::builder()
|
||||
.max_capacity(10_000)
|
||||
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()),
|
||||
@ -1356,32 +1362,37 @@ mod tests {
|
||||
block_numbers: Cache::builder()
|
||||
.max_capacity(10_000)
|
||||
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()),
|
||||
// TODO: test max_block_age?
|
||||
max_block_age: None,
|
||||
// TODO: test max_block_lag?
|
||||
max_block_lag: None,
|
||||
min_head_rpcs: 1,
|
||||
min_sum_soft_limit: 1,
|
||||
};
|
||||
|
||||
let authorization = Arc::new(Authorization::internal(None).unwrap());
|
||||
|
||||
let (head_block_sender, _head_block_receiver) =
|
||||
watch::channel::<ArcBlock>(Default::default());
|
||||
let mut connection_heads = ConsensusFinder::default();
|
||||
let (head_block_sender, _head_block_receiver) = watch::channel(Default::default());
|
||||
let mut consensus_finder = ConsensusFinder::new(None, None);
|
||||
|
||||
// process None so that
|
||||
conns
|
||||
.process_block_from_rpc(
|
||||
&authorization,
|
||||
&mut connection_heads,
|
||||
&mut consensus_finder,
|
||||
None,
|
||||
lagged_rpc.clone(),
|
||||
&head_block_sender,
|
||||
&None,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
.expect(
|
||||
"its lagged, but it should still be seen as consensus if its the first to report",
|
||||
);
|
||||
conns
|
||||
.process_block_from_rpc(
|
||||
&authorization,
|
||||
&mut connection_heads,
|
||||
&mut consensus_finder,
|
||||
None,
|
||||
head_rpc.clone(),
|
||||
&head_block_sender,
|
||||
@ -1414,12 +1425,12 @@ mod tests {
|
||||
assert!(matches!(x, OpenRequestResult::NotReady(true)));
|
||||
|
||||
// add lagged blocks to the conns. both servers should be allowed
|
||||
lagged_block.block = conns.save_block(lagged_block.block, true).await.unwrap();
|
||||
lagged_block = conns.try_cache_block(lagged_block, true).await.unwrap();
|
||||
|
||||
conns
|
||||
.process_block_from_rpc(
|
||||
&authorization,
|
||||
&mut connection_heads,
|
||||
&mut consensus_finder,
|
||||
Some(lagged_block.clone()),
|
||||
lagged_rpc,
|
||||
&head_block_sender,
|
||||
@ -1430,7 +1441,7 @@ mod tests {
|
||||
conns
|
||||
.process_block_from_rpc(
|
||||
&authorization,
|
||||
&mut connection_heads,
|
||||
&mut consensus_finder,
|
||||
Some(lagged_block.clone()),
|
||||
head_rpc.clone(),
|
||||
&head_block_sender,
|
||||
@ -1442,12 +1453,12 @@ mod tests {
|
||||
assert_eq!(conns.num_synced_rpcs(), 2);
|
||||
|
||||
// add head block to the conns. lagged_rpc should not be available
|
||||
head_block.block = conns.save_block(head_block.block, true).await.unwrap();
|
||||
head_block = conns.try_cache_block(head_block, true).await.unwrap();
|
||||
|
||||
conns
|
||||
.process_block_from_rpc(
|
||||
&authorization,
|
||||
&mut connection_heads,
|
||||
&mut consensus_finder,
|
||||
Some(head_block.clone()),
|
||||
head_rpc,
|
||||
&head_block_sender,
|
||||
@ -1511,7 +1522,7 @@ mod tests {
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let head_block: SavedBlock = Arc::new(head_block).into();
|
||||
let head_block: Web3ProxyBlock = Arc::new(head_block).into();
|
||||
|
||||
let pruned_rpc = Web3Rpc {
|
||||
name: "pruned".to_string(),
|
||||
@ -1548,13 +1559,13 @@ mod tests {
|
||||
(archive_rpc.name.clone(), archive_rpc.clone()),
|
||||
]);
|
||||
|
||||
let (watch_consensus_connections_sender, _) = watch::channel(Default::default());
|
||||
let (watch_consensus_rpcs_sender, _) = watch::channel(Default::default());
|
||||
|
||||
// TODO: make a Web3Rpcs::new
|
||||
let conns = Web3Rpcs {
|
||||
conns,
|
||||
watch_consensus_head_receiver: None,
|
||||
watch_consensus_connections_sender,
|
||||
watch_consensus_rpcs_sender,
|
||||
pending_transactions: Cache::builder()
|
||||
.max_capacity(10)
|
||||
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()),
|
||||
@ -1566,13 +1577,14 @@ mod tests {
|
||||
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()),
|
||||
min_head_rpcs: 1,
|
||||
min_sum_soft_limit: 3_000,
|
||||
max_block_age: None,
|
||||
max_block_lag: None,
|
||||
};
|
||||
|
||||
let authorization = Arc::new(Authorization::internal(None).unwrap());
|
||||
|
||||
let (head_block_sender, _head_block_receiver) =
|
||||
watch::channel::<ArcBlock>(Default::default());
|
||||
let mut connection_heads = ConsensusFinder::default();
|
||||
let (head_block_sender, _head_block_receiver) = watch::channel(Default::default());
|
||||
let mut connection_heads = ConsensusFinder::new(None, None);
|
||||
|
||||
conns
|
||||
.process_block_from_rpc(
|
||||
|
@ -1,8 +1,8 @@
|
||||
// TODO: all pub, or export useful things here instead?
|
||||
pub mod blockchain;
|
||||
pub mod consensus;
|
||||
pub mod many;
|
||||
pub mod one;
|
||||
pub mod provider;
|
||||
pub mod request;
|
||||
pub mod synced_connections;
|
||||
pub mod transactions;
|
||||
|
@ -1,5 +1,5 @@
|
||||
///! Rate-limited communication with a web3 provider.
|
||||
use super::blockchain::{ArcBlock, BlockHashesCache, SavedBlock};
|
||||
use super::blockchain::{ArcBlock, BlockHashesCache, Web3ProxyBlock};
|
||||
use super::provider::Web3Provider;
|
||||
use super::request::{OpenRequestHandle, OpenRequestResult};
|
||||
use crate::app::{flatten_handle, AnyhowJoinHandle};
|
||||
@ -81,7 +81,7 @@ pub struct Web3Rpc {
|
||||
/// Lower tiers are higher priority when sending requests
|
||||
pub(super) tier: u64,
|
||||
/// TODO: change this to a watch channel so that http providers can subscribe and take action on change.
|
||||
pub(super) head_block: RwLock<Option<SavedBlock>>,
|
||||
pub(super) head_block: RwLock<Option<Web3ProxyBlock>>,
|
||||
/// Track how fast this RPC is
|
||||
pub(super) latency: Web3RpcLatencies,
|
||||
}
|
||||
@ -308,9 +308,9 @@ impl Web3Rpc {
|
||||
}
|
||||
|
||||
pub fn has_block_data(&self, needed_block_num: &U64) -> bool {
|
||||
let head_block_num = match self.head_block.read().clone() {
|
||||
let head_block_num = match self.head_block.read().as_ref() {
|
||||
None => return false,
|
||||
Some(x) => x.number(),
|
||||
Some(x) => *x.number(),
|
||||
};
|
||||
|
||||
// this rpc doesn't have that block yet. still syncing
|
||||
@ -525,9 +525,9 @@ impl Web3Rpc {
|
||||
None
|
||||
}
|
||||
Ok(Some(new_head_block)) => {
|
||||
let new_hash = new_head_block
|
||||
.hash
|
||||
.context("sending block to connections")?;
|
||||
let new_head_block = Web3ProxyBlock::new(new_head_block);
|
||||
|
||||
let new_hash = *new_head_block.hash();
|
||||
|
||||
// if we already have this block saved, set new_head_block to that arc. otherwise store this copy
|
||||
let new_head_block = block_map
|
||||
@ -628,6 +628,7 @@ impl Web3Rpc {
|
||||
if let Some(client) = &*conn.provider.read().await {
|
||||
// trace!("health check unlocked with error on {}", conn);
|
||||
// returning error will trigger a reconnect
|
||||
// also, do the health check as a way of keeping this rpc's request_ewma accurate
|
||||
// TODO: do a query of some kind
|
||||
}
|
||||
|
||||
@ -1164,7 +1165,7 @@ mod tests {
|
||||
|
||||
let random_block = Arc::new(random_block);
|
||||
|
||||
let head_block = SavedBlock::new(random_block);
|
||||
let head_block = Web3ProxyBlock::new(random_block);
|
||||
let block_data_limit = u64::MAX;
|
||||
|
||||
let x = Web3Rpc {
|
||||
@ -1194,7 +1195,7 @@ mod tests {
|
||||
.as_secs()
|
||||
.into();
|
||||
|
||||
let head_block: SavedBlock = Arc::new(Block {
|
||||
let head_block: Web3ProxyBlock = Arc::new(Block {
|
||||
hash: Some(H256::random()),
|
||||
number: Some(1_000_000.into()),
|
||||
timestamp: now,
|
||||
|
@ -1,71 +0,0 @@
|
||||
use super::blockchain::{ArcBlock, SavedBlock};
|
||||
use super::many::Web3Rpcs;
|
||||
use super::one::Web3Rpc;
|
||||
use ethers::prelude::{H256, U64};
|
||||
use serde::Serialize;
|
||||
use std::fmt;
|
||||
use std::sync::Arc;
|
||||
|
||||
/// A collection of Web3Rpcs that are on the same block.
|
||||
/// Serialize is so we can print it on our debug endpoint
|
||||
#[derive(Clone, Default, Serialize)]
|
||||
pub struct ConsensusWeb3Rpcs {
|
||||
// TODO: store ArcBlock instead?
|
||||
pub(super) head_block: Option<SavedBlock>,
|
||||
// TODO: this should be able to serialize, but it isn't
|
||||
#[serde(skip_serializing)]
|
||||
pub(super) conns: Vec<Arc<Web3Rpc>>,
|
||||
pub(super) num_checked_conns: usize,
|
||||
pub(super) includes_backups: bool,
|
||||
}
|
||||
|
||||
impl ConsensusWeb3Rpcs {
|
||||
pub fn num_conns(&self) -> usize {
|
||||
self.conns.len()
|
||||
}
|
||||
|
||||
pub fn sum_soft_limit(&self) -> u32 {
|
||||
self.conns.iter().fold(0, |sum, rpc| sum + rpc.soft_limit)
|
||||
}
|
||||
|
||||
// TODO: sum_hard_limit?
|
||||
}
|
||||
|
||||
impl fmt::Debug for ConsensusWeb3Rpcs {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
// TODO: the default formatter takes forever to write. this is too quiet though
|
||||
// TODO: print the actual conns?
|
||||
f.debug_struct("ConsensusConnections")
|
||||
.field("head_block", &self.head_block)
|
||||
.field("num_conns", &self.conns.len())
|
||||
.finish_non_exhaustive()
|
||||
}
|
||||
}
|
||||
|
||||
impl Web3Rpcs {
|
||||
pub fn head_block(&self) -> Option<ArcBlock> {
|
||||
self.watch_consensus_head_receiver
|
||||
.as_ref()
|
||||
.map(|x| x.borrow().clone())
|
||||
}
|
||||
|
||||
pub fn head_block_hash(&self) -> Option<H256> {
|
||||
self.head_block().and_then(|x| x.hash)
|
||||
}
|
||||
|
||||
pub fn head_block_num(&self) -> Option<U64> {
|
||||
self.head_block().and_then(|x| x.number)
|
||||
}
|
||||
|
||||
pub fn synced(&self) -> bool {
|
||||
!self
|
||||
.watch_consensus_connections_sender
|
||||
.borrow()
|
||||
.conns
|
||||
.is_empty()
|
||||
}
|
||||
|
||||
pub fn num_synced_rpcs(&self) -> usize {
|
||||
self.watch_consensus_connections_sender.borrow().conns.len()
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user