From 203969f62833668c4081acbb2188989fd6545c24 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Mon, 30 Jan 2023 15:47:17 -0800 Subject: [PATCH] include from_block and to_block for caching getLogs --- web3_proxy/src/app/mod.rs | 74 ++++++++++++++++++-- web3_proxy/src/block_number.rs | 112 +++++++++++++++++------------- web3_proxy/src/rpcs/blockchain.rs | 1 + 3 files changed, 132 insertions(+), 55 deletions(-) diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index f41c1210..8c85785c 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -71,7 +71,9 @@ pub static REQUEST_PERIOD: u64 = 60; #[derive(From)] struct ResponseCacheKey { // if none, this is cached until evicted - block: Option, + from_block: Option, + // to_block is only set when ranges of blocks are requested (like with eth_getLogs) + to_block: Option, method: String, // TODO: better type for this params: Option, @@ -96,7 +98,22 @@ impl PartialEq for ResponseCacheKey { return false; } - match (self.block.as_ref(), other.block.as_ref()) { + match (self.from_block.as_ref(), other.from_block.as_ref()) { + (None, None) => {} + (None, Some(_)) => { + return false; + } + (Some(_), None) => { + return false; + } + (Some(s), Some(o)) => { + if s != o { + return false; + } + } + } + + match (self.to_block.as_ref(), other.to_block.as_ref()) { (None, None) => {} (None, Some(_)) => { return false; @@ -123,7 +140,8 @@ impl Eq for ResponseCacheKey {} impl Hash for ResponseCacheKey { fn hash(&self, state: &mut H) { - self.block.as_ref().map(|x| x.hash()).hash(state); + self.from_block.as_ref().map(|x| x.hash()).hash(state); + self.to_block.as_ref().map(|x| x.hash()).hash(state); self.method.hash(state); self.params.as_ref().map(|x| x.to_string()).hash(state); self.cache_errors.hash(state) @@ -1434,7 +1452,8 @@ impl Web3ProxyApp { .await? { BlockNeeded::CacheSuccessForever => Some(ResponseCacheKey { - block: None, + from_block: None, + to_block: None, method: method.to_string(), params: request.params.clone(), cache_errors: false, @@ -1461,7 +1480,48 @@ impl Web3ProxyApp { .await?; Some(ResponseCacheKey { - block: Some(SavedBlock::new(request_block)), + from_block: Some(SavedBlock::new(request_block)), + to_block: None, + method: method.to_string(), + // TODO: hash here? + params: request.params.clone(), + cache_errors, + }) + } + BlockNeeded::CacheRange { + from_block_num, + to_block_num, + cache_errors, + } => { + let (from_block_hash, archive_needed) = self + .balanced_rpcs + .block_hash(authorization, &from_block_num) + .await?; + + if archive_needed { + request_metadata + .archive_request + .store(true, atomic::Ordering::Relaxed); + } + + let from_block = self + .balanced_rpcs + .block(authorization, &from_block_hash, None) + .await?; + + let (to_block_hash, _) = self + .balanced_rpcs + .block_hash(authorization, &to_block_num) + .await?; + + let to_block = self + .balanced_rpcs + .block(authorization, &to_block_hash, None) + .await?; + + Some(ResponseCacheKey { + from_block: Some(SavedBlock::new(from_block)), + to_block: Some(SavedBlock::new(to_block)), method: method.to_string(), // TODO: hash here? params: request.params.clone(), @@ -1476,7 +1536,7 @@ impl Web3ProxyApp { let authorization = authorization.clone(); if let Some(cache_key) = cache_key { - let request_block_number = cache_key.block.as_ref().map(|x| x.number()); + let from_block_num = cache_key.from_block.as_ref().map(|x| x.number()); self.response_cache .try_get_with(cache_key, async move { @@ -1491,7 +1551,7 @@ impl Web3ProxyApp { &authorization, request, Some(&request_metadata), - request_block_number.as_ref(), + from_block_num.as_ref(), ) .await?; diff --git a/web3_proxy/src/block_number.rs b/web3_proxy/src/block_number.rs index 4e52788f..50e21ee0 100644 --- a/web3_proxy/src/block_number.rs +++ b/web3_proxy/src/block_number.rs @@ -11,31 +11,29 @@ use std::sync::Arc; use crate::{frontend::authorization::Authorization, rpcs::connections::Web3Connections}; #[allow(non_snake_case)] -pub fn block_num_to_U64(block_num: BlockNumber, latest_block: U64) -> U64 { +pub fn block_num_to_U64(block_num: BlockNumber, latest_block: U64) -> (U64, bool) { match block_num { - BlockNumber::Earliest => { - // modified is false because we want the backend to see "pending" - U64::zero() - } + BlockNumber::Earliest => (U64::zero(), false), BlockNumber::Finalized => { warn!("finalized block requested! not yet implemented!"); - latest_block - 10 + (latest_block - 10, false) } BlockNumber::Latest => { // change "latest" to a number - latest_block + (latest_block, true) } BlockNumber::Number(x) => { // we already have a number - x + (x, false) } BlockNumber::Pending => { + // modified is false because we want the backend to see "pending" // TODO: think more about how to handle Pending - latest_block + (latest_block, false) } BlockNumber::Safe => { warn!("finalized block requested! not yet implemented!"); - latest_block - 3 + (latest_block - 3, false) } } } @@ -72,7 +70,7 @@ pub async fn clean_block_number( let start = x.clone(); // convert the json value to a BlockNumber - let block_num = if let Some(obj) = x.as_object_mut() { + let (block_num, change) = if let Some(obj) = x.as_object_mut() { // it might be a Map like `{"blockHash": String("0xa5626dc20d3a0a209b1de85521717a3e859698de8ce98bca1b16822b7501f74b")}` if let Some(block_hash) = obj.remove("blockHash") { let block_hash: H256 = @@ -80,9 +78,13 @@ pub async fn clean_block_number( let block = rpcs.block(authorization, &block_hash, None).await?; - block - .number - .expect("blocks here should always have numbers") + // TODO: set change to true? i think not we should probably use hashes for everything. + ( + block + .number + .expect("blocks here should always have numbers"), + false, + ) } else { return Err(anyhow::anyhow!("blockHash missing")); } @@ -95,7 +97,9 @@ pub async fn clean_block_number( }; // if we changed "latest" to a number, update the params to match - *x = serde_json::to_value(block_num)?; + if change { + *x = serde_json::to_value(block_num)?; + } // TODO: only do this if trace logging is enabled if x.as_u64() != start.as_u64() { @@ -112,7 +116,15 @@ pub async fn clean_block_number( pub enum BlockNeeded { CacheSuccessForever, CacheNever, - Cache { block_num: U64, cache_errors: bool }, + Cache { + block_num: U64, + cache_errors: bool, + }, + CacheRange { + from_block_num: U64, + to_block_num: U64, + cache_errors: bool, + }, } pub async fn block_needed( @@ -122,12 +134,13 @@ pub async fn block_needed( head_block_num: U64, rpcs: &Web3Connections, ) -> anyhow::Result { - // if no params, no block is needed let params = if let Some(params) = params { + // grab the params so we can inspect and potentially modify them params } else { + // if no params, no block is needed // TODO: check all the methods with no params, some might not be cacheable - // caching for one block should always be okay + // caching with the head block /should/ always be okay return Ok(BlockNeeded::Cache { block_num: head_block_num, cache_errors: true, @@ -168,39 +181,42 @@ pub async fn block_needed( .as_object_mut() .ok_or_else(|| anyhow::anyhow!("invalid format"))?; - if let Some(x) = obj.get_mut("fromBlock") { - let block_num: BlockNumber = serde_json::from_value(x.take())?; - - let block_num = block_num_to_U64(block_num, head_block_num); - - *x = json!(block_num); - - // TODO: maybe don't return. instead check toBlock too? - // TODO: if there is a very wide fromBlock and toBlock, we need to check that our rpcs have both! - return Ok(BlockNeeded::Cache { - block_num, - cache_errors: false, - }); - } - - if let Some(x) = obj.get_mut("toBlock") { - let block_num: BlockNumber = serde_json::from_value(x.take())?; - - let block_num = block_num_to_U64(block_num, head_block_num); - - *x = json!(block_num); - - return Ok(BlockNeeded::Cache { - block_num, - cache_errors: false, - }); - } - if obj.contains_key("blockHash") { 1 } else { - return Ok(BlockNeeded::Cache { - block_num: head_block_num, + let from_block_num = if let Some(x) = obj.get_mut("fromBlock") { + let block_num: BlockNumber = serde_json::from_value(x.take())?; + + let (block_num, change) = block_num_to_U64(block_num, head_block_num); + + if change { + *x = json!(block_num); + } + + block_num + } else { + let (block_num, _) = block_num_to_U64(BlockNumber::Earliest, head_block_num); + + block_num + }; + + let to_block_num = if let Some(x) = obj.get_mut("toBlock") { + let block_num: BlockNumber = serde_json::from_value(x.take())?; + + let (block_num, change) = block_num_to_U64(block_num, head_block_num); + + if change { + *x = json!(block_num); + } + + block_num + } else { + head_block_num + }; + + return Ok(BlockNeeded::CacheRange { + from_block_num: from_block_num, + to_block_num: to_block_num, cache_errors: true, }); } diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index da1c2188..20a1be0f 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -234,6 +234,7 @@ impl Web3Connections { head_block_num.expect("we should only get here if we have a head block"); // TODO: geth does 64, erigon does 90k. sometimes we run a mix + // TODO: do this dynamically based on balanced_rpcs block_data_limit let archive_needed = num < &(head_block_num - U64::from(64)); // try to get the hash from our cache