diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 68ef6bb4..bcb54937 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -204,7 +204,7 @@ pub struct Web3ProxyApp { response_cache: ResponseCache, // don't drop this or the sender will stop working // TODO: broadcast channel instead? - watch_consensus_head_receiver: watch::Receiver, + watch_consensus_head_receiver: watch::Receiver>, pending_tx_sender: broadcast::Sender, pub config: AppConfig, pub db_conn: Option, @@ -541,8 +541,7 @@ impl Web3ProxyApp { }; // TODO: i don't like doing Block::default here! Change this to "None"? - let (watch_consensus_head_sender, watch_consensus_head_receiver) = - watch::channel(Web3ProxyBlock::default()); + let (watch_consensus_head_sender, watch_consensus_head_receiver) = watch::channel(None); // TODO: will one receiver lagging be okay? how big should this be? let (pending_tx_sender, pending_tx_receiver) = broadcast::channel(256); @@ -624,7 +623,7 @@ impl Web3ProxyApp { .await .context("spawning private_rpcs")?; - if private_rpcs.conns.is_empty() { + if private_rpcs.by_name.is_empty() { None } else { // save the handle to catch any errors @@ -740,7 +739,7 @@ impl Web3ProxyApp { Ok((app, cancellable_handles, important_background_handles).into()) } - pub fn head_block_receiver(&self) -> watch::Receiver { + pub fn head_block_receiver(&self) -> watch::Receiver> { self.watch_consensus_head_receiver.clone() } @@ -938,7 +937,7 @@ impl Web3ProxyApp { JsonRpcRequestEnum::Single(request) => { let (response, rpcs) = timeout( max_time, - self.proxy_cached_request(&authorization, request, proxy_mode), + self.proxy_cached_request(&authorization, request, proxy_mode, None), ) .await??; @@ -971,10 +970,26 @@ impl Web3ProxyApp { // TODO: spawn so the requests go in parallel? need to think about rate limiting more if we do that // TODO: improve flattening + + // get the head block now so that any requests that need it all use the same block + // TODO: FrontendErrorResponse that handles "no servers synced" in a consistent way + // TODO: this still has an edge condition if there is a reorg in the middle of the request!!! + let head_block_num = self + .balanced_rpcs + .head_block_num() + .context(anyhow::anyhow!("no servers synced"))?; + let responses = join_all( requests .into_iter() - .map(|request| self.proxy_cached_request(authorization, request, proxy_mode)) + .map(|request| { + self.proxy_cached_request( + authorization, + request, + proxy_mode, + Some(head_block_num), + ) + }) .collect::>(), ) .await; @@ -1023,6 +1038,7 @@ impl Web3ProxyApp { authorization: &Arc, mut request: JsonRpcRequest, proxy_mode: ProxyMode, + head_block_num: Option, ) -> Result<(JsonRpcForwardedResponse, Vec>), FrontendErrorResponse> { // trace!("Received request: {:?}", request); @@ -1139,7 +1155,7 @@ impl Web3ProxyApp { serde_json::Value::Array(vec![]) } "eth_blockNumber" => { - match self.balanced_rpcs.head_block_num() { + match head_block_num.or(self.balanced_rpcs.head_block_num()) { Some(head_block_num) => { json!(head_block_num) } @@ -1237,7 +1253,11 @@ impl Web3ProxyApp { (&self.balanced_rpcs, default_num) }; - let head_block_num = self.balanced_rpcs.head_block_num(); + let head_block_num = head_block_num + .or(self.balanced_rpcs.head_block_num()) + .ok_or_else(|| anyhow::anyhow!("no servers synced"))?; + + // TODO: error/wait if no head block! // try_send_all_upstream_servers puts the request id into the response. no need to do that ourselves here. 
let mut response = private_rpcs @@ -1245,7 +1265,7 @@ impl Web3ProxyApp { authorization, &request, Some(request_metadata.clone()), - head_block_num.as_ref(), + Some(&head_block_num), None, Level::Trace, num, @@ -1440,9 +1460,8 @@ impl Web3ProxyApp { // emit stats // TODO: if no servers synced, wait for them to be synced? probably better to error and let haproxy retry another server - let head_block_num = self - .balanced_rpcs - .head_block_num() + let head_block_num = head_block_num + .or(self.balanced_rpcs.head_block_num()) .context("no servers synced")?; // we do this check before checking caches because it might modify the request params diff --git a/web3_proxy/src/app/ws.rs b/web3_proxy/src/app/ws.rs index e61db2c2..b125a5fa 100644 --- a/web3_proxy/src/app/ws.rs +++ b/web3_proxy/src/app/ws.rs @@ -61,6 +61,12 @@ impl Web3ProxyApp { ); while let Some(new_head) = head_block_receiver.next().await { + let new_head = if let Some(new_head) = new_head { + new_head + } else { + continue; + }; + // TODO: what should the payload for RequestMetadata be? let request_metadata = Arc::new(RequestMetadata::new(REQUEST_PERIOD, 0).unwrap()); diff --git a/web3_proxy/src/bin/web3_proxy_cli/daemon.rs b/web3_proxy/src/bin/web3_proxy_cli/daemon.rs index cf2f4cf8..465e545e 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/daemon.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/daemon.rs @@ -64,7 +64,7 @@ async fn run( )); // wait until the app has seen its first consensus head block - // TODO: if backups were included, wait a little longer + // TODO: if backups were included, wait a little longer? let _ = spawned_app.app.head_block_receiver().changed().await; // start the frontend port diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index 8ce7f495..9aa018a0 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -5,7 +5,7 @@ use super::one::Web3Rpc; use super::transactions::TxStatus; use crate::frontend::authorization::Authorization; use crate::{config::BlockAndRpc, jsonrpc::JsonRpcRequest}; -use anyhow::Context; +use anyhow::{anyhow, Context}; use derive_more::From; use ethers::prelude::{Block, TxHash, H256, U64}; use hashbrown::HashSet; @@ -45,7 +45,11 @@ impl PartialEq for Web3ProxyBlock { impl Web3ProxyBlock { /// A new block has arrived over a subscription - pub fn new(block: ArcBlock) -> Self { + pub fn try_new(block: ArcBlock) -> Option { + if block.number.is_none() || block.hash.is_none() { + return None; + } + let mut x = Self { block, received_age: None, @@ -56,7 +60,7 @@ impl Web3ProxyBlock { // TODO: emit a stat for received_age x.received_age = Some(x.age()); - x + Some(x) } pub fn age(&self) -> u64 { @@ -97,12 +101,20 @@ impl Web3ProxyBlock { } } -impl From for Web3ProxyBlock { - fn from(x: ArcBlock) -> Self { - Web3ProxyBlock { +impl TryFrom for Web3ProxyBlock { + type Error = anyhow::Error; + + fn try_from(x: ArcBlock) -> Result { + if x.number.is_none() || x.hash.is_none() { + return Err(anyhow!("Blocks here must have a number and a hash")); + } + + let b = Web3ProxyBlock { block: x, received_age: None, - } + }; + + Ok(b) } } @@ -184,7 +196,13 @@ impl Web3Rpcs { None, ) .await? - .map(Into::into) + .and_then(|x| { + if x.number.is_none() { + None + } else { + x.try_into().ok() + } + }) .context("no block!")?, None => { // TODO: helper for method+params => JsonRpcRequest @@ -208,8 +226,10 @@ impl Web3Rpcs { let block: Option = serde_json::from_str(block.get())?; - // TODO: from isn't great here.
received time is going to be weird - block.map(Into::into).context("no block!")? + let block: ArcBlock = block.context("no block in the response")?; + + // TODO: received time is going to be weird + Web3ProxyBlock::try_from(block)? } }; @@ -252,7 +272,11 @@ impl Web3Rpcs { // be sure the requested block num exists // TODO: is this okay? what if we aren't synced?! - let mut head_block_num = *consensus_head_receiver.borrow_and_update().number(); + let mut head_block_num = *consensus_head_receiver + .borrow_and_update() + .as_ref() + .context("no consensus head block")? + .number(); loop { if num <= &head_block_num { @@ -262,7 +286,9 @@ impl Web3Rpcs { trace!("waiting for future block {} > {}", num, head_block_num); consensus_head_receiver.changed().await?; - head_block_num = *consensus_head_receiver.borrow_and_update().number(); + if let Some(head) = consensus_head_receiver.borrow_and_update().as_ref() { + head_block_num = *head.number(); + } } let block_depth = (head_block_num - num).as_u64(); @@ -297,7 +323,7 @@ impl Web3Rpcs { let block: ArcBlock = serde_json::from_str(raw_block.get())?; - let block = Web3ProxyBlock::from(block); + let block = Web3ProxyBlock::try_from(block)?; // the block was fetched using eth_getBlockByNumber, so it should have all fields and be on the heaviest chain let block = self.try_cache_block(block, true).await?; @@ -311,13 +337,13 @@ impl Web3Rpcs { block_receiver: flume::Receiver, // TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed // Geth's subscriptions have the same potential for skipping blocks. - head_block_sender: watch::Sender, + head_block_sender: watch::Sender>, pending_tx_sender: Option>, ) -> anyhow::Result<()> { // TODO: indexmap or hashmap? what hasher? with_capacity? // TODO: this will grow unbounded. prune old heads on this at the same time we prune the graph? let configured_tiers: Vec = self - .conns + .by_name .values() .map(|x| x.tier) .collect::>() @@ -363,7 +389,7 @@ impl Web3Rpcs { consensus_finder: &mut ConsensusFinder, rpc_head_block: Option, rpc: Arc, - head_block_sender: &watch::Sender, + head_block_sender: &watch::Sender>, pending_tx_sender: &Option>, ) -> anyhow::Result<()> { // TODO: how should we handle an error here? @@ -392,12 +418,11 @@ impl Web3Rpcs { let backups_needed = new_synced_connections.backups_needed; let consensus_head_block = new_synced_connections.head_block.clone(); let num_consensus_rpcs = new_synced_connections.num_conns(); - let num_checked_rpcs = 0; // TODO: figure this out let num_active_rpcs = consensus_finder .all_rpcs_group() .map(|x| x.len()) .unwrap_or_default(); - let total_rpcs = self.conns.len(); + let total_rpcs = self.by_name.len(); let old_consensus_head_connections = self .watch_consensus_rpcs_sender @@ -409,10 +434,9 @@ impl Web3Rpcs { match &old_consensus_head_connections.head_block { None => { debug!( - "first {}{}/{}/{}/{} block={}, rpc={}", + "first {}{}/{}/{} block={}, rpc={}", backups_voted_str, num_consensus_rpcs, - num_checked_rpcs, num_active_rpcs, total_rpcs, consensus_head_block, @@ -429,7 +453,7 @@ impl Web3Rpcs { self.try_cache_block(consensus_head_block, true).await?; head_block_sender - .send(consensus_head_block) + .send(Some(consensus_head_block)) .context("head_block_sender sending consensus_head_block")?; } Some(old_head_block) => { @@ -445,10 +469,9 @@ impl Web3Rpcs { // no change in hash. 
no need to use head_block_sender // TODO: trace level if rpc is backup debug!( - "con {}{}/{}/{}/{} con={} rpc={}@{}", + "con {}{}/{}/{} con={} rpc={}@{}", backups_voted_str, num_consensus_rpcs, - num_checked_rpcs, num_active_rpcs, total_rpcs, consensus_head_block, @@ -463,10 +486,9 @@ impl Web3Rpcs { } debug!( - "unc {}{}/{}/{}/{} con_head={} old={} rpc={}@{}", + "unc {}{}/{}/{} con_head={} old={} rpc={}@{}", backups_voted_str, num_consensus_rpcs, - num_checked_rpcs, num_active_rpcs, total_rpcs, consensus_head_block, @@ -481,7 +503,7 @@ impl Web3Rpcs { .context("save consensus_head_block as heaviest chain")?; head_block_sender - .send(consensus_head_block) + .send(Some(consensus_head_block)) .context("head_block_sender sending consensus_head_block")?; } } @@ -489,10 +511,9 @@ impl Web3Rpcs { // this is unlikely but possible // TODO: better log warn!( - "chain rolled back {}{}/{}/{}/{} con={} old={} rpc={}@{}", + "chain rolled back {}{}/{}/{} con={} old={} rpc={}@{}", backups_voted_str, num_consensus_rpcs, - num_checked_rpcs, num_active_rpcs, total_rpcs, consensus_head_block, @@ -515,15 +536,14 @@ impl Web3Rpcs { )?; head_block_sender - .send(consensus_head_block) + .send(Some(consensus_head_block)) .context("head_block_sender sending consensus_head_block")?; } Ordering::Greater => { debug!( - "new {}{}/{}/{}/{} con={} rpc={}@{}", + "new {}{}/{}/{} con={} rpc={}@{}", backups_voted_str, num_consensus_rpcs, - num_checked_rpcs, num_active_rpcs, total_rpcs, consensus_head_block, @@ -539,7 +559,7 @@ impl Web3Rpcs { let consensus_head_block = self.try_cache_block(consensus_head_block, true).await?; - head_block_sender.send(consensus_head_block)?; + head_block_sender.send(Some(consensus_head_block))?; } } } @@ -550,23 +570,23 @@ impl Web3Rpcs { .map(|x| x.to_string()) .unwrap_or_else(|| "None".to_string()); - if num_checked_rpcs >= self.min_head_rpcs { + if num_active_rpcs >= self.min_head_rpcs { + // no consensus!!! error!( - "non {}{}/{}/{}/{} rpc={}@{}", + "non {}{}/{}/{} rpc={}@{}", backups_voted_str, num_consensus_rpcs, - num_checked_rpcs, num_active_rpcs, total_rpcs, rpc, rpc_head_str, ); } else { + // no consensus, but we do not have enough rpcs connected yet to panic debug!( - "non {}{}/{}/{}/{} rpc={}@{}", + "non {}{}/{}/{} rpc={}@{}", backups_voted_str, num_consensus_rpcs, - num_checked_rpcs, num_active_rpcs, total_rpcs, rpc, diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index bcaf1f56..847892cf 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -19,18 +19,18 @@ pub struct ConsensusWeb3Rpcs { pub(super) head_block: Option, // TODO: this should be able to serialize, but it isn't #[serde(skip_serializing)] - pub(super) conns: Vec>, + pub(super) rpcs: Vec>, pub(super) backups_voted: Option, pub(super) backups_needed: bool, } impl ConsensusWeb3Rpcs { pub fn num_conns(&self) -> usize { - self.conns.len() + self.rpcs.len() } pub fn sum_soft_limit(&self) -> u32 { - self.conns.iter().fold(0, |sum, rpc| sum + rpc.soft_limit) + self.rpcs.iter().fold(0, |sum, rpc| sum + rpc.soft_limit) } // TODO: sum_hard_limit? @@ -42,7 +42,7 @@ impl fmt::Debug for ConsensusWeb3Rpcs { // TODO: print the actual conns? 
f.debug_struct("ConsensusConnections") .field("head_block", &self.head_block) - .field("num_conns", &self.conns.len()) + .field("num_conns", &self.rpcs.len()) .finish_non_exhaustive() } } @@ -52,7 +52,7 @@ impl Web3Rpcs { pub fn head_block(&self) -> Option { self.watch_consensus_head_receiver .as_ref() - .map(|x| x.borrow().clone()) + .and_then(|x| x.borrow().clone()) } // TODO: return a ref? @@ -66,11 +66,11 @@ impl Web3Rpcs { } pub fn synced(&self) -> bool { - !self.watch_consensus_rpcs_sender.borrow().conns.is_empty() + !self.watch_consensus_rpcs_sender.borrow().rpcs.is_empty() } pub fn num_synced_rpcs(&self) -> usize { - self.watch_consensus_rpcs_sender.borrow().conns.len() + self.watch_consensus_rpcs_sender.borrow().rpcs.len() } } @@ -243,7 +243,7 @@ impl ConnectionsGroup { continue; } - if let Some(rpc) = web3_rpcs.conns.get(rpc_name.as_str()) { + if let Some(rpc) = web3_rpcs.by_name.get(rpc_name.as_str()) { if backup_rpcs_voted.is_some() { // backups already voted for a head block. don't change it } else { @@ -257,7 +257,7 @@ impl ConnectionsGroup { } else { // i don't think this is an error. i think its just if a reconnect is currently happening warn!("connection missing: {}", rpc_name); - debug!("web3_rpcs.conns: {:#?}", web3_rpcs.conns); + debug!("web3_rpcs.conns: {:#?}", web3_rpcs.by_name); } } @@ -340,7 +340,7 @@ impl ConnectionsGroup { // success! this block has enough soft limit and nodes on it (or on later blocks) let conns: Vec> = primary_consensus_rpcs .into_iter() - .filter_map(|conn_name| web3_rpcs.conns.get(conn_name).cloned()) + .filter_map(|conn_name| web3_rpcs.by_name.get(conn_name).cloned()) .collect(); #[cfg(debug_assertions)] @@ -350,7 +350,7 @@ impl ConnectionsGroup { Ok(ConsensusWeb3Rpcs { head_block: Some(maybe_head_block), - conns, + rpcs: conns, backups_voted: backup_rpcs_voted, backups_needed: primary_rpcs_voted.is_none(), }) @@ -528,7 +528,7 @@ impl ConsensusFinder { // TODO: find the best tier with a connectionsgroup. best case, this only queries the first tier // TODO: do we need to calculate all of them? 
I think having highest_known_block included as part of min_block_num should make that unnecessary for (i, x) in self.tiers.iter() { - trace!("checking tier {}", i); + trace!("checking tier {}: {:#?}", i, x.rpc_name_to_block); if let Ok(consensus_head_connections) = x .consensus_head_connections(authorization, web3_connections, min_block_num) .await @@ -543,3 +543,11 @@ impl ConsensusFinder { return Err(anyhow::anyhow!("failed finding consensus on all tiers")); } } + +#[cfg(test)] +mod test { + #[test] + fn test_simplest_case_consensus_head_connections() { + todo!(); + } +} diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 2a3bd24a..c449b241 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -16,6 +16,7 @@ use futures::future::try_join_all; use futures::stream::FuturesUnordered; use futures::StreamExt; use hashbrown::{HashMap, HashSet}; +use itertools::Itertools; use log::{debug, error, info, trace, warn, Level}; use migration::sea_orm::DatabaseConnection; use moka::future::{Cache, ConcurrentCacheExt}; @@ -23,6 +24,7 @@ use serde::ser::{SerializeStruct, Serializer}; use serde::Serialize; use serde_json::json; use serde_json::value::RawValue; +use std::cmp::min_by_key; use std::collections::BTreeMap; use std::sync::atomic::Ordering; use std::sync::Arc; @@ -36,11 +38,11 @@ use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBeh #[derive(From)] pub struct Web3Rpcs { /// any requests will be forwarded to one (or more) of these connections - pub(crate) conns: HashMap>, + pub(crate) by_name: HashMap>, /// all providers with the same consensus head block. won't update if there is no `self.watch_consensus_head_sender` pub(super) watch_consensus_rpcs_sender: watch::Sender>, /// this head receiver makes it easy to wait until there is a new block - pub(super) watch_consensus_head_receiver: Option>, + pub(super) watch_consensus_head_receiver: Option>>, pub(super) pending_transactions: Cache, /// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis? @@ -74,7 +76,7 @@ impl Web3Rpcs { pending_tx_sender: Option>, redis_pool: Option, server_configs: HashMap, - watch_consensus_head_sender: Option>, + watch_consensus_head_sender: Option>>, ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded(); let (block_sender, block_receiver) = flume::unbounded::(); @@ -219,7 +221,7 @@ impl Web3Rpcs { watch_consensus_head_sender.as_ref().map(|x| x.subscribe()); let connections = Arc::new(Self { - conns: connections, + by_name: connections, watch_consensus_rpcs_sender: watch_consensus_connections_sender, watch_consensus_head_receiver, pending_transactions, @@ -253,7 +255,7 @@ impl Web3Rpcs { } pub fn get(&self, conn_name: &str) -> Option<&Arc> { - self.conns.get(conn_name) + self.by_name.get(conn_name) } /// subscribe to blocks and transactions from all the backend rpcs. 
@@ -264,7 +266,7 @@ impl Web3Rpcs { authorization: Arc, pending_tx_id_receiver: flume::Receiver, block_receiver: flume::Receiver, - head_block_sender: Option>, + head_block_sender: Option>>, pending_tx_sender: Option>, ) -> anyhow::Result<()> { let mut futures = vec![]; @@ -426,70 +428,64 @@ impl Web3Rpcs { min_block_needed: Option<&U64>, max_block_needed: Option<&U64>, ) -> anyhow::Result { - if let Ok(without_backups) = self - ._best_consensus_head_connection( - false, - authorization, - request_metadata, - skip, - min_block_needed, - max_block_needed, - ) - .await - { - // TODO: this might use backups too eagerly. but even when we allow backups, we still prioritize our own - if matches!(without_backups, OpenRequestResult::Handle(_)) { - return Ok(without_backups); - } - } - - self._best_consensus_head_connection( - true, - authorization, - request_metadata, - skip, - min_block_needed, - max_block_needed, - ) - .await - } - - /// get the best available rpc server with the consensus head block. it might have blocks after the consensus head - async fn _best_consensus_head_connection( - &self, - allow_backups: bool, - authorization: &Arc, - request_metadata: Option<&Arc>, - skip: &[Arc], - min_block_needed: Option<&U64>, - max_block_needed: Option<&U64>, - ) -> anyhow::Result { - let usable_rpcs_by_head_num_and_weight: BTreeMap<(Option, u64), Vec>> = { + let usable_rpcs_by_tier_and_head_number: BTreeMap<(u64, Option), Vec>> = { let synced_connections = self.watch_consensus_rpcs_sender.borrow().clone(); - let head_block_num = if let Some(head_block) = synced_connections.head_block.as_ref() { - head_block.number() - } else { - // TODO: optionally wait for a head block >= min_block_needed - return Ok(OpenRequestResult::NotReady(allow_backups)); + let (head_block_num, head_block_age) = + if let Some(head_block) = synced_connections.head_block.as_ref() { + (head_block.number(), head_block.age()) + } else { + // TODO: optionally wait for a head_block.number() >= min_block_needed + // TODO: though i think that wait would actually need to be earlier in the request + return Ok(OpenRequestResult::NotReady); + }; + + let needed_blocks_comparison = match (min_block_needed, max_block_needed) { + (None, None) => { + // no required block given. treat this like the requested the consensus head block + cmp::Ordering::Equal + } + (None, Some(max_block_needed)) => max_block_needed.cmp(head_block_num), + (Some(min_block_needed), None) => min_block_needed.cmp(head_block_num), + (Some(min_block_needed), Some(max_block_needed)) => { + match min_block_needed.cmp(max_block_needed) { + cmp::Ordering::Equal => min_block_needed.cmp(head_block_num), + cmp::Ordering::Greater => { + return Err(anyhow::anyhow!( + "Invalid blocks bounds requested. min ({}) > max ({})", + min_block_needed, + max_block_needed + )) + } + cmp::Ordering::Less => { + // hmmmm + todo!("now what do we do?"); + } + } + } }; - let min_block_needed = min_block_needed.unwrap_or(&head_block_num); - + // collect "usable_rpcs_by_head_num_and_weight" + // TODO: MAKE SURE None SORTS LAST? let mut m = BTreeMap::new(); - match min_block_needed.cmp(&head_block_num) { + match needed_blocks_comparison { cmp::Ordering::Less => { - // need an old block. check all the rpcs. prefer the most synced + // need an old block. check all the rpcs. 
ignore rpcs that are still syncing + + let min_block_age = + self.max_block_age.map(|x| head_block_age.saturating_sub(x)); + let min_sync_num = self.max_block_lag.map(|x| head_block_num.saturating_sub(x)); + + // TODO: cache this somehow? + // TODO: maybe have a helper on synced_connections? that way sum_soft_limits/min_synced_rpcs will be DRY for x in self - .conns + .by_name .values() .filter(|x| { - if !allow_backups && x.backup { - false - } else if skip.contains(x) { - false - } else if !x.has_block_data(min_block_needed) { + // TODO: move a bunch of this onto a rpc.is_synced function + if skip.contains(x) { + // we've already tried this server or have some other reason to skip it false } else if max_block_needed .and_then(|max_block_needed| { @@ -497,8 +493,18 @@ impl Web3Rpcs { }) .unwrap_or(false) { + // server does not have the max block + false + } else if min_block_needed + .and_then(|min_block_needed| { + Some(!x.has_block_data(min_block_needed)) + }) + .unwrap_or(false) + { + // server does not have the min block false } else { + // server has the block we need! true } }) @@ -506,32 +512,43 @@ impl Web3Rpcs { { let x_head_block = x.head_block.read().clone(); - match x_head_block { - None => continue, - Some(x_head) => { - let key = (Some(*x_head.number()), u64::MAX - x.tier); + if let Some(x_head) = x_head_block { + // TODO: should nodes that are ahead of the consensus block have priority? seems better to spread the load + let x_head_num = x_head.number().min(head_block_num); - m.entry(key).or_insert_with(Vec::new).push(x); + // TODO: do we really need to check head_num and age? + if let Some(min_sync_num) = min_sync_num.as_ref() { + if x_head_num < min_sync_num { + continue; + } } + if let Some(min_block_age) = min_block_age { + if x_head.age() < min_block_age { + // rpc is still syncing + continue; + } + } + + let key = (x.tier, Some(*x_head_num)); + + m.entry(key).or_insert_with(Vec::new).push(x); } } + + // TODO: check min_synced_rpcs and min_sum_soft_limits? or maybe better to just try to serve the request? } cmp::Ordering::Equal => { // need the consensus head block. filter the synced rpcs - // TODO: this doesn't properly check the allow_backups variable! - for x in synced_connections - .conns - .iter() - .filter(|x| !skip.contains(x)) - { - let key = (None, u64::MAX - x.tier); + for x in synced_connections.rpcs.iter().filter(|x| !skip.contains(x)) { + // the key doesn't matter if we are checking synced connections. its already sized to what we need + let key = (0, None); m.entry(key).or_insert_with(Vec::new).push(x.clone()); } } cmp::Ordering::Greater => { - // TODO? if the blocks is close, wait for change on a watch_consensus_connections_receiver().subscribe() - return Ok(OpenRequestResult::NotReady(allow_backups)); + // TODO? if the blocks is close, maybe we could wait for change on a watch_consensus_connections_receiver().subscribe() + return Ok(OpenRequestResult::NotReady); } } @@ -540,42 +557,24 @@ impl Web3Rpcs { let mut earliest_retry_at = None; - for usable_rpcs in usable_rpcs_by_head_num_and_weight.into_values().rev() { - // we sort on a combination of values. cache them here so that we don't do this math multiple times. - // TODO: is this necessary if we use sort_by_cached_key? - let available_request_map: HashMap<_, f64> = usable_rpcs - .iter() - .map(|rpc| { - // TODO: weighted sort by remaining hard limit? - // TODO: weighted sort by soft_limit - ewma_active_requests? 
that assumes soft limits are any good - (rpc, 1.0) - }) - .collect(); - - warn!("todo: better sort here"); - - let sorted_rpcs = { - if usable_rpcs.len() == 1 { - // TODO: try the next tier - vec![usable_rpcs.get(0).expect("there should be 1")] - } else { - let mut rng = thread_fast_rng::thread_fast_rng(); - - usable_rpcs - .choose_multiple_weighted(&mut rng, usable_rpcs.len(), |rpc| { - *available_request_map - .get(rpc) - .expect("rpc should always be in available_request_map") - }) - .unwrap() - .collect::>() - } + for mut usable_rpcs in usable_rpcs_by_tier_and_head_number.into_values() { + // sort the tier randomly + if usable_rpcs.len() == 1 { + // TODO: include an rpc from the next tier? + } else { + // we can't get the rng outside of this loop because it is not Send + // this function should be pretty fast anyway, so it shouldn't matter too much + let mut rng = thread_fast_rng::thread_fast_rng(); + usable_rpcs.shuffle(&mut rng); }; - // now that the rpcs are sorted, try to get an active request handle for one of them - // TODO: pick two randomly and choose the one with the lower rpc.latency.ewma - for best_rpc in sorted_rpcs.into_iter() { - // increment our connection counter + // now that the rpcs are shuffled, try to get an active request handle for one of them + // pick the first two and try the one with the lower rpc.latency.ewma + // TODO: chunks or tuple windows? + for (rpc_a, rpc_b) in usable_rpcs.into_iter().circular_tuple_windows() { + let best_rpc = min_by_key(rpc_a, rpc_b, |x| x.latency.request_ewma); + + // just because it has lower latency doesn't mean we are sure to get a connection match best_rpc.try_request_handle(authorization, None).await { Ok(OpenRequestResult::Handle(handle)) => { // trace!("opened handle: {}", best_rpc); @@ -584,7 +583,7 @@ impl Web3Rpcs { Ok(OpenRequestResult::RetryAt(retry_at)) => { earliest_retry_at = earliest_retry_at.min(Some(retry_at)); } - Ok(OpenRequestResult::NotReady(_)) => { + Ok(OpenRequestResult::NotReady) => { // TODO: log a warning? emit a stat? } Err(err) => { @@ -614,7 +613,7 @@ impl Web3Rpcs { // TODO: should we log here? - Ok(OpenRequestResult::NotReady(allow_backups)) + Ok(OpenRequestResult::NotReady) } Some(earliest_retry_at) => { warn!("no servers on {:?}! {:?}", self, earliest_retry_at); @@ -676,19 +675,19 @@ impl Web3Rpcs { let mut max_count = if let Some(max_count) = max_count { max_count } else { - self.conns.len() + self.by_name.len() }; let mut tried = HashSet::new(); - let mut synced_conns = self.watch_consensus_rpcs_sender.borrow().conns.clone(); + let mut synced_conns = self.watch_consensus_rpcs_sender.borrow().rpcs.clone(); // synced connections are all on the same block. 
sort them by tier with higher soft limits first synced_conns.sort_by_cached_key(rpc_sync_status_sort_key); // if there aren't enough synced connections, include more connections // TODO: only do this sorting if the synced_conns isn't enough - let mut all_conns: Vec<_> = self.conns.values().cloned().collect(); + let mut all_conns: Vec<_> = self.by_name.values().cloned().collect(); all_conns.sort_by_cached_key(rpc_sync_status_sort_key); for connection in itertools::chain(synced_conns, all_conns) { @@ -728,7 +727,7 @@ impl Web3Rpcs { max_count -= 1; selected_rpcs.push(handle) } - Ok(OpenRequestResult::NotReady(_)) => { + Ok(OpenRequestResult::NotReady) => { warn!("no request handle for {}", connection) } Err(err) => { @@ -767,7 +766,7 @@ impl Web3Rpcs { loop { let num_skipped = skip_rpcs.len(); - if num_skipped == self.conns.len() { + if num_skipped == self.by_name.len() { break; } @@ -918,7 +917,7 @@ impl Web3Rpcs { } } } - OpenRequestResult::NotReady(backups_included) => { + OpenRequestResult::NotReady => { if let Some(request_metadata) = request_metadata { request_metadata.no_servers.fetch_add(1, Ordering::Release); } @@ -930,7 +929,7 @@ impl Web3Rpcs { if let Some(min_block_needed) = min_block_needed { let mut theres_a_chance = false; - for potential_conn in self.conns.values() { + for potential_conn in self.by_name.values() { if skip_rpcs.contains(potential_conn) { continue; } @@ -951,23 +950,10 @@ impl Web3Rpcs { } } - if backups_included { - // if NotReady and we tried backups, there's no chance - warn!("No servers ready even after checking backups"); - break; - } + debug!("No servers ready. Waiting for a change in synced servers"); - debug!("No servers ready. Waiting up to 1 second for change in synced servers"); - - // TODO: exponential backoff? - tokio::select! { - _ = sleep(Duration::from_secs(1)) => { - // do NOT pop the last rpc off skip here - } - _ = watch_consensus_connections.changed() => { - watch_consensus_connections.borrow_and_update(); - } - } + watch_consensus_connections.changed().await?; + watch_consensus_connections.borrow_and_update(); } } } @@ -984,7 +970,7 @@ impl Web3Rpcs { .store(true, Ordering::Release); } - let num_conns = self.conns.len(); + let num_conns = self.by_name.len(); let num_skipped = skip_rpcs.len(); if num_skipped == 0 { @@ -1135,7 +1121,7 @@ impl fmt::Debug for Web3Rpcs { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { // TODO: the default formatter takes forever to write.
this is too quiet though f.debug_struct("Web3Rpcs") - .field("conns", &self.conns) + .field("rpcs", &self.by_name) .finish_non_exhaustive() } } @@ -1147,8 +1133,8 @@ impl Serialize for Web3Rpcs { { let mut state = serializer.serialize_struct("Web3Rpcs", 6)?; - let conns: Vec<&Web3Rpc> = self.conns.values().map(|x| x.as_ref()).collect(); - state.serialize_field("conns", &conns)?; + let rpcs: Vec<&Web3Rpc> = self.by_name.values().map(|x| x.as_ref()).collect(); + state.serialize_field("rpcs", &rpcs)?; { let consensus_connections = self.watch_consensus_rpcs_sender.borrow().clone(); @@ -1218,7 +1204,7 @@ mod tests { let blocks: Vec<_> = [block_0, block_1, block_2] .into_iter() - .map(|x| Web3ProxyBlock::new(Arc::new(x))) + .map(|x| Web3ProxyBlock::try_new(Arc::new(x)).unwrap()) .collect(); let mut rpcs: Vec<_> = [ @@ -1303,8 +1289,8 @@ mod tests { let lagged_block = Arc::new(lagged_block); let head_block = Arc::new(head_block); - let mut lagged_block: Web3ProxyBlock = lagged_block.into(); - let mut head_block: Web3ProxyBlock = head_block.into(); + let mut lagged_block: Web3ProxyBlock = lagged_block.try_into().unwrap(); + let mut head_block: Web3ProxyBlock = head_block.try_into().unwrap(); let block_data_limit = u64::MAX; @@ -1341,7 +1327,7 @@ mod tests { let head_rpc = Arc::new(head_rpc); let lagged_rpc = Arc::new(lagged_rpc); - let conns = HashMap::from([ + let rpcs_by_name = HashMap::from([ (head_rpc.name.clone(), head_rpc.clone()), (lagged_rpc.name.clone(), lagged_rpc.clone()), ]); @@ -1349,8 +1335,8 @@ mod tests { let (watch_consensus_rpcs_sender, _) = watch::channel(Default::default()); // TODO: make a Web3Rpcs::new - let conns = Web3Rpcs { - conns, + let rpcs = Web3Rpcs { + by_name: rpcs_by_name, watch_consensus_head_receiver: None, watch_consensus_rpcs_sender, pending_transactions: Cache::builder() @@ -1376,38 +1362,33 @@ mod tests { let mut consensus_finder = ConsensusFinder::new(&[0, 1, 2, 3], None, None); // process None so that - conns - .process_block_from_rpc( - &authorization, - &mut consensus_finder, - None, - lagged_rpc.clone(), - &head_block_sender, - &None, - ) - .await - .expect( - "its lagged, but it should still be seen as consensus if its the first to report", - ); - conns - .process_block_from_rpc( - &authorization, - &mut consensus_finder, - None, - head_rpc.clone(), - &head_block_sender, - &None, - ) - .await - .unwrap(); + rpcs.process_block_from_rpc( + &authorization, + &mut consensus_finder, + None, + lagged_rpc.clone(), + &head_block_sender, + &None, + ) + .await + .expect("its lagged, but it should still be seen as consensus if its the first to report"); + rpcs.process_block_from_rpc( + &authorization, + &mut consensus_finder, + None, + head_rpc.clone(), + &head_block_sender, + &None, + ) + .await + .unwrap(); // no head block because the rpcs haven't communicated through their channels - assert!(conns.head_block_hash().is_none()); + assert!(rpcs.head_block_hash().is_none()); // all_backend_connections gives all non-backup servers regardless of sync status assert_eq!( - conns - .all_connections(&authorization, None, None, None, false) + rpcs.all_connections(&authorization, None, None, None, false) .await .unwrap() .len(), @@ -1415,87 +1396,80 @@ mod tests { ); // best_synced_backend_connection requires servers to be synced with the head block - let x = conns + let x = rpcs .best_consensus_head_connection(&authorization, None, &[], None, None) .await .unwrap(); dbg!(&x); - assert!(matches!(x, OpenRequestResult::NotReady(true))); + assert!(matches!(x, 
OpenRequestResult::NotReady)); - // add lagged blocks to the conns. both servers should be allowed - lagged_block = conns.try_cache_block(lagged_block, true).await.unwrap(); + // add lagged blocks to the rpcs. both servers should be allowed + lagged_block = rpcs.try_cache_block(lagged_block, true).await.unwrap(); - conns - .process_block_from_rpc( - &authorization, - &mut consensus_finder, - Some(lagged_block.clone()), - lagged_rpc, - &head_block_sender, - &None, - ) - .await - .unwrap(); - conns - .process_block_from_rpc( - &authorization, - &mut consensus_finder, - Some(lagged_block.clone()), - head_rpc.clone(), - &head_block_sender, - &None, - ) - .await - .unwrap(); + rpcs.process_block_from_rpc( + &authorization, + &mut consensus_finder, + Some(lagged_block.clone()), + lagged_rpc, + &head_block_sender, + &None, + ) + .await + .unwrap(); + rpcs.process_block_from_rpc( + &authorization, + &mut consensus_finder, + Some(lagged_block.clone()), + head_rpc.clone(), + &head_block_sender, + &None, + ) + .await + .unwrap(); - assert_eq!(conns.num_synced_rpcs(), 2); + assert_eq!(rpcs.num_synced_rpcs(), 2); - // add head block to the conns. lagged_rpc should not be available - head_block = conns.try_cache_block(head_block, true).await.unwrap(); + // add head block to the rpcs. lagged_rpc should not be available + head_block = rpcs.try_cache_block(head_block, true).await.unwrap(); - conns - .process_block_from_rpc( - &authorization, - &mut consensus_finder, - Some(head_block.clone()), - head_rpc, - &head_block_sender, - &None, - ) - .await - .unwrap(); + rpcs.process_block_from_rpc( + &authorization, + &mut consensus_finder, + Some(head_block.clone()), + head_rpc, + &head_block_sender, + &None, + ) + .await + .unwrap(); - assert_eq!(conns.num_synced_rpcs(), 1); + assert_eq!(rpcs.num_synced_rpcs(), 1); assert!(matches!( - conns - .best_consensus_head_connection(&authorization, None, &[], None, None) + rpcs.best_consensus_head_connection(&authorization, None, &[], None, None) .await, Ok(OpenRequestResult::Handle(_)) )); assert!(matches!( - conns - .best_consensus_head_connection(&authorization, None, &[], Some(&0.into()), None) + rpcs.best_consensus_head_connection(&authorization, None, &[], Some(&0.into()), None) .await, Ok(OpenRequestResult::Handle(_)) )); assert!(matches!( - conns - .best_consensus_head_connection(&authorization, None, &[], Some(&1.into()), None) + rpcs.best_consensus_head_connection(&authorization, None, &[], Some(&1.into()), None) .await, Ok(OpenRequestResult::Handle(_)) )); // future block should not get a handle assert!(matches!( - conns - .best_consensus_head_connection(&authorization, None, &[], Some(&2.into()), None) + rpcs.best_consensus_head_connection(&authorization, None, &[], Some(&2.into()), None) .await, - Ok(OpenRequestResult::NotReady(true)) + Ok(OpenRequestResult::NotReady) )); } @@ -1522,7 +1496,7 @@ mod tests { ..Default::default() }; - let head_block: Web3ProxyBlock = Arc::new(head_block).into(); + let head_block: Web3ProxyBlock = Arc::new(head_block).try_into().unwrap(); let pruned_rpc = Web3Rpc { name: "pruned".to_string(), @@ -1554,7 +1528,7 @@ mod tests { let pruned_rpc = Arc::new(pruned_rpc); let archive_rpc = Arc::new(archive_rpc); - let conns = HashMap::from([ + let rpcs_by_name = HashMap::from([ (pruned_rpc.name.clone(), pruned_rpc.clone()), (archive_rpc.name.clone(), archive_rpc.clone()), ]); @@ -1562,8 +1536,8 @@ mod tests { let (watch_consensus_rpcs_sender, _) = watch::channel(Default::default()); // TODO: make a Web3Rpcs::new - let conns = Web3Rpcs 
{ - conns, + let rpcs = Web3Rpcs { + by_name: rpcs_by_name, watch_consensus_head_receiver: None, watch_consensus_rpcs_sender, pending_transactions: Cache::builder() @@ -1576,7 +1550,7 @@ mod tests { .max_capacity(10) .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()), min_head_rpcs: 1, - min_sum_soft_limit: 3_000, + min_sum_soft_limit: 4_000, max_block_age: None, max_block_lag: None, }; @@ -1586,34 +1560,34 @@ mod tests { let (head_block_sender, _head_block_receiver) = watch::channel(Default::default()); let mut connection_heads = ConsensusFinder::new(&[0, 1, 2, 3], None, None); - conns - .process_block_from_rpc( - &authorization, - &mut connection_heads, - Some(head_block.clone()), - pruned_rpc.clone(), - &head_block_sender, - &None, - ) - .await - .unwrap(); - conns - .process_block_from_rpc( - &authorization, - &mut connection_heads, - Some(head_block.clone()), - archive_rpc.clone(), - &head_block_sender, - &None, - ) - .await - .unwrap(); + // min sum soft limit will require tier 2 + rpcs.process_block_from_rpc( + &authorization, + &mut connection_heads, + Some(head_block.clone()), + pruned_rpc.clone(), + &head_block_sender, + &None, + ) + .await + .unwrap_err(); - assert_eq!(conns.num_synced_rpcs(), 2); + rpcs.process_block_from_rpc( + &authorization, + &mut connection_heads, + Some(head_block.clone()), + archive_rpc.clone(), + &head_block_sender, + &None, + ) + .await + .unwrap(); + + assert_eq!(rpcs.num_synced_rpcs(), 2); // best_synced_backend_connection requires servers to be synced with the head block // TODO: test with and without passing the head_block.number? - let best_head_server = conns + let best_head_server = rpcs .best_consensus_head_connection( &authorization, None, @@ -1623,12 +1597,14 @@ mod tests { ) .await; + debug!("best_head_server: {:#?}", best_head_server); + assert!(matches!( best_head_server.unwrap(), OpenRequestResult::Handle(_) )); - let best_archive_server = conns + let best_archive_server = rpcs .best_consensus_head_connection(&authorization, None, &[], Some(&1.into()), None) .await; diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 8b4decc4..5b030bad 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -41,6 +41,7 @@ pub struct Web3RpcLatencies { impl Default for Web3RpcLatencies { fn default() -> Self { + todo!("use ewma crate, not u32"); Self { new_head: Histogram::new(3).unwrap(), new_head_ewma: 0, @@ -525,7 +526,7 @@ impl Web3Rpc { None } Ok(Some(new_head_block)) => { - let new_head_block = Web3ProxyBlock::new(new_head_block); + let new_head_block = Web3ProxyBlock::try_new(new_head_block).unwrap(); let new_hash = *new_head_block.hash(); @@ -955,7 +956,7 @@ impl Web3Rpc { sleep_until(retry_at).await; } - Ok(OpenRequestResult::NotReady(_)) => { + Ok(OpenRequestResult::NotReady) => { // TODO: when can this happen? log? emit a stat? trace!("{} has no handle ready", self); @@ -987,7 +988,7 @@ impl Web3Rpc { if unlocked_provider.is_some() || self.provider.read().await.is_some() { // we already have an unlocked provider. 
no need to lock } else { - return Ok(OpenRequestResult::NotReady(self.backup)); + return Ok(OpenRequestResult::NotReady); } if let Some(hard_limit_until) = self.hard_limit_until.as_ref() { @@ -1029,7 +1030,7 @@ impl Web3Rpc { return Ok(OpenRequestResult::RetryAt(retry_at)); } RedisRateLimitResult::RetryNever => { - return Ok(OpenRequestResult::NotReady(self.backup)); + return Ok(OpenRequestResult::NotReady); } } }; @@ -1165,7 +1166,7 @@ mod tests { let random_block = Arc::new(random_block); - let head_block = Web3ProxyBlock::new(random_block); + let head_block = Web3ProxyBlock::try_new(random_block).unwrap(); let block_data_limit = u64::MAX; let x = Web3Rpc { @@ -1201,7 +1202,8 @@ mod tests { timestamp: now, ..Default::default() }) - .into(); + .try_into() + .unwrap(); let block_data_limit = 64; diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index 2c66307e..b3f4864a 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -20,9 +20,8 @@ pub enum OpenRequestResult { Handle(OpenRequestHandle), /// Unable to start a request. Retry at the given time. RetryAt(Instant), - /// Unable to start a request because the server is not synced - /// contains "true" if backup servers were attempted - NotReady(bool), + /// Unable to start a request because no servers are synced + NotReady, } /// Make RPC requests through this handle and drop it when you are done.
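
A note on the consensus-head channel change above: the watch channel now carries an `Option<Web3ProxyBlock>` and starts out as `None`, so subscribers (like the subscription loop in `ws.rs`) skip the initial value instead of treating `Web3ProxyBlock::default()` as a real head. A minimal sketch of that pattern, assuming tokio's `watch` channel; the `Block` struct here is only a stand-in for the real `Web3ProxyBlock`:

```rust
use std::sync::Arc;
use tokio::sync::watch;

// stand-in for Web3ProxyBlock; only what the sketch needs
#[derive(Clone, Debug)]
struct Block {
    number: u64,
}

#[tokio::main]
async fn main() {
    // start with None: no consensus head has been seen yet
    let (head_tx, mut head_rx) = watch::channel::<Option<Arc<Block>>>(None);

    let reader = tokio::spawn(async move {
        while head_rx.changed().await.is_ok() {
            // skip None instead of acting on a default block
            let head = match head_rx.borrow_and_update().clone() {
                Some(head) => head,
                None => continue,
            };
            println!("new consensus head: {}", head.number);
        }
    });

    head_tx.send(Some(Arc::new(Block { number: 16_000_000 }))).unwrap();
    drop(head_tx); // closing the channel lets the reader task finish
    reader.await.unwrap();
}
```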
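A note on the server-selection rewrite in `many.rs`: instead of the weighted `choose_multiple_weighted` sort, each tier is shuffled and candidates are then compared pairwise on `latency.request_ewma`, keeping the lower-latency server of each pair (the real code walks the shuffled list with `itertools::circular_tuple_windows` and moves on to the next pair when a handle cannot be opened). A simplified sketch of that idea, assuming the `rand` crate; `Candidate` and `request_ewma` are placeholder names rather than the real `Web3Rpc` types:

```rust
use rand::seq::SliceRandom;
use std::cmp::min_by_key;

// stand-in for Web3Rpc; `request_ewma` plays the role of rpc.latency.request_ewma
#[derive(Debug, Clone)]
struct Candidate {
    name: &'static str,
    request_ewma: u64,
}

/// "Power of two choices": shuffle the tier, then prefer the lower measured
/// latency out of a random pair of candidates.
fn pick_rpc(mut tier: Vec<Candidate>) -> Option<Candidate> {
    tier.shuffle(&mut rand::thread_rng());

    let mut candidates = tier.into_iter();
    let first = candidates.next()?;
    match candidates.next() {
        // only one server in this tier, so there is nothing to compare against
        None => Some(first),
        // lower EWMA latency wins the pair
        Some(second) => Some(min_by_key(first, second, |c| c.request_ewma)),
    }
}

fn main() {
    let tier = vec![
        Candidate { name: "fast", request_ewma: 12 },
        Candidate { name: "slow", request_ewma: 80 },
    ];
    println!("picked: {:?}", pick_rpc(tier));
}
```

Shuffling first keeps load spread across equally ranked servers, while the pairwise latency comparison still biases traffic toward the faster ones.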