improve cache keys and allow failures while parsing params
commit 3cfbc5baa4
parent df865292a7
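
This diff makes three related changes: the full head block (`Web3ProxyBlock`) is threaded through request handling instead of only the head block number, the `BlockNeeded` enum and `block_needed()` helper become a `CacheMode` type whose constructor tolerates bad block params, and cache keys are built from `BlockNumAndHash` so the block hash (not just the number) distinguishes cached responses. The behaviour behind "allow failures while parsing params" is easiest to see in the new constructor; the sketch below is condensed from the hunks that follow (imports and the `try_new` body omitted):

    impl CacheMode {
        pub async fn new(
            authorization: &Arc<Authorization>,
            method: &str,
            params: &mut serde_json::Value,
            head_block: &Web3ProxyBlock,
            rpcs: &Web3Rpcs,
        ) -> Self {
            // try_new still returns a Web3ProxyResult; parsing problems no longer
            // bubble up to the caller, they just disable caching for this request
            match Self::try_new(authorization, method, params, head_block, rpcs).await {
                Ok(x) => x,
                Err(err) => {
                    warn!(?err, "unable to determine cache mode from params");
                    Self::CacheNever
                }
            }
        }
    }

One subtlety visible later in the diff: when `clean_block_number` itself fails inside `try_new`, the code falls back to caching against the head block, so a malformed block param degrades to "cache as if latest" rather than an error.
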
@@ -1,6 +1,6 @@
mod ws;

use crate::block_number::{block_needed, BlockNeeded};
use crate::block_number::CacheMode;
use crate::config::{AppConfig, TopConfig};
use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult};
use crate::frontend::authorization::{
@@ -992,17 +992,18 @@ impl Web3ProxyApp {

// get the head block now so that any requests that need it all use the same block
// TODO: this still has an edge condition if there is a reorg in the middle of the request!!!
let head_block_num = self
let head_block: Web3ProxyBlock = self
.balanced_rpcs
.head_block_num()
.ok_or(Web3ProxyError::NoServersSynced)?;
.head_block()
.ok_or(Web3ProxyError::NoServersSynced)?
.clone();

// TODO: use streams and buffers so we don't overwhelm our server
let responses = join_all(
requests
.into_iter()
.map(|request| {
self.proxy_request(request, authorization.clone(), Some(head_block_num))
self.proxy_request(request, authorization.clone(), Some(&head_block))
})
.collect::<Vec<_>>(),
)
@@ -1122,13 +1123,13 @@ impl Web3ProxyApp {
self: &Arc<Self>,
mut request: JsonRpcRequest,
authorization: Arc<Authorization>,
head_block_num: Option<U64>,
head_block: Option<&Web3ProxyBlock>,
) -> (StatusCode, JsonRpcForwardedResponse, Vec<Arc<Web3Rpc>>) {
let request_metadata = RequestMetadata::new(
self,
authorization,
RequestOrMethod::Request(&request),
head_block_num.as_ref(),
head_block,
)
.await;

@@ -1144,7 +1145,7 @@ impl Web3ProxyApp {
._proxy_request_with_caching(
&request.method,
&mut request.params,
head_block_num,
head_block,
Some(2),
&request_metadata,
)
@@ -1188,7 +1189,7 @@ impl Web3ProxyApp {
self: &Arc<Self>,
method: &str,
params: &mut serde_json::Value,
head_block_num: Option<U64>,
head_block: Option<&Web3ProxyBlock>,
max_tries: Option<usize>,
request_metadata: &Arc<RequestMetadata>,
) -> Web3ProxyResult<JsonRpcResponseEnum<Arc<RawValue>>> {
@@ -1322,7 +1323,7 @@ impl Web3ProxyApp {
},
"eth_accounts" => JsonRpcResponseEnum::from(serde_json::Value::Array(vec![])),
"eth_blockNumber" => {
match head_block_num.or(self.balanced_rpcs.head_block_num()) {
match head_block.cloned().or(self.balanced_rpcs.head_block()) {
Some(head_block_num) => JsonRpcResponseEnum::from(json!(head_block_num)),
None => {
// TODO: what does geth do if this happens?
@@ -1612,38 +1613,36 @@ impl Web3ProxyApp {
}

// TODO: if no servers synced, wait for them to be synced? probably better to error and let haproxy retry another server
let head_block_num = head_block_num
.or(self.balanced_rpcs.head_block_num())
let head_block: Web3ProxyBlock = head_block
.cloned()
.or_else(|| self.balanced_rpcs.head_block())
.ok_or(Web3ProxyError::NoServersSynced)?;

// we do this check before checking caches because it might modify the request params
// TODO: add a stat for archive vs full since they should probably cost different
// TODO: this cache key can be rather large. is that okay?
let cache_key: Option<JsonRpcQueryCacheKey> = match block_needed(
let cache_key: Option<JsonRpcQueryCacheKey> = match CacheMode::new(
&authorization,
method,
params,
head_block_num,
&head_block,
&self.balanced_rpcs,
)
.await?
.await
{
BlockNeeded::CacheSuccessForever => Some(JsonRpcQueryCacheKey::new(
CacheMode::CacheSuccessForever => Some(JsonRpcQueryCacheKey::new(
None,
None,
method,
params,
false,
)),
BlockNeeded::CacheNever => None,
BlockNeeded::Cache {
block_num,
CacheMode::CacheNever => None,
CacheMode::Cache {
block,
cache_errors,
} => {
let (request_block_hash, block_depth) = self
.balanced_rpcs
.block_hash(&authorization, &block_num)
.await?;
let block_depth = (head_block.number() - block.num()).as_u64();

if block_depth < self.config.archive_depth {
request_metadata
@@ -1651,29 +1650,20 @@ impl Web3ProxyApp {
.store(true, atomic::Ordering::Release);
}

let request_block = self
.balanced_rpcs
.block(&authorization, &request_block_hash, None, Some(3), None)
.await?
.block;

Some(JsonRpcQueryCacheKey::new(
Some(request_block),
Some(block),
None,
method,
params,
cache_errors,
))
}
BlockNeeded::CacheRange {
from_block_num,
to_block_num,
CacheMode::CacheRange {
from_block,
to_block,
cache_errors,
} => {
let (from_block_hash, block_depth) = self
.balanced_rpcs
.block_hash(&authorization, &from_block_num)
.await?;
let block_depth = (head_block.number() - from_block.num()).as_u64();

if block_depth < self.config.archive_depth {
request_metadata
@@ -1681,23 +1671,6 @@ impl Web3ProxyApp {
.store(true, atomic::Ordering::Release);
}

let from_block = self
.balanced_rpcs
.block(&authorization, &from_block_hash, None, Some(3), None)
.await?
.block;

let (to_block_hash, _) = self
.balanced_rpcs
.block_hash(&authorization, &to_block_num)
.await?;

let to_block = self
.balanced_rpcs
.block(&authorization, &to_block_hash, None, Some(3), None)
.await?
.block;

Some(JsonRpcQueryCacheKey::new(
Some(from_block),
Some(to_block),
@@ -1709,11 +1682,11 @@ impl Web3ProxyApp {
};

// TODO: different timeouts for different user tiers. get the duration out of the request_metadata
let max_wait = Duration::from_secs(240);
let backend_request_timetout = Duration::from_secs(240);

if let Some(cache_key) = cache_key {
let from_block_num = cache_key.from_block_num();
let to_block_num = cache_key.to_block_num();
let from_block_num = cache_key.from_block_num().copied();
let to_block_num = cache_key.to_block_num().copied();
let cache_jsonrpc_errors = cache_key.cache_errors();

// TODO: try to fetch out of s3
@@ -1722,14 +1695,14 @@ impl Web3ProxyApp {
.jsonrpc_response_cache
.try_get_with::<_, Web3ProxyError>(cache_key.hash(), async {
let response_data = timeout(
max_wait + Duration::from_millis(100),
backend_request_timetout + Duration::from_millis(100),
self.balanced_rpcs
.try_proxy_connection::<_, Arc<RawValue>>(
method,
params,
Some(request_metadata),
max_tries,
Some(max_wait),
Some(backend_request_timetout),
from_block_num.as_ref(),
to_block_num.as_ref(),
))
@@ -1749,14 +1722,14 @@ impl Web3ProxyApp {
}).await?
} else {
let x = timeout(
max_wait + Duration::from_millis(100),
backend_request_timetout + Duration::from_millis(100),
self.balanced_rpcs
.try_proxy_connection::<_, Arc<RawValue>>(
method,
params,
Some(request_metadata),
max_tries,
Some(max_wait),
Some(backend_request_timetout),
None,
None,
)
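
The `Cache`/`CacheRange` arms above no longer call `balanced_rpcs.block_hash()` and `balanced_rpcs.block()` just to learn how old the requested block is: the depth is computed directly from the head block and the `BlockNumAndHash` carried by `CacheMode`, and the same value feeds the cache key. A rough sketch of the resulting key construction (a hypothetical free function, not in the diff; the real code does this inline and also flags archive requests on the request metadata):

    fn cache_key_for(
        method: &str,
        params: &serde_json::Value,
        mode: &CacheMode,
    ) -> Option<JsonRpcQueryCacheKey> {
        match mode {
            // e.g. eth_getBlockByHash: the response can never change
            CacheMode::CacheSuccessForever => {
                Some(JsonRpcQueryCacheKey::new(None, None, method, params, false))
            }
            // e.g. trace_* calls: responses are too big to be worth caching
            CacheMode::CacheNever => None,
            CacheMode::Cache { block, cache_errors } => Some(JsonRpcQueryCacheKey::new(
                Some(block.clone()),
                None,
                method,
                params,
                *cache_errors,
            )),
            CacheMode::CacheRange { from_block, to_block, cache_errors } => {
                Some(JsonRpcQueryCacheKey::new(
                    Some(from_block.clone()),
                    Some(to_block.clone()),
                    method,
                    params,
                    *cache_errors,
                ))
            }
        }
    }
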
@@ -79,7 +79,7 @@ impl Web3ProxyApp {
&app,
authorization.clone(),
RequestOrMethod::Method("eth_subscribe(newHeads)", 0),
Some(new_head.number()),
Some(&new_head),
)
.await;

@@ -1,6 +1,10 @@
//! Helper functions for turning ether's BlockNumber into numbers and updating incoming queries to match.
use crate::errors::{Web3ProxyError, Web3ProxyResult};
use crate::{
errors::{Web3ProxyError, Web3ProxyResult},
rpcs::blockchain::Web3ProxyBlock,
};
use anyhow::Context;
use derive_more::From;
use ethers::{
prelude::{BlockNumber, U64},
types::H256,
@@ -12,16 +16,16 @@ use tracing::{trace, warn};
use crate::{frontend::authorization::Authorization, rpcs::many::Web3Rpcs};

#[allow(non_snake_case)]
pub fn block_num_to_U64(block_num: BlockNumber, latest_block: U64) -> (U64, bool) {
pub fn BlockNumber_to_U64(block_num: BlockNumber, latest_block: &U64) -> (U64, bool) {
match block_num {
BlockNumber::Earliest => (U64::zero(), false),
BlockNumber::Finalized => {
warn!("finalized block requested! not yet implemented!");
(latest_block - 10, false)
(*latest_block - 10, false)
}
BlockNumber::Latest => {
// change "latest" to a number
(latest_block, true)
(*latest_block, true)
}
BlockNumber::Number(x) => {
// we already have a number
@@ -30,24 +34,44 @@ pub fn block_num_to_U64(block_num: BlockNumber, latest_block: U64) -> (U64, bool
BlockNumber::Pending => {
// modified is false because we want the backend to see "pending"
// TODO: think more about how to handle Pending
(latest_block, false)
(*latest_block, false)
}
BlockNumber::Safe => {
warn!("finalized block requested! not yet implemented!");
(latest_block - 3, false)
warn!("safe block requested! not yet implemented!");
(*latest_block - 3, false)
}
}
}

/// modify params to always have a block number and not "latest"
#[derive(Clone, Debug, Eq, From, PartialEq)]
pub struct BlockNumAndHash(U64, H256);

impl BlockNumAndHash {
pub fn num(&self) -> &U64 {
&self.0
}
pub fn hash(&self) -> &H256 {
&self.1
}
}

impl From<&Web3ProxyBlock> for BlockNumAndHash {
fn from(value: &Web3ProxyBlock) -> Self {
let n = *value.number();
let h = *value.hash();

Self(n, h)
}
}

/// modify params to always have a block number and not "latest"
pub async fn clean_block_number(
authorization: &Arc<Authorization>,
params: &mut serde_json::Value,
block_param_id: usize,
latest_block: U64,
latest_block: &Web3ProxyBlock,
rpcs: &Web3Rpcs,
) -> anyhow::Result<U64> {
) -> anyhow::Result<BlockNumAndHash> {
match params.as_array_mut() {
None => {
// TODO: this needs the correct error code in the response
@@ -65,11 +89,11 @@ pub async fn clean_block_number(
}

// don't modify params, just cache with the current block
Ok(latest_block)
Ok(latest_block.into())
}
Some(x) => {
// convert the json value to a BlockNumber
let (block_num, change) = if let Some(obj) = x.as_object_mut() {
let (block, change) = if let Some(obj) = x.as_object_mut() {
// it might be a Map like `{"blockHash": String("0xa5626dc20d3a0a209b1de85521717a3e859698de8ce98bca1b16822b7501f74b")}`
if let Some(block_hash) = obj.get("blockHash").cloned() {
let block_hash: H256 =
@@ -80,207 +104,256 @@ pub async fn clean_block_number(
.await
.context("fetching block number from hash")?;

// TODO: we do not change the
(*block.number(), false)
(BlockNumAndHash::from(&block), false)
} else {
return Err(anyhow::anyhow!("blockHash missing"));
}
} else {
// it might be a string like "latest" or a block number
// it might be a string like "latest" or a block number or a block hash
// TODO: "BlockNumber" needs a better name
// TODO: use take instead of clone
let block_number = serde_json::from_value::<BlockNumber>(x.clone())
.context("checking params for BlockNumber")?;
// TODO: move this to a helper function?
if let Ok(block_number) = serde_json::from_value::<BlockNumber>(x.clone()) {
let (block_num, change) =
BlockNumber_to_U64(block_number, latest_block.number());

block_num_to_U64(block_number, latest_block)
let (block_hash, _) = rpcs
.block_hash(authorization, &block_num)
.await
.context("fetching block hash from number")?;

let block = rpcs
.block(authorization, &block_hash, None, Some(3), None)
.await
.context("fetching block from hash")?;

(BlockNumAndHash::from(&block), change)
} else if let Ok(block_hash) = serde_json::from_value::<H256>(x.clone()) {
let block = rpcs
.block(authorization, &block_hash, None, Some(3), None)
.await
.context("fetching block number from hash")?;

(BlockNumAndHash::from(&block), false)
} else {
return Err(anyhow::anyhow!(
"param not a block identifier, block number, or block hash"
));
}
};

// if we changed "latest" to a number, update the params to match
if change {
*x = json!(block_num);
trace!(old=%x, new=%block.hash(), "changing block number");
*x = json!(block.hash());
}

Ok(block_num)
Ok(block)
}
},
}
}

/// TODO: change this to also return the hash needed?
pub enum BlockNeeded {
pub enum CacheMode {
CacheSuccessForever,
CacheNever,
Cache {
block_num: U64,
block: BlockNumAndHash,
/// cache jsonrpc errors (server errors are never cached)
cache_errors: bool,
},
CacheRange {
from_block_num: U64,
to_block_num: U64,
from_block: BlockNumAndHash,
to_block: BlockNumAndHash,
/// cache jsonrpc errors (server errors are never cached)
cache_errors: bool,
},
}

pub async fn block_needed(
authorization: &Arc<Authorization>,
method: &str,
params: &mut serde_json::Value,
head_block_num: U64,
rpcs: &Web3Rpcs,
) -> Web3ProxyResult<BlockNeeded> {
// some requests have potentially very large responses
// TODO: only skip caching if the response actually is large
if method.starts_with("trace_") || method == "debug_traceTransaction" {
return Ok(BlockNeeded::CacheNever);
}

if matches!(params, serde_json::Value::Null) {
// no params given
return Ok(BlockNeeded::Cache {
block_num: head_block_num,
cache_errors: true,
});
}

// get the index for the BlockNumber
// The BlockNumber is usually the last element.
// TODO: double check these. i think some of the getBlock stuff will never need archive
let block_param_id = match method {
"eth_call" => 1,
"eth_estimateGas" => 1,
"eth_getBalance" => 1,
"eth_getBlockByHash" => {
// TODO: double check that any node can serve this
// TODO: can a block change? like what if it gets orphaned?
return Ok(BlockNeeded::CacheSuccessForever);
impl CacheMode {
pub async fn new(
authorization: &Arc<Authorization>,
method: &str,
params: &mut serde_json::Value,
head_block: &Web3ProxyBlock,
rpcs: &Web3Rpcs,
) -> Self {
match Self::try_new(authorization, method, params, head_block, rpcs).await {
Ok(x) => x,
Err(err) => {
warn!(?err, "unable to determine cache mode from params");
Self::CacheNever
}
}
"eth_getBlockByNumber" => {
// TODO: double check that any node can serve this
// TODO: CacheSuccessForever if the block is old enough
return Ok(BlockNeeded::Cache {
block_num: head_block_num,
}

pub async fn try_new(
authorization: &Arc<Authorization>,
method: &str,
params: &mut serde_json::Value,
head_block: &Web3ProxyBlock,
rpcs: &Web3Rpcs,
) -> Web3ProxyResult<Self> {
// some requests have potentially very large responses
// TODO: only skip caching if the response actually is large
if method.starts_with("trace_") || method == "debug_traceTransaction" {
return Ok(Self::CacheNever);
}

if matches!(params, serde_json::Value::Null) {
// no params given
return Ok(Self::Cache {
block: head_block.into(),
cache_errors: true,
});
}
"eth_getBlockReceipts" => 0,
"eth_getBlockTransactionCountByHash" => {
// TODO: double check that any node can serve this
return Ok(BlockNeeded::CacheSuccessForever);
}
"eth_getBlockTransactionCountByNumber" => 0,
"eth_getCode" => 1,
"eth_getLogs" => {
// TODO: think about this more
// TODO: jsonrpc has a specific code for this
let obj = params
.get_mut(0)
.ok_or_else(|| Web3ProxyError::BadRequest("invalid format. no params".into()))?
.as_object_mut()
.ok_or_else(|| {
Web3ProxyError::BadRequest("invalid format. params not object".into())
})?;

if obj.contains_key("blockHash") {
return Ok(BlockNeeded::CacheSuccessForever);
} else {
let from_block_num = if let Some(x) = obj.get_mut("fromBlock") {
// TODO: use .take instead of clone
let block_num: BlockNumber = serde_json::from_value(x.clone())?;

let (block_num, change) = block_num_to_U64(block_num, head_block_num);

if change {
trace!("changing fromBlock in eth_getLogs. {} -> {}", x, block_num);
*x = json!(block_num);
}

block_num
} else {
let (block_num, _) = block_num_to_U64(BlockNumber::Earliest, head_block_num);

block_num
};

let to_block_num = if let Some(x) = obj.get_mut("toBlock") {
// TODO: use .take instead of clone
let block_num: BlockNumber = serde_json::from_value(x.clone())?;

let (block_num, change) = block_num_to_U64(block_num, head_block_num);

if change {
trace!("changing toBlock in eth_getLogs. {} -> {}", x, block_num);
*x = json!(block_num);
}

block_num
} else {
head_block_num
};

return Ok(BlockNeeded::CacheRange {
from_block_num,
to_block_num,
// get the index for the BlockNumber
// The BlockNumber is usually the last element.
// TODO: double check these. i think some of the getBlock stuff will never need archive
let block_param_id = match method {
"eth_call" => 1,
"eth_estimateGas" => 1,
"eth_getBalance" => 1,
"eth_getBlockByHash" => {
// TODO: double check that any node can serve this
// TODO: can a block change? like what if it gets orphaned?
return Ok(CacheMode::CacheSuccessForever);
}
"eth_getBlockByNumber" => {
// TODO: double check that any node can serve this
// TODO: CacheSuccessForever if the block is old enough
return Ok(CacheMode::Cache {
block: head_block.into(),
cache_errors: true,
});
}
}
"eth_getStorageAt" => 2,
"eth_getTransactionByHash" => {
// TODO: not sure how best to look these up
// try full nodes first. retry will use archive
return Ok(BlockNeeded::Cache {
block_num: head_block_num,
cache_errors: true,
});
}
"eth_getTransactionByBlockHashAndIndex" => {
// TODO: check a Cache of recent hashes
// try full nodes first. retry will use archive
return Ok(BlockNeeded::CacheSuccessForever);
}
"eth_getTransactionByBlockNumberAndIndex" => 0,
"eth_getTransactionCount" => 1,
"eth_getTransactionReceipt" => {
// TODO: not sure how best to look these up
// try full nodes first. retry will use archive
return Ok(BlockNeeded::Cache {
block_num: head_block_num,
cache_errors: true,
});
}
"eth_getUncleByBlockHashAndIndex" => {
// TODO: check a Cache of recent hashes
// try full nodes first. retry will use archive
return Ok(BlockNeeded::CacheSuccessForever);
}
"eth_getUncleByBlockNumberAndIndex" => 0,
"eth_getUncleCountByBlockHash" => {
// TODO: check a Cache of recent hashes
// try full nodes first. retry will use archive
return Ok(BlockNeeded::CacheSuccessForever);
}
"eth_getUncleCountByBlockNumber" => 0,
_ => {
// some other command that doesn't take block numbers as an argument
// since we are caching with the head block, it should be safe to cache_errors
return Ok(BlockNeeded::Cache {
block_num: head_block_num,
cache_errors: true,
});
}
};
"eth_getBlockReceipts" => 0,
"eth_getBlockTransactionCountByHash" => {
// TODO: double check that any node can serve this
return Ok(CacheMode::CacheSuccessForever);
}
"eth_getBlockTransactionCountByNumber" => 0,
"eth_getCode" => 1,
"eth_getLogs" => {
// TODO: think about this more
// TODO: jsonrpc has a specific code for this
let obj = params
.get_mut(0)
.ok_or_else(|| Web3ProxyError::BadRequest("invalid format. no params".into()))?
.as_object_mut()
.ok_or_else(|| {
Web3ProxyError::BadRequest("invalid format. params not object".into())
})?;

match clean_block_number(authorization, params, block_param_id, head_block_num, rpcs).await {
Ok(block_num) => Ok(BlockNeeded::Cache {
block_num,
cache_errors: true,
}),
Err(err) => {
warn!("could not get block from params. err={:?}", err);
Ok(BlockNeeded::Cache {
block_num: head_block_num,
if obj.contains_key("blockHash") {
return Ok(CacheMode::CacheSuccessForever);
} else {
let from_block = if let Some(x) = obj.get_mut("fromBlock") {
// TODO: use .take instead of clone
// what if its a hash?
let block_num: BlockNumber = serde_json::from_value(x.clone())?;

let (block_num, change) =
BlockNumber_to_U64(block_num, head_block.number());

if change {
// TODO: include the hash instead of the number?
trace!("changing fromBlock in eth_getLogs. {} -> {}", x, block_num);
*x = json!(block_num);
}

todo!();
} else {
warn!("if no from, what should we default? 0 or head?");
head_block.into()
};

let to_block = if let Some(x) = obj.get_mut("toBlock") {
// TODO: use .take instead of clone
// what if its a hash?
let block_num: BlockNumber = serde_json::from_value(x.clone())?;

let (block_num, change) =
BlockNumber_to_U64(block_num, head_block.number());

if change {
trace!("changing toBlock in eth_getLogs. {} -> {}", x, block_num);
*x = json!(block_num);
}

todo!();
} else {
head_block.into()
};

return Ok(CacheMode::CacheRange {
from_block,
to_block,
cache_errors: true,
});
}
}
"eth_getStorageAt" => 2,
"eth_getTransactionByHash" => {
// TODO: not sure how best to look these up
// try full nodes first. retry will use archive
return Ok(CacheMode::Cache {
block: head_block.into(),
cache_errors: true,
});
}
"eth_getTransactionByBlockHashAndIndex" => {
// TODO: check a Cache of recent hashes
// try full nodes first. retry will use archive
return Ok(CacheMode::CacheSuccessForever);
}
"eth_getTransactionByBlockNumberAndIndex" => 0,
"eth_getTransactionCount" => 1,
"eth_getTransactionReceipt" => {
// TODO: not sure how best to look these up
// try full nodes first. retry will use archive
return Ok(CacheMode::Cache {
block: head_block.into(),
cache_errors: true,
});
}
"eth_getUncleByBlockHashAndIndex" => {
// TODO: check a Cache of recent hashes
// try full nodes first. retry will use archive
// TODO: what happens if this block is uncled later?
return Ok(CacheMode::CacheSuccessForever);
}
"eth_getUncleByBlockNumberAndIndex" => 0,
"eth_getUncleCountByBlockHash" => {
// TODO: check a Cache of recent hashes
// try full nodes first. retry will use archive
// TODO: what happens if this block is uncled later?
return Ok(CacheMode::CacheSuccessForever);
}
"eth_getUncleCountByBlockNumber" => 0,
_ => {
// some other command that doesn't take block numbers as an argument
// since we are caching with the head block, it should be safe to cache_errors
return Ok(CacheMode::Cache {
block: head_block.into(),
cache_errors: true,
});
}
};

match clean_block_number(authorization, params, block_param_id, head_block, rpcs).await {
Ok(block) => Ok(CacheMode::Cache {
block,
cache_errors: true,
})
}),
Err(err) => {
warn!("could not get block from params. err={:?}", err);
Ok(CacheMode::Cache {
block: head_block.into(),
cache_errors: true,
})
}
}
}
}
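
`clean_block_number` now resolves whatever it finds at the block parameter position (a `{"blockHash": ...}` object, a tag like `"latest"`, a number, or a bare hash) into a full `BlockNumAndHash`, fetching the missing half from the balanced RPCs when needed. A hedged usage sketch (the call site and variable names are illustrative and not part of the diff; it assumes `authorization`, `params`, and `rpcs` already exist and that the enclosing function returns an `anyhow::Result`):

    // eth_getBalance-style call: the block parameter sits at index 1
    let head_block: Web3ProxyBlock = rpcs
        .head_block()
        .ok_or_else(|| anyhow::anyhow!("no servers synced"))?;

    let block: BlockNumAndHash =
        clean_block_number(&authorization, &mut params, 1, &head_block, &rpcs).await?;

    // both coordinates of the block are now available for cache keys and routing
    println!("resolved block #{} ({:?})", block.num(), block.hash());
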
@@ -965,8 +965,8 @@ impl Web3ProxyError {
(
StatusCode::OK,
JsonRpcErrorData {
message: format!("unknown block hash ({})", hash).into(),
code: None,
message: format!("block {} not found", hash).into(),
code: -32000,
data: None,
},
)
@@ -976,12 +976,9 @@ impl Web3ProxyError {
(
StatusCode::OK,
JsonRpcErrorData {
message: format!(
"unknown block number. known: {}. unknown: {}",
known, unknown
)
.into(),
code: None,
message: format!("block #{} not found. best known is #{}", unknown, known)
.into(),
code: -32000,
data: None,
},
)

@@ -4,6 +4,7 @@ use super::rpc_proxy_ws::ProxyMode;
use crate::app::{Web3ProxyApp, APP_USER_AGENT};
use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult};
use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest};
use crate::rpcs::blockchain::Web3ProxyBlock;
use crate::rpcs::one::Web3Rpc;
use crate::stats::{AppStat, BackendRequests, RpcQueryStats};
use crate::user_token::UserBearerToken;
@@ -476,7 +477,7 @@ impl RequestMetadata {
app: &Web3ProxyApp,
authorization: Arc<Authorization>,
request: R,
head_block_num: Option<&U64>,
head_block: Option<&Web3ProxyBlock>,
) -> Arc<Self> {
let request = request.into();

@@ -493,7 +494,7 @@ impl RequestMetadata {
KafkaDebugLogger::try_new(
app,
authorization.clone(),
head_block_num,
head_block.map(|x| x.number()),
"web3_proxy:rpc",
request_ulid,
)

@@ -1,8 +1,10 @@
use crate::{errors::Web3ProxyError, jsonrpc::JsonRpcErrorData, rpcs::blockchain::ArcBlock};
use crate::{
block_number::BlockNumAndHash, errors::Web3ProxyError, jsonrpc::JsonRpcErrorData,
};
use derive_more::From;
use ethers::{
providers::{HttpClientError, JsonRpcError, ProviderError, WsClientError},
types::U64,
types::{U64},
};
use hashbrown::hash_map::DefaultHashBuilder;
use moka::future::Cache;
@@ -14,9 +16,10 @@ use std::{

#[derive(Clone, Debug, Eq, From)]
pub struct JsonRpcQueryCacheKey {
/// hashed inputs
hash: u64,
from_block_num: Option<U64>,
to_block_num: Option<U64>,
from_block: Option<BlockNumAndHash>,
to_block: Option<BlockNumAndHash>,
cache_errors: bool,
}

@@ -24,11 +27,11 @@ impl JsonRpcQueryCacheKey {
pub fn hash(&self) -> u64 {
self.hash
}
pub fn from_block_num(&self) -> Option<U64> {
self.from_block_num
pub fn from_block_num(&self) -> Option<&U64> {
self.from_block.as_ref().map(|x| x.num())
}
pub fn to_block_num(&self) -> Option<U64> {
self.to_block_num
pub fn to_block_num(&self) -> Option<&U64> {
self.to_block.as_ref().map(|x| x.num())
}
pub fn cache_errors(&self) -> bool {
self.cache_errors
@@ -50,19 +53,19 @@ impl Hash for JsonRpcQueryCacheKey {

impl JsonRpcQueryCacheKey {
pub fn new(
from_block: Option<ArcBlock>,
to_block: Option<ArcBlock>,
from_block: Option<BlockNumAndHash>,
to_block: Option<BlockNumAndHash>,
method: &str,
params: &serde_json::Value,
cache_errors: bool,
) -> Self {
let from_block_num = from_block.as_ref().and_then(|x| x.number);
let to_block_num = to_block.as_ref().and_then(|x| x.number);
let from_block_hash = from_block.as_ref().map(|x| x.hash());
let to_block_hash = to_block.as_ref().map(|x| x.hash());

let mut hasher = DefaultHashBuilder::default().build_hasher();

from_block.as_ref().and_then(|x| x.hash).hash(&mut hasher);
to_block.as_ref().and_then(|x| x.hash).hash(&mut hasher);
from_block_hash.hash(&mut hasher);
to_block_hash.hash(&mut hasher);

method.hash(&mut hasher);

@@ -76,8 +79,8 @@ impl JsonRpcQueryCacheKey {

Self {
hash,
from_block_num,
to_block_num,
from_block,
to_block,
cache_errors,
}
}
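
`JsonRpcQueryCacheKey` now stores and hashes `BlockNumAndHash` values, so the block hash (not just its number) feeds the key; two different blocks at the same height, for example across a reorg, can no longer collide in the response cache, while the numbers stay available via `from_block_num()`/`to_block_num()` for routing. A condensed sketch of the hashing; the handling of `params` and `cache_errors` below is an assumption, since the diff elides that part of the constructor:

    use std::hash::{BuildHasher, Hash, Hasher};

    use hashbrown::hash_map::DefaultHashBuilder;

    // condensed from JsonRpcQueryCacheKey::new in this diff
    fn cache_key_hash(
        from_block: Option<&BlockNumAndHash>,
        to_block: Option<&BlockNumAndHash>,
        method: &str,
        params: &serde_json::Value,
        cache_errors: bool,
    ) -> u64 {
        let mut hasher = DefaultHashBuilder::default().build_hasher();

        // hash the block hashes, not the block numbers
        from_block.map(|x| x.hash()).hash(&mut hasher);
        to_block.map(|x| x.hash()).hash(&mut hasher);

        method.hash(&mut hasher);
        // assumption: hash the serialized params; the real code may differ
        params.to_string().hash(&mut hasher);
        cache_errors.hash(&mut hasher);

        hasher.finish()
    }
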
@@ -17,7 +17,7 @@ use std::time::Duration;
use std::{fmt::Display, sync::Arc};
use tokio::sync::broadcast;
use tokio::time::timeout;
use tracing::{debug, error, trace};
use tracing::{debug, error};

// TODO: type for Hydrated Blocks with their full transactions?
pub type ArcBlock = Arc<Block<TxHash>>;
@@ -376,7 +376,8 @@ impl Web3Rpcs {
break;
}

trace!("waiting for future block {} > {}", num, head_block_num);
debug!(%head_block_num, %num, "waiting for future block");

consensus_head_receiver.changed().await?;

if let Some(head) = consensus_head_receiver.borrow_and_update().as_ref() {

@@ -349,19 +349,20 @@ impl fmt::Debug for RankedRpcs {

// TODO: refs for all of these. borrow on a Sender is cheap enough
impl Web3Rpcs {
// TODO: return a ref?
pub fn head_block(&self) -> Option<Web3ProxyBlock> {
self.watch_head_block
.as_ref()
.and_then(|x| x.borrow().clone())
}

// TODO: return a ref?
/// note: you probably want to use `head_block` instead
/// TODO: return a ref?
pub fn head_block_hash(&self) -> Option<H256> {
self.head_block().map(|x| *x.hash())
}

// TODO: return a ref?
/// note: you probably want to use `head_block` instead
/// TODO: return a ref?
pub fn head_block_num(&self) -> Option<U64> {
self.head_block().map(|x| *x.number())
}
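
With `head_block()` as the primary accessor, `head_block_hash()` and `head_block_num()` are just projections of it, and the returned `Web3ProxyBlock` plugs straight into the new cache-key types via `From<&Web3ProxyBlock> for BlockNumAndHash`. A small illustrative sketch (assumes an `rpcs: Web3Rpcs` value in scope; not taken from the diff):

    if let Some(head) = rpcs.head_block() {
        // equivalent to rpcs.head_block_num() / rpcs.head_block_hash()
        let num: U64 = *head.number();
        let hash: H256 = *head.hash();

        // the same block can seed a cache key, as the earlier hunks do
        let key_block: BlockNumAndHash = (&head).into();

        println!("head block #{} ({:?}) -> cache key block #{}", num, hash, key_block.num());
    } else {
        // mirrors the Web3ProxyError::NoServersSynced path used elsewhere in this commit
        eprintln!("no servers synced");
    }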