diff --git a/TODO.md b/TODO.md index 2f4b8754..afe04f5d 100644 --- a/TODO.md +++ b/TODO.md @@ -305,6 +305,8 @@ These are not yet ordered. There might be duplicates. We might not actually need - [x] status page should show version - [x] combine the proxy and cli into one bin - [x] improve rate limiting on websockets +- [x] retry another server if we get a jsonrpc response error about rate limits +- [x] major refactor to only use backup servers when absolutely necessary - [-] proxy mode for benchmarking all backends - [-] proxy mode for sending to multiple backends - [-] let users choose a % of reverts to log (or maybe x/second). someone like curve logging all reverts will be a BIG database very quickly diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 8c227695..055694f3 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -727,6 +727,10 @@ impl Web3ProxyApp { Ok((app, cancellable_handles, important_background_handles).into()) } + pub fn head_block_receiver(&self) -> watch::Receiver { + self.head_block_receiver.clone() + } + pub async fn prometheus_metrics(&self) -> String { let globals = HashMap::new(); // TODO: what globals? should this be the hostname or what? diff --git a/web3_proxy/src/bin/web3_proxy_cli/daemon.rs b/web3_proxy/src/bin/web3_proxy_cli/daemon.rs index 69d0e2c7..000b8b51 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/daemon.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/daemon.rs @@ -51,22 +51,25 @@ async fn run( let app_frontend_port = frontend_port; let app_prometheus_port = prometheus_port; + let mut shutdown_receiver = shutdown_sender.subscribe(); // start the main app let mut spawned_app = Web3ProxyApp::spawn(top_config, num_workers, shutdown_sender.subscribe()).await?; - let frontend_handle = tokio::spawn(frontend::serve(app_frontend_port, spawned_app.app.clone())); - - // TODO: should we put this in a dedicated thread? + // start the prometheus metrics port let prometheus_handle = tokio::spawn(metrics_frontend::serve( spawned_app.app.clone(), app_prometheus_port, )); - let mut shutdown_receiver = shutdown_sender.subscribe(); + // wait until the app has seen its first consensus head block + let _ = spawned_app.app.head_block_receiver().changed().await; - // if everything is working, these should both run forever + // start the frontend port + let frontend_handle = tokio::spawn(frontend::serve(app_frontend_port, spawned_app.app.clone())); + + // if everything is working, these should all run forever tokio::select! { x = flatten_handles(spawned_app.app_handles) => { match x { @@ -204,6 +207,7 @@ mod tests { disabled: false, display_name: None, url: anvil.endpoint(), + backup: Some(false), block_data_limit: None, soft_limit: 100, hard_limit: None, @@ -218,6 +222,7 @@ mod tests { disabled: false, display_name: None, url: anvil.ws_endpoint(), + backup: Some(false), block_data_limit: None, soft_limit: 100, hard_limit: None, diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index 9bb125e3..20aabee3 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -198,6 +198,8 @@ pub struct Web3ConnectionConfig { pub soft_limit: u32, /// the requests per second at which the server throws errors (rate limit or otherwise) pub hard_limit: Option, + /// only use this rpc if everything else is lagging too far. 
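// --- editor's sketch (illustrative, not part of this diff) ---------------------
// The daemon.rs change above gates frontend startup on the app's first consensus
// head block via `head_block_receiver().changed()`. The same pattern, reduced to
// a standalone example with a tokio watch channel; `serve_frontend` and the u64
// "head" are hypothetical stand-ins, and tokio's `full` feature set is assumed.
use std::time::Duration;
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    // the latest consensus head lives in a watch channel; 0 stands in for "no head yet"
    let (head_block_sender, mut head_block_receiver) = watch::channel(0u64);

    // a background task publishes new heads as consensus is found
    tokio::spawn(async move {
        let mut head = 0u64;
        loop {
            tokio::time::sleep(Duration::from_millis(100)).await;
            head += 1;
            // send only fails once every receiver has been dropped
            if head_block_sender.send(head).is_err() {
                break;
            }
        }
    });

    // changed() resolves on the first send after this receiver was created, so the
    // frontend never starts while the backend set is still empty/NotReady
    let _ = head_block_receiver.changed().await;
    serve_frontend().await;
}

async fn serve_frontend() {
    // stand-in for tokio::spawn(frontend::serve(app_frontend_port, app))
    println!("frontend started after the first consensus head");
}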
this allows us to ignore fast but very low limit rpcs + pub backup: Option, /// All else equal, a server with a lower tier receives all requests #[serde(default = "default_tier")] pub tier: u64, @@ -256,6 +258,8 @@ impl Web3ConnectionConfig { None }; + let backup = self.backup.unwrap_or(false); + Web3Connection::spawn( name, allowed_lag, @@ -267,6 +271,7 @@ impl Web3ConnectionConfig { http_interval_sender, hard_limit, self.soft_limit, + backup, self.block_data_limit, block_map, block_sender, diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index e03cc6fd..e3505c97 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -4,13 +4,13 @@ use super::connections::Web3Connections; use super::transactions::TxStatus; use crate::frontend::authorization::Authorization; use crate::{ - config::BlockAndRpc, jsonrpc::JsonRpcRequest, rpcs::synced_connections::SyncedConnections, + config::BlockAndRpc, jsonrpc::JsonRpcRequest, rpcs::synced_connections::ConsensusConnections, }; use anyhow::Context; use derive_more::From; use ethers::prelude::{Block, TxHash, H256, U64}; use hashbrown::{HashMap, HashSet}; -use log::{debug, warn, Level}; +use log::{debug, error, warn, Level}; use moka::future::Cache; use serde::Serialize; use serde_json::json; @@ -24,7 +24,7 @@ pub type ArcBlock = Arc>; pub type BlockHashesCache = Cache; -/// A block's hash and number. +/// A block and its age. #[derive(Clone, Debug, Default, From, Serialize)] pub struct SavedBlock { pub block: ArcBlock, @@ -99,14 +99,18 @@ impl Display for SavedBlock { impl Web3Connections { /// add a block to our mappings and track the heaviest chain - pub async fn save_block(&self, block: &ArcBlock, heaviest_chain: bool) -> anyhow::Result<()> { + pub async fn save_block( + &self, + block: ArcBlock, + heaviest_chain: bool, + ) -> anyhow::Result { // TODO: i think we can rearrange this function to make it faster on the hot path let block_hash = block.hash.as_ref().context("no block hash")?; // skip Block::default() if block_hash.is_zero() { debug!("Skipping block without hash!"); - return Ok(()); + return Ok(block); } let block_num = block.number.as_ref().context("no block num")?; @@ -121,15 +125,17 @@ impl Web3Connections { // this block is very likely already in block_hashes // TODO: use their get_with - self.block_hashes + let block = self + .block_hashes .get_with(*block_hash, async move { block.clone() }) .await; - Ok(()) + Ok(block) } /// Get a block from caches with fallback. /// Will query a specific node or the best available. + /// TODO: return anyhow::Result>? pub async fn block( &self, authorization: &Arc, @@ -138,6 +144,7 @@ impl Web3Connections { ) -> anyhow::Result { // first, try to get the hash from our cache // the cache is set last, so if its here, its everywhere + // TODO: use try_get_with if let Some(block) = self.block_hashes.get(hash) { return Ok(block); } @@ -178,7 +185,7 @@ impl Web3Connections { // the block was fetched using eth_getBlockByHash, so it should have all fields // TODO: fill in heaviest_chain! if the block is old enough, is this definitely true? 
- self.save_block(&block, false).await?; + let block = self.save_block(block, false).await?; Ok(block) } @@ -249,7 +256,7 @@ impl Web3Connections { let block: ArcBlock = serde_json::from_str(raw_block.get())?; // the block was fetched using eth_getBlockByNumber, so it should have all fields and be on the heaviest chain - self.save_block(&block, true).await?; + let block = self.save_block(block, true).await?; Ok((block, archive_needed)) } @@ -265,7 +272,7 @@ impl Web3Connections { ) -> anyhow::Result<()> { // TODO: indexmap or hashmap? what hasher? with_capacity? // TODO: this will grow unbounded. prune old heads on this at the same time we prune the graph? - let mut connection_heads = HashMap::new(); + let mut connection_heads = ConsensusFinder::default(); while let Ok((new_block, rpc)) = block_receiver.recv_async().await { let new_block = new_block.map(Into::into); @@ -287,7 +294,7 @@ impl Web3Connections { } } - // TODO: if there was an error, we should return it + // TODO: if there was an error, should we return it instead of an Ok? warn!("block_receiver exited!"); Ok(()) @@ -299,327 +306,590 @@ impl Web3Connections { pub(crate) async fn process_block_from_rpc( &self, authorization: &Arc, - connection_heads: &mut HashMap, + consensus_finder: &mut ConsensusFinder, rpc_head_block: Option, rpc: Arc, head_block_sender: &watch::Sender, pending_tx_sender: &Option>, ) -> anyhow::Result<()> { - // add the rpc's block to connection_heads, or remove the rpc from connection_heads - let rpc_head_block = match rpc_head_block { - Some(rpc_head_block) => { - // we don't know if its on the heaviest chain yet - self.save_block(&rpc_head_block.block, false).await?; - - // TODO: don't default to 60. different chains are differen - if rpc_head_block.syncing(60) { - if connection_heads.remove(&rpc.name).is_some() { - warn!("{} is behind by {} seconds", &rpc.name, rpc_head_block.age); - } else { - // we didn't remove anything and this block is old. exit early - return Ok(()); - }; - - None - } else { - let rpc_head_hash = rpc_head_block.hash(); - - if let Some(prev_hash) = - connection_heads.insert(rpc.name.to_owned(), rpc_head_hash) - { - if prev_hash == rpc_head_hash { - // this block was already sent by this node. return early - return Ok(()); - } - } - - // TODO: should we just keep the ArcBlock here? - Some(rpc_head_block) - } - } - None => { - // // trace!(%rpc, "Block without number or hash!"); - - if connection_heads.remove(&rpc.name).is_none() { - // this connection was already removed. - // return early. no need to process synced connections - return Ok(()); - } - - None - } - }; - - // iterate the known heads to find the highest_work_block - let mut checked_heads = HashSet::new(); - let mut highest_num_block: Option = None; - for (conn_name, connection_head_hash) in connection_heads.iter() { - if checked_heads.contains(connection_head_hash) { - // we already checked this head from another rpc - continue; - } - // don't check the same hash multiple times - checked_heads.insert(connection_head_hash); - - let conn_head_block = if let Some(x) = self.block_hashes.get(connection_head_hash) { - x - } else { - // TODO: why does this happen?!?! seems to only happen with uncled blocks - // TODO: maybe we should do get_with? - // TODO: maybe we should just continue. this only seems to happen when an older block is received - warn!("Missing connection_head_block in block_hashes. Fetching now. hash={}. other={}. 
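// --- editor's sketch (illustrative, not part of this diff) ---------------------
// Why save_block now takes the block by value and returns one: moka's `get_with`
// hands back whatever is already cached under the key, so every caller ends up
// holding the same Arc for a given block hash. The u64 key and `Block` struct are
// simplified stand-ins for H256 / ArcBlock; moka's `future` feature is assumed.
use std::sync::Arc;
use moka::future::Cache;

#[derive(Debug)]
struct Block {
    hash: u64,
    number: u64,
}

async fn save_block(block_hashes: &Cache<u64, Arc<Block>>, block: Arc<Block>) -> Arc<Block> {
    // if another task already cached this hash, the existing Arc is returned and
    // the Arc we were handed is dropped; otherwise ours becomes the cached value
    block_hashes
        .get_with(block.hash, async move { block.clone() })
        .await
}

#[tokio::main]
async fn main() {
    let block_hashes: Cache<u64, Arc<Block>> = Cache::new(10_000);

    let a = Arc::new(Block { hash: 1, number: 100 });
    let b = Arc::new(Block { hash: 1, number: 100 });

    let a = save_block(&block_hashes, a).await;
    let b = save_block(&block_hashes, b).await;

    // both callers now share one allocation for block hash 1
    assert!(Arc::ptr_eq(&a, &b));
    println!("deduplicated: {:?}", a);
}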
rpc={}", connection_head_hash, conn_name, rpc); - - // this option should always be populated - let conn_rpc = self.conns.get(conn_name); - - match self - .block(authorization, connection_head_hash, conn_rpc) - .await - { - Ok(block) => block, - Err(err) => { - warn!("Processing {}. Failed fetching connection_head_block for block_hashes. {} head hash={}. err={:?}", rpc, conn_name, connection_head_hash, err); - continue; - } - } - }; - - match &conn_head_block.number { - None => { - panic!("block is missing number. this is a bug"); - } - Some(conn_head_num) => { - // if this is the first block we've tried - // or if this rpc's newest block has a higher number - // we used to check total difficulty, but that isn't a thing anymore - if highest_num_block.is_none() - || conn_head_num - > highest_num_block - .as_ref() - .expect("there should always be a block here") - .number - .as_ref() - .expect("there should always be number here") - { - highest_num_block = Some(conn_head_block); - } - } - } + // TODO: how should we handle an error here? + if !consensus_finder + .update_rpc(rpc_head_block.clone(), rpc.clone(), self) + .await? + { + // nothing changed. no need + return Ok(()); } - if let Some(mut maybe_head_block) = highest_num_block { - // track rpcs on this heaviest chain so we can build a new SyncedConnections - let mut highest_rpcs = HashSet::<&String>::new(); - // a running total of the soft limits covered by the rpcs that agree on the head block - let mut highest_rpcs_sum_soft_limit: u32 = 0; - // TODO: also track highest_rpcs_sum_hard_limit? llama doesn't need this, so it can wait + let new_synced_connections = consensus_finder + .best_consensus_connections(authorization, self) + .await; - // check the highest work block for a set of rpcs that can serve our request load - // if it doesn't have enough rpcs for our request load, check the parent block - // TODO: loop for how many parent blocks? we don't want to serve blocks that are too far behind. probably different per chain - // TODO: this loop is pretty long. any way to clean up this code? - for _ in 0..3 { - let maybe_head_hash = maybe_head_block - .hash - .as_ref() - .expect("blocks here always need hashes"); + let includes_backups = new_synced_connections.includes_backups; + let consensus_head_block = new_synced_connections.head_block.clone(); + let num_consensus_rpcs = new_synced_connections.num_conns(); + let num_checked_rpcs = new_synced_connections.num_checked_conns; + let num_active_rpcs = consensus_finder.all.rpc_name_to_hash.len(); + let total_rpcs = self.conns.len(); - // find all rpcs with maybe_head_block as their current head - for (conn_name, conn_head_hash) in connection_heads.iter() { - if conn_head_hash != maybe_head_hash { - // connection is not on the desired block - continue; - } - if highest_rpcs.contains(conn_name) { - // connection is on a child block - continue; + let old_synced_connections = self + .synced_connections + .swap(Arc::new(new_synced_connections)); + + if let Some(consensus_saved_block) = consensus_head_block { + match &old_synced_connections.head_block { + None => { + debug!( + "first {}/{}/{}/{} block={}, rpc={}", + num_consensus_rpcs, + num_checked_rpcs, + num_active_rpcs, + total_rpcs, + consensus_saved_block, + rpc + ); + + if includes_backups { + // TODO: what else should be in this error? 
+ warn!("Backup RPCs are in use!"); } - if let Some(rpc) = self.conns.get(conn_name) { - highest_rpcs.insert(conn_name); - highest_rpcs_sum_soft_limit += rpc.soft_limit; - } else { - warn!("connection missing") - } + let consensus_head_block = + self.save_block(consensus_saved_block.block, true).await?; + + head_block_sender + .send(consensus_head_block) + .context("head_block_sender sending consensus_head_block")?; } + Some(old_head_block) => { + // TODO: do this log item better + let rpc_head_str = rpc_head_block + .map(|x| x.to_string()) + .unwrap_or_else(|| "None".to_string()); - if highest_rpcs_sum_soft_limit < self.min_sum_soft_limit - || highest_rpcs.len() < self.min_head_rpcs - { - // not enough rpcs yet. check the parent - if let Some(parent_block) = self.block_hashes.get(&maybe_head_block.parent_hash) - { - // // trace!( - // child=%maybe_head_hash, parent=%parent_block.hash.unwrap(), "avoiding thundering herd", - // ); - - maybe_head_block = parent_block; - continue; - } else { - // TODO: this message - warn!( - "soft limit {}/{} from {}/{} rpcs: {}%", - highest_rpcs_sum_soft_limit, - self.min_sum_soft_limit, - highest_rpcs.len(), - self.min_head_rpcs, - highest_rpcs_sum_soft_limit * 100 / self.min_sum_soft_limit - ); - break; - } - } - } - - // TODO: if consensus_head_rpcs.is_empty, try another method of finding the head block - - let num_connection_heads = connection_heads.len(); - let total_conns = self.conns.len(); - - // we've done all the searching for the heaviest block that we can - if highest_rpcs.is_empty() { - // if we get here, something is wrong. clear synced connections - let empty_synced_connections = SyncedConnections::default(); - - let _ = self - .synced_connections - .swap(Arc::new(empty_synced_connections)); - - // TODO: log different things depending on old_synced_connections - warn!( - "Processing {}. no consensus head! {}/{}/{}", - rpc, 0, num_connection_heads, total_conns - ); - } else { - // // trace!(?highest_rpcs); - - // TODO: if maybe_head_block.time() is old, ignore it - - // success! this block has enough soft limit and nodes on it (or on later blocks) - let conns: Vec> = highest_rpcs - .into_iter() - .filter_map(|conn_name| self.conns.get(conn_name).cloned()) - .collect(); - - // TODO: DEBUG only check - let _ = maybe_head_block - .hash - .expect("head blocks always have hashes"); - let _ = maybe_head_block - .number - .expect("head blocks always have numbers"); - - let num_consensus_rpcs = conns.len(); - - let consensus_head_block: SavedBlock = maybe_head_block.into(); - - let new_synced_connections = SyncedConnections { - head_block: Some(consensus_head_block.clone()), - conns, - }; - - let old_synced_connections = self - .synced_connections - .swap(Arc::new(new_synced_connections)); - - // TODO: if the rpc_head_block != consensus_head_block, log something? 
- match &old_synced_connections.head_block { - None => { - debug!( - "first {}/{}/{} block={}, rpc={}", - num_consensus_rpcs, - num_connection_heads, - total_conns, - consensus_head_block, - rpc - ); - - self.save_block(&consensus_head_block.block, true).await?; - - head_block_sender - .send(consensus_head_block.block) - .context("head_block_sender sending consensus_head_block")?; - } - Some(old_head_block) => { - // TODO: do this log item better - let rpc_head_str = rpc_head_block - .map(|x| x.to_string()) - .unwrap_or_else(|| "None".to_string()); - - match consensus_head_block.number().cmp(&old_head_block.number()) { - Ordering::Equal => { - // TODO: if rpc_block_id != consensus_head_block, do a different log? - - // multiple blocks with the same fork! - if consensus_head_block.hash() == old_head_block.hash() { - // no change in hash. no need to use head_block_sender - debug!( - "con {}/{}/{} con_head={} rpc_head={} rpc={}", - num_consensus_rpcs, - num_connection_heads, - total_conns, - consensus_head_block, - rpc_head_str, - rpc, - ) - } else { - // hash changed - debug!( - "unc {}/{}/{} con_head={} old={} rpc_head={} rpc={}", - num_consensus_rpcs, - num_connection_heads, - total_conns, - consensus_head_block, - old_head_block, - rpc_head_str, - rpc, - ); - - self.save_block(&consensus_head_block.block, true) - .await - .context("save consensus_head_block as heaviest chain")?; - - head_block_sender.send(consensus_head_block.block).context( - "head_block_sender sending consensus_head_block", - )?; - } - } - Ordering::Less => { - // this is unlikely but possible - // TODO: better log - warn!("chain rolled back {}/{}/{} con_head={} old_head={} rpc_head={} rpc={}", num_consensus_rpcs, num_connection_heads, total_conns, consensus_head_block, old_head_block, rpc_head_str, rpc); - - // TODO: tell save_block to remove any higher block numbers from the cache. not needed because we have other checks on requested blocks being > head, but still seems slike a good idea - self.save_block(&consensus_head_block.block, true) - .await - .context( - "save_block sending consensus_head_block as heaviest chain", - )?; - - head_block_sender - .send(consensus_head_block.block) - .context("head_block_sender sending consensus_head_block")?; - } - Ordering::Greater => { + match consensus_saved_block.number().cmp(&old_head_block.number()) { + Ordering::Equal => { + // multiple blocks with the same fork! + if consensus_saved_block.hash() == old_head_block.hash() { + // no change in hash. no need to use head_block_sender debug!( - "new {}/{}/{} con_head={} rpc_head={} rpc={}", + "con {}/{}/{}/{} con={} rpc={}@{}", num_consensus_rpcs, - num_connection_heads, - total_conns, - consensus_head_block, + num_checked_rpcs, + num_active_rpcs, + total_rpcs, + consensus_saved_block, + rpc, + rpc_head_str, + ) + } else { + // hash changed + + if includes_backups { + // TODO: what else should be in this error? 
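// --- editor's sketch (illustrative, not part of this diff) ---------------------
// The surrounding match classifies every consensus update against the previous
// head by block number and hash; the "con"/"unc"/rollback/"new" log prefixes map
// onto the four cases below. A compressed view of the same decision table, using
// (number, hash) tuples as hypothetical stand-ins for the SavedBlock accessors:
use std::cmp::Ordering;

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum HeadChange {
    /// same number, same hash: nothing to broadcast ("con")
    Unchanged,
    /// same number, different hash: a fork/uncle replaced the head ("unc")
    Reorg,
    /// lower number than before: the chain rolled back
    Rollback,
    /// higher number: a normal new head ("new")
    Advanced,
}

fn classify(old: (u64, [u8; 32]), new: (u64, [u8; 32])) -> HeadChange {
    match new.0.cmp(&old.0) {
        Ordering::Equal if new.1 == old.1 => HeadChange::Unchanged,
        Ordering::Equal => HeadChange::Reorg,
        Ordering::Less => HeadChange::Rollback,
        Ordering::Greater => HeadChange::Advanced,
    }
}

fn main() {
    let old = (100, [0u8; 32]);
    assert_eq!(classify(old, (100, [0u8; 32])), HeadChange::Unchanged);
    assert_eq!(classify(old, (100, [1u8; 32])), HeadChange::Reorg);
    assert_eq!(classify(old, (99, [2u8; 32])), HeadChange::Rollback);
    assert_eq!(classify(old, (101, [3u8; 32])), HeadChange::Advanced);
    // every case except Unchanged re-saves the block as heaviest chain and
    // pushes it through head_block_sender in the real code
}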
+ warn!("Backup RPCs are in use!"); + } + + debug!( + "unc {}/{}/{}/{} con_head={} old={} rpc={}@{}", + num_consensus_rpcs, + num_checked_rpcs, + num_active_rpcs, + total_rpcs, + consensus_saved_block, + old_head_block, + rpc, rpc_head_str, - rpc ); - self.save_block(&consensus_head_block.block, true).await?; + let consensus_head_block = self + .save_block(consensus_saved_block.block, true) + .await + .context("save consensus_head_block as heaviest chain")?; - head_block_sender.send(consensus_head_block.block)?; + head_block_sender + .send(consensus_head_block) + .context("head_block_sender sending consensus_head_block")?; } } + Ordering::Less => { + // this is unlikely but possible + // TODO: better log + warn!( + "chain rolled back {}/{}/{}/{} con={} old={} rpc={}@{}", + num_consensus_rpcs, + num_checked_rpcs, + num_active_rpcs, + total_rpcs, + consensus_saved_block, + old_head_block, + rpc, + rpc_head_str, + ); + + if includes_backups { + // TODO: what else should be in this error? + warn!("Backup RPCs are in use!"); + } + + // TODO: tell save_block to remove any higher block numbers from the cache. not needed because we have other checks on requested blocks being > head, but still seems like a good idea + let consensus_head_block = self + .save_block(consensus_saved_block.block, true) + .await + .context( + "save_block sending consensus_head_block as heaviest chain", + )?; + + head_block_sender + .send(consensus_head_block) + .context("head_block_sender sending consensus_head_block")?; + } + Ordering::Greater => { + debug!( + "new {}/{}/{}/{} con={} rpc={}@{}", + num_consensus_rpcs, + num_checked_rpcs, + num_active_rpcs, + total_rpcs, + consensus_saved_block, + rpc, + rpc_head_str, + ); + + if includes_backups { + // TODO: what else should be in this error? + warn!("Backup RPCs are in use!"); + } + + let consensus_head_block = + self.save_block(consensus_saved_block.block, true).await?; + + head_block_sender.send(consensus_head_block)?; + } } } } + } else { + // TODO: do this log item better + let rpc_head_str = rpc_head_block + .map(|x| x.to_string()) + .unwrap_or_else(|| "None".to_string()); + + if num_checked_rpcs >= self.min_head_rpcs { + error!( + "non {}/{}/{}/{} rpc={}@{}", + num_consensus_rpcs, + num_checked_rpcs, + num_active_rpcs, + total_rpcs, + rpc, + rpc_head_str, + ); + } else { + debug!( + "non {}/{}/{}/{} rpc={}@{}", + num_consensus_rpcs, + num_checked_rpcs, + num_active_rpcs, + total_rpcs, + rpc, + rpc_head_str, + ); + } } Ok(()) } } + +struct ConnectionsGroup { + includes_backups: bool, + rpc_name_to_hash: HashMap, +} + +impl ConnectionsGroup { + fn new(with_backups: bool) -> Self { + Self { + includes_backups: with_backups, + rpc_name_to_hash: Default::default(), + } + } + + fn without_backups() -> Self { + Self::new(false) + } + + fn with_backups() -> Self { + Self::new(true) + } + + fn remove(&mut self, rpc: &Web3Connection) -> Option { + self.rpc_name_to_hash.remove(rpc.name.as_str()) + } + + fn insert(&mut self, rpc: &Web3Connection, block_hash: H256) -> Option { + self.rpc_name_to_hash.insert(rpc.name.clone(), block_hash) + } + + // TODO: i don't love having this here. move to web3_connections? + async fn get_block_from_rpc( + &self, + rpc_name: &str, + hash: &H256, + authorization: &Arc, + web3_connections: &Web3Connections, + ) -> anyhow::Result { + // // TODO: why does this happen?!?! seems to only happen with uncled blocks + // // TODO: maybe we should do try_get_with? + // // TODO: maybe we should just continue. 
this only seems to happen when an older block is received + // warn!( + // "Missing connection_head_block in block_hashes. Fetching now. hash={}. other={}", + // connection_head_hash, conn_name + // ); + + // this option should almost always be populated. if the connection reconnects at a bad time it might not be available though + let rpc = web3_connections.conns.get(rpc_name); + + web3_connections.block(authorization, hash, rpc).await + } + + // TODO: do this during insert/remove? + pub(self) async fn highest_block( + &self, + authorization: &Arc, + web3_connections: &Web3Connections, + ) -> Option { + let mut checked_heads = HashSet::with_capacity(self.rpc_name_to_hash.len()); + let mut highest_block = None::; + + for (rpc_name, rpc_head_hash) in self.rpc_name_to_hash.iter() { + // don't waste time checking the same hash multiple times + if checked_heads.contains(rpc_head_hash) { + continue; + } + + let rpc_block = match self + .get_block_from_rpc(rpc_name, rpc_head_hash, authorization, web3_connections) + .await + { + Ok(x) => x, + Err(err) => { + warn!( + "failed getting block {} from {} while finding highest block number: {:?}", + rpc_head_hash, rpc_name, err, + ); + continue; + } + }; + + checked_heads.insert(rpc_head_hash); + + // if this is the first block we've tried + // or if this rpc's newest block has a higher number + // we used to check total difficulty, but that isn't a thing anymore on ETH + // TODO: we still need total difficulty on some other PoW chains. whats annoying is it isn't considered part of the "block header" just the block. so websockets don't return it + let highest_num = highest_block + .as_ref() + .map(|x| x.number.expect("blocks here should always have a number")); + let rpc_num = rpc_block.as_ref().number; + + if rpc_num > highest_num { + highest_block = Some(rpc_block); + } + } + + highest_block + } + + pub(self) async fn consensus_head_connections( + &self, + authorization: &Arc, + web3_connections: &Web3Connections, + ) -> anyhow::Result { + let mut maybe_head_block = match self.highest_block(authorization, web3_connections).await { + None => return Err(anyhow::anyhow!("No blocks known")), + Some(x) => x, + }; + + let num_known = self.rpc_name_to_hash.len(); + + // track rpcs on this heaviest chain so we can build a new ConsensusConnections + let mut highest_rpcs = HashSet::<&str>::new(); + // a running total of the soft limits covered by the rpcs that agree on the head block + let mut highest_rpcs_sum_soft_limit: u32 = 0; + // TODO: also track highest_rpcs_sum_hard_limit? llama doesn't need this, so it can wait + + // check the highest work block for a set of rpcs that can serve our request load + // if it doesn't have enough rpcs for our request load, check the parent block + // TODO: loop for how many parent blocks? we don't want to serve blocks that are too far behind. probably different per chain + // TODO: this loop is pretty long. any way to clean up this code? 
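// --- editor's sketch (illustrative, not part of this diff) ---------------------
// Shape of the parent-walking search that the loop below performs: start at the
// highest advertised head, count the rpcs (and their summed soft limits) that are
// on that block or one of its already-visited children, and if the thresholds
// aren't met, step back to the parent block and try again, up to a fixed depth.
// Types are simplified stand-ins; the real code walks at most 6 parents.
use std::collections::HashMap;

#[derive(Clone)]
struct SimpleBlock {
    hash: u64,
    parent_hash: u64,
    number: u64,
}

fn find_consensus_head(
    rpc_heads: &HashMap<&'static str, u64>, // rpc name -> head block hash
    rpc_soft_limits: &HashMap<&'static str, u32>,
    blocks: &HashMap<u64, SimpleBlock>, // block hash -> block
    start: &SimpleBlock,
    min_rpcs: usize,
    min_sum_soft_limit: u32,
    max_depth: usize,
) -> Option<(u64, Vec<&'static str>)> {
    let mut candidate = start.clone();
    // rpcs whose head is the candidate block *or one of its already-visited children*
    let mut agreeing: Vec<&'static str> = Vec::new();
    let mut sum_soft_limit: u32 = 0;

    for _ in 0..max_depth {
        for (name, head_hash) in rpc_heads {
            if *head_hash == candidate.hash && !agreeing.contains(name) {
                agreeing.push(*name);
                sum_soft_limit += rpc_soft_limits.get(name).copied().unwrap_or(0);
            }
        }

        if agreeing.len() >= min_rpcs && sum_soft_limit >= min_sum_soft_limit {
            // enough servers and enough combined request capacity at this depth
            return Some((candidate.number, agreeing));
        }

        // not enough capacity yet; step back to the parent block and keep counting
        candidate = blocks.get(&candidate.parent_hash)?.clone();
    }

    None
}

fn main() {
    let b1 = SimpleBlock { hash: 1, parent_hash: 0, number: 100 };
    let b2 = SimpleBlock { hash: 2, parent_hash: 1, number: 101 };

    let blocks = HashMap::from([(1, b1.clone()), (2, b2.clone())]);
    let rpc_heads = HashMap::from([("a", 2), ("b", 1), ("c", 1)]);
    let soft_limits = HashMap::from([("a", 100), ("b", 100), ("c", 100)]);

    // only one rpc is on block 101, so consensus falls back to block 100, where all
    // three rpcs agree ("a" stays counted because 101 is a child of 100)
    let (number, rpcs) =
        find_consensus_head(&rpc_heads, &soft_limits, &blocks, &b2, 2, 200, 6).unwrap();
    assert_eq!(number, 100);
    assert_eq!(rpcs.len(), 3);
}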
+ for _ in 0..6 { + let maybe_head_hash = maybe_head_block + .hash + .as_ref() + .expect("blocks here always need hashes"); + + // find all rpcs with maybe_head_block as their current head + for (rpc_name, rpc_head_hash) in self.rpc_name_to_hash.iter() { + if rpc_head_hash != maybe_head_hash { + // connection is not on the desired block + continue; + } + if highest_rpcs.contains(rpc_name.as_str()) { + // connection is on a child block + continue; + } + + if let Some(rpc) = web3_connections.conns.get(rpc_name.as_str()) { + highest_rpcs.insert(rpc_name); + highest_rpcs_sum_soft_limit += rpc.soft_limit; + } else { + // i don't think this is an error. i think its just if a reconnect is currently happening + warn!("connection missing: {}", rpc_name); + } + } + + if highest_rpcs_sum_soft_limit >= web3_connections.min_sum_soft_limit + && highest_rpcs.len() >= web3_connections.min_head_rpcs + { + // we have enough servers with enough requests + break; + } + + // not enough rpcs yet. check the parent block + if let Some(parent_block) = web3_connections + .block_hashes + .get(&maybe_head_block.parent_hash) + { + // trace!( + // child=%maybe_head_hash, parent=%parent_block.hash.unwrap(), "avoiding thundering herd", + // ); + + maybe_head_block = parent_block; + continue; + } else { + if num_known < web3_connections.min_head_rpcs { + return Err(anyhow::anyhow!( + "not enough rpcs connected: {}/{}/{}", + highest_rpcs.len(), + num_known, + web3_connections.min_head_rpcs, + )); + } else { + let soft_limit_percent = (highest_rpcs_sum_soft_limit as f32 + / web3_connections.min_sum_soft_limit as f32) + * 100.0; + + return Err(anyhow::anyhow!( + "ran out of parents to check. rpcs {}/{}/{}. soft limit: {:.2}% ({}/{})", + highest_rpcs.len(), + num_known, + web3_connections.min_head_rpcs, + highest_rpcs_sum_soft_limit, + web3_connections.min_sum_soft_limit, + soft_limit_percent, + )); + } + } + } + + // TODO: if consensus_head_rpcs.is_empty, try another method of finding the head block. will need to change the return Err above into breaks. + + // we've done all the searching for the heaviest block that we can + if highest_rpcs.len() < web3_connections.min_head_rpcs + || highest_rpcs_sum_soft_limit < web3_connections.min_sum_soft_limit + { + // if we get here, not enough servers are synced. return an error + let soft_limit_percent = (highest_rpcs_sum_soft_limit as f32 + / web3_connections.min_sum_soft_limit as f32) + * 100.0; + + return Err(anyhow::anyhow!( + "Not enough resources. rpcs {}/{}/{}. soft limit: {:.2}% ({}/{})", + highest_rpcs.len(), + num_known, + web3_connections.min_head_rpcs, + highest_rpcs_sum_soft_limit, + web3_connections.min_sum_soft_limit, + soft_limit_percent, + )); + } + + // success! 
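// --- editor's note (illustrative, not part of this diff) -----------------------
// In both anyhow! messages above, the "soft limit: {:.2}% ({}/{})" placeholders
// appear to be paired with (sum, min, percent) while the text reads as
// (percent, sum, min). If that reading is right, the intended argument order for
// the "ran out of parents" error (and likewise the "Not enough resources" one)
// would be:
return Err(anyhow::anyhow!(
    "ran out of parents to check. rpcs {}/{}/{}. soft limit: {:.2}% ({}/{})",
    highest_rpcs.len(),
    num_known,
    web3_connections.min_head_rpcs,
    soft_limit_percent,
    highest_rpcs_sum_soft_limit,
    web3_connections.min_sum_soft_limit,
));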
this block has enough soft limit and nodes on it (or on later blocks) + let conns: Vec> = highest_rpcs + .into_iter() + .filter_map(|conn_name| web3_connections.conns.get(conn_name).cloned()) + .collect(); + + // TODO: DEBUG only check + let _ = maybe_head_block + .hash + .expect("head blocks always have hashes"); + let _ = maybe_head_block + .number + .expect("head blocks always have numbers"); + + let consensus_head_block: SavedBlock = maybe_head_block.into(); + + Ok(ConsensusConnections { + head_block: Some(consensus_head_block), + conns, + num_checked_conns: self.rpc_name_to_hash.len(), + includes_backups: self.includes_backups, + }) + } +} + +/// A ConsensusConnections builder that tracks all connection heads across multiple groups of servers +pub struct ConsensusFinder { + /// only main servers + main: ConnectionsGroup, + /// main and backup servers + all: ConnectionsGroup, +} + +impl Default for ConsensusFinder { + fn default() -> Self { + Self { + main: ConnectionsGroup::without_backups(), + all: ConnectionsGroup::with_backups(), + } + } +} + +impl ConsensusFinder { + fn remove(&mut self, rpc: &Web3Connection) -> Option { + // TODO: should we have multiple backup tiers? (remote datacenters vs third party) + if !rpc.backup { + self.main.remove(rpc); + } + self.all.remove(rpc) + } + + fn insert(&mut self, rpc: &Web3Connection, new_hash: H256) -> Option { + // TODO: should we have multiple backup tiers? (remote datacenters vs third party) + if !rpc.backup { + self.main.insert(rpc, new_hash); + } + self.all.insert(rpc, new_hash) + } + + /// Update our tracking of the rpc and return true if something changed + async fn update_rpc( + &mut self, + rpc_head_block: Option, + rpc: Arc, + // we need this so we can save the block to caches. i don't like it though. maybe we should use a lazy_static Cache wrapper that has a "save_block" method?. i generally dislike globals but i also dislike all the types having to pass eachother around + web3_connections: &Web3Connections, + ) -> anyhow::Result { + // add the rpc's block to connection_heads, or remove the rpc from connection_heads + let changed = match rpc_head_block { + Some(mut rpc_head_block) => { + // we don't know if its on the heaviest chain yet + rpc_head_block.block = web3_connections + .save_block(rpc_head_block.block, false) + .await?; + + // we used to remove here if the block was too far behind. but it just made things more complicated + + let rpc_head_hash = rpc_head_block.hash(); + + if let Some(prev_hash) = self.insert(&rpc, rpc_head_hash) { + if prev_hash == rpc_head_hash { + // this block was already sent by this rpc. return early + false + } else { + // new block for this rpc + true + } + } else { + // first block for this rpc + true + } + } + None => { + if self.remove(&rpc).is_none() { + // this rpc was already removed + false + } else { + // rpc head changed from being synced to not + true + } + } + }; + + Ok(changed) + } + + // TODO: this could definitely be cleaner. i don't like the error handling/unwrapping + async fn best_consensus_connections( + &mut self, + authorization: &Arc, + web3_connections: &Web3Connections, + ) -> ConsensusConnections { + let highest_block_num = match self + .all + .highest_block(authorization, web3_connections) + .await + { + None => { + return ConsensusConnections::default(); + } + Some(x) => x.number.expect("blocks here should always have a number"), + }; + + let min_block_num = highest_block_num.saturating_sub(U64::from(5)); + + // TODO: pass `min_block_num` to consensus_head_connections? 
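// --- editor's sketch (illustrative, not part of this diff) ---------------------
// Decision order implemented by best_consensus_connections below, reduced to the
// block numbers involved: prefer the main (non-backup) group unless its head is
// more than 5 blocks behind the highest block any rpc has reported, in which case
// the group that includes backup rpcs is allowed to win. Names are stand-ins.
fn pick_group(
    highest_seen: u64,
    main_consensus: Option<u64>, // head number the main group agrees on, if any
    all_consensus: Option<u64>,  // head number when backups are included, if any
) -> &'static str {
    let min_acceptable = highest_seen.saturating_sub(5);

    match (main_consensus, all_consensus) {
        (Some(main), _) if main >= min_acceptable => "main group (backups stay idle)",
        (main, Some(all)) if Some(all) > main => "main + backup group",
        (Some(_), _) => "main group, but its head is old",
        (None, _) => "no consensus at all",
    }
}

fn main() {
    assert_eq!(pick_group(100, Some(100), Some(100)), "main group (backups stay idle)");
    assert_eq!(pick_group(100, Some(90), Some(99)), "main + backup group");
    assert_eq!(pick_group(100, Some(90), Some(90)), "main group, but its head is old");
    assert_eq!(pick_group(100, None, None), "no consensus at all");
}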
+ let consensus_head_for_main = self + .main + .consensus_head_connections(authorization, web3_connections) + .await + .map_err(|err| err.context("cannot use main group")); + + let consensus_num_for_main = consensus_head_for_main + .as_ref() + .ok() + .map(|x| x.head_block.as_ref().unwrap().number()); + + if let Some(consensus_num_for_main) = consensus_num_for_main { + if consensus_num_for_main >= min_block_num { + return consensus_head_for_main.unwrap(); + } + } + + // TODO: pass `min_block_num` to consensus_head_connections? + let consensus_connections_for_all = match self + .all + .consensus_head_connections(authorization, web3_connections) + .await + { + Err(err) => { + warn!("Unable to find any consensus head: {}", err); + return ConsensusConnections::default(); + } + Ok(x) => x, + }; + + let consensus_num_for_all = consensus_connections_for_all + .head_block + .as_ref() + .map(|x| x.number()); + + if consensus_num_for_all > consensus_num_for_main { + if consensus_num_for_all < Some(min_block_num) { + // TODO: this should have an alarm in sentry + error!("CONSENSUS HEAD w/ BACKUP NODES IS VERY OLD!"); + } + consensus_connections_for_all + } else { + if let Ok(x) = consensus_head_for_main { + error!("CONSENSUS HEAD IS VERY OLD! Backup RPCs did not improve this situation"); + x + } else { + error!("NO CONSENSUS HEAD!"); + ConsensusConnections::default() + } + } + } +} diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs index a4e83a76..7b650316 100644 --- a/web3_proxy/src/rpcs/connection.rs +++ b/web3_proxy/src/rpcs/connection.rs @@ -84,6 +84,8 @@ pub struct Web3Connection { pub(super) soft_limit: u32, /// use web3 queries to find the block data limit for archive/pruned nodes pub(super) automatic_block_limit: bool, + /// only use this rpc if everything else is lagging too far. this allows us to ignore fast but very low limit rpcs + pub(super) backup: bool, /// TODO: have an enum for this so that "no limit" prints pretty? pub(super) block_data_limit: AtomicU64, /// Lower tiers are higher priority when sending requests @@ -111,6 +113,7 @@ impl Web3Connection { hard_limit: Option<(u64, RedisPool)>, // TODO: think more about this type soft_limit: u32, + backup: bool, block_data_limit: Option, block_map: BlockHashesCache, block_sender: Option>, @@ -149,6 +152,7 @@ impl Web3Connection { hard_limit, soft_limit, automatic_block_limit, + backup, block_data_limit, head_block: RwLock::new(Default::default()), tier, @@ -304,6 +308,7 @@ impl Web3Connection { None => return false, Some(x) => { // TODO: this 60 second limit is causing our polygons to fall behind. change this to number of blocks? + // TODO: sometimes blocks might actually just take longer than 60 seconds if x.syncing(60) { // skip syncing nodes. 
even though they might be able to serve a query, // latency will be poor and it will get in the way of them syncing further @@ -648,7 +653,7 @@ impl Web3Connection { // if this block is too old, return an error so we reconnect let current_lag = x.lag(); if current_lag > allowed_lag { - let level = if warned == 0 { + let level = if warned == 0 && !conn.backup { log::Level::Warn } else if warned % 100 == 0 { log::Level::Debug @@ -1225,6 +1230,7 @@ mod tests { hard_limit: None, soft_limit: 1_000, automatic_block_limit: false, + backup: false, block_data_limit: block_data_limit.into(), tier: 0, head_block: RwLock::new(Some(head_block.clone())), @@ -1273,6 +1279,7 @@ mod tests { hard_limit: None, soft_limit: 1_000, automatic_block_limit: false, + backup: false, block_data_limit: block_data_limit.into(), tier: 0, head_block: RwLock::new(Some(head_block.clone())), @@ -1325,6 +1332,7 @@ mod tests { hard_limit: None, soft_limit: 1_000, automatic_block_limit: false, + backup: false, block_data_limit: block_data_limit.into(), tier: 0, head_block: RwLock::new(Some(head_block.clone())), diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs index 82dcbbe7..93493716 100644 --- a/web3_proxy/src/rpcs/connections.rs +++ b/web3_proxy/src/rpcs/connections.rs @@ -4,7 +4,7 @@ use super::connection::Web3Connection; use super::request::{ OpenRequestHandle, OpenRequestHandleMetrics, OpenRequestResult, RequestRevertHandler, }; -use super::synced_connections::SyncedConnections; +use super::synced_connections::ConsensusConnections; use crate::app::{flatten_handle, AnyhowJoinHandle}; use crate::config::{BlockAndRpc, TxHashAndRpc, Web3ConnectionConfig}; use crate::frontend::authorization::{Authorization, RequestMetadata}; @@ -40,7 +40,7 @@ use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBeh pub struct Web3Connections { pub(crate) conns: HashMap>, /// any requests will be forwarded to one (or more) of these connections - pub(super) synced_connections: ArcSwap, + pub(super) synced_connections: ArcSwap, pub(super) pending_transactions: Cache, /// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis? @@ -196,7 +196,7 @@ impl Web3Connections { } } - let synced_connections = SyncedConnections::default(); + let synced_connections = ConsensusConnections::default(); // TODO: max_capacity and time_to_idle from config // all block hashes are the same size, so no need for weigher @@ -329,6 +329,7 @@ impl Web3Connections { } /// Send the same request to all the handles. Returning the most common success or most common error. + /// TODO: option to return the fastest response and handles for all the others instead? pub async fn try_send_parallel_requests( &self, active_request_handles: Vec, @@ -501,7 +502,7 @@ impl Web3Connections { .collect(); trace!("minimum available requests: {}", minimum); - trace!("maximum available requests: {}", minimum); + trace!("maximum available requests: {}", maximum); if maximum < 0.0 { // TODO: if maximum < 0 and there are other tiers on the same block, we should include them now @@ -725,10 +726,20 @@ impl Web3Connections { } // some errors should be retried on other nodes + let error_msg = error.message.as_str(); + + // different providers do different codes. 
check all of them + // TODO: there's probably more strings to add here + let rate_limit_substrings = ["limit", "exceeded"]; + for rate_limit_substr in rate_limit_substrings { + if error_msg.contains(rate_limit_substr) { + warn!("rate limited by {:?}", skip_rpcs.last()); + continue; + } + } + match error.code { -32000 => { - let error_msg = error.message.as_str(); - // TODO: regex? let retry_prefixes = [ "header not found", @@ -866,7 +877,7 @@ impl Web3Connections { // TODO: return a 502? if it does? // return Err(anyhow::anyhow!("no available rpcs!")); // TODO: sleep how long? - // TODO: subscribe to something in SyncedConnections instead + // TODO: subscribe to something in ConsensusConnections instead sleep(Duration::from_millis(200)).await; continue; @@ -951,7 +962,11 @@ mod tests { // TODO: why is this allow needed? does tokio::test get in the way somehow? #![allow(unused_imports)] use super::*; - use crate::rpcs::{blockchain::SavedBlock, connection::ProviderState, provider::Web3Provider}; + use crate::rpcs::{ + blockchain::{ConsensusFinder, SavedBlock}, + connection::ProviderState, + provider::Web3Provider, + }; use ethers::types::{Block, U256}; use log::{trace, LevelFilter}; use parking_lot::RwLock; @@ -992,8 +1007,8 @@ mod tests { let head_block = Arc::new(head_block); // TODO: write a impl From for Block -> BlockId? - let lagged_block: SavedBlock = lagged_block.into(); - let head_block: SavedBlock = head_block.into(); + let mut lagged_block: SavedBlock = lagged_block.into(); + let mut head_block: SavedBlock = head_block.into(); let block_data_limit = u64::MAX; @@ -1012,6 +1027,7 @@ mod tests { hard_limit: None, soft_limit: 1_000, automatic_block_limit: true, + backup: false, block_data_limit: block_data_limit.into(), tier: 0, head_block: RwLock::new(Some(head_block.clone())), @@ -1032,6 +1048,7 @@ mod tests { hard_limit: None, soft_limit: 1_000, automatic_block_limit: false, + backup: false, block_data_limit: block_data_limit.into(), tier: 0, head_block: RwLock::new(Some(lagged_block.clone())), @@ -1072,7 +1089,7 @@ mod tests { let (head_block_sender, _head_block_receiver) = watch::channel::(Default::default()); - let mut connection_heads = HashMap::new(); + let mut connection_heads = ConsensusFinder::default(); // process None so that conns @@ -1123,7 +1140,7 @@ mod tests { assert!(matches!(x, OpenRequestResult::NotReady)); // add lagged blocks to the conns. both servers should be allowed - conns.save_block(&lagged_block.block, true).await.unwrap(); + lagged_block.block = conns.save_block(lagged_block.block, true).await.unwrap(); conns .process_block_from_rpc( @@ -1151,7 +1168,7 @@ mod tests { assert_eq!(conns.num_synced_rpcs(), 2); // add head block to the conns. 
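// --- editor's note (illustrative, not part of this diff) -----------------------
// On the rate-limit check added in connections.rs above: the `continue` inside
// `for rate_limit_substr in rate_limit_substrings` only restarts that inner loop,
// so after the warn! the code still falls through to the `match error.code` arms.
// If the goal (per the TODO item at the top of this patch) is to retry on another
// server, the check presumably wants to continue the *outer* retry loop, e.g.:
if rate_limit_substrings
    .iter()
    .any(|s| error_msg.contains(*s))
{
    warn!("rate limited by {:?}. trying another server", skip_rpcs.last());
    // `continue` here targets the surrounding server-selection loop
    continue;
}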
lagged_rpc should not be available - conns.save_block(&head_block.block, true).await.unwrap(); + head_block.block = conns.save_block(head_block.block, true).await.unwrap(); conns .process_block_from_rpc( @@ -1236,6 +1253,7 @@ mod tests { hard_limit: None, soft_limit: 3_000, automatic_block_limit: false, + backup: false, block_data_limit: 64.into(), tier: 1, head_block: RwLock::new(Some(head_block.clone())), @@ -1256,6 +1274,7 @@ mod tests { hard_limit: None, soft_limit: 1_000, automatic_block_limit: false, + backup: false, block_data_limit: u64::MAX.into(), tier: 2, head_block: RwLock::new(Some(head_block.clone())), @@ -1295,7 +1314,7 @@ mod tests { let (head_block_sender, _head_block_receiver) = watch::channel::(Default::default()); - let mut connection_heads = HashMap::new(); + let mut connection_heads = ConsensusFinder::default(); conns .process_block_from_rpc( diff --git a/web3_proxy/src/rpcs/synced_connections.rs b/web3_proxy/src/rpcs/synced_connections.rs index f6a5e288..824857ce 100644 --- a/web3_proxy/src/rpcs/synced_connections.rs +++ b/web3_proxy/src/rpcs/synced_connections.rs @@ -9,19 +9,33 @@ use std::sync::Arc; /// A collection of Web3Connections that are on the same block. /// Serialize is so we can print it on our debug endpoint #[derive(Clone, Default, Serialize)] -pub struct SyncedConnections { +pub struct ConsensusConnections { // TODO: store ArcBlock instead? pub(super) head_block: Option, // TODO: this should be able to serialize, but it isn't #[serde(skip_serializing)] pub(super) conns: Vec>, + pub(super) num_checked_conns: usize, + pub(super) includes_backups: bool, } -impl fmt::Debug for SyncedConnections { +impl ConsensusConnections { + pub fn num_conns(&self) -> usize { + self.conns.len() + } + + pub fn sum_soft_limit(&self) -> u32 { + self.conns.iter().fold(0, |sum, rpc| sum + rpc.soft_limit) + } + + // TODO: sum_hard_limit? +} + +impl fmt::Debug for ConsensusConnections { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { // TODO: the default formatter takes forever to write. this is too quiet though // TODO: print the actual conns? - f.debug_struct("SyncedConnections") + f.debug_struct("ConsensusConnections") .field("head_block", &self.head_block) .field("num_conns", &self.conns.len()) .finish_non_exhaustive()
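// --- editor's sketch (illustrative, not part of this diff) ---------------------
// The ConsensusFinder introduced in blockchain.rs keeps two head maps so that
// consensus can be computed without backup rpcs first and with them only as a
// fallback. The bookkeeping reduces to: backup rpcs are tracked in `all` only,
// everything else is tracked in both. Simplified stand-in types:
use std::collections::HashMap;

#[derive(Default)]
struct Finder {
    /// non-backup rpcs only
    main: HashMap<String, u64>,
    /// every rpc, backups included
    all: HashMap<String, u64>,
}

impl Finder {
    fn insert(&mut self, rpc_name: &str, is_backup: bool, head_hash: u64) -> Option<u64> {
        if !is_backup {
            self.main.insert(rpc_name.to_string(), head_hash);
        }
        // mirrors the real code: return the previous hash for this rpc so the
        // caller can no-op when a server re-announces the same head
        self.all.insert(rpc_name.to_string(), head_hash)
    }

    fn remove(&mut self, rpc_name: &str, is_backup: bool) -> Option<u64> {
        if !is_backup {
            self.main.remove(rpc_name);
        }
        self.all.remove(rpc_name)
    }
}

fn main() {
    let mut finder = Finder::default();

    assert_eq!(finder.insert("backup-rpc", true, 0xabc), None);
    assert_eq!(finder.insert("main-rpc", false, 0xabc), None);

    // backups never influence the main group
    assert_eq!(finder.main.len(), 1);
    assert_eq!(finder.all.len(), 2);

    // re-announcing the same head returns the previous hash, so callers can skip work
    assert_eq!(finder.insert("main-rpc", false, 0xabc), Some(0xabc));

    finder.remove("main-rpc", false);
    assert_eq!(finder.main.len(), 0);
    assert_eq!(finder.all.len(), 1);
}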