diff --git a/TODO.md b/TODO.md index 04f253b8..352fca4d 100644 --- a/TODO.md +++ b/TODO.md @@ -677,3 +677,5 @@ in another repo: event subscriber - [ ] give public_recent_ips_salt a better, more general, name - [ ] include tier in the head block logs? - [ ] i think i use FuturesUnordered when a try_join_all might be better +- [ ] since we are read-heavy on our configs, maybe we should use a cache + - "using a thread local storage and explicit types" https://docs.rs/arc-swap/latest/arc_swap/cache/struct.Cache.html diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index da58a0ab..ef75f72b 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -8,7 +8,6 @@ use crate::{config::BlockAndRpc, jsonrpc::JsonRpcRequest}; use anyhow::{anyhow, Context}; use derive_more::From; use ethers::prelude::{Block, TxHash, H256, U64}; -use hashbrown::HashSet; use log::{debug, error, trace, warn, Level}; use moka::future::Cache; use serde::Serialize; @@ -339,18 +338,7 @@ impl Web3Rpcs { // Geth's subscriptions have the same potential for skipping blocks. pending_tx_sender: Option>, ) -> anyhow::Result<()> { - // TODO: indexmap or hashmap? what hasher? with_capacity? - // TODO: this will grow unbounded. prune old heads on this at the same time we prune the graph? - let configured_tiers: Vec = self - .by_name - .values() - .map(|x| x.tier) - .collect::>() - .into_iter() - .collect(); - - let mut connection_heads = - ConsensusFinder::new(&configured_tiers, self.max_block_age, self.max_block_lag); + let mut connection_heads = ConsensusFinder::new(self.max_block_age, self.max_block_lag); loop { match block_receiver.recv_async().await { @@ -385,13 +373,13 @@ impl Web3Rpcs { &self, authorization: &Arc, consensus_finder: &mut ConsensusFinder, - rpc_head_block: Option, + new_block: Option, rpc: Arc, pending_tx_sender: &Option>, ) -> anyhow::Result<()> { // TODO: how should we handle an error here? if !consensus_finder - .update_rpc(rpc_head_block.clone(), rpc.clone(), self) + .update_rpc(new_block.clone(), rpc.clone(), self) .await .context("failed to update rpc")? { @@ -422,7 +410,7 @@ impl Web3Rpcs { .all_rpcs_group() .map(|x| x.len()) .unwrap_or_default(); - let total_rpcs = self.by_name.len(); + let total_rpcs = self.by_name.read().len(); let old_consensus_head_connections = self .watch_consensus_rpcs_sender @@ -462,7 +450,7 @@ impl Web3Rpcs { } Some(old_head_block) => { // TODO: do this log item better - let rpc_head_str = rpc_head_block + let rpc_head_str = new_block .map(|x| x.to_string()) .unwrap_or_else(|| "None".to_string()); @@ -578,7 +566,7 @@ impl Web3Rpcs { } } else { // TODO: do this log item better - let rpc_head_str = rpc_head_block + let rpc_head_str = new_block .map(|x| x.to_string()) .unwrap_or_else(|| "None".to_string()); diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index c12f52f3..fd4036db 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -141,54 +141,6 @@ impl ConnectionsGroup { self.rpc_name_to_block.insert(rpc.name.clone(), block) } - // // TODO: do this during insert/remove? - // pub(self) async fn highest_block( - // &self, - // authorization: &Arc, - // web3_rpcs: &Web3Rpcs, - // ) -> Option { - // let mut checked_heads = HashSet::with_capacity(self.rpc_name_to_hash.len()); - // let mut highest_block = None::; - - // for (rpc_name, rpc_head_hash) in self.rpc_name_to_hash.iter() { - // // don't waste time checking the same hash multiple times - // if checked_heads.contains(rpc_head_hash) { - // continue; - // } - - // let rpc_block = match web3_rpcs - // .get_block_from_rpc(rpc_name, rpc_head_hash, authorization) - // .await - // { - // Ok(x) => x, - // Err(err) => { - // warn!( - // "failed getting block {} from {} while finding highest block number: {:?}", - // rpc_head_hash, rpc_name, err, - // ); - // continue; - // } - // }; - - // checked_heads.insert(rpc_head_hash); - - // // if this is the first block we've tried - // // or if this rpc's newest block has a higher number - // // we used to check total difficulty, but that isn't a thing anymore on ETH - // // TODO: we still need total difficulty on some other PoW chains. whats annoying is it isn't considered part of the "block header" just the block. so websockets don't return it - // let highest_num = highest_block - // .as_ref() - // .map(|x| x.number.expect("blocks here should always have a number")); - // let rpc_num = rpc_block.as_ref().number; - - // if rpc_num > highest_num { - // highest_block = Some(rpc_block); - // } - // } - - // highest_block - // } - /// min_consensus_block_num keeps us from ever going backwards. /// TODO: think about min_consensus_block_num more. i think this might cause an outage if the chain is doing weird things. but 503s is probably better than broken data. pub(self) async fn consensus_head_connections( @@ -266,7 +218,7 @@ impl ConnectionsGroup { continue; } - if let Some(rpc) = web3_rpcs.by_name.get(rpc_name.as_str()) { + if let Some(rpc) = web3_rpcs.by_name.read().get(rpc_name.as_str()) { if backup_rpcs_voted.is_some() { // backups already voted for a head block. don't change it } else { @@ -280,7 +232,7 @@ impl ConnectionsGroup { } else { // i don't think this is an error. i think its just if a reconnect is currently happening warn!("connection missing: {}", rpc_name); - debug!("web3_rpcs.conns: {:#?}", web3_rpcs.by_name); + debug!("web3_rpcs.by_name: {:#?}", web3_rpcs.by_name); } } @@ -363,7 +315,7 @@ impl ConnectionsGroup { // success! this block has enough soft limit and nodes on it (or on later blocks) let rpcs: Vec> = primary_consensus_rpcs .into_iter() - .filter_map(|conn_name| web3_rpcs.by_name.get(conn_name).cloned()) + .filter_map(|conn_name| web3_rpcs.by_name.read().get(conn_name).cloned()) .collect(); #[cfg(debug_assertions)] @@ -396,20 +348,16 @@ pub struct ConsensusFinder { } impl ConsensusFinder { - pub fn new( - configured_tiers: &[u64], - max_block_age: Option, - max_block_lag: Option, - ) -> Self { - // TODO: what's a good capacity for this? + pub fn new(max_block_age: Option, max_block_lag: Option) -> Self { + // TODO: what's a good capacity for this? it shouldn't need to be very large + // TODO: if we change Web3ProxyBlock to store the instance, i think we could use the block_by_hash cache let first_seen = Cache::builder() .max_capacity(16) .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); - // TODO: this will need some thought when config reloading is written - let tiers = configured_tiers - .iter() - .map(|x| (*x, ConnectionsGroup::new(first_seen.clone()))) + // TODO: hard coding 0-9 isn't great, but its easier than refactoring this to be smart about config reloading + let tiers = (0..10) + .map(|x| (x, ConnectionsGroup::new(first_seen.clone()))) .collect(); Self { diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 1ae7ce9f..d323b554 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -21,6 +21,7 @@ use log::{debug, error, info, trace, warn, Level}; use migration::sea_orm::DatabaseConnection; use moka::future::{Cache, ConcurrentCacheExt}; use ordered_float::OrderedFloat; +use parking_lot::RwLock; use serde::ser::{SerializeStruct, Serializer}; use serde::Serialize; use serde_json::json; @@ -31,7 +32,7 @@ use std::sync::atomic::{self, Ordering}; use std::sync::Arc; use std::{cmp, fmt}; use thread_fast_rng::rand::seq::SliceRandom; -use tokio::sync::{broadcast, watch, RwLock as AsyncRwLock}; +use tokio::sync::{broadcast, watch}; use tokio::task; use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior}; @@ -41,7 +42,9 @@ pub struct Web3Rpcs { /// if watch_consensus_head_sender is some, Web3Rpc inside self will send blocks here when they get them pub(crate) block_sender: flume::Sender<(Option, Arc)>, /// any requests will be forwarded to one (or more) of these connections - pub(crate) by_name: HashMap>, + /// TODO: i tried to make this an AsyncRwLock, but then we have trouble serializing it + /// TODO: maybe an ArcSwap would be better. writes are rare + pub(crate) by_name: RwLock>>, pub(crate) http_interval_sender: Option>>, /// all providers with the same consensus head block. won't update if there is no `self.watch_consensus_head_sender` /// TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed @@ -269,16 +272,13 @@ impl Web3Rpcs { }) .collect(); - // map of connection names to their connection - let connections = AsyncRwLock::new(HashMap::new()); - while let Some(x) = spawn_handles.next().await { match x { Ok(Ok((connection, _handle))) => { // web3 connection worked - let old_rpc = connections + let old_rpc = self + .by_name .write() - .await .insert(connection.name.clone(), connection); if let Some(old_rpc) = old_rpc { @@ -303,11 +303,11 @@ impl Web3Rpcs { Ok(()) } - pub fn get(&self, conn_name: &str) -> Option<&Arc> { - self.by_name.get(conn_name) + pub fn get(&self, conn_name: &str) -> Option> { + self.by_name.read().get(conn_name).cloned() } - pub async fn min_head_rpcs(&self) -> usize { + pub fn min_head_rpcs(&self) -> usize { self.min_head_rpcs } @@ -526,6 +526,7 @@ impl Web3Rpcs { // TODO: maybe have a helper on synced_connections? that way sum_soft_limits/min_synced_rpcs will be DRY for x in self .by_name + .read() .values() .filter(|x| { // TODO: move a bunch of this onto a rpc.is_synced function @@ -744,7 +745,7 @@ impl Web3Rpcs { let mut max_count = if let Some(max_count) = max_count { max_count } else { - self.by_name.len() + self.by_name.read().len() }; let mut tried = HashSet::new(); @@ -756,7 +757,7 @@ impl Web3Rpcs { // if there aren't enough synced connections, include more connections // TODO: only do this sorting if the synced_conns isn't enough - let mut all_conns: Vec<_> = self.by_name.values().cloned().collect(); + let mut all_conns: Vec<_> = self.by_name.read().values().cloned().collect(); all_conns.sort_by_cached_key(rpc_sync_status_sort_key); for connection in itertools::chain(synced_conns, all_conns) { @@ -835,7 +836,7 @@ impl Web3Rpcs { loop { let num_skipped = skip_rpcs.len(); - if num_skipped == self.by_name.len() { + if num_skipped >= self.by_name.read().len() { break; } @@ -1008,7 +1009,7 @@ impl Web3Rpcs { return Ok(r); } - let num_conns = self.by_name.len(); + let num_conns = self.by_name.read().len(); let num_skipped = skip_rpcs.len(); if num_skipped == 0 { @@ -1171,9 +1172,12 @@ impl Serialize for Web3Rpcs { { let mut state = serializer.serialize_struct("Web3Rpcs", 6)?; - let rpcs: Vec<&Web3Rpc> = self.by_name.values().map(|x| x.as_ref()).collect(); - // TODO: coordinate with frontend team to rename "conns" to "rpcs" - state.serialize_field("conns", &rpcs)?; + { + let by_name = self.by_name.read(); + let rpcs: Vec<&Web3Rpc> = by_name.values().map(|x| x.as_ref()).collect(); + // TODO: coordinate with frontend team to rename "conns" to "rpcs" + state.serialize_field("conns", &rpcs)?; + } { let consensus_connections = self.watch_consensus_rpcs_sender.borrow().clone(); @@ -1387,7 +1391,7 @@ mod tests { // TODO: make a Web3Rpcs::new let rpcs = Web3Rpcs { block_sender, - by_name: rpcs_by_name, + by_name: RwLock::new(rpcs_by_name), http_interval_sender: None, watch_consensus_head_sender: Some(watch_consensus_head_sender), watch_consensus_rpcs_sender, @@ -1409,7 +1413,7 @@ mod tests { let authorization = Arc::new(Authorization::internal(None).unwrap()); - let mut consensus_finder = ConsensusFinder::new(&[0, 1, 2, 3], None, None); + let mut consensus_finder = ConsensusFinder::new(None, None); // process None so that rpcs.process_block_from_rpc( @@ -1588,7 +1592,7 @@ mod tests { // TODO: make a Web3Rpcs::new let rpcs = Web3Rpcs { block_sender, - by_name: rpcs_by_name, + by_name: RwLock::new(rpcs_by_name), http_interval_sender: None, watch_consensus_head_sender: Some(watch_consensus_head_sender), watch_consensus_rpcs_sender, @@ -1608,7 +1612,7 @@ mod tests { let authorization = Arc::new(Authorization::internal(None).unwrap()); - let mut connection_heads = ConsensusFinder::new(&[0, 1, 2, 3], None, None); + let mut connection_heads = ConsensusFinder::new(None, None); // min sum soft limit will require tier 2 rpcs.process_block_from_rpc(