From f04905698ad977c35e76ad070cf6a64b6e097ef2 Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Fri, 16 Dec 2022 20:05:01 -0800
Subject: [PATCH] improve request caching

---
 README.md                                 |   2 +
 TODO.md                                   |   3 +-
 web3_proxy/src/app/mod.rs                 | 216 +++++++++++++++-------
 web3_proxy/src/block_number.rs            |  83 ++++++---
 web3_proxy/src/frontend/rpc_proxy_http.rs |   2 +-
 web3_proxy/src/rpcs/blockchain.rs         |  15 +-
 web3_proxy/src/rpcs/connections.rs        |   6 +-
 7 files changed, 232 insertions(+), 95 deletions(-)

diff --git a/README.md b/README.md
index acf75ed4..fba71c7c 100644
--- a/README.md
+++ b/README.md
@@ -163,4 +163,6 @@ Note: Testing with `getLatestBlockByNumber.lua` is not great because the latest
 
 Run [ethspam](https://github.com/INFURA/versus) and [versus](https://github.com/shazow/ethspam) for a more realistic load test:
 
+    ethspam --rpc http://127.0.0.1:8544 | versus --concurrency=100 --stop-after=10000 http://127.0.0.1:8544
+    ethspam --rpc http://127.0.0.1:8544/u/$API_KEY | versus --concurrency=100 --stop-after=10000 http://127.0.0.1:8544/u/$API_KEY
diff --git a/TODO.md b/TODO.md
index 6b30eb14..b3a967bb 100644
--- a/TODO.md
+++ b/TODO.md
@@ -576,4 +576,5 @@ in another repo: event subscriber
 - [ ] if it is too long, (the last 4 bytes must be zero), give an error so descriptions like this stand out
 - [ ] we need to use docker-compose's proper environment variable handling. because now if someone tries to start dev containers in their prod, remove orphans stops and removes them
 - [ ] change invite codes to set the user_tier
-- [ ] some cli commands should use the replica if possible
\ No newline at end of file
+- [ ] some cli commands should use the replica if possible
+- [ ] some third party rpcs have limits on the size of eth_getLogs. include those limits in server config
diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs
index 7257a9e8..388dd5d2 100644
--- a/web3_proxy/src/app/mod.rs
+++ b/web3_proxy/src/app/mod.rs
@@ -2,7 +2,7 @@
 mod ws;
 
 use crate::app_stats::{ProxyResponseStat, StatEmitter, Web3ProxyStat};
-use crate::block_number::block_needed;
+use crate::block_number::{block_needed, BlockNeeded};
 use crate::config::{AppConfig, TopConfig};
 use crate::frontend::authorization::{Authorization, RequestMetadata};
 use crate::jsonrpc::JsonRpcForwardedResponse;
@@ -37,6 +37,7 @@ use redis_rate_limiter::{DeadpoolRuntime, RedisConfig, RedisPool, RedisRateLimit
 use serde::Serialize;
 use serde_json::json;
 use std::fmt;
+use std::hash::{Hash, Hasher};
 use std::net::IpAddr;
 use std::num::NonZeroU64;
 use std::str::FromStr;
@@ -59,9 +60,68 @@ pub static APP_USER_AGENT: &str = concat!(
 /// TODO: allow customizing the request period?
 pub static REQUEST_PERIOD: u64 = 60;
 
-/// block hash, method, params
-// TODO: better name
-type ResponseCacheKey = (H256, String, Option<String>);
+#[derive(From)]
+struct ResponseCacheKey {
+    // if none, this is cached until evicted
+    block: Option<SavedBlock>,
+    method: String,
+    // TODO: better type for this
+    params: Option<serde_json::Value>,
+    cache_errors: bool,
+}
+
+impl ResponseCacheKey {
+    fn weight(&self) -> usize {
+        let mut w = self.method.len();
+
+        if let Some(p) = self.params.as_ref() {
+            w += p.to_string().len();
+        }
+
+        w
+    }
+}
+
+impl PartialEq for ResponseCacheKey {
+    fn eq(&self, other: &Self) -> bool {
+        if self.cache_errors != other.cache_errors {
+            return false;
+        }
+
+        match (self.block.as_ref(), other.block.as_ref()) {
+            (None, None) => {}
+            (None, Some(_)) => {
+                return false;
+            }
+            (Some(_), None) => {
+                return false;
+            }
+            (Some(s), Some(o)) => {
+                if s != o {
+                    return false;
+                }
+            }
+        }
+
+        if self.method != other.method {
+            return false;
+        }
+
+        self.params == other.params
+    }
+}
+
+impl Eq for ResponseCacheKey {}
+
+impl Hash for ResponseCacheKey {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        self.block.as_ref().map(|x| x.hash()).hash(state);
+        self.method.hash(state);
+        self.params.as_ref().map(|x| x.to_string()).hash(state);
+        self.cache_errors.hash(state)
+    }
+}
+
 type ResponseCache =
     Cache<ResponseCacheKey, JsonRpcForwardedResponse, hashbrown::hash_map::DefaultHashBuilder>;
 
@@ -560,19 +620,13 @@ impl Web3ProxyApp {
         // TODO: don't allow any response to be bigger than X% of the cache
         let response_cache = Cache::builder()
             .max_capacity(1024 * 1024 * 1024)
-            .weigher(|k: &(H256, String, Option<String>), v| {
-                // TODO: make this weigher past. serializing json is not fast
-                let mut size = (k.1).len();
-
-                if let Some(params) = &k.2 {
-                    size += params.len()
-                }
-
+            .weigher(|k: &ResponseCacheKey, v| {
+                // TODO: is this good?
                 if let Ok(v) = serde_json::to_string(v) {
-                    size += v.len();
+                    let weight = k.weight() + v.len();
 
                     // the or in unwrap_or is probably never called
-                    size.try_into().unwrap_or(u32::MAX)
+                    weight.try_into().unwrap_or(u32::MAX)
                 } else {
                     // this seems impossible
                     u32::MAX
@@ -974,7 +1028,8 @@ impl Web3ProxyApp {
         // we do this check before checking caches because it might modify the request params
         // TODO: add a stat for archive vs full since they should probably cost different
-        let request_block = if let Some(request_block_needed) = block_needed(
+        // TODO: this cache key can be rather large. is that okay?
+        let cache_key: Option<ResponseCacheKey> = match block_needed(
             authorization,
             method,
             request.params.as_mut(),
@@ -983,69 +1038,96 @@ impl Web3ProxyApp {
         )
         .await?
         {
-            // TODO: maybe this should be on the app and not on balanced_rpcs
-            let (request_block_hash, archive_needed) = self
-                .balanced_rpcs
-                .block_hash(authorization, &request_block_needed)
-                .await?;
+            BlockNeeded::CacheSuccessForever => Some(ResponseCacheKey {
+                block: None,
+                method: method.to_string(),
+                params: request.params.clone(),
+                cache_errors: false,
+            }),
+            BlockNeeded::CacheNever => None,
+            BlockNeeded::Cache {
+                block_num,
+                cache_errors,
+            } => {
+                let (request_block_hash, archive_needed) = self
+                    .balanced_rpcs
+                    .block_hash(authorization, &block_num)
+                    .await?;
 
-            if archive_needed {
-                request_metadata
-                    .archive_request
-                    .store(true, atomic::Ordering::Relaxed);
+                if archive_needed {
+                    request_metadata
+                        .archive_request
+                        .store(true, atomic::Ordering::Relaxed);
+                }
+
+                let request_block = self
+                    .balanced_rpcs
+                    .block(authorization, &request_block_hash, None)
+                    .await?;
+
+                Some(ResponseCacheKey {
+                    block: Some(SavedBlock::new(request_block)),
+                    method: method.to_string(),
+                    // TODO: hash here?
+                    params: request.params.clone(),
+                    cache_errors,
+                })
             }
-
-            let request_block = self
-                .balanced_rpcs
-                .block(authorization, &request_block_hash, None)
-                .await?;
-
-            SavedBlock::new(request_block)
-        } else {
-            head_block
         };
 
-        // TODO: struct for this?
-        // TODO: this can be rather large. is that okay?
-        let cache_key = (
-            request_block.hash(),
-            request.method.clone(),
-            request.params.clone().map(|x| x.to_string()),
-        );
-
         let mut response = {
             let request_metadata = request_metadata.clone();
             let authorization = authorization.clone();
 
-            self.response_cache
-                .try_get_with(cache_key, async move {
-                    // TODO: retry some failures automatically!
-                    // TODO: try private_rpcs if all the balanced_rpcs fail!
-                    // TODO: put the hash here instead?
-                    let mut response = self
-                        .balanced_rpcs
-                        .try_send_best_upstream_server(
-                            &authorization,
-                            request,
-                            Some(&request_metadata),
-                            Some(&request_block.number()),
-                        )
-                        .await?;
+            if let Some(cache_key) = cache_key {
+                let request_block_number = cache_key.block.as_ref().map(|x| x.number());
 
-                    // discard their id by replacing it with an empty
-                    response.id = Default::default();
+                self.response_cache
+                    .try_get_with(cache_key, async move {
+                        // TODO: retry some failures automatically!
+                        // TODO: try private_rpcs if all the balanced_rpcs fail!
+                        // TODO: put the hash here instead?
+                        let mut response = self
+                            .balanced_rpcs
+                            .try_send_best_upstream_server(
+                                &authorization,
+                                request,
+                                Some(&request_metadata),
+                                request_block_number.as_ref(),
+                            )
+                            .await?;
 
-                    // TODO: only cache the inner response (or error)
-                    Ok::<_, anyhow::Error>(response)
-                })
-                .await
-                // TODO: what is the best way to handle an Arc here?
-                .map_err(|err| {
-                    // TODO: emit a stat for an error
-                    anyhow::anyhow!(err)
-                })
-                .context("caching response")?
+                        // discard their id by replacing it with an empty
+                        response.id = Default::default();
+
+                        // TODO: only cache the inner response (or error)
+                        Ok::<_, anyhow::Error>(response)
+                    })
+                    .await
+                    // TODO: what is the best way to handle an Arc here?
+                    .map_err(|err| {
+                        // TODO: emit a stat for an error
+                        anyhow::anyhow!(err)
+                    })
+                    .context("caching response")?
+            } else {
+                let mut response = self
+                    .balanced_rpcs
+                    .try_send_best_upstream_server(
+                        &authorization,
+                        request,
+                        Some(&request_metadata),
+                        None,
+                    )
+                    .await?;
+
+                // discard their id by replacing it with an empty
+                response.id = Default::default();
+
+                // TODO: only cache the inner response (or error)
+                response
+            }
         };
 
         // since this data came likely out of a cache, the id is not going to match
diff --git a/web3_proxy/src/block_number.rs b/web3_proxy/src/block_number.rs
index b383a375..5b646d5f 100644
--- a/web3_proxy/src/block_number.rs
+++ b/web3_proxy/src/block_number.rs
@@ -5,6 +5,7 @@ use ethers::{
     types::H256,
 };
 use log::warn;
+use serde_json::json;
 use std::sync::Arc;
 
 use crate::{frontend::authorization::Authorization, rpcs::connections::Web3Connections};
@@ -97,7 +98,12 @@ pub async fn clean_block_number(
     }
 }
 
-// TODO: change this to also return the hash needed?
+/// TODO: change this to also return the hash needed?
+pub enum BlockNeeded {
+    CacheSuccessForever,
+    CacheNever,
+    Cache { block_num: U64, cache_errors: bool },
+}
 pub async fn block_needed(
     authorization: &Arc<Authorization>,
     method: &str,
     params: Option<&mut serde_json::Value>,
     head_block_num: U64,
     rpcs: &Web3Connections,
-) -> anyhow::Result<Option<U64>> {
+) -> anyhow::Result<BlockNeeded> {
     // if no params, no block is needed
     let params = if let Some(params) = params {
         params
     } else {
-        return Ok(None);
+        // TODO: check all the methods with no params, some might not be cacheable
+        // caching for one block should always be okay
+        return Ok(BlockNeeded::Cache {
+            block_num: head_block_num,
+            cache_errors: true,
+        });
     };
 
     // get the index for the BlockNumber or return None to say no block is needed.
     let block_param_id = match method {
         "eth_call" => 1,
         "eth_estimateGas" => 1,
         "eth_getBalance" => 1,
         "eth_getBlockByHash" => {
             // TODO: double check that any node can serve this
-            return Ok(None);
+            // TODO: can a block change? like what if it gets orphaned?
+            return Ok(BlockNeeded::CacheSuccessForever);
         }
         "eth_getBlockByNumber" => {
             // TODO: double check that any node can serve this
-            return Ok(None);
+            // TODO: CacheSuccessForever if the block is old enough
+            return Ok(BlockNeeded::Cache {
+                block_num: head_block_num,
+                cache_errors: true,
+            });
         }
         "eth_getBlockReceipts" => 0,
         "eth_getBlockTransactionCountByHash" => {
             // TODO: double check that any node can serve this
-            return Ok(None);
+            return Ok(BlockNeeded::CacheSuccessForever);
         }
         "eth_getBlockTransactionCountByNumber" => 0,
         "eth_getCode" => 1,
         "eth_getLogs" => {
+            // TODO: think about this more
             // TODO: jsonrpc has a specific code for this
             let obj = params[0]
                 .as_object_mut()
                 .ok_or_else(|| anyhow::anyhow!("invalid format"))?;
 
             if let Some(x) = obj.get_mut("fromBlock") {
                 let block_num = block_num_to_u64(block_num, head_block_num);
 
-                *x =
-                    serde_json::to_value(block_num).expect("U64 can always be a serde_json::Value");
+                *x = json!(block_num);
 
                 // TODO: maybe don't return. instead check toBlock too?
                 // TODO: if there is a very wide fromBlock and toBlock, we need to check that our rpcs have both!
-                return Ok(Some(block_num));
+                return Ok(BlockNeeded::Cache {
+                    block_num,
+                    cache_errors: false,
+                });
             }
 
             if let Some(x) = obj.get_mut("toBlock") {
                 let block_num = block_num_to_u64(block_num, head_block_num);
 
-                *x = serde_json::to_value(block_num)
-                    .expect("block_num should always turn into a value");
+                *x = json!(block_num);
 
-                return Ok(Some(block_num));
+                return Ok(BlockNeeded::Cache {
+                    block_num,
+                    cache_errors: false,
+                });
             }
 
             if obj.contains_key("blockHash") {
                 1
             } else {
-                return Ok(None);
+                return Ok(BlockNeeded::Cache {
+                    block_num: head_block_num,
+                    cache_errors: true,
+                });
             }
         }
         "eth_getStorageAt" => 2,
         "eth_getTransactionByHash" => {
             // TODO: not sure how best to look these up
             // try full nodes first. retry will use archive
-            return Ok(None);
+            return Ok(BlockNeeded::Cache {
+                block_num: head_block_num,
+                cache_errors: true,
+            });
         }
         "eth_getTransactionByBlockHashAndIndex" => {
             // TODO: check a Cache of recent hashes
             // try full nodes first. retry will use archive
-            return Ok(None);
+            return Ok(BlockNeeded::CacheSuccessForever);
         }
         "eth_getTransactionByBlockNumberAndIndex" => 0,
         "eth_getTransactionCount" => 1,
         "eth_getTransactionReceipt" => {
             // TODO: not sure how best to look these up
             // try full nodes first. retry will use archive
-            return Ok(None);
+            return Ok(BlockNeeded::Cache {
+                block_num: head_block_num,
+                cache_errors: true,
+            });
         }
         "eth_getUncleByBlockHashAndIndex" => {
             // TODO: check a Cache of recent hashes
             // try full nodes first. retry will use archive
-            return Ok(None);
+            return Ok(BlockNeeded::CacheSuccessForever);
         }
         "eth_getUncleByBlockNumberAndIndex" => 0,
         "eth_getUncleCountByBlockHash" => {
             // TODO: check a Cache of recent hashes
             // try full nodes first. retry will use archive
-            return Ok(None);
+            return Ok(BlockNeeded::CacheSuccessForever);
         }
         "eth_getUncleCountByBlockNumber" => 0,
         _ => {
             // some other command that doesn't take block numbers as an argument
-            return Ok(None);
+            // since we are caching with the head block, it should be safe to cache_errors
+            return Ok(BlockNeeded::Cache {
+                block_num: head_block_num,
+                cache_errors: true,
+            });
         }
     };
 
     match clean_block_number(authorization, params, block_param_id, head_block_num, rpcs).await {
-        Ok(block) => Ok(Some(block)),
+        Ok(block_num) => Ok(BlockNeeded::Cache {
+            block_num,
+            cache_errors: true,
+        }),
         Err(err) => {
-            // TODO: seems unlikely that we will get here
             warn!("could not get block from params. err={:?}", err);
-            Ok(None)
+            Ok(BlockNeeded::Cache {
+                block_num: head_block_num,
+                cache_errors: true,
+            })
         }
     }
 }
diff --git a/web3_proxy/src/frontend/rpc_proxy_http.rs b/web3_proxy/src/frontend/rpc_proxy_http.rs
index fc03c712..c5c7b29d 100644
--- a/web3_proxy/src/frontend/rpc_proxy_http.rs
+++ b/web3_proxy/src/frontend/rpc_proxy_http.rs
@@ -31,7 +31,7 @@ pub async fn proxy_web3_rpc(
     // TODO: spawn earlier? i think we want ip_is_authorized in this future
     let f = tokio::spawn(async move { app.proxy_web3_rpc(authorization, payload).await });
 
-    let response = f.await.expect("joinhandle should always work")?;
+    let response = f.await??;
 
     Ok(Json(&response).into_response())
 }
diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs
index ff350bed..b43f7294 100644
--- a/web3_proxy/src/rpcs/blockchain.rs
+++ b/web3_proxy/src/rpcs/blockchain.rs
@@ -32,6 +32,17 @@ pub struct SavedBlock {
     pub lag: u64,
 }
 
+impl PartialEq for SavedBlock {
+    fn eq(&self, other: &Self) -> bool {
+        match (self.block.hash, other.block.hash) {
+            (None, None) => true,
+            (Some(_), None) => false,
+            (None, Some(_)) => false,
+            (Some(s), Some(o)) => s == o,
+        }
+    }
+}
+
 impl SavedBlock {
     pub fn new(block: ArcBlock) -> Self {
         let mut x = Self { block, lag: 0 };
@@ -67,12 +78,12 @@ impl SavedBlock {
     }
 
     pub fn hash(&self) -> H256 {
-        self.block.hash.unwrap()
+        self.block.hash.expect("saved blocks must have a hash")
     }
 
     // TODO: return as U64 or u64?
     pub fn number(&self) -> U64 {
-        self.block.number.unwrap()
+        self.block.number.expect("saved blocks must have a number")
     }
 
     /// When the block was received, this node was still syncing
diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs
index 6497a7f5..b364a2ea 100644
--- a/web3_proxy/src/rpcs/connections.rs
+++ b/web3_proxy/src/rpcs/connections.rs
@@ -18,7 +18,7 @@ use futures::future::{join_all, try_join_all};
 use futures::stream::FuturesUnordered;
 use futures::StreamExt;
 use hashbrown::HashMap;
-use log::{error, info, trace, warn, Level};
+use log::{debug, error, info, trace, warn, Level};
 use migration::sea_orm::DatabaseConnection;
 use moka::future::{Cache, ConcurrentCacheExt};
 use serde::ser::{SerializeStruct, Serializer};
@@ -663,7 +663,9 @@ impl Web3Connections {
                 .last()
                 .expect("there must have been a provider if we got an error");
 
-            warn!(
+            // TODO: emit a stat. if a server is getting skipped a lot, something is not right
+
+            debug!(
                 "Backend server error on {}! Retrying on another. err={:?}",
                 rpc, err
            );
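
A minimal, self-contained sketch (not part of the patch itself) of the caching decision this change introduces: block_needed classifies each request as cache-forever, never-cache, or cache-at-a-specific-block, and the proxy maps that decision onto an optional cache key. The types below are simplified stand-ins for the patch's BlockNeeded and ResponseCacheKey (plain u64 block numbers and string params instead of SavedBlock and serde_json::Value, a HashMap instead of a weighed moka Cache), and the decisions passed in main are arbitrary examples, not what block_needed would actually return for those methods.

// Simplified stand-ins: the real patch stores a SavedBlock and serde_json::Value
// params, and uses the key in a moka Cache with a custom weigher.
use std::collections::HashMap;

#[derive(Clone, Debug)]
enum BlockNeeded {
    // the response can never change (e.g. lookups by block hash)
    CacheSuccessForever,
    // the response should not be cached at all
    CacheNever,
    // the response is only valid as of a specific block
    Cache { block_num: u64, cache_errors: bool },
}

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct ResponseCacheKey {
    // None mirrors the patch's `block: Option<SavedBlock>`: cached until evicted
    block_num: Option<u64>,
    method: String,
    params: Option<String>,
    cache_errors: bool,
}

// Mirrors the match in proxy_cached_request: only CacheNever bypasses the cache.
fn cache_key_for(
    method: &str,
    params: Option<&str>,
    needed: BlockNeeded,
) -> Option<ResponseCacheKey> {
    match needed {
        BlockNeeded::CacheNever => None,
        BlockNeeded::CacheSuccessForever => Some(ResponseCacheKey {
            block_num: None,
            method: method.to_string(),
            params: params.map(str::to_string),
            cache_errors: false,
        }),
        BlockNeeded::Cache {
            block_num,
            cache_errors,
        } => Some(ResponseCacheKey {
            block_num: Some(block_num),
            method: method.to_string(),
            params: params.map(str::to_string),
            cache_errors,
        }),
    }
}

fn main() {
    let mut cache: HashMap<ResponseCacheKey, String> = HashMap::new();

    // a block-pinned decision produces a key that includes the block number
    let key = cache_key_for(
        "eth_getBalance",
        Some(r#"["0x00000000219ab540356cbb839cbe05303d7705fa", "latest"]"#),
        BlockNeeded::Cache {
            block_num: 16_000_000,
            cache_errors: true,
        },
    )
    .expect("Cache decisions always produce a key");
    cache.insert(key, "0x0".to_string());

    // a CacheNever decision produces no key, so the request skips the response cache entirely
    assert!(cache_key_for("eth_sendRawTransaction", None, BlockNeeded::CacheNever).is_none());
}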