From 789672be43282caa31eaa3197cd077435fd95ae7 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 14 Sep 2022 05:26:46 +0000 Subject: [PATCH] try fixing warning about missing blocks --- web3_proxy/src/lib.rs | 1 + web3_proxy/src/rate_limiter.rs | 0 web3_proxy/src/rpcs/blockchain.rs | 65 +++++++++++++++++------------- web3_proxy/src/rpcs/connections.rs | 8 +++- 4 files changed, 43 insertions(+), 31 deletions(-) create mode 100644 web3_proxy/src/rate_limiter.rs diff --git a/web3_proxy/src/lib.rs b/web3_proxy/src/lib.rs index a2da1712..d0c8e076 100644 --- a/web3_proxy/src/lib.rs +++ b/web3_proxy/src/lib.rs @@ -4,5 +4,6 @@ pub mod config; pub mod frontend; pub mod jsonrpc; pub mod metrics; +pub mod rate_limiter; pub mod rpcs; pub mod users; diff --git a/web3_proxy/src/rate_limiter.rs b/web3_proxy/src/rate_limiter.rs new file mode 100644 index 00000000..e69de29b diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index ef7cbf02..84f4c41e 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -40,21 +40,18 @@ impl Web3Connections { // TODO: i think we can rearrange this function to make it faster on the hot path let block_hash = block.hash.as_ref().context("no block hash")?; - // skip if + // skip Block::default() if block_hash.is_zero() { + debug!("Skipping block without hash!"); return Ok(()); } let block_num = block.number.as_ref().context("no block num")?; + let _block_td = block .total_difficulty .as_ref() - .expect("no block total difficulty"); - - // if self.block_hashes.contains_key(block_hash) { - // // this block is already included. no need to continue - // return Ok(()); - // } + .expect("no block total difficulty. this is a bug!"); let mut blockchain = self.blockchain_graphmap.write().await; @@ -62,25 +59,19 @@ impl Web3Connections { if heaviest_chain { // this is the only place that writes to block_numbers // its inside a write lock on blockchain_graphmap, so i think there is no race - if let Some(old_hash) = self.block_numbers.get(block_num) { - if block_hash == &old_hash { - // this block has already been saved - return Ok(()); - } - } - - // i think a race here isn't that big of a problem. just 2 inserts + // multiple inserts should be okay though self.block_numbers.insert(*block_num, *block_hash).await; } if blockchain.contains_node(*block_hash) { // this hash is already included + trace!(%block_hash, %block_num, "skipping saving existing block"); // return now since this work was already done. return Ok(()); } // TODO: prettier log? or probably move the log somewhere else - trace!(%block_hash, "new block"); + trace!(%block_hash, %block_num, "saving new block"); // TODO: theres a small race between contains_key and insert self.block_hashes @@ -140,7 +131,7 @@ impl Web3Connections { let block = Arc::new(block); // the block was fetched using eth_getBlockByHash, so it should have all fields - // TODO: fill in heaviest_chain! + // TODO: fill in heaviest_chain! if the block is old enough, is this definitely true? 
self.save_block(&block, false).await?; Ok(block) @@ -215,14 +206,19 @@ impl Web3Connections { let mut connection_heads = HashMap::new(); while let Ok((new_block, rpc)) = block_receiver.recv_async().await { - self.process_block_from_rpc( - &mut connection_heads, - new_block, - rpc, - &head_block_sender, - &pending_tx_sender, - ) - .await?; + let rpc_name = rpc.name.clone(); + if let Err(err) = self + .process_block_from_rpc( + &mut connection_heads, + new_block, + rpc, + &head_block_sender, + &pending_tx_sender, + ) + .await + { + warn!(rpc=%rpc_name, ?err, "unable to process block from rpc"); + } } // TODO: if there was an error, we should return it @@ -257,7 +253,6 @@ impl Web3Connections { None } else { // we don't know if its on the heaviest chain yet - debug!(?rpc_head_hash, ?rpc_head_num, %rpc.name, "saving"); self.save_block(&rpc_head_block, false).await?; connection_heads.insert(rpc.name.to_owned(), rpc_head_hash); @@ -292,9 +287,21 @@ impl Web3Connections { let conn_head_block = if let Some(x) = self.block_hashes.get(connection_head_hash) { x } else { - // TODO: why does this happen?!?! maybe we should do get_with? - warn!(%connection_head_hash, %conn_name, %rpc, "Missing connection_head_block in block_hashes"); - continue; + // TODO: why does this happen?!?! + // TODO: maybe we should do get_with? + // TODO: maybe we should just continue. this only seems to happen when an older block is received + warn!(%connection_head_hash, %conn_name, %rpc, "Missing connection_head_block in block_hashes. Fetching now"); + + // this option should always be populated + let conn_rpc = self.conns.get(conn_name); + + match self.block(connection_head_hash, conn_rpc).await { + Ok(block) => block, + Err(err) => { + warn!(%connection_head_hash, %conn_name, %rpc, ?err, "Failed fetching connection_head_block for block_hashe"); + continue; + } + } }; match &conn_head_block.total_difficulty { diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs index 24a8a4be..86401fca 100644 --- a/web3_proxy/src/rpcs/connections.rs +++ b/web3_proxy/src/rpcs/connections.rs @@ -643,11 +643,15 @@ impl Serialize for Web3Connections { where S: Serializer, { - let conns: Vec<&Web3Connection> = self.conns.iter().map(|x| x.1.as_ref()).collect(); + let conns: Vec<&Web3Connection> = self.conns.values().map(|x| x.as_ref()).collect(); - let mut state = serializer.serialize_struct("Web3Connections", 2)?; + let mut state = serializer.serialize_struct("Web3Connections", 6)?; state.serialize_field("conns", &conns)?; state.serialize_field("synced_connections", &**self.synced_connections.load())?; + state.serialize_field("block_hashes_count", &self.block_hashes.entry_count())?; + state.serialize_field("block_hashes_size", &self.block_hashes.weighted_size())?; + state.serialize_field("block_numbers_count", &self.block_numbers.entry_count())?; + state.serialize_field("block_numbers_size", &self.block_numbers.weighted_size())?; state.end() } }
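
A few notes on the changes above. The first blockchain.rs hunk changes how save_block deduplicates: instead of returning early when block_numbers already maps this number to the same hash, it now always writes the number -> hash entry while holding the blockchain_graphmap write lock, and only short-circuits once the hash is already a node in the graph; it also skips Block::default() values that arrive without a real hash. Below is a minimal std-only sketch of that control flow, with a HashSet/HashMap standing in for the petgraph graphmap and the moka caches (all names here are hypothetical stand-ins, not the crate's real types):

```rust
use std::collections::{HashMap, HashSet};

type Hash = u64; // stand-in for an H256 block hash
type Num = u64;

#[derive(Default)]
struct Chain {
    // stand-ins for the graphmap and the two caches in the real code
    graph: HashSet<Hash>,
    numbers: HashMap<Num, Hash>,
    hashes: HashMap<Hash, Num>,
}

impl Chain {
    fn save_block(&mut self, hash: Hash, num: Num, heaviest_chain: bool) {
        // skip Block::default(): a block without a real hash carries no information
        if hash == 0 {
            return;
        }

        if heaviest_chain {
            // always record num -> hash for the heaviest chain; repeating the
            // insert for a block we already know about is harmless
            self.numbers.insert(num, hash);
        }

        if self.graph.contains(&hash) {
            // the hash -> block bookkeeping was already done for this block
            return;
        }

        self.hashes.insert(hash, num);
        self.graph.insert(hash);
    }
}

fn main() {
    let mut chain = Chain::default();
    chain.save_block(0, 1, true); // ignored: default block without a hash
    chain.save_block(42, 1, true);
    chain.save_block(42, 1, true); // no-op after the first save
    assert_eq!(chain.numbers.get(&1), Some(&42));
}
```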
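The second change wraps process_block_from_rpc in the subscription loop so that a block which fails to process is logged with warn! instead of propagating with ? and ending the whole loop. A dependency-free sketch of that pattern, using std::sync::mpsc and eprintln! in place of the flume receiver and tracing macros used by the real code (NewBlock and process_block are made-up names for illustration):

```rust
use std::sync::mpsc;

// hypothetical stand-ins for the real block/rpc types and
// `process_block_from_rpc`
struct NewBlock(u64);

fn process_block(block: &NewBlock) -> Result<(), String> {
    if block.0 == 0 {
        return Err("block has no number".to_string());
    }
    Ok(())
}

fn main() {
    let (tx, rx) = mpsc::channel();
    for n in [1u64, 0, 2] {
        tx.send(NewBlock(n)).unwrap();
    }
    drop(tx);

    // Before the patch the loop body ended with `.await?`, so one bad block
    // returned the error and stopped the subscription. Now the error is
    // logged (warn! in the real code) and the loop keeps consuming blocks.
    while let Ok(block) = rx.recv() {
        if let Err(err) = process_block(&block) {
            eprintln!("unable to process block from rpc: {err}");
        }
    }
}
```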
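The hunk around connection_head_hash replaces the old warn-and-continue with a fallback fetch: if a head hash announced by a connection is not in the block_hashes cache (which, per the TODO, only seems to happen when an older block is received), the block is fetched by hash and the connection is skipped only if that fetch also fails. A rough sketch of the lookup-then-fetch shape, with a plain HashMap for the cache and a hypothetical fetch_block standing in for Web3Connections::block:

```rust
use std::collections::HashMap;
use std::sync::Arc;

type Hash = String; // stand-in for an H256 block hash

struct Block {
    hash: Hash,
}

// hypothetical fetcher standing in for `self.block(connection_head_hash, conn_rpc)`
fn fetch_block(hash: &Hash) -> Result<Arc<Block>, String> {
    Ok(Arc::new(Block { hash: hash.clone() }))
}

// Look the announced head up in the cache first; on a miss, try to fetch it
// instead of immediately skipping the connection.
fn connection_head_block(
    block_hashes: &HashMap<Hash, Arc<Block>>,
    head_hash: &Hash,
) -> Option<Arc<Block>> {
    if let Some(block) = block_hashes.get(head_hash) {
        return Some(block.clone());
    }

    match fetch_block(head_hash) {
        Ok(block) => Some(block),
        Err(err) => {
            // only now give up on this connection for this round
            eprintln!("failed fetching connection head block: {err}");
            None
        }
    }
}

fn main() {
    let block_hashes: HashMap<Hash, Arc<Block>> = HashMap::new();
    let head = connection_head_block(&block_hashes, &"0xabc".to_string());
    assert_eq!(head.map(|b| b.hash.clone()), Some("0xabc".to_string()));
}
```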
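Finally, the connections.rs hunk switches to conns.values(), bumps the serialize_struct length from 2 to 6, and serializes the entry_count/weighted_size of the block_hashes and block_numbers caches. When Serialize is implemented by hand like this, the length passed to serialize_struct is a hint that should match the number of serialize_field calls, which is why adding four fields also changes the 2 to a 6. A small self-contained example of the same pattern, using the serde and serde_json crates (CacheStats and its fields are hypothetical; the real impl reads them from the moka caches on Web3Connections):

```rust
use serde::ser::{Serialize, SerializeStruct, Serializer};

// hypothetical struct; the real code serializes Web3Connections itself
struct CacheStats {
    block_hashes_count: u64,
    block_hashes_size: u64,
}

impl Serialize for CacheStats {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        // the length passed here should match the number of serialize_field
        // calls below -- the same reason the patch bumps "Web3Connections"
        // from 2 to 6 when it adds four fields
        let mut state = serializer.serialize_struct("CacheStats", 2)?;
        state.serialize_field("block_hashes_count", &self.block_hashes_count)?;
        state.serialize_field("block_hashes_size", &self.block_hashes_size)?;
        state.end()
    }
}

fn main() {
    let stats = CacheStats {
        block_hashes_count: 100,
        block_hashes_size: 100,
    };
    println!("{}", serde_json::to_string(&stats).unwrap());
}
```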