None instead of Block::default() more places

This commit is contained in:
Bryan Stitt 2022-09-06 12:29:37 +00:00
parent e8977d203b
commit bb96757452
5 changed files with 61 additions and 49 deletions

@ -227,7 +227,7 @@ impl Web3ProxyApp {
} }
}; };
// TODO: this should be a broadcast channel // TODO: i don't like doing Block::default here! Change this to "None"?
let (head_block_sender, head_block_receiver) = watch::channel(Arc::new(Block::default())); let (head_block_sender, head_block_receiver) = watch::channel(Arc::new(Block::default()));
// TODO: will one receiver lagging be okay? how big should this be? // TODO: will one receiver lagging be okay? how big should this be?
let (pending_tx_sender, pending_tx_receiver) = broadcast::channel(256); let (pending_tx_sender, pending_tx_receiver) = broadcast::channel(256);

@ -10,7 +10,7 @@ use serde::Deserialize;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::broadcast; use tokio::sync::broadcast;
pub type BlockAndRpc = (ArcBlock, Arc<Web3Connection>); pub type BlockAndRpc = (Option<ArcBlock>, Arc<Web3Connection>);
pub type TxHashAndRpc = (TxHash, Arc<Web3Connection>); pub type TxHashAndRpc = (TxHash, Arc<Web3Connection>);
#[derive(Debug, FromArgs)] #[derive(Debug, FromArgs)]

@ -39,6 +39,12 @@ impl Web3Connections {
pub async fn save_block(&self, block: &ArcBlock, heaviest_chain: bool) -> anyhow::Result<()> { pub async fn save_block(&self, block: &ArcBlock, heaviest_chain: bool) -> anyhow::Result<()> {
// TODO: i think we can rearrange this function to make it faster on the hot path // TODO: i think we can rearrange this function to make it faster on the hot path
let block_hash = block.hash.as_ref().context("no block hash")?; let block_hash = block.hash.as_ref().context("no block hash")?;
// skip if
if block_hash.is_zero() {
return Ok(());
}
let block_num = block.number.as_ref().context("no block num")?; let block_num = block.number.as_ref().context("no block num")?;
let _block_td = block let _block_td = block
.total_difficulty .total_difficulty
@ -232,40 +238,34 @@ impl Web3Connections {
async fn process_block_from_rpc( async fn process_block_from_rpc(
&self, &self,
connection_heads: &mut HashMap<String, H256>, connection_heads: &mut HashMap<String, H256>,
rpc_head_block: ArcBlock, rpc_head_block: Option<ArcBlock>,
rpc: Arc<Web3Connection>, rpc: Arc<Web3Connection>,
head_block_sender: &watch::Sender<ArcBlock>, head_block_sender: &watch::Sender<ArcBlock>,
pending_tx_sender: &Option<broadcast::Sender<TxStatus>>, pending_tx_sender: &Option<broadcast::Sender<TxStatus>>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
// add the block to connection_heads // add the rpc's block to connection_heads, or remove the rpc from connection_heads
let rpc_block_id = match (rpc_head_block.hash, rpc_head_block.number) { match rpc_head_block {
(Some(rpc_head_hash), Some(rpc_head_num)) => { Some(rpc_head_block) => {
if rpc_head_num == U64::zero() { let rpc_head_num = rpc_head_block.number.unwrap();
let rpc_head_hash = rpc_head_block.hash.unwrap();
if rpc_head_num.is_zero() {
// TODO: i don't think we can get to this anymore now that we use Options
debug!(%rpc, "still syncing"); debug!(%rpc, "still syncing");
connection_heads.remove(&rpc.name); connection_heads.remove(&rpc.name);
None
} else { } else {
// we don't know if its on the heaviest chain yet // we don't know if its on the heaviest chain yet
self.save_block(&rpc_head_block, false).await?; self.save_block(&rpc_head_block, false).await?;
connection_heads.insert(rpc.name.to_owned(), rpc_head_hash); connection_heads.insert(rpc.name.to_owned(), rpc_head_hash);
Some(BlockId {
hash: rpc_head_hash,
num: rpc_head_num,
})
} }
} }
_ => { None => {
// TODO: warn is too verbose. this is expected if a node disconnects and has to reconnect // TODO: warn is too verbose. this is expected if a node disconnects and has to reconnect
trace!(%rpc, "Block without number or hash!"); trace!(%rpc, "Block without number or hash!");
connection_heads.remove(&rpc.name); connection_heads.remove(&rpc.name);
// don't return yet! self.synced_connections likely needs an update
None
} }
}; };
@ -284,7 +284,7 @@ impl Web3Connections {
x x
} else { } else {
// TODO: why does this happen? // TODO: why does this happen?
warn!(%conn_head_hash, %conn_name, "No block found"); warn!(%conn_head_hash, %conn_name, %rpc, "Missing block in connection_heads");
continue; continue;
}; };
@ -375,6 +375,9 @@ impl Web3Connections {
// TODO: if heavy_rpcs.is_empty, try another method of finding the head block // TODO: if heavy_rpcs.is_empty, try another method of finding the head block
let num_connection_heads = connection_heads.len();
let total_conns = self.conns.len();
// we've done all the searching for the heaviest block that we can // we've done all the searching for the heaviest block that we can
if heavy_rpcs.is_empty() { if heavy_rpcs.is_empty() {
// if we get here, something is wrong. clear synced connections // if we get here, something is wrong. clear synced connections
@ -385,8 +388,12 @@ impl Web3Connections {
.swap(Arc::new(empty_synced_connections)); .swap(Arc::new(empty_synced_connections));
// TODO: log different things depending on old_synced_connections // TODO: log different things depending on old_synced_connections
warn!("no consensus head!"); warn!(%rpc, "no consensus head! {}/{}/{}", 0, num_connection_heads, total_conns);
} else { } else {
// TODO: this is too verbose. move to trace
// i think "conns" is somehow getting dupes
debug!(?heavy_rpcs);
// success! this block has enough soft limit and nodes on it (or on later blocks) // success! this block has enough soft limit and nodes on it (or on later blocks)
let conns: Vec<Arc<Web3Connection>> = heavy_rpcs let conns: Vec<Arc<Web3Connection>> = heavy_rpcs
.into_iter() .into_iter()
@ -402,8 +409,6 @@ impl Web3Connections {
// TODO: add these to the log messages // TODO: add these to the log messages
let num_consensus_rpcs = conns.len(); let num_consensus_rpcs = conns.len();
let num_connection_heads = connection_heads.len();
let total_conns = self.conns.len();
let heavy_block_id = BlockId { let heavy_block_id = BlockId {
hash: heavy_hash, hash: heavy_hash,
@ -424,7 +429,7 @@ impl Web3Connections {
None => { None => {
debug!(block=%heavy_block_id, %rpc, "first consensus head {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns); debug!(block=%heavy_block_id, %rpc, "first consensus head {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns);
self.save_block(&rpc_head_block, true).await?; self.save_block(&heavy_block, true).await?;
head_block_sender.send(heavy_block)?; head_block_sender.send(heavy_block)?;
} }
@ -440,7 +445,7 @@ impl Web3Connections {
info!(heavy=%heavy_block_id, old=%old_block_id, %rpc, "unc block {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns); info!(heavy=%heavy_block_id, old=%old_block_id, %rpc, "unc block {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns);
// todo!("handle equal by updating the cannonical chain"); // todo!("handle equal by updating the cannonical chain");
self.save_block(&rpc_head_block, true).await?; self.save_block(&heavy_block, true).await?;
head_block_sender.send(heavy_block)?; head_block_sender.send(heavy_block)?;
} }
@ -450,7 +455,7 @@ impl Web3Connections {
// TODO: better log // TODO: better log
warn!(head=%heavy_block_id, %rpc, "chain rolled back {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns); warn!(head=%heavy_block_id, %rpc, "chain rolled back {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns);
self.save_block(&rpc_head_block, true).await?; self.save_block(&heavy_block, true).await?;
// todo!("handle less by removing higher blocks from the cannonical chain"); // todo!("handle less by removing higher blocks from the cannonical chain");
head_block_sender.send(heavy_block)?; head_block_sender.send(heavy_block)?;
@ -460,7 +465,7 @@ impl Web3Connections {
// todo!("handle greater by adding this block to and any missing parents to the cannonical chain"); // todo!("handle greater by adding this block to and any missing parents to the cannonical chain");
self.save_block(&rpc_head_block, true).await?; self.save_block(&heavy_block, true).await?;
head_block_sender.send(heavy_block)?; head_block_sender.send(heavy_block)?;
} }

@ -234,36 +234,41 @@ impl Web3Connection {
needed_block_num >= &oldest_block_num && needed_block_num <= &newest_block_num needed_block_num >= &oldest_block_num && needed_block_num <= &newest_block_num
} }
/// reconnect a websocket provider
#[instrument(skip_all)] #[instrument(skip_all)]
pub async fn reconnect( pub async fn reconnect(
self: &Arc<Self>, self: &Arc<Self>,
block_sender: Option<flume::Sender<BlockAndRpc>>, block_sender: Option<flume::Sender<BlockAndRpc>>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
// TODO: no-op if this called on a http provider
// websocket doesn't need the http client // websocket doesn't need the http client
let http_client = None; let http_client = None;
info!(%self, "reconnecting"); info!(rpc=%self, "reconnecting");
// since this lock is held open over an await, we use tokio's locking // since this lock is held open over an await, we use tokio's locking
// TODO: timeout on this lock. if its slow, something is wrong // TODO: timeout on this lock. if its slow, something is wrong
let mut provider = self.provider.write().await; {
let mut provider = self.provider.write().await;
*provider = None; *provider = None;
// TODO: if this fails, keep retrying
let new_provider = Web3Provider::from_str(&self.url, http_client)
.await
.unwrap();
*provider = Some(Arc::new(new_provider));
}
// tell the block subscriber that we don't have any blocks // tell the block subscriber that we don't have any blocks
if let Some(block_sender) = block_sender { if let Some(block_sender) = block_sender {
// TODO: maybe it would be better do send a "None" or an Option<Arc<Block<TxHash>>>
block_sender block_sender
.send_async((Arc::new(Block::default()), self.clone())) .send_async((None, self.clone()))
.await .await
.context("block_sender at 0")?; .context("block_sender during reconnect")?;
} }
// TODO: if this fails, keep retrying
let new_provider = Web3Provider::from_str(&self.url, http_client).await?;
*provider = Some(Arc::new(new_provider));
Ok(()) Ok(())
} }
@ -280,16 +285,19 @@ impl Web3Connection {
#[instrument(skip_all)] #[instrument(skip_all)]
async fn send_head_block_result( async fn send_head_block_result(
self: &Arc<Self>, self: &Arc<Self>,
new_head_block: Result<ArcBlock, ProviderError>, new_head_block: Result<Option<ArcBlock>, ProviderError>,
block_sender: &flume::Sender<BlockAndRpc>, block_sender: &flume::Sender<BlockAndRpc>,
block_map: BlockHashesMap, block_map: BlockHashesMap,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
match new_head_block { match new_head_block {
Ok(mut new_head_block) => { Ok(None) => {
todo!("handle no block")
}
Ok(Some(mut new_head_block)) => {
// TODO: is unwrap_or_default ok? we might have an empty block // TODO: is unwrap_or_default ok? we might have an empty block
let new_hash = new_head_block.hash.unwrap_or_default(); let new_hash = new_head_block.hash.unwrap_or_default();
// if we already have this block saved, we don't need to store this copy // if we already have this block saved, set new_block_head to that arc and don't store this copy
// TODO: small race here // TODO: small race here
new_head_block = if let Some(existing_block) = block_map.get(&new_hash) { new_head_block = if let Some(existing_block) = block_map.get(&new_hash) {
// we only save blocks with a total difficulty // we only save blocks with a total difficulty
@ -348,7 +356,7 @@ impl Web3Connection {
} }
block_sender block_sender
.send_async((new_head_block, self.clone())) .send_async((Some(new_head_block), self.clone()))
.await .await
.context("block_sender")?; .context("block_sender")?;
} }
@ -358,7 +366,7 @@ impl Web3Connection {
// send an empty block to take this server out of rotation // send an empty block to take this server out of rotation
block_sender block_sender
.send_async((Arc::new(Block::default()), self.clone())) .send_async((None, self.clone()))
.await .await
.context("block_sender")?; .context("block_sender")?;
} }
@ -474,7 +482,7 @@ impl Web3Connection {
last_hash = new_hash; last_hash = new_hash;
self.send_head_block_result( self.send_head_block_result(
Ok(Arc::new(block)), Ok(Some(Arc::new(block))),
&block_sender, &block_sender,
block_map.clone(), block_map.clone(),
) )
@ -526,19 +534,19 @@ impl Web3Connection {
// query the block once since the subscription doesn't send the current block // query the block once since the subscription doesn't send the current block
// there is a very small race condition here where the stream could send us a new block right now // there is a very small race condition here where the stream could send us a new block right now
// all it does is print "new block" for the same block as current block // all it does is print "new block" for the same block as current block
let block: Result<ArcBlock, _> = self let block: Result<Option<ArcBlock>, _> = self
.wait_for_request_handle() .wait_for_request_handle()
.await? .await?
.request("eth_getBlockByNumber", ("latest", false)) .request("eth_getBlockByNumber", ("latest", false))
.await .await
.map(Arc::new); .map(|x| Some(Arc::new(x)));
self.send_head_block_result(block, &block_sender, block_map.clone()) self.send_head_block_result(block, &block_sender, block_map.clone())
.await?; .await?;
while let Some(new_block) = stream.next().await { while let Some(new_block) = stream.next().await {
self.send_head_block_result( self.send_head_block_result(
Ok(Arc::new(new_block)), Ok(Some(Arc::new(new_block))),
&block_sender, &block_sender,
block_map.clone(), block_map.clone(),
) )

@ -10,7 +10,7 @@ use crate::rpcs::transactions::TxStatus;
use arc_swap::ArcSwap; use arc_swap::ArcSwap;
use counter::Counter; use counter::Counter;
use derive_more::From; use derive_more::From;
use ethers::prelude::{Block, ProviderError, TxHash, H256, U64}; use ethers::prelude::{ProviderError, TxHash, H256, U64};
use futures::future::{join_all, try_join_all}; use futures::future::{join_all, try_join_all};
use futures::stream::FuturesUnordered; use futures::stream::FuturesUnordered;
use futures::StreamExt; use futures::StreamExt;
@ -65,8 +65,7 @@ impl Web3Connections {
pending_transactions: Cache<TxHash, TxStatus>, pending_transactions: Cache<TxHash, TxStatus>,
) -> anyhow::Result<(Arc<Self>, AnyhowJoinHandle<()>)> { ) -> anyhow::Result<(Arc<Self>, AnyhowJoinHandle<()>)> {
let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded(); let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
let (block_sender, block_receiver) = let (block_sender, block_receiver) = flume::unbounded::<BlockAndRpc>();
flume::unbounded::<(Arc<Block<H256>>, Arc<Web3Connection>)>();
let http_interval_sender = if http_client.is_some() { let http_interval_sender = if http_client.is_some() {
let (sender, receiver) = broadcast::channel(1); let (sender, receiver) = broadcast::channel(1);