From 6f0ae1ec35ac9f598d699cd080df6db613f1ec16 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Fri, 2 Sep 2022 21:35:03 +0000 Subject: [PATCH] slower but correct save_block --- web3_proxy/src/rpcs/blockchain.rs | 65 +++++++++++++++---------------- 1 file changed, 31 insertions(+), 34 deletions(-) diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index e63313b3..53da5eec 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -39,6 +39,7 @@ impl Display for BlockId { impl Web3Connections { /// add a block to our map and it's hash to our graphmap of the blockchain pub fn save_block(&self, block: &ArcBlock, heaviest_chain: Option) -> anyhow::Result<()> { + // TODO: i think we can rearrange this function to make it faster on the hot path let block_hash = block.hash.as_ref().context("no block hash")?; let block_num = block.number.as_ref().context("no block num")?; let _block_td = block @@ -46,21 +47,28 @@ impl Web3Connections { .as_ref() .context("no block total difficulty")?; + // if self.block_hashes.contains_key(block_hash) { + // // this block is already included. no need to continue + // return Ok(()); + // } + + let mut blockchain = self.blockchain_graphmap.write(); + // think more about heaviest_chain if heaviest_chain.unwrap_or(true) { match self.block_numbers.entry(*block_num) { Entry::Occupied(mut x) => { let old_hash = x.insert(*block_hash); - if block_hash == &old_hash { - // this block has already been saved - return Ok(()); - } + // if block_hash == &old_hash { + // // this block has already been saved + // return Ok(()); + // } // TODO: what should we do? // TODO: if old_hash's number is > block_num, we need to remove more entries warn!( - "do something with the old hash? we may need to update a bunch more block numbers" + "do something with the old hash ({}) for {}? we may need to update a bunch more block numbers", old_hash, block_num ) } Entry::Vacant(x) => { @@ -69,18 +77,11 @@ impl Web3Connections { } } - if self.block_hashes.contains_key(block_hash) { - // this block is already included. no need to continue - return Ok(()); - } - - let mut blockchain = self.blockchain_graphmap.write(); - - if blockchain.contains_node(*block_hash) { - // this hash is already included. we must have hit that race condition - // return now since this work was already done. - return Ok(()); - } + // if blockchain.contains_node(*block_hash) { + // // this hash is already included. we must have hit that race condition + // // return now since this work was already done. + // return Ok(()); + // } // TODO: theres a small race between contains_key and insert if let Some(_overwritten) = self.block_hashes.insert(*block_hash, block.clone()) { @@ -114,25 +115,25 @@ impl Web3Connections { rpc: Option<&Arc>, ) -> anyhow::Result { // first, try to get the hash from our cache + // the cache is set last, so if its here, its everywhere if let Some(block) = self.block_hashes.get(hash) { return Ok(block.clone()); } // block not in cache. we need to ask an rpc for it - let request_params = (hash, false); + let get_block_params = (hash, false); // TODO: if error, retry? let block: Block = match rpc { Some(rpc) => { rpc.wait_for_request_handle() .await? - .request("eth_getBlockByHash", request_params) + .request("eth_getBlockByHash", get_block_params) .await? } None => { // TODO: helper for method+params => JsonRpcRequest // TODO: does this id matter? - let request = - json!({ "id": "1", "method": "eth_getBlockByHash", "params": request_params }); + let request = json!({ "id": "1", "method": "eth_getBlockByHash", "params": get_block_params }); let request: JsonRpcRequest = serde_json::from_value(request)?; let response = self.try_send_best_upstream_server(request, None).await?; @@ -166,22 +167,10 @@ impl Web3Connections { // maybe save them during save_block in a blocks_by_number DashMap> // if theres multiple, use petgraph to find the one on the main chain (and remove the others if they have enough confirmations) - // first, try to get the hash from our cache - if let Some(block_hash) = self.block_numbers.get(num) { - let block = self - .block_hashes - .get(&block_hash) - .expect("block_numbers gave us this hash"); - - return Ok(block.clone()); - } - - // block not in cache. we need to ask an rpc for it - // but before we do any queries, be sure the requested block num exists + // be sure the requested block num exists let head_block_num = self .head_block_num() .ok_or_else(|| anyhow::anyhow!("no servers in sync"))?; - if num > &head_block_num { // TODO: i'm seeing this a lot when using ethspam. i dont know why though. i thought we delayed publishing // TODO: instead of error, maybe just sleep and try again? @@ -192,6 +181,14 @@ impl Web3Connections { )); } + // try to get the hash from our cache + // deref to not keep the lock open + if let Some(block_hash) = self.block_numbers.get(num).map(|x| *x) { + // TODO: sometimes this needs to fetch the block. why? i thought block_numbers would only be set if the block hash was set + return self.block(&block_hash, None).await; + } + + // block number not in cache. we need to ask an rpc for it // TODO: helper for method+params => JsonRpcRequest let request = json!({ "jsonrpc": "2.0", "id": "1", "method": "eth_getBlockByNumber", "params": (num, false) }); let request: JsonRpcRequest = serde_json::from_value(request)?;