diff --git a/TODO.md b/TODO.md index f1a27537..573a911a 100644 --- a/TODO.md +++ b/TODO.md @@ -53,9 +53,11 @@ - [x] web3_sha3 rpc command - [x] test that launches anvil and connects the proxy to it and does some basic queries - [x] need to have some sort of shutdown signaling. doesn't need to be graceful at this point, but should be eventually -- [ ] if the fastest server has hit rate limits, we won't be able to serve any traffic until another server is synced. +- [x] if the fastest server has hit rate limits, we won't be able to serve any traffic until another server is synced. - thundering herd problem if we only allow a lag of 0 blocks - - we can improve this by only `publish`ing the sorted list once a threshold of total available soft and hard limits is passed. how can we do this without hammering redis? at least its only once per block per server + - we can improve this by only publishing the synced connections once a threshold of total available soft and hard limits is passed. how can we do this without hammering redis? at least its only once per block per server + - [x] instead of tracking `pending_synced_connections`, have a mapping of where all connections are individually. then each change, re-check for consensus. +- [ ] synced connections swap threshold should come from config - [ ] basic request method stats - [ ] nice output when cargo doc is run @@ -183,4 +185,5 @@ in another repo: event subscriber 2022-07-22T23:52:18.593956Z WARN block_receiver: web3_proxy::connections: chain is forked! 1 possible heads. 1/1/4 rpcs have 0xa906…5bc1 rpc=Web3Connection { url: "ws://127.0.0.1:8546", data: 64, .. } new_block_num=15195517 2022-07-22T23:52:18.983441Z WARN block_receiver: web3_proxy::connections: chain is forked! 1 possible heads. 1/1/4 rpcs have 0x70e8…48e0 rpc=Web3Connection { url: "ws://127.0.0.1:8546", data: 64, .. } new_block_num=15195517 2022-07-22T23:52:19.350720Z WARN block_receiver: web3_proxy::connections: chain is forked! 2 possible heads. 1/2/4 rpcs have 0x70e8…48e0 rpc=Web3Connection { url: "ws://127.0.0.1:8549", data: "archive", .. } new_block_num=15195517 - 2022-07-22T23:52:26.041140Z WARN block_receiver: web3_proxy::connections: chain is forked! 2 possible heads. 2/4/4 rpcs have 0x70e8…48e0 rpc=Web3Connection { url: "http://127.0.0.1:8549", data: "archive", .. } new_block_num=15195517 \ No newline at end of file + 2022-07-22T23:52:26.041140Z WARN block_receiver: web3_proxy::connections: chain is forked! 2 possible heads. 2/4/4 rpcs have 0x70e8…48e0 rpc=Web3Connection { url: "http://127.0.0.1:8549", data: "archive", .. } new_block_num=15195517 + - [ ] threshold should check actual available request limits (if any) instead of just the soft limit diff --git a/docker-compose.yml b/docker-compose.yml index 3c68d3c8..916ca9a0 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -5,6 +5,7 @@ version: "3.4" services: dev-redis: build: ./redis-cell-server/ + # TODO: expose these ports? dev-eth: extends: diff --git a/web3-proxy/src/app.rs b/web3-proxy/src/app.rs index b0b755d0..03e0c08a 100644 --- a/web3-proxy/src/app.rs +++ b/web3-proxy/src/app.rs @@ -824,6 +824,7 @@ impl Web3ProxyApp { "eth_blockNumber" => { let head_block_number = self.balanced_rpcs.get_head_block_num(); + // TODO: technically, block 0 is okay. i guess we should be using an option if head_block_number.as_u64() == 0 { return Err(anyhow::anyhow!("no servers synced")); } diff --git a/web3-proxy/src/connection.rs b/web3-proxy/src/connection.rs index bbecbbd3..5fb4cea6 100644 --- a/web3-proxy/src/connection.rs +++ b/web3-proxy/src/connection.rs @@ -268,13 +268,17 @@ impl Web3Connection { Ok((new_connection, handle)) } + pub fn url(&self) -> &str { + &self.url + } + /// TODO: this might be too simple. different nodes can prune differently - pub fn get_block_data_limit(&self) -> U64 { + pub fn block_data_limit(&self) -> U64 { self.block_data_limit.load(atomic::Ordering::Acquire).into() } pub fn has_block_data(&self, needed_block_num: &U64) -> bool { - let block_data_limit: U64 = self.get_block_data_limit(); + let block_data_limit: U64 = self.block_data_limit(); let newest_block_num = self.head_block.read().1; @@ -301,7 +305,7 @@ impl Web3Connection { *provider = None; - // tell the block subscriber that we are at 0 + // tell the block subscriber that we don't have any blocks if let Some(block_sender) = block_sender { block_sender .send_async((Arc::new(Block::default()), self.clone())) diff --git a/web3-proxy/src/connections.rs b/web3-proxy/src/connections.rs index d3775f13..21ec22a1 100644 --- a/web3-proxy/src/connections.rs +++ b/web3-proxy/src/connections.rs @@ -23,7 +23,7 @@ use std::time::Duration; use tokio::sync::{broadcast, watch}; use tokio::task; use tokio::time::{interval, sleep, MissedTickBehavior}; -use tracing::{debug, error, info, info_span, instrument, trace, warn}; +use tracing::{debug, error, info, instrument, trace, warn}; use crate::app::{flatten_handle, AnyhowJoinHandle, TxState}; use crate::config::Web3ConnectionConfig; @@ -38,7 +38,7 @@ struct SyncedConnections { // TODO: this should be able to serialize, but it isn't // TODO: use linkedhashmap? #[serde(skip_serializing)] - inner: IndexSet>, + conns: IndexSet>, } impl fmt::Debug for SyncedConnections { @@ -106,7 +106,7 @@ impl BlockChain { /// A collection of web3 connections. Sends requests either the current best server or all servers. #[derive(From)] pub struct Web3Connections { - inner: Vec>, + conns: IndexMap>, synced_connections: ArcSwap, pending_transactions: Arc>, // TODO: i think chain is what we want, but i'm not sure how we'll use it yet @@ -120,11 +120,10 @@ impl Serialize for Web3Connections { where S: Serializer, { - let inner: Vec<&Web3Connection> = self.inner.iter().map(|x| x.as_ref()).collect(); + let conns: Vec<&Web3Connection> = self.conns.iter().map(|x| x.1.as_ref()).collect(); - // 3 is the number of fields in the struct. let mut state = serializer.serialize_struct("Web3Connections", 2)?; - state.serialize_field("rpcs", &inner)?; + state.serialize_field("conns", &conns)?; state.serialize_field("synced_connections", &**self.synced_connections.load())?; state.end() } @@ -134,7 +133,7 @@ impl fmt::Debug for Web3Connections { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { // TODO: the default formatter takes forever to write. this is too quiet though f.debug_struct("Web3Connections") - .field("inner", &self.inner) + .field("conns", &self.conns) .finish_non_exhaustive() } } @@ -214,7 +213,7 @@ impl Web3Connections { }) .collect(); - let mut connections = vec![]; + let mut connections = IndexMap::new(); let mut handles = vec![]; // TODO: futures unordered? @@ -222,7 +221,7 @@ impl Web3Connections { // TODO: how should we handle errors here? one rpc being down shouldn't cause the program to exit match x { Ok(Ok((connection, handle))) => { - connections.push(connection); + connections.insert(connection.url().to_string(), connection); handles.push(handle); } Ok(Err(err)) => { @@ -243,7 +242,7 @@ impl Web3Connections { let synced_connections = SyncedConnections::default(); let connections = Arc::new(Self { - inner: connections, + conns: connections, synced_connections: ArcSwap::new(Arc::new(synced_connections)), pending_transactions, chain: Default::default(), @@ -516,14 +515,14 @@ impl Web3Connections { } pub fn has_synced_rpcs(&self) -> bool { - if self.synced_connections.load().inner.is_empty() { + if self.synced_connections.load().conns.is_empty() { return false; } self.get_head_block_num() > U64::zero() } pub fn num_synced_rpcs(&self) -> usize { - self.synced_connections.load().inner.len() + self.synced_connections.load().conns.len() } /// Send the same request to all the handles. Returning the most common success or most common error. @@ -592,187 +591,273 @@ impl Web3Connections { // TODO: use pending_tx_sender pending_tx_sender: Option>, ) -> anyhow::Result<()> { - let total_rpcs = self.inner.len(); + let total_rpcs = self.conns.len(); - let mut connection_states: HashMap, _> = - HashMap::with_capacity(total_rpcs); - - // keep a pending one so that we can delay publishing a new head block until multiple servers are synced - let mut pending_synced_connections = SyncedConnections::default(); + // TODO: rpc name instead of url (will do this with config reload revamp) + // TODO: indexmap or hashmap? what hasher? with_capacity? + let mut connection_heads = IndexMap::>>::new(); while let Ok((new_block, rpc)) = block_receiver.recv_async().await { - let new_block_num = match new_block.number { - Some(x) => x.as_u64(), - None => { - // block without a number is expected a node is syncing or - if new_block.hash.is_some() { - // this seems unlikely, but i'm pretty sure we see it - warn!(?new_block, "Block without number!"); - } - continue; - } + let new_block_hash = if let Some(hash) = new_block.hash { + hash + } else { + warn!(%rpc, ?new_block, "Block without hash!"); + + connection_heads.remove(rpc.url()); + + continue; + }; + + // TODO: dry this with the code above + let new_block_num = if let Some(num) = new_block.number { + num + } else { + // this seems unlikely, but i'm pretty sure we have seen it + // maybe when a node is syncing or reconnecting? + warn!(%rpc, ?new_block, "Block without number!"); + + connection_heads.remove(rpc.url()); + + continue; }; - let new_block_hash = new_block.hash.unwrap(); // TODO: span with more in it? // TODO: make sure i'm doing this span right // TODO: show the actual rpc url? - let span = info_span!("block_receiver", ?rpc, new_block_num); - // TODO: clippy lint to make sure we don't hold this across an awaited future - let _enter = span.enter(); + // TODO: what level? + // let _span = info_span!("block_receiver", %rpc, %new_block_num).entered(); - if new_block_num == 0 { - warn!("still syncing"); + if new_block_num == U64::zero() { + warn!(%rpc, %new_block_num, "still syncing"); + + connection_heads.remove(rpc.url()); + } else { + // TODO: no clone? we end up with different blocks for every rpc + connection_heads.insert(rpc.url().to_string(), new_block.clone()); + + self.chain.add_block(new_block.clone(), false); } - let mut new_head_block = false; + // iterate connection_heads to find the oldest block + let lowest_block_num = if let Some(lowest_block) = connection_heads + .values() + .min_by(|a, b| a.number.cmp(&b.number)) + { + lowest_block.number.unwrap() + } else { + continue; + }; - connection_states.insert(rpc.clone(), (new_block_num, new_block_hash)); + // iterate connection_heads to find the consensus block + let mut rpcs_by_num = IndexMap::>::new(); + let mut blocks_by_hash = IndexMap::>>::new(); + // block_hash => soft_limit, rpcs + // TODO: proper type for this? + let mut rpcs_by_hash = IndexMap::>::new(); + let mut total_soft_limit = 0; - // TODO: do something to update the synced blocks - match new_block_num.cmp(&pending_synced_connections.head_block_num) { - cmp::Ordering::Greater => { - // the rpc's newest block is the new overall best block - // TODO: if trace, do the full block hash? - // TODO: only accept this block if it is a child of the current head_block - info!("new head: {}", new_block_hash); + for (rpc_url, head_block) in connection_heads.iter() { + if let Some(rpc) = self.conns.get(rpc_url) { + // we need the total soft limit in order to know when its safe to update the backends + total_soft_limit += rpc.soft_limit(); - pending_synced_connections.inner.clear(); - pending_synced_connections.inner.insert(rpc); + let head_hash = head_block.hash.unwrap(); - pending_synced_connections.head_block_num = new_block_num; + // save the block + blocks_by_hash + .entry(head_hash) + .or_insert_with(|| head_block.clone()); - // TODO: if the parent hash isn't our previous best block, ignore it - pending_synced_connections.head_block_hash = new_block_hash; + // add the rpc to all relevant block heights + let mut block = head_block.clone(); + while block.number.unwrap() >= lowest_block_num { + let block_hash = block.hash.unwrap(); + let block_num = block.number.unwrap(); - // TODO: wait to send this until we publish + // save the rpcs and the sum of their soft limit by their head hash + let rpc_urls_by_hash = + rpcs_by_hash.entry(block_hash).or_insert_with(Vec::new); + + rpc_urls_by_hash.push(rpc_url); + + // save the rpcs by their number + let rpc_urls_by_num = rpcs_by_num.entry(block_num).or_insert_with(Vec::new); + + rpc_urls_by_num.push(rpc_url); + + if let Some(parent) = self.chain.get_block(&block.parent_hash) { + // save the parent block + blocks_by_hash.insert(block.parent_hash, parent.clone()); + + block = parent + } else { + // log this? eventually we will hit a block we don't have, so it's not an error + break; + } + } + } + } + + // TODO: configurable threshold. in test we have 4 servers so + // TODO: + // TODO: minimum total_soft_limit? without, when one server is in the loop + let min_soft_limit = total_soft_limit / 2; + + struct State<'a> { + block: &'a Arc>, + sum_soft_limit: u32, + conns: Vec<&'a str>, + } + + impl<'a> State<'a> { + // TODO: there are sortable traits, but this seems simpler + fn sortable_values(&self) -> (&U64, &u32, usize, &H256) { + let block_num = self.block.number.as_ref().unwrap(); + + let sum_soft_limit = &self.sum_soft_limit; + + let conns = self.conns.len(); + + let block_hash = self.block.hash.as_ref().unwrap(); + + (block_num, sum_soft_limit, conns, block_hash) + } + } + + trace!(?rpcs_by_hash); + + // TODO: i'm always getting None + if let Some(x) = rpcs_by_hash + .into_iter() + .filter_map(|(hash, conns)| { + // TODO: move this to `State::new` function on + let sum_soft_limit = conns + .iter() + .map(|rpc_url| { + if let Some(rpc) = self.conns.get(*rpc_url) { + rpc.soft_limit() + } else { + 0 + } + }) + .sum(); + + if sum_soft_limit < min_soft_limit { + trace!(?sum_soft_limit, ?min_soft_limit, "sum_soft_limit too low"); + None + } else { + let block = blocks_by_hash.get(&hash).unwrap(); + + Some(State { + block, + sum_soft_limit, + conns, + }) + } + }) + .max_by(|a, b| a.sortable_values().cmp(&b.sortable_values())) + { + let best_head_num = x.block.number.unwrap(); + let best_head_hash = x.block.hash.unwrap(); + let best_rpcs = x.conns; + + let synced_rpcs = rpcs_by_num.remove(&best_head_num).unwrap(); + + if best_rpcs.len() == synced_rpcs.len() { + trace!( + "{}/{}/{}/{} rpcs have {}", + best_rpcs.len(), + synced_rpcs.len(), + connection_heads.len(), + total_rpcs, + best_head_hash + ); + } else { + warn!( + "chain is forked! {} possible heads. {}/{}/{}/{} rpcs have {}", + "?", // TODO: how should we get this? + best_rpcs.len(), + synced_rpcs.len(), + connection_heads.len(), + total_rpcs, + best_head_hash + ); + } + + let num_best_rpcs = best_rpcs.len(); + + // TODOL: do this without clone? + let conns = best_rpcs + .into_iter() + .map(|x| self.conns.get(x).unwrap().clone()) + .collect(); + + let pending_synced_connections = SyncedConnections { + head_block_num: best_head_num.as_u64(), + head_block_hash: best_head_hash, + conns, + }; + + let current_head_block = self.get_head_block_hash(); + let new_head_block = + pending_synced_connections.head_block_hash != current_head_block; + + if new_head_block { + self.chain.add_block(new_block.clone(), true); + + info!( + "{}/{} rpcs at {} ({}). publishing new head!", + pending_synced_connections.conns.len(), + self.conns.len(), + pending_synced_connections.head_block_hash, + pending_synced_connections.head_block_num, + ); + // TODO: what if the hashes don't match? + if pending_synced_connections.head_block_hash == new_block_hash { + // mark all transactions in the block as confirmed + if pending_tx_sender.is_some() { + for tx_hash in &new_block.transactions { + // TODO: should we mark as confirmed via pending_tx_sender? + // TODO: possible deadlock here! + // trace!("removing {}...", tx_hash); + let _ = self.pending_transactions.remove(tx_hash); + // trace!("removed {}", tx_hash); + } + }; + + // TODO: mark any orphaned transactions as unconfirmed + } + } else { + // TODO: i'm seeing 4/4 print twice. maybe because of http providers? + // TODO: only do this log if there was a change + trace!( + "{}/{} rpcs at {} ({})", + pending_synced_connections.conns.len(), + self.conns.len(), + pending_synced_connections.head_block_hash, + pending_synced_connections.head_block_num, + ); + } + + // TODO: do this before or after processing all the transactions in this block? + // TODO: only swap if there is a change + trace!(?pending_synced_connections, "swapping"); + self.synced_connections + .swap(Arc::new(pending_synced_connections)); + + if new_head_block { + // TODO: this will need a refactor to only send once a minmum threshold has this block + // TODO: move this onto self.chain + // TODO: pending_synced_connections isn't published yet. which means fast queries using this block will fail head_block_sender .send(new_block.clone()) .context("head_block_sender")?; - - // TODO: mark all transactions as confirmed - // TODO: mark any orphaned transactions as unconfirmed - - // TODO: do not mark cannonical until a threshold of RPCs have this block! - new_head_block = true; - - self.chain.add_block(new_block.clone(), new_head_block); } - cmp::Ordering::Equal => { - if new_block_hash == pending_synced_connections.head_block_hash { - // this rpc has caught up with the best known head - // do not clear synced_connections. - // we just want to add this rpc to the end - // TODO: HashSet here? i think we get dupes if we don't - pending_synced_connections.inner.insert(rpc); - } else { - // same height, but different chain - - // check connection_states to see which head block is more popular! - // TODO: i don't think btreemap is what we want. i think we want indexmap or linkedhashmap - let mut rpc_ids_by_block = - IndexMap::>>::new(); - - let mut counted_rpcs = 0; - - for (rpc, (block_num, block_hash)) in connection_states.iter() { - if *block_num != new_block_num { - // this connection isn't synced. we don't care what hash it has - continue; - } - - counted_rpcs += 1; - - let count = rpc_ids_by_block - .entry(*block_hash) - .or_insert_with(|| Vec::with_capacity(total_rpcs - 1)); - - count.push(rpc.clone()); - } - - let most_common_head_hash = *rpc_ids_by_block - .iter() - .max_by(|a, b| a.1.len().cmp(&b.1.len())) - .map(|(k, _v)| k) - .unwrap(); - - let synced_rpcs = rpc_ids_by_block.remove(&most_common_head_hash).unwrap(); - - warn!( - "chain is forked! {} possible heads. {}/{}/{} rpcs have {}", - rpc_ids_by_block.len() + 1, - synced_rpcs.len(), - counted_rpcs, - total_rpcs, - most_common_head_hash - ); - - self.chain - .add_block(new_block.clone(), new_block_hash == most_common_head_hash); - - // TODO: do this more efficiently? - if pending_synced_connections.head_block_hash != most_common_head_hash { - pending_synced_connections.head_block_hash = most_common_head_hash; - } - - pending_synced_connections.inner = synced_rpcs.into_iter().collect(); - } - } - cmp::Ordering::Less => { - // this isn't the best block in the tier. don't do anything - if !pending_synced_connections.inner.remove(&rpc) { - // we didn't remove anything. nothing more to do - continue; - } - - // TODO: insert the hash if it isn't known? - - // we removed. don't continue so that we update self.synced_connections - } - } - - // the synced connections have changed - - if pending_synced_connections.inner.len() == total_rpcs { - // TODO: more metrics - trace!("all head: {}", new_block_hash); } else { - trace!( - "rpcs at {}: {:?}", - pending_synced_connections.head_block_hash, - pending_synced_connections.inner - ); - } - - // TODO: what if the hashes don't match? - if pending_synced_connections.head_block_hash == new_block_hash { - // mark all transactions in the block as confirmed - if pending_tx_sender.is_some() { - for tx_hash in &new_block.transactions { - // TODO: should we mark as confirmed via pending_tx_sender? - // TODO: possible deadlock here! - // trace!("removing {}...", tx_hash); - let _ = self.pending_transactions.remove(tx_hash); - // trace!("removed {}", tx_hash); - } - }; - - // TODO: mark any orphaned transactions as unconfirmed - } - - // TODO: only publish if there are x (default 50%) nodes synced to this block? - // TODO: do this before or after processing all the transactions in this block? - self.synced_connections - .swap(Arc::new(pending_synced_connections.clone())); - - if new_head_block { - // TODO: this will need a refactor to only send once a minmum threshold has this block - // TODO: move this onto self.chain - // TODO: pending_synced_connections isn't published yet. which means fast queries using this block will fail - head_block_sender - .send(new_block.clone()) - .context("head_block_sender")?; + // TODO: this expected when we first start + // TODO: make sure self.synced_connections is empty + warn!("not enough rpcs in sync"); } } @@ -797,8 +882,8 @@ impl Web3Connections { if let Some(min_block_needed) = min_block_needed { // TODO: this includes ALL archive servers. but we only want them if they are on a somewhat recent block // TODO: maybe instead of "archive_needed" bool it should be the minimum height. then even delayed servers might be fine. will need to track all heights then - self.inner - .iter() + self.conns + .values() .filter(|x| x.has_block_data(min_block_needed)) .filter(|x| !skip.contains(x)) .cloned() @@ -806,7 +891,7 @@ impl Web3Connections { } else { self.synced_connections .load() - .inner + .conns .iter() .filter(|x| !skip.contains(x)) .cloned() @@ -823,7 +908,7 @@ impl Web3Connections { // TODO: get active requests and the soft limit out of redis? let active_requests = rpc.active_requests(); let soft_limit = rpc.soft_limit(); - let block_data_limit = rpc.get_block_data_limit(); + let block_data_limit = rpc.block_data_limit(); let utilization = active_requests as f32 / soft_limit as f32; @@ -870,7 +955,7 @@ impl Web3Connections { // TODO: with capacity? let mut selected_rpcs = vec![]; - for connection in self.inner.iter() { + for connection in self.conns.values() { if let Some(min_block_needed) = min_block_needed { if !connection.has_block_data(min_block_needed) { continue; @@ -905,7 +990,7 @@ impl Web3Connections { // TODO: maximum retries? loop { - if skip_rpcs.len() == self.inner.len() { + if skip_rpcs.len() == self.conns.len() { break; } match self diff --git a/web3-proxy/src/frontend/ws_proxy.rs b/web3-proxy/src/frontend/ws_proxy.rs index 11e420b7..16f66c58 100644 --- a/web3-proxy/src/frontend/ws_proxy.rs +++ b/web3-proxy/src/frontend/ws_proxy.rs @@ -37,6 +37,7 @@ async fn proxy_web3_socket(app: Arc, socket: WebSocket) { tokio::spawn(read_web3_socket(app, ws_rx, response_tx)); } +/// websockets support a few more methods than http clients async fn handle_socket_payload( app: Arc, payload: &str,