From 11c66636bb936fa17ac59259c55b67548b274401 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Sat, 27 Aug 2022 23:49:41 +0000 Subject: [PATCH] rewrite cannonical block --- TODO.md | 44 ++++++++------- web3_proxy/src/app.rs | 4 +- web3_proxy/src/config.rs | 4 +- web3_proxy/src/rpcs/blockchain.rs | 90 ++++++++++++++++++++---------- web3_proxy/src/rpcs/connection.rs | 10 ++-- web3_proxy/src/rpcs/connections.rs | 10 ++-- 6 files changed, 99 insertions(+), 63 deletions(-) diff --git a/TODO.md b/TODO.md index 7275c8ea..c25bbe10 100644 --- a/TODO.md +++ b/TODO.md @@ -91,27 +91,29 @@ - [x] create user script should allow setting requests per minute - [x] cache api keys that are not in the database - [x] improve consensus block selection. Our goal is to find the highest work chain with a block over a minimum threshold of sum_soft_limit. - - [x] A new block arrives at a connection. - - [x] It checks that it isn't the same that it already has (which is a problem with polling nodes) - - [x] If its new to this node... - - [x] if the block does not have total work, check our cache. otherwise, query the node - - [x] save the block num and hash so that http polling doesn't send duplicates - - [x] send the deduped block through a channel to be handled by the connections grouping. - - [x] The connections group... - - [x] input = rpc, new_block - - [x] adds the block and rpc to it's internal maps - - [x] connection_heads: HashMap - - [x] block_map: DashMap> - - [x] blockchain: DiGraphMap - - [x] iterate the rpc_map to find the highest_work_block - - [?] oldest_block_num = highest_work_block.number - 256 - - think more about this. if we have to go back more than a couple blocks, we will serve very stale data - - [x] while sum_soft_limit < min_sum_soft_limit: - - [x] consensus_head_hash = block.parent_hash - - [x] sum_soft_limit = ??? (something with iterating rpc_map, caches, and petgraph's all_simple_paths) - - if all_simple_paths returns no paths, warn about a chain split? 
- - [x] now that we have a consensus head with enough soft limit (or an empty set), update SyncedConnections - - [x] send the block through new head_block_sender + - [x] A new block arrives at a connection. + - [x] It checks that it isn't the same block that it already has (which is a problem with polling nodes) + - [x] If it's new to this node... + - [x] if the block does not have total work, check our cache. otherwise, query the node + - [x] save the block num and hash so that http polling doesn't send duplicates + - [x] send the deduped block through a channel to be handled by the connections grouping. + - [x] The connections group... + - [x] input = rpc, new_block + - [x] adds the block and rpc to its internal maps + - [x] connection_heads: HashMap + - [x] block_map: DashMap> + - [x] block_num: DashMap + - [x] blockchain: DiGraphMap + - [x] iterate the rpc_map to find the highest_work_block + - [?] oldest_block_num = highest_work_block.number - 256 + - think more about this. if we have to go back more than a couple blocks, we will serve very stale data + - [x] while sum_soft_limit < min_sum_soft_limit: + - [x] consensus_head_hash = block.parent_hash + - [x] sum_soft_limit = ??? (something with iterating rpc_map, caches, and petgraph's all_simple_paths) + - if all_simple_paths returns no paths, warn about a chain split? + - [x] now that we have a consensus head with enough soft limit (or an empty set), update SyncedConnections + - [x] send the block through new head_block_sender + - [x] rewrite cannonical_block - [-] use siwe messages and signatures for sign up and login - [-] requests for "Get transactions receipts" are routed to the private_rpcs and not the balanced_rpcs. do this better. - [x] quick fix, send to balanced_rpcs for now. we will just live with errors on new transactions. 
diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs index 715f57d1..2e36d5fc 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app.rs @@ -6,7 +6,7 @@ use crate::jsonrpc::JsonRpcForwardedResponse; use crate::jsonrpc::JsonRpcForwardedResponseEnum; use crate::jsonrpc::JsonRpcRequest; use crate::jsonrpc::JsonRpcRequestEnum; -use crate::rpcs::connections::{BlockMap, Web3Connections}; +use crate::rpcs::connections::{BlockHashesMap, Web3Connections}; use crate::rpcs::transactions::TxStatus; use crate::stats::AppStats; use anyhow::Context; @@ -246,7 +246,7 @@ impl Web3ProxyApp { // TODO: we should still have some sort of expiration or maximum size limit for the map // this block map is shared between balanced_rpcs and private_rpcs. - let block_map = BlockMap::default(); + let block_map = BlockHashesMap::default(); let (balanced_rpcs, balanced_handle) = Web3Connections::spawn( top_config.app.chain_id, diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index 62e80fab..e4a4f7fb 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -1,6 +1,6 @@ use crate::app::AnyhowJoinHandle; use crate::rpcs::connection::Web3Connection; -use crate::rpcs::connections::BlockMap; +use crate::rpcs::connections::BlockHashesMap; use argh::FromArgs; use derive_more::Constructor; use ethers::prelude::{Block, TxHash}; @@ -105,7 +105,7 @@ impl Web3ConnectionConfig { chain_id: u64, http_client: Option, http_interval_sender: Option>>, - block_map: BlockMap, + block_map: BlockHashesMap, block_sender: Option>, tx_id_sender: Option)>>, ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index 138af61d..f9c59a39 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -5,7 +5,7 @@ use super::transactions::TxStatus; use crate::{ config::BlockAndRpc, jsonrpc::JsonRpcRequest, rpcs::synced_connections::SyncedConnections, }; -use 
dashmap::mapref::one::Ref; +use dashmap::mapref::{entry::Entry, one::Ref}; use derive_more::From; use ethers::prelude::{Block, TxHash, H256, U64}; use hashbrown::{HashMap, HashSet}; @@ -25,39 +25,52 @@ pub struct BlockId { impl Web3Connections { /// add a block to our map and it's hash to our graphmap of the blockchain pub fn save_block(&self, block: &Arc>) -> anyhow::Result<()> { - let hash = block.hash.ok_or_else(|| anyhow::anyhow!("no block hash"))?; + let block_hash = block.hash.ok_or_else(|| anyhow::anyhow!("no block hash"))?; + let block_num = block + .number + .ok_or_else(|| anyhow::anyhow!("no block num"))?; - if self.block_map.contains_key(&hash) { + if self.block_hashes.contains_key(&block_hash) { // this block is already included. no need to continue return Ok(()); } let mut blockchain = self.blockchain_graphmap.write(); - // TODO: theres a small race between contains_key and insert - if let Some(overwritten) = self.block_map.insert(hash, block.clone()) { - // there was a race and another thread wrote this block - // no need to continue because that other thread would have written (or soon will) write the - return Ok(()); - } - - if blockchain.contains_node(hash) { - // this hash is already included. we must have hit another race condition + if blockchain.contains_node(block_hash) { + // this hash is already included. we must have hit that race condition // return now since this work was already done. return Ok(()); } + // TODO: theres a small race between contains_key and insert + if let Some(_overwritten) = self.block_hashes.insert(block_hash, block.clone()) { + // there was a race and another thread wrote this block + // i don't think this will happen. 
the blockchain.contains_node above should be enough + // no need to continue because that other thread has written (or soon will write) the remaining entries + return Ok(()); + } + + match self.block_numbers.entry(block_num) { + Entry::Occupied(mut x) => { + x.get_mut().push(block_hash); + } + Entry::Vacant(x) => { + x.insert(vec![block_hash]); + } + } + // TODO: prettier log? or probably move the log somewhere else - trace!(%hash, "new block"); + trace!(%block_hash, "new block"); - // TODO: prune block_map to only keep a configurable (256 on ETH?) number of blocks? - - blockchain.add_node(hash); + blockchain.add_node(block_hash); // what should edge weight be? and should the nodes be the blocks instead? // TODO: maybe the weight should be the block? // we store parent_hash -> hash because the block already stores the parent_hash - blockchain.add_edge(block.parent_hash, hash, 0); + blockchain.add_edge(block.parent_hash, block_hash, 0); + + // TODO: prune block_numbers and block_map to only keep a configurable (256 on ETH?) number of blocks? 
Ok(()) } @@ -70,7 +83,7 @@ impl Web3Connections { rpc: Option<&Arc>, ) -> anyhow::Result>> { // first, try to get the hash from our cache - if let Some(block) = self.block_map.get(hash) { + if let Some(block) = self.block_hashes.get(hash) { return Ok(block.clone()); } @@ -121,12 +134,31 @@ impl Web3Connections { /// Get the heaviest chain's block from cache or backend rpc pub async fn cannonical_block(&self, num: &U64) -> anyhow::Result>> { - todo!(); + // we only have blocks by hash now + // maybe save them during save_block in a blocks_by_number DashMap>>> + // if theres multiple, use petgraph to find the one on the main chain (and remove the others if they have enough confirmations) - /* // first, try to get the hash from our cache - if let Some(block) = self.chain_map.get(num) { - return Ok(block.clone()); + if let Some(block_hash) = self.block_numbers.get(num) { + match block_hash.len() { + 0 => { + unimplemented!("block_numbers is broken") + } + 1 => { + let block_hash = block_hash.get(0).expect("length was checked"); + + let block = self + .block_hashes + .get(block_hash) + .expect("block_numbers gave us this hash"); + + return Ok(block.clone()); + } + _ => { + // TODO: maybe the vec should be sorted by total difficulty. + todo!("pick the block on the current consensus chain") + } + } } // block not in cache. we need to ask an rpc for it @@ -143,7 +175,6 @@ impl Web3Connections { } // TODO: helper for method+params => JsonRpcRequest - // TODO: get block with the transactions? 
let request = json!({ "jsonrpc": "2.0", "id": "1", "method": "eth_getBlockByNumber", "params": (num, false) }); let request: JsonRpcRequest = serde_json::from_value(request)?; @@ -158,10 +189,9 @@ impl Web3Connections { let block = Arc::new(block); - self.add_block(block.clone(), true); + self.save_block(&block)?; Ok(block) - */ } pub(super) async fn process_incoming_blocks( @@ -236,7 +266,7 @@ impl Web3Connections { checked_heads.insert(rpc_head_hash); - let rpc_head_block = self.block_map.get(rpc_head_hash).unwrap(); + let rpc_head_block = self.block_hashes.get(rpc_head_hash).unwrap(); if highest_work_block.is_none() || rpc_head_block.total_difficulty @@ -322,7 +352,7 @@ impl Web3Connections { // we give known stale data just because we don't have enough capacity to serve the latest. // TODO: maybe we should delay serving requests if this happens. // TODO: don't unwrap. break if none? - match self.block_map.get(&highest_work_block.parent_hash) { + match self.block_hashes.get(&highest_work_block.parent_hash) { None => { warn!( "ran out of parents to check. soft limit only {}/{}: {}%", @@ -385,8 +415,10 @@ impl Web3Connections { let old_head_hash = old_synced_connections.head_block_hash; - if Some(consensus_block_hash) != rpc_head_block.hash { - info!("non consensus block") + if rpc_head_block.hash.is_some() && Some(consensus_block_hash) != rpc_head_block.hash { + info!(new=%rpc_head_block.hash.unwrap(), new_num=?rpc_head_block.number.unwrap(), consensus=%consensus_block_hash, num=%consensus_block_num, %rpc, "non consensus head"); + // TODO: anything else to do? 
maybe warn if these blocks are very far apart or forked for an extended period of time + // TODO: if there is any non-consensus head log how many nodes are on it } if consensus_block_hash == old_head_hash { diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs index 8dfac2c6..97c78e44 100644 --- a/web3_proxy/src/rpcs/connection.rs +++ b/web3_proxy/src/rpcs/connection.rs @@ -1,6 +1,6 @@ ///! Rate-limited communication with a web3 provider. use super::blockchain::BlockId; -use super::connections::BlockMap; +use super::connections::BlockHashesMap; use super::provider::Web3Provider; use super::request::OpenRequestHandle; use super::request::OpenRequestResult; @@ -61,7 +61,7 @@ impl Web3Connection { hard_limit: Option<(u64, RedisPool)>, // TODO: think more about this type soft_limit: u32, - block_map: BlockMap, + block_map: BlockHashesMap, block_sender: Option>, tx_id_sender: Option)>>, reconnect: bool, @@ -275,7 +275,7 @@ impl Web3Connection { self: &Arc, new_head_block: Result>, ProviderError>, block_sender: &flume::Sender, - block_map: BlockMap, + block_map: BlockHashesMap, ) -> anyhow::Result<()> { match new_head_block { Ok(new_head_block) => { @@ -335,7 +335,7 @@ impl Web3Connection { async fn subscribe( self: Arc, http_interval_sender: Option>>, - block_map: BlockMap, + block_map: BlockHashesMap, block_sender: Option>, tx_id_sender: Option)>>, reconnect: bool, @@ -404,7 +404,7 @@ impl Web3Connection { self: Arc, http_interval_receiver: Option>, block_sender: flume::Sender, - block_map: BlockMap, + block_map: BlockHashesMap, ) -> anyhow::Result<()> { info!(?self, "watching new_heads"); diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs index 1dc65722..5162ccf3 100644 --- a/web3_proxy/src/rpcs/connections.rs +++ b/web3_proxy/src/rpcs/connections.rs @@ -30,7 +30,7 @@ use tokio::time::{interval, sleep, sleep_until, MissedTickBehavior}; use tokio::time::{Duration, Instant}; use tracing::{error, info, 
instrument, trace, warn}; -pub type BlockMap = Arc>>>; +pub type BlockHashesMap = Arc>>>; /// A collection of web3 connections. Sends requests either the current best server or all servers. #[derive(From)] @@ -41,7 +41,8 @@ pub struct Web3Connections { pub(super) pending_transactions: Arc>, /// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis? /// all blocks, including orphans - pub(super) block_map: BlockMap, + pub(super) block_hashes: BlockHashesMap, + pub(super) block_numbers: DashMap>, /// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis? /// TODO: what should we use for edges? pub(super) blockchain_graphmap: RwLock>, @@ -57,7 +58,7 @@ impl Web3Connections { server_configs: HashMap, http_client: Option, redis_client_pool: Option, - block_map: BlockMap, + block_map: BlockHashesMap, head_block_sender: Option>>>, min_sum_soft_limit: u32, min_synced_rpcs: u32, @@ -171,7 +172,8 @@ impl Web3Connections { conns: connections, synced_connections: ArcSwap::new(Arc::new(synced_connections)), pending_transactions, - block_map: Default::default(), + block_hashes: Default::default(), + block_numbers: Default::default(), blockchain_graphmap: Default::default(), min_sum_soft_limit, min_synced_rpcs,