From 82eb449e966d9274b9cc154f7b6ec7f5990180e0 Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Mon, 19 Dec 2022 21:37:12 -0800
Subject: [PATCH] add backend nodes to the rpc response headers

only do this in dev?
---
 redis-rate-limiter/src/lib.rs             |  2 +-
 web3_proxy/src/app/mod.rs                 | 93 +++++++++++++++--------
 web3_proxy/src/frontend/errors.rs         | 55 +++++++++-----
 web3_proxy/src/frontend/rpc_proxy_http.rs | 27 +++++--
 web3_proxy/src/frontend/rpc_proxy_ws.rs   | 17 +++--
 web3_proxy/src/jsonrpc.rs                 |  2 -
 6 files changed, 129 insertions(+), 67 deletions(-)

diff --git a/redis-rate-limiter/src/lib.rs b/redis-rate-limiter/src/lib.rs
index 3f2fd0f3..7c9ab5b3 100644
--- a/redis-rate-limiter/src/lib.rs
+++ b/redis-rate-limiter/src/lib.rs
@@ -108,7 +108,7 @@ impl RedisRateLimiter {
             // do the query
             .query_async(&mut *conn)
             .await
-            .context("increment rate limit and set expiration")?;
+            .context("cannot increment rate limit or set expiration")?;
 
         let new_count: u64 = *x.first().expect("check redis");
 
diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs
index cc2e7c4e..3bb9778a 100644
--- a/web3_proxy/src/app/mod.rs
+++ b/web3_proxy/src/app/mod.rs
@@ -5,11 +5,13 @@ use crate::app_stats::{ProxyResponseStat, StatEmitter, Web3ProxyStat};
 use crate::block_number::{block_needed, BlockNeeded};
 use crate::config::{AppConfig, TopConfig};
 use crate::frontend::authorization::{Authorization, RequestMetadata};
+use crate::frontend::errors::FrontendErrorResponse;
 use crate::jsonrpc::JsonRpcForwardedResponse;
 use crate::jsonrpc::JsonRpcForwardedResponseEnum;
 use crate::jsonrpc::JsonRpcRequest;
 use crate::jsonrpc::JsonRpcRequestEnum;
 use crate::rpcs::blockchain::{ArcBlock, SavedBlock};
+use crate::rpcs::connection::Web3Connection;
 use crate::rpcs::connections::Web3Connections;
 use crate::rpcs::request::OpenRequestHandleMetrics;
 use crate::rpcs::transactions::TxStatus;
@@ -21,11 +23,10 @@ use derive_more::From;
 use entities::sea_orm_active_enums::LogLevel;
 use ethers::core::utils::keccak256;
 use ethers::prelude::{Address, Block, Bytes, TxHash, H256, U64};
-use ethers::types::U256;
 use futures::future::join_all;
 use futures::stream::FuturesUnordered;
 use futures::stream::StreamExt;
-use hashbrown::HashMap;
+use hashbrown::{HashMap, HashSet};
 use ipnet::IpNet;
 use log::{debug, error, info, warn};
 use metered::{metered, ErrorCount, HitCount, ResponseTime, Throughput};
@@ -708,7 +709,8 @@ impl Web3ProxyApp {
         self: &Arc<Self>,
         authorization: Arc<Authorization>,
         request: JsonRpcRequestEnum,
-    ) -> anyhow::Result<JsonRpcForwardedResponseEnum> {
+    ) -> Result<(JsonRpcForwardedResponseEnum, Vec<Arc<Web3Connection>>), FrontendErrorResponse>
+    {
         // TODO: this should probably be trace level
         // // trace!(?request, "proxy_web3_rpc");
 
@@ -718,24 +720,25 @@ impl Web3ProxyApp {
         let max_time = Duration::from_secs(120);
 
         let response = match request {
-            JsonRpcRequestEnum::Single(request) => JsonRpcForwardedResponseEnum::Single(
-                timeout(
+            JsonRpcRequestEnum::Single(request) => {
+                let (response, rpcs) = timeout(
                     max_time,
                     self.proxy_web3_rpc_request(&authorization, request),
                 )
-                .await??,
-            ),
-            JsonRpcRequestEnum::Batch(requests) => JsonRpcForwardedResponseEnum::Batch(
-                timeout(
+                .await??;
+
+                (JsonRpcForwardedResponseEnum::Single(response), rpcs)
+            }
+            JsonRpcRequestEnum::Batch(requests) => {
+                let (responses, rpcs) = timeout(
                     max_time,
                     self.proxy_web3_rpc_requests(&authorization, requests),
                 )
-                .await??,
-            ),
-        };
+                .await??;
 
-        // TODO: this should probably be trace level
-        // // trace!(?response, "Forwarding");
+                (JsonRpcForwardedResponseEnum::Batch(responses), rpcs)
+            }
+        };
 
         Ok(response)
     }
@@ -746,12 +749,12 @@ impl Web3ProxyApp {
         self: &Arc<Self>,
         authorization: &Arc<Authorization>,
         requests: Vec<JsonRpcRequest>,
-    ) -> anyhow::Result<Vec<JsonRpcForwardedResponse>> {
-        // TODO: we should probably change ethers-rs to support this directly
+    ) -> anyhow::Result<(Vec<JsonRpcForwardedResponse>, Vec<Arc<Web3Connection>>)> {
+        // TODO: we should probably change ethers-rs to support this directly. they pushed this off to v2 though
         let num_requests = requests.len();
 
-        // TODO: spawn so the requests go in parallel
-        // TODO: i think we will need to flatten
+        // TODO: spawn so the requests go in parallel? need to think about rate limiting more if we do that
+        // TODO: improve flattening
         let responses = join_all(
             requests
                 .into_iter()
@@ -760,14 +763,21 @@ impl Web3ProxyApp {
         )
         .await;
 
-        // TODO: i'm sure this could be done better with iterators. we could return the error earlier then, too
+        // TODO: i'm sure this could be done better with iterators
         // TODO: stream the response?
        let mut collected: Vec<JsonRpcForwardedResponse> = Vec::with_capacity(num_requests);
+        let mut collected_rpcs: HashSet<Arc<Web3Connection>> = HashSet::new();
        for response in responses {
-            collected.push(response?);
+            // TODO: any way to attach the tried rpcs to the error? it is likely helpful
+            let (response, rpcs) = response?;
+
+            collected.push(response);
+            collected_rpcs.extend(rpcs.into_iter());
        }
 
-        Ok(collected)
+        let collected_rpcs: Vec<_> = collected_rpcs.into_iter().collect();
+
+        Ok((collected, collected_rpcs))
     }
 
     /// TODO: i don't think we want or need this. just use app.db_conn, or maybe app.db_conn.clone() or app.db_conn.as_ref()
@@ -795,7 +805,7 @@ impl Web3ProxyApp {
         self: &Arc<Self>,
         authorization: &Arc<Authorization>,
         mut request: JsonRpcRequest,
-    ) -> anyhow::Result<JsonRpcForwardedResponse> {
+    ) -> anyhow::Result<(JsonRpcForwardedResponse, Vec<Arc<Web3Connection>>)> {
         // trace!("Received request: {:?}", request);
 
         let request_metadata = Arc::new(RequestMetadata::new(REQUEST_PERIOD, request.num_bytes())?);
@@ -917,6 +927,8 @@ impl Web3ProxyApp {
                 // no stats on this. its cheap
                 json!(Address::zero())
             }
+            /*
+            // erigon was giving bad estimates. but now it doesn't need it
             "eth_estimateGas" => {
                 // TODO: eth_estimateGas using anvil?
                 // TODO: modify the block requested?
@@ -937,15 +949,18 @@ impl Web3ProxyApp {
                     parsed_gas_estimate
                 } else {
                     // i think this is always an error response
-                    return Ok(response);
+                    let rpcs = request_metadata.backend_requests.lock().clone();
+
+                    return Ok((response, rpcs));
                 };
 
-                // increase by 10.01%
+                // increase by 1.01%
                 let parsed_gas_estimate =
-                    parsed_gas_estimate * U256::from(110_010) / U256::from(100_000);
+                    parsed_gas_estimate * U256::from(101_010) / U256::from(100_000);
 
                 json!(parsed_gas_estimate)
             }
+            */
             // TODO: eth_gasPrice that does awesome magic to predict the future
             "eth_hashrate" => {
                 // no stats on this. its cheap
@@ -959,16 +974,24 @@ impl Web3ProxyApp {
             // broadcast transactions to all private rpcs at once
             "eth_sendRawTransaction" => {
                 // emit stats
-                let rpcs = self.private_rpcs.as_ref().unwrap_or(&self.balanced_rpcs);
+                let private_rpcs = self.private_rpcs.as_ref().unwrap_or(&self.balanced_rpcs);
 
-                return rpcs
+                let mut response = private_rpcs
                     .try_send_all_upstream_servers(
                         authorization,
                         request,
-                        Some(request_metadata),
+                        Some(request_metadata.clone()),
                         None,
                     )
-                    .await;
+                    .await?;
+
+                response.id = request_id;
+
+                let rpcs = request_metadata.backend_requests.lock().clone();
+
+                // TODO! STATS!
+
+                return Ok((response, rpcs));
             }
             "eth_syncing" => {
                 // no stats on this. its cheap
@@ -1134,6 +1157,9 @@ impl Web3ProxyApp {
                 // replace the id with our request's id.
                 response.id = request_id;
 
+                // TODO: DRY!
+                let rpcs = request_metadata.backend_requests.lock().clone();
+
                 if let Some(stat_sender) = self.stat_sender.as_ref() {
                     let response_stat = ProxyResponseStat::new(
                         method.to_string(),
@@ -1148,12 +1174,15 @@ impl Web3ProxyApp {
                         .context("stat_sender sending response_stat")?;
                 }
 
-                return Ok(response);
+                return Ok((response, rpcs));
             }
         };
 
         let response = JsonRpcForwardedResponse::from_value(partial_response, request_id);
 
+        // TODO: DRY
+        let rpcs = request_metadata.backend_requests.lock().clone();
+
         if let Some(stat_sender) = self.stat_sender.as_ref() {
             let response_stat = ProxyResponseStat::new(
                 request_method,
@@ -1168,9 +1197,7 @@ impl Web3ProxyApp {
                 .context("stat_sender sending response stat")?;
         }
 
-        todo!("attach a header here");
-
-        Ok(response)
+        Ok((response, rpcs))
     }
 }
 
diff --git a/web3_proxy/src/frontend/errors.rs b/web3_proxy/src/frontend/errors.rs
index fdbbbe04..0feb0afb 100644
--- a/web3_proxy/src/frontend/errors.rs
+++ b/web3_proxy/src/frontend/errors.rs
@@ -15,7 +15,6 @@ use log::{trace, warn};
 use migration::sea_orm::DbErr;
 use redis_rate_limiter::redis::RedisError;
 use reqwest::header::ToStrError;
-use std::error::Error;
 use tokio::{sync::AcquireError, task::JoinError, time::Instant};
 
 // TODO: take "IntoResponse" instead of Response?
@@ -27,7 +26,6 @@ pub enum FrontendErrorResponse {
     AccessDenied,
     Anyhow(anyhow::Error),
     SemaphoreAcquireError(AcquireError),
-    Box(Box<dyn Error>),
     Database(DbErr),
     HeadersError(headers::Error),
     HeaderToString(ToStrError),
@@ -40,6 +38,8 @@ pub enum FrontendErrorResponse {
     Response(Response),
     /// simple way to return an error message to the user and an anyhow to our logs
     StatusCode(StatusCode, String, Option<anyhow::Error>),
+    /// TODO: what should be attached to the timeout?
+    Timeout(tokio::time::error::Elapsed),
     UlidDecodeError(ulid::DecodeError),
     UnknownKey,
 }
@@ -74,18 +74,18 @@ impl IntoResponse for FrontendErrorResponse {
                     ),
                 )
             }
-            Self::Box(err) => {
-                warn!("boxed err={:?}", err);
-                (
-                    StatusCode::INTERNAL_SERVER_ERROR,
-                    JsonRpcForwardedResponse::from_str(
-                        // TODO: make this better. maybe include the error type?
-                        "boxed error!",
-                        Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()),
-                        None,
-                    ),
-                )
-            }
+            // Self::(err) => {
+            //     warn!("boxed err={:?}", err);
+            //     (
+            //         StatusCode::INTERNAL_SERVER_ERROR,
+            //         JsonRpcForwardedResponse::from_str(
+            //             // TODO: make this better. maybe include the error type?
+            //             "boxed error!",
+            //             Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()),
+            //             None,
+            //         ),
+            //     )
+            // }
             Self::Database(err) => {
                 warn!("database err={:?}", err);
                 (
@@ -131,12 +131,20 @@ impl IntoResponse for FrontendErrorResponse {
                 )
             }
             Self::JoinError(err) => {
-                warn!("JoinError. likely shutting down. err={:?}", err);
+                let code = if err.is_cancelled() {
+                    trace!("JoinError. likely shutting down. err={:?}", err);
+                    StatusCode::BAD_GATEWAY
+                } else {
+                    warn!("JoinError. err={:?}", err);
+                    StatusCode::INTERNAL_SERVER_ERROR
+                };
+
                 (
-                    StatusCode::INTERNAL_SERVER_ERROR,
+                    code,
                     JsonRpcForwardedResponse::from_str(
+                        // TODO: different messages, too?
                         "Unable to complete request",
-                        Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()),
+                        Some(code.as_u16().into()),
                         None,
                     ),
                 )
             }
@@ -226,8 +234,17 @@ impl IntoResponse for FrontendErrorResponse {
                     JsonRpcForwardedResponse::from_str(&err_msg, Some(code.into()), None),
                 )
             }
+            Self::Timeout(x) => (
+                StatusCode::REQUEST_TIMEOUT,
+                JsonRpcForwardedResponse::from_str(
+                    &format!("request timed out: {:?}", x),
+                    Some(StatusCode::REQUEST_TIMEOUT.as_u16().into()),
+                    // TODO: include the actual id!
+                    None,
+                ),
+            ),
             Self::HeaderToString(err) => {
-                // // trace!(?err, "HeaderToString");
+                // trace!(?err, "HeaderToString");
                 (
                     StatusCode::BAD_REQUEST,
                     JsonRpcForwardedResponse::from_str(
@@ -238,7 +255,7 @@ impl IntoResponse for FrontendErrorResponse {
                 )
             }
             Self::UlidDecodeError(err) => {
-                // // trace!(?err, "UlidDecodeError");
+                // trace!(?err, "UlidDecodeError");
                 (
                     StatusCode::BAD_REQUEST,
                     JsonRpcForwardedResponse::from_str(
diff --git a/web3_proxy/src/frontend/rpc_proxy_http.rs b/web3_proxy/src/frontend/rpc_proxy_http.rs
index c5c7b29d..9015125c 100644
--- a/web3_proxy/src/frontend/rpc_proxy_http.rs
+++ b/web3_proxy/src/frontend/rpc_proxy_http.rs
@@ -9,6 +9,7 @@ use axum::TypedHeader;
 use axum::{response::IntoResponse, Extension, Json};
 use axum_client_ip::ClientIp;
 use axum_macros::debug_handler;
+use itertools::Itertools;
 use std::sync::Arc;
 
 /// POST /rpc -- Public entrypoint for HTTP JSON-RPC requests. Web3 wallets use this.
@@ -24,16 +25,30 @@ pub async fn proxy_web3_rpc(
     // TODO: do we care about keeping the TypedHeader wrapper?
     let origin = origin.map(|x| x.0);
 
-    let (authorization, _semaphore) = ip_is_authorized(&app, ip, origin).await?;
+    // TODO: move ip_is_authorized/key_is_authorized into proxy_web3_rpc
+    let f = tokio::spawn(async move {
+        let (authorization, _semaphore) = ip_is_authorized(&app, ip, origin).await?;
 
-    let authorization = Arc::new(authorization);
+        let authorization = Arc::new(authorization);
 
-    // TODO: spawn earlier? i think we want ip_is_authorized in this future
-    let f = tokio::spawn(async move { app.proxy_web3_rpc(authorization, payload).await });
+        app.proxy_web3_rpc(authorization, payload).await
+    });
 
-    let response = f.await??;
+    let (response, rpcs) = f.await??;
 
-    Ok(Json(&response).into_response())
+    let mut response = Json(&response).into_response();
+
+    let headers = response.headers_mut();
+
+    // TODO: special string if no rpcs were used (cache hit)?
+    let rpcs: String = rpcs.into_iter().map(|x| x.name.clone()).join(",");
+
+    headers.insert(
+        "W3P-RPCs",
+        rpcs.parse().expect("W3P-RPCS should always parse"),
+    );
+
+    Ok(response)
 }
 
 /// Authenticated entrypoint for HTTP JSON-RPC requests. Web3 wallets use this.
diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs
index 9768ff83..4243cd4b 100644
--- a/web3_proxy/src/frontend/rpc_proxy_ws.rs
+++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs
@@ -180,7 +180,6 @@ async fn handle_socket_payload(
     // TODO: do any clients send batches over websockets?
     let (id, response) = match serde_json::from_str::<JsonRpcRequest>(payload) {
         Ok(json_request) => {
-            // TODO: should we use this id for the subscription id? it should be unique and means we dont need an atomic
             let id = json_request.id.clone();
 
             let response: anyhow::Result<JsonRpcForwardedResponseEnum> = match &json_request.method
@@ -251,8 +250,13 @@ async fn handle_socket_payload(
                     Ok(response.into())
                 }
                 _ => {
-                    app.proxy_web3_rpc(authorization.clone(), json_request.into())
+                    let (response, _) = app
+                        .proxy_web3_rpc(authorization.clone(), json_request.into())
                         .await
+                        // TODO: DO NOT UNWRAP HERE! ANY FAILING MESSAGES WILL KILL THE CONNECTION!
+                        .unwrap();
+
+                    Ok(response)
                 }
             };
 
@@ -266,15 +270,16 @@ async fn handle_socket_payload(
     };
 
     let response_str = match response {
-        Ok(x) => serde_json::to_string(&x),
+        Ok(x) => serde_json::to_string(&x).expect("to_string should always work here"),
         Err(err) => {
            // we have an anyhow error. turn it into a response
            let response = JsonRpcForwardedResponse::from_anyhow_error(err, None, Some(id));
-            serde_json::to_string(&response)
+
+            serde_json::to_string(&response).expect("to_string should always work here")
        }
-    }
+    };
+
     // TODO: what error should this be?
-    .unwrap();
 
     Message::Text(response_str)
 }
diff --git a/web3_proxy/src/jsonrpc.rs b/web3_proxy/src/jsonrpc.rs
index fc423efe..6395a208 100644
--- a/web3_proxy/src/jsonrpc.rs
+++ b/web3_proxy/src/jsonrpc.rs
@@ -1,11 +1,9 @@
-use crate::rpcs::connection::Web3Connection;
 use derive_more::From;
 use ethers::prelude::{HttpClientError, ProviderError, WsClientError};
 use serde::de::{self, Deserializer, MapAccess, SeqAccess, Visitor};
 use serde::{Deserialize, Serialize};
 use serde_json::value::RawValue;
 use std::fmt;
-use std::sync::Arc;
 
 // this is used by serde
 #[allow(dead_code)]
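
A few notes on the changes above, with standalone sketches rather than the project's real types. First, the new proxy_web3_rpc return type in web3_proxy/src/app/mod.rs makes the `timeout(...).await??` calls depend on two error conversions: the outer `?` needs `From<tokio::time::error::Elapsed>` (the new Timeout variant) and the inner `?` needs `From<anyhow::Error>`. A minimal sketch of that shape, with `SketchError` standing in for FrontendErrorResponse and the conversions assumed to be derived (for example with derive_more, which the crate already uses elsewhere):

use std::time::Duration;
use tokio::time::{error::Elapsed, timeout};

// Stand-in for FrontendErrorResponse; the real enum lives in frontend/errors.rs.
#[derive(Debug, derive_more::From)]
enum SketchError {
    Anyhow(anyhow::Error),
    Timeout(Elapsed),
}

async fn with_deadline<T>(
    fut: impl std::future::Future<Output = anyhow::Result<T>>,
) -> Result<T, SketchError> {
    // outer `?` maps Elapsed -> SketchError::Timeout,
    // inner `?` maps anyhow::Error -> SketchError::Anyhow
    Ok(timeout(Duration::from_secs(120), fut).await??)
}

If either conversion is missing, the double `?` does not compile, which is why the Timeout variant lands in the same patch as the signature change.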
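
Second, proxy_web3_rpc_requests now merges the backends used by every sub-request through a HashSet before flattening back into a Vec, so a batch that hits the same backend many times reports it once. A standalone sketch of that dedup step, using a hypothetical `Backend` type in place of Web3Connection (the real type must implement Hash + Eq for the patch's `HashSet<Arc<Web3Connection>>` to compile):

use hashbrown::HashSet;
use std::sync::Arc;

// Stand-in for Web3Connection, hashed and compared by name.
#[derive(Debug, Hash, PartialEq, Eq)]
struct Backend {
    name: String,
}

// Collect the backends used by each sub-request, deduplicated,
// then flatten back into a Vec the way the batch handler does.
fn dedup_backends(per_request: Vec<Vec<Arc<Backend>>>) -> Vec<Arc<Backend>> {
    let mut set: HashSet<Arc<Backend>> = HashSet::new();

    for rpcs in per_request {
        set.extend(rpcs.into_iter());
    }

    set.into_iter().collect()
}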
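
Third, the new Timeout arm in frontend/errors.rs follows the same axum pattern as the other arms: build a status code plus a JSON body, and let the tuple's IntoResponse impl produce the final response. A standalone sketch with a stand-in `ErrorBody` instead of JsonRpcForwardedResponse:

use axum::{
    http::StatusCode,
    response::{IntoResponse, Response},
    Json,
};
use serde::Serialize;
use tokio::time::error::Elapsed;

// Stand-in body; the real code builds a JsonRpcForwardedResponse instead.
#[derive(Serialize)]
struct ErrorBody {
    code: u16,
    message: String,
}

// A (StatusCode, Json<T>) tuple already implements IntoResponse in axum,
// which is what lets each match arm return a status + JSON error pair.
fn timeout_response(elapsed: Elapsed) -> Response {
    let body = ErrorBody {
        code: StatusCode::REQUEST_TIMEOUT.as_u16(),
        message: format!("request timed out: {:?}", elapsed),
    };

    (StatusCode::REQUEST_TIMEOUT, Json(body)).into_response()
}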
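
Finally, the W3P-RPCs header in rpc_proxy_http.rs is just the backend names joined with commas and parsed into a HeaderValue. A sketch of the same idea that skips the header instead of panicking when a name is not a valid header value (the patch uses expect, presumably because the names come from its own config and are plain ASCII):

use axum::http::{HeaderValue, Response};

// Join the backend names and attach them as one response header,
// mirroring what the patched proxy_web3_rpc handler does.
fn attach_backends<B>(response: &mut Response<B>, names: &[String]) {
    let joined = names.join(",");

    if let Ok(value) = HeaderValue::from_str(&joined) {
        response.headers_mut().insert("W3P-RPCs", value);
    }
}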