From 4d357977e9875db1e876a704e91a648102f7cfc6 Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Tue, 23 Aug 2022 23:56:47 +0000
Subject: [PATCH] split Web3Connections across multiple files

---
 web3_proxy/src/app.rs                         |   2 +-
 .../src/{block_helpers.rs => block_number.rs} |   1 +
 web3_proxy/src/lib.rs                         |   2 +-
 web3_proxy/src/rpcs/blockchain.rs             | 423 ++++++++++++++
 web3_proxy/src/rpcs/connection.rs             |  20 +-
 web3_proxy/src/rpcs/connections.rs            | 552 ++----------------
 web3_proxy/src/rpcs/mod.rs                    |   2 +
 web3_proxy/src/rpcs/provider.rs               |   9 +
 web3_proxy/src/rpcs/synced_connections.rs     |  47 +-
 9 files changed, 539 insertions(+), 519 deletions(-)
 rename web3_proxy/src/{block_helpers.rs => block_number.rs} (98%)
 create mode 100644 web3_proxy/src/rpcs/blockchain.rs
 create mode 100644 web3_proxy/src/rpcs/provider.rs

diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs
index de5a5035..e8844be2 100644
--- a/web3_proxy/src/app.rs
+++ b/web3_proxy/src/app.rs
@@ -35,7 +35,7 @@ use tokio_stream::wrappers::{BroadcastStream, WatchStream};
 use tracing::{info, info_span, instrument, trace, warn, Instrument};
 use uuid::Uuid;
 
-use crate::block_helpers::block_needed;
+use crate::block_number::block_needed;
 use crate::config::{AppConfig, TopConfig};
 use crate::jsonrpc::JsonRpcForwardedResponse;
 use crate::jsonrpc::JsonRpcForwardedResponseEnum;
diff --git a/web3_proxy/src/block_helpers.rs b/web3_proxy/src/block_number.rs
similarity index 98%
rename from web3_proxy/src/block_helpers.rs
rename to web3_proxy/src/block_number.rs
index 89183671..846514e6 100644
--- a/web3_proxy/src/block_helpers.rs
+++ b/web3_proxy/src/block_number.rs
@@ -1,3 +1,4 @@
+//! Helper functions for turning ether's BlockNumber into numbers and updating incoming queries to match.
 use ethers::prelude::{BlockNumber, U64};
 use tracing::warn;
 
diff --git a/web3_proxy/src/lib.rs b/web3_proxy/src/lib.rs
index 289bfa36..dbcbee5c 100644
--- a/web3_proxy/src/lib.rs
+++ b/web3_proxy/src/lib.rs
@@ -1,5 +1,5 @@
 pub mod app;
-pub mod block_helpers;
+pub mod block_number;
 pub mod config;
 pub mod frontend;
 pub mod jsonrpc;
diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs
new file mode 100644
index 00000000..c59621f7
--- /dev/null
+++ b/web3_proxy/src/rpcs/blockchain.rs
@@ -0,0 +1,423 @@
+///! Keep track of the blockchain as seen by a Web3Connections.
+use super::SyncedConnections;
+use super::Web3Connection;
+use super::Web3Connections;
+use crate::app::TxState;
+use crate::jsonrpc::JsonRpcRequest;
+use anyhow::Context;
+use ethers::prelude::{Block, TxHash, H256, U256, U64};
+use indexmap::IndexMap;
+use serde_json::json;
+use std::sync::Arc;
+use tokio::sync::{broadcast, watch};
+use tracing::{debug, trace, warn};
+
+pub(super) struct BlockMetadata<'a> {
+    pub(super) block: &'a Arc<Block<TxHash>>,
+    pub(super) sum_soft_limit: u32,
+    pub(super) conns: Vec<&'a str>,
+}
+
+impl<'a> BlockMetadata<'a> {
+    // TODO: there are sortable traits, but this seems simpler
+    /// sort the blocks in descending height
+    pub fn sortable_values(&self) -> (&U64, &u32, &U256, &H256) {
+        // trace!(?self.block, ?self.conns);
+
+        // first we care about the block number
+        let block_num = self.block.number.as_ref().unwrap();
+
+        // if block_num ties, the block with the highest total difficulty *should* be the winner
+        // TODO: sometimes i see a block with no total difficulty.
websocket subscription doesn't get everything + // let total_difficulty = self.block.total_difficulty.as_ref().expect("wat"); + + // all the nodes should already be doing this fork priority logic themselves + // so, it should be safe to just look at whatever our node majority thinks and go with that + let sum_soft_limit = &self.sum_soft_limit; + + let difficulty = &self.block.difficulty; + + // if we are still tied (unlikely). this will definitely break the tie + // TODO: what does geth do? + let block_hash = self.block.hash.as_ref().unwrap(); + + (block_num, sum_soft_limit, difficulty, block_hash) + } +} + +impl Web3Connections { + pub fn add_block(&self, block: Arc>, cannonical: bool) { + let hash = block.hash.unwrap(); + + if cannonical { + let num = block.number.unwrap(); + + let entry = self.chain_map.entry(num); + + let mut is_new = false; + + // TODO: this might be wrong. we might need to update parents, too + entry.or_insert_with(|| { + is_new = true; + block.clone() + }); + + // TODO: prune chain_map? + + if !is_new { + return; + } + } + + // TODO: prune block_map? + + self.block_map.entry(hash).or_insert(block); + } + + pub async fn block(&self, hash: &H256) -> anyhow::Result>> { + // first, try to get the hash from our cache + if let Some(block) = self.block_map.get(hash) { + return Ok(block.clone()); + } + + // block not in cache. we need to ask an rpc for it + + // TODO: helper for method+params => JsonRpcRequest + // TODO: get block with the transactions? + let request = json!({ "id": "1", "method": "eth_getBlockByHash", "params": (hash, false) }); + let request: JsonRpcRequest = serde_json::from_value(request)?; + + // TODO: if error, retry? + let response = self.try_send_best_upstream_server(request, None).await?; + + let block = response.result.unwrap(); + + let block: Block = serde_json::from_str(block.get())?; + + let block = Arc::new(block); + + self.add_block(block.clone(), false); + + Ok(block) + } + + /// Convenience method to get the cannonical block at a given block height. + pub async fn block_hash(&self, num: &U64) -> anyhow::Result { + let block = self.cannonical_block(num).await?; + + let hash = block.hash.unwrap(); + + Ok(hash) + } + + /// Get the heaviest chain's block from cache or backend rpc + pub async fn cannonical_block(&self, num: &U64) -> anyhow::Result>> { + // first, try to get the hash from our cache + if let Some(block) = self.chain_map.get(num) { + return Ok(block.clone()); + } + + // block not in cache. we need to ask an rpc for it + // but before we do any queries, be sure the requested block num exists + let head_block_num = self.head_block_num(); + if num > &head_block_num { + // TODO: i'm seeing this a lot when using ethspam. i dont know why though. i thought we delayed publishing + // TODO: instead of error, maybe just sleep and try again? + return Err(anyhow::anyhow!( + "Head block is #{}, but #{} was requested", + head_block_num, + num + )); + } + + // TODO: helper for method+params => JsonRpcRequest + // TODO: get block with the transactions? + let request = json!({ "jsonrpc": "2.0", "id": "1", "method": "eth_getBlockByNumber", "params": (num, false) }); + let request: JsonRpcRequest = serde_json::from_value(request)?; + + // TODO: if error, retry? 
+ let response = self + .try_send_best_upstream_server(request, Some(num)) + .await?; + + let block = response.result.unwrap(); + + let block: Block = serde_json::from_str(block.get())?; + + let block = Arc::new(block); + + self.add_block(block.clone(), true); + + Ok(block) + } + + pub async fn recv_block_from_rpc( + &self, + connection_heads: &mut IndexMap>>, + new_block: Arc>, + rpc: Arc, + head_block_sender: &watch::Sender>>, + pending_tx_sender: &Option>, + ) -> anyhow::Result<()> { + let new_block_hash = if let Some(hash) = new_block.hash { + hash + } else { + // TODO: rpc name instead of url (will do this with config reload revamp) + connection_heads.remove(&rpc.url); + + // TODO: return here is wrong. synced rpcs needs an update + return Ok(()); + }; + + // TODO: dry this with the code above + let new_block_num = if let Some(num) = new_block.number { + num + } else { + // this seems unlikely, but i'm pretty sure we have seen it + // maybe when a node is syncing or reconnecting? + warn!(%rpc, ?new_block, "Block without number!"); + + // TODO: rpc name instead of url (will do this with config reload revamp) + connection_heads.remove(&rpc.url); + + // TODO: return here is wrong. synced rpcs needs an update + return Ok(()); + }; + + // TODO: span with more in it? + // TODO: make sure i'm doing this span right + // TODO: show the actual rpc url? + // TODO: clippy lint to make sure we don't hold this across an awaited future + // TODO: what level? + // let _span = info_span!("block_receiver", %rpc, %new_block_num).entered(); + + if new_block_num == U64::zero() { + warn!(%rpc, %new_block_num, "still syncing"); + + connection_heads.remove(&rpc.url); + } else { + connection_heads.insert(rpc.url.clone(), new_block.clone()); + + self.add_block(new_block.clone(), false); + } + + // iterate connection_heads to find the oldest block + let lowest_block_num = if let Some(lowest_block) = connection_heads + .values() + .min_by(|a, b| a.number.cmp(&b.number)) + { + lowest_block + .number + .expect("all blocks here should have a number") + } else { + // TODO: return here is wrong. synced rpcs needs an update + return Ok(()); + }; + + // iterate connection_heads to find the consensus block + let mut rpcs_by_num = IndexMap::>::new(); + let mut blocks_by_hash = IndexMap::>>::new(); + // block_hash => soft_limit, rpcs + // TODO: proper type for this? 
+ let mut rpcs_by_hash = IndexMap::>::new(); + let mut total_soft_limit = 0; + + for (rpc_url, head_block) in connection_heads.iter() { + if let Some(rpc) = self.conns.get(rpc_url) { + // we need the total soft limit in order to know when its safe to update the backends + total_soft_limit += rpc.soft_limit; + + let head_hash = head_block.hash.unwrap(); + + // save the block + blocks_by_hash + .entry(head_hash) + .or_insert_with(|| head_block.clone()); + + // add the rpc to all relevant block heights + let mut block = head_block.clone(); + while block.number.unwrap() >= lowest_block_num { + let block_hash = block.hash.unwrap(); + let block_num = block.number.unwrap(); + + // save the rpcs and the sum of their soft limit by their head hash + let rpc_urls_by_hash = rpcs_by_hash.entry(block_hash).or_insert_with(Vec::new); + + rpc_urls_by_hash.push(rpc_url); + + // save the rpcs by their number + let rpc_urls_by_num = rpcs_by_num.entry(block_num).or_insert_with(Vec::new); + + rpc_urls_by_num.push(rpc_url); + + if let Ok(parent) = self.block(&block.parent_hash).await { + // save the parent block + blocks_by_hash.insert(block.parent_hash, parent.clone()); + + block = parent + } else { + // log this? eventually we will hit a block we don't have, so it's not an error + break; + } + } + } + } + + // TODO: default_min_soft_limit? without, we start serving traffic at the start too quickly + // let min_soft_limit = total_soft_limit / 2; + let min_soft_limit = 1; + let num_possible_heads = rpcs_by_hash.len(); + + trace!(?rpcs_by_hash); + + let total_rpcs = self.conns.len(); + + // TODO: this needs tests + if let Some(x) = rpcs_by_hash + .into_iter() + .filter_map(|(hash, conns)| { + // TODO: move this to `State::new` function on + let sum_soft_limit = conns + .iter() + .map(|rpc_url| { + if let Some(rpc) = self.conns.get(*rpc_url) { + rpc.soft_limit + } else { + 0 + } + }) + .sum(); + + if sum_soft_limit < min_soft_limit { + trace!(?sum_soft_limit, ?min_soft_limit, "sum_soft_limit too low"); + None + } else { + let block = blocks_by_hash.get(&hash).unwrap(); + + Some(BlockMetadata { + block, + sum_soft_limit, + conns, + }) + } + }) + // sort b to a for descending order. sort a to b for ascending order? maybe not "max_by" is smart + .max_by(|a, b| a.sortable_values().cmp(&b.sortable_values())) + { + let best_head_num = x.block.number.unwrap(); + let best_head_hash = x.block.hash.unwrap(); + let best_rpcs = x.conns; + + let synced_rpcs = rpcs_by_num.remove(&best_head_num).unwrap(); + + if best_rpcs.len() == synced_rpcs.len() { + trace!( + "{}/{}/{}/{} rpcs have {}", + best_rpcs.len(), + synced_rpcs.len(), + connection_heads.len(), + total_rpcs, + best_head_hash + ); + } else { + // TODO: this isn't necessarily a fork. this might just be an rpc being slow + // TODO: log all the heads? + warn!( + "chain is forked! {} possible heads. {}/{}/{}/{} rpcs have {}", + num_possible_heads, + best_rpcs.len(), + synced_rpcs.len(), + connection_heads.len(), + total_rpcs, + best_head_hash + ); + } + + let num_best_rpcs = best_rpcs.len(); + + // TODOL: do this without clone? 
+ let conns = best_rpcs + .into_iter() + .map(|x| self.conns.get(x).unwrap().clone()) + .collect(); + + let pending_synced_connections = SyncedConnections { + head_block_num: best_head_num, + head_block_hash: best_head_hash, + conns, + }; + + let current_head_block = self.head_block_hash(); + let new_head_block = pending_synced_connections.head_block_hash != current_head_block; + + if new_head_block { + self.add_block(new_block.clone(), true); + + debug!( + "{}/{} rpcs at {} ({}). head at {:?}", + pending_synced_connections.conns.len(), + self.conns.len(), + pending_synced_connections.head_block_num, + pending_synced_connections.head_block_hash, + pending_synced_connections + .conns + .iter() + .map(|x| format!("{}", x)) + .collect::>(), + ); + // TODO: what if the hashes don't match? + if pending_synced_connections.head_block_hash == new_block_hash { + // mark all transactions in the block as confirmed + if pending_tx_sender.is_some() { + for tx_hash in &new_block.transactions { + // TODO: should we mark as confirmed via pending_tx_sender? + // TODO: possible deadlock here! + // trace!("removing {}...", tx_hash); + let _ = self.pending_transactions.remove(tx_hash); + // trace!("removed {}", tx_hash); + } + }; + + // TODO: mark any orphaned transactions as unconfirmed + } + } else if num_best_rpcs == self.conns.len() { + trace!( + "all {} rpcs at {} ({})", + num_best_rpcs, + pending_synced_connections.head_block_num, + pending_synced_connections.head_block_hash, + ); + } else { + trace!( + ?pending_synced_connections, + "{}/{} rpcs at {} ({})", + num_best_rpcs, + self.conns.len(), + pending_synced_connections.head_block_num, + pending_synced_connections.head_block_hash, + ); + } + + // TODO: do this before or after processing all the transactions in this block? + // TODO: only swap if there is a change? + trace!(?pending_synced_connections, "swapping"); + self.synced_connections + .swap(Arc::new(pending_synced_connections)); + + if new_head_block { + // TODO: is new_head_block accurate? + // TODO: move this onto self.chain? + head_block_sender + .send(new_block.clone()) + .context("head_block_sender")?; + } + } else { + // TODO: is this expected when we first start? + // TODO: make sure self.synced_connections is empty + // TODO: return an error + warn!("not enough rpcs in sync"); + } + + Ok(()) + } +} diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs index 64bbec6c..e6a41ef1 100644 --- a/web3_proxy/src/rpcs/connection.rs +++ b/web3_proxy/src/rpcs/connection.rs @@ -1,6 +1,8 @@ ///! 
Rate-limited communication with a web3 provider +use super::provider::Web3Provider; +use crate::app::{flatten_handle, AnyhowJoinHandle}; +use crate::config::BlockAndRpc; use anyhow::Context; -use derive_more::From; use ethers::prelude::{Block, Bytes, Middleware, ProviderError, TxHash, H256, U64}; use futures::future::try_join_all; use futures::StreamExt; @@ -17,9 +19,6 @@ use tokio::sync::RwLock as AsyncRwLock; use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior}; use tracing::{error, info, info_span, instrument, trace, warn, Instrument}; -use crate::app::{flatten_handle, AnyhowJoinHandle}; -use crate::config::BlockAndRpc; - // TODO: rename this pub enum HandleResult { ActiveRequest(ActiveRequestHandle), @@ -27,13 +26,6 @@ pub enum HandleResult { None, } -/// TODO: instead of an enum, I tried to use Box, but hit -#[derive(From)] -pub enum Web3Provider { - Http(ethers::providers::Provider), - Ws(ethers::providers::Provider), -} - /// An active connection to a Web3Rpc pub struct Web3Connection { name: String, @@ -53,6 +45,9 @@ pub struct Web3Connection { head_block: RwLock<(H256, U64)>, } +/// Drop this once a connection completes +pub struct ActiveRequestHandle(Arc); + impl Web3Provider { #[instrument] async fn from_str(url_str: &str, http_client: Option) -> anyhow::Result { @@ -638,9 +633,6 @@ impl Hash for Web3Connection { } } -/// Drop this once a connection completes -pub struct ActiveRequestHandle(Arc); - impl ActiveRequestHandle { fn new(connection: Arc) -> Self { // TODO: attach a unique id to this? diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs index 7f423941..4e3d3953 100644 --- a/web3_proxy/src/rpcs/connections.rs +++ b/web3_proxy/src/rpcs/connections.rs @@ -1,28 +1,24 @@ ///! Load balanced communication with a group of web3 providers -use anyhow::Context; -use arc_swap::ArcSwap; -use counter::Counter; -use dashmap::DashMap; -use derive_more::From; -use ethers::prelude::{Block, ProviderError, Transaction, TxHash, H256, U256, U64}; -use futures::future::{join_all, try_join_all}; -use futures::stream::FuturesUnordered; -use futures::StreamExt; -use hashbrown::HashMap; -use indexmap::{IndexMap, IndexSet}; -use std::cmp::Reverse; -// use parking_lot::RwLock; -// use petgraph::graphmap::DiGraphMap; use super::SyncedConnections; use super::{ActiveRequestHandle, HandleResult, Web3Connection}; use crate::app::{flatten_handle, AnyhowJoinHandle, TxState}; use crate::config::Web3ConnectionConfig; use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest}; +use arc_swap::ArcSwap; +use counter::Counter; +use dashmap::DashMap; +use derive_more::From; +use ethers::prelude::{Block, ProviderError, Transaction, TxHash, H256, U64}; +use futures::future::{join_all, try_join_all}; +use futures::stream::FuturesUnordered; +use futures::StreamExt; +use hashbrown::HashMap; +use indexmap::IndexMap; use serde::ser::{SerializeStruct, Serializer}; use serde::Serialize; -use serde_json::json; use serde_json::value::RawValue; use std::cmp; +use std::cmp::Reverse; use std::fmt; use std::sync::Arc; use tokio::sync::{broadcast, watch}; @@ -31,85 +27,23 @@ use tokio::time::{interval, sleep, sleep_until, MissedTickBehavior}; use tokio::time::{Duration, Instant}; use tracing::{debug, error, info, instrument, trace, warn}; -#[derive(Default)] -pub struct BlockChain { - /// only includes blocks on the main chain. - chain_map: DashMap>>, - /// all blocks, including orphans - block_map: DashMap>>, - // TODO: petgraph? 
-} - -impl BlockChain { - pub fn add_block(&self, block: Arc>, cannonical: bool) { - let hash = block.hash.unwrap(); - - if cannonical { - let num = block.number.unwrap(); - - let entry = self.chain_map.entry(num); - - let mut is_new = false; - - entry.or_insert_with(|| { - is_new = true; - block.clone() - }); - - if !is_new { - return; - } - } - - self.block_map.entry(hash).or_insert(block); - } - - pub fn cannonical_block(&self, num: &U64) -> Option>> { - self.chain_map.get(num).map(|x| x.clone()) - } - - pub fn block(&self, hash: &H256) -> Option>> { - self.block_map.get(hash).map(|x| x.clone()) - } -} - /// A collection of web3 connections. Sends requests either the current best server or all servers. #[derive(From)] pub struct Web3Connections { - conns: IndexMap>, - synced_connections: ArcSwap, - pending_transactions: Arc>, - // TODO: i think chain is what we want, but i'm not sure how we'll use it yet - // TODO: this graph is going to grow forever unless we do some sort of pruning. maybe store pruned in redis? - // chain: Arc>>>, - chain: BlockChain, -} - -impl Serialize for Web3Connections { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - let conns: Vec<&Web3Connection> = self.conns.iter().map(|x| x.1.as_ref()).collect(); - - let mut state = serializer.serialize_struct("Web3Connections", 2)?; - state.serialize_field("conns", &conns)?; - state.serialize_field("synced_connections", &**self.synced_connections.load())?; - state.end() - } -} - -impl fmt::Debug for Web3Connections { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - // TODO: the default formatter takes forever to write. this is too quiet though - f.debug_struct("Web3Connections") - .field("conns", &self.conns) - .finish_non_exhaustive() - } + pub(super) conns: IndexMap>, + pub(super) synced_connections: ArcSwap, + pub(super) pending_transactions: Arc>, + /// only includes blocks on the main chain. + /// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis? + pub(super) chain_map: DashMap>>, + /// all blocks, including orphans + /// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis? + pub(super) block_map: DashMap>>, + // TODO: petgraph? might help with pruning the maps } impl Web3Connections { - // #[instrument(name = "spawn_Web3Connections", skip_all)] + /// Spawn durable connections to multiple Web3 providers. pub async fn spawn( chain_id: u64, server_configs: HashMap, @@ -223,7 +157,8 @@ impl Web3Connections { conns: connections, synced_connections: ArcSwap::new(Arc::new(synced_connections)), pending_transactions, - chain: Default::default(), + chain_map: Default::default(), + block_map: Default::default(), }); let handle = { @@ -409,113 +344,6 @@ impl Web3Connections { Ok(()) } - pub async fn block(&self, hash: &H256) -> anyhow::Result>> { - // first, try to get the hash from our cache - if let Some(block) = self.chain.block(hash) { - return Ok(block); - } - - // block not in cache. we need to ask an rpc for it - - // TODO: helper for method+params => JsonRpcRequest - // TODO: get block with the transactions? - let request = json!({ "id": "1", "method": "eth_getBlockByHash", "params": (hash, false) }); - let request: JsonRpcRequest = serde_json::from_value(request)?; - - // TODO: if error, retry? 
- let response = self.try_send_best_upstream_server(request, None).await?; - - let block = response.result.unwrap(); - - let block: Block = serde_json::from_str(block.get())?; - - let block = Arc::new(block); - - self.chain.add_block(block.clone(), false); - - Ok(block) - } - - /// Get the heaviest chain's block from cache or backend rpc - pub async fn cannonical_block(&self, num: &U64) -> anyhow::Result>> { - // first, try to get the hash from our cache - if let Some(block) = self.chain.cannonical_block(num) { - return Ok(block); - } - - // block not in cache. we need to ask an rpc for it - // but before we do any queries, be sure the requested block num exists - let head_block_num = self.head_block_num(); - if num > &head_block_num { - // TODO: i'm seeing this a lot when using ethspam. i dont know why though. i thought we delayed publishing - // TODO: instead of error, maybe just sleep and try again? - return Err(anyhow::anyhow!( - "Head block is #{}, but #{} was requested", - head_block_num, - num - )); - } - - // TODO: helper for method+params => JsonRpcRequest - // TODO: get block with the transactions? - let request = json!({ "jsonrpc": "2.0", "id": "1", "method": "eth_getBlockByNumber", "params": (num, false) }); - let request: JsonRpcRequest = serde_json::from_value(request)?; - - // TODO: if error, retry? - let response = self - .try_send_best_upstream_server(request, Some(num)) - .await?; - - let block = response.result.unwrap(); - - let block: Block = serde_json::from_str(block.get())?; - - let block = Arc::new(block); - - self.chain.add_block(block.clone(), true); - - Ok(block) - } - - /// Convenience method to get the cannonical block at a given block height. - pub async fn block_hash(&self, num: &U64) -> anyhow::Result { - let block = self.cannonical_block(num).await?; - - let hash = block.hash.unwrap(); - - Ok(hash) - } - - pub fn head_block(&self) -> (U64, H256) { - let synced_connections = self.synced_connections.load(); - - ( - synced_connections.head_block_num, - synced_connections.head_block_hash, - ) - } - - pub fn head_block_hash(&self) -> H256 { - self.synced_connections.load().head_block_hash - } - - pub fn head_block_num(&self) -> U64 { - self.synced_connections.load().head_block_num - } - - pub fn synced(&self) -> bool { - // TODO: require a minimum number of synced rpcs - // TODO: move this whole function to SyncedConnections - if self.synced_connections.load().conns.is_empty() { - return false; - } - self.head_block_num() > U64::zero() - } - - pub fn num_synced_rpcs(&self) -> usize { - self.synced_connections.load().conns.len() - } - /// Send the same request to all the handles. Returning the most common success or most common error. #[instrument(skip_all)] pub async fn try_send_parallel_requests( @@ -577,303 +405,23 @@ impl Web3Connections { async fn update_synced_rpcs( &self, block_receiver: flume::Receiver<(Arc>, Arc)>, + // TODO: head_block_sender should be a broadcast_sender like pending_tx_sender head_block_sender: watch::Sender>>, - // TODO: use pending_tx_sender pending_tx_sender: Option>, ) -> anyhow::Result<()> { - let total_rpcs = self.conns.len(); - - // TODO: rpc name instead of url (will do this with config reload revamp) // TODO: indexmap or hashmap? what hasher? with_capacity? // TODO: this will grow unbounded. 
prune old heads automatically let mut connection_heads = IndexMap::>>::new(); while let Ok((new_block, rpc)) = block_receiver.recv_async().await { - let new_block_hash = if let Some(hash) = new_block.hash { - hash - } else { - connection_heads.remove(&rpc.url); - - continue; - }; - - // TODO: dry this with the code above - let new_block_num = if let Some(num) = new_block.number { - num - } else { - // this seems unlikely, but i'm pretty sure we have seen it - // maybe when a node is syncing or reconnecting? - warn!(%rpc, ?new_block, "Block without number!"); - - connection_heads.remove(&rpc.url); - - continue; - }; - - // TODO: span with more in it? - // TODO: make sure i'm doing this span right - // TODO: show the actual rpc url? - // TODO: clippy lint to make sure we don't hold this across an awaited future - // TODO: what level? - // let _span = info_span!("block_receiver", %rpc, %new_block_num).entered(); - - if new_block_num == U64::zero() { - warn!(%rpc, %new_block_num, "still syncing"); - - connection_heads.remove(&rpc.url); - } else { - connection_heads.insert(rpc.url.clone(), new_block.clone()); - - self.chain.add_block(new_block.clone(), false); - } - - // iterate connection_heads to find the oldest block - let lowest_block_num = if let Some(lowest_block) = connection_heads - .values() - .min_by(|a, b| a.number.cmp(&b.number)) - { - lowest_block - .number - .expect("all blocks here should have a number") - } else { - continue; - }; - - // iterate connection_heads to find the consensus block - let mut rpcs_by_num = IndexMap::>::new(); - let mut blocks_by_hash = IndexMap::>>::new(); - // block_hash => soft_limit, rpcs - // TODO: proper type for this? - let mut rpcs_by_hash = IndexMap::>::new(); - let mut total_soft_limit = 0; - - for (rpc_url, head_block) in connection_heads.iter() { - if let Some(rpc) = self.conns.get(rpc_url) { - // we need the total soft limit in order to know when its safe to update the backends - total_soft_limit += rpc.soft_limit; - - let head_hash = head_block.hash.unwrap(); - - // save the block - blocks_by_hash - .entry(head_hash) - .or_insert_with(|| head_block.clone()); - - // add the rpc to all relevant block heights - let mut block = head_block.clone(); - while block.number.unwrap() >= lowest_block_num { - let block_hash = block.hash.unwrap(); - let block_num = block.number.unwrap(); - - // save the rpcs and the sum of their soft limit by their head hash - let rpc_urls_by_hash = - rpcs_by_hash.entry(block_hash).or_insert_with(Vec::new); - - rpc_urls_by_hash.push(rpc_url); - - // save the rpcs by their number - let rpc_urls_by_num = rpcs_by_num.entry(block_num).or_insert_with(Vec::new); - - rpc_urls_by_num.push(rpc_url); - - if let Some(parent) = self.chain.block(&block.parent_hash) { - // save the parent block - blocks_by_hash.insert(block.parent_hash, parent.clone()); - - block = parent - } else { - // log this? eventually we will hit a block we don't have, so it's not an error - break; - } - } - } - } - - // TODO: default_min_soft_limit? 
without, we start serving traffic at the start too quickly - // let min_soft_limit = total_soft_limit / 2; - let min_soft_limit = 1; - let num_possible_heads = rpcs_by_hash.len(); - - trace!(?rpcs_by_hash); - - struct State<'a> { - block: &'a Arc>, - sum_soft_limit: u32, - conns: Vec<&'a str>, - } - - impl<'a> State<'a> { - // TODO: there are sortable traits, but this seems simpler - /// sort the blocks in descending height - fn sortable_values(&self) -> (&U64, &u32, &U256, &H256) { - // trace!(?self.block, ?self.conns); - - // first we care about the block number - let block_num = self.block.number.as_ref().unwrap(); - - // if block_num ties, the block with the highest total difficulty *should* be the winner - // TODO: sometimes i see a block with no total difficulty. websocket subscription doesn't get everything - // let total_difficulty = self.block.total_difficulty.as_ref().expect("wat"); - - // all the nodes should already be doing this fork priority logic themselves - // so, it should be safe to just look at whatever our node majority thinks and go with that - let sum_soft_limit = &self.sum_soft_limit; - - let difficulty = &self.block.difficulty; - - // if we are still tied (unlikely). this will definitely break the tie - // TODO: what does geth do? - let block_hash = self.block.hash.as_ref().unwrap(); - - (block_num, sum_soft_limit, difficulty, block_hash) - } - } - - // TODO: this needs tests - if let Some(x) = rpcs_by_hash - .into_iter() - .filter_map(|(hash, conns)| { - // TODO: move this to `State::new` function on - let sum_soft_limit = conns - .iter() - .map(|rpc_url| { - if let Some(rpc) = self.conns.get(*rpc_url) { - rpc.soft_limit - } else { - 0 - } - }) - .sum(); - - if sum_soft_limit < min_soft_limit { - trace!(?sum_soft_limit, ?min_soft_limit, "sum_soft_limit too low"); - None - } else { - let block = blocks_by_hash.get(&hash).unwrap(); - - Some(State { - block, - sum_soft_limit, - conns, - }) - } - }) - // sort b to a for descending order. sort a to b for ascending order? maybe not "max_by" is smart - .max_by(|a, b| a.sortable_values().cmp(&b.sortable_values())) - { - let best_head_num = x.block.number.unwrap(); - let best_head_hash = x.block.hash.unwrap(); - let best_rpcs = x.conns; - - let synced_rpcs = rpcs_by_num.remove(&best_head_num).unwrap(); - - if best_rpcs.len() == synced_rpcs.len() { - trace!( - "{}/{}/{}/{} rpcs have {}", - best_rpcs.len(), - synced_rpcs.len(), - connection_heads.len(), - total_rpcs, - best_head_hash - ); - } else { - // TODO: this isn't necessarily a fork. this might just be an rpc being slow - // TODO: log all the heads? - warn!( - "chain is forked! {} possible heads. {}/{}/{}/{} rpcs have {}", - num_possible_heads, - best_rpcs.len(), - synced_rpcs.len(), - connection_heads.len(), - total_rpcs, - best_head_hash - ); - } - - let num_best_rpcs = best_rpcs.len(); - - // TODOL: do this without clone? - let conns = best_rpcs - .into_iter() - .map(|x| self.conns.get(x).unwrap().clone()) - .collect(); - - let pending_synced_connections = SyncedConnections { - head_block_num: best_head_num, - head_block_hash: best_head_hash, - conns, - }; - - let current_head_block = self.head_block_hash(); - let new_head_block = - pending_synced_connections.head_block_hash != current_head_block; - - if new_head_block { - self.chain.add_block(new_block.clone(), true); - - debug!( - "{}/{} rpcs at {} ({}). 
head at {:?}", - pending_synced_connections.conns.len(), - self.conns.len(), - pending_synced_connections.head_block_num, - pending_synced_connections.head_block_hash, - pending_synced_connections - .conns - .iter() - .map(|x| format!("{}", x)) - .collect::>(), - ); - // TODO: what if the hashes don't match? - if pending_synced_connections.head_block_hash == new_block_hash { - // mark all transactions in the block as confirmed - if pending_tx_sender.is_some() { - for tx_hash in &new_block.transactions { - // TODO: should we mark as confirmed via pending_tx_sender? - // TODO: possible deadlock here! - // trace!("removing {}...", tx_hash); - let _ = self.pending_transactions.remove(tx_hash); - // trace!("removed {}", tx_hash); - } - }; - - // TODO: mark any orphaned transactions as unconfirmed - } - } else if num_best_rpcs == self.conns.len() { - trace!( - "all {} rpcs at {} ({})", - num_best_rpcs, - pending_synced_connections.head_block_num, - pending_synced_connections.head_block_hash, - ); - } else { - trace!( - ?pending_synced_connections, - "{}/{} rpcs at {} ({})", - num_best_rpcs, - self.conns.len(), - pending_synced_connections.head_block_num, - pending_synced_connections.head_block_hash, - ); - } - - // TODO: do this before or after processing all the transactions in this block? - // TODO: only swap if there is a change? - trace!(?pending_synced_connections, "swapping"); - self.synced_connections - .swap(Arc::new(pending_synced_connections)); - - if new_head_block { - // TODO: is new_head_block accurate? - // TODO: move this onto self.chain? - head_block_sender - .send(new_block.clone()) - .context("head_block_sender")?; - } - } else { - // TODO: is this expected when we first start? - // TODO: make sure self.synced_connections is empty - warn!("not enough rpcs in sync"); - } + self.recv_block_from_rpc( + &mut connection_heads, + new_block, + rpc, + &head_block_sender, + &pending_tx_sender, + ) + .await?; } // TODO: if there was an error, we should return it @@ -1165,25 +713,25 @@ impl Web3Connections { } } -mod tests { - #[test] - fn test_false_before_true() { - let mut x = [true, false, true]; - - x.sort_unstable(); - - assert_eq!(x, [false, true, true]) - } -} - -impl fmt::Debug for SyncedConnections { +impl fmt::Debug for Web3Connections { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { // TODO: the default formatter takes forever to write. this is too quiet though - // TODO: print the actual conns? 
-        f.debug_struct("SyncedConnections")
-            .field("head_num", &self.head_block_num)
-            .field("head_hash", &self.head_block_hash)
-            .field("num_conns", &self.conns.len())
+        f.debug_struct("Web3Connections")
+            .field("conns", &self.conns)
             .finish_non_exhaustive()
     }
 }
+
+impl Serialize for Web3Connections {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: Serializer,
+    {
+        let conns: Vec<&Web3Connection> = self.conns.iter().map(|x| x.1.as_ref()).collect();
+
+        let mut state = serializer.serialize_struct("Web3Connections", 2)?;
+        state.serialize_field("conns", &conns)?;
+        state.serialize_field("synced_connections", &**self.synced_connections.load())?;
+        state.end()
+    }
+}
diff --git a/web3_proxy/src/rpcs/mod.rs b/web3_proxy/src/rpcs/mod.rs
index c2337ac9..ff2ae8c7 100644
--- a/web3_proxy/src/rpcs/mod.rs
+++ b/web3_proxy/src/rpcs/mod.rs
@@ -1,5 +1,7 @@
+mod blockchain;
 mod connection;
 mod connections;
+mod provider;
 mod synced_connections;
 
 pub use connection::{ActiveRequestHandle, HandleResult, Web3Connection};
diff --git a/web3_proxy/src/rpcs/provider.rs b/web3_proxy/src/rpcs/provider.rs
new file mode 100644
index 00000000..c65e1a13
--- /dev/null
+++ b/web3_proxy/src/rpcs/provider.rs
@@ -0,0 +1,9 @@
+use derive_more::From;
+
+/// Use HTTP and WS providers.
+// TODO: instead of an enum, I tried to use Box, but hit
+#[derive(From)]
+pub enum Web3Provider {
+    Http(ethers::providers::Provider<ethers::providers::Http>),
+    Ws(ethers::providers::Provider<ethers::providers::Ws>),
+}
diff --git a/web3_proxy/src/rpcs/synced_connections.rs b/web3_proxy/src/rpcs/synced_connections.rs
index d2350bca..5245f0d8 100644
--- a/web3_proxy/src/rpcs/synced_connections.rs
+++ b/web3_proxy/src/rpcs/synced_connections.rs
@@ -1,7 +1,8 @@
-use super::Web3Connection;
+use super::{Web3Connection, Web3Connections};
 use ethers::prelude::{H256, U64};
 use indexmap::IndexSet;
 use serde::Serialize;
+use std::fmt;
 use std::sync::Arc;
 
 /// A collection of Web3Connections that are on the same block.
@@ -14,3 +15,47 @@ pub struct SyncedConnections {
     #[serde(skip_serializing)]
     pub(super) conns: IndexSet<Arc<Web3Connection>>,
 }
+
+impl fmt::Debug for SyncedConnections {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        // TODO: the default formatter takes forever to write. this is too quiet though
+        // TODO: print the actual conns?
+        f.debug_struct("SyncedConnections")
+            .field("head_num", &self.head_block_num)
+            .field("head_hash", &self.head_block_hash)
+            .field("num_conns", &self.conns.len())
+            .finish_non_exhaustive()
+    }
+}
+
+impl Web3Connections {
+    pub fn head_block(&self) -> (U64, H256) {
+        let synced_connections = self.synced_connections.load();
+
+        (
+            synced_connections.head_block_num,
+            synced_connections.head_block_hash,
+        )
+    }
+
+    pub fn head_block_hash(&self) -> H256 {
+        self.synced_connections.load().head_block_hash
+    }
+
+    pub fn head_block_num(&self) -> U64 {
+        self.synced_connections.load().head_block_num
+    }
+
+    pub fn synced(&self) -> bool {
+        // TODO: require a minimum number of synced rpcs
+        // TODO: move this whole function to SyncedConnections
+        if self.synced_connections.load().conns.is_empty() {
+            return false;
+        }
+        self.head_block_num() > U64::zero()
+    }
+
+    pub fn num_synced_rpcs(&self) -> usize {
+        self.synced_connections.load().conns.len()
+    }
+}
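
Reviewer note, not part of the patch: the new rpcs/blockchain.rs picks the consensus head by
calling `max_by` on the tuple returned by `BlockMetadata::sortable_values()`, so candidate heads
are compared by block number first, then by the summed soft limit of the rpcs behind them, then
by difficulty, with the block hash as a final tie-break. The sketch below only illustrates that
tuple-ordering idea; `Candidate`, its integer fields, and the sample values are invented for the
example and are not names from this codebase.

// standalone illustration of the tie-breaking order used by sortable_values()
#[derive(Debug)]
struct Candidate {
    block_num: u64,      // stands in for Block::number
    sum_soft_limit: u32, // total soft limit of the rpcs on this head
    difficulty: u64,     // stands in for Block::difficulty
    hash: [u8; 4],       // stands in for Block::hash
}

impl Candidate {
    // mirror of BlockMetadata::sortable_values: a tuple compared lexicographically
    fn sortable_values(&self) -> (u64, u32, u64, [u8; 4]) {
        (self.block_num, self.sum_soft_limit, self.difficulty, self.hash)
    }
}

fn main() {
    let candidates = [
        Candidate { block_num: 100, sum_soft_limit: 200, difficulty: 7, hash: [0xaa; 4] },
        Candidate { block_num: 100, sum_soft_limit: 500, difficulty: 7, hash: [0xbb; 4] },
        Candidate { block_num: 99, sum_soft_limit: 900, difficulty: 9, hash: [0xcc; 4] },
    ];

    // height wins first, so the 99-block candidate loses even with more capacity behind it;
    // the two 100-blocks then tie-break on sum_soft_limit, so the [0xbb; 4] head is chosen
    let best = candidates
        .iter()
        .max_by(|a, b| a.sortable_values().cmp(&b.sortable_values()))
        .unwrap();

    println!("best head: {:?}", best);
}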
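
Reviewer note, not part of the patch: Web3Connections now owns two maps instead of the removed
BlockChain struct: chain_map (canonical blocks keyed by height) and block_map (every block seen,
including orphans, keyed by hash), and add_block only touches chain_map when the caller marks the
block canonical and that height is not already taken. A minimal sketch of that caching rule,
assuming plain HashMaps in place of the patch's DashMaps and a made-up FakeBlock type with
integer number/hash fields:

use std::collections::HashMap;
use std::sync::Arc;

#[derive(Debug)]
struct FakeBlock {
    num: u64,  // stands in for Block::number
    hash: u64, // stands in for Block::hash
}

#[derive(Default)]
struct BlockCache {
    /// only blocks on the main chain, keyed by height
    chain_map: HashMap<u64, Arc<FakeBlock>>,
    /// all blocks seen, including orphans, keyed by hash
    block_map: HashMap<u64, Arc<FakeBlock>>,
}

impl BlockCache {
    fn add_block(&mut self, block: Arc<FakeBlock>, canonical: bool) {
        if canonical {
            // the first block stored for a height wins; a repeat at the same height
            // returns early, like the or_insert_with + is_new check in the patch
            if self.chain_map.contains_key(&block.num) {
                return;
            }
            self.chain_map.insert(block.num, block.clone());
        }

        // every block lands in the by-hash map exactly once
        self.block_map.entry(block.hash).or_insert(block);
    }
}

fn main() {
    let mut cache = BlockCache::default();

    cache.add_block(Arc::new(FakeBlock { num: 1, hash: 0xaa }), true);
    // an orphan at the same height: cached by hash, but not canonical
    cache.add_block(Arc::new(FakeBlock { num: 1, hash: 0xbb }), false);

    println!("canonical #1 = {:?}", cache.chain_map.get(&1));
    println!("blocks cached by hash = {}", cache.block_map.len());
}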