one list for ranked rpcs

Bryan Stitt 2023-06-27 12:36:41 -07:00
parent 6c5637260d
commit cdea61cb6b
14 changed files with 389 additions and 544 deletions

Cargo.lock (generated)

@ -1676,6 +1676,19 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "env_logger"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85cdab6a89accf66733ad5a1693a4dcced6aeff64602b634530dd73c1f3ee9f0"
dependencies = [
"humantime",
"is-terminal",
"log",
"regex",
"termcolor",
]
[[package]]
name = "equivalent"
version = "1.0.0"
@ -6102,11 +6115,12 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
version = "1.28.2"
version = "1.29.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94d7b1cfd2aa4011f2de74c2c4c63665e27a71006b0a192dcd2710272e73dfa2"
checksum = "374442f06ee49c3a28a8fc9f01a2596fed7559c6b99b31279c3261778e77d84f"
dependencies = [
"autocfg",
"backtrace",
"bytes",
"libc",
"mio 0.8.8",
@ -6418,9 +6432,9 @@ dependencies = [
[[package]]
name = "tracing-attributes"
version = "0.1.24"
version = "0.1.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f57e3ca2a01450b1a921183a9c9cbfda207fd822cef4ccb00a65402cbba7a74"
checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab"
dependencies = [
"proc-macro2",
"quote",
@ -6937,6 +6951,7 @@ dependencies = [
"derivative",
"derive_more",
"entities",
"env_logger",
"ethbloom",
"ethers",
"fdlimit",

@ -11,6 +11,6 @@ anyhow = "1.0.71"
hashbrown = "0.14.0"
log = "0.4.19"
moka = { version = "0.11.2", features = ["future"] }
tokio = "1.28.2"
tokio = "1.29.0"
tracing = "0.1.37"
workspace-hack = { version = "0.1", path = "../workspace-hack" }

@ -10,10 +10,10 @@ flume = "0.10.14"
log = "0.4.19"
portable-atomic = { version = "1.3.3", features = ["float"] }
serde = { version = "1.0.164", features = [] }
tokio = { version = "1.28.2", features = ["full"] }
tokio = { version = "1.29.0", features = ["full"] }
tracing = "0.1.37"
watermill = "0.1.1"
workspace-hack = { version = "0.1", path = "../workspace-hack" }
[dev-dependencies]
tokio = { version = "1.28.2", features = ["full", "test-util"] }
tokio = { version = "1.29.0", features = ["full", "test-util"] }

@ -9,7 +9,7 @@ name = "migration"
path = "src/lib.rs"
[dependencies]
tokio = { version = "1.28.2", features = ["full", "tracing"] }
tokio = { version = "1.29.0", features = ["full", "tracing"] }
workspace-hack = { version = "0.1", path = "../workspace-hack" }
[dependencies.sea-orm-migration]

@ -5,5 +5,5 @@ authors = ["Bryan Stitt <bryan@llamanodes.com>"]
edition = "2021"
[dependencies]
tokio = { version = "1.28.2", features = ["time"] }
tokio = { version = "1.29.0", features = ["time"] }
workspace-hack = { version = "0.1", path = "../workspace-hack" }

@ -8,5 +8,5 @@ edition = "2021"
anyhow = "1.0.71"
chrono = "0.4.26"
deadpool-redis = { version = "0.12.0", features = ["rt_tokio_1", "serde"] }
tokio = "1.28.2"
tokio = "1.29.0"
workspace-hack = { version = "0.1", path = "../workspace-hack" }

@ -90,7 +90,7 @@ serde_json = { version = "1.0.99", default-features = false, features = ["raw_va
serde_prometheus = "0.2.3"
strum = { version = "0.25.0", features = ["derive"] }
time = { version = "0.3.22" }
tokio = { version = "1.28.2", features = ["full", "tracing"] }
tokio = { version = "1.29.0", features = ["full", "tracing"] }
tokio-console = { version = "0.1.8", optional = true }
tokio-stream = { version = "0.1.14", features = ["sync"] }
tokio-uring = { version = "0.4.0", optional = true }
@ -105,8 +105,12 @@ uuid = { version = "1.4.0", default-features = false, features = ["fast-rng", "v
derivative = "2.2.0"
workspace-hack = { version = "0.1", path = "../workspace-hack" }
[dev-dependencies]
# TODO: why doesn't this work in dev-dependencies
test-log = { version = "0.2.12", default-features = false, features = ["trace"] }
tokio = { version = "1.28.2", features = ["full", "test-util"] }
[dev-dependencies]
env_logger = "0.10"
test-log = "0.2.12"
tokio = { version = "1.29.0", features = ["full", "test-util"] }
tracing = {version = "0.1", default-features = false}
tracing-subscriber = {version = "0.3", default-features = false, features = ["env-filter", "fmt"]}
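
A minimal sketch (not part of this commit) of the attribute these dev-dependencies enable: `#[test_log::test(tokio::test)]` wraps the usual tokio test macro and initializes a logger before the test body runs, replacing the per-test env_logger setup that was previously commented out in the tests below. Exactly which subscriber gets installed depends on test-log's enabled features.

```rust
#[cfg(test)]
mod logging_example {
    // test-log initializes logging, then hands off to tokio's test macro
    #[test_log::test(tokio::test)]
    async fn logs_are_captured() {
        tracing::debug!("emitted after test-log initialized logging");
        assert_eq!(2 + 2, 4);
    }
}
```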

@ -17,7 +17,7 @@ use crate::response_cache::{
JsonRpcQueryCacheKey, JsonRpcResponseCache, JsonRpcResponseEnum, JsonRpcResponseWeigher,
};
use crate::rpcs::blockchain::Web3ProxyBlock;
use crate::rpcs::consensus::ConsensusWeb3Rpcs;
use crate::rpcs::consensus::RankedRpcs;
use crate::rpcs::many::Web3Rpcs;
use crate::rpcs::one::Web3Rpc;
use crate::rpcs::provider::{connect_http, EthersHttpProvider};
@ -175,9 +175,9 @@ pub struct Web3ProxyAppSpawn {
/// these are important and must be allowed to finish
pub background_handles: FuturesUnordered<Web3ProxyJoinHandle<()>>,
/// config changes are sent here
pub new_top_config_sender: watch::Sender<TopConfig>,
pub new_top_config: watch::Sender<TopConfig>,
/// watch this to know when the app is ready to serve requests
pub consensus_connections_watcher: watch::Receiver<Option<Arc<ConsensusWeb3Rpcs>>>,
pub ranked_rpcs: watch::Receiver<Option<Arc<RankedRpcs>>>,
}
impl Web3ProxyApp {
@ -1083,7 +1083,6 @@ impl Web3ProxyApp {
Some(Duration::from_secs(30)),
Some(Level::TRACE.into()),
None,
true,
)
.await;
@ -1114,7 +1113,6 @@ impl Web3ProxyApp {
Some(Duration::from_secs(30)),
Some(Level::TRACE.into()),
num_public_rpcs,
true,
)
.await
}

@ -89,7 +89,7 @@ async fn run(
// start thread for watching config
if let Some(top_config_path) = top_config_path {
let config_sender = spawned_app.new_top_config_sender;
let config_sender = spawned_app.new_top_config;
{
let mut current_config = config_sender.borrow().clone();
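
For reference, a standalone sketch (with a stand-in `TopConfig`) of the `watch::Sender` pattern the renamed `new_top_config` field follows: the CLI reads the current value with `borrow().clone()` and publishes replacements that receivers can await.

```rust
use tokio::sync::watch;

#[derive(Clone, Debug, PartialEq)]
struct TopConfig {
    chain_id: u64, // stand-in field; the real struct lives elsewhere in the repo
}

#[tokio::main]
async fn main() {
    let (new_top_config, mut new_top_config_rx) =
        watch::channel(TopConfig { chain_id: 1 });

    // the "watch the config file" side
    let current = new_top_config.borrow().clone();
    let mut updated = current.clone();
    updated.chain_id = 137;
    if updated != current {
        // only wake receivers when something actually changed
        new_top_config.send_replace(updated);
    }

    // the "react to config changes" side
    new_top_config_rx.changed().await.unwrap();
    println!("new config: {:?}", *new_top_config_rx.borrow_and_update());
}
```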

@ -4,7 +4,10 @@
//! They will eventually move to another port.
use super::{ResponseCache, ResponseCacheKey};
use crate::app::{Web3ProxyApp, APP_USER_AGENT};
use crate::{
app::{Web3ProxyApp, APP_USER_AGENT},
errors::Web3ProxyError,
};
use axum::{
body::{Bytes, Full},
http::StatusCode,
@ -19,7 +22,8 @@ use moka::future::Cache;
use once_cell::sync::Lazy;
use serde::{ser::SerializeStruct, Serialize};
use serde_json::json;
use std::sync::Arc;
use std::{sync::Arc, time::Duration};
use tokio::time::timeout;
use tracing::trace;
static HEALTH_OK: Lazy<Bytes> = Lazy::new(|| Bytes::from("OK\n"));
@ -74,16 +78,20 @@ pub async fn debug_request(
pub async fn health(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(cache): Extension<Arc<ResponseCache>>,
) -> impl IntoResponse {
let (code, content_type, body) = cache
.get_with(ResponseCacheKey::Health, async move { _health(app).await })
.await;
) -> Result<impl IntoResponse, Web3ProxyError> {
let (code, content_type, body) = timeout(
Duration::from_secs(3),
cache.get_with(ResponseCacheKey::Health, async move { _health(app).await }),
)
.await?;
Response::builder()
let x = Response::builder()
.status(code)
.header("content-type", content_type)
.body(Full::from(body))
.unwrap()
.unwrap();
Ok(x)
}
// TODO: _health doesn't need to be async, but _quick_cache_ttl needs an async function
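
A reduced sketch of the new handler shape (with stand-in error and probe types, not the real `Web3ProxyError`): the cached probe is wrapped in a three second `tokio::time::timeout`, and `?` turns a timeout into an error response instead of letting a stuck backend hang `/health` indefinitely.

```rust
use std::time::Duration;
use tokio::time::{error::Elapsed, timeout};

#[derive(Debug)]
enum ExampleError {
    Timeout,
}

impl From<Elapsed> for ExampleError {
    fn from(_: Elapsed) -> Self {
        Self::Timeout
    }
}

async fn slow_health_probe() -> &'static str {
    // imagine this awaiting on database/rpc checks
    tokio::time::sleep(Duration::from_millis(10)).await;
    "OK\n"
}

async fn health() -> Result<&'static str, ExampleError> {
    // `?` propagates the Elapsed error if the probe takes longer than 3 seconds
    let body = timeout(Duration::from_secs(3), slow_health_probe()).await?;
    Ok(body)
}

#[tokio::main]
async fn main() {
    match health().await {
        Ok(body) => print!("{body}"),
        Err(err) => eprintln!("health check failed: {err:?}"),
    }
}
```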
@ -107,18 +115,22 @@ async fn _health(app: Arc<Web3ProxyApp>) -> (StatusCode, &'static str, Bytes) {
pub async fn backups_needed(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(cache): Extension<Arc<ResponseCache>>,
) -> impl IntoResponse {
let (code, content_type, body) = cache
.get_with(ResponseCacheKey::BackupsNeeded, async move {
) -> Result<impl IntoResponse, Web3ProxyError> {
let (code, content_type, body) = timeout(
Duration::from_secs(3),
cache.get_with(ResponseCacheKey::BackupsNeeded, async move {
_backups_needed(app).await
})
.await;
}),
)
.await?;
Response::builder()
let x = Response::builder()
.status(code)
.header("content-type", content_type)
.body(Full::from(body))
.unwrap()
.unwrap();
Ok(x)
}
#[inline]
@ -126,11 +138,7 @@ async fn _backups_needed(app: Arc<Web3ProxyApp>) -> (StatusCode, &'static str, B
trace!("backups_needed is not cached");
let code = {
let consensus_rpcs = app
.balanced_rpcs
.watch_consensus_rpcs_sender
.borrow()
.clone();
let consensus_rpcs = app.balanced_rpcs.watch_ranked_rpcs.borrow().clone();
if let Some(ref consensus_rpcs) = consensus_rpcs {
if consensus_rpcs.backups_needed {
@ -158,16 +166,20 @@ async fn _backups_needed(app: Arc<Web3ProxyApp>) -> (StatusCode, &'static str, B
pub async fn status(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Extension(cache): Extension<Arc<ResponseCache>>,
) -> impl IntoResponse {
let (code, content_type, body) = cache
.get_with(ResponseCacheKey::Status, async move { _status(app).await })
.await;
) -> Result<impl IntoResponse, Web3ProxyError> {
let (code, content_type, body) = timeout(
Duration::from_secs(3),
cache.get_with(ResponseCacheKey::Status, async move { _status(app).await }),
)
.await?;
Response::builder()
let x = Response::builder()
.status(code)
.header("content-type", content_type)
.body(Full::from(body))
.unwrap()
.unwrap();
Ok(x)
}
// TODO: _status doesn't need to be async, but _quick_cache_ttl needs an async function

@ -351,7 +351,7 @@ impl Web3Rpcs {
// if theres multiple, use petgraph to find the one on the main chain (and remove the others if they have enough confirmations)
let mut consensus_head_receiver = self
.watch_consensus_head_sender
.watch_head_block
.as_ref()
.web3_context("need new head subscriptions to fetch cannonical_block")?
.subscribe();

@ -14,7 +14,6 @@ use itertools::{Itertools, MinMaxResult};
use moka::future::Cache;
use serde::Serialize;
use std::cmp::{Ordering, Reverse};
use std::collections::BTreeMap;
use std::fmt;
use std::sync::{atomic, Arc};
use std::time::Duration;
@ -51,9 +50,10 @@ impl ConsensusRpcData {
#[derive(Constructor, Clone, Copy, Debug, Default, Eq, PartialEq, Serialize)]
pub struct RpcRanking {
tier: u32,
backup: bool,
head_num: Option<U64>,
/// note: the servers in this tier might have blocks higher than this
consensus_head_num: Option<U64>,
tier: u32,
}
impl RpcRanking {
@ -65,11 +65,11 @@ impl RpcRanking {
}
fn sort_key(&self) -> (bool, Reverse<Option<U64>>, u32) {
// TODO: add soft_limit here? add peak_ewma here?
// TODO: should backup or tier be checked first? now that tiers are automated, backups
// TODO: add sum_soft_limit here? add peak_ewma here?
// TODO: should backup or tier be checked first? now that tiers are automated, backups should be more reliable, but still leave them last
// while tier might give us better latency, giving requests to a server that is behind by a block will get in the way of it syncing. better to only query synced servers
// TODO: should we include a random number in here?
// TODO: should we include peak_ewma_latency or weighted_peak_ewma_latency?
(!self.backup, Reverse(self.head_num), self.tier)
(!self.backup, Reverse(self.consensus_head_num), self.tier)
}
}
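
Since the consensus head number moved into `RpcRanking` and the field order changed, a self-contained illustration of how such a tuple key sorts may help (plain integers stand in for `U64`; no claim is made here about how callers consume the ordering): tuples compare field by field, `Reverse` makes higher head blocks sort earlier among entries with the same leading flag, and the tier breaks ties.

```rust
use std::cmp::Reverse;

fn main() {
    // (name, sort_key) pairs; the key mirrors the shape of `RpcRanking::sort_key`
    let mut rpcs = vec![
        ("tier1_lagging", (true, Reverse(Some(99u64)), 1u32)),
        ("tier2_synced", (true, Reverse(Some(100)), 2)),
        ("tier1_synced", (true, Reverse(Some(100)), 1)),
        ("no_head_yet", (true, Reverse(None), 1)),
    ];

    rpcs.sort_by_key(|(_, key)| *key);

    let order: Vec<_> = rpcs.iter().map(|(name, _)| *name).collect();
    // highest head block wins, then the lower tier breaks the tie,
    // and `Reverse(None)` sorts after every `Reverse(Some(_))`
    assert_eq!(
        order,
        ["tier1_synced", "tier2_synced", "tier1_lagging", "no_head_yet"]
    );
    println!("{order:?}");
}
```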
@ -85,8 +85,6 @@ impl PartialOrd for RpcRanking {
}
}
pub type RankedRpcMap = BTreeMap<RpcRanking, Vec<Arc<Web3Rpc>>>;
pub enum ShouldWaitForBlock {
Ready,
// BackupReady,
@ -99,31 +97,125 @@ pub enum ShouldWaitForBlock {
/// Serialize is so we can print it on our /status endpoint
/// TODO: remove head_block/head_rpcs/tier and replace with one RankedRpcMap
/// TODO: add `best_rpc(method_data_kind, min_block_needed, max_block_needed, include_backups)`
/// TODO: make serializing work. the key needs to be a string. I think we need `serialize_with`
#[derive(Clone, Serialize)]
pub struct ConsensusWeb3Rpcs {
pub(crate) tier: u32,
pub(crate) backups_needed: bool,
pub struct RankedRpcs {
pub head_block: Web3ProxyBlock,
pub num_synced: usize,
pub backups_needed: bool,
// TODO: Don't skip serializing, instead make a shorter serialize
#[serde(skip_serializing)]
pub(crate) head_block: Web3ProxyBlock,
// TODO: make a shorter serialize
pub(crate) head_rpcs: Vec<Arc<Web3Rpc>>,
// TODO: make serializing work. the key needs to be a string. I think we need `serialize_with`
#[serde(skip_serializing)]
pub(crate) other_rpcs: RankedRpcMap,
inner: Vec<Arc<Web3Rpc>>,
// TODO: make serializing work. the key needs to be a string. I think we need `serialize_with`
#[serde(skip_serializing)]
rpc_data: HashMap<Arc<Web3Rpc>, ConsensusRpcData>,
}
impl ConsensusWeb3Rpcs {
impl RankedRpcs {
// /// useful when Web3Rpcs does not track the head block
// pub fn from_all(rpcs: &Web3Rpcs) -> Self {
// let inner = vec![(
// RpcRanking::default(),
// rpcs.by_name
// .read()
// .values()
// .cloned()
// .collect::<Vec<Arc<_>>>(),
// )];
// todo!()
// }
pub fn from_votes(
min_synced_rpcs: usize,
min_sum_soft_limit: u32,
max_lag_block: U64,
votes: HashMap<Web3ProxyBlock, (HashSet<&Arc<Web3Rpc>>, u32)>,
heads: HashMap<Arc<Web3Rpc>, Web3ProxyBlock>,
) -> Option<Self> {
// find the blocks that meets our min_sum_soft_limit and min_synced_rpcs
let mut votes: Vec<_> = votes
.into_iter()
.filter_map(|(block, (rpcs, sum_soft_limit))| {
if *block.number() < max_lag_block
|| sum_soft_limit < min_sum_soft_limit
|| rpcs.len() < min_synced_rpcs
{
None
} else {
Some((block, sum_soft_limit, rpcs))
}
})
.collect();
// sort the votes
votes.sort_by_key(|(block, sum_soft_limit, _)| {
(
Reverse(*block.number()),
// TODO: block total difficulty (if we have it)
Reverse(*sum_soft_limit),
// TODO: median/peak latency here?
)
});
// return the first result that exceeds configured minimums (if any)
if let Some((best_block, _, best_rpcs)) = votes.into_iter().next() {
let mut ranked_rpcs: Vec<_> = best_rpcs.into_iter().map(Arc::clone).collect();
let mut rpc_data = HashMap::new();
let backups_needed = ranked_rpcs.iter().any(|x| x.backup);
let num_synced = ranked_rpcs.len();
// TODO: add all the unsynced rpcs
for (x, x_head) in heads.iter() {
let data = ConsensusRpcData::new(x, x_head);
rpc_data.insert(x.clone(), data);
if ranked_rpcs.contains(x) {
continue;
}
if *x_head.number() < max_lag_block {
// server is too far behind
continue;
}
ranked_rpcs.push(x.clone());
}
ranked_rpcs
.sort_by_cached_key(|x| x.sort_for_load_balancing_on(Some(*best_block.number())));
// consensus found!
trace!(?ranked_rpcs);
let consensus = RankedRpcs {
backups_needed,
head_block: best_block,
rpc_data,
inner: ranked_rpcs,
num_synced,
};
return Some(consensus);
}
None
}
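
A stripped-down sketch of the selection `from_votes` performs, with plain numbers standing in for `Web3ProxyBlock` and the rpc sets: drop candidates that are too far behind or below the configured minimums, then prefer the highest block number and, among equal blocks, the largest combined soft limit.

```rust
use std::cmp::Reverse;

struct Vote {
    block_num: u64,
    sum_soft_limit: u32,
    num_rpcs: usize,
}

fn pick_head(
    mut votes: Vec<Vote>,
    max_lag_block: u64,
    min_sum_soft_limit: u32,
    min_synced_rpcs: usize,
) -> Option<Vote> {
    // keep only blocks that are new enough and have enough voting capacity
    votes.retain(|v| {
        v.block_num >= max_lag_block
            && v.sum_soft_limit >= min_sum_soft_limit
            && v.num_rpcs >= min_synced_rpcs
    });
    // highest block first, then highest combined soft limit
    votes.sort_by_key(|v| (Reverse(v.block_num), Reverse(v.sum_soft_limit)));
    votes.into_iter().next()
}

fn main() {
    let votes = vec![
        Vote { block_num: 100, sum_soft_limit: 1_000, num_rpcs: 3 },
        Vote { block_num: 101, sum_soft_limit: 50, num_rpcs: 1 }, // too little capacity
        Vote { block_num: 100, sum_soft_limit: 3_000, num_rpcs: 4 },
    ];
    let best = pick_head(votes, 95, 1_000, 2).expect("consensus");
    assert_eq!((best.block_num, best.sum_soft_limit), (100, 3_000));
}
```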
pub fn all(&self) -> &[Arc<Web3Rpc>] {
&self.inner
}
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
#[inline]
pub fn num_consensus_rpcs(&self) -> usize {
self.head_rpcs.len()
// TODO! wrong! this is now the total num of rpcs. we should keep the number on the head block saved
self.inner.len()
}
/// will tell you if waiting will eventually should wait for a block
@ -135,56 +227,17 @@ impl ConsensusWeb3Rpcs {
needed_block_num: Option<&U64>,
skip_rpcs: &[Arc<Web3Rpc>],
) -> ShouldWaitForBlock {
// TODO: i think checking synced is always a waste of time. though i guess there could be a race
if self
.head_rpcs
.iter()
.any(|rpc| self.rpc_will_work_eventually(rpc, needed_block_num, skip_rpcs))
{
let head_num = self.head_block.number();
if Some(head_num) >= needed_block_num {
trace!("best (head) block: {}", head_num);
return ShouldWaitForBlock::Ready;
for rpc in self.inner.iter() {
match self.rpc_will_work_eventually(rpc, needed_block_num, skip_rpcs) {
ShouldWaitForBlock::NeverReady => continue,
x => return x,
}
}
// all of the head rpcs are skipped
let mut best_num = None;
// iterate the other rpc tiers to find the next best block
for (next_ranking, next_rpcs) in self.other_rpcs.iter() {
if !next_rpcs
.iter()
.any(|rpc| self.rpc_will_work_eventually(rpc, needed_block_num, skip_rpcs))
{
trace!("everything in this ranking ({:?}) is skipped", next_ranking);
continue;
}
let next_head_num = next_ranking.head_num.as_ref();
if next_head_num >= needed_block_num {
trace!("best (head) block: {:?}", next_head_num);
return ShouldWaitForBlock::Ready;
}
best_num = next_head_num;
}
// TODO: this seems wrong
if best_num.is_some() {
trace!("best (old) block: {:?}", best_num);
ShouldWaitForBlock::Wait {
current: best_num.copied(),
}
} else {
trace!("never ready");
ShouldWaitForBlock::NeverReady
}
ShouldWaitForBlock::NeverReady
}
/// TODO: change this to take a min and a max
pub fn has_block_data(&self, rpc: &Web3Rpc, block_num: &U64) -> bool {
self.rpc_data
.get(rpc)
@ -193,15 +246,17 @@ impl ConsensusWeb3Rpcs {
}
// TODO: take method as a param, too. mark nodes with supported methods (maybe do it optimistically? on)
// TODO: move this onto Web3Rpc?
// TODO: this needs min and max block on it
pub fn rpc_will_work_eventually(
&self,
rpc: &Arc<Web3Rpc>,
needed_block_num: Option<&U64>,
skip_rpcs: &[Arc<Web3Rpc>],
) -> bool {
) -> ShouldWaitForBlock {
if skip_rpcs.contains(rpc) {
// if rpc is skipped, it must have already been determined it is unable to serve the request
return false;
return ShouldWaitForBlock::NeverReady;
}
if let Some(needed_block_num) = needed_block_num {
@ -210,26 +265,27 @@ impl ConsensusWeb3Rpcs {
Ordering::Less => {
trace!("{} is behind. let it catch up", rpc);
// TODO: what if this is a pruned rpc that is behind by a lot, and the block is old, too?
return true;
return ShouldWaitForBlock::Wait {
current: Some(rpc_data.head_block_num),
};
}
Ordering::Greater | Ordering::Equal => {
// rpc is synced past the needed block. make sure the block isn't too old for it
if self.has_block_data(rpc, needed_block_num) {
trace!("{} has {}", rpc, needed_block_num);
return true;
return ShouldWaitForBlock::Ready;
} else {
trace!("{} does not have {}", rpc, needed_block_num);
return false;
return ShouldWaitForBlock::NeverReady;
}
}
}
}
// no rpc data for this rpc. thats not promising
false
warn!(%rpc, "no rpc data for this rpc. thats not promising");
ShouldWaitForBlock::NeverReady
} else {
// if no needed_block_num was specified, then this should work
true
ShouldWaitForBlock::Ready
}
}
@ -284,14 +340,10 @@ impl ConsensusWeb3Rpcs {
// TODO: sum_hard_limit?
}
impl fmt::Debug for ConsensusWeb3Rpcs {
impl fmt::Debug for RankedRpcs {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// TODO: the default formatter takes forever to write. this is too quiet though
// TODO: print the actual conns?
f.debug_struct("ConsensusWeb3Rpcs")
.field("head_block", &self.head_block)
.field("num_conns", &self.head_rpcs.len())
.finish_non_exhaustive()
// TODO: the default formatter takes forever to write. we need to print something though
f.debug_struct("RankedRpcs").finish_non_exhaustive()
}
}
@ -299,7 +351,7 @@ impl fmt::Debug for ConsensusWeb3Rpcs {
impl Web3Rpcs {
// TODO: return a ref?
pub fn head_block(&self) -> Option<Web3ProxyBlock> {
self.watch_consensus_head_sender
self.watch_head_block
.as_ref()
.and_then(|x| x.borrow().clone())
}
@ -315,20 +367,20 @@ impl Web3Rpcs {
}
pub fn synced(&self) -> bool {
let consensus = self.watch_consensus_rpcs_sender.borrow();
let consensus = self.watch_ranked_rpcs.borrow();
if let Some(consensus) = consensus.as_ref() {
!consensus.head_rpcs.is_empty()
!consensus.is_empty()
} else {
false
}
}
pub fn num_synced_rpcs(&self) -> usize {
let consensus = self.watch_consensus_rpcs_sender.borrow();
let consensus = self.watch_ranked_rpcs.borrow();
if let Some(consensus) = consensus.as_ref() {
consensus.head_rpcs.len()
consensus.num_synced
} else {
0
}
@ -381,26 +433,22 @@ impl ConsensusFinder {
authorization: &Arc<Authorization>,
rpc: Option<&Arc<Web3Rpc>>,
new_block: Option<Web3ProxyBlock>,
) -> Web3ProxyResult<()> {
) -> Web3ProxyResult<bool> {
let new_consensus_rpcs = match self
.find_consensus_connections(authorization, web3_rpcs)
.await
.web3_context("error while finding consensus head block!")?
{
Err(err) => {
return Err(err).web3_context("error while finding consensus head block!");
}
Ok(None) => {
return Err(Web3ProxyError::NoConsensusHeadBlock);
}
Ok(Some(x)) => x,
None => return Ok(false),
Some(x) => x,
};
trace!("new_synced_connections: {:#?}", new_consensus_rpcs);
trace!(?new_consensus_rpcs);
let watch_consensus_head_sender = web3_rpcs.watch_consensus_head_sender.as_ref().unwrap();
let consensus_tier = new_consensus_rpcs.tier;
// TODO: think more about the default for total_tiers
let total_tiers = self.worst_tier().unwrap_or_default();
let watch_consensus_head_sender = web3_rpcs.watch_head_block.as_ref().unwrap();
// TODO: think more about the default for tiers
let best_tier = self.best_tier().unwrap_or_default();
let worst_tier = self.worst_tier().unwrap_or_default();
let backups_needed = new_consensus_rpcs.backups_needed;
let consensus_head_block = new_consensus_rpcs.head_block.clone();
let num_consensus_rpcs = new_consensus_rpcs.num_consensus_rpcs();
@ -410,7 +458,7 @@ impl ConsensusFinder {
let new_consensus_rpcs = Arc::new(new_consensus_rpcs);
let old_consensus_head_connections = web3_rpcs
.watch_consensus_rpcs_sender
.watch_ranked_rpcs
.send_replace(Some(new_consensus_rpcs.clone()));
let backups_voted_str = if backups_needed { "B " } else { "" };
@ -431,8 +479,8 @@ impl ConsensusFinder {
None => {
debug!(
"first {}/{} {}{}/{}/{} block={}, rpc={}",
consensus_tier,
total_tiers,
best_tier,
worst_tier,
backups_voted_str,
num_consensus_rpcs,
num_active_rpcs,
@ -469,8 +517,8 @@ impl ConsensusFinder {
// TODO: trace level if rpc is backup
debug!(
"con {}/{} {}{}/{}/{} con={} rpc={}",
consensus_tier,
total_tiers,
best_tier,
worst_tier,
backups_voted_str,
num_consensus_rpcs,
num_active_rpcs,
@ -483,8 +531,8 @@ impl ConsensusFinder {
debug!(
"unc {}/{} {}{}/{}/{} con={} old={} rpc={}",
consensus_tier,
total_tiers,
best_tier,
worst_tier,
backups_voted_str,
num_consensus_rpcs,
num_active_rpcs,
@ -510,8 +558,8 @@ impl ConsensusFinder {
// TODO: better log that includes all the votes
warn!(
"chain rolled back {}/{} {}{}/{}/{} con={} old={} rpc={}",
consensus_tier,
total_tiers,
best_tier,
worst_tier,
backups_voted_str,
num_consensus_rpcs,
num_active_rpcs,
@ -542,8 +590,8 @@ impl ConsensusFinder {
Ordering::Greater => {
info!(
"new {}/{} {}{}/{}/{} con={} rpc={}",
consensus_tier,
total_tiers,
best_tier,
worst_tier,
backups_voted_str,
num_consensus_rpcs,
num_active_rpcs,
@ -569,7 +617,7 @@ impl ConsensusFinder {
}
}
Ok(())
Ok(true)
}
pub(super) async fn process_block_from_rpc(
@ -579,7 +627,7 @@ impl ConsensusFinder {
new_block: Option<Web3ProxyBlock>,
rpc: Arc<Web3Rpc>,
_pending_tx_sender: &Option<broadcast::Sender<TxStatus>>,
) -> Web3ProxyResult<()> {
) -> Web3ProxyResult<bool> {
// TODO: how should we handle an error here?
if !self
.update_rpc(new_block.clone(), rpc.clone(), web3_rpcs)
@ -587,7 +635,8 @@ impl ConsensusFinder {
.web3_context("failed to update rpc")?
{
// nothing changed. no need to scan for a new consensus head
return Ok(());
// TODO: this should this be true if there is an existing consensus?
return Ok(false);
}
self.refresh(web3_rpcs, authorization, Some(&rpc), new_block)
@ -754,7 +803,7 @@ impl ConsensusFinder {
&mut self,
authorization: &Arc<Authorization>,
web3_rpcs: &Web3Rpcs,
) -> Web3ProxyResult<Option<ConsensusWeb3Rpcs>> {
) -> Web3ProxyResult<Option<RankedRpcs>> {
self.update_tiers().await?;
let minmax_block = self.rpc_heads.values().minmax_by_key(|&x| x.number());
@ -801,6 +850,12 @@ impl ConsensusFinder {
let mut block_to_check = rpc_head.clone();
while block_to_check.number() >= lowest_block_number {
if let Some(max_age) = self.max_head_block_age {
if block_to_check.age() > max_age {
break;
}
}
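
The `max_head_block_age` check added above can be sketched in isolation (with a plain seconds-since-epoch timestamp standing in for the block header field): derive the block's age and stop counting votes for anything older than the configured maximum.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

fn block_age(block_timestamp_secs: u64) -> Duration {
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock before 1970");
    now.checked_sub(Duration::from_secs(block_timestamp_secs))
        .unwrap_or_default() // a block slightly "in the future" counts as age zero
}

fn main() {
    let max_head_block_age = Duration::from_secs(60);
    let now_secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_secs();

    // a block minted two minutes ago is too old to vote for the consensus head
    let stale = now_secs - 120;
    assert!(block_age(stale) > max_head_block_age);

    // a block from ten seconds ago is still fresh enough
    let fresh = now_secs - 10;
    assert!(block_age(fresh) <= max_head_block_age);
}
```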
if !rpc.backup {
// backup nodes are excluded from the primary voting
let entry = primary_votes.entry(block_to_check.clone()).or_default();
@ -815,7 +870,6 @@ impl ConsensusFinder {
backup_entry.0.insert(rpc);
backup_entry.1 += rpc.soft_limit;
// we used to specify rpc on this, but it shouldn't be necessary
let parent_hash = block_to_check.parent_hash();
// TODO: i flip flop on passing rpc to this or not
match web3_rpcs
@ -837,99 +891,31 @@ impl ConsensusFinder {
}
// we finished processing all tiers. check for primary results (if anything but the last tier found consensus, we already returned above)
if let Some(consensus) = self.count_votes(&primary_votes, web3_rpcs) {
if let Some(consensus) = RankedRpcs::from_votes(
web3_rpcs.min_synced_rpcs,
web3_rpcs.min_sum_soft_limit,
max_lag_block_number,
primary_votes,
self.rpc_heads.clone(),
) {
return Ok(Some(consensus));
}
// primary votes didn't work. hopefully backup tiers are synced
Ok(self.count_votes(&backup_votes, web3_rpcs))
Ok(RankedRpcs::from_votes(
web3_rpcs.min_synced_rpcs,
web3_rpcs.min_sum_soft_limit,
max_lag_block_number,
backup_votes,
self.rpc_heads.clone(),
))
}
// TODO: have min_sum_soft_limit and min_head_rpcs on self instead of on Web3Rpcs
fn count_votes(
&self,
votes: &HashMap<Web3ProxyBlock, (HashSet<&Arc<Web3Rpc>>, u32)>,
web3_rpcs: &Web3Rpcs,
) -> Option<ConsensusWeb3Rpcs> {
// sort the primary votes ascending by tier and descending by block num
let mut votes: Vec<_> = votes
.into_iter()
.map(|(block, (rpc_names, sum_soft_limit))| (block, sum_soft_limit, rpc_names))
.collect();
votes.sort_by_key(|(block, sum_soft_limit, _)| {
(Reverse(*block.number()), Reverse(*sum_soft_limit))
});
// return the first result that exceededs confgured minimums (if any)
for (maybe_head_block, sum_soft_limit, rpc_names) in votes {
if *sum_soft_limit < web3_rpcs.min_sum_soft_limit {
continue;
}
// TODO: different mins for backup vs primary
if rpc_names.len() < web3_rpcs.min_synced_rpcs {
continue;
}
trace!("rpc_names: {:#?}", rpc_names);
if rpc_names.len() < web3_rpcs.min_synced_rpcs {
continue;
}
// consensus found!
let consensus_rpcs: Vec<Arc<_>> = rpc_names.iter().map(|x| Arc::clone(x)).collect();
let tier = consensus_rpcs
.iter()
.map(|x| x.tier.load(atomic::Ordering::Relaxed))
.max()
.expect("there should always be a max");
let backups_needed = consensus_rpcs.iter().any(|x| x.backup);
let mut other_rpcs = BTreeMap::new();
for (x, x_head) in self
.rpc_heads
.iter()
.filter(|(k, _)| !consensus_rpcs.contains(k))
{
let x_head_num = *x_head.number();
let key: RpcRanking = RpcRanking::new(
x.tier.load(atomic::Ordering::Relaxed),
x.backup,
Some(x_head_num),
);
other_rpcs
.entry(key)
.or_insert_with(Vec::new)
.push(x.clone());
}
// TODO: how should we populate this?
let mut rpc_data = HashMap::with_capacity(self.rpc_heads.len());
for (x, x_head) in self.rpc_heads.iter() {
let y = ConsensusRpcData::new(x, x_head);
rpc_data.insert(x.clone(), y);
}
let consensus = ConsensusWeb3Rpcs {
tier,
head_block: maybe_head_block.clone(),
head_rpcs: consensus_rpcs,
other_rpcs,
backups_needed,
rpc_data,
};
return Some(consensus);
}
None
pub fn best_tier(&self) -> Option<u32> {
self.rpc_heads
.iter()
.map(|(x, _)| x.tier.load(atomic::Ordering::Relaxed))
.min()
}
pub fn worst_tier(&self) -> Option<u32> {

@ -1,6 +1,6 @@
//! Load balanced communication with a group of web3 rpc providers
use super::blockchain::{BlocksByHashCache, BlocksByNumberCache, Web3ProxyBlock};
use super::consensus::{ConsensusWeb3Rpcs, ShouldWaitForBlock};
use super::consensus::{RankedRpcs, ShouldWaitForBlock};
use super::one::Web3Rpc;
use super::request::{OpenRequestHandle, OpenRequestResult, RequestErrorHandler};
use crate::app::{flatten_handle, Web3ProxyApp, Web3ProxyJoinHandle};
@ -17,7 +17,7 @@ use ethers::prelude::{ProviderError, TxHash, U64};
use futures::future::try_join_all;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use hashbrown::{HashMap, HashSet};
use hashbrown::HashMap;
use itertools::Itertools;
use migration::sea_orm::DatabaseConnection;
use moka::future::{Cache, CacheBuilder};
@ -50,9 +50,9 @@ pub struct Web3Rpcs {
/// TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed
/// TODO: why is watch_consensus_head_sender in an Option, but this one isn't?
/// Geth's subscriptions have the same potential for skipping blocks.
pub(crate) watch_consensus_rpcs_sender: watch::Sender<Option<Arc<ConsensusWeb3Rpcs>>>,
pub(crate) watch_ranked_rpcs: watch::Sender<Option<Arc<RankedRpcs>>>,
/// this head receiver makes it easy to wait until there is a new block
pub(super) watch_consensus_head_sender: Option<watch::Sender<Option<Web3ProxyBlock>>>,
pub(super) watch_head_block: Option<watch::Sender<Option<Web3ProxyBlock>>>,
/// keep track of transactions that we have sent through subscriptions
pub(super) pending_transaction_cache: Cache<TxHash, TxStatus>,
pub(super) pending_tx_id_receiver: flume::Receiver<TxHashAndRpc>,
@ -89,8 +89,7 @@ impl Web3Rpcs {
) -> anyhow::Result<(
Arc<Self>,
Web3ProxyJoinHandle<()>,
watch::Receiver<Option<Arc<ConsensusWeb3Rpcs>>>,
// watch::Receiver<Arc<ConsensusWeb3Rpcs>>,
watch::Receiver<Option<Arc<RankedRpcs>>>,
)> {
let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
let (block_sender, block_receiver) = flume::unbounded::<BlockAndRpc>();
@ -136,8 +135,8 @@ impl Web3Rpcs {
pending_transaction_cache,
pending_tx_id_receiver,
pending_tx_id_sender,
watch_consensus_head_sender,
watch_consensus_rpcs_sender,
watch_head_block: watch_consensus_head_sender,
watch_ranked_rpcs: watch_consensus_rpcs_sender,
});
let authorization = Arc::new(Authorization::internal(db_conn)?);
@ -198,7 +197,7 @@ impl Web3Rpcs {
let http_client = app.http_client.clone();
let vredis_pool = app.vredis_pool.clone();
let block_sender = if self.watch_consensus_head_sender.is_some() {
let block_sender = if self.watch_head_block.is_some() {
Some(self.block_sender.clone())
} else {
None
@ -342,7 +341,7 @@ impl Web3Rpcs {
}
// setup the block funnel
if self.watch_consensus_head_sender.is_some() {
if self.watch_head_block.is_some() {
let connections = Arc::clone(&self);
let pending_tx_sender = pending_tx_sender.clone();
@ -526,211 +525,92 @@ impl Web3Rpcs {
.and_then(|x| x.authorization.clone())
.unwrap_or_default();
if self.watch_consensus_head_sender.is_none() {
// this Web3Rpcs is not tracking head blocks. pick any server
let mut watch_ranked_rpcs = self.watch_ranked_rpcs.subscribe();
let mut potential_rpcs: Vec<_> = self
.by_name
.read()
.values()
.filter(|rpc| !skip_rpcs.contains(rpc))
.filter(|rpc| {
min_block_needed
.map(|x| rpc.has_block_data(x))
.unwrap_or(true)
})
.filter(|rpc| {
max_block_needed
.map(|x| rpc.has_block_data(x))
.unwrap_or(true)
})
.cloned()
.collect();
let mut potential_rpcs = Vec::with_capacity(self.len());
potential_rpcs
.sort_by_cached_key(|x| x.shuffle_for_load_balancing_on(max_block_needed.copied()));
loop {
// TODO: need a change so that protected and 4337 rpcs set watch_consensus_rpcs on start
let ranked_rpcs: Option<Arc<RankedRpcs>> =
watch_ranked_rpcs.borrow_and_update().clone();
match self
._best_available_rpc(&authorization, error_handler, &potential_rpcs, skip_rpcs)
.await
{
OpenRequestResult::Handle(x) => return Ok(OpenRequestResult::Handle(x)),
OpenRequestResult::NotReady => {}
OpenRequestResult::RetryAt(retry_at) => {
if earliest_retry_at.is_none() {
earliest_retry_at = Some(retry_at);
} else {
earliest_retry_at = earliest_retry_at.min(Some(retry_at));
// first check everything that is synced
// even though we might be querying an old block that an unsynced server can handle,
// it is best to not send queries to a syncing server. that slows down sync and can bloat erigon's disk usage.
if let Some(ranked_rpcs) = ranked_rpcs {
potential_rpcs.extend(
ranked_rpcs
.all()
.iter()
.filter(|rpc| {
ranked_rpcs.rpc_will_work_now(
skip_rpcs,
min_block_needed,
max_block_needed,
rpc,
)
})
.cloned(),
);
if potential_rpcs.len() >= self.min_synced_rpcs {
// we have enough potential rpcs. try to load balance
potential_rpcs.sort_by_cached_key(|x| {
x.shuffle_for_load_balancing_on(max_block_needed.copied())
});
match self
._best_available_rpc(
&authorization,
error_handler,
&potential_rpcs,
skip_rpcs,
)
.await
{
OpenRequestResult::Handle(x) => return Ok(OpenRequestResult::Handle(x)),
OpenRequestResult::NotReady => {}
OpenRequestResult::RetryAt(retry_at) => {
if earliest_retry_at.is_none() {
earliest_retry_at = Some(retry_at);
} else {
earliest_retry_at = earliest_retry_at.min(Some(retry_at));
}
}
}
}
}
} else {
let mut watch_consensus_rpcs = self.watch_consensus_rpcs_sender.subscribe();
let mut potential_rpcs = Vec::with_capacity(self.len());
let waiting_for = min_block_needed.max(max_block_needed);
loop {
let consensus_rpcs = watch_consensus_rpcs.borrow_and_update().clone();
potential_rpcs.clear();
// first check everything that is synced
// even though we might be querying an old block that an unsynced server can handle,
// it is best to not send queries to a syncing server. that slows down sync and can bloat erigon's disk usage.
if let Some(consensus_rpcs) = consensus_rpcs {
potential_rpcs.extend(
consensus_rpcs
.head_rpcs
.iter()
.filter(|rpc| {
consensus_rpcs.rpc_will_work_now(
skip_rpcs,
min_block_needed,
max_block_needed,
rpc,
)
})
.cloned(),
);
if potential_rpcs.len() >= self.min_synced_rpcs {
// we have enough potential rpcs. try to load balance
potential_rpcs.sort_by_cached_key(|x| {
x.shuffle_for_load_balancing_on(max_block_needed.copied())
});
match self
._best_available_rpc(
&authorization,
error_handler,
&potential_rpcs,
skip_rpcs,
)
.await
{
OpenRequestResult::Handle(x) => {
return Ok(OpenRequestResult::Handle(x))
}
OpenRequestResult::NotReady => {}
OpenRequestResult::RetryAt(retry_at) => {
if earliest_retry_at.is_none() {
earliest_retry_at = Some(retry_at);
} else {
earliest_retry_at = earliest_retry_at.min(Some(retry_at));
}
if let Some(max_wait) = max_wait {
match ranked_rpcs.should_wait_for_block(waiting_for, skip_rpcs) {
ShouldWaitForBlock::NeverReady => break,
ShouldWaitForBlock::Ready => {
if start.elapsed() > max_wait {
break;
}
}
// these rpcs were tried. don't try them again
potential_rpcs.clear();
}
for next_rpcs in consensus_rpcs.other_rpcs.values() {
let more_rpcs = next_rpcs
.iter()
.filter(|rpc| {
consensus_rpcs.rpc_will_work_now(
skip_rpcs,
min_block_needed,
max_block_needed,
rpc,
)
})
.cloned();
potential_rpcs.extend(more_rpcs);
if potential_rpcs.len() >= self.min_synced_rpcs {
// we have enough potential rpcs. try to load balance
potential_rpcs.sort_by_cached_key(|x| {
x.shuffle_for_load_balancing_on(max_block_needed.copied())
});
match self
._best_available_rpc(
&authorization,
error_handler,
&potential_rpcs,
skip_rpcs,
)
.await
{
OpenRequestResult::Handle(x) => {
return Ok(OpenRequestResult::Handle(x))
}
OpenRequestResult::NotReady => {}
OpenRequestResult::RetryAt(retry_at) => {
if earliest_retry_at.is_none() {
earliest_retry_at = Some(retry_at);
} else {
earliest_retry_at = earliest_retry_at.min(Some(retry_at));
}
}
}
// these rpcs were tried. don't try them again
potential_rpcs.clear();
}
}
if !potential_rpcs.is_empty() {
// even after scanning all the tiers, there are not enough rpcs that can serve this request. try anyways
potential_rpcs.sort_by_cached_key(|x| {
x.shuffle_for_load_balancing_on(max_block_needed.copied())
});
match self
._best_available_rpc(
&authorization,
error_handler,
&potential_rpcs,
skip_rpcs,
)
.await
{
OpenRequestResult::Handle(x) => {
return Ok(OpenRequestResult::Handle(x))
}
OpenRequestResult::NotReady => {}
OpenRequestResult::RetryAt(retry_at) => {
if earliest_retry_at.is_none() {
earliest_retry_at = Some(retry_at);
} else {
earliest_retry_at = earliest_retry_at.min(Some(retry_at));
}
}
}
}
let waiting_for = min_block_needed.max(max_block_needed);
if let Some(max_wait) = max_wait {
match consensus_rpcs.should_wait_for_block(waiting_for, skip_rpcs) {
ShouldWaitForBlock::NeverReady => break,
ShouldWaitForBlock::Ready => {
if start.elapsed() > max_wait {
break;
}
}
ShouldWaitForBlock::Wait { .. } => select! {
_ = watch_consensus_rpcs.changed() => {
// no need to borrow_and_update because we do that at the top of the loop
},
_ = sleep_until(start + max_wait) => break,
ShouldWaitForBlock::Wait { .. } => select! {
_ = watch_ranked_rpcs.changed() => {
// no need to borrow_and_update because we do that at the top of the loop
},
}
}
} else if let Some(max_wait) = max_wait {
select! {
_ = watch_consensus_rpcs.changed() => {
// no need to borrow_and_update because we do that at the top of the loop
_ = sleep_until(start + max_wait) => break,
},
_ = sleep_until(start + max_wait) => break,
}
} else {
break;
}
} else if let Some(max_wait) = max_wait {
select! {
_ = watch_ranked_rpcs.changed() => {
// no need to borrow_and_update because we do that at the top of the loop
},
_ = sleep_until(start + max_wait) => break,
}
} else {
break;
}
// clear for the next loop
potential_rpcs.clear();
}
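
An isolated sketch of the wait pattern this loop now uses (stand-in types, not the real `RankedRpcs`): read the latest value with `borrow_and_update`, and if nothing usable is there yet, `select!` between the watch channel changing and the caller's deadline.

```rust
use std::time::Duration;
use tokio::select;
use tokio::sync::watch;
use tokio::time::{sleep_until, Instant};

async fn wait_for_value(
    mut rx: watch::Receiver<Option<u64>>,
    max_wait: Duration,
) -> Option<u64> {
    let start = Instant::now();
    loop {
        // take the freshest value and mark it seen so `changed()` waits for a new one
        if let Some(value) = *rx.borrow_and_update() {
            return Some(value);
        }
        select! {
            // a new value was published; loop back and read it at the top
            changed = rx.changed() => {
                if changed.is_err() {
                    return None; // sender dropped
                }
            }
            // give up once the caller's deadline passes
            _ = sleep_until(start + max_wait) => return None,
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = watch::channel(None);
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(10)).await;
        tx.send_replace(Some(42));
    });
    assert_eq!(wait_for_value(rx, Duration::from_secs(1)).await, Some(42));
}
```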
if let Some(request_metadata) = request_metadata {
@ -765,7 +645,6 @@ impl Web3Rpcs {
min_block_needed: Option<&U64>,
max_block_needed: Option<&U64>,
max_count: Option<usize>,
allow_backups: bool,
error_level: Option<RequestErrorHandler>,
) -> Result<Vec<OpenRequestHandle>, Option<Instant>> {
let mut earliest_retry_at = None;
@ -785,27 +664,10 @@ impl Web3Rpcs {
let mut selected_rpcs = Vec::with_capacity(max_count);
let mut tried = HashSet::new();
let mut synced_rpcs = {
let synced_rpcs = self.watch_consensus_rpcs_sender.borrow();
if let Some(synced_rpcs) = synced_rpcs.as_ref() {
synced_rpcs.head_rpcs.clone()
} else {
// TODO: make this an Option instead of making an empty vec?
vec![]
}
};
// synced connections are all on the same block. sort them by tier with higher soft limits first
synced_rpcs.sort_by_cached_key(|x| x.sort_for_load_balancing_on(max_block_needed.copied()));
trace!("synced_rpcs: {:#?}", synced_rpcs);
// if there aren't enough synced connections, include more connections
// TODO: only do this sorting if the synced_rpcs isn't enough
// TODO: filter the rpcs here
let mut all_rpcs: Vec<_> = self.by_name.read().values().cloned().collect();
// TODO: this sorts them all even though we probably won't need all of them. think about this more
all_rpcs.sort_by_cached_key(|x| x.sort_for_load_balancing_on(max_block_needed.copied()));
trace!("all_rpcs: {:#?}", all_rpcs);
@ -814,21 +676,10 @@ impl Web3Rpcs {
.and_then(|x| x.authorization.clone())
.unwrap_or_default();
for rpc in itertools::chain(synced_rpcs, all_rpcs) {
if tried.contains(&rpc) {
continue;
}
for rpc in all_rpcs {
trace!("trying {}", rpc);
tried.insert(rpc.clone());
if !allow_backups && rpc.backup {
trace!("{} is a backup. skipping", rpc);
continue;
}
// TODO: this has_block_data check is in a few places now. move it onto the rpc
// TODO: use a helper function for these
if let Some(block_needed) = min_block_needed {
if !rpc.has_block_data(block_needed) {
trace!("{} is missing min_block_needed. skipping", rpc);
@ -900,7 +751,7 @@ impl Web3Rpcs {
let mut skip_rpcs = vec![];
let mut method_not_available_response = None;
let mut watch_consensus_rpcs = self.watch_consensus_rpcs_sender.subscribe();
let mut watch_consensus_rpcs = self.watch_ranked_rpcs.subscribe();
let start = Instant::now();
@ -1182,9 +1033,8 @@ impl Web3Rpcs {
max_wait: Option<Duration>,
error_level: Option<RequestErrorHandler>,
max_sends: Option<usize>,
include_backups: bool,
) -> Web3ProxyResult<Box<RawValue>> {
let mut watch_consensus_rpcs = self.watch_consensus_rpcs_sender.subscribe();
let mut watch_consensus_rpcs = self.watch_ranked_rpcs.subscribe();
let start = Instant::now();
@ -1201,7 +1051,6 @@ impl Web3Rpcs {
min_block_needed,
max_block_needed,
max_sends,
include_backups,
error_level,
)
.await
@ -1359,7 +1208,7 @@ impl Serialize for Web3Rpcs {
}
{
let consensus_rpcs = self.watch_consensus_rpcs_sender.borrow().clone();
let consensus_rpcs = self.watch_ranked_rpcs.borrow().clone();
// TODO: rename synced_connections to consensus_rpcs
if let Some(consensus_rpcs) = consensus_rpcs.as_ref() {
@ -1382,10 +1231,10 @@ impl Serialize for Web3Rpcs {
state.serialize_field(
"watch_consensus_rpcs_receivers",
&self.watch_consensus_rpcs_sender.receiver_count(),
&self.watch_ranked_rpcs.receiver_count(),
)?;
if let Some(ref x) = self.watch_consensus_head_sender {
if let Some(ref x) = self.watch_head_block {
state.serialize_field("watch_consensus_head_receivers", &x.receiver_count())?;
} else {
state.serialize_field("watch_consensus_head_receivers", &None::<()>)?;
@ -1415,16 +1264,8 @@ mod tests {
PeakEwmaLatency::spawn(Duration::from_secs(1), 4, Duration::from_secs(1))
}
// TODO: logging
#[tokio::test(start_paused = true)]
#[test_log::test(tokio::test)]
async fn test_sort_connections_by_sync_status() {
// TODO: how should we do test logging setup with tracing?
// let _ = env_logger::builder()
// .filter_level(LevelFilter::Error)
// .filter_module("web3_proxy", LevelFilter::Trace)
// .is_test(true)
// .try_init();
let block_0 = Block {
number: Some(0.into()),
hash: Some(H256::random()),
@ -1511,15 +1352,8 @@ mod tests {
assert_eq!(names_in_sort_order, ["c", "f", "b", "e", "a", "d"]);
}
#[tokio::test(start_paused = true)]
#[test_log::test(tokio::test)]
async fn test_server_selection_by_height() {
// // TODO: how should we do test logging setup with tracing?
// let _ = env_logger::builder()
// .filter_level(LevelFilter::Error)
// .filter_module("web3_proxy", LevelFilter::Trace)
// .is_test(true)
// .try_init();
let now = chrono::Utc::now().timestamp().into();
let lagged_block = Block {
@ -1550,7 +1384,6 @@ mod tests {
automatic_block_limit: false,
backup: false,
block_data_limit: block_data_limit.into(),
// tier: 0,
head_block: Some(tx_synced),
peak_latency: Some(new_peak_latency()),
..Default::default()
@ -1564,7 +1397,6 @@ mod tests {
automatic_block_limit: false,
backup: false,
block_data_limit: block_data_limit.into(),
// tier: 0,
head_block: Some(tx_lagged),
peak_latency: Some(new_peak_latency()),
..Default::default()
@ -1586,7 +1418,7 @@ mod tests {
let (block_sender, _block_receiver) = flume::unbounded();
let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
let (watch_consensus_rpcs_sender, _watch_consensus_rpcs_receiver) = watch::channel(None);
let (watch_ranked_rpcs, _watch_consensus_rpcs_receiver) = watch::channel(None);
let (watch_consensus_head_sender, _watch_consensus_head_receiver) = watch::channel(None);
let chain_id = 1;
@ -1597,8 +1429,8 @@ mod tests {
by_name: RwLock::new(rpcs_by_name),
chain_id,
name: "test".to_string(),
watch_consensus_head_sender: Some(watch_consensus_head_sender),
watch_consensus_rpcs_sender,
watch_head_block: Some(watch_consensus_head_sender),
watch_ranked_rpcs,
pending_transaction_cache: CacheBuilder::new(100)
.time_to_live(Duration::from_secs(60))
.build(),
@ -1638,7 +1470,7 @@ mod tests {
// all_backend_connections gives all non-backup servers regardless of sync status
assert_eq!(
rpcs.all_connections(None, None, None, None, false, None)
rpcs.all_connections(None, None, None, None, None)
.await
.unwrap()
.len(),
@ -1802,15 +1634,8 @@ mod tests {
assert!(matches!(future_rpc, Ok(OpenRequestResult::NotReady)));
}
#[tokio::test(start_paused = true)]
#[test_log::test(tokio::test)]
async fn test_server_selection_by_archive() {
// // TODO: how should we do test logging setup with tracing?
// let _ = env_logger::builder()
// .filter_level(LevelFilter::Error)
// .filter_module("web3_proxy", LevelFilter::Trace)
// .is_test(true)
// .try_init();
let now = chrono::Utc::now().timestamp().into();
let head_block = Block {
@ -1831,7 +1656,7 @@ mod tests {
automatic_block_limit: false,
backup: false,
block_data_limit: 64.into(),
// tier: 1,
tier: 1.into(),
head_block: Some(tx_pruned),
..Default::default()
};
@ -1844,7 +1669,7 @@ mod tests {
automatic_block_limit: false,
backup: false,
block_data_limit: u64::MAX.into(),
// tier: 2,
tier: 2.into(),
head_block: Some(tx_archive),
..Default::default()
};
@ -1864,7 +1689,7 @@ mod tests {
let (block_sender, _) = flume::unbounded();
let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
let (watch_consensus_rpcs_sender, _) = watch::channel(None);
let (watch_ranked_rpcs, _) = watch::channel(None);
let (watch_consensus_head_sender, _watch_consensus_head_receiver) = watch::channel(None);
let chain_id = 1;
@ -1874,8 +1699,8 @@ mod tests {
by_name: RwLock::new(rpcs_by_name),
chain_id,
name: "test".to_string(),
watch_consensus_head_sender: Some(watch_consensus_head_sender),
watch_consensus_rpcs_sender,
watch_head_block: Some(watch_consensus_head_sender),
watch_ranked_rpcs,
pending_transaction_cache: CacheBuilder::new(100)
.time_to_live(Duration::from_secs(120))
.build(),
@ -1897,8 +1722,8 @@ mod tests {
let mut connection_heads = ConsensusFinder::new(None, None);
// min sum soft limit will require tier 2
connection_heads
// min sum soft limit will require 2 servers
let x = connection_heads
.process_block_from_rpc(
&rpcs,
&authorization,
@ -1907,9 +1732,12 @@ mod tests {
&None,
)
.await
.unwrap_err();
.unwrap();
assert!(!x);
connection_heads
assert_eq!(rpcs.num_synced_rpcs(), 0);
let x = connection_heads
.process_block_from_rpc(
&rpcs,
&authorization,
@ -1919,6 +1747,7 @@ mod tests {
)
.await
.unwrap();
assert!(x);
assert_eq!(rpcs.num_synced_rpcs(), 2);
@ -1976,8 +1805,7 @@ mod tests {
}
}
// TODO: #[test_log::test(tokio::test)]
#[tokio::test]
#[test_log::test(tokio::test)]
async fn test_all_connections() {
// TODO: use chrono, not SystemTime
let now: U256 = SystemTime::now()
@ -2049,7 +1877,7 @@ mod tests {
let (block_sender, _) = flume::unbounded();
let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
let (watch_consensus_rpcs_sender, _) = watch::channel(None);
let (watch_ranked_rpcs, _) = watch::channel(None);
let (watch_consensus_head_sender, _watch_consensus_head_receiver) = watch::channel(None);
let chain_id = 1;
@ -2060,8 +1888,8 @@ mod tests {
by_name: RwLock::new(rpcs_by_name),
chain_id,
name: "test".to_string(),
watch_consensus_head_sender: Some(watch_consensus_head_sender),
watch_consensus_rpcs_sender,
watch_head_block: Some(watch_consensus_head_sender),
watch_ranked_rpcs,
pending_transaction_cache: Cache::new(10_000),
pending_tx_id_receiver,
pending_tx_id_sender,
@ -2104,7 +1932,7 @@ mod tests {
// best_synced_backend_connection requires servers to be synced with the head block
// TODO: test with and without passing the head_block.number?
let head_connections = rpcs
.all_connections(None, Some(block_2.number()), None, None, false, None)
.all_connections(None, Some(block_2.number()), None, None, None)
.await;
debug!("head_connections: {:#?}", head_connections);
@ -2116,7 +1944,7 @@ mod tests {
);
let all_connections = rpcs
.all_connections(None, Some(block_1.number()), None, None, false, None)
.all_connections(None, Some(block_1.number()), None, None, None)
.await;
debug!("all_connections: {:#?}", all_connections);
@ -2127,9 +1955,7 @@ mod tests {
"wrong number of connections"
);
let all_connections = rpcs
.all_connections(None, None, None, None, false, None)
.await;
let all_connections = rpcs.all_connections(None, None, None, None, None).await;
debug!("all_connections: {:#?}", all_connections);

@ -234,6 +234,7 @@ impl Web3Rpc {
/// TODO: tests on this!
/// TODO: should tier or block number take priority?
/// TODO: should this return a struct that implements sorting traits?
/// TODO: move this to consensus.rs
fn sort_on(&self, max_block: Option<U64>) -> (bool, Reverse<U64>, u32) {
let mut head_block = self
.head_block
@ -252,6 +253,7 @@ impl Web3Rpc {
(!backup, Reverse(head_block), tier)
}
/// TODO: move this to consensus.rs
pub fn sort_for_load_balancing_on(
&self,
max_block: Option<U64>,
@ -268,6 +270,7 @@ impl Web3Rpc {
}
/// like sort_for_load_balancing, but shuffles tiers randomly instead of sorting by weighted_peak_latency
/// TODO: move this to consensus.rs
pub fn shuffle_for_load_balancing_on(
&self,
max_block: Option<U64>,
@ -444,6 +447,7 @@ impl Web3Rpc {
.into());
}
// TODO: only do this for balanced_rpcs. this errors on 4337 rpcs
self.check_block_data_limit()
.await
.context(format!("unable to check_block_data_limit of {}", self))?;