From 5719397466a2648e1695ece880715fe131d4cafd Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Fri, 26 Aug 2022 23:44:25 +0000 Subject: [PATCH] i think it works --- TODO.md | 20 +- web3_proxy/src/rpcs/blockchain.rs | 444 +++++++++------------- web3_proxy/src/rpcs/connection.rs | 4 +- web3_proxy/src/rpcs/connections.rs | 10 +- web3_proxy/src/rpcs/synced_connections.rs | 8 +- 5 files changed, 200 insertions(+), 286 deletions(-) diff --git a/TODO.md b/TODO.md index 653925f9..d54c2add 100644 --- a/TODO.md +++ b/TODO.md @@ -90,28 +90,28 @@ - whenever blocks were slow, we started checking as fast as possible - [x] create user script should allow setting requests per minute - [x] cache api keys that are not in the database -- [ ] improve consensus block selection. Our goal is to find the highest work chain with a block over a minimum threshold of sum_soft_limit. +- [x] improve consensus block selection. Our goal is to find the highest work chain with a block over a minimum threshold of sum_soft_limit. - [x] A new block arrives at a connection. - [x] It checks that it isn't the same that it already has (which is a problem with polling nodes) - [x] If its new to this node... - [x] if the block does not have total work, check our cache. otherwise, query the node - [x] save the block num and hash so that http polling doesn't send duplicates - [x] send the deduped block through a channel to be handled by the connections grouping. - - [ ] The connections group... + - [x] The connections group... - [x] input = rpc, new_block - - [ ] adds the block and rpc to it's internal BlockchainMap (this persists). + - [x] adds the block and rpc to it's internal maps - [x] connection_heads: HashMap - [x] block_map: DashMap> - [x] blockchain: DiGraphMap - - [ ] iterate the rpc_map to find the highest_work_block - - [ ] oldest_block_num = highest_work_block.number - 256 + - [x] iterate the rpc_map to find the highest_work_block + - [?] oldest_block_num = highest_work_block.number - 256 - think more about this. if we have to go back more than a couple blocks, we will serve very stale data - - [ ] while sum_soft_limit < min_sum_soft_limit: - - [ ] consensus_head_hash = block.parent_hash - - [ ] sum_soft_limit = ??? (something with iterating rpc_map, caches, and petgraph's all_simple_paths) + - [x] while sum_soft_limit < min_sum_soft_limit: + - [x] consensus_head_hash = block.parent_hash + - [x] sum_soft_limit = ??? (something with iterating rpc_map, caches, and petgraph's all_simple_paths) - if all_simple_paths returns no paths, warn about a chain split? - - [ ] error if this is too old? sucks to have downtime, but its the chain thats having problems - - [ ] now that we have a consensus head with enough soft limit, update SyncedConnections + - [x] now that we have a consensus head with enough soft limit (or an empty set), update SyncedConnections + - [x] send the block through new head_block_sender - [-] use siwe messages and signatures for sign up and login - [-] requests for "Get transactions receipts" are routed to the private_rpcs and not the balanced_rpcs. do this better. - [x] quick fix, send to balanced_rpcs for now. we will just live with errors on new transactions. diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index 7adb694f..72c759a3 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -2,15 +2,18 @@ use super::connection::Web3Connection; use super::connections::Web3Connections; use super::transactions::TxStatus; -use crate::{config::BlockAndRpc, jsonrpc::JsonRpcRequest}; +use crate::{ + config::BlockAndRpc, jsonrpc::JsonRpcRequest, rpcs::synced_connections::SyncedConnections, +}; +use dashmap::mapref::one::Ref; use derive_more::From; use ethers::prelude::{Block, TxHash, H256, U256, U64}; -use hashbrown::HashMap; -use petgraph::prelude::DiGraphMap; +use hashbrown::{HashMap, HashSet}; +use petgraph::algo::all_simple_paths; use serde_json::json; use std::sync::Arc; use tokio::sync::{broadcast, watch}; -use tracing::{debug, warn}; +use tracing::{debug, info, trace, warn}; #[derive(Default, From)] pub struct BlockId { @@ -25,97 +28,6 @@ pub(super) struct BlockMetadata<'a> { pub(super) sum_soft_limit: u32, } -/// TODO: do we need this? probably big refactor still to do -/// The RPCs grouped by number and hash. -#[derive(Default)] -struct BlockchainAndRpcs<'a> { - // TODO: fifomap? or just manually remove once we add too much - rpcs_by_num: HashMap>, - rpcs_by_hash: HashMap>, - blocks_by_hash: HashMap>>, - /// Node is the blockhash. - /// You can get the blocks from block_map on the Web3Connections - /// TODO: what should the edge weight be? difficulty? - blockchain: DiGraphMap, - total_soft_limit: u32, -} - -impl<'a> BlockchainAndRpcs<'a> { - /// group the RPCs by their current head block - pub async fn new( - // TODO: think more about this key. maybe it should be an Arc? - connection_heads: &'a HashMap>>, - web3_conns: &Web3Connections, - ) -> Option> { - let mut new = Self::default(); - - let lowest_block_num = if let Some(lowest_block) = connection_heads - .values() - .min_by(|a, b| a.number.cmp(&b.number)) - { - lowest_block - .number - .expect("all blocks here should have a number") - } else { - // if no lowest block number, then no servers are in sync - return None; - }; - - // TODO: what if lowest_block_num is far from the highest head block num? - - for (rpc_name, head_block) in connection_heads.iter() { - if let Some(rpc) = web3_conns.get(rpc_name) { - // we need the total soft limit in order to know when its safe to update the backends - new.total_soft_limit += rpc.soft_limit; - - let head_hash = head_block.hash.unwrap(); - - // save the block - new.blocks_by_hash - .entry(head_hash) - .or_insert_with(|| head_block.clone()); - - // add the rpc to all relevant block heights - // TODO: i feel like we should be able to do this with a graph - let mut block = head_block.clone(); - while block.number.unwrap() >= lowest_block_num { - let block_hash = block.hash.unwrap(); - let block_num = block.number.unwrap(); - - // save the rpc by the head hash - let rpc_urls_by_hash = - new.rpcs_by_hash.entry(block_hash).or_insert_with(Vec::new); - rpc_urls_by_hash.push(rpc_name); - - // save the rpc by the head number - let rpc_names_by_num = - new.rpcs_by_num.entry(block_num).or_insert_with(Vec::new); - rpc_names_by_num.push(rpc_name); - - if let Ok(parent) = web3_conns - .block(&block.parent_hash, Some(rpc.as_ref())) - .await - { - // save the parent block - new.blocks_by_hash.insert(block.parent_hash, parent.clone()); - - block = parent - } else { - // log this? eventually we will hit a block we don't have, so it's not an error - break; - } - } - } - } - - Some(new) - } - - fn consensus_head() { - todo!() - } -} - impl<'a> BlockMetadata<'a> { // TODO: there are sortable traits, but this seems simpler /// sort the blocks in descending height @@ -145,32 +57,38 @@ impl<'a> BlockMetadata<'a> { impl Web3Connections { /// adds a block to our map of the blockchain - pub fn add_block_to_chain(&self, block: Arc>) -> anyhow::Result<()> { + pub fn add_block_to_chain(&self, block: &Arc>) -> anyhow::Result<()> { let hash = block.hash.ok_or_else(|| anyhow::anyhow!("no block hash"))?; - if self.blockchain_map.read().contains_node(hash) { - // this block is already included + if self.block_map.contains_key(&hash) { + // this block is already included. no need to continue return Ok(()); } - // theres a small race having the read and then the write - let mut blockchain = self.blockchain_map.write(); + let mut blockchain = self.blockchain_graphmap.write(); + + // TODO: theres a small race between contains_key and insert + if let Some(overwritten) = self.block_map.insert(hash, block.clone()) { + // there was a race and another thread wrote this block + // no need to continue because that other thread would have written (or soon will) write the + return Ok(()); + } if blockchain.contains_node(hash) { - // this hash is already included. we must have hit that race condition + // this hash is already included. we must have hit another race condition // return now since this work was already done. return Ok(()); } // TODO: prettier log? or probably move the log somewhere else - debug!(%hash, "new block"); + trace!(%hash, "new block"); // TODO: prune block_map to only keep a configurable (256 on ETH?) number of blocks? blockchain.add_node(hash); // what should edge weight be? and should the nodes be the blocks instead? - // maybe the weight should be the height + // TODO: maybe the weight should be the block? // we store parent_hash -> hash because the block already stores the parent_hash blockchain.add_edge(block.parent_hash, hash, 0); @@ -209,7 +127,7 @@ impl Web3Connections { let block = Arc::new(block); - self.add_block_to_chain(block.clone())?; + self.add_block_to_chain(&block)?; Ok(block) } @@ -280,7 +198,7 @@ impl Web3Connections { let mut connection_heads = HashMap::new(); while let Ok((new_block, rpc)) = block_receiver.recv_async().await { - self.recv_block_from_rpc( + self.process_block_from_rpc( &mut connection_heads, new_block, rpc, @@ -298,8 +216,8 @@ impl Web3Connections { /// `connection_heads` is a mapping of rpc_names to head block hashes. /// self.blockchain_map is a mapping of hashes to the complete Block. - /// - async fn recv_block_from_rpc( + /// TODO: return something? + async fn process_block_from_rpc( &self, connection_heads: &mut HashMap, new_block: Arc>, @@ -317,7 +235,7 @@ impl Web3Connections { } else { connection_heads.insert(rpc.name.clone(), hash); - self.add_block_to_chain(new_block.clone())?; + self.add_block_to_chain(&new_block)?; } } _ => { @@ -329,166 +247,172 @@ impl Web3Connections { } } - let mut chain_and_rpcs = BlockchainAndRpcs::default(); + // iterate the rpc_map to find the highest_work_block + let mut checked_heads = HashSet::new(); + let mut highest_work_block: Option>>> = None; - // TODO: default_min_soft_limit? without, we start serving traffic at the start too quickly - // let min_soft_limit = total_soft_limit / 2; - let min_soft_limit = 1; - - let num_possible_heads = chain_and_rpcs.rpcs_by_hash.len(); - - // trace!(?rpcs_by_hash); - - let total_rpcs = self.conns.len(); - - /* - // TODO: this needs tests - if let Some(x) = rpcs_by_hash - .into_iter() - .filter_map(|(hash, conns)| { - // TODO: move this to `State::new` function on - let sum_soft_limit = conns - .iter() - .map(|rpc_url| { - if let Some(rpc) = self.conns.get(*rpc_url) { - rpc.soft_limit - } else { - 0 - } - }) - .sum(); - - if sum_soft_limit < min_soft_limit { - trace!(?sum_soft_limit, ?min_soft_limit, "sum_soft_limit too low"); - None - } else { - let block = blocks_by_hash.get(&hash).unwrap(); - - Some(BlockMetadata { - block, - sum_soft_limit, - conns, - }) - } - }) - // sort b to a for descending order. sort a to b for ascending order? maybe not "max_by" is smart - .max_by(|a, b| a.sortable_values().cmp(&b.sortable_values())) - { - let best_head_num = x.block.number.unwrap(); - let best_head_hash = x.block.hash.unwrap(); - let best_rpcs = x.conns; - - let synced_rpcs = rpcs_by_num.remove(&best_head_num).unwrap(); - - if best_rpcs.len() == synced_rpcs.len() { - trace!( - "{}/{}/{}/{} rpcs have {}", - best_rpcs.len(), - synced_rpcs.len(), - connection_heads.len(), - total_rpcs, - best_head_hash - ); - } else { - // TODO: this isn't necessarily a fork. this might just be an rpc being slow - // TODO: log all the heads? - warn!( - "chain is forked! {} possible heads. {}/{}/{}/{} rpcs have {}", - num_possible_heads, - best_rpcs.len(), - synced_rpcs.len(), - connection_heads.len(), - total_rpcs, - best_head_hash - ); + for (_rpc_name, rpc_head_hash) in connection_heads.iter() { + if checked_heads.contains(rpc_head_hash) { + continue; } - let num_best_rpcs = best_rpcs.len(); + checked_heads.insert(rpc_head_hash); - // TODOL: do this without clone? - let conns = best_rpcs - .into_iter() - .map(|x| self.conns.get(x).unwrap().clone()) - .collect(); + let rpc_head_block = self.block_map.get(rpc_head_hash).unwrap(); - let pending_synced_connections = SyncedConnections { - head_block_num: best_head_num, - head_block_hash: best_head_hash, - conns, - }; - - let current_head_block = self.head_block_hash(); - let new_head_block = pending_synced_connections.head_block_hash != current_head_block; - - if new_head_block { - self.add_block(new_block.clone(), true); - - debug!( - "{}/{} rpcs at {} ({}). head at {:?}", - pending_synced_connections.conns.len(), - self.conns.len(), - pending_synced_connections.head_block_num, - pending_synced_connections.head_block_hash, - pending_synced_connections - .conns - .iter() - .map(|x| format!("{}", x)) - .collect::>(), - ); - // TODO: what if the hashes don't match? - if Some(pending_synced_connections.head_block_hash) == new_block.hash { - // mark all transactions in the block as confirmed - if pending_tx_sender.is_some() { - for tx_hash in &new_block.transactions { - // TODO: should we mark as confirmed via pending_tx_sender? - // TODO: possible deadlock here! - // trace!("removing {}...", tx_hash); - let _ = self.pending_transactions.remove(tx_hash); - // trace!("removed {}", tx_hash); - } - }; - - // TODO: mark any orphaned transactions as unconfirmed - } - } else if num_best_rpcs == self.conns.len() { - trace!( - "all {} rpcs at {} ({})", - num_best_rpcs, - pending_synced_connections.head_block_num, - pending_synced_connections.head_block_hash, - ); - } else { - trace!( - ?pending_synced_connections, - "{}/{} rpcs at {} ({})", - num_best_rpcs, - self.conns.len(), - pending_synced_connections.head_block_num, - pending_synced_connections.head_block_hash, - ); + if highest_work_block.is_none() + || rpc_head_block.total_difficulty + > highest_work_block.as_ref().unwrap().total_difficulty + { + highest_work_block = Some(rpc_head_block); } - - // TODO: do this before or after processing all the transactions in this block? - // TODO: only swap if there is a change? - trace!(?pending_synced_connections, "swapping"); - self.synced_connections - .swap(Arc::new(pending_synced_connections)); - - if new_head_block { - // TODO: is new_head_block accurate? - // TODO: move this onto self.chain? - head_block_sender - .send(new_block.clone()) - .context("head_block_sender")?; - } - } else { - // TODO: is this expected when we first start? - // TODO: make sure self.synced_connections is empty - // TODO: return an error - warn!("not enough rpcs in sync"); } - */ + // clone to release the read lock + let highest_work_block = highest_work_block.map(|x| x.clone()); + + // TODO: default min_soft_limit? without, we start serving traffic at the start too quickly + // let min_sum_soft_limit = total_soft_limit / 2; + // TODO: this should be configurable + let min_sum_soft_limit = 1; + + let mut highest_work_block = match highest_work_block { + None => todo!("no servers are in sync"), + Some(highest_work_block) => highest_work_block, + }; + + // track names so we don't check the same node multiple times + let mut consensus_names: HashSet<&String> = HashSet::new(); + // track rpcs so we can build a new SyncedConnections + let mut consensus_rpcs: Vec<&Arc> = vec![]; + // a running total of the soft limits covered by the rpcs + let mut consensus_soft_limit = 0; + + // check the highest work block and its parents for a set of rpcs that can serve our request load + // TODO: loop for how many parent blocks? we don't want to serve blocks that are too far behind + let blockchain_guard = self.blockchain_graphmap.read(); + for _ in 0..3 { + let highest_work_hash = highest_work_block.hash.as_ref().unwrap(); + + for (rpc_name, rpc_head_hash) in connection_heads.iter() { + if consensus_names.contains(rpc_name) { + // this block is already included + continue; + } + + // TODO: does all_simple_paths make this check? + if rpc_head_hash == highest_work_hash { + if let Some(rpc) = self.conns.get(rpc_name) { + consensus_names.insert(rpc_name); + consensus_rpcs.push(rpc); + consensus_soft_limit += rpc.soft_limit; + } + continue; + } + + // TODO: cache all_simple_paths. there should be a high hit rate + // TODO: use an algo that saves scratch space? + // TODO: how slow is this? + let is_connected = all_simple_paths::, _>( + &*blockchain_guard, + *highest_work_hash, + *rpc_head_hash, + 0, + // TODO: what should this max be? probably configurable per chain + Some(10), + ) + .next() + .is_some(); + + if is_connected { + if let Some(rpc) = self.conns.get(rpc_name) { + consensus_rpcs.push(rpc); + consensus_soft_limit += rpc.soft_limit; + } + } + } + + if consensus_soft_limit >= min_sum_soft_limit { + // success! this block has enough nodes on it + break; + } + // else, we need to try the parent block + + trace!(%consensus_soft_limit, ?highest_work_hash, "avoiding thundering herd"); + + // // TODO: this automatically queries for parents, but need to rearrange lifetimes to make an await work here + // highest_work_block = self + // .block(&highest_work_block.parent_hash, Some(&rpc)) + // .await?; + // we give known stale data just because we don't have enough capacity to serve the latest. + // TODO: maybe we should delay serving requests if this happens. + // TODO: don't unwrap. break if none? + match self.block_map.get(&highest_work_block.parent_hash) { + None => { + warn!( + "ran out of parents to check. soft limit only {}/{}: {}%", + consensus_soft_limit, + min_sum_soft_limit, + consensus_soft_limit * 100 / min_sum_soft_limit + ); + break; + } + Some(parent_block) => { + highest_work_block = parent_block.clone(); + } + } + } + // unlock self.blockchain_graphmap + drop(blockchain_guard); + + let soft_limit_met = consensus_soft_limit >= min_sum_soft_limit; + let num_synced_rpcs = consensus_rpcs.len(); + // TODO: put this in config + let min_synced_rpcs = 2; + + let new_synced_connections = if soft_limit_met { + // we have a consensus large enough to serve traffic + let head_block_hash = highest_work_block.hash.unwrap(); + let head_block_num = highest_work_block.number.unwrap(); + + if num_synced_rpcs < min_synced_rpcs { + trace!(hash=%head_block_hash, num=?head_block_num, "not enough rpcs are synced to advance"); + + return Ok(()); + } else { + // TODO: wait until at least most of the rpcs have given their initial block? + // otherwise, if there is a syncing node that is fast, our first head block might not be good + // TODO: have a configurable "minimum rpcs" number that we can set + + // TODO: this logs too much. only log when the hash is first updated? + debug!(hash=%head_block_hash, num=%head_block_num, rpcs=%num_synced_rpcs, limit=%consensus_soft_limit, "consensus head"); + + // TODO: sort by weight and soft limit? do we need an IndexSet, or is a Vec fine? + let conns = consensus_rpcs.into_iter().cloned().collect(); + + SyncedConnections { + head_block_num, + head_block_hash, + conns, + } + } + } else { + // failure even after checking parent heads! + // not enough servers are in sync to server traffic + // TODO: at startup this is fine, but later its a problem + warn!("empty SyncedConnections"); + + SyncedConnections::default() + }; + + let old_synced_connections = Arc::new(new_synced_connections); + + if soft_limit_met && Some(old_synced_connections.head_block_hash) != highest_work_block.hash + { + // the head hash changed. forward to any subscribers + head_block_sender.send(highest_work_block)?; + } + Ok(()) } } diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs index 48b05391..28d0adf6 100644 --- a/web3_proxy/src/rpcs/connection.rs +++ b/web3_proxy/src/rpcs/connection.rs @@ -1,6 +1,6 @@ +///! Rate-limited communication with a web3 provider. use super::blockchain::BlockId; use super::connections::BlockMap; -///! Rate-limited communication with a web3 provider. use super::provider::Web3Provider; use super::request::OpenRequestHandle; use super::request::OpenRequestResult; @@ -8,7 +8,6 @@ use crate::app::{flatten_handle, AnyhowJoinHandle}; use crate::config::BlockAndRpc; use anyhow::Context; use dashmap::mapref::entry::Entry; -use dashmap::DashMap; use ethers::prelude::{Block, Bytes, Middleware, ProviderError, TxHash, H256, U64}; use futures::future::try_join_all; use futures::StreamExt; @@ -23,7 +22,6 @@ use std::{cmp::Ordering, sync::Arc}; use tokio::sync::broadcast; use tokio::sync::RwLock as AsyncRwLock; use tokio::time::{interval, sleep, sleep_until, Duration, MissedTickBehavior}; -use tracing::debug; use tracing::{error, info, instrument, trace, warn}; /// An active connection to a Web3Rpc diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs index 6cb9c0a6..bd1612ec 100644 --- a/web3_proxy/src/rpcs/connections.rs +++ b/web3_proxy/src/rpcs/connections.rs @@ -32,15 +32,11 @@ use tracing::{error, info, instrument, trace, warn}; pub type BlockMap = Arc>>>; -pub struct BlockchainAndHeads { - pub(super) graph: DiGraphMap>>, - pub(super) heads: HashMap, -} - /// A collection of web3 connections. Sends requests either the current best server or all servers. #[derive(From)] pub struct Web3Connections { pub(super) conns: HashMap>, + /// any requests will be forwarded to one (or more) of these connections pub(super) synced_connections: ArcSwap, pub(super) pending_transactions: Arc>, /// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis? @@ -48,7 +44,7 @@ pub struct Web3Connections { pub(super) block_map: BlockMap, /// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis? /// TODO: what should we use for edges? - pub(super) blockchain_map: RwLock>, + pub(super) blockchain_graphmap: RwLock>, } impl Web3Connections { @@ -172,7 +168,7 @@ impl Web3Connections { synced_connections: ArcSwap::new(Arc::new(synced_connections)), pending_transactions, block_map: Default::default(), - blockchain_map: Default::default(), + blockchain_graphmap: Default::default(), }); let handle = { diff --git a/web3_proxy/src/rpcs/synced_connections.rs b/web3_proxy/src/rpcs/synced_connections.rs index 311960c6..8e4a75f9 100644 --- a/web3_proxy/src/rpcs/synced_connections.rs +++ b/web3_proxy/src/rpcs/synced_connections.rs @@ -10,6 +10,7 @@ use std::sync::Arc; /// Serialize is so we can print it on our debug endpoint #[derive(Clone, Default, Serialize)] pub struct SyncedConnections { + // TODO: store Arc> instead? pub(super) head_block_num: U64, pub(super) head_block_hash: H256, // TODO: this should be able to serialize, but it isn't @@ -48,12 +49,7 @@ impl Web3Connections { } pub fn synced(&self) -> bool { - // TODO: require a minimum number of synced rpcs - // TODO: move this whole function to SyncedConnections - if self.synced_connections.load().conns.is_empty() { - return false; - } - self.head_block_num() > U64::zero() + !self.synced_connections.load().conns.is_empty() } pub fn num_synced_rpcs(&self) -> usize {