diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index f0f7e765..0d2a3504 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -1,6 +1,6 @@ mod ws; -use crate::block_number::{block_needed, BlockNeeded}; +use crate::block_number::CacheMode; use crate::config::{AppConfig, TopConfig}; use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult}; use crate::frontend::authorization::{ @@ -992,17 +992,18 @@ impl Web3ProxyApp { // get the head block now so that any requests that need it all use the same block // TODO: this still has an edge condition if there is a reorg in the middle of the request!!! - let head_block_num = self + let head_block: Web3ProxyBlock = self .balanced_rpcs - .head_block_num() - .ok_or(Web3ProxyError::NoServersSynced)?; + .head_block() + .ok_or(Web3ProxyError::NoServersSynced)? + .clone(); // TODO: use streams and buffers so we don't overwhelm our server let responses = join_all( requests .into_iter() .map(|request| { - self.proxy_request(request, authorization.clone(), Some(head_block_num)) + self.proxy_request(request, authorization.clone(), Some(&head_block)) }) .collect::>(), ) @@ -1122,13 +1123,13 @@ impl Web3ProxyApp { self: &Arc, mut request: JsonRpcRequest, authorization: Arc, - head_block_num: Option, + head_block: Option<&Web3ProxyBlock>, ) -> (StatusCode, JsonRpcForwardedResponse, Vec>) { let request_metadata = RequestMetadata::new( self, authorization, RequestOrMethod::Request(&request), - head_block_num.as_ref(), + head_block, ) .await; @@ -1144,7 +1145,7 @@ impl Web3ProxyApp { ._proxy_request_with_caching( &request.method, &mut request.params, - head_block_num, + head_block, Some(2), &request_metadata, ) @@ -1188,7 +1189,7 @@ impl Web3ProxyApp { self: &Arc, method: &str, params: &mut serde_json::Value, - head_block_num: Option, + head_block: Option<&Web3ProxyBlock>, max_tries: Option, request_metadata: &Arc, ) -> Web3ProxyResult>> { @@ -1322,7 +1323,7 @@ impl Web3ProxyApp { }, "eth_accounts" => JsonRpcResponseEnum::from(serde_json::Value::Array(vec![])), "eth_blockNumber" => { - match head_block_num.or(self.balanced_rpcs.head_block_num()) { + match head_block.cloned().or(self.balanced_rpcs.head_block()) { Some(head_block_num) => JsonRpcResponseEnum::from(json!(head_block_num)), None => { // TODO: what does geth do if this happens? @@ -1612,38 +1613,36 @@ impl Web3ProxyApp { } // TODO: if no servers synced, wait for them to be synced? probably better to error and let haproxy retry another server - let head_block_num = head_block_num - .or(self.balanced_rpcs.head_block_num()) + let head_block: Web3ProxyBlock = head_block + .cloned() + .or_else(|| self.balanced_rpcs.head_block()) .ok_or(Web3ProxyError::NoServersSynced)?; // we do this check before checking caches because it might modify the request params // TODO: add a stat for archive vs full since they should probably cost different // TODO: this cache key can be rather large. is that okay? - let cache_key: Option = match block_needed( + let cache_key: Option = match CacheMode::new( &authorization, method, params, - head_block_num, + &head_block, &self.balanced_rpcs, ) - .await? + .await { - BlockNeeded::CacheSuccessForever => Some(JsonRpcQueryCacheKey::new( + CacheMode::CacheSuccessForever => Some(JsonRpcQueryCacheKey::new( None, None, method, params, false, )), - BlockNeeded::CacheNever => None, - BlockNeeded::Cache { - block_num, + CacheMode::CacheNever => None, + CacheMode::Cache { + block, cache_errors, } => { - let (request_block_hash, block_depth) = self - .balanced_rpcs - .block_hash(&authorization, &block_num) - .await?; + let block_depth = (head_block.number() - block.num()).as_u64(); if block_depth < self.config.archive_depth { request_metadata @@ -1651,29 +1650,20 @@ impl Web3ProxyApp { .store(true, atomic::Ordering::Release); } - let request_block = self - .balanced_rpcs - .block(&authorization, &request_block_hash, None, Some(3), None) - .await? - .block; - Some(JsonRpcQueryCacheKey::new( - Some(request_block), + Some(block), None, method, params, cache_errors, )) } - BlockNeeded::CacheRange { - from_block_num, - to_block_num, + CacheMode::CacheRange { + from_block, + to_block, cache_errors, } => { - let (from_block_hash, block_depth) = self - .balanced_rpcs - .block_hash(&authorization, &from_block_num) - .await?; + let block_depth = (head_block.number() - from_block.num()).as_u64(); if block_depth < self.config.archive_depth { request_metadata @@ -1681,23 +1671,6 @@ impl Web3ProxyApp { .store(true, atomic::Ordering::Release); } - let from_block = self - .balanced_rpcs - .block(&authorization, &from_block_hash, None, Some(3), None) - .await? - .block; - - let (to_block_hash, _) = self - .balanced_rpcs - .block_hash(&authorization, &to_block_num) - .await?; - - let to_block = self - .balanced_rpcs - .block(&authorization, &to_block_hash, None, Some(3), None) - .await? - .block; - Some(JsonRpcQueryCacheKey::new( Some(from_block), Some(to_block), @@ -1709,11 +1682,11 @@ impl Web3ProxyApp { }; // TODO: different timeouts for different user tiers. get the duration out of the request_metadata - let max_wait = Duration::from_secs(240); + let backend_request_timetout = Duration::from_secs(240); if let Some(cache_key) = cache_key { - let from_block_num = cache_key.from_block_num(); - let to_block_num = cache_key.to_block_num(); + let from_block_num = cache_key.from_block_num().copied(); + let to_block_num = cache_key.to_block_num().copied(); let cache_jsonrpc_errors = cache_key.cache_errors(); // TODO: try to fetch out of s3 @@ -1722,14 +1695,14 @@ impl Web3ProxyApp { .jsonrpc_response_cache .try_get_with::<_, Web3ProxyError>(cache_key.hash(), async { let response_data = timeout( - max_wait + Duration::from_millis(100), + backend_request_timetout + Duration::from_millis(100), self.balanced_rpcs .try_proxy_connection::<_, Arc>( method, params, Some(request_metadata), max_tries, - Some(max_wait), + Some(backend_request_timetout), from_block_num.as_ref(), to_block_num.as_ref(), )) @@ -1749,14 +1722,14 @@ impl Web3ProxyApp { }).await? } else { let x = timeout( - max_wait + Duration::from_millis(100), + backend_request_timetout + Duration::from_millis(100), self.balanced_rpcs .try_proxy_connection::<_, Arc>( method, params, Some(request_metadata), max_tries, - Some(max_wait), + Some(backend_request_timetout), None, None, ) diff --git a/web3_proxy/src/app/ws.rs b/web3_proxy/src/app/ws.rs index e47036b3..78d44f49 100644 --- a/web3_proxy/src/app/ws.rs +++ b/web3_proxy/src/app/ws.rs @@ -79,7 +79,7 @@ impl Web3ProxyApp { &app, authorization.clone(), RequestOrMethod::Method("eth_subscribe(newHeads)", 0), - Some(new_head.number()), + Some(&new_head), ) .await; diff --git a/web3_proxy/src/block_number.rs b/web3_proxy/src/block_number.rs index 54d6dbbf..99f98345 100644 --- a/web3_proxy/src/block_number.rs +++ b/web3_proxy/src/block_number.rs @@ -1,6 +1,10 @@ //! Helper functions for turning ether's BlockNumber into numbers and updating incoming queries to match. -use crate::errors::{Web3ProxyError, Web3ProxyResult}; +use crate::{ + errors::{Web3ProxyError, Web3ProxyResult}, + rpcs::blockchain::Web3ProxyBlock, +}; use anyhow::Context; +use derive_more::From; use ethers::{ prelude::{BlockNumber, U64}, types::H256, @@ -12,16 +16,16 @@ use tracing::{trace, warn}; use crate::{frontend::authorization::Authorization, rpcs::many::Web3Rpcs}; #[allow(non_snake_case)] -pub fn block_num_to_U64(block_num: BlockNumber, latest_block: U64) -> (U64, bool) { +pub fn BlockNumber_to_U64(block_num: BlockNumber, latest_block: &U64) -> (U64, bool) { match block_num { BlockNumber::Earliest => (U64::zero(), false), BlockNumber::Finalized => { warn!("finalized block requested! not yet implemented!"); - (latest_block - 10, false) + (*latest_block - 10, false) } BlockNumber::Latest => { // change "latest" to a number - (latest_block, true) + (*latest_block, true) } BlockNumber::Number(x) => { // we already have a number @@ -30,24 +34,44 @@ pub fn block_num_to_U64(block_num: BlockNumber, latest_block: U64) -> (U64, bool BlockNumber::Pending => { // modified is false because we want the backend to see "pending" // TODO: think more about how to handle Pending - (latest_block, false) + (*latest_block, false) } BlockNumber::Safe => { - warn!("finalized block requested! not yet implemented!"); - (latest_block - 3, false) + warn!("safe block requested! not yet implemented!"); + (*latest_block - 3, false) } } } -/// modify params to always have a block number and not "latest" +#[derive(Clone, Debug, Eq, From, PartialEq)] +pub struct BlockNumAndHash(U64, H256); +impl BlockNumAndHash { + pub fn num(&self) -> &U64 { + &self.0 + } + pub fn hash(&self) -> &H256 { + &self.1 + } +} + +impl From<&Web3ProxyBlock> for BlockNumAndHash { + fn from(value: &Web3ProxyBlock) -> Self { + let n = *value.number(); + let h = *value.hash(); + + Self(n, h) + } +} + +/// modify params to always have a block number and not "latest" pub async fn clean_block_number( authorization: &Arc, params: &mut serde_json::Value, block_param_id: usize, - latest_block: U64, + latest_block: &Web3ProxyBlock, rpcs: &Web3Rpcs, -) -> anyhow::Result { +) -> anyhow::Result { match params.as_array_mut() { None => { // TODO: this needs the correct error code in the response @@ -65,11 +89,11 @@ pub async fn clean_block_number( } // don't modify params, just cache with the current block - Ok(latest_block) + Ok(latest_block.into()) } Some(x) => { // convert the json value to a BlockNumber - let (block_num, change) = if let Some(obj) = x.as_object_mut() { + let (block, change) = if let Some(obj) = x.as_object_mut() { // it might be a Map like `{"blockHash": String("0xa5626dc20d3a0a209b1de85521717a3e859698de8ce98bca1b16822b7501f74b")}` if let Some(block_hash) = obj.get("blockHash").cloned() { let block_hash: H256 = @@ -80,207 +104,256 @@ pub async fn clean_block_number( .await .context("fetching block number from hash")?; - // TODO: we do not change the - (*block.number(), false) + (BlockNumAndHash::from(&block), false) } else { return Err(anyhow::anyhow!("blockHash missing")); } } else { - // it might be a string like "latest" or a block number + // it might be a string like "latest" or a block number or a block hash // TODO: "BlockNumber" needs a better name - // TODO: use take instead of clone - let block_number = serde_json::from_value::(x.clone()) - .context("checking params for BlockNumber")?; + // TODO: move this to a helper function? + if let Ok(block_number) = serde_json::from_value::(x.clone()) { + let (block_num, change) = + BlockNumber_to_U64(block_number, latest_block.number()); - block_num_to_U64(block_number, latest_block) + let (block_hash, _) = rpcs + .block_hash(authorization, &block_num) + .await + .context("fetching block hash from number")?; + + let block = rpcs + .block(authorization, &block_hash, None, Some(3), None) + .await + .context("fetching block from hash")?; + + (BlockNumAndHash::from(&block), change) + } else if let Ok(block_hash) = serde_json::from_value::(x.clone()) { + let block = rpcs + .block(authorization, &block_hash, None, Some(3), None) + .await + .context("fetching block number from hash")?; + + (BlockNumAndHash::from(&block), false) + } else { + return Err(anyhow::anyhow!( + "param not a block identifier, block number, or block hash" + )); + } }; // if we changed "latest" to a number, update the params to match if change { - *x = json!(block_num); + trace!(old=%x, new=%block.hash(), "changing block number"); + *x = json!(block.hash()); } - Ok(block_num) + Ok(block) } }, } } /// TODO: change this to also return the hash needed? -pub enum BlockNeeded { +pub enum CacheMode { CacheSuccessForever, CacheNever, Cache { - block_num: U64, + block: BlockNumAndHash, + /// cache jsonrpc errors (server errors are never cached) cache_errors: bool, }, CacheRange { - from_block_num: U64, - to_block_num: U64, + from_block: BlockNumAndHash, + to_block: BlockNumAndHash, + /// cache jsonrpc errors (server errors are never cached) cache_errors: bool, }, } -pub async fn block_needed( - authorization: &Arc, - method: &str, - params: &mut serde_json::Value, - head_block_num: U64, - rpcs: &Web3Rpcs, -) -> Web3ProxyResult { - // some requests have potentially very large responses - // TODO: only skip caching if the response actually is large - if method.starts_with("trace_") || method == "debug_traceTransaction" { - return Ok(BlockNeeded::CacheNever); - } - - if matches!(params, serde_json::Value::Null) { - // no params given - return Ok(BlockNeeded::Cache { - block_num: head_block_num, - cache_errors: true, - }); - } - - // get the index for the BlockNumber - // The BlockNumber is usually the last element. - // TODO: double check these. i think some of the getBlock stuff will never need archive - let block_param_id = match method { - "eth_call" => 1, - "eth_estimateGas" => 1, - "eth_getBalance" => 1, - "eth_getBlockByHash" => { - // TODO: double check that any node can serve this - // TODO: can a block change? like what if it gets orphaned? - return Ok(BlockNeeded::CacheSuccessForever); +impl CacheMode { + pub async fn new( + authorization: &Arc, + method: &str, + params: &mut serde_json::Value, + head_block: &Web3ProxyBlock, + rpcs: &Web3Rpcs, + ) -> Self { + match Self::try_new(authorization, method, params, head_block, rpcs).await { + Ok(x) => x, + Err(err) => { + warn!(?err, "unable to determine cache mode from params"); + Self::CacheNever + } } - "eth_getBlockByNumber" => { - // TODO: double check that any node can serve this - // TODO: CacheSuccessForever if the block is old enough - return Ok(BlockNeeded::Cache { - block_num: head_block_num, + } + + pub async fn try_new( + authorization: &Arc, + method: &str, + params: &mut serde_json::Value, + head_block: &Web3ProxyBlock, + rpcs: &Web3Rpcs, + ) -> Web3ProxyResult { + // some requests have potentially very large responses + // TODO: only skip caching if the response actually is large + if method.starts_with("trace_") || method == "debug_traceTransaction" { + return Ok(Self::CacheNever); + } + + if matches!(params, serde_json::Value::Null) { + // no params given + return Ok(Self::Cache { + block: head_block.into(), cache_errors: true, }); } - "eth_getBlockReceipts" => 0, - "eth_getBlockTransactionCountByHash" => { - // TODO: double check that any node can serve this - return Ok(BlockNeeded::CacheSuccessForever); - } - "eth_getBlockTransactionCountByNumber" => 0, - "eth_getCode" => 1, - "eth_getLogs" => { - // TODO: think about this more - // TODO: jsonrpc has a specific code for this - let obj = params - .get_mut(0) - .ok_or_else(|| Web3ProxyError::BadRequest("invalid format. no params".into()))? - .as_object_mut() - .ok_or_else(|| { - Web3ProxyError::BadRequest("invalid format. params not object".into()) - })?; - if obj.contains_key("blockHash") { - return Ok(BlockNeeded::CacheSuccessForever); - } else { - let from_block_num = if let Some(x) = obj.get_mut("fromBlock") { - // TODO: use .take instead of clone - let block_num: BlockNumber = serde_json::from_value(x.clone())?; - - let (block_num, change) = block_num_to_U64(block_num, head_block_num); - - if change { - trace!("changing fromBlock in eth_getLogs. {} -> {}", x, block_num); - *x = json!(block_num); - } - - block_num - } else { - let (block_num, _) = block_num_to_U64(BlockNumber::Earliest, head_block_num); - - block_num - }; - - let to_block_num = if let Some(x) = obj.get_mut("toBlock") { - // TODO: use .take instead of clone - let block_num: BlockNumber = serde_json::from_value(x.clone())?; - - let (block_num, change) = block_num_to_U64(block_num, head_block_num); - - if change { - trace!("changing toBlock in eth_getLogs. {} -> {}", x, block_num); - *x = json!(block_num); - } - - block_num - } else { - head_block_num - }; - - return Ok(BlockNeeded::CacheRange { - from_block_num, - to_block_num, + // get the index for the BlockNumber + // The BlockNumber is usually the last element. + // TODO: double check these. i think some of the getBlock stuff will never need archive + let block_param_id = match method { + "eth_call" => 1, + "eth_estimateGas" => 1, + "eth_getBalance" => 1, + "eth_getBlockByHash" => { + // TODO: double check that any node can serve this + // TODO: can a block change? like what if it gets orphaned? + return Ok(CacheMode::CacheSuccessForever); + } + "eth_getBlockByNumber" => { + // TODO: double check that any node can serve this + // TODO: CacheSuccessForever if the block is old enough + return Ok(CacheMode::Cache { + block: head_block.into(), cache_errors: true, }); } - } - "eth_getStorageAt" => 2, - "eth_getTransactionByHash" => { - // TODO: not sure how best to look these up - // try full nodes first. retry will use archive - return Ok(BlockNeeded::Cache { - block_num: head_block_num, - cache_errors: true, - }); - } - "eth_getTransactionByBlockHashAndIndex" => { - // TODO: check a Cache of recent hashes - // try full nodes first. retry will use archive - return Ok(BlockNeeded::CacheSuccessForever); - } - "eth_getTransactionByBlockNumberAndIndex" => 0, - "eth_getTransactionCount" => 1, - "eth_getTransactionReceipt" => { - // TODO: not sure how best to look these up - // try full nodes first. retry will use archive - return Ok(BlockNeeded::Cache { - block_num: head_block_num, - cache_errors: true, - }); - } - "eth_getUncleByBlockHashAndIndex" => { - // TODO: check a Cache of recent hashes - // try full nodes first. retry will use archive - return Ok(BlockNeeded::CacheSuccessForever); - } - "eth_getUncleByBlockNumberAndIndex" => 0, - "eth_getUncleCountByBlockHash" => { - // TODO: check a Cache of recent hashes - // try full nodes first. retry will use archive - return Ok(BlockNeeded::CacheSuccessForever); - } - "eth_getUncleCountByBlockNumber" => 0, - _ => { - // some other command that doesn't take block numbers as an argument - // since we are caching with the head block, it should be safe to cache_errors - return Ok(BlockNeeded::Cache { - block_num: head_block_num, - cache_errors: true, - }); - } - }; + "eth_getBlockReceipts" => 0, + "eth_getBlockTransactionCountByHash" => { + // TODO: double check that any node can serve this + return Ok(CacheMode::CacheSuccessForever); + } + "eth_getBlockTransactionCountByNumber" => 0, + "eth_getCode" => 1, + "eth_getLogs" => { + // TODO: think about this more + // TODO: jsonrpc has a specific code for this + let obj = params + .get_mut(0) + .ok_or_else(|| Web3ProxyError::BadRequest("invalid format. no params".into()))? + .as_object_mut() + .ok_or_else(|| { + Web3ProxyError::BadRequest("invalid format. params not object".into()) + })?; - match clean_block_number(authorization, params, block_param_id, head_block_num, rpcs).await { - Ok(block_num) => Ok(BlockNeeded::Cache { - block_num, - cache_errors: true, - }), - Err(err) => { - warn!("could not get block from params. err={:?}", err); - Ok(BlockNeeded::Cache { - block_num: head_block_num, + if obj.contains_key("blockHash") { + return Ok(CacheMode::CacheSuccessForever); + } else { + let from_block = if let Some(x) = obj.get_mut("fromBlock") { + // TODO: use .take instead of clone + // what if its a hash? + let block_num: BlockNumber = serde_json::from_value(x.clone())?; + + let (block_num, change) = + BlockNumber_to_U64(block_num, head_block.number()); + + if change { + // TODO: include the hash instead of the number? + trace!("changing fromBlock in eth_getLogs. {} -> {}", x, block_num); + *x = json!(block_num); + } + + todo!(); + } else { + warn!("if no from, what should we default? 0 or head?"); + head_block.into() + }; + + let to_block = if let Some(x) = obj.get_mut("toBlock") { + // TODO: use .take instead of clone + // what if its a hash? + let block_num: BlockNumber = serde_json::from_value(x.clone())?; + + let (block_num, change) = + BlockNumber_to_U64(block_num, head_block.number()); + + if change { + trace!("changing toBlock in eth_getLogs. {} -> {}", x, block_num); + *x = json!(block_num); + } + + todo!(); + } else { + head_block.into() + }; + + return Ok(CacheMode::CacheRange { + from_block, + to_block, + cache_errors: true, + }); + } + } + "eth_getStorageAt" => 2, + "eth_getTransactionByHash" => { + // TODO: not sure how best to look these up + // try full nodes first. retry will use archive + return Ok(CacheMode::Cache { + block: head_block.into(), + cache_errors: true, + }); + } + "eth_getTransactionByBlockHashAndIndex" => { + // TODO: check a Cache of recent hashes + // try full nodes first. retry will use archive + return Ok(CacheMode::CacheSuccessForever); + } + "eth_getTransactionByBlockNumberAndIndex" => 0, + "eth_getTransactionCount" => 1, + "eth_getTransactionReceipt" => { + // TODO: not sure how best to look these up + // try full nodes first. retry will use archive + return Ok(CacheMode::Cache { + block: head_block.into(), + cache_errors: true, + }); + } + "eth_getUncleByBlockHashAndIndex" => { + // TODO: check a Cache of recent hashes + // try full nodes first. retry will use archive + // TODO: what happens if this block is uncled later? + return Ok(CacheMode::CacheSuccessForever); + } + "eth_getUncleByBlockNumberAndIndex" => 0, + "eth_getUncleCountByBlockHash" => { + // TODO: check a Cache of recent hashes + // try full nodes first. retry will use archive + // TODO: what happens if this block is uncled later? + return Ok(CacheMode::CacheSuccessForever); + } + "eth_getUncleCountByBlockNumber" => 0, + _ => { + // some other command that doesn't take block numbers as an argument + // since we are caching with the head block, it should be safe to cache_errors + return Ok(CacheMode::Cache { + block: head_block.into(), + cache_errors: true, + }); + } + }; + + match clean_block_number(authorization, params, block_param_id, head_block, rpcs).await { + Ok(block) => Ok(CacheMode::Cache { + block, cache_errors: true, - }) + }), + Err(err) => { + warn!("could not get block from params. err={:?}", err); + Ok(CacheMode::Cache { + block: head_block.into(), + cache_errors: true, + }) + } } } } diff --git a/web3_proxy/src/errors.rs b/web3_proxy/src/errors.rs index 5f3fe543..38ca9eea 100644 --- a/web3_proxy/src/errors.rs +++ b/web3_proxy/src/errors.rs @@ -965,8 +965,8 @@ impl Web3ProxyError { ( StatusCode::OK, JsonRpcErrorData { - message: format!("unknown block hash ({})", hash).into(), - code: None, + message: format!("block {} not found", hash).into(), + code: -32000, data: None, }, ) @@ -976,12 +976,9 @@ impl Web3ProxyError { ( StatusCode::OK, JsonRpcErrorData { - message: format!( - "unknown block number. known: {}. unknown: {}", - known, unknown - ) - .into(), - code: None, + message: format!("block #{} not found. best known is #{}", unknown, known) + .into(), + code: -32000, data: None, }, ) diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index 90c69608..3499dfbf 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -4,6 +4,7 @@ use super::rpc_proxy_ws::ProxyMode; use crate::app::{Web3ProxyApp, APP_USER_AGENT}; use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult}; use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest}; +use crate::rpcs::blockchain::Web3ProxyBlock; use crate::rpcs::one::Web3Rpc; use crate::stats::{AppStat, BackendRequests, RpcQueryStats}; use crate::user_token::UserBearerToken; @@ -476,7 +477,7 @@ impl RequestMetadata { app: &Web3ProxyApp, authorization: Arc, request: R, - head_block_num: Option<&U64>, + head_block: Option<&Web3ProxyBlock>, ) -> Arc { let request = request.into(); @@ -493,7 +494,7 @@ impl RequestMetadata { KafkaDebugLogger::try_new( app, authorization.clone(), - head_block_num, + head_block.map(|x| x.number()), "web3_proxy:rpc", request_ulid, ) diff --git a/web3_proxy/src/response_cache.rs b/web3_proxy/src/response_cache.rs index a4f7150d..aa0d11be 100644 --- a/web3_proxy/src/response_cache.rs +++ b/web3_proxy/src/response_cache.rs @@ -1,8 +1,10 @@ -use crate::{errors::Web3ProxyError, jsonrpc::JsonRpcErrorData, rpcs::blockchain::ArcBlock}; +use crate::{ + block_number::BlockNumAndHash, errors::Web3ProxyError, jsonrpc::JsonRpcErrorData, +}; use derive_more::From; use ethers::{ providers::{HttpClientError, JsonRpcError, ProviderError, WsClientError}, - types::U64, + types::{U64}, }; use hashbrown::hash_map::DefaultHashBuilder; use moka::future::Cache; @@ -14,9 +16,10 @@ use std::{ #[derive(Clone, Debug, Eq, From)] pub struct JsonRpcQueryCacheKey { + /// hashed inputs hash: u64, - from_block_num: Option, - to_block_num: Option, + from_block: Option, + to_block: Option, cache_errors: bool, } @@ -24,11 +27,11 @@ impl JsonRpcQueryCacheKey { pub fn hash(&self) -> u64 { self.hash } - pub fn from_block_num(&self) -> Option { - self.from_block_num + pub fn from_block_num(&self) -> Option<&U64> { + self.from_block.as_ref().map(|x| x.num()) } - pub fn to_block_num(&self) -> Option { - self.to_block_num + pub fn to_block_num(&self) -> Option<&U64> { + self.to_block.as_ref().map(|x| x.num()) } pub fn cache_errors(&self) -> bool { self.cache_errors @@ -50,19 +53,19 @@ impl Hash for JsonRpcQueryCacheKey { impl JsonRpcQueryCacheKey { pub fn new( - from_block: Option, - to_block: Option, + from_block: Option, + to_block: Option, method: &str, params: &serde_json::Value, cache_errors: bool, ) -> Self { - let from_block_num = from_block.as_ref().and_then(|x| x.number); - let to_block_num = to_block.as_ref().and_then(|x| x.number); + let from_block_hash = from_block.as_ref().map(|x| x.hash()); + let to_block_hash = to_block.as_ref().map(|x| x.hash()); let mut hasher = DefaultHashBuilder::default().build_hasher(); - from_block.as_ref().and_then(|x| x.hash).hash(&mut hasher); - to_block.as_ref().and_then(|x| x.hash).hash(&mut hasher); + from_block_hash.hash(&mut hasher); + to_block_hash.hash(&mut hasher); method.hash(&mut hasher); @@ -76,8 +79,8 @@ impl JsonRpcQueryCacheKey { Self { hash, - from_block_num, - to_block_num, + from_block, + to_block, cache_errors, } } diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index 3ed641f6..bffce3b8 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -17,7 +17,7 @@ use std::time::Duration; use std::{fmt::Display, sync::Arc}; use tokio::sync::broadcast; use tokio::time::timeout; -use tracing::{debug, error, trace}; +use tracing::{debug, error}; // TODO: type for Hydrated Blocks with their full transactions? pub type ArcBlock = Arc>; @@ -376,7 +376,8 @@ impl Web3Rpcs { break; } - trace!("waiting for future block {} > {}", num, head_block_num); + debug!(%head_block_num, %num, "waiting for future block"); + consensus_head_receiver.changed().await?; if let Some(head) = consensus_head_receiver.borrow_and_update().as_ref() { diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index e4815f49..cc09f7e5 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -349,19 +349,20 @@ impl fmt::Debug for RankedRpcs { // TODO: refs for all of these. borrow on a Sender is cheap enough impl Web3Rpcs { - // TODO: return a ref? pub fn head_block(&self) -> Option { self.watch_head_block .as_ref() .and_then(|x| x.borrow().clone()) } - // TODO: return a ref? + /// note: you probably want to use `head_block` instead + /// TODO: return a ref? pub fn head_block_hash(&self) -> Option { self.head_block().map(|x| *x.hash()) } - // TODO: return a ref? + /// note: you probably want to use `head_block` instead + /// TODO: return a ref? pub fn head_block_num(&self) -> Option { self.head_block().map(|x| *x.number()) }