getting close

This commit is contained in:
Bryan Stitt 2023-02-14 17:41:40 -08:00
parent f1e6de3677
commit c65ffc9ce0
8 changed files with 352 additions and 322 deletions

@ -204,7 +204,7 @@ pub struct Web3ProxyApp {
response_cache: ResponseCache,
// don't drop this or the sender will stop working
// TODO: broadcast channel instead?
watch_consensus_head_receiver: watch::Receiver<Web3ProxyBlock>,
watch_consensus_head_receiver: watch::Receiver<Option<Web3ProxyBlock>>,
pending_tx_sender: broadcast::Sender<TxStatus>,
pub config: AppConfig,
pub db_conn: Option<sea_orm::DatabaseConnection>,
@ -541,8 +541,7 @@ impl Web3ProxyApp {
};
// TODO: i don't like doing Block::default here! Change this to "None"?
let (watch_consensus_head_sender, watch_consensus_head_receiver) =
watch::channel(Web3ProxyBlock::default());
let (watch_consensus_head_sender, watch_consensus_head_receiver) = watch::channel(None);
// TODO: will one receiver lagging be okay? how big should this be?
let (pending_tx_sender, pending_tx_receiver) = broadcast::channel(256);
@ -624,7 +623,7 @@ impl Web3ProxyApp {
.await
.context("spawning private_rpcs")?;
if private_rpcs.conns.is_empty() {
if private_rpcs.by_name.is_empty() {
None
} else {
// save the handle to catch any errors
@ -740,7 +739,7 @@ impl Web3ProxyApp {
Ok((app, cancellable_handles, important_background_handles).into())
}
pub fn head_block_receiver(&self) -> watch::Receiver<Web3ProxyBlock> {
pub fn head_block_receiver(&self) -> watch::Receiver<Option<Web3ProxyBlock>> {
self.watch_consensus_head_receiver.clone()
}
@ -938,7 +937,7 @@ impl Web3ProxyApp {
JsonRpcRequestEnum::Single(request) => {
let (response, rpcs) = timeout(
max_time,
self.proxy_cached_request(&authorization, request, proxy_mode),
self.proxy_cached_request(&authorization, request, proxy_mode, None),
)
.await??;
@ -971,10 +970,26 @@ impl Web3ProxyApp {
// TODO: spawn so the requests go in parallel? need to think about rate limiting more if we do that
// TODO: improve flattening
// get the head block now so that any requests that need it all use the same block
// TODO: FrontendErrorResponse that handles "no servers synced" in a consistent way
// TODO: this still has an edge condition if there is a reorg in the middle of the request!!!
let head_block_num = self
.balanced_rpcs
.head_block_num()
.context(anyhow::anyhow!("no servers synced"))?;
let responses = join_all(
requests
.into_iter()
.map(|request| self.proxy_cached_request(authorization, request, proxy_mode))
.map(|request| {
self.proxy_cached_request(
authorization,
request,
proxy_mode,
Some(head_block_num),
)
})
.collect::<Vec<_>>(),
)
.await;
@ -1023,6 +1038,7 @@ impl Web3ProxyApp {
authorization: &Arc<Authorization>,
mut request: JsonRpcRequest,
proxy_mode: ProxyMode,
head_block_num: Option<U64>,
) -> Result<(JsonRpcForwardedResponse, Vec<Arc<Web3Rpc>>), FrontendErrorResponse> {
// trace!("Received request: {:?}", request);
@ -1139,7 +1155,7 @@ impl Web3ProxyApp {
serde_json::Value::Array(vec![])
}
"eth_blockNumber" => {
match self.balanced_rpcs.head_block_num() {
match head_block_num.or(self.balanced_rpcs.head_block_num()) {
Some(head_block_num) => {
json!(head_block_num)
}
@ -1237,7 +1253,11 @@ impl Web3ProxyApp {
(&self.balanced_rpcs, default_num)
};
let head_block_num = self.balanced_rpcs.head_block_num();
let head_block_num = head_block_num
.or(self.balanced_rpcs.head_block_num())
.ok_or_else(|| anyhow::anyhow!("no servers synced"))?;
// TODO: error/wait if no head block!
// try_send_all_upstream_servers puts the request id into the response. no need to do that ourselves here.
let mut response = private_rpcs
@ -1245,7 +1265,7 @@ impl Web3ProxyApp {
authorization,
&request,
Some(request_metadata.clone()),
head_block_num.as_ref(),
Some(&head_block_num),
None,
Level::Trace,
num,
@ -1440,9 +1460,8 @@ impl Web3ProxyApp {
// emit stats
// TODO: if no servers synced, wait for them to be synced? probably better to error and let haproxy retry another server
let head_block_num = self
.balanced_rpcs
.head_block_num()
let head_block_num = head_block_num
.or(self.balanced_rpcs.head_block_num())
.context("no servers synced")?;
// we do this check before checking caches because it might modify the request params

@ -61,6 +61,12 @@ impl Web3ProxyApp {
);
while let Some(new_head) = head_block_receiver.next().await {
let new_head = if let Some(new_head) = new_head {
new_head
} else {
continue;
};
// TODO: what should the payload for RequestMetadata be?
let request_metadata =
Arc::new(RequestMetadata::new(REQUEST_PERIOD, 0).unwrap());

@ -64,7 +64,7 @@ async fn run(
));
// wait until the app has seen its first consensus head block
// TODO: if backups were included, wait a little longer
// TODO: if backups were included, wait a little longer?
let _ = spawned_app.app.head_block_receiver().changed().await;
// start the frontend port

@ -5,7 +5,7 @@ use super::one::Web3Rpc;
use super::transactions::TxStatus;
use crate::frontend::authorization::Authorization;
use crate::{config::BlockAndRpc, jsonrpc::JsonRpcRequest};
use anyhow::Context;
use anyhow::{anyhow, Context};
use derive_more::From;
use ethers::prelude::{Block, TxHash, H256, U64};
use hashbrown::HashSet;
@ -45,7 +45,11 @@ impl PartialEq for Web3ProxyBlock {
impl Web3ProxyBlock {
/// A new block has arrived over a subscription
pub fn new(block: ArcBlock) -> Self {
pub fn try_new(block: ArcBlock) -> Option<Self> {
if block.number.is_none() || block.hash.is_none() {
return None;
}
let mut x = Self {
block,
received_age: None,
@ -56,7 +60,7 @@ impl Web3ProxyBlock {
// TODO: emit a stat for received_age
x.received_age = Some(x.age());
x
Some(x)
}
pub fn age(&self) -> u64 {
@ -97,12 +101,20 @@ impl Web3ProxyBlock {
}
}
impl From<ArcBlock> for Web3ProxyBlock {
fn from(x: ArcBlock) -> Self {
Web3ProxyBlock {
impl TryFrom<ArcBlock> for Web3ProxyBlock {
type Error = anyhow::Error;
fn try_from(x: ArcBlock) -> Result<Self, Self::Error> {
if x.number.is_none() || x.hash.is_none() {
return Err(anyhow!("Blocks here must have a number of hash"));
}
let b = Web3ProxyBlock {
block: x,
received_age: None,
}
};
Ok(b)
}
}
@ -184,7 +196,13 @@ impl Web3Rpcs {
None,
)
.await?
.map(Into::into)
.and_then(|x| {
if x.number.is_none() {
None
} else {
x.try_into().ok()
}
})
.context("no block!")?,
None => {
// TODO: helper for method+params => JsonRpcRequest
@ -208,8 +226,10 @@ impl Web3Rpcs {
let block: Option<ArcBlock> = serde_json::from_str(block.get())?;
// TODO: from isn't great here. received time is going to be weird
block.map(Into::into).context("no block!")?
let block: ArcBlock = block.context("no block in the response")?;
// TODO: received time is going to be weird
Web3ProxyBlock::try_from(block)?
}
};
@ -252,7 +272,11 @@ impl Web3Rpcs {
// be sure the requested block num exists
// TODO: is this okay? what if we aren't synced?!
let mut head_block_num = *consensus_head_receiver.borrow_and_update().number();
let mut head_block_num = *consensus_head_receiver
.borrow_and_update()
.as_ref()
.context("no consensus head block")?
.number();
loop {
if num <= &head_block_num {
@ -262,7 +286,9 @@ impl Web3Rpcs {
trace!("waiting for future block {} > {}", num, head_block_num);
consensus_head_receiver.changed().await?;
head_block_num = *consensus_head_receiver.borrow_and_update().number();
if let Some(head) = consensus_head_receiver.borrow_and_update().as_ref() {
head_block_num = *head.number();
}
}
let block_depth = (head_block_num - num).as_u64();
@ -297,7 +323,7 @@ impl Web3Rpcs {
let block: ArcBlock = serde_json::from_str(raw_block.get())?;
let block = Web3ProxyBlock::from(block);
let block = Web3ProxyBlock::try_from(block)?;
// the block was fetched using eth_getBlockByNumber, so it should have all fields and be on the heaviest chain
let block = self.try_cache_block(block, true).await?;
@ -311,13 +337,13 @@ impl Web3Rpcs {
block_receiver: flume::Receiver<BlockAndRpc>,
// TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed
// Geth's subscriptions have the same potential for skipping blocks.
head_block_sender: watch::Sender<Web3ProxyBlock>,
head_block_sender: watch::Sender<Option<Web3ProxyBlock>>,
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
) -> anyhow::Result<()> {
// TODO: indexmap or hashmap? what hasher? with_capacity?
// TODO: this will grow unbounded. prune old heads on this at the same time we prune the graph?
let configured_tiers: Vec<u64> = self
.conns
.by_name
.values()
.map(|x| x.tier)
.collect::<HashSet<u64>>()
@ -363,7 +389,7 @@ impl Web3Rpcs {
consensus_finder: &mut ConsensusFinder,
rpc_head_block: Option<Web3ProxyBlock>,
rpc: Arc<Web3Rpc>,
head_block_sender: &watch::Sender<Web3ProxyBlock>,
head_block_sender: &watch::Sender<Option<Web3ProxyBlock>>,
pending_tx_sender: &Option<broadcast::Sender<TxStatus>>,
) -> anyhow::Result<()> {
// TODO: how should we handle an error here?
@ -392,12 +418,11 @@ impl Web3Rpcs {
let backups_needed = new_synced_connections.backups_needed;
let consensus_head_block = new_synced_connections.head_block.clone();
let num_consensus_rpcs = new_synced_connections.num_conns();
let num_checked_rpcs = 0; // TODO: figure this out
let num_active_rpcs = consensus_finder
.all_rpcs_group()
.map(|x| x.len())
.unwrap_or_default();
let total_rpcs = self.conns.len();
let total_rpcs = self.by_name.len();
let old_consensus_head_connections = self
.watch_consensus_rpcs_sender
@ -409,10 +434,9 @@ impl Web3Rpcs {
match &old_consensus_head_connections.head_block {
None => {
debug!(
"first {}{}/{}/{}/{} block={}, rpc={}",
"first {}{}/{}/{} block={}, rpc={}",
backups_voted_str,
num_consensus_rpcs,
num_checked_rpcs,
num_active_rpcs,
total_rpcs,
consensus_head_block,
@ -429,7 +453,7 @@ impl Web3Rpcs {
self.try_cache_block(consensus_head_block, true).await?;
head_block_sender
.send(consensus_head_block)
.send(Some(consensus_head_block))
.context("head_block_sender sending consensus_head_block")?;
}
Some(old_head_block) => {
@ -445,10 +469,9 @@ impl Web3Rpcs {
// no change in hash. no need to use head_block_sender
// TODO: trace level if rpc is backup
debug!(
"con {}{}/{}/{}/{} con={} rpc={}@{}",
"con {}{}/{}/{} con={} rpc={}@{}",
backups_voted_str,
num_consensus_rpcs,
num_checked_rpcs,
num_active_rpcs,
total_rpcs,
consensus_head_block,
@ -463,10 +486,9 @@ impl Web3Rpcs {
}
debug!(
"unc {}{}/{}/{}/{} con_head={} old={} rpc={}@{}",
"unc {}{}/{}/{} con_head={} old={} rpc={}@{}",
backups_voted_str,
num_consensus_rpcs,
num_checked_rpcs,
num_active_rpcs,
total_rpcs,
consensus_head_block,
@ -481,7 +503,7 @@ impl Web3Rpcs {
.context("save consensus_head_block as heaviest chain")?;
head_block_sender
.send(consensus_head_block)
.send(Some(consensus_head_block))
.context("head_block_sender sending consensus_head_block")?;
}
}
@ -489,10 +511,9 @@ impl Web3Rpcs {
// this is unlikely but possible
// TODO: better log
warn!(
"chain rolled back {}{}/{}/{}/{} con={} old={} rpc={}@{}",
"chain rolled back {}{}/{}/{} con={} old={} rpc={}@{}",
backups_voted_str,
num_consensus_rpcs,
num_checked_rpcs,
num_active_rpcs,
total_rpcs,
consensus_head_block,
@ -515,15 +536,14 @@ impl Web3Rpcs {
)?;
head_block_sender
.send(consensus_head_block)
.send(Some(consensus_head_block))
.context("head_block_sender sending consensus_head_block")?;
}
Ordering::Greater => {
debug!(
"new {}{}/{}/{}/{} con={} rpc={}@{}",
"new {}{}/{}/{} con={} rpc={}@{}",
backups_voted_str,
num_consensus_rpcs,
num_checked_rpcs,
num_active_rpcs,
total_rpcs,
consensus_head_block,
@ -539,7 +559,7 @@ impl Web3Rpcs {
let consensus_head_block =
self.try_cache_block(consensus_head_block, true).await?;
head_block_sender.send(consensus_head_block)?;
head_block_sender.send(Some(consensus_head_block))?;
}
}
}
@ -550,23 +570,23 @@ impl Web3Rpcs {
.map(|x| x.to_string())
.unwrap_or_else(|| "None".to_string());
if num_checked_rpcs >= self.min_head_rpcs {
if num_active_rpcs >= self.min_head_rpcs {
// no consensus!!!
error!(
"non {}{}/{}/{}/{} rpc={}@{}",
"non {}{}/{}/{} rpc={}@{}",
backups_voted_str,
num_consensus_rpcs,
num_checked_rpcs,
num_active_rpcs,
total_rpcs,
rpc,
rpc_head_str,
);
} else {
// no consensus, but we do not have enough rpcs connected yet to panic
debug!(
"non {}{}/{}/{}/{} rpc={}@{}",
"non {}{}/{}/{} rpc={}@{}",
backups_voted_str,
num_consensus_rpcs,
num_checked_rpcs,
num_active_rpcs,
total_rpcs,
rpc,

@ -19,18 +19,18 @@ pub struct ConsensusWeb3Rpcs {
pub(super) head_block: Option<Web3ProxyBlock>,
// TODO: this should be able to serialize, but it isn't
#[serde(skip_serializing)]
pub(super) conns: Vec<Arc<Web3Rpc>>,
pub(super) rpcs: Vec<Arc<Web3Rpc>>,
pub(super) backups_voted: Option<Web3ProxyBlock>,
pub(super) backups_needed: bool,
}
impl ConsensusWeb3Rpcs {
pub fn num_conns(&self) -> usize {
self.conns.len()
self.rpcs.len()
}
pub fn sum_soft_limit(&self) -> u32 {
self.conns.iter().fold(0, |sum, rpc| sum + rpc.soft_limit)
self.rpcs.iter().fold(0, |sum, rpc| sum + rpc.soft_limit)
}
// TODO: sum_hard_limit?
@ -42,7 +42,7 @@ impl fmt::Debug for ConsensusWeb3Rpcs {
// TODO: print the actual conns?
f.debug_struct("ConsensusConnections")
.field("head_block", &self.head_block)
.field("num_conns", &self.conns.len())
.field("num_conns", &self.rpcs.len())
.finish_non_exhaustive()
}
}
@ -52,7 +52,7 @@ impl Web3Rpcs {
pub fn head_block(&self) -> Option<Web3ProxyBlock> {
self.watch_consensus_head_receiver
.as_ref()
.map(|x| x.borrow().clone())
.and_then(|x| x.borrow().clone())
}
// TODO: return a ref?
@ -66,11 +66,11 @@ impl Web3Rpcs {
}
pub fn synced(&self) -> bool {
!self.watch_consensus_rpcs_sender.borrow().conns.is_empty()
!self.watch_consensus_rpcs_sender.borrow().rpcs.is_empty()
}
pub fn num_synced_rpcs(&self) -> usize {
self.watch_consensus_rpcs_sender.borrow().conns.len()
self.watch_consensus_rpcs_sender.borrow().rpcs.len()
}
}
@ -243,7 +243,7 @@ impl ConnectionsGroup {
continue;
}
if let Some(rpc) = web3_rpcs.conns.get(rpc_name.as_str()) {
if let Some(rpc) = web3_rpcs.by_name.get(rpc_name.as_str()) {
if backup_rpcs_voted.is_some() {
// backups already voted for a head block. don't change it
} else {
@ -257,7 +257,7 @@ impl ConnectionsGroup {
} else {
// i don't think this is an error. i think its just if a reconnect is currently happening
warn!("connection missing: {}", rpc_name);
debug!("web3_rpcs.conns: {:#?}", web3_rpcs.conns);
debug!("web3_rpcs.conns: {:#?}", web3_rpcs.by_name);
}
}
@ -340,7 +340,7 @@ impl ConnectionsGroup {
// success! this block has enough soft limit and nodes on it (or on later blocks)
let conns: Vec<Arc<Web3Rpc>> = primary_consensus_rpcs
.into_iter()
.filter_map(|conn_name| web3_rpcs.conns.get(conn_name).cloned())
.filter_map(|conn_name| web3_rpcs.by_name.get(conn_name).cloned())
.collect();
#[cfg(debug_assertions)]
@ -350,7 +350,7 @@ impl ConnectionsGroup {
Ok(ConsensusWeb3Rpcs {
head_block: Some(maybe_head_block),
conns,
rpcs: conns,
backups_voted: backup_rpcs_voted,
backups_needed: primary_rpcs_voted.is_none(),
})
@ -528,7 +528,7 @@ impl ConsensusFinder {
// TODO: find the best tier with a connectionsgroup. best case, this only queries the first tier
// TODO: do we need to calculate all of them? I think having highest_known_block included as part of min_block_num should make that unnecessary
for (i, x) in self.tiers.iter() {
trace!("checking tier {}", i);
trace!("checking tier {}: {:#?}", i, x.rpc_name_to_block);
if let Ok(consensus_head_connections) = x
.consensus_head_connections(authorization, web3_connections, min_block_num)
.await
@ -543,3 +543,11 @@ impl ConsensusFinder {
return Err(anyhow::anyhow!("failed finding consensus on all tiers"));
}
}
#[cfg(test)]
mod test {
#[test]
fn test_simplest_case_consensus_head_connections() {
todo!();
}
}

@ -16,6 +16,7 @@ use futures::future::try_join_all;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use hashbrown::{HashMap, HashSet};
use itertools::Itertools;
use log::{debug, error, info, trace, warn, Level};
use migration::sea_orm::DatabaseConnection;
use moka::future::{Cache, ConcurrentCacheExt};
@ -23,6 +24,7 @@ use serde::ser::{SerializeStruct, Serializer};
use serde::Serialize;
use serde_json::json;
use serde_json::value::RawValue;
use std::cmp::min_by_key;
use std::collections::BTreeMap;
use std::sync::atomic::Ordering;
use std::sync::Arc;
@ -36,11 +38,11 @@ use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBeh
#[derive(From)]
pub struct Web3Rpcs {
/// any requests will be forwarded to one (or more) of these connections
pub(crate) conns: HashMap<String, Arc<Web3Rpc>>,
pub(crate) by_name: HashMap<String, Arc<Web3Rpc>>,
/// all providers with the same consensus head block. won't update if there is no `self.watch_consensus_head_sender`
pub(super) watch_consensus_rpcs_sender: watch::Sender<Arc<ConsensusWeb3Rpcs>>,
/// this head receiver makes it easy to wait until there is a new block
pub(super) watch_consensus_head_receiver: Option<watch::Receiver<Web3ProxyBlock>>,
pub(super) watch_consensus_head_receiver: Option<watch::Receiver<Option<Web3ProxyBlock>>>,
pub(super) pending_transactions:
Cache<TxHash, TxStatus, hashbrown::hash_map::DefaultHashBuilder>,
/// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis?
@ -74,7 +76,7 @@ impl Web3Rpcs {
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
redis_pool: Option<redis_rate_limiter::RedisPool>,
server_configs: HashMap<String, Web3RpcConfig>,
watch_consensus_head_sender: Option<watch::Sender<Web3ProxyBlock>>,
watch_consensus_head_sender: Option<watch::Sender<Option<Web3ProxyBlock>>>,
) -> anyhow::Result<(Arc<Self>, AnyhowJoinHandle<()>)> {
let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
let (block_sender, block_receiver) = flume::unbounded::<BlockAndRpc>();
@ -219,7 +221,7 @@ impl Web3Rpcs {
watch_consensus_head_sender.as_ref().map(|x| x.subscribe());
let connections = Arc::new(Self {
conns: connections,
by_name: connections,
watch_consensus_rpcs_sender: watch_consensus_connections_sender,
watch_consensus_head_receiver,
pending_transactions,
@ -253,7 +255,7 @@ impl Web3Rpcs {
}
pub fn get(&self, conn_name: &str) -> Option<&Arc<Web3Rpc>> {
self.conns.get(conn_name)
self.by_name.get(conn_name)
}
/// subscribe to blocks and transactions from all the backend rpcs.
@ -264,7 +266,7 @@ impl Web3Rpcs {
authorization: Arc<Authorization>,
pending_tx_id_receiver: flume::Receiver<TxHashAndRpc>,
block_receiver: flume::Receiver<BlockAndRpc>,
head_block_sender: Option<watch::Sender<Web3ProxyBlock>>,
head_block_sender: Option<watch::Sender<Option<Web3ProxyBlock>>>,
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
) -> anyhow::Result<()> {
let mut futures = vec![];
@ -426,70 +428,64 @@ impl Web3Rpcs {
min_block_needed: Option<&U64>,
max_block_needed: Option<&U64>,
) -> anyhow::Result<OpenRequestResult> {
if let Ok(without_backups) = self
._best_consensus_head_connection(
false,
authorization,
request_metadata,
skip,
min_block_needed,
max_block_needed,
)
.await
{
// TODO: this might use backups too eagerly. but even when we allow backups, we still prioritize our own
if matches!(without_backups, OpenRequestResult::Handle(_)) {
return Ok(without_backups);
}
}
self._best_consensus_head_connection(
true,
authorization,
request_metadata,
skip,
min_block_needed,
max_block_needed,
)
.await
}
/// get the best available rpc server with the consensus head block. it might have blocks after the consensus head
async fn _best_consensus_head_connection(
&self,
allow_backups: bool,
authorization: &Arc<Authorization>,
request_metadata: Option<&Arc<RequestMetadata>>,
skip: &[Arc<Web3Rpc>],
min_block_needed: Option<&U64>,
max_block_needed: Option<&U64>,
) -> anyhow::Result<OpenRequestResult> {
let usable_rpcs_by_head_num_and_weight: BTreeMap<(Option<U64>, u64), Vec<Arc<Web3Rpc>>> = {
let usable_rpcs_by_tier_and_head_number: BTreeMap<(u64, Option<U64>), Vec<Arc<Web3Rpc>>> = {
let synced_connections = self.watch_consensus_rpcs_sender.borrow().clone();
let head_block_num = if let Some(head_block) = synced_connections.head_block.as_ref() {
head_block.number()
} else {
// TODO: optionally wait for a head block >= min_block_needed
return Ok(OpenRequestResult::NotReady(allow_backups));
let (head_block_num, head_block_age) =
if let Some(head_block) = synced_connections.head_block.as_ref() {
(head_block.number(), head_block.age())
} else {
// TODO: optionally wait for a head_block.number() >= min_block_needed
// TODO: though i think that wait would actually need to be earlier in the request
return Ok(OpenRequestResult::NotReady);
};
let needed_blocks_comparison = match (min_block_needed, max_block_needed) {
(None, None) => {
// no required block given. treat this like the requested the consensus head block
cmp::Ordering::Equal
}
(None, Some(max_block_needed)) => max_block_needed.cmp(head_block_num),
(Some(min_block_needed), None) => min_block_needed.cmp(head_block_num),
(Some(min_block_needed), Some(max_block_needed)) => {
match min_block_needed.cmp(max_block_needed) {
cmp::Ordering::Equal => min_block_needed.cmp(head_block_num),
cmp::Ordering::Greater => {
return Err(anyhow::anyhow!(
"Invalid blocks bounds requested. min ({}) > max ({})",
min_block_needed,
max_block_needed
))
}
cmp::Ordering::Less => {
// hmmmm
todo!("now what do we do?");
}
}
}
};
let min_block_needed = min_block_needed.unwrap_or(&head_block_num);
// collect "usable_rpcs_by_head_num_and_weight"
// TODO: MAKE SURE None SORTS LAST?
let mut m = BTreeMap::new();
match min_block_needed.cmp(&head_block_num) {
match needed_blocks_comparison {
cmp::Ordering::Less => {
// need an old block. check all the rpcs. prefer the most synced
// need an old block. check all the rpcs. ignore rpcs that are still syncing
let min_block_age =
self.max_block_age.map(|x| head_block_age.saturating_sub(x));
let min_sync_num = self.max_block_lag.map(|x| head_block_num.saturating_sub(x));
// TODO: cache this somehow?
// TODO: maybe have a helper on synced_connections? that way sum_soft_limits/min_synced_rpcs will be DRY
for x in self
.conns
.by_name
.values()
.filter(|x| {
if !allow_backups && x.backup {
false
} else if skip.contains(x) {
false
} else if !x.has_block_data(min_block_needed) {
// TODO: move a bunch of this onto a rpc.is_synced function
if skip.contains(x) {
// we've already tried this server or have some other reason to skip it
false
} else if max_block_needed
.and_then(|max_block_needed| {
@ -497,8 +493,18 @@ impl Web3Rpcs {
})
.unwrap_or(false)
{
// server does not have the max block
false
} else if min_block_needed
.and_then(|min_block_needed| {
Some(!x.has_block_data(min_block_needed))
})
.unwrap_or(false)
{
// server does not have the min block
false
} else {
// server has the block we need!
true
}
})
@ -506,32 +512,43 @@ impl Web3Rpcs {
{
let x_head_block = x.head_block.read().clone();
match x_head_block {
None => continue,
Some(x_head) => {
let key = (Some(*x_head.number()), u64::MAX - x.tier);
if let Some(x_head) = x_head_block {
// TODO: should nodes that are ahead of the consensus block have priority? seems better to spread the load
let x_head_num = x_head.number().min(head_block_num);
m.entry(key).or_insert_with(Vec::new).push(x);
// TODO: do we really need to check head_num and age?
if let Some(min_sync_num) = min_sync_num.as_ref() {
if x_head_num < min_sync_num {
continue;
}
}
if let Some(min_block_age) = min_block_age {
if x_head.age() < min_block_age {
// rpc is still syncing
continue;
}
}
let key = (x.tier, Some(*x_head_num));
m.entry(key).or_insert_with(Vec::new).push(x);
}
}
// TODO: check min_synced_rpcs and min_sum_soft_limits? or maybe better to just try to serve the request?
}
cmp::Ordering::Equal => {
// need the consensus head block. filter the synced rpcs
// TODO: this doesn't properly check the allow_backups variable!
for x in synced_connections
.conns
.iter()
.filter(|x| !skip.contains(x))
{
let key = (None, u64::MAX - x.tier);
for x in synced_connections.rpcs.iter().filter(|x| !skip.contains(x)) {
// the key doesn't matter if we are checking synced connections. its already sized to what we need
let key = (0, None);
m.entry(key).or_insert_with(Vec::new).push(x.clone());
}
}
cmp::Ordering::Greater => {
// TODO? if the blocks is close, wait for change on a watch_consensus_connections_receiver().subscribe()
return Ok(OpenRequestResult::NotReady(allow_backups));
// TODO? if the blocks is close, maybe we could wait for change on a watch_consensus_connections_receiver().subscribe()
return Ok(OpenRequestResult::NotReady);
}
}
@ -540,42 +557,24 @@ impl Web3Rpcs {
let mut earliest_retry_at = None;
for usable_rpcs in usable_rpcs_by_head_num_and_weight.into_values().rev() {
// we sort on a combination of values. cache them here so that we don't do this math multiple times.
// TODO: is this necessary if we use sort_by_cached_key?
let available_request_map: HashMap<_, f64> = usable_rpcs
.iter()
.map(|rpc| {
// TODO: weighted sort by remaining hard limit?
// TODO: weighted sort by soft_limit - ewma_active_requests? that assumes soft limits are any good
(rpc, 1.0)
})
.collect();
warn!("todo: better sort here");
let sorted_rpcs = {
if usable_rpcs.len() == 1 {
// TODO: try the next tier
vec![usable_rpcs.get(0).expect("there should be 1")]
} else {
let mut rng = thread_fast_rng::thread_fast_rng();
usable_rpcs
.choose_multiple_weighted(&mut rng, usable_rpcs.len(), |rpc| {
*available_request_map
.get(rpc)
.expect("rpc should always be in available_request_map")
})
.unwrap()
.collect::<Vec<_>>()
}
for mut usable_rpcs in usable_rpcs_by_tier_and_head_number.into_values() {
// sort the tier randomly
if usable_rpcs.len() == 1 {
// TODO: include an rpc from the next tier?
} else {
// we can't get the rng outside of this loop because it is not Send
// this function should be pretty fast anyway, so it shouldn't matter too much
let mut rng = thread_fast_rng::thread_fast_rng();
usable_rpcs.shuffle(&mut rng);
};
// now that the rpcs are sorted, try to get an active request handle for one of them
// TODO: pick two randomly and choose the one with the lower rpc.latency.ewma
for best_rpc in sorted_rpcs.into_iter() {
// increment our connection counter
// now that the rpcs are shuffled, try to get an active request handle for one of them
// pick the first two and try the one with the lower rpc.latency.ewma
// TODO: chunks or tuple windows?
for (rpc_a, rpc_b) in usable_rpcs.into_iter().circular_tuple_windows() {
let best_rpc = min_by_key(rpc_a, rpc_b, |x| x.latency.request_ewma);
// just because it has lower latency doesn't mean we are sure to get a connection
match best_rpc.try_request_handle(authorization, None).await {
Ok(OpenRequestResult::Handle(handle)) => {
// trace!("opened handle: {}", best_rpc);
@ -584,7 +583,7 @@ impl Web3Rpcs {
Ok(OpenRequestResult::RetryAt(retry_at)) => {
earliest_retry_at = earliest_retry_at.min(Some(retry_at));
}
Ok(OpenRequestResult::NotReady(_)) => {
Ok(OpenRequestResult::NotReady) => {
// TODO: log a warning? emit a stat?
}
Err(err) => {
@ -614,7 +613,7 @@ impl Web3Rpcs {
// TODO: should we log here?
Ok(OpenRequestResult::NotReady(allow_backups))
Ok(OpenRequestResult::NotReady)
}
Some(earliest_retry_at) => {
warn!("no servers on {:?}! {:?}", self, earliest_retry_at);
@ -676,19 +675,19 @@ impl Web3Rpcs {
let mut max_count = if let Some(max_count) = max_count {
max_count
} else {
self.conns.len()
self.by_name.len()
};
let mut tried = HashSet::new();
let mut synced_conns = self.watch_consensus_rpcs_sender.borrow().conns.clone();
let mut synced_conns = self.watch_consensus_rpcs_sender.borrow().rpcs.clone();
// synced connections are all on the same block. sort them by tier with higher soft limits first
synced_conns.sort_by_cached_key(rpc_sync_status_sort_key);
// if there aren't enough synced connections, include more connections
// TODO: only do this sorting if the synced_conns isn't enough
let mut all_conns: Vec<_> = self.conns.values().cloned().collect();
let mut all_conns: Vec<_> = self.by_name.values().cloned().collect();
all_conns.sort_by_cached_key(rpc_sync_status_sort_key);
for connection in itertools::chain(synced_conns, all_conns) {
@ -728,7 +727,7 @@ impl Web3Rpcs {
max_count -= 1;
selected_rpcs.push(handle)
}
Ok(OpenRequestResult::NotReady(_)) => {
Ok(OpenRequestResult::NotReady) => {
warn!("no request handle for {}", connection)
}
Err(err) => {
@ -767,7 +766,7 @@ impl Web3Rpcs {
loop {
let num_skipped = skip_rpcs.len();
if num_skipped == self.conns.len() {
if num_skipped == self.by_name.len() {
break;
}
@ -918,7 +917,7 @@ impl Web3Rpcs {
}
}
}
OpenRequestResult::NotReady(backups_included) => {
OpenRequestResult::NotReady => {
if let Some(request_metadata) = request_metadata {
request_metadata.no_servers.fetch_add(1, Ordering::Release);
}
@ -930,7 +929,7 @@ impl Web3Rpcs {
if let Some(min_block_needed) = min_block_needed {
let mut theres_a_chance = false;
for potential_conn in self.conns.values() {
for potential_conn in self.by_name.values() {
if skip_rpcs.contains(potential_conn) {
continue;
}
@ -951,23 +950,10 @@ impl Web3Rpcs {
}
}
if backups_included {
// if NotReady and we tried backups, there's no chance
warn!("No servers ready even after checking backups");
break;
}
debug!("No servers ready. Waiting up for change in synced servers");
debug!("No servers ready. Waiting up to 1 second for change in synced servers");
// TODO: exponential backoff?
tokio::select! {
_ = sleep(Duration::from_secs(1)) => {
// do NOT pop the last rpc off skip here
}
_ = watch_consensus_connections.changed() => {
watch_consensus_connections.borrow_and_update();
}
}
watch_consensus_connections.changed().await?;
watch_consensus_connections.borrow_and_update();
}
}
}
@ -984,7 +970,7 @@ impl Web3Rpcs {
.store(true, Ordering::Release);
}
let num_conns = self.conns.len();
let num_conns = self.by_name.len();
let num_skipped = skip_rpcs.len();
if num_skipped == 0 {
@ -1135,7 +1121,7 @@ impl fmt::Debug for Web3Rpcs {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// TODO: the default formatter takes forever to write. this is too quiet though
f.debug_struct("Web3Rpcs")
.field("conns", &self.conns)
.field("rpcs", &self.by_name)
.finish_non_exhaustive()
}
}
@ -1147,8 +1133,8 @@ impl Serialize for Web3Rpcs {
{
let mut state = serializer.serialize_struct("Web3Rpcs", 6)?;
let conns: Vec<&Web3Rpc> = self.conns.values().map(|x| x.as_ref()).collect();
state.serialize_field("conns", &conns)?;
let rpcs: Vec<&Web3Rpc> = self.by_name.values().map(|x| x.as_ref()).collect();
state.serialize_field("rpcs", &rpcs)?;
{
let consensus_connections = self.watch_consensus_rpcs_sender.borrow().clone();
@ -1218,7 +1204,7 @@ mod tests {
let blocks: Vec<_> = [block_0, block_1, block_2]
.into_iter()
.map(|x| Web3ProxyBlock::new(Arc::new(x)))
.map(|x| Web3ProxyBlock::try_new(Arc::new(x)).unwrap())
.collect();
let mut rpcs: Vec<_> = [
@ -1303,8 +1289,8 @@ mod tests {
let lagged_block = Arc::new(lagged_block);
let head_block = Arc::new(head_block);
let mut lagged_block: Web3ProxyBlock = lagged_block.into();
let mut head_block: Web3ProxyBlock = head_block.into();
let mut lagged_block: Web3ProxyBlock = lagged_block.try_into().unwrap();
let mut head_block: Web3ProxyBlock = head_block.try_into().unwrap();
let block_data_limit = u64::MAX;
@ -1341,7 +1327,7 @@ mod tests {
let head_rpc = Arc::new(head_rpc);
let lagged_rpc = Arc::new(lagged_rpc);
let conns = HashMap::from([
let rpcs_by_name = HashMap::from([
(head_rpc.name.clone(), head_rpc.clone()),
(lagged_rpc.name.clone(), lagged_rpc.clone()),
]);
@ -1349,8 +1335,8 @@ mod tests {
let (watch_consensus_rpcs_sender, _) = watch::channel(Default::default());
// TODO: make a Web3Rpcs::new
let conns = Web3Rpcs {
conns,
let rpcs = Web3Rpcs {
by_name: rpcs_by_name,
watch_consensus_head_receiver: None,
watch_consensus_rpcs_sender,
pending_transactions: Cache::builder()
@ -1376,38 +1362,33 @@ mod tests {
let mut consensus_finder = ConsensusFinder::new(&[0, 1, 2, 3], None, None);
// process None so that
conns
.process_block_from_rpc(
&authorization,
&mut consensus_finder,
None,
lagged_rpc.clone(),
&head_block_sender,
&None,
)
.await
.expect(
"its lagged, but it should still be seen as consensus if its the first to report",
);
conns
.process_block_from_rpc(
&authorization,
&mut consensus_finder,
None,
head_rpc.clone(),
&head_block_sender,
&None,
)
.await
.unwrap();
rpcs.process_block_from_rpc(
&authorization,
&mut consensus_finder,
None,
lagged_rpc.clone(),
&head_block_sender,
&None,
)
.await
.expect("its lagged, but it should still be seen as consensus if its the first to report");
rpcs.process_block_from_rpc(
&authorization,
&mut consensus_finder,
None,
head_rpc.clone(),
&head_block_sender,
&None,
)
.await
.unwrap();
// no head block because the rpcs haven't communicated through their channels
assert!(conns.head_block_hash().is_none());
assert!(rpcs.head_block_hash().is_none());
// all_backend_connections gives all non-backup servers regardless of sync status
assert_eq!(
conns
.all_connections(&authorization, None, None, None, false)
rpcs.all_connections(&authorization, None, None, None, false)
.await
.unwrap()
.len(),
@ -1415,87 +1396,80 @@ mod tests {
);
// best_synced_backend_connection requires servers to be synced with the head block
let x = conns
let x = rpcs
.best_consensus_head_connection(&authorization, None, &[], None, None)
.await
.unwrap();
dbg!(&x);
assert!(matches!(x, OpenRequestResult::NotReady(true)));
assert!(matches!(x, OpenRequestResult::NotReady));
// add lagged blocks to the conns. both servers should be allowed
lagged_block = conns.try_cache_block(lagged_block, true).await.unwrap();
// add lagged blocks to the rpcs. both servers should be allowed
lagged_block = rpcs.try_cache_block(lagged_block, true).await.unwrap();
conns
.process_block_from_rpc(
&authorization,
&mut consensus_finder,
Some(lagged_block.clone()),
lagged_rpc,
&head_block_sender,
&None,
)
.await
.unwrap();
conns
.process_block_from_rpc(
&authorization,
&mut consensus_finder,
Some(lagged_block.clone()),
head_rpc.clone(),
&head_block_sender,
&None,
)
.await
.unwrap();
rpcs.process_block_from_rpc(
&authorization,
&mut consensus_finder,
Some(lagged_block.clone()),
lagged_rpc,
&head_block_sender,
&None,
)
.await
.unwrap();
rpcs.process_block_from_rpc(
&authorization,
&mut consensus_finder,
Some(lagged_block.clone()),
head_rpc.clone(),
&head_block_sender,
&None,
)
.await
.unwrap();
assert_eq!(conns.num_synced_rpcs(), 2);
assert_eq!(rpcs.num_synced_rpcs(), 2);
// add head block to the conns. lagged_rpc should not be available
head_block = conns.try_cache_block(head_block, true).await.unwrap();
// add head block to the rpcs. lagged_rpc should not be available
head_block = rpcs.try_cache_block(head_block, true).await.unwrap();
conns
.process_block_from_rpc(
&authorization,
&mut consensus_finder,
Some(head_block.clone()),
head_rpc,
&head_block_sender,
&None,
)
.await
.unwrap();
rpcs.process_block_from_rpc(
&authorization,
&mut consensus_finder,
Some(head_block.clone()),
head_rpc,
&head_block_sender,
&None,
)
.await
.unwrap();
assert_eq!(conns.num_synced_rpcs(), 1);
assert_eq!(rpcs.num_synced_rpcs(), 1);
assert!(matches!(
conns
.best_consensus_head_connection(&authorization, None, &[], None, None)
rpcs.best_consensus_head_connection(&authorization, None, &[], None, None)
.await,
Ok(OpenRequestResult::Handle(_))
));
assert!(matches!(
conns
.best_consensus_head_connection(&authorization, None, &[], Some(&0.into()), None)
rpcs.best_consensus_head_connection(&authorization, None, &[], Some(&0.into()), None)
.await,
Ok(OpenRequestResult::Handle(_))
));
assert!(matches!(
conns
.best_consensus_head_connection(&authorization, None, &[], Some(&1.into()), None)
rpcs.best_consensus_head_connection(&authorization, None, &[], Some(&1.into()), None)
.await,
Ok(OpenRequestResult::Handle(_))
));
// future block should not get a handle
assert!(matches!(
conns
.best_consensus_head_connection(&authorization, None, &[], Some(&2.into()), None)
rpcs.best_consensus_head_connection(&authorization, None, &[], Some(&2.into()), None)
.await,
Ok(OpenRequestResult::NotReady(true))
Ok(OpenRequestResult::NotReady)
));
}
@ -1522,7 +1496,7 @@ mod tests {
..Default::default()
};
let head_block: Web3ProxyBlock = Arc::new(head_block).into();
let head_block: Web3ProxyBlock = Arc::new(head_block).try_into().unwrap();
let pruned_rpc = Web3Rpc {
name: "pruned".to_string(),
@ -1554,7 +1528,7 @@ mod tests {
let pruned_rpc = Arc::new(pruned_rpc);
let archive_rpc = Arc::new(archive_rpc);
let conns = HashMap::from([
let rpcs_by_name = HashMap::from([
(pruned_rpc.name.clone(), pruned_rpc.clone()),
(archive_rpc.name.clone(), archive_rpc.clone()),
]);
@ -1562,8 +1536,8 @@ mod tests {
let (watch_consensus_rpcs_sender, _) = watch::channel(Default::default());
// TODO: make a Web3Rpcs::new
let conns = Web3Rpcs {
conns,
let rpcs = Web3Rpcs {
by_name: rpcs_by_name,
watch_consensus_head_receiver: None,
watch_consensus_rpcs_sender,
pending_transactions: Cache::builder()
@ -1576,7 +1550,7 @@ mod tests {
.max_capacity(10)
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()),
min_head_rpcs: 1,
min_sum_soft_limit: 3_000,
min_sum_soft_limit: 4_000,
max_block_age: None,
max_block_lag: None,
};
@ -1586,34 +1560,34 @@ mod tests {
let (head_block_sender, _head_block_receiver) = watch::channel(Default::default());
let mut connection_heads = ConsensusFinder::new(&[0, 1, 2, 3], None, None);
conns
.process_block_from_rpc(
&authorization,
&mut connection_heads,
Some(head_block.clone()),
pruned_rpc.clone(),
&head_block_sender,
&None,
)
.await
.unwrap();
conns
.process_block_from_rpc(
&authorization,
&mut connection_heads,
Some(head_block.clone()),
archive_rpc.clone(),
&head_block_sender,
&None,
)
.await
.unwrap();
// min sum soft limit will require tier 2
rpcs.process_block_from_rpc(
&authorization,
&mut connection_heads,
Some(head_block.clone()),
pruned_rpc.clone(),
&head_block_sender,
&None,
)
.await
.unwrap_err();
assert_eq!(conns.num_synced_rpcs(), 2);
rpcs.process_block_from_rpc(
&authorization,
&mut connection_heads,
Some(head_block.clone()),
archive_rpc.clone(),
&head_block_sender,
&None,
)
.await
.unwrap();
assert_eq!(rpcs.num_synced_rpcs(), 2);
// best_synced_backend_connection requires servers to be synced with the head block
// TODO: test with and without passing the head_block.number?
let best_head_server = conns
let best_head_server = rpcs
.best_consensus_head_connection(
&authorization,
None,
@ -1623,12 +1597,14 @@ mod tests {
)
.await;
debug!("best_head_server: {:#?}", best_head_server);
assert!(matches!(
best_head_server.unwrap(),
OpenRequestResult::Handle(_)
));
let best_archive_server = conns
let best_archive_server = rpcs
.best_consensus_head_connection(&authorization, None, &[], Some(&1.into()), None)
.await;

@ -41,6 +41,7 @@ pub struct Web3RpcLatencies {
impl Default for Web3RpcLatencies {
fn default() -> Self {
todo!("use ewma crate, not u32");
Self {
new_head: Histogram::new(3).unwrap(),
new_head_ewma: 0,
@ -525,7 +526,7 @@ impl Web3Rpc {
None
}
Ok(Some(new_head_block)) => {
let new_head_block = Web3ProxyBlock::new(new_head_block);
let new_head_block = Web3ProxyBlock::try_new(new_head_block).unwrap();
let new_hash = *new_head_block.hash();
@ -955,7 +956,7 @@ impl Web3Rpc {
sleep_until(retry_at).await;
}
Ok(OpenRequestResult::NotReady(_)) => {
Ok(OpenRequestResult::NotReady) => {
// TODO: when can this happen? log? emit a stat?
trace!("{} has no handle ready", self);
@ -987,7 +988,7 @@ impl Web3Rpc {
if unlocked_provider.is_some() || self.provider.read().await.is_some() {
// we already have an unlocked provider. no need to lock
} else {
return Ok(OpenRequestResult::NotReady(self.backup));
return Ok(OpenRequestResult::NotReady);
}
if let Some(hard_limit_until) = self.hard_limit_until.as_ref() {
@ -1029,7 +1030,7 @@ impl Web3Rpc {
return Ok(OpenRequestResult::RetryAt(retry_at));
}
RedisRateLimitResult::RetryNever => {
return Ok(OpenRequestResult::NotReady(self.backup));
return Ok(OpenRequestResult::NotReady);
}
}
};
@ -1165,7 +1166,7 @@ mod tests {
let random_block = Arc::new(random_block);
let head_block = Web3ProxyBlock::new(random_block);
let head_block = Web3ProxyBlock::try_new(random_block).unwrap();
let block_data_limit = u64::MAX;
let x = Web3Rpc {
@ -1201,7 +1202,8 @@ mod tests {
timestamp: now,
..Default::default()
})
.into();
.try_into()
.unwrap();
let block_data_limit = 64;

@ -20,9 +20,8 @@ pub enum OpenRequestResult {
Handle(OpenRequestHandle),
/// Unable to start a request. Retry at the given time.
RetryAt(Instant),
/// Unable to start a request because the server is not synced
/// contains "true" if backup servers were attempted
NotReady(bool),
/// Unable to start a request because no servers are synced
NotReady,
}
/// Make RPC requests through this handle and drop it when you are done.