diff --git a/README.md b/README.md
index 39f331fe..28015e0f 100644
--- a/README.md
+++ b/README.md
@@ -94,21 +94,21 @@ TODO: also enable debug symbols in the release build by modifying the root Cargo
 Test the proxy:
 
-    wrk -s ./data/wrk/getBlockNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8544
-    wrk -s ./data/wrk/getLatestBlockByNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8544
+    wrk -s ./wrk/getBlockNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8544/u/$API_KEY
+    wrk -s ./wrk/getLatestBlockByNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8544/u/$API_KEY
 
 Test geth (assuming it is on 8545):
 
-    wrk -s ./data/wrk/getBlockNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8545
-    wrk -s ./data/wrk/getLatestBlockByNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8545
+    wrk -s ./wrk/getBlockNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8545
+    wrk -s ./wrk/getLatestBlockByNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8545
 
 Test erigon (assuming it is on 8945):
 
-    wrk -s ./data/wrk/getBlockNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8945
-    wrk -s ./data/wrk/getLatestBlockByNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8945
+    wrk -s ./wrk/getBlockNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8945
+    wrk -s ./wrk/getLatestBlockByNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8945
 
 Note: Testing with `getLatestBlockByNumber.lua` is not great because the latest block changes, so one run is likely to be very different from another.
 
 Run [ethspam](https://github.com/shazow/ethspam) and [versus](https://github.com/INFURA/versus) for a more realistic load test:
 
-    ethspam --rpc http://127.0.0.1:8544/u/someuserkey | versus --concurrency=100 --stop-after=10000 http://127.0.0.1:8544/u/someuserkey
+    ethspam --rpc http://127.0.0.1:8544/u/$API_KEY | versus --concurrency=100 --stop-after=10000 http://127.0.0.1:8544/u/$API_KEY
 
diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs
index 69fbf621..d98a4546 100644
--- a/web3_proxy/src/app.rs
+++ b/web3_proxy/src/app.rs
@@ -6,7 +6,7 @@ use crate::jsonrpc::JsonRpcForwardedResponse;
 use crate::jsonrpc::JsonRpcForwardedResponseEnum;
 use crate::jsonrpc::JsonRpcRequest;
 use crate::jsonrpc::JsonRpcRequestEnum;
-use crate::rpcs::blockchain::{ArcBlock, BlockHashesMap};
+use crate::rpcs::blockchain::{ArcBlock, BlockHashesMap, BlockId};
 use crate::rpcs::connections::Web3Connections;
 use crate::rpcs::transactions::TxStatus;
 use crate::stats::AppStats;
@@ -32,10 +32,10 @@ use serde_json::json;
 use std::fmt;
 use std::pin::Pin;
 use std::str::FromStr;
-use std::sync::atomic::{self, AtomicUsize};
+use std::sync::atomic::{self, AtomicU64, AtomicUsize};
 use std::sync::Arc;
 use std::time::Duration;
-use tokio::sync::{broadcast, watch};
+use tokio::sync::{broadcast, watch, Notify};
 use tokio::task::JoinHandle;
 use tokio::time::{timeout, Instant};
 use tokio_stream::wrappers::{BroadcastStream, WatchStream};
@@ -52,9 +52,13 @@ static APP_USER_AGENT: &str = concat!(
 
 /// block hash, method, params
 // TODO: better name
-type ResponseCacheKey = (H256, String, Option<String>);
+type Web3QueryCacheKey = (H256, String, Option<String>);
 
-type ResponseCache = Cache<ResponseCacheKey, JsonRpcForwardedResponse>;
+/// wait on this to know when a pending query's response is ready
+type ResponseCacheReady = Arc<Notify>;
+
+type RequestCache = Cache<Web3QueryCacheKey, ResponseCacheReady>;
+type ResponseCache = Cache<Web3QueryCacheKey, JsonRpcForwardedResponse>;
 
 pub type AnyhowJoinHandle<T> = JoinHandle<anyhow::Result<T>>;
 
@@ -80,9 +84,12 @@ pub struct Web3ProxyApp {
     pending_tx_sender: broadcast::Sender<TxStatus>,
     pub config: AppConfig,
     pub db_conn: Option<DatabaseConnection>,
+    /// store pending queries so that we don't send the same request to our backends multiple times
+    pub total_queries: AtomicU64,
+    pub active_queries: RequestCache,
     /// store pending transactions that we've seen so that we don't send duplicates to subscribers
     pub pending_transactions: Cache<TxHash, TxStatus>,
-    pub rate_limiter: Option<RedisRateLimit>,
+    pub frontend_rate_limiter: Option<RedisRateLimit>,
     pub redis_pool: Option<RedisPool>,
     pub stats: AppStats,
     pub user_cache: Cache<Uuid, UserCacheValue>,
@@ -304,6 +311,8 @@
         });
 
         // TODO: change this to a sized cache
+        let total_queries = 0.into();
+        let active_queries = Cache::new(10_000);
         let response_cache = Cache::new(10_000);
 
         let user_cache = Cache::new(10_000);
@@ -314,8 +323,10 @@
             response_cache,
             head_block_receiver,
             pending_tx_sender,
+            total_queries,
+            active_queries,
             pending_transactions,
-            rate_limiter: frontend_rate_limiter,
+            frontend_rate_limiter,
             db_conn,
             redis_pool,
             stats: app_stats,
@@ -324,8 +335,6 @@
 
         let app = Arc::new(app);
 
-        // create a handle that returns on the first error
-        // TODO: move this to a helper. i think Web3Connections needs it too
         let handle = Box::pin(flatten_handles(handles));
 
         Ok((app, handle))
@@ -586,48 +595,6 @@
         }
     }
 
-    async fn cached_response_or_key(
-        &self,
-        // TODO: accept a block hash here also?
-        min_block_needed: Option<&U64>,
-        request: &JsonRpcRequest,
-    ) -> anyhow::Result<Result<JsonRpcForwardedResponse, ResponseCacheKey>> {
-        // TODO: inspect the request to pick the right cache
-        // TODO: https://github.com/ethereum/web3.py/blob/master/web3/middleware/cache.py
-
-        let request_block_hash = if let Some(min_block_needed) = min_block_needed {
-            // TODO: maybe this should be on the app and not on balanced_rpcs
-            self.balanced_rpcs.block_hash(min_block_needed).await?
-        } else {
-            // TODO: maybe this should be on the app and not on balanced_rpcs
-            self.balanced_rpcs
-                .head_block_hash()
-                .context("no servers in sync")?
-        };
-
-        // TODO: better key? benchmark this
-        let key = (
-            request_block_hash,
-            request.method.clone(),
-            request.params.clone().map(|x| x.to_string()),
-        );
-
-        if let Some(response) = self.response_cache.get(&key) {
-            // TODO: emit a stat
-            trace!(?request.method, "cache hit!");
-
-            // TODO: can we make references work? maybe put them in an Arc?
-            return Ok(Ok(response));
-        }
-
-        // TODO: another lock here so that we don't send the same request to a backend more than once
-
-        // TODO: emit a stat
-        trace!(?request.method, "cache miss!");
-
-        Ok(Err(key))
-    }
-
     async fn proxy_web3_rpc_request(
         &self,
         mut request: JsonRpcRequest,
@@ -635,7 +602,7 @@
         trace!("Received request: {:?}", request);
 
         // save the id so we can attach it to the response
-        let id = request.id.clone();
+        let request_id = request.id.clone();
 
         // TODO: if eth_chainId or net_version, serve those without querying the backend
@@ -807,45 +774,41 @@
                     _ => return Err(anyhow::anyhow!("invalid request")),
                 }
             }
-
-            // TODO: web3_sha3?
             // anything else gets sent to backend rpcs and cached
             method => {
                 // emit stats
-                let head_block_number = self
+                // TODO: wait for them to be synced?
+                let head_block_id = self
                     .balanced_rpcs
-                    .head_block_num()
+                    .head_block_id()
                     .context("no servers synced")?;
 
                 // we do this check before checking caches because it might modify the request params
                 // TODO: add a stat for archive vs full since they should probably cost different
-                let min_block_needed =
-                    block_needed(method, request.params.as_mut(), head_block_number);
-
-                let min_block_needed = min_block_needed.as_ref();
-
-                trace!(?min_block_needed, ?method);
-
-                // TODO: emit a stat on error. maybe with .map_err?
-                let cache_key = match self
-                    .cached_response_or_key(min_block_needed, &request)
-                    .await?
+                let request_block_id = if let Some(request_block_needed) =
+                    block_needed(method, request.params.as_mut(), head_block_id.num)
                 {
-                    Ok(mut cache_result) => {
-                        // we got a cache hit! no need to do any backend requests.
+                    // TODO: maybe this should be on the app and not on balanced_rpcs
+                    let request_block_hash =
+                        self.balanced_rpcs.block_hash(&request_block_needed).await?;
 
-                        // put our request id on the cached response
-                        // TODO: maybe only cache the inner result? then have a JsonRpcForwardedResponse::from_cache
-                        cache_result.id = request.id;
-
-                        // emit a stat
-
-                        return Ok(cache_result);
+                    BlockId {
+                        num: request_block_needed,
+                        hash: request_block_hash,
                     }
-                    Err(cache_key) => cache_key,
+                } else {
+                    head_block_id
                 };
 
+                // TODO: struct for this?
+                // TODO: this can be rather large. is that okay?
+                let cache_key = (
+                    request_block_id.hash,
+                    request.method.clone(),
+                    request.params.clone().map(|x| x.to_string()),
+                );
+
                 // TODO: move this caching outside this match and cache some of the other responses?
                 // TODO: cache the warp::reply to save us serializing every time?
                 let mut response = self
@@ -856,13 +819,19 @@
                         // "eth_getTransactionByHash" | "eth_getTransactionReceipt" => {
                         // TODO: try_send_all serially with retries instead of parallel
                         self.private_rpcs
-                            .try_send_all_upstream_servers(request, min_block_needed)
+                            .try_send_all_upstream_servers(
+                                request,
+                                Some(&request_block_id.num),
+                            )
                             .await
                     }
                     _ => {
-                        // TODO: retries?
+                        // TODO: retry some failures automatically!
                         self.balanced_rpcs
-                            .try_send_best_upstream_server(request, min_block_needed)
+                            .try_send_best_upstream_server(
+                                request,
+                                Some(&request_block_id.num),
+                            )
                             .await
                     }
                 }
@@ -870,14 +839,15 @@
                     .await
                     .unwrap();
 
-                // this is fragile and i no longer like it
-                response.id = id;
+                // since this data came out of a cache, the id is likely wrong.
+                // replace the id with our request's id.
+                response.id = request_id;
 
                 return Ok(response);
             }
         };
 
-        let response = JsonRpcForwardedResponse::from_value(partial_response, id);
+        let response = JsonRpcForwardedResponse::from_value(partial_response, request_id);
 
         Ok(response)
     }
diff --git a/web3_proxy/src/block_number.rs b/web3_proxy/src/block_number.rs
index f28c2cf7..425ee657 100644
--- a/web3_proxy/src/block_number.rs
+++ b/web3_proxy/src/block_number.rs
@@ -65,7 +65,7 @@ pub fn clean_block_number(
 pub fn block_needed(
     method: &str,
     params: Option<&mut serde_json::Value>,
-    head_block: U64,
+    head_block_num: U64,
 ) -> Option<U64> {
     // if no params, no block is needed
     let params = params?;
@@ -97,7 +97,7 @@ pub fn block_needed(
         if let Some(x) = obj.get_mut("fromBlock") {
             let block_num: BlockNumber = serde_json::from_value(x.clone()).ok()?;
 
-            let (modified, block_num) = block_num_to_u64(block_num, head_block);
+            let (modified, block_num) = block_num_to_u64(block_num, head_block_num);
 
             if modified {
                 *x = serde_json::to_value(block_num).unwrap();
@@ -109,7 +109,7 @@ pub fn block_needed(
         if let Some(x) = obj.get_mut("toBlock") {
             let block_num: BlockNumber = serde_json::from_value(x.clone()).ok()?;
 
-            let (modified, block_num) = block_num_to_u64(block_num, head_block);
+            let (modified, block_num) = block_num_to_u64(block_num, head_block_num);
 
             if modified {
                 *x = serde_json::to_value(block_num).unwrap();
@@ -162,7 +162,7 @@ pub fn block_needed(
        }
    };
 
-    match clean_block_number(params, block_param_id, head_block) {
+    match clean_block_number(params, block_param_id, head_block_num) {
        Ok(block) => Some(block),
        Err(err) => {
            // TODO: seems unlikely that we will get here
diff --git a/web3_proxy/src/frontend/http.rs b/web3_proxy/src/frontend/http.rs
index 5ffc807e..7efca991 100644
--- a/web3_proxy/src/frontend/http.rs
+++ b/web3_proxy/src/frontend/http.rs
@@ -1,7 +1,7 @@
 use crate::app::Web3ProxyApp;
 use axum::{http::StatusCode, response::IntoResponse, Extension, Json};
 use serde_json::json;
-use std::sync::Arc;
+use std::sync::{atomic, Arc};
 
 /// Health check page for load balancers to use
 pub async fn health(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse {
@@ -17,10 +17,15 @@
 pub async fn status(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse {
     // TODO: what else should we include? uptime?
     let body = json!({
-        "balanced_rpcs": app.balanced_rpcs,
-        "private_rpcs": app.private_rpcs,
+        "total_queries": app.total_queries.load(atomic::Ordering::Relaxed),
+        "active_queries_count": app.active_queries.entry_count(),
+        "active_queries_size": app.active_queries.weighted_size(),
         "pending_transactions_count": app.pending_transactions.entry_count(),
         "pending_transactions_size": app.pending_transactions.weighted_size(),
+        "user_cache_count": app.user_cache.entry_count(),
+        "user_cache_size": app.user_cache.weighted_size(),
+        "balanced_rpcs": app.balanced_rpcs,
+        "private_rpcs": app.private_rpcs,
     });
 
     Json(body)
diff --git a/web3_proxy/src/frontend/rate_limit.rs b/web3_proxy/src/frontend/rate_limit.rs
index 7c4e6b70..593ba8ac 100644
--- a/web3_proxy/src/frontend/rate_limit.rs
+++ b/web3_proxy/src/frontend/rate_limit.rs
@@ -120,7 +120,7 @@ impl Web3ProxyApp {
     pub async fn rate_limit_by_ip(&self, ip: IpAddr) -> anyhow::Result<RateLimitResult> {
         // TODO: dry this up with rate_limit_by_key
         // TODO: have a local cache because if we hit redis too hard we get errors
-        if let Some(rate_limiter) = &self.rate_limiter {
+        if let Some(rate_limiter) = &self.frontend_rate_limiter {
             let rate_limiter_label = format!("ip-{}", ip);
 
             // TODO: query redis in the background so that users don't have to wait on this network request
@@ -236,7 +236,7 @@ impl Web3ProxyApp {
         // user key is valid. now check rate limits
         // TODO: this is throwing errors when curve-api hits us with high concurrency. investigate
         if false {
-            if let Some(rate_limiter) = &self.rate_limiter {
+            if let Some(rate_limiter) = &self.frontend_rate_limiter {
                 // TODO: query redis in the background so that users don't have to wait on this network request
                 // TODO: better key? have a prefix so it's easy to delete all of these
                 // TODO: we should probably hash this or something
diff --git a/web3_proxy/src/frontend/rpc_proxy_http.rs b/web3_proxy/src/frontend/rpc_proxy_http.rs
index 8e5af78d..81628613 100644
--- a/web3_proxy/src/frontend/rpc_proxy_http.rs
+++ b/web3_proxy/src/frontend/rpc_proxy_http.rs
@@ -3,7 +3,7 @@ use super::rate_limit::{rate_limit_by_ip, rate_limit_by_user_key};
 use crate::stats::Protocol;
 use crate::{app::Web3ProxyApp, jsonrpc::JsonRpcRequestEnum};
 use axum::extract::Path;
-use axum::{http::StatusCode, response::IntoResponse, Extension, Json};
+use axum::{response::IntoResponse, Extension, Json};
 use axum_client_ip::ClientIp;
 use std::sync::Arc;
 use tracing::{error_span, Instrument};
@@ -58,7 +58,7 @@ pub async fn public_proxy_web3_rpc(
 
     let response = app.proxy_web3_rpc(payload).instrument(user_span).await?;
 
-    Ok((StatusCode::OK, Json(&response)).into_response())
+    Ok(Json(&response).into_response())
 }
 
 pub async fn user_proxy_web3_rpc(
@@ -74,5 +74,5 @@ pub async fn user_proxy_web3_rpc(
 
     let response = app.proxy_web3_rpc(payload).instrument(user_span).await?;
 
-    Ok((StatusCode::OK, Json(&response)).into_response())
+    Ok(Json(&response).into_response())
 }
diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs
index a7233b8a..61e75ebf 100644
--- a/web3_proxy/src/rpcs/blockchain.rs
+++ b/web3_proxy/src/rpcs/blockchain.rs
@@ -24,8 +24,8 @@ pub type BlockHashesMap = Cache<H256, ArcBlock>;
 /// A block's hash and number.
 #[derive(Clone, Debug, Default, From, Serialize)]
 pub struct BlockId {
-    pub(super) hash: H256,
-    pub(super) num: U64,
+    pub hash: H256,
+    pub num: U64,
 }
 
 impl Display for BlockId {
diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs
index ae809d44..9feba2c1 100644
--- a/web3_proxy/src/rpcs/connection.rs
+++ b/web3_proxy/src/rpcs/connection.rs
@@ -787,9 +787,8 @@ impl Serialize for Web3Connection {
         state.serialize_field("name", &self.name)?;
 
         let block_data_limit = self.block_data_limit.load(atomic::Ordering::Relaxed);
-
         if block_data_limit == u64::MAX {
-            state.serialize_field("block_data_limit", "None")?;
+            state.serialize_field("block_data_limit", &None::<()>)?;
         } else {
             state.serialize_field("block_data_limit", &block_data_limit)?;
         }
@@ -821,9 +820,9 @@ impl fmt::Debug for Web3Connection {
         let block_data_limit = self.block_data_limit.load(atomic::Ordering::Relaxed);
 
         if block_data_limit == u64::MAX {
-            f.field("data", &"archive");
+            f.field("blocks", &"all");
         } else {
-            f.field("data", &block_data_limit);
+            f.field("blocks", &block_data_limit);
         }
 
         f.finish_non_exhaustive()
diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs
index 23496ee0..6f7d8a6d 100644
--- a/web3_proxy/src/rpcs/connections.rs
+++ b/web3_proxy/src/rpcs/connections.rs
@@ -437,15 +437,15 @@ impl Web3Connections {
     // TODO: better type on this that can return an anyhow::Result
     pub async fn upstream_servers(
         &self,
-        min_block_needed: Option<&U64>,
+        block_needed: Option<&U64>,
     ) -> Result<Vec<OpenRequestHandle>, Option<Instant>> {
         let mut earliest_retry_at = None;
         // TODO: with capacity?
         let mut selected_rpcs = vec![];
 
         for connection in self.conns.values() {
-            if let Some(min_block_needed) = min_block_needed {
-                if !connection.has_block_data(min_block_needed) {
+            if let Some(block_needed) = block_needed {
+                if !connection.has_block_data(block_needed) {
                     continue;
                 }
             }
@@ -575,10 +575,10 @@ impl Web3Connections {
     pub async fn try_send_all_upstream_servers(
         &self,
         request: JsonRpcRequest,
-        min_block_needed: Option<&U64>,
+        block_needed: Option<&U64>,
     ) -> anyhow::Result<JsonRpcForwardedResponse> {
         loop {
-            match self.upstream_servers(min_block_needed).await {
+            match self.upstream_servers(block_needed).await {
                 Ok(active_request_handles) => {
                     // TODO: benchmark this compared to waiting on unbounded futures
                     // TODO: do something with this handle?
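
Note: the new `active_queries: RequestCache` (whose values are `ResponseCacheReady = Arc<Notify>`) is only plumbing in this diff; nothing populates it yet. The pattern it prepares for is single-flight request coalescing: when identical queries arrive while one is already in flight to a backend, only the first should reach the backend and the rest should wait for its cached response. Below is a minimal sketch of that idea, assuming hypothetical `ActiveQueries`/`CacheKey`/`Response` names (none of them part of web3_proxy) and using a `futures::future::Shared` future in place of the diff's moka-`Cache`-plus-`Notify` bookkeeping:

    // a rough sketch only; `ActiveQueries`, `CacheKey`, `Response`, and
    // `get_or_fetch` are hypothetical names, not part of web3_proxy
    use std::collections::HashMap;
    use std::future::Future;
    use std::sync::Mutex;

    use futures::future::{BoxFuture, FutureExt, Shared};

    // stand-ins for (block hash, method, params) and JsonRpcForwardedResponse
    type CacheKey = (String, Option<String>);
    type Response = String;

    #[derive(Default)]
    pub struct ActiveQueries {
        // identical in-flight queries share one future instead of each
        // sending a duplicate request to a backend server
        in_flight: Mutex<HashMap<CacheKey, Shared<BoxFuture<'static, Response>>>>,
    }

    impl ActiveQueries {
        pub async fn get_or_fetch<F>(&self, key: CacheKey, fetch: F) -> Response
        where
            F: Future<Output = Response> + Send + 'static,
        {
            let fut = {
                let mut in_flight = self.in_flight.lock().unwrap();
                in_flight
                    .entry(key.clone())
                    // only the first caller's `fetch` is stored and driven
                    .or_insert_with(|| fetch.boxed().shared())
                    .clone()
            };

            // every concurrent caller awaits a clone of the same future
            let response = fut.await;

            // the query finished; forget it so later calls fetch fresh data
            self.in_flight.lock().unwrap().remove(&key);

            response
        }
    }

(moka's `get_with` entry API provides similar single-flight behavior out of the box, which may be why `active_queries` is declared as a moka `Cache` rather than a plain map.)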