dry cache code

Bryan Stitt 2022-09-07 03:54:16 +00:00
parent 5eef5173a1
commit c9b2c0c0d2
9 changed files with 87 additions and 113 deletions

@@ -94,21 +94,21 @@ TODO: also enable debug symbols in the release build by modifying the root Cargo
Test the proxy:
wrk -s ./data/wrk/getBlockNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8544
wrk -s ./data/wrk/getLatestBlockByNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8544
wrk -s ./wrk/getBlockNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8544/u/$API_KEY
wrk -s ./wrk/getLatestBlockByNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8544/u/$API_KEY
Test geth (assuming it is on 8545):
wrk -s ./data/wrk/getBlockNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8545
wrk -s ./data/wrk/getLatestBlockByNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8545
wrk -s ./wrk/getBlockNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8545
wrk -s ./wrk/getLatestBlockByNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8545
Test erigon (assuming it is on 8945):
wrk -s ./data/wrk/getBlockNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8945
wrk -s ./data/wrk/getLatestBlockByNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8945
wrk -s ./wrk/getBlockNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8945
wrk -s ./wrk/getLatestBlockByNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8945
Note: testing with `getLatestBlockByNumber.lua` is not ideal because the latest block changes between runs, so one run is likely to differ significantly from another.
Run [ethspam](https://github.com/shazow/ethspam) and [versus](https://github.com/INFURA/versus) for a more realistic load test:
ethspam --rpc http://127.0.0.1:8544/u/someuserkey | versus --concurrency=100 --stop-after=10000 http://127.0.0.1:8544/u/someuserkey
ethspam --rpc http://127.0.0.1:8544/u/$API_KEY | versus --concurrency=100 --stop-after=10000 http://127.0.0.1:8544/u/$API_KEY

@@ -6,7 +6,7 @@ use crate::jsonrpc::JsonRpcForwardedResponse;
use crate::jsonrpc::JsonRpcForwardedResponseEnum;
use crate::jsonrpc::JsonRpcRequest;
use crate::jsonrpc::JsonRpcRequestEnum;
use crate::rpcs::blockchain::{ArcBlock, BlockHashesMap};
use crate::rpcs::blockchain::{ArcBlock, BlockHashesMap, BlockId};
use crate::rpcs::connections::Web3Connections;
use crate::rpcs::transactions::TxStatus;
use crate::stats::AppStats;
@@ -32,10 +32,10 @@ use serde_json::json;
use std::fmt;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::atomic::{self, AtomicUsize};
use std::sync::atomic::{self, AtomicU64, AtomicUsize};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{broadcast, watch};
use tokio::sync::{broadcast, watch, Notify};
use tokio::task::JoinHandle;
use tokio::time::{timeout, Instant};
use tokio_stream::wrappers::{BroadcastStream, WatchStream};
@@ -52,9 +52,13 @@ static APP_USER_AGENT: &str = concat!(
/// block hash, method, params
// TODO: better name
type ResponseCacheKey = (H256, String, Option<String>);
type Web3QueryCacheKey = (H256, String, Option<String>);
type ResponseCache = Cache<ResponseCacheKey, JsonRpcForwardedResponse>;
/// wait on this to be notified when the cached response is ready
type ResponseCacheReady = Arc<Notify>;
type RequestCache = Cache<Web3QueryCacheKey, (u64, ResponseCacheReady)>;
type ResponseCache = Cache<Web3QueryCacheKey, JsonRpcForwardedResponse>;
pub type AnyhowJoinHandle<T> = JoinHandle<anyhow::Result<T>>;
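
The point of `active_queries` is to keep the cache DRY: when identical queries arrive while one is already in flight, only the first should reach a backend, and the rest should wait on the shared `Notify` and then read the response out of the cache. As a rough standalone illustration of that idea (this commit stores `(u64, Arc<Notify>)` in a moka `Cache`; the `DryCache` below is a hypothetical sketch built on `tokio::sync::OnceCell` instead):

use std::collections::HashMap;
use std::future::Future;
use std::sync::{Arc, Mutex};
use tokio::sync::OnceCell;

/// Hypothetical sketch: one shared cell per key means at most one
/// in-flight fetch; concurrent callers await the same result.
#[derive(Default)]
struct DryCache {
    entries: Mutex<HashMap<String, Arc<OnceCell<String>>>>,
}

impl DryCache {
    async fn get_or_fetch<F, Fut>(&self, key: &str, fetch: F) -> String
    where
        F: FnOnce() -> Fut,
        Fut: Future<Output = String>,
    {
        // grab (or create) this key's cell while holding the map lock...
        let cell = {
            let mut entries = self.entries.lock().unwrap();
            entries.entry(key.to_owned()).or_default().clone()
        };
        // ...then await outside the lock. get_or_init runs `fetch` at most
        // once per cell; every other caller on the same key waits for it.
        cell.get_or_init(fetch).await.clone()
    }
}

`get_or_init` gives the at-most-once guarantee for free, which is the same guarantee the `Notify`-based scheme has to build by hand.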
@@ -80,9 +84,12 @@ pub struct Web3ProxyApp {
pending_tx_sender: broadcast::Sender<TxStatus>,
pub config: AppConfig,
pub db_conn: Option<sea_orm::DatabaseConnection>,
/// store pending queries so that we don't send the same request to our backends multiple times
pub total_queries: AtomicU64,
pub active_queries: RequestCache,
/// store pending transactions that we've seen so that we don't send duplicates to subscribers
pub pending_transactions: Cache<TxHash, TxStatus>,
pub rate_limiter: Option<RedisRateLimit>,
pub frontend_rate_limiter: Option<RedisRateLimit>,
pub redis_pool: Option<RedisPool>,
pub stats: AppStats,
pub user_cache: Cache<Uuid, UserCacheValue>,
@@ -304,6 +311,8 @@ impl Web3ProxyApp {
});
// TODO: change this to a sized cache
let total_queries = 0.into();
let active_queries = Cache::new(10_000);
let response_cache = Cache::new(10_000);
let user_cache = Cache::new(10_000);
@@ -314,8 +323,10 @@
response_cache,
head_block_receiver,
pending_tx_sender,
total_queries,
active_queries,
pending_transactions,
rate_limiter: frontend_rate_limiter,
frontend_rate_limiter,
db_conn,
redis_pool,
stats: app_stats,
@@ -324,8 +335,6 @@
let app = Arc::new(app);
// create a handle that returns on the first error
// TODO: move this to a helper. i think Web3Connections needs it too
let handle = Box::pin(flatten_handles(handles));
Ok((app, handle))
@@ -586,48 +595,6 @@ impl Web3ProxyApp {
}
}
async fn cached_response_or_key(
&self,
// TODO: accept a block hash here also?
min_block_needed: Option<&U64>,
request: &JsonRpcRequest,
) -> anyhow::Result<Result<JsonRpcForwardedResponse, ResponseCacheKey>> {
// TODO: inspect the request to pick the right cache
// TODO: https://github.com/ethereum/web3.py/blob/master/web3/middleware/cache.py
let request_block_hash = if let Some(min_block_needed) = min_block_needed {
// TODO: maybe this should be on the app and not on balanced_rpcs
self.balanced_rpcs.block_hash(min_block_needed).await?
} else {
// TODO: maybe this should be on the app and not on balanced_rpcs
self.balanced_rpcs
.head_block_hash()
.context("no servers in sync")?
};
// TODO: better key? benchmark this
let key = (
request_block_hash,
request.method.clone(),
request.params.clone().map(|x| x.to_string()),
);
if let Some(response) = self.response_cache.get(&key) {
// TODO: emit a stat
trace!(?request.method, "cache hit!");
// TODO: can we make references work? maybe put them in an Arc?
return Ok(Ok(response));
}
// TODO: another lock here so that we don't send the same request to a backend more than once
// TODO: emit a stat
trace!(?request.method, "cache miss!");
Ok(Err(key))
}
async fn proxy_web3_rpc_request(
&self,
mut request: JsonRpcRequest,
@@ -635,7 +602,7 @@ impl Web3ProxyApp {
trace!("Received request: {:?}", request);
// save the id so we can attach it to the response
let id = request.id.clone();
let request_id = request.id.clone();
// TODO: if eth_chainId or net_version, serve those without querying the backend
@@ -807,45 +774,41 @@
_ => return Err(anyhow::anyhow!("invalid request")),
}
}
// TODO: web3_sha3?
// anything else gets sent to backend rpcs and cached
method => {
// emit stats
let head_block_number = self
// TODO: wait for them to be synced?
let head_block_id = self
.balanced_rpcs
.head_block_num()
.head_block_id()
.context("no servers synced")?;
// we do this check before checking caches because it might modify the request params
// TODO: add a stat for archive vs full since they should probably cost different amounts
let min_block_needed =
block_needed(method, request.params.as_mut(), head_block_number);
let min_block_needed = min_block_needed.as_ref();
trace!(?min_block_needed, ?method);
// TODO: emit a stat on error. maybe with .map_err?
let cache_key = match self
.cached_response_or_key(min_block_needed, &request)
.await?
let request_block_id = if let Some(request_block_needed) =
block_needed(method, request.params.as_mut(), head_block_id.num)
{
Ok(mut cache_result) => {
// we got a cache hit! no need to do any backend requests.
// TODO: maybe this should be on the app and not on balanced_rpcs
let request_block_hash =
self.balanced_rpcs.block_hash(&request_block_needed).await?;
// put our request id on the cached response
// TODO: maybe only cache the inner result? then have a JsonRpcForwardedResponse::from_cache
cache_result.id = request.id;
// emit a stat
return Ok(cache_result);
BlockId {
num: request_block_needed,
hash: request_block_hash,
}
Err(cache_key) => cache_key,
} else {
head_block_id
};
// TODO: struct for this?
// TODO: this can be rather large. is that okay?
let cache_key = (
request_block_id.hash,
request.method.clone(),
request.params.clone().map(|x| x.to_string()),
);
// TODO: move this caching outside this match and cache some of the other responses?
// TODO: cache the warp::reply to save us serializing every time?
let mut response = self
@@ -856,13 +819,19 @@
// "eth_getTransactionByHash" | "eth_getTransactionReceipt" => {
// TODO: try_send_all serially with retries instead of in parallel
self.private_rpcs
.try_send_all_upstream_servers(request, min_block_needed)
.try_send_all_upstream_servers(
request,
Some(&request_block_id.num),
)
.await
}
_ => {
// TODO: retries?
// TODO: retry some failures automatically!
self.balanced_rpcs
.try_send_best_upstream_server(request, min_block_needed)
.try_send_best_upstream_server(
request,
Some(&request_block_id.num),
)
.await
}
}
@@ -870,14 +839,15 @@
.await
.unwrap();
// this is fragile and i no longer like it
response.id = id;
// since this data came out of a cache, the id is likely wrong.
// replace the id with our request's id.
response.id = request_id;
return Ok(response);
}
};
let response = JsonRpcForwardedResponse::from_value(partial_response, id);
let response = JsonRpcForwardedResponse::from_value(partial_response, request_id);
Ok(response)
}
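
The new cache key ties every cached response to a concrete block hash, so entries can never silently go stale: a re-org or a new head block yields a different hash and therefore a different key. A free-standing sketch of the construction above (the helper name is hypothetical):

use ethers::types::H256;

// hypothetical helper mirroring the tuple built in the match arm above
fn web3_query_cache_key(
    block_hash: H256,
    method: &str,
    params: Option<&serde_json::Value>,
) -> (H256, String, Option<String>) {
    (
        block_hash,
        method.to_owned(),
        // serialize params so the key is cheap to hash and compare
        params.map(|p| p.to_string()),
    )
}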

@@ -65,7 +65,7 @@ pub fn clean_block_number(
pub fn block_needed(
method: &str,
params: Option<&mut serde_json::Value>,
head_block: U64,
head_block_num: U64,
) -> Option<U64> {
// if no params, no block is needed
let params = params?;
@@ -97,7 +97,7 @@ pub fn block_needed(
if let Some(x) = obj.get_mut("fromBlock") {
let block_num: BlockNumber = serde_json::from_value(x.clone()).ok()?;
let (modified, block_num) = block_num_to_u64(block_num, head_block);
let (modified, block_num) = block_num_to_u64(block_num, head_block_num);
if modified {
*x = serde_json::to_value(block_num).unwrap();
@@ -109,7 +109,7 @@ pub fn block_needed(
if let Some(x) = obj.get_mut("toBlock") {
let block_num: BlockNumber = serde_json::from_value(x.clone()).ok()?;
let (modified, block_num) = block_num_to_u64(block_num, head_block);
let (modified, block_num) = block_num_to_u64(block_num, head_block_num);
if modified {
*x = serde_json::to_value(block_num).unwrap();
@@ -162,7 +162,7 @@ pub fn block_needed(
}
};
match clean_block_number(params, block_param_id, head_block) {
match clean_block_number(params, block_param_id, head_block_num) {
Ok(block) => Some(block),
Err(err) => {
// TODO: seems unlikely that we will get here
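
For context, `block_num_to_u64` (called above, defined outside this hunk) resolves JSON-RPC block tags against the head block and reports whether it rewrote the value. Roughly, under the assumption that relative tags are pinned to the current head:

use ethers::types::{BlockNumber, U64};

// rough sketch, not the exact function from this repo
fn block_num_to_u64(block_num: BlockNumber, head_block_num: U64) -> (bool, U64) {
    match block_num {
        BlockNumber::Earliest => (false, U64::zero()),
        BlockNumber::Number(x) => (false, x),
        // "latest", "pending", etc. have no fixed number; pin them to the
        // head so the result can be used in a stable cache key
        _ => (true, head_block_num),
    }
}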

@@ -1,7 +1,7 @@
use crate::app::Web3ProxyApp;
use axum::{http::StatusCode, response::IntoResponse, Extension, Json};
use serde_json::json;
use std::sync::Arc;
use std::sync::{atomic, Arc};
/// Health check page for load balancers to use
pub async fn health(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse {
@@ -17,10 +17,15 @@ pub async fn health(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse {
pub async fn status(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse {
// TODO: what else should we include? uptime?
let body = json!({
"balanced_rpcs": app.balanced_rpcs,
"private_rpcs": app.private_rpcs,
"total_queries": app.total_queries.load(atomic::Ordering::Relaxed),
"active_queries_count": app.active_queries.entry_count(),
"active_queries_size": app.active_queries.weighted_size(),
"pending_transactions_count": app.pending_transactions.entry_count(),
"pending_transactions_size": app.pending_transactions.weighted_size(),
"user_cache_count": app.user_cache.entry_count(),
"user_cache_size": app.user_cache.weighted_size(),
"balanced_rpcs": app.balanced_rpcs,
"private_rpcs": app.private_rpcs,
});
Json(body)

@@ -120,7 +120,7 @@ impl Web3ProxyApp {
pub async fn rate_limit_by_ip(&self, ip: IpAddr) -> anyhow::Result<RateLimitResult> {
// TODO: dry this up with rate_limit_by_key
// TODO: have a local cache because if we hit redis too hard we get errors
if let Some(rate_limiter) = &self.rate_limiter {
if let Some(rate_limiter) = &self.frontend_rate_limiter {
let rate_limiter_label = format!("ip-{}", ip);
// TODO: query redis in the background so that users don't have to wait on this network request
@@ -236,7 +236,7 @@ impl Web3ProxyApp {
// user key is valid. now check rate limits
// TODO: this is throwing errors when curve-api hits us with high concurrency. investigate
if false {
if let Some(rate_limiter) = &self.rate_limiter {
if let Some(rate_limiter) = &self.frontend_rate_limiter {
// TODO: query redis in the background so that users don't have to wait on this network request
// TODO: better key? have a prefix so its easy to delete all of these
// TODO: we should probably hash this or something

@@ -3,7 +3,7 @@ use super::rate_limit::{rate_limit_by_ip, rate_limit_by_user_key};
use crate::stats::Protocol;
use crate::{app::Web3ProxyApp, jsonrpc::JsonRpcRequestEnum};
use axum::extract::Path;
use axum::{http::StatusCode, response::IntoResponse, Extension, Json};
use axum::{response::IntoResponse, Extension, Json};
use axum_client_ip::ClientIp;
use std::sync::Arc;
use tracing::{error_span, Instrument};
@@ -58,7 +58,7 @@ pub async fn public_proxy_web3_rpc(
let response = app.proxy_web3_rpc(payload).instrument(user_span).await?;
Ok((StatusCode::OK, Json(&response)).into_response())
Ok(Json(&response).into_response())
}
pub async fn user_proxy_web3_rpc(
@@ -74,5 +74,5 @@ pub async fn user_proxy_web3_rpc(
let response = app.proxy_web3_rpc(payload).instrument(user_span).await?;
Ok((StatusCode::OK, Json(&response)).into_response())
Ok(Json(&response).into_response())
}
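
The `StatusCode::OK` tuple dropped in the two handlers above was redundant: axum's `Json` response type already implements `IntoResponse` and defaults to `200 OK`. A minimal illustration:

use axum::{response::IntoResponse, Json};
use serde_json::json;

async fn handler() -> impl IntoResponse {
    // Json(..) sets the body, the content-type, and a 200 status on its own
    Json(json!({ "ok": true }))
}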

@@ -24,8 +24,8 @@ pub type BlockHashesMap = Cache<H256, ArcBlock>;
/// A block's hash and number.
#[derive(Clone, Debug, Default, From, Serialize)]
pub struct BlockId {
pub(super) hash: H256,
pub(super) num: U64,
pub hash: H256,
pub num: U64,
}
impl Display for BlockId {

@@ -787,9 +787,8 @@ impl Serialize for Web3Connection {
state.serialize_field("name", &self.name)?;
let block_data_limit = self.block_data_limit.load(atomic::Ordering::Relaxed);
if block_data_limit == u64::MAX {
state.serialize_field("block_data_limit", "None")?;
state.serialize_field("block_data_limit", &None::<()>)?;
} else {
state.serialize_field("block_data_limit", &block_data_limit)?;
}
@@ -821,9 +820,9 @@ impl fmt::Debug for Web3Connection {
let block_data_limit = self.block_data_limit.load(atomic::Ordering::Relaxed);
if block_data_limit == u64::MAX {
f.field("data", &"archive");
f.field("blocks", &"all");
} else {
f.field("data", &block_data_limit);
f.field("blocks", &block_data_limit);
}
f.finish_non_exhaustive()

@@ -437,15 +437,15 @@ impl Web3Connections {
// TODO: better type on this that can return an anyhow::Result
pub async fn upstream_servers(
&self,
min_block_needed: Option<&U64>,
block_needed: Option<&U64>,
) -> Result<Vec<OpenRequestHandle>, Option<Instant>> {
let mut earliest_retry_at = None;
// TODO: with capacity?
let mut selected_rpcs = vec![];
for connection in self.conns.values() {
if let Some(min_block_needed) = min_block_needed {
if !connection.has_block_data(min_block_needed) {
if let Some(block_needed) = block_needed {
if !connection.has_block_data(block_needed) {
continue;
}
}
@@ -575,10 +575,10 @@
pub async fn try_send_all_upstream_servers(
&self,
request: JsonRpcRequest,
min_block_needed: Option<&U64>,
block_needed: Option<&U64>,
) -> anyhow::Result<JsonRpcForwardedResponse> {
loop {
match self.upstream_servers(min_block_needed).await {
match self.upstream_servers(block_needed).await {
Ok(active_request_handles) => {
// TODO: benchmark this compared to waiting on unbounded futures
// TODO: do something with this handle?