include from_block and to_block for caching getLogs

This commit is contained in:
Bryan Stitt 2023-01-30 15:47:17 -08:00
parent b1f3adfc76
commit 203969f628
3 changed files with 132 additions and 55 deletions

@ -71,7 +71,9 @@ pub static REQUEST_PERIOD: u64 = 60;
#[derive(From)]
struct ResponseCacheKey {
// if none, this is cached until evicted
block: Option<SavedBlock>,
from_block: Option<SavedBlock>,
// to_block is only set when ranges of blocks are requested (like with eth_getLogs)
to_block: Option<SavedBlock>,
method: String,
// TODO: better type for this
params: Option<serde_json::Value>,
@ -96,7 +98,22 @@ impl PartialEq for ResponseCacheKey {
return false;
}
match (self.block.as_ref(), other.block.as_ref()) {
match (self.from_block.as_ref(), other.from_block.as_ref()) {
(None, None) => {}
(None, Some(_)) => {
return false;
}
(Some(_), None) => {
return false;
}
(Some(s), Some(o)) => {
if s != o {
return false;
}
}
}
match (self.to_block.as_ref(), other.to_block.as_ref()) {
(None, None) => {}
(None, Some(_)) => {
return false;
@ -123,7 +140,8 @@ impl Eq for ResponseCacheKey {}
impl Hash for ResponseCacheKey {
fn hash<H: Hasher>(&self, state: &mut H) {
self.block.as_ref().map(|x| x.hash()).hash(state);
self.from_block.as_ref().map(|x| x.hash()).hash(state);
self.to_block.as_ref().map(|x| x.hash()).hash(state);
self.method.hash(state);
self.params.as_ref().map(|x| x.to_string()).hash(state);
self.cache_errors.hash(state)
@ -1434,7 +1452,8 @@ impl Web3ProxyApp {
.await?
{
BlockNeeded::CacheSuccessForever => Some(ResponseCacheKey {
block: None,
from_block: None,
to_block: None,
method: method.to_string(),
params: request.params.clone(),
cache_errors: false,
@ -1461,7 +1480,48 @@ impl Web3ProxyApp {
.await?;
Some(ResponseCacheKey {
block: Some(SavedBlock::new(request_block)),
from_block: Some(SavedBlock::new(request_block)),
to_block: None,
method: method.to_string(),
// TODO: hash here?
params: request.params.clone(),
cache_errors,
})
}
BlockNeeded::CacheRange {
from_block_num,
to_block_num,
cache_errors,
} => {
let (from_block_hash, archive_needed) = self
.balanced_rpcs
.block_hash(authorization, &from_block_num)
.await?;
if archive_needed {
request_metadata
.archive_request
.store(true, atomic::Ordering::Relaxed);
}
let from_block = self
.balanced_rpcs
.block(authorization, &from_block_hash, None)
.await?;
let (to_block_hash, _) = self
.balanced_rpcs
.block_hash(authorization, &to_block_num)
.await?;
let to_block = self
.balanced_rpcs
.block(authorization, &to_block_hash, None)
.await?;
Some(ResponseCacheKey {
from_block: Some(SavedBlock::new(from_block)),
to_block: Some(SavedBlock::new(to_block)),
method: method.to_string(),
// TODO: hash here?
params: request.params.clone(),
@ -1476,7 +1536,7 @@ impl Web3ProxyApp {
let authorization = authorization.clone();
if let Some(cache_key) = cache_key {
let request_block_number = cache_key.block.as_ref().map(|x| x.number());
let from_block_num = cache_key.from_block.as_ref().map(|x| x.number());
self.response_cache
.try_get_with(cache_key, async move {
@ -1491,7 +1551,7 @@ impl Web3ProxyApp {
&authorization,
request,
Some(&request_metadata),
request_block_number.as_ref(),
from_block_num.as_ref(),
)
.await?;

@ -11,31 +11,29 @@ use std::sync::Arc;
use crate::{frontend::authorization::Authorization, rpcs::connections::Web3Connections};
#[allow(non_snake_case)]
pub fn block_num_to_U64(block_num: BlockNumber, latest_block: U64) -> U64 {
pub fn block_num_to_U64(block_num: BlockNumber, latest_block: U64) -> (U64, bool) {
match block_num {
BlockNumber::Earliest => {
// modified is false because we want the backend to see "pending"
U64::zero()
}
BlockNumber::Earliest => (U64::zero(), false),
BlockNumber::Finalized => {
warn!("finalized block requested! not yet implemented!");
latest_block - 10
(latest_block - 10, false)
}
BlockNumber::Latest => {
// change "latest" to a number
latest_block
(latest_block, true)
}
BlockNumber::Number(x) => {
// we already have a number
x
(x, false)
}
BlockNumber::Pending => {
// modified is false because we want the backend to see "pending"
// TODO: think more about how to handle Pending
latest_block
(latest_block, false)
}
BlockNumber::Safe => {
warn!("finalized block requested! not yet implemented!");
latest_block - 3
(latest_block - 3, false)
}
}
}
@ -72,7 +70,7 @@ pub async fn clean_block_number(
let start = x.clone();
// convert the json value to a BlockNumber
let block_num = if let Some(obj) = x.as_object_mut() {
let (block_num, change) = if let Some(obj) = x.as_object_mut() {
// it might be a Map like `{"blockHash": String("0xa5626dc20d3a0a209b1de85521717a3e859698de8ce98bca1b16822b7501f74b")}`
if let Some(block_hash) = obj.remove("blockHash") {
let block_hash: H256 =
@ -80,9 +78,13 @@ pub async fn clean_block_number(
let block = rpcs.block(authorization, &block_hash, None).await?;
block
.number
.expect("blocks here should always have numbers")
// TODO: set change to true? i think not we should probably use hashes for everything.
(
block
.number
.expect("blocks here should always have numbers"),
false,
)
} else {
return Err(anyhow::anyhow!("blockHash missing"));
}
@ -95,7 +97,9 @@ pub async fn clean_block_number(
};
// if we changed "latest" to a number, update the params to match
*x = serde_json::to_value(block_num)?;
if change {
*x = serde_json::to_value(block_num)?;
}
// TODO: only do this if trace logging is enabled
if x.as_u64() != start.as_u64() {
@ -112,7 +116,15 @@ pub async fn clean_block_number(
pub enum BlockNeeded {
CacheSuccessForever,
CacheNever,
Cache { block_num: U64, cache_errors: bool },
Cache {
block_num: U64,
cache_errors: bool,
},
CacheRange {
from_block_num: U64,
to_block_num: U64,
cache_errors: bool,
},
}
pub async fn block_needed(
@ -122,12 +134,13 @@ pub async fn block_needed(
head_block_num: U64,
rpcs: &Web3Connections,
) -> anyhow::Result<BlockNeeded> {
// if no params, no block is needed
let params = if let Some(params) = params {
// grab the params so we can inspect and potentially modify them
params
} else {
// if no params, no block is needed
// TODO: check all the methods with no params, some might not be cacheable
// caching for one block should always be okay
// caching with the head block /should/ always be okay
return Ok(BlockNeeded::Cache {
block_num: head_block_num,
cache_errors: true,
@ -168,39 +181,42 @@ pub async fn block_needed(
.as_object_mut()
.ok_or_else(|| anyhow::anyhow!("invalid format"))?;
if let Some(x) = obj.get_mut("fromBlock") {
let block_num: BlockNumber = serde_json::from_value(x.take())?;
let block_num = block_num_to_U64(block_num, head_block_num);
*x = json!(block_num);
// TODO: maybe don't return. instead check toBlock too?
// TODO: if there is a very wide fromBlock and toBlock, we need to check that our rpcs have both!
return Ok(BlockNeeded::Cache {
block_num,
cache_errors: false,
});
}
if let Some(x) = obj.get_mut("toBlock") {
let block_num: BlockNumber = serde_json::from_value(x.take())?;
let block_num = block_num_to_U64(block_num, head_block_num);
*x = json!(block_num);
return Ok(BlockNeeded::Cache {
block_num,
cache_errors: false,
});
}
if obj.contains_key("blockHash") {
1
} else {
return Ok(BlockNeeded::Cache {
block_num: head_block_num,
let from_block_num = if let Some(x) = obj.get_mut("fromBlock") {
let block_num: BlockNumber = serde_json::from_value(x.take())?;
let (block_num, change) = block_num_to_U64(block_num, head_block_num);
if change {
*x = json!(block_num);
}
block_num
} else {
let (block_num, _) = block_num_to_U64(BlockNumber::Earliest, head_block_num);
block_num
};
let to_block_num = if let Some(x) = obj.get_mut("toBlock") {
let block_num: BlockNumber = serde_json::from_value(x.take())?;
let (block_num, change) = block_num_to_U64(block_num, head_block_num);
if change {
*x = json!(block_num);
}
block_num
} else {
head_block_num
};
return Ok(BlockNeeded::CacheRange {
from_block_num: from_block_num,
to_block_num: to_block_num,
cache_errors: true,
});
}

@ -234,6 +234,7 @@ impl Web3Connections {
head_block_num.expect("we should only get here if we have a head block");
// TODO: geth does 64, erigon does 90k. sometimes we run a mix
// TODO: do this dynamically based on balanced_rpcs block_data_limit
let archive_needed = num < &(head_block_num - U64::from(64));
// try to get the hash from our cache