improve request caching

This commit is contained in:
Bryan Stitt 2022-12-16 20:05:01 -08:00
parent ecd2ba5c87
commit f04905698a
7 changed files with 232 additions and 95 deletions

@ -163,4 +163,6 @@ Note: Testing with `getLatestBlockByNumber.lua` is not great because the latest
Run [ethspam](https://github.com/shazow/ethspam) and [versus](https://github.com/INFURA/versus) for a more realistic load test:
ethspam --rpc http://127.0.0.1:8544 | versus --concurrency=100 --stop-after=10000 http://127.0.0.1:8544
ethspam --rpc http://127.0.0.1:8544/u/$API_KEY | versus --concurrency=100 --stop-after=10000 http://127.0.0.1:8544/u/$API_KEY

@ -576,4 +576,5 @@ in another repo: event subscriber
- [ ] if it is too long (the last 4 bytes must be zero), give an error so descriptions like this stand out
- [ ] we need to use docker-compose's proper environment variable handling, because right now, if someone tries to start the dev containers in their prod environment, `--remove-orphans` stops and removes them
- [ ] change invite codes to set the user_tier
- [ ] some cli commands should use the replica if possible
- [ ] some third party rpcs have limits on the size of eth_getLogs. include those limits in server config

@ -2,7 +2,7 @@
mod ws;
use crate::app_stats::{ProxyResponseStat, StatEmitter, Web3ProxyStat};
use crate::block_number::{block_needed, BlockNeeded};
use crate::config::{AppConfig, TopConfig};
use crate::frontend::authorization::{Authorization, RequestMetadata};
use crate::jsonrpc::JsonRpcForwardedResponse;
@ -37,6 +37,7 @@ use redis_rate_limiter::{DeadpoolRuntime, RedisConfig, RedisPool, RedisRateLimit
use serde::Serialize;
use serde_json::json;
use std::fmt;
use std::hash::{Hash, Hasher};
use std::net::IpAddr;
use std::num::NonZeroU64;
use std::str::FromStr;
@ -59,9 +60,68 @@ pub static APP_USER_AGENT: &str = concat!(
/// TODO: allow customizing the request period?
pub static REQUEST_PERIOD: u64 = 60;
/// block hash, method, params
// TODO: better name
#[derive(From)]
struct ResponseCacheKey {
// if none, this is cached until evicted
block: Option<SavedBlock>,
method: String,
// TODO: better type for this
params: Option<serde_json::Value>,
cache_errors: bool,
}
impl ResponseCacheKey {
fn weight(&self) -> usize {
let mut w = self.method.len();
if let Some(p) = self.params.as_ref() {
w += p.to_string().len();
}
w
}
}
impl PartialEq for ResponseCacheKey {
fn eq(&self, other: &Self) -> bool {
if self.cache_errors != other.cache_errors {
return false;
}
match (self.block.as_ref(), other.block.as_ref()) {
(None, None) => {}
(None, Some(_)) => {
return false;
}
(Some(_), None) => {
return false;
}
(Some(s), Some(o)) => {
if s != o {
return false;
}
}
}
if self.method != other.method {
return false;
}
self.params == other.params
}
}
impl Eq for ResponseCacheKey {}
impl Hash for ResponseCacheKey {
fn hash<H: Hasher>(&self, state: &mut H) {
self.block.as_ref().map(|x| x.hash()).hash(state);
self.method.hash(state);
self.params.as_ref().map(|x| x.to_string()).hash(state);
self.cache_errors.hash(state)
}
}
type ResponseCache =
Cache<ResponseCacheKey, JsonRpcForwardedResponse, hashbrown::hash_map::DefaultHashBuilder>;
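For reference, here is a minimal standalone sketch of the keying idea (hypothetical names, with a plain block hash and a pre-serialized params string standing in for `SavedBlock` and `serde_json::Value`): a key with no block is cached until evicted, while block-scoped keys naturally roll over as the head block changes.

```rust
use std::collections::HashMap;

// Simplified stand-in for ResponseCacheKey.
#[derive(Clone, PartialEq, Eq, Hash)]
struct SketchCacheKey {
    block_hash: Option<[u8; 32]>, // None means "cache until evicted"
    method: String,
    params: Option<String>, // params serialized to a string, as in the Hash impl above
    cache_errors: bool,
}

fn main() {
    let mut cache: HashMap<SketchCacheKey, String> = HashMap::new();

    // eth_getBlockByHash is immutable per hash, so it needs no block in the key.
    cache.insert(
        SketchCacheKey {
            block_hash: None,
            method: "eth_getBlockByHash".to_string(),
            params: Some(r#"["0xabc", false]"#.to_string()),
            cache_errors: false,
        },
        "{ block json }".to_string(),
    );

    // eth_getBalance is only valid at a specific block, so the block hash is
    // part of the key and a new head block creates a distinct entry.
    cache.insert(
        SketchCacheKey {
            block_hash: Some([0u8; 32]),
            method: "eth_getBalance".to_string(),
            params: Some(r#"["0xabc", "latest"]"#.to_string()),
            cache_errors: true,
        },
        "\"0x0\"".to_string(),
    );

    assert_eq!(cache.len(), 2);
}
```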
@ -560,19 +620,13 @@ impl Web3ProxyApp {
// TODO: don't allow any response to be bigger than X% of the cache
let response_cache = Cache::builder()
.max_capacity(1024 * 1024 * 1024)
.weigher(|k: &ResponseCacheKey, v| {
// TODO: is this good?
if let Ok(v) = serde_json::to_string(v) {
let weight = k.weight() + v.len();
// the or in unwrap_or is probably never called
weight.try_into().unwrap_or(u32::MAX)
} else {
// this seems impossible
u32::MAX
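As a standalone illustration (not the project's exact builder, and assuming a recent moka release), this is how a weigher turns `max_capacity` into a total-weight budget, with oversized entries clamped to `u32::MAX`:

```rust
use moka::sync::Cache;

fn main() {
    let cache: Cache<String, String> = Cache::builder()
        // with a weigher installed, max_capacity is a total weight budget, not an entry count
        .max_capacity(1024 * 1024)
        .weigher(|k: &String, v: &String| {
            // weight = key length + value length, mirroring the weigher above
            let weight = k.len() + v.len();
            weight.try_into().unwrap_or(u32::MAX)
        })
        .build();

    cache.insert("eth_chainId".to_string(), "\"0x1\"".to_string());
    assert!(cache.get(&"eth_chainId".to_string()).is_some());
}
```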
@ -974,7 +1028,8 @@ impl Web3ProxyApp {
// we do this check before checking caches because it might modify the request params
// TODO: add a stat for archive vs full since they should probably cost different
// TODO: this cache key can be rather large. is that okay?
let cache_key: Option<ResponseCacheKey> = match block_needed(
authorization,
method,
request.params.as_mut(),
@ -983,69 +1038,96 @@ impl Web3ProxyApp {
)
.await?
{
BlockNeeded::CacheSuccessForever => Some(ResponseCacheKey {
block: None,
method: method.to_string(),
params: request.params.clone(),
cache_errors: false,
}),
BlockNeeded::CacheNever => None,
BlockNeeded::Cache {
block_num,
cache_errors,
} => {
let (request_block_hash, archive_needed) = self
.balanced_rpcs
.block_hash(authorization, &block_num)
.await?;
if archive_needed {
request_metadata
.archive_request
.store(true, atomic::Ordering::Relaxed);
}
let request_block = self
.balanced_rpcs
.block(authorization, &request_block_hash, None)
.await?;
Some(ResponseCacheKey {
block: Some(SavedBlock::new(request_block)),
method: method.to_string(),
// TODO: hash here?
params: request.params.clone(),
cache_errors,
})
}
};
let mut response = {
let request_metadata = request_metadata.clone();
let authorization = authorization.clone();
if let Some(cache_key) = cache_key {
let request_block_number = cache_key.block.as_ref().map(|x| x.number());
self.response_cache
.try_get_with(cache_key, async move {
// TODO: retry some failures automatically!
// TODO: try private_rpcs if all the balanced_rpcs fail!
// TODO: put the hash here instead?
let mut response = self
.balanced_rpcs
.try_send_best_upstream_server(
&authorization,
request,
Some(&request_metadata),
request_block_number.as_ref(),
)
.await?;
// discard their id by replacing it with an empty
response.id = Default::default();
// TODO: only cache the inner response (or error)
Ok::<_, anyhow::Error>(response)
})
.await
// TODO: what is the best way to handle an Arc here?
.map_err(|err| {
// TODO: emit a stat for an error
anyhow::anyhow!(err)
})
.context("caching response")?
} else {
let mut response = self
.balanced_rpcs
.try_send_best_upstream_server(
&authorization,
request,
Some(&request_metadata),
None,
)
.await?;
// discard their id by replacing it with an empty
response.id = Default::default();
// TODO: only cache the inner response (or error)
response
}
};
// since this data likely came out of a cache, the id is not going to match

@ -5,6 +5,7 @@ use ethers::{
types::H256,
};
use log::warn;
use serde_json::json;
use std::sync::Arc;
use crate::{frontend::authorization::Authorization, rpcs::connections::Web3Connections};
@ -97,7 +98,12 @@ pub async fn clean_block_number(
}
}
/// TODO: change this to also return the hash needed?
pub enum BlockNeeded {
CacheSuccessForever,
CacheNever,
Cache { block_num: U64, cache_errors: bool },
}
pub async fn block_needed(
authorization: &Arc<Authorization>,
@ -105,12 +111,17 @@ pub async fn block_needed(
params: Option<&mut serde_json::Value>,
head_block_num: U64,
rpcs: &Web3Connections,
) -> anyhow::Result<BlockNeeded> {
// if no params, no block is needed
let params = if let Some(params) = params {
params
} else {
// TODO: check all the methods with no params, some might not be cacheable
// caching for one block should always be okay
return Ok(BlockNeeded::Cache {
block_num: head_block_num,
cache_errors: true,
});
};
// get the index for the BlockNumber or return early to say no specific block is needed.
@ -122,20 +133,26 @@ pub async fn block_needed(
"eth_getBalance" => 1,
"eth_getBlockByHash" => {
// TODO: double check that any node can serve this
// TODO: can a block change? like what if it gets orphaned?
return Ok(BlockNeeded::CacheSuccessForever);
}
"eth_getBlockByNumber" => {
// TODO: double check that any node can serve this
// TODO: CacheSuccessForever if the block is old enough
return Ok(BlockNeeded::Cache {
block_num: head_block_num,
cache_errors: true,
});
}
"eth_getBlockReceipts" => 0,
"eth_getBlockTransactionCountByHash" => {
// TODO: double check that any node can serve this
return Ok(BlockNeeded::CacheSuccessForever);
}
"eth_getBlockTransactionCountByNumber" => 0,
"eth_getCode" => 1,
"eth_getLogs" => {
// TODO: think about this more
// TODO: jsonrpc has a specific code for this
let obj = params[0]
.as_object_mut()
@ -146,12 +163,14 @@ pub async fn block_needed(
let block_num = block_num_to_u64(block_num, head_block_num);
*x = json!(block_num);
// TODO: maybe don't return. instead check toBlock too?
// TODO: if there is a very wide fromBlock and toBlock, we need to check that our rpcs have both!
return Ok(BlockNeeded::Cache {
block_num,
cache_errors: false,
});
}
if let Some(x) = obj.get_mut("toBlock") {
@ -159,60 +178,80 @@ pub async fn block_needed(
let block_num = block_num_to_u64(block_num, head_block_num);
*x = json!(block_num);
return Ok(BlockNeeded::Cache {
block_num,
cache_errors: false,
});
}
if obj.contains_key("blockHash") {
1
} else {
return Ok(BlockNeeded::Cache {
block_num: head_block_num,
cache_errors: true,
});
}
}
"eth_getStorageAt" => 2,
"eth_getTransactionByHash" => {
// TODO: not sure how best to look these up
// try full nodes first. retry will use archive
return Ok(BlockNeeded::Cache {
block_num: head_block_num,
cache_errors: true,
});
}
"eth_getTransactionByBlockHashAndIndex" => {
// TODO: check a Cache of recent hashes
// try full nodes first. retry will use archive
return Ok(BlockNeeded::CacheSuccessForever);
}
"eth_getTransactionByBlockNumberAndIndex" => 0,
"eth_getTransactionCount" => 1,
"eth_getTransactionReceipt" => {
// TODO: not sure how best to look these up
// try full nodes first. retry will use archive
return Ok(BlockNeeded::Cache {
block_num: head_block_num,
cache_errors: true,
});
}
"eth_getUncleByBlockHashAndIndex" => {
// TODO: check a Cache of recent hashes
// try full nodes first. retry will use archive
return Ok(BlockNeeded::CacheSuccessForever);
}
"eth_getUncleByBlockNumberAndIndex" => 0,
"eth_getUncleCountByBlockHash" => {
// TODO: check a Cache of recent hashes
// try full nodes first. retry will use archive
return Ok(BlockNeeded::CacheSuccessForever);
}
"eth_getUncleCountByBlockNumber" => 0,
_ => {
// some other command that doesn't take block numbers as an argument
// since we are caching with the head block, it should be safe to cache_errors
return Ok(BlockNeeded::Cache {
block_num: head_block_num,
cache_errors: true,
});
}
};
match clean_block_number(authorization, params, block_param_id, head_block_num, rpcs).await {
Ok(block_num) => Ok(BlockNeeded::Cache {
block_num,
cache_errors: true,
}),
Err(err) => {
// TODO: seems unlikely that we will get here
warn!("could not get block from params. err={:?}", err);
Ok(BlockNeeded::Cache {
block_num: head_block_num,
cache_errors: true,
})
}
}
}
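A hedged sketch of how a caller can act on the result of `block_needed`; the enum is copied from above so the snippet stands alone, and the policy strings are illustrative only:

```rust
use ethers::types::U64;

// Copied from block_number.rs above so this sketch compiles on its own.
pub enum BlockNeeded {
    CacheSuccessForever,
    CacheNever,
    Cache { block_num: U64, cache_errors: bool },
}

fn cache_policy(needed: &BlockNeeded) -> String {
    match needed {
        // no block in the cache key; the entry lives until it is evicted
        BlockNeeded::CacheSuccessForever => "cache forever".to_string(),
        // always forward to an upstream server
        BlockNeeded::CacheNever => "never cache".to_string(),
        // key the entry on a specific block, optionally caching error responses too
        BlockNeeded::Cache { block_num, cache_errors } => {
            format!("cache at block {} (cache_errors={})", block_num, cache_errors)
        }
    }
}

fn main() {
    let needed = BlockNeeded::Cache {
        block_num: U64::from(16_000_000u64),
        cache_errors: true,
    };
    println!("{}", cache_policy(&needed));
}
```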

@ -31,7 +31,7 @@ pub async fn proxy_web3_rpc(
// TODO: spawn earlier? i think we want ip_is_authorized in this future
let f = tokio::spawn(async move { app.proxy_web3_rpc(authorization, payload).await });
let response = f.await??;
Ok(Json(&response).into_response())
}

@ -32,6 +32,17 @@ pub struct SavedBlock {
pub lag: u64,
}
impl PartialEq for SavedBlock {
fn eq(&self, other: &Self) -> bool {
match (self.block.hash, other.block.hash) {
(None, None) => true,
(Some(_), None) => false,
(None, Some(_)) => false,
(Some(s), Some(o)) => s == o,
}
}
}
impl SavedBlock {
pub fn new(block: ArcBlock) -> Self {
let mut x = Self { block, lag: 0 };
@ -67,12 +78,12 @@ impl SavedBlock {
}
pub fn hash(&self) -> H256 {
self.block.hash.expect("saved blocks must have a hash")
}
// TODO: return as U64 or u64?
pub fn number(&self) -> U64 {
self.block.number.expect("saved blocks must have a number")
}
/// When the block was received, this node was still syncing

@ -18,7 +18,7 @@ use futures::future::{join_all, try_join_all};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use hashbrown::HashMap;
use log::{debug, error, info, trace, warn, Level};
use migration::sea_orm::DatabaseConnection;
use moka::future::{Cache, ConcurrentCacheExt};
use serde::ser::{SerializeStruct, Serializer};
@ -663,7 +663,9 @@ impl Web3Connections {
.last()
.expect("there must have been a provider if we got an error");
// TODO: emit a stat. if a server is getting skipped a lot, something is not right
debug!(
"Backend server error on {}! Retrying on another. err={:?}",
rpc, err
);