Revert "Revert "include from_block and to_block for caching getLogs""
This reverts commit 13bab2c9cfcc09f8d503ff24f6e2c2a3fc0faaa2.
This commit is contained in:
parent
13bab2c9cf
commit
8bcf40b62d
@ -71,7 +71,9 @@ pub static REQUEST_PERIOD: u64 = 60;
|
|||||||
#[derive(From)]
|
#[derive(From)]
|
||||||
struct ResponseCacheKey {
|
struct ResponseCacheKey {
|
||||||
// if none, this is cached until evicted
|
// if none, this is cached until evicted
|
||||||
block: Option<SavedBlock>,
|
from_block: Option<SavedBlock>,
|
||||||
|
// to_block is only set when ranges of blocks are requested (like with eth_getLogs)
|
||||||
|
to_block: Option<SavedBlock>,
|
||||||
method: String,
|
method: String,
|
||||||
// TODO: better type for this
|
// TODO: better type for this
|
||||||
params: Option<serde_json::Value>,
|
params: Option<serde_json::Value>,
|
||||||
@ -96,7 +98,22 @@ impl PartialEq for ResponseCacheKey {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
match (self.block.as_ref(), other.block.as_ref()) {
|
match (self.from_block.as_ref(), other.from_block.as_ref()) {
|
||||||
|
(None, None) => {}
|
||||||
|
(None, Some(_)) => {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
(Some(_), None) => {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
(Some(s), Some(o)) => {
|
||||||
|
if s != o {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
match (self.to_block.as_ref(), other.to_block.as_ref()) {
|
||||||
(None, None) => {}
|
(None, None) => {}
|
||||||
(None, Some(_)) => {
|
(None, Some(_)) => {
|
||||||
return false;
|
return false;
|
||||||
@ -123,7 +140,8 @@ impl Eq for ResponseCacheKey {}
|
|||||||
|
|
||||||
impl Hash for ResponseCacheKey {
|
impl Hash for ResponseCacheKey {
|
||||||
fn hash<H: Hasher>(&self, state: &mut H) {
|
fn hash<H: Hasher>(&self, state: &mut H) {
|
||||||
self.block.as_ref().map(|x| x.hash()).hash(state);
|
self.from_block.as_ref().map(|x| x.hash()).hash(state);
|
||||||
|
self.to_block.as_ref().map(|x| x.hash()).hash(state);
|
||||||
self.method.hash(state);
|
self.method.hash(state);
|
||||||
self.params.as_ref().map(|x| x.to_string()).hash(state);
|
self.params.as_ref().map(|x| x.to_string()).hash(state);
|
||||||
self.cache_errors.hash(state)
|
self.cache_errors.hash(state)
|
||||||
@ -1434,7 +1452,8 @@ impl Web3ProxyApp {
|
|||||||
.await?
|
.await?
|
||||||
{
|
{
|
||||||
BlockNeeded::CacheSuccessForever => Some(ResponseCacheKey {
|
BlockNeeded::CacheSuccessForever => Some(ResponseCacheKey {
|
||||||
block: None,
|
from_block: None,
|
||||||
|
to_block: None,
|
||||||
method: method.to_string(),
|
method: method.to_string(),
|
||||||
params: request.params.clone(),
|
params: request.params.clone(),
|
||||||
cache_errors: false,
|
cache_errors: false,
|
||||||
@ -1461,7 +1480,48 @@ impl Web3ProxyApp {
|
|||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
Some(ResponseCacheKey {
|
Some(ResponseCacheKey {
|
||||||
block: Some(SavedBlock::new(request_block)),
|
from_block: Some(SavedBlock::new(request_block)),
|
||||||
|
to_block: None,
|
||||||
|
method: method.to_string(),
|
||||||
|
// TODO: hash here?
|
||||||
|
params: request.params.clone(),
|
||||||
|
cache_errors,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
BlockNeeded::CacheRange {
|
||||||
|
from_block_num,
|
||||||
|
to_block_num,
|
||||||
|
cache_errors,
|
||||||
|
} => {
|
||||||
|
let (from_block_hash, archive_needed) = self
|
||||||
|
.balanced_rpcs
|
||||||
|
.block_hash(authorization, &from_block_num)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
if archive_needed {
|
||||||
|
request_metadata
|
||||||
|
.archive_request
|
||||||
|
.store(true, atomic::Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
let from_block = self
|
||||||
|
.balanced_rpcs
|
||||||
|
.block(authorization, &from_block_hash, None)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let (to_block_hash, _) = self
|
||||||
|
.balanced_rpcs
|
||||||
|
.block_hash(authorization, &to_block_num)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let to_block = self
|
||||||
|
.balanced_rpcs
|
||||||
|
.block(authorization, &to_block_hash, None)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Some(ResponseCacheKey {
|
||||||
|
from_block: Some(SavedBlock::new(from_block)),
|
||||||
|
to_block: Some(SavedBlock::new(to_block)),
|
||||||
method: method.to_string(),
|
method: method.to_string(),
|
||||||
// TODO: hash here?
|
// TODO: hash here?
|
||||||
params: request.params.clone(),
|
params: request.params.clone(),
|
||||||
@ -1476,7 +1536,7 @@ impl Web3ProxyApp {
|
|||||||
let authorization = authorization.clone();
|
let authorization = authorization.clone();
|
||||||
|
|
||||||
if let Some(cache_key) = cache_key {
|
if let Some(cache_key) = cache_key {
|
||||||
let request_block_number = cache_key.block.as_ref().map(|x| x.number());
|
let from_block_num = cache_key.from_block.as_ref().map(|x| x.number());
|
||||||
|
|
||||||
self.response_cache
|
self.response_cache
|
||||||
.try_get_with(cache_key, async move {
|
.try_get_with(cache_key, async move {
|
||||||
@ -1491,7 +1551,7 @@ impl Web3ProxyApp {
|
|||||||
&authorization,
|
&authorization,
|
||||||
request,
|
request,
|
||||||
Some(&request_metadata),
|
Some(&request_metadata),
|
||||||
request_block_number.as_ref(),
|
from_block_num.as_ref(),
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
@ -11,31 +11,29 @@ use std::sync::Arc;
|
|||||||
use crate::{frontend::authorization::Authorization, rpcs::connections::Web3Connections};
|
use crate::{frontend::authorization::Authorization, rpcs::connections::Web3Connections};
|
||||||
|
|
||||||
#[allow(non_snake_case)]
|
#[allow(non_snake_case)]
|
||||||
pub fn block_num_to_U64(block_num: BlockNumber, latest_block: U64) -> U64 {
|
pub fn block_num_to_U64(block_num: BlockNumber, latest_block: U64) -> (U64, bool) {
|
||||||
match block_num {
|
match block_num {
|
||||||
BlockNumber::Earliest => {
|
BlockNumber::Earliest => (U64::zero(), false),
|
||||||
// modified is false because we want the backend to see "pending"
|
|
||||||
U64::zero()
|
|
||||||
}
|
|
||||||
BlockNumber::Finalized => {
|
BlockNumber::Finalized => {
|
||||||
warn!("finalized block requested! not yet implemented!");
|
warn!("finalized block requested! not yet implemented!");
|
||||||
latest_block - 10
|
(latest_block - 10, false)
|
||||||
}
|
}
|
||||||
BlockNumber::Latest => {
|
BlockNumber::Latest => {
|
||||||
// change "latest" to a number
|
// change "latest" to a number
|
||||||
latest_block
|
(latest_block, true)
|
||||||
}
|
}
|
||||||
BlockNumber::Number(x) => {
|
BlockNumber::Number(x) => {
|
||||||
// we already have a number
|
// we already have a number
|
||||||
x
|
(x, false)
|
||||||
}
|
}
|
||||||
BlockNumber::Pending => {
|
BlockNumber::Pending => {
|
||||||
|
// modified is false because we want the backend to see "pending"
|
||||||
// TODO: think more about how to handle Pending
|
// TODO: think more about how to handle Pending
|
||||||
latest_block
|
(latest_block, false)
|
||||||
}
|
}
|
||||||
BlockNumber::Safe => {
|
BlockNumber::Safe => {
|
||||||
warn!("finalized block requested! not yet implemented!");
|
warn!("finalized block requested! not yet implemented!");
|
||||||
latest_block - 3
|
(latest_block - 3, false)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -72,7 +70,7 @@ pub async fn clean_block_number(
|
|||||||
let start = x.clone();
|
let start = x.clone();
|
||||||
|
|
||||||
// convert the json value to a BlockNumber
|
// convert the json value to a BlockNumber
|
||||||
let block_num = if let Some(obj) = x.as_object_mut() {
|
let (block_num, change) = if let Some(obj) = x.as_object_mut() {
|
||||||
// it might be a Map like `{"blockHash": String("0xa5626dc20d3a0a209b1de85521717a3e859698de8ce98bca1b16822b7501f74b")}`
|
// it might be a Map like `{"blockHash": String("0xa5626dc20d3a0a209b1de85521717a3e859698de8ce98bca1b16822b7501f74b")}`
|
||||||
if let Some(block_hash) = obj.remove("blockHash") {
|
if let Some(block_hash) = obj.remove("blockHash") {
|
||||||
let block_hash: H256 =
|
let block_hash: H256 =
|
||||||
@ -80,9 +78,13 @@ pub async fn clean_block_number(
|
|||||||
|
|
||||||
let block = rpcs.block(authorization, &block_hash, None).await?;
|
let block = rpcs.block(authorization, &block_hash, None).await?;
|
||||||
|
|
||||||
block
|
// TODO: set change to true? i think not we should probably use hashes for everything.
|
||||||
.number
|
(
|
||||||
.expect("blocks here should always have numbers")
|
block
|
||||||
|
.number
|
||||||
|
.expect("blocks here should always have numbers"),
|
||||||
|
false,
|
||||||
|
)
|
||||||
} else {
|
} else {
|
||||||
return Err(anyhow::anyhow!("blockHash missing"));
|
return Err(anyhow::anyhow!("blockHash missing"));
|
||||||
}
|
}
|
||||||
@ -95,7 +97,9 @@ pub async fn clean_block_number(
|
|||||||
};
|
};
|
||||||
|
|
||||||
// if we changed "latest" to a number, update the params to match
|
// if we changed "latest" to a number, update the params to match
|
||||||
*x = serde_json::to_value(block_num)?;
|
if change {
|
||||||
|
*x = serde_json::to_value(block_num)?;
|
||||||
|
}
|
||||||
|
|
||||||
// TODO: only do this if trace logging is enabled
|
// TODO: only do this if trace logging is enabled
|
||||||
if x.as_u64() != start.as_u64() {
|
if x.as_u64() != start.as_u64() {
|
||||||
@ -112,7 +116,15 @@ pub async fn clean_block_number(
|
|||||||
pub enum BlockNeeded {
|
pub enum BlockNeeded {
|
||||||
CacheSuccessForever,
|
CacheSuccessForever,
|
||||||
CacheNever,
|
CacheNever,
|
||||||
Cache { block_num: U64, cache_errors: bool },
|
Cache {
|
||||||
|
block_num: U64,
|
||||||
|
cache_errors: bool,
|
||||||
|
},
|
||||||
|
CacheRange {
|
||||||
|
from_block_num: U64,
|
||||||
|
to_block_num: U64,
|
||||||
|
cache_errors: bool,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn block_needed(
|
pub async fn block_needed(
|
||||||
@ -122,12 +134,13 @@ pub async fn block_needed(
|
|||||||
head_block_num: U64,
|
head_block_num: U64,
|
||||||
rpcs: &Web3Connections,
|
rpcs: &Web3Connections,
|
||||||
) -> anyhow::Result<BlockNeeded> {
|
) -> anyhow::Result<BlockNeeded> {
|
||||||
// if no params, no block is needed
|
|
||||||
let params = if let Some(params) = params {
|
let params = if let Some(params) = params {
|
||||||
|
// grab the params so we can inspect and potentially modify them
|
||||||
params
|
params
|
||||||
} else {
|
} else {
|
||||||
|
// if no params, no block is needed
|
||||||
// TODO: check all the methods with no params, some might not be cacheable
|
// TODO: check all the methods with no params, some might not be cacheable
|
||||||
// caching for one block should always be okay
|
// caching with the head block /should/ always be okay
|
||||||
return Ok(BlockNeeded::Cache {
|
return Ok(BlockNeeded::Cache {
|
||||||
block_num: head_block_num,
|
block_num: head_block_num,
|
||||||
cache_errors: true,
|
cache_errors: true,
|
||||||
@ -168,39 +181,42 @@ pub async fn block_needed(
|
|||||||
.as_object_mut()
|
.as_object_mut()
|
||||||
.ok_or_else(|| anyhow::anyhow!("invalid format"))?;
|
.ok_or_else(|| anyhow::anyhow!("invalid format"))?;
|
||||||
|
|
||||||
if let Some(x) = obj.get_mut("fromBlock") {
|
|
||||||
let block_num: BlockNumber = serde_json::from_value(x.take())?;
|
|
||||||
|
|
||||||
let block_num = block_num_to_U64(block_num, head_block_num);
|
|
||||||
|
|
||||||
*x = json!(block_num);
|
|
||||||
|
|
||||||
// TODO: maybe don't return. instead check toBlock too?
|
|
||||||
// TODO: if there is a very wide fromBlock and toBlock, we need to check that our rpcs have both!
|
|
||||||
return Ok(BlockNeeded::Cache {
|
|
||||||
block_num,
|
|
||||||
cache_errors: false,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(x) = obj.get_mut("toBlock") {
|
|
||||||
let block_num: BlockNumber = serde_json::from_value(x.take())?;
|
|
||||||
|
|
||||||
let block_num = block_num_to_U64(block_num, head_block_num);
|
|
||||||
|
|
||||||
*x = json!(block_num);
|
|
||||||
|
|
||||||
return Ok(BlockNeeded::Cache {
|
|
||||||
block_num,
|
|
||||||
cache_errors: false,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
if obj.contains_key("blockHash") {
|
if obj.contains_key("blockHash") {
|
||||||
1
|
1
|
||||||
} else {
|
} else {
|
||||||
return Ok(BlockNeeded::Cache {
|
let from_block_num = if let Some(x) = obj.get_mut("fromBlock") {
|
||||||
block_num: head_block_num,
|
let block_num: BlockNumber = serde_json::from_value(x.take())?;
|
||||||
|
|
||||||
|
let (block_num, change) = block_num_to_U64(block_num, head_block_num);
|
||||||
|
|
||||||
|
if change {
|
||||||
|
*x = json!(block_num);
|
||||||
|
}
|
||||||
|
|
||||||
|
block_num
|
||||||
|
} else {
|
||||||
|
let (block_num, _) = block_num_to_U64(BlockNumber::Earliest, head_block_num);
|
||||||
|
|
||||||
|
block_num
|
||||||
|
};
|
||||||
|
|
||||||
|
let to_block_num = if let Some(x) = obj.get_mut("toBlock") {
|
||||||
|
let block_num: BlockNumber = serde_json::from_value(x.take())?;
|
||||||
|
|
||||||
|
let (block_num, change) = block_num_to_U64(block_num, head_block_num);
|
||||||
|
|
||||||
|
if change {
|
||||||
|
*x = json!(block_num);
|
||||||
|
}
|
||||||
|
|
||||||
|
block_num
|
||||||
|
} else {
|
||||||
|
head_block_num
|
||||||
|
};
|
||||||
|
|
||||||
|
return Ok(BlockNeeded::CacheRange {
|
||||||
|
from_block_num: from_block_num,
|
||||||
|
to_block_num: to_block_num,
|
||||||
cache_errors: true,
|
cache_errors: true,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -234,6 +234,7 @@ impl Web3Connections {
|
|||||||
head_block_num.expect("we should only get here if we have a head block");
|
head_block_num.expect("we should only get here if we have a head block");
|
||||||
|
|
||||||
// TODO: geth does 64, erigon does 90k. sometimes we run a mix
|
// TODO: geth does 64, erigon does 90k. sometimes we run a mix
|
||||||
|
// TODO: do this dynamically based on balanced_rpcs block_data_limit
|
||||||
let archive_needed = num < &(head_block_num - U64::from(64));
|
let archive_needed = num < &(head_block_num - U64::from(64));
|
||||||
|
|
||||||
// try to get the hash from our cache
|
// try to get the hash from our cache
|
||||||
|
Loading…
Reference in New Issue
Block a user