From 81254a24bedb1540a2e6bd050487a820c30699a9 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Fri, 26 Aug 2022 17:26:17 +0000 Subject: [PATCH] wip --- TODO.md | 22 ++ web3_proxy/src/app.rs | 7 +- web3_proxy/src/config.rs | 8 +- web3_proxy/src/frontend/mod.rs | 4 +- web3_proxy/src/rpcs/blockchain.rs | 320 ++++++++++++++++------------ web3_proxy/src/rpcs/connection.rs | 171 ++++++++++----- web3_proxy/src/rpcs/connections.rs | 64 ++++-- web3_proxy/src/rpcs/transactions.rs | 4 +- 8 files changed, 371 insertions(+), 229 deletions(-) diff --git a/TODO.md b/TODO.md index 6622ff6d..653925f9 100644 --- a/TODO.md +++ b/TODO.md @@ -90,6 +90,28 @@ - whenever blocks were slow, we started checking as fast as possible - [x] create user script should allow setting requests per minute - [x] cache api keys that are not in the database +- [ ] improve consensus block selection. Our goal is to find the highest work chain with a block over a minimum threshold of sum_soft_limit. + - [x] A new block arrives at a connection. + - [x] It checks that it isn't the same that it already has (which is a problem with polling nodes) + - [x] If its new to this node... + - [x] if the block does not have total work, check our cache. otherwise, query the node + - [x] save the block num and hash so that http polling doesn't send duplicates + - [x] send the deduped block through a channel to be handled by the connections grouping. + - [ ] The connections group... + - [x] input = rpc, new_block + - [ ] adds the block and rpc to it's internal BlockchainMap (this persists). + - [x] connection_heads: HashMap + - [x] block_map: DashMap> + - [x] blockchain: DiGraphMap + - [ ] iterate the rpc_map to find the highest_work_block + - [ ] oldest_block_num = highest_work_block.number - 256 + - think more about this. if we have to go back more than a couple blocks, we will serve very stale data + - [ ] while sum_soft_limit < min_sum_soft_limit: + - [ ] consensus_head_hash = block.parent_hash + - [ ] sum_soft_limit = ??? (something with iterating rpc_map, caches, and petgraph's all_simple_paths) + - if all_simple_paths returns no paths, warn about a chain split? + - [ ] error if this is too old? sucks to have downtime, but its the chain thats having problems + - [ ] now that we have a consensus head with enough soft limit, update SyncedConnections - [-] use siwe messages and signatures for sign up and login - [-] requests for "Get transactions receipts" are routed to the private_rpcs and not the balanced_rpcs. do this better. - [x] quick fix, send to balanced_rpcs for now. we will just live with errors on new transactions. diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs index c61bc472..4b7b577c 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app.rs @@ -6,7 +6,7 @@ use crate::jsonrpc::JsonRpcForwardedResponse; use crate::jsonrpc::JsonRpcForwardedResponseEnum; use crate::jsonrpc::JsonRpcRequest; use crate::jsonrpc::JsonRpcRequestEnum; -use crate::rpcs::connections::Web3Connections; +use crate::rpcs::connections::{BlockMap, Web3Connections}; use crate::rpcs::transactions::TxStatus; use crate::stats::AppStats; use anyhow::Context; @@ -245,11 +245,15 @@ impl Web3ProxyApp { // TODO: once a transaction is "Confirmed" we remove it from the map. this should prevent major memory leaks. // TODO: we should still have some sort of expiration or maximum size limit for the map + // this block map is shared between balanced_rpcs and private_rpcs. 
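// Sketch (not part of this patch): BlockMap is the shared block cache created just below
// and cloned into both balanced_rpcs and private_rpcs. The alias itself lives in
// connections.rs; the type parameters shown here are assumptions inferred from how the
// map is used elsewhere in the patch.
use dashmap::DashMap;
use ethers::prelude::{Block, TxHash, H256};
use std::sync::Arc;

pub type BlockMap = Arc<DashMap<H256, Arc<Block<TxHash>>>>;

fn share_block_cache() -> (BlockMap, BlockMap) {
    let block_map = BlockMap::default();

    // cloning only bumps the Arc, so both rpc groups read and fill a single cache
    (block_map.clone(), block_map)
}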
+ let block_map = BlockMap::default(); + let (balanced_rpcs, balanced_handle) = Web3Connections::spawn( top_config.app.chain_id, balanced_rpcs, http_client.clone(), redis_pool.clone(), + block_map.clone(), Some(head_block_sender), Some(pending_tx_sender.clone()), pending_transactions.clone(), @@ -269,6 +273,7 @@ impl Web3ProxyApp { private_rpcs, http_client.clone(), redis_pool.clone(), + block_map, // subscribing to new heads here won't work well. if they are fast, they might be ahead of balanced_rpcs None, // TODO: subscribe to pending transactions on the private rpcs? they seem to have low rate limits diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index b22254d2..a21798f2 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -1,3 +1,6 @@ +use crate::app::AnyhowJoinHandle; +use crate::rpcs::connection::Web3Connection; +use crate::rpcs::connections::BlockMap; use argh::FromArgs; use derive_more::Constructor; use ethers::prelude::{Block, TxHash}; @@ -6,9 +9,6 @@ use serde::Deserialize; use std::sync::Arc; use tokio::sync::broadcast; -use crate::app::AnyhowJoinHandle; -use crate::rpcs::connection::Web3Connection; - pub type BlockAndRpc = (Arc>, Arc); #[derive(Debug, FromArgs)] @@ -85,6 +85,7 @@ impl Web3ConnectionConfig { chain_id: u64, http_client: Option, http_interval_sender: Option>>, + block_map: BlockMap, block_sender: Option>, tx_id_sender: Option)>>, ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { @@ -107,6 +108,7 @@ impl Web3ConnectionConfig { http_interval_sender, hard_limit, self.soft_limit, + block_map, block_sender, tx_id_sender, true, diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs index a4c0abed..007e5654 100644 --- a/web3_proxy/src/frontend/mod.rs +++ b/web3_proxy/src/frontend/mod.rs @@ -70,16 +70,14 @@ pub async fn serve(port: u16, proxy_app: Arc) -> anyhow::Result<() // TODO: allow only listening on localhost? top_config.app.host.parse()? let addr = SocketAddr::from(([0, 0, 0, 0], port)); info!("listening on port {}", port); - // TODO: into_make_service is enough if we always run behind a proxy. make into_make_service_with_connect_info optional? + // TODO: into_make_service is enough if we always run behind a proxy. make into_make_service_with_connect_info optional? /* It sequentially looks for an IP in: - x-forwarded-for header (de-facto standard) - x-real-ip header - forwarded header (new standard) - axum::extract::ConnectInfo (if not behind proxy) - - So we probably won't need into_make_service_with_connect_info, but it shouldn't hurt */ let service = app.into_make_service_with_connect_info::(); // let service = app.into_make_service(); diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index ec01ea63..7adb694f 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -1,21 +1,119 @@ ///! Keep track of the blockchain as seen by a Web3Connections. 
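// Sketch (not part of this patch) of the consensus selection described in the TODO above:
// starting from the highest-work head, walk parent hashes until the rpcs that agree on a
// block carry at least min_sum_soft_limit of capacity. The maps here are hypothetical
// per-iteration groupings (rpcs_by_hash built by walking every head back toward the lowest
// head, as BlockchainAndRpcs::new does below); names and exact types are assumptions.
use ethers::prelude::{Block, TxHash, H256};
use hashbrown::HashMap;
use std::sync::Arc;

fn find_consensus_head(
    highest_work_block: &Block<TxHash>,
    rpcs_by_hash: &HashMap<H256, Vec<String>>,
    blocks_by_hash: &HashMap<H256, Arc<Block<TxHash>>>,
    soft_limits: &HashMap<String, u32>,
    min_sum_soft_limit: u32,
) -> Option<H256> {
    let mut consensus_head_hash = highest_work_block.hash?;

    loop {
        // sum the soft limits of every rpc whose chain includes this block
        let sum_soft_limit: u32 = rpcs_by_hash
            .get(&consensus_head_hash)
            .map(|rpcs| {
                rpcs.iter()
                    .filter_map(|name| soft_limits.get(name))
                    .sum()
            })
            .unwrap_or_default();

        if sum_soft_limit >= min_sum_soft_limit {
            // enough capacity agrees on this block; use it to update SyncedConnections
            return Some(consensus_head_hash);
        }

        // not enough capacity yet; step back one block and try the parent
        // (the TODO warns this should error out rather than walk back forever)
        let block = blocks_by_hash.get(&consensus_head_hash)?;
        consensus_head_hash = block.parent_hash;
    }
}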
use super::connection::Web3Connection; use super::connections::Web3Connections; -use super::synced_connections::SyncedConnections; use super::transactions::TxStatus; -use crate::jsonrpc::JsonRpcRequest; -use anyhow::Context; +use crate::{config::BlockAndRpc, jsonrpc::JsonRpcRequest}; +use derive_more::From; use ethers::prelude::{Block, TxHash, H256, U256, U64}; -use indexmap::IndexMap; +use hashbrown::HashMap; +use petgraph::prelude::DiGraphMap; use serde_json::json; use std::sync::Arc; use tokio::sync::{broadcast, watch}; -use tracing::{debug, trace, warn}; +use tracing::{debug, warn}; +#[derive(Default, From)] +pub struct BlockId { + pub(super) hash: H256, + pub(super) num: U64, +} + +/// TODO: do we need this? probably big refactor still to do pub(super) struct BlockMetadata<'a> { pub(super) block: &'a Arc>, + pub(super) rpc_names: Vec<&'a str>, pub(super) sum_soft_limit: u32, - pub(super) conns: Vec<&'a str>, +} + +/// TODO: do we need this? probably big refactor still to do +/// The RPCs grouped by number and hash. +#[derive(Default)] +struct BlockchainAndRpcs<'a> { + // TODO: fifomap? or just manually remove once we add too much + rpcs_by_num: HashMap>, + rpcs_by_hash: HashMap>, + blocks_by_hash: HashMap>>, + /// Node is the blockhash. + /// You can get the blocks from block_map on the Web3Connections + /// TODO: what should the edge weight be? difficulty? + blockchain: DiGraphMap, + total_soft_limit: u32, +} + +impl<'a> BlockchainAndRpcs<'a> { + /// group the RPCs by their current head block + pub async fn new( + // TODO: think more about this key. maybe it should be an Arc? + connection_heads: &'a HashMap>>, + web3_conns: &Web3Connections, + ) -> Option> { + let mut new = Self::default(); + + let lowest_block_num = if let Some(lowest_block) = connection_heads + .values() + .min_by(|a, b| a.number.cmp(&b.number)) + { + lowest_block + .number + .expect("all blocks here should have a number") + } else { + // if no lowest block number, then no servers are in sync + return None; + }; + + // TODO: what if lowest_block_num is far from the highest head block num? + + for (rpc_name, head_block) in connection_heads.iter() { + if let Some(rpc) = web3_conns.get(rpc_name) { + // we need the total soft limit in order to know when its safe to update the backends + new.total_soft_limit += rpc.soft_limit; + + let head_hash = head_block.hash.unwrap(); + + // save the block + new.blocks_by_hash + .entry(head_hash) + .or_insert_with(|| head_block.clone()); + + // add the rpc to all relevant block heights + // TODO: i feel like we should be able to do this with a graph + let mut block = head_block.clone(); + while block.number.unwrap() >= lowest_block_num { + let block_hash = block.hash.unwrap(); + let block_num = block.number.unwrap(); + + // save the rpc by the head hash + let rpc_urls_by_hash = + new.rpcs_by_hash.entry(block_hash).or_insert_with(Vec::new); + rpc_urls_by_hash.push(rpc_name); + + // save the rpc by the head number + let rpc_names_by_num = + new.rpcs_by_num.entry(block_num).or_insert_with(Vec::new); + rpc_names_by_num.push(rpc_name); + + if let Ok(parent) = web3_conns + .block(&block.parent_hash, Some(rpc.as_ref())) + .await + { + // save the parent block + new.blocks_by_hash.insert(block.parent_hash, parent.clone()); + + block = parent + } else { + // log this? 
eventually we will hit a block we don't have, so it's not an error + break; + } + } + } + } + + Some(new) + } + + fn consensus_head() { + todo!() + } } impl<'a> BlockMetadata<'a> { @@ -46,35 +144,44 @@ impl<'a> BlockMetadata<'a> { } impl Web3Connections { - pub fn add_block(&self, block: Arc>, cannonical: bool) { - let hash = block.hash.unwrap(); + /// adds a block to our map of the blockchain + pub fn add_block_to_chain(&self, block: Arc>) -> anyhow::Result<()> { + let hash = block.hash.ok_or_else(|| anyhow::anyhow!("no block hash"))?; - if cannonical { - let num = block.number.unwrap(); - - let entry = self.chain_map.entry(num); - - let mut is_new = false; - - // TODO: this might be wrong. we might need to update parents, too - entry.or_insert_with(|| { - is_new = true; - block.clone() - }); - - // TODO: prune chain_map? - - if !is_new { - return; - } + if self.blockchain_map.read().contains_node(hash) { + // this block is already included + return Ok(()); } - // TODO: prune block_map? + // theres a small race having the read and then the write + let mut blockchain = self.blockchain_map.write(); - self.block_map.entry(hash).or_insert(block); + if blockchain.contains_node(hash) { + // this hash is already included. we must have hit that race condition + // return now since this work was already done. + return Ok(()); + } + + // TODO: prettier log? or probably move the log somewhere else + debug!(%hash, "new block"); + + // TODO: prune block_map to only keep a configurable (256 on ETH?) number of blocks? + + blockchain.add_node(hash); + + // what should edge weight be? and should the nodes be the blocks instead? + // maybe the weight should be the height + // we store parent_hash -> hash because the block already stores the parent_hash + blockchain.add_edge(block.parent_hash, hash, 0); + + Ok(()) } - pub async fn block(&self, hash: &H256) -> anyhow::Result>> { + pub async fn block( + &self, + hash: &H256, + rpc: Option<&Web3Connection>, + ) -> anyhow::Result>> { // first, try to get the hash from our cache if let Some(block) = self.block_map.get(hash) { return Ok(block.clone()); @@ -84,11 +191,17 @@ impl Web3Connections { // TODO: helper for method+params => JsonRpcRequest // TODO: get block with the transactions? + // TODO: does this id matter? let request = json!({ "id": "1", "method": "eth_getBlockByHash", "params": (hash, false) }); let request: JsonRpcRequest = serde_json::from_value(request)?; // TODO: if error, retry? - let response = self.try_send_best_upstream_server(request, None).await?; + let response = match rpc { + Some(rpc) => { + todo!("send request to this rpc") + } + None => self.try_send_best_upstream_server(request, None).await?, + }; let block = response.result.unwrap(); @@ -96,7 +209,7 @@ impl Web3Connections { let block = Arc::new(block); - self.add_block(block.clone(), false); + self.add_block_to_chain(block.clone())?; Ok(block) } @@ -112,6 +225,9 @@ impl Web3Connections { /// Get the heaviest chain's block from cache or backend rpc pub async fn cannonical_block(&self, num: &U64) -> anyhow::Result>> { + todo!(); + + /* // first, try to get the hash from our cache if let Some(block) = self.chain_map.get(num) { return Ok(block.clone()); @@ -149,19 +265,19 @@ impl Web3Connections { self.add_block(block.clone(), true); Ok(block) + */ } - // TODO: rename this? 
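// Sketch (not part of this patch): with add_block_to_chain storing parent_hash -> hash
// edges in the DiGraphMap, checking whether an rpc's head descends from a candidate
// consensus hash is a walk up the Incoming edges. The TODO suggests petgraph's
// all_simple_paths for this; a plain parent walk does the same job on a tree-shaped map.
// The u8 edge weight is an assumption matching the unused `0` weight added above.
use ethers::prelude::H256;
use petgraph::prelude::DiGraphMap;
use petgraph::Direction;

fn descends_from(chain: &DiGraphMap<H256, u8>, consensus_hash: H256, mut head: H256) -> bool {
    loop {
        if head == consensus_hash {
            return true;
        }

        // edges are stored parent -> child, so a block's parent is its Incoming neighbor
        match chain.neighbors_directed(head, Direction::Incoming).next() {
            Some(parent) => head = parent,
            // ran out of known ancestors without reaching the candidate
            None => return false,
        }
    }
}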
- pub(super) async fn update_synced_rpcs( + pub(super) async fn process_incoming_blocks( &self, - block_receiver: flume::Receiver<(Arc>, Arc)>, + block_receiver: flume::Receiver, // TODO: head_block_sender should be a broadcast_sender like pending_tx_sender head_block_sender: watch::Sender>>, pending_tx_sender: Option>, ) -> anyhow::Result<()> { // TODO: indexmap or hashmap? what hasher? with_capacity? - // TODO: this will grow unbounded. prune old heads automatically - let mut connection_heads = IndexMap::>>::new(); + // TODO: this will grow unbounded. prune old heads on this at the same time we prune the graph? + let mut connection_heads = HashMap::new(); while let Ok((new_block, rpc)) = block_receiver.recv_async().await { self.recv_block_from_rpc( @@ -180,127 +296,52 @@ impl Web3Connections { Ok(()) } - pub async fn recv_block_from_rpc( + /// `connection_heads` is a mapping of rpc_names to head block hashes. + /// self.blockchain_map is a mapping of hashes to the complete Block. + /// + async fn recv_block_from_rpc( &self, - connection_heads: &mut IndexMap>>, + connection_heads: &mut HashMap, new_block: Arc>, rpc: Arc, head_block_sender: &watch::Sender>>, pending_tx_sender: &Option>, ) -> anyhow::Result<()> { - let new_block_hash = if let Some(hash) = new_block.hash { - hash - } else { - // TODO: rpc name instead of url (will do this with config reload revamp) - connection_heads.remove(&rpc.name); + // add the block to connection_heads + match (new_block.hash, new_block.number) { + (Some(hash), Some(num)) => { + if num == U64::zero() { + debug!(%rpc, "still syncing"); - // TODO: return here is wrong. synced rpcs needs an update - return Ok(()); - }; + connection_heads.remove(&rpc.name); + } else { + connection_heads.insert(rpc.name.clone(), hash); - // TODO: dry this with the code above - let new_block_num = if let Some(num) = new_block.number { - num - } else { - // this seems unlikely, but i'm pretty sure we have seen it - // maybe when a node is syncing or reconnecting? - warn!(%rpc, ?new_block, "Block without number!"); - - // TODO: rpc name instead of url (will do this with config reload revamp) - connection_heads.remove(&rpc.name); - - // TODO: return here is wrong. synced rpcs needs an update - return Ok(()); - }; - - // TODO: span with more in it? - // TODO: make sure i'm doing this span right - // TODO: show the actual rpc url? - // TODO: clippy lint to make sure we don't hold this across an awaited future - // TODO: what level? - // let _span = info_span!("block_receiver", %rpc, %new_block_num).entered(); - - if new_block_num == U64::zero() { - warn!(%rpc, %new_block_num, "still syncing"); - - connection_heads.remove(&rpc.name); - } else { - connection_heads.insert(rpc.name.clone(), new_block.clone()); - - self.add_block(new_block.clone(), false); - } - - // iterate connection_heads to find the oldest block - let lowest_block_num = if let Some(lowest_block) = connection_heads - .values() - .min_by(|a, b| a.number.cmp(&b.number)) - { - lowest_block - .number - .expect("all blocks here should have a number") - } else { - // TODO: return here is wrong. synced rpcs needs an update - return Ok(()); - }; - - // iterate connection_heads to find the consensus block - let mut rpcs_by_num = IndexMap::>::new(); - let mut blocks_by_hash = IndexMap::>>::new(); - // block_hash => soft_limit, rpcs - // TODO: proper type for this? 
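// Sketch (not part of this patch): recv_block_from_rpc above keeps connection_heads as
// rpc_name -> head hash, with the blocks themselves in the shared block cache. Picking
// the highest_work_block called out in the TODO can then be a lookup plus max_by_key.
// BlockMap is assumed to be the Arc<DashMap<H256, Arc<Block<TxHash>>>> alias from
// connections.rs; blocks without a total_difficulty sort below those that have one,
// then ties fall back to block number.
use super::connections::BlockMap;
use ethers::prelude::{Block, TxHash, H256};
use hashbrown::HashMap;
use std::sync::Arc;

fn highest_work_head(
    connection_heads: &HashMap<String, H256>,
    block_map: &BlockMap,
) -> Option<Arc<Block<TxHash>>> {
    connection_heads
        .values()
        .filter_map(|hash| block_map.get(hash).map(|entry| entry.value().clone()))
        .max_by_key(|block| (block.total_difficulty, block.number))
}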
- let mut rpcs_by_hash = IndexMap::>::new(); - let mut total_soft_limit = 0; - - for (rpc_url, head_block) in connection_heads.iter() { - if let Some(rpc) = self.conns.get(rpc_url) { - // we need the total soft limit in order to know when its safe to update the backends - total_soft_limit += rpc.soft_limit; - - let head_hash = head_block.hash.unwrap(); - - // save the block - blocks_by_hash - .entry(head_hash) - .or_insert_with(|| head_block.clone()); - - // add the rpc to all relevant block heights - let mut block = head_block.clone(); - while block.number.unwrap() >= lowest_block_num { - let block_hash = block.hash.unwrap(); - let block_num = block.number.unwrap(); - - // save the rpcs and the sum of their soft limit by their head hash - let rpc_urls_by_hash = rpcs_by_hash.entry(block_hash).or_insert_with(Vec::new); - - rpc_urls_by_hash.push(rpc_url); - - // save the rpcs by their number - let rpc_urls_by_num = rpcs_by_num.entry(block_num).or_insert_with(Vec::new); - - rpc_urls_by_num.push(rpc_url); - - if let Ok(parent) = self.block(&block.parent_hash).await { - // save the parent block - blocks_by_hash.insert(block.parent_hash, parent.clone()); - - block = parent - } else { - // log this? eventually we will hit a block we don't have, so it's not an error - break; - } + self.add_block_to_chain(new_block.clone())?; } } + _ => { + warn!(%rpc, ?new_block, "Block without number or hash!"); + + connection_heads.remove(&rpc.name); + + // don't return yet! self.synced_connections likely needs an update + } } + let mut chain_and_rpcs = BlockchainAndRpcs::default(); + // TODO: default_min_soft_limit? without, we start serving traffic at the start too quickly // let min_soft_limit = total_soft_limit / 2; let min_soft_limit = 1; - let num_possible_heads = rpcs_by_hash.len(); - trace!(?rpcs_by_hash); + let num_possible_heads = chain_and_rpcs.rpcs_by_hash.len(); + + // trace!(?rpcs_by_hash); let total_rpcs = self.conns.len(); + /* // TODO: this needs tests if let Some(x) = rpcs_by_hash .into_iter() @@ -395,7 +436,7 @@ impl Web3Connections { .collect::>(), ); // TODO: what if the hashes don't match? - if pending_synced_connections.head_block_hash == new_block_hash { + if Some(pending_synced_connections.head_block_hash) == new_block.hash { // mark all transactions in the block as confirmed if pending_tx_sender.is_some() { for tx_hash in &new_block.transactions { @@ -447,6 +488,7 @@ impl Web3Connections { warn!("not enough rpcs in sync"); } + */ Ok(()) } } diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs index a20e270c..48b05391 100644 --- a/web3_proxy/src/rpcs/connection.rs +++ b/web3_proxy/src/rpcs/connection.rs @@ -1,3 +1,5 @@ +use super::blockchain::BlockId; +use super::connections::BlockMap; ///! Rate-limited communication with a web3 provider. 
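// Sketch (not part of this patch): the commented-out consensus block above marks every
// transaction in the new head as confirmed once the head hash matches. A standalone
// version of that broadcast, assuming the TxStatus enum has a Confirmed(TxHash) variant:
use super::transactions::TxStatus;
use ethers::prelude::{Block, TxHash};
use tokio::sync::broadcast;

fn confirm_head_block_txs(
    head_block: &Block<TxHash>,
    pending_tx_sender: &broadcast::Sender<TxStatus>,
) {
    for tx_hash in &head_block.transactions {
        // a send error just means no subscribers are listening right now; that's fine
        let _ = pending_tx_sender.send(TxStatus::Confirmed(*tx_hash));
    }
}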
use super::provider::Web3Provider; use super::request::OpenRequestHandle; @@ -5,6 +7,8 @@ use super::request::OpenRequestResult; use crate::app::{flatten_handle, AnyhowJoinHandle}; use crate::config::BlockAndRpc; use anyhow::Context; +use dashmap::mapref::entry::Entry; +use dashmap::DashMap; use ethers::prelude::{Block, Bytes, Middleware, ProviderError, TxHash, H256, U64}; use futures::future::try_join_all; use futures::StreamExt; @@ -19,13 +23,14 @@ use std::{cmp::Ordering, sync::Arc}; use tokio::sync::broadcast; use tokio::sync::RwLock as AsyncRwLock; use tokio::time::{interval, sleep, sleep_until, Duration, MissedTickBehavior}; +use tracing::debug; use tracing::{error, info, instrument, trace, warn}; /// An active connection to a Web3Rpc pub struct Web3Connection { pub name: String, /// TODO: can we get this from the provider? do we even need it? - pub url: String, + url: String, /// keep track of currently open requests. We sort on this pub(super) active_requests: AtomicU32, /// provider is in a RwLock so that we can replace it if re-connecting @@ -34,10 +39,12 @@ pub struct Web3Connection { /// rate limits are stored in a central redis so that multiple proxies can share their rate limits hard_limit: Option, /// used for load balancing to the least loaded server - pub soft_limit: u32, + pub(super) soft_limit: u32, block_data_limit: AtomicU64, - pub weight: u32, - head_block: RwLock<(H256, U64)>, + /// Lower weight are higher priority when sending requests + pub(super) weight: u32, + // TODO: async lock? + pub(super) head_block: RwLock, } impl Web3Connection { @@ -56,6 +63,7 @@ impl Web3Connection { hard_limit: Option<(u64, RedisPool)>, // TODO: think more about this type soft_limit: u32, + block_map: BlockMap, block_sender: Option>, tx_id_sender: Option)>>, reconnect: bool, @@ -83,7 +91,7 @@ impl Web3Connection { hard_limit, soft_limit, block_data_limit: Default::default(), - head_block: RwLock::new((H256::zero(), 0isize.into())), + head_block: RwLock::new(Default::default()), weight, }; @@ -125,7 +133,13 @@ impl Web3Connection { let new_connection = new_connection.clone(); tokio::spawn(async move { new_connection - .subscribe(http_interval_sender, block_sender, tx_id_sender, reconnect) + .subscribe( + http_interval_sender, + block_map, + block_sender, + tx_id_sender, + reconnect, + ) .await }) }; @@ -143,7 +157,7 @@ impl Web3Connection { sleep(Duration::from_millis(250)).await; for block_data_limit in [u64::MAX, 90_000, 128, 64, 32] { - let mut head_block_num = new_connection.head_block.read().1; + let mut head_block_num = new_connection.head_block.read().num; // TODO: wait until head block is set outside the loop? if we disconnect while starting we could actually get 0 though while head_block_num == U64::zero() { @@ -152,7 +166,7 @@ impl Web3Connection { // TODO: subscribe to a channel instead of polling? subscribe to http_interval_sender? sleep(Duration::from_secs(1)).await; - head_block_num = new_connection.head_block.read().1; + head_block_num = new_connection.head_block.read().num; } // TODO: subtract 1 from block_data_limit for safety? 
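// Sketch (not part of this patch) of the probe this loop performs: starting from the
// deepest candidate limit, ask the node for head - limit; the first depth it can still
// answer becomes block_data_limit. `fetch_block` is a hypothetical stand-in for the real
// eth_getBlockByNumber call made through a request handle.
use ethers::prelude::U64;
use std::future::Future;

async fn probe_block_data_limit<F, Fut>(head_block_num: U64, fetch_block: F) -> u64
where
    F: Fn(U64) -> Fut,
    Fut: Future<Output = anyhow::Result<()>>,
{
    for limit in [u64::MAX, 90_000, 128, 64, 32] {
        // saturating_sub keeps the probe at block 0 when the limit exceeds the chain height
        let probe_num = head_block_num.saturating_sub(U64::from(limit));

        if fetch_block(probe_num).await.is_ok() {
            return limit;
        }
    }

    // the node could not serve anything but (maybe) the head block
    0
}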
@@ -197,7 +211,7 @@ impl Web3Connection { pub fn has_block_data(&self, needed_block_num: &U64) -> bool { let block_data_limit: U64 = self.block_data_limit(); - let newest_block_num = self.head_block.read().1; + let newest_block_num = self.head_block.read().num; let oldest_block_num = newest_block_num .saturating_sub(block_data_limit) @@ -249,36 +263,59 @@ impl Web3Connection { } #[instrument(skip_all)] - async fn send_block_result( - self: Arc, - block: Result, ProviderError>, + async fn send_head_block_result( + self: &Arc, + new_head_block: Result>, ProviderError>, block_sender: &flume::Sender, + block_map: BlockMap, ) -> anyhow::Result<()> { - match block { - Ok(block) => { - { - // TODO: is this true? Block::default probably doesn't - let hash = block.hash.expect("blocks here should always have hashes"); - let num = block - .number - .expect("blocks here should always have numbers"); + match new_head_block { + Ok(new_head_block) => { + // TODO: is unwrap_or_default ok? we might have an empty block + let new_hash = new_head_block.hash.unwrap_or_default(); + // if we already have this block saved, we don't need to store this copy + let new_head_block = match block_map.entry(new_hash) { + Entry::Occupied(x) => x.get().clone(), + Entry::Vacant(x) => { + // TODO: remove this once https://github.com/ledgerwatch/erigon/issues/5190 is closed + // TODO: include transactions? + let new_head_block = if new_head_block.total_difficulty.is_none() { + self.wait_for_request_handle() + .await? + .request("eth_getBlockByHash", (new_hash, false)) + .await? + } else { + new_head_block + }; + + x.insert(new_head_block).clone() + } + }; + + let new_num = new_head_block.number.unwrap_or_default(); + + // save the block so we don't send the same one multiple times + // also save so that archive checks can know how far back to query + { let mut head_block = self.head_block.write(); - *head_block = (hash, num); + head_block.hash = new_hash; + head_block.num = new_num; } block_sender - .send_async((Arc::new(block), self)) + .send_async((new_head_block, self.clone())) .await .context("block_sender")?; } Err(e) => { warn!("unable to get block from {}: {}", self, e); + // TODO: do something to rpc_chain? // send an empty block to take this server out of rotation block_sender - .send_async((Arc::new(Block::default()), self)) + .send_async((Arc::new(Block::default()), self.clone())) .await .context("block_sender")?; } @@ -290,6 +327,7 @@ impl Web3Connection { async fn subscribe( self: Arc, http_interval_sender: Option>>, + block_map: BlockMap, block_sender: Option>, tx_id_sender: Option)>>, reconnect: bool, @@ -300,9 +338,11 @@ impl Web3Connection { let mut futures = vec![]; if let Some(block_sender) = &block_sender { - let f = self - .clone() - .subscribe_new_heads(http_interval_receiver, block_sender.clone()); + let f = self.clone().subscribe_new_heads( + http_interval_receiver, + block_sender.clone(), + block_map.clone(), + ); futures.push(flatten_handle(tokio::spawn(f))); } @@ -356,8 +396,9 @@ impl Web3Connection { self: Arc, http_interval_receiver: Option>, block_sender: flume::Sender, + block_map: BlockMap, ) -> anyhow::Result<()> { - info!("watching {}", self); + info!(?self, "watching new_heads"); // TODO: is a RwLock of an Option the right thing here? 
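// Sketch (not part of this patch): the dedup that send_head_block_result performs above.
// If another rpc already put this hash in the shared BlockMap we reuse that Arc, otherwise
// we insert our copy; DashMap's entry API makes the check-then-insert atomic per shard.
use super::connections::BlockMap;
use dashmap::mapref::entry::Entry;
use ethers::prelude::{Block, TxHash, H256};
use std::sync::Arc;

fn dedup_head_block(
    block_map: &BlockMap,
    hash: H256,
    block: Arc<Block<TxHash>>,
) -> Arc<Block<TxHash>> {
    match block_map.entry(hash) {
        // someone already saved this block; drop ours and share theirs
        Entry::Occupied(entry) => entry.get().clone(),
        // first time we see this hash; store it for everyone else
        Entry::Vacant(entry) => entry.insert(block).clone(),
    }
}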
if let Some(provider) = self.provider.read().await.clone() { @@ -371,53 +412,60 @@ impl Web3Connection { let mut last_hash = H256::zero(); loop { - match self.try_open_request().await { - Ok(OpenRequestResult::Handle(active_request_handle)) => { + // TODO: try, or wait_for? + match self.wait_for_request_handle().await { + Ok(active_request_handle) => { let block: Result, _> = active_request_handle .request("eth_getBlockByNumber", ("latest", false)) .await; - if let Ok(block) = block { - // don't send repeat blocks - let new_hash = - block.hash.expect("blocks here should always have hashes"); + match block { + Ok(block) => { + // don't send repeat blocks + let new_hash = block + .hash + .expect("blocks here should always have hashes"); - if new_hash != last_hash { - last_hash = new_hash; + if new_hash != last_hash { + // new hash! + last_hash = new_hash; - self.clone() - .send_block_result(Ok(block), &block_sender) + self.send_head_block_result( + Ok(Arc::new(block)), + &block_sender, + block_map.clone(), + ) .await?; + } + } + Err(err) => { + // we did not get a block back. something is up with the server. take it out of rotation + self.send_head_block_result( + Err(err), + &block_sender, + block_map.clone(), + ) + .await?; } - } else { - // we did not get a block back. something is up with the server. take it out of rotation - self.clone().send_block_result(block, &block_sender).await?; } } - Ok(OpenRequestResult::RetryAt(retry_at)) => { - warn!(?retry_at, "Rate limited on latest block from {}", self); - sleep_until(retry_at).await; - continue; - } - Ok(OpenRequestResult::None) => { - warn!("No handle for latest block from {}", self); - // TODO: what should we do? - } Err(err) => { warn!(?err, "Internal error on latest block from {}", self); // TODO: what should we do? sleep? extra time? } } - // wait for the interval + // wait for the next interval // TODO: if error or rate limit, increase interval? while let Err(err) = http_interval_receiver.recv().await { match err { broadcast::error::RecvError::Closed => { + // channel is closed! that's not good. bubble the error up return Err(err.into()); } broadcast::error::RecvError::Lagged(lagged) => { - // querying the block was delayed. this can happen if tokio is very busy. + // querying the block was delayed + // this can happen if tokio is very busy or waiting for requests limits took too long warn!(?err, ?self, "http interval lagging by {}!", lagged); } } @@ -434,18 +482,23 @@ impl Web3Connection { // query the block once since the subscription doesn't send the current block // there is a very small race condition here where the stream could send us a new block right now // all it does is print "new block" for the same block as current block - let block: Result, _> = self + let block: Result>, _> = self .wait_for_request_handle() .await? .request("eth_getBlockByNumber", ("latest", false)) - .await; + .await + .map(Arc::new); - self.clone().send_block_result(block, &block_sender).await?; + self.send_head_block_result(block, &block_sender, block_map.clone()) + .await?; while let Some(new_block) = stream.next().await { - self.clone() - .send_block_result(Ok(new_block), &block_sender) - .await?; + self.send_head_block_result( + Ok(Arc::new(new_block)), + &block_sender, + block_map.clone(), + ) + .await?; } warn!(?self, "subscription ended"); @@ -526,7 +579,7 @@ impl Web3Connection { // TODO: maximum wait time? 
i think timeouts in other parts of the code are probably best loop { - match self.try_open_request().await { + match self.try_request_handle().await { Ok(OpenRequestResult::Handle(handle)) => return Ok(handle), Ok(OpenRequestResult::RetryAt(retry_at)) => { // TODO: emit a stat? @@ -543,7 +596,7 @@ impl Web3Connection { } } - pub async fn try_open_request(self: &Arc) -> anyhow::Result { + pub async fn try_request_handle(self: &Arc) -> anyhow::Result { // check that we are connected if !self.has_provider().await { // TODO: emit a stat? diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs index d9fc797f..6cb9c0a6 100644 --- a/web3_proxy/src/rpcs/connections.rs +++ b/web3_proxy/src/rpcs/connections.rs @@ -3,7 +3,7 @@ use super::connection::Web3Connection; use super::request::{OpenRequestHandle, OpenRequestResult}; use super::synced_connections::SyncedConnections; use crate::app::{flatten_handle, AnyhowJoinHandle}; -use crate::config::Web3ConnectionConfig; +use crate::config::{BlockAndRpc, Web3ConnectionConfig}; use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest}; use crate::rpcs::transactions::TxStatus; use arc_swap::ArcSwap; @@ -15,7 +15,8 @@ use futures::future::{join_all, try_join_all}; use futures::stream::FuturesUnordered; use futures::StreamExt; use hashbrown::HashMap; -use indexmap::IndexMap; +use parking_lot::RwLock; +use petgraph::graphmap::DiGraphMap; use serde::ser::{SerializeStruct, Serializer}; use serde::Serialize; use serde_json::value::RawValue; @@ -29,28 +30,36 @@ use tokio::time::{interval, sleep, sleep_until, MissedTickBehavior}; use tokio::time::{Duration, Instant}; use tracing::{error, info, instrument, trace, warn}; +pub type BlockMap = Arc>>>; + +pub struct BlockchainAndHeads { + pub(super) graph: DiGraphMap>>, + pub(super) heads: HashMap, +} + /// A collection of web3 connections. Sends requests either the current best server or all servers. #[derive(From)] pub struct Web3Connections { - pub(super) conns: IndexMap>, + pub(super) conns: HashMap>, pub(super) synced_connections: ArcSwap, pub(super) pending_transactions: Arc>, - /// only includes blocks on the main chain. /// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis? - pub(super) chain_map: DashMap>>, /// all blocks, including orphans + pub(super) block_map: BlockMap, /// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis? - pub(super) block_map: DashMap>>, - // TODO: petgraph? might help with pruning the maps + /// TODO: what should we use for edges? + pub(super) blockchain_map: RwLock>, } impl Web3Connections { /// Spawn durable connections to multiple Web3 providers. 
+ #[allow(clippy::too_many_arguments)] pub async fn spawn( chain_id: u64, server_configs: HashMap, http_client: Option, redis_client_pool: Option, + block_map: BlockMap, head_block_sender: Option>>>, pending_tx_sender: Option>, pending_transactions: Arc>, @@ -110,6 +119,7 @@ impl Web3Connections { }; let pending_tx_id_sender = Some(pending_tx_id_sender.clone()); + let block_map = block_map.clone(); tokio::spawn(async move { server_config @@ -119,6 +129,7 @@ impl Web3Connections { chain_id, http_client, http_interval_sender, + block_map, block_sender, pending_tx_id_sender, ) @@ -128,7 +139,7 @@ impl Web3Connections { .collect(); // map of connection names to their connection - let mut connections = IndexMap::new(); + let mut connections = HashMap::new(); let mut handles = vec![]; // TODO: futures unordered? @@ -160,8 +171,8 @@ impl Web3Connections { conns: connections, synced_connections: ArcSwap::new(Arc::new(synced_connections)), pending_transactions, - chain_map: Default::default(), block_map: Default::default(), + blockchain_map: Default::default(), }); let handle = { @@ -183,13 +194,17 @@ impl Web3Connections { Ok((connections, handle)) } + pub fn get(&self, conn_name: &str) -> Option<&Arc> { + self.conns.get(conn_name) + } + /// subscribe to blocks and transactions from all the backend rpcs. /// blocks are processed by all the `Web3Connection`s and then sent to the `block_receiver` /// transaction ids from all the `Web3Connection`s are deduplicated and forwarded to `pending_tx_sender` async fn subscribe( self: Arc, pending_tx_id_receiver: flume::Receiver<(TxHash, Arc)>, - block_receiver: flume::Receiver<(Arc>, Arc)>, + block_receiver: flume::Receiver, head_block_sender: Option>>>, pending_tx_sender: Option>, ) -> anyhow::Result<()> { @@ -200,11 +215,11 @@ impl Web3Connections { // fetches new transactions from the notifying rpc // forwards new transacitons to pending_tx_receipt_sender if let Some(pending_tx_sender) = pending_tx_sender.clone() { - // TODO: do something with the handle so we can catch any errors let clone = self.clone(); let handle = task::spawn(async move { + // TODO: set up this future the same as the block funnel while let Ok((pending_tx_id, rpc)) = pending_tx_id_receiver.recv_async().await { - let f = clone.clone().funnel_transaction( + let f = clone.clone().process_incoming_tx_id( rpc, pending_tx_id, pending_tx_sender.clone(), @@ -223,10 +238,14 @@ impl Web3Connections { let connections = Arc::clone(&self); let pending_tx_sender = pending_tx_sender.clone(); let handle = task::Builder::default() - .name("update_synced_rpcs") + .name("process_incoming_blocks") .spawn(async move { connections - .update_synced_rpcs(block_receiver, head_block_sender, pending_tx_sender) + .process_incoming_blocks( + block_receiver, + head_block_sender, + pending_tx_sender, + ) .await }); @@ -235,11 +254,11 @@ impl Web3Connections { if futures.is_empty() { // no transaction or block subscriptions. - // todo!("every second, check that the provider is still connected");? let handle = task::Builder::default().name("noop").spawn(async move { loop { sleep(Duration::from_secs(600)).await; + // TODO: "every interval, check that the provider is still connected" } }); @@ -265,7 +284,7 @@ impl Web3Connections { // TODO: remove this box once i figure out how to do the options params: Option<&serde_json::Value>, ) -> Result, ProviderError> { - // TODO: if only 1 active_request_handles, do self.try_send_request + // TODO: if only 1 active_request_handles, do self.try_send_request? 
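// Sketch (not part of this patch): the body below fires the request on every handle and
// tallies identical results. A standalone version of that "most common response wins"
// idea, assuming Counter is the counter crate's type used above; grouping by the {:?}
// string is a simplification of what the real code does with count_map.
use counter::Counter;
use ethers::prelude::ProviderError;
use serde_json::value::RawValue;

fn most_common_response(
    responses: &[Result<Box<RawValue>, ProviderError>],
) -> Option<String> {
    let counts: Counter<String> = responses
        .iter()
        .map(|response| format!("{:?}", response))
        .collect();

    // most_common_ordered sorts by count (descending), so the first entry is the winner
    counts
        .most_common_ordered()
        .first()
        .map(|(response, _count)| response.clone())
}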
let responses = active_request_handles .into_iter() @@ -283,6 +302,8 @@ impl Web3Connections { let mut counts: Counter = Counter::new(); let mut any_ok = false; for response in responses { + // TODO: i think we need to do something smarter with provider error. we at least need to wrap it up as JSON + // TODO: emit stats errors? let s = format!("{:?}", response); if count_map.get(&s).is_none() { @@ -325,12 +346,10 @@ impl Web3Connections { // TODO: we are going to be checking "has_block_data" a lot now. i think we pretty much always have min_block_needed now that we override "latest" let mut synced_rpcs: Vec> = if let Some(min_block_needed) = min_block_needed { - // TODO: this includes ALL archive servers. but we only want them if they are on a somewhat recent block - // TODO: maybe instead of "archive_needed" bool it should be the minimum height. then even delayed servers might be fine. will need to track all heights then self.conns .values() - .filter(|x| x.has_block_data(min_block_needed)) .filter(|x| !skip.contains(x)) + .filter(|x| x.has_block_data(min_block_needed)) .cloned() .collect() } else { @@ -344,7 +363,8 @@ impl Web3Connections { }; if synced_rpcs.is_empty() { - // TODO: what should happen here? might be nicer to retry in a second + // TODO: what should happen here? automatic retry? + // TODO: more detailed error return Err(anyhow::anyhow!("not synced")); } @@ -375,7 +395,7 @@ impl Web3Connections { // now that the rpcs are sorted, try to get an active request handle for one of them for rpc in synced_rpcs.into_iter() { // increment our connection counter - match rpc.try_open_request().await { + match rpc.try_request_handle().await { Ok(OpenRequestResult::Handle(handle)) => { trace!("next server on {:?}: {:?}", self, rpc); return Ok(OpenRequestResult::Handle(handle)); @@ -420,7 +440,7 @@ impl Web3Connections { } // check rate limits and increment our connection counter - match connection.try_open_request().await { + match connection.try_request_handle().await { Ok(OpenRequestResult::RetryAt(retry_at)) => { // this rpc is not available. skip it earliest_retry_at = earliest_retry_at.min(Some(retry_at)); diff --git a/web3_proxy/src/rpcs/transactions.rs b/web3_proxy/src/rpcs/transactions.rs index 55b3b4c4..d62ac4be 100644 --- a/web3_proxy/src/rpcs/transactions.rs +++ b/web3_proxy/src/rpcs/transactions.rs @@ -24,7 +24,7 @@ impl Web3Connections { // TODO: there is a race here on geth. sometimes the rpc isn't yet ready to serve the transaction (even though they told us about it!) // TODO: yearn devs have had better luck with batching these, but i think that's likely just adding a delay itself // TODO: if one rpc fails, try another? - let tx: Transaction = match rpc.try_open_request().await { + let tx: Transaction = match rpc.try_request_handle().await { Ok(OpenRequestResult::Handle(handle)) => { handle .request("eth_getTransactionByHash", (pending_tx_id,)) @@ -55,7 +55,7 @@ impl Web3Connections { } /// dedupe transaction and send them to any listening clients - pub(super) async fn funnel_transaction( + pub(super) async fn process_incoming_tx_id( self: Arc, rpc: Arc, pending_tx_id: TxHash,