From bb96757452d2d628bf1155a9732c1e2d47fe0af3 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Tue, 6 Sep 2022 12:29:37 +0000 Subject: [PATCH] None instead of Block::default() more places --- web3_proxy/src/app.rs | 2 +- web3_proxy/src/config.rs | 2 +- web3_proxy/src/rpcs/blockchain.rs | 53 ++++++++++++++++-------------- web3_proxy/src/rpcs/connection.rs | 48 ++++++++++++++++----------- web3_proxy/src/rpcs/connections.rs | 5 ++- 5 files changed, 61 insertions(+), 49 deletions(-) diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs index 1cf30536..6bf434d5 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app.rs @@ -227,7 +227,7 @@ impl Web3ProxyApp { } }; - // TODO: this should be a broadcast channel + // TODO: i don't like doing Block::default here! Change this to "None"? let (head_block_sender, head_block_receiver) = watch::channel(Arc::new(Block::default())); // TODO: will one receiver lagging be okay? how big should this be? let (pending_tx_sender, pending_tx_receiver) = broadcast::channel(256); diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index 6b9ad45b..df1bda6d 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -10,7 +10,7 @@ use serde::Deserialize; use std::sync::Arc; use tokio::sync::broadcast; -pub type BlockAndRpc = (ArcBlock, Arc); +pub type BlockAndRpc = (Option, Arc); pub type TxHashAndRpc = (TxHash, Arc); #[derive(Debug, FromArgs)] diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index 3e99cbdc..b36ee81a 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -39,6 +39,12 @@ impl Web3Connections { pub async fn save_block(&self, block: &ArcBlock, heaviest_chain: bool) -> anyhow::Result<()> { // TODO: i think we can rearrange this function to make it faster on the hot path let block_hash = block.hash.as_ref().context("no block hash")?; + + // skip if + if block_hash.is_zero() { + return Ok(()); + } + let block_num = block.number.as_ref().context("no block num")?; let _block_td = block .total_difficulty @@ -232,40 +238,34 @@ impl Web3Connections { async fn process_block_from_rpc( &self, connection_heads: &mut HashMap, - rpc_head_block: ArcBlock, + rpc_head_block: Option, rpc: Arc, head_block_sender: &watch::Sender, pending_tx_sender: &Option>, ) -> anyhow::Result<()> { - // add the block to connection_heads - let rpc_block_id = match (rpc_head_block.hash, rpc_head_block.number) { - (Some(rpc_head_hash), Some(rpc_head_num)) => { - if rpc_head_num == U64::zero() { + // add the rpc's block to connection_heads, or remove the rpc from connection_heads + match rpc_head_block { + Some(rpc_head_block) => { + let rpc_head_num = rpc_head_block.number.unwrap(); + let rpc_head_hash = rpc_head_block.hash.unwrap(); + + if rpc_head_num.is_zero() { + // TODO: i don't think we can get to this anymore now that we use Options debug!(%rpc, "still syncing"); connection_heads.remove(&rpc.name); - - None } else { // we don't know if its on the heaviest chain yet self.save_block(&rpc_head_block, false).await?; connection_heads.insert(rpc.name.to_owned(), rpc_head_hash); - - Some(BlockId { - hash: rpc_head_hash, - num: rpc_head_num, - }) } } - _ => { + None => { // TODO: warn is too verbose. this is expected if a node disconnects and has to reconnect trace!(%rpc, "Block without number or hash!"); connection_heads.remove(&rpc.name); - - // don't return yet! self.synced_connections likely needs an update - None } }; @@ -284,7 +284,7 @@ impl Web3Connections { x } else { // TODO: why does this happen? - warn!(%conn_head_hash, %conn_name, "No block found"); + warn!(%conn_head_hash, %conn_name, %rpc, "Missing block in connection_heads"); continue; }; @@ -375,6 +375,9 @@ impl Web3Connections { // TODO: if heavy_rpcs.is_empty, try another method of finding the head block + let num_connection_heads = connection_heads.len(); + let total_conns = self.conns.len(); + // we've done all the searching for the heaviest block that we can if heavy_rpcs.is_empty() { // if we get here, something is wrong. clear synced connections @@ -385,8 +388,12 @@ impl Web3Connections { .swap(Arc::new(empty_synced_connections)); // TODO: log different things depending on old_synced_connections - warn!("no consensus head!"); + warn!(%rpc, "no consensus head! {}/{}/{}", 0, num_connection_heads, total_conns); } else { + // TODO: this is too verbose. move to trace + // i think "conns" is somehow getting dupes + debug!(?heavy_rpcs); + // success! this block has enough soft limit and nodes on it (or on later blocks) let conns: Vec> = heavy_rpcs .into_iter() @@ -402,8 +409,6 @@ impl Web3Connections { // TODO: add these to the log messages let num_consensus_rpcs = conns.len(); - let num_connection_heads = connection_heads.len(); - let total_conns = self.conns.len(); let heavy_block_id = BlockId { hash: heavy_hash, @@ -424,7 +429,7 @@ impl Web3Connections { None => { debug!(block=%heavy_block_id, %rpc, "first consensus head {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns); - self.save_block(&rpc_head_block, true).await?; + self.save_block(&heavy_block, true).await?; head_block_sender.send(heavy_block)?; } @@ -440,7 +445,7 @@ impl Web3Connections { info!(heavy=%heavy_block_id, old=%old_block_id, %rpc, "unc block {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns); // todo!("handle equal by updating the cannonical chain"); - self.save_block(&rpc_head_block, true).await?; + self.save_block(&heavy_block, true).await?; head_block_sender.send(heavy_block)?; } @@ -450,7 +455,7 @@ impl Web3Connections { // TODO: better log warn!(head=%heavy_block_id, %rpc, "chain rolled back {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns); - self.save_block(&rpc_head_block, true).await?; + self.save_block(&heavy_block, true).await?; // todo!("handle less by removing higher blocks from the cannonical chain"); head_block_sender.send(heavy_block)?; @@ -460,7 +465,7 @@ impl Web3Connections { // todo!("handle greater by adding this block to and any missing parents to the cannonical chain"); - self.save_block(&rpc_head_block, true).await?; + self.save_block(&heavy_block, true).await?; head_block_sender.send(heavy_block)?; } diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs index 24c2e724..aef181fb 100644 --- a/web3_proxy/src/rpcs/connection.rs +++ b/web3_proxy/src/rpcs/connection.rs @@ -234,36 +234,41 @@ impl Web3Connection { needed_block_num >= &oldest_block_num && needed_block_num <= &newest_block_num } + /// reconnect a websocket provider #[instrument(skip_all)] pub async fn reconnect( self: &Arc, block_sender: Option>, ) -> anyhow::Result<()> { + // TODO: no-op if this called on a http provider // websocket doesn't need the http client let http_client = None; - info!(%self, "reconnecting"); + info!(rpc=%self, "reconnecting"); // since this lock is held open over an await, we use tokio's locking // TODO: timeout on this lock. if its slow, something is wrong - let mut provider = self.provider.write().await; + { + let mut provider = self.provider.write().await; - *provider = None; + *provider = None; + + // TODO: if this fails, keep retrying + let new_provider = Web3Provider::from_str(&self.url, http_client) + .await + .unwrap(); + + *provider = Some(Arc::new(new_provider)); + } // tell the block subscriber that we don't have any blocks if let Some(block_sender) = block_sender { - // TODO: maybe it would be better do send a "None" or an Option>> block_sender - .send_async((Arc::new(Block::default()), self.clone())) + .send_async((None, self.clone())) .await - .context("block_sender at 0")?; + .context("block_sender during reconnect")?; } - // TODO: if this fails, keep retrying - let new_provider = Web3Provider::from_str(&self.url, http_client).await?; - - *provider = Some(Arc::new(new_provider)); - Ok(()) } @@ -280,16 +285,19 @@ impl Web3Connection { #[instrument(skip_all)] async fn send_head_block_result( self: &Arc, - new_head_block: Result, + new_head_block: Result, ProviderError>, block_sender: &flume::Sender, block_map: BlockHashesMap, ) -> anyhow::Result<()> { match new_head_block { - Ok(mut new_head_block) => { + Ok(None) => { + todo!("handle no block") + } + Ok(Some(mut new_head_block)) => { // TODO: is unwrap_or_default ok? we might have an empty block let new_hash = new_head_block.hash.unwrap_or_default(); - // if we already have this block saved, we don't need to store this copy + // if we already have this block saved, set new_block_head to that arc and don't store this copy // TODO: small race here new_head_block = if let Some(existing_block) = block_map.get(&new_hash) { // we only save blocks with a total difficulty @@ -348,7 +356,7 @@ impl Web3Connection { } block_sender - .send_async((new_head_block, self.clone())) + .send_async((Some(new_head_block), self.clone())) .await .context("block_sender")?; } @@ -358,7 +366,7 @@ impl Web3Connection { // send an empty block to take this server out of rotation block_sender - .send_async((Arc::new(Block::default()), self.clone())) + .send_async((None, self.clone())) .await .context("block_sender")?; } @@ -474,7 +482,7 @@ impl Web3Connection { last_hash = new_hash; self.send_head_block_result( - Ok(Arc::new(block)), + Ok(Some(Arc::new(block))), &block_sender, block_map.clone(), ) @@ -526,19 +534,19 @@ impl Web3Connection { // query the block once since the subscription doesn't send the current block // there is a very small race condition here where the stream could send us a new block right now // all it does is print "new block" for the same block as current block - let block: Result = self + let block: Result, _> = self .wait_for_request_handle() .await? .request("eth_getBlockByNumber", ("latest", false)) .await - .map(Arc::new); + .map(|x| Some(Arc::new(x))); self.send_head_block_result(block, &block_sender, block_map.clone()) .await?; while let Some(new_block) = stream.next().await { self.send_head_block_result( - Ok(Arc::new(new_block)), + Ok(Some(Arc::new(new_block))), &block_sender, block_map.clone(), ) diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs index c747d1f8..2dbba595 100644 --- a/web3_proxy/src/rpcs/connections.rs +++ b/web3_proxy/src/rpcs/connections.rs @@ -10,7 +10,7 @@ use crate::rpcs::transactions::TxStatus; use arc_swap::ArcSwap; use counter::Counter; use derive_more::From; -use ethers::prelude::{Block, ProviderError, TxHash, H256, U64}; +use ethers::prelude::{ProviderError, TxHash, H256, U64}; use futures::future::{join_all, try_join_all}; use futures::stream::FuturesUnordered; use futures::StreamExt; @@ -65,8 +65,7 @@ impl Web3Connections { pending_transactions: Cache, ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded(); - let (block_sender, block_receiver) = - flume::unbounded::<(Arc>, Arc)>(); + let (block_sender, block_receiver) = flume::unbounded::(); let http_interval_sender = if http_client.is_some() { let (sender, receiver) = broadcast::channel(1);