From d7c75f843e5a232bc3d1a697e8d62dfad3607e51 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Mon, 16 Jan 2023 22:54:40 -0800 Subject: [PATCH] add stub try_proxy_connection --- web3_proxy/src/app/mod.rs | 55 ++++---- web3_proxy/src/frontend/mod.rs | 92 ++++++++++++-- web3_proxy/src/frontend/rpc_proxy_http.rs | 111 ++++++++++++++++- web3_proxy/src/frontend/rpc_proxy_ws.rs | 145 ++++++++++++++++++++-- web3_proxy/src/rpcs/blockchain.rs | 4 +- web3_proxy/src/rpcs/connections.rs | 54 +++++--- web3_proxy/src/rpcs/request.rs | 44 +++---- 7 files changed, 426 insertions(+), 79 deletions(-) diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 5ec9d856..828dc6bb 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -6,6 +6,7 @@ use crate::block_number::{block_needed, BlockNeeded}; use crate::config::{AppConfig, TopConfig}; use crate::frontend::authorization::{Authorization, RequestMetadata}; use crate::frontend::errors::FrontendErrorResponse; +use crate::frontend::rpc_proxy_ws::ProxyMode; use crate::jsonrpc::{ JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest, JsonRpcRequestEnum, }; @@ -907,10 +908,10 @@ impl Web3ProxyApp { self: &Arc, authorization: Arc, request: JsonRpcRequestEnum, + proxy_mode: ProxyMode, ) -> Result<(JsonRpcForwardedResponseEnum, Vec>), FrontendErrorResponse> { - // TODO: this should probably be trace level - // // trace!(?request, "proxy_web3_rpc"); + // trace!(?request, "proxy_web3_rpc"); // even though we have timeouts on the requests to our backend providers, // we need a timeout for the incoming request so that retries don't run forever @@ -921,7 +922,7 @@ impl Web3ProxyApp { JsonRpcRequestEnum::Single(request) => { let (response, rpcs) = timeout( max_time, - self.proxy_web3_rpc_request(&authorization, request), + self.proxy_cached_request(&authorization, request, proxy_mode), ) .await??; @@ -930,7 +931,7 @@ impl Web3ProxyApp { JsonRpcRequestEnum::Batch(requests) => { let (responses, 
rpcs) = timeout( max_time, - self.proxy_web3_rpc_requests(&authorization, requests), + self.proxy_web3_rpc_requests(&authorization, requests, proxy_mode), ) .await??; @@ -947,6 +948,7 @@ impl Web3ProxyApp { self: &Arc, authorization: &Arc, requests: Vec, + proxy_mode: ProxyMode, ) -> anyhow::Result<(Vec, Vec>)> { // TODO: we should probably change ethers-rs to support this directly. they pushed this off to v2 though let num_requests = requests.len(); @@ -956,7 +958,7 @@ impl Web3ProxyApp { let responses = join_all( requests .into_iter() - .map(|request| self.proxy_web3_rpc_request(authorization, request)) + .map(|request| self.proxy_cached_request(authorization, request, proxy_mode)) .collect::>(), ) .await; @@ -1000,10 +1002,11 @@ impl Web3ProxyApp { } #[measure([ErrorCount, HitCount, ResponseTime, Throughput])] - async fn proxy_web3_rpc_request( + async fn proxy_cached_request( self: &Arc, authorization: &Arc, mut request: JsonRpcRequest, + proxy_mode: ProxyMode, ) -> anyhow::Result<(JsonRpcForwardedResponse, Vec>)> { // trace!("Received request: {:?}", request); @@ -1172,22 +1175,32 @@ impl Web3ProxyApp { // TODO: eth_sendBundle (flashbots command) // broadcast transactions to all private rpcs at once "eth_sendRawTransaction" => { + // TODO: how should we handle private_mode here? + let default_num = match proxy_mode { + // TODO: how many balanced rpcs should we send to? configurable? percentage of total? + ProxyMode::Best => Some(2), + ProxyMode::Fastest(0) => None, + // TODO: how many balanced rpcs should we send to? configurable? percentage of total? + // TODO: what if we do 2 per tier? 
we want to blast the third party rpcs + // TODO: maybe having the third party rpcs in their own Web3Connections would be good for this + ProxyMode::Fastest(x) => Some(x * 2), + ProxyMode::Versus => None, + }; + let (private_rpcs, num) = if let Some(private_rpcs) = self.private_rpcs.as_ref() { if authorization.checks.private_txs { + // if we are sending the transaction privately, no matter the proxy_mode, we send to ALL private rpcs (private_rpcs, None) } else { - // TODO: how many balanced rpcs should we send to? configurable? percentage of total? - // TODO: what if we do 2 per tier? we want to blast the third party rpcs - // TODO: maybe having the third party rpcs would be good for this - (&self.balanced_rpcs, Some(2)) + (&self.balanced_rpcs, default_num) } } else { - (&self.balanced_rpcs, Some(2)) + (&self.balanced_rpcs, default_num) }; // try_send_all_upstream_servers puts the request id into the response. no need to do that ourselves here. let mut response = private_rpcs - .try_send_all_upstream_servers( + .try_send_all_synced_connections( authorization, &request, Some(request_metadata.clone()), @@ -1298,7 +1311,8 @@ impl Web3ProxyApp { json!(true) } "net_peerCount" => { - // emit stats + // no stats on this. its cheap + // TODO: do something with proxy_mode here? self.balanced_rpcs.num_synced_rpcs().into() } "web3_clientVersion" => { @@ -1404,10 +1418,12 @@ impl Web3ProxyApp { .try_get_with(cache_key, async move { // TODO: retry some failures automatically! // TODO: try private_rpcs if all the balanced_rpcs fail! - // TODO: put the hash here instead? + // TODO: put the hash here instead of the block number? its in the request already. + let mut response = self .balanced_rpcs - .try_send_best_upstream_server( + .try_proxy_connection( + proxy_mode, self.allowed_lag, &authorization, request, @@ -1433,18 +1449,15 @@ impl Web3ProxyApp { })? 
} else { self.balanced_rpcs - .try_send_best_upstream_server( + .try_proxy_connection( + proxy_mode, self.allowed_lag, &authorization, request, Some(&request_metadata), None, ) - .await - .map_err(|err| { - // TODO: emit a stat for an error - anyhow::anyhow!("error while forwarding response: {}", err) - })? + .await? } }; diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs index 4d94367c..1507a835 100644 --- a/web3_proxy/src/frontend/mod.rs +++ b/web3_proxy/src/frontend/mod.rs @@ -42,26 +42,98 @@ pub async fn serve(port: u16, proxy_app: Arc) -> anyhow::Result<() // build our axum Router let app = Router::new() - // routes should be ordered most to least common + // TODO: i think these routes could be done a lot better + // + // HTTP RPC (POST) + // + // public .route("/", post(rpc_proxy_http::proxy_web3_rpc)) + // authenticated with and without trailing slash + .route( + "/rpc/:rpc_key/", + post(rpc_proxy_http::proxy_web3_rpc_with_key), + ) + .route( + "/rpc/:rpc_key", + post(rpc_proxy_http::proxy_web3_rpc_with_key), + ) + // public fastest with and without trailing slash + .route("/fastest/", post(rpc_proxy_http::fastest_proxy_web3_rpc)) + .route("/fastest", post(rpc_proxy_http::fastest_proxy_web3_rpc)) + // authenticated fastest with and without trailing slash + .route( + "/fastest/:rpc_key/", + post(rpc_proxy_http::fastest_proxy_web3_rpc_with_key), + ) + .route( + "/fastest/:rpc_key", + post(rpc_proxy_http::fastest_proxy_web3_rpc_with_key), + ) + // public versus + .route("/versus/", post(rpc_proxy_http::versus_proxy_web3_rpc)) + .route("/versus", post(rpc_proxy_http::versus_proxy_web3_rpc)) + // authenticated versus with and without trailing slash + .route( + "/versus/:rpc_key/", + post(rpc_proxy_http::versus_proxy_web3_rpc_with_key), + ) + .route( + "/versus/:rpc_key", + post(rpc_proxy_http::versus_proxy_web3_rpc_with_key), + ) + // + // Websocket RPC (GET) + // If not an RPC, this will redirect to configurable urls + // + // public 
.route("/", get(rpc_proxy_ws::websocket_handler)) - .route( - "/rpc/:rpc_key", - post(rpc_proxy_http::proxy_web3_rpc_with_key), - ) + // authenticated with and without trailing slash .route( "/rpc/:rpc_key/", - post(rpc_proxy_http::proxy_web3_rpc_with_key), + get(rpc_proxy_ws::websocket_handler_with_key), ) .route( "/rpc/:rpc_key", get(rpc_proxy_ws::websocket_handler_with_key), ) + // public fastest with and without trailing slash + .route("/fastest/", get(rpc_proxy_ws::fastest_websocket_handler)) + .route("/fastest", get(rpc_proxy_ws::fastest_websocket_handler)) + // authenticated fastest with and without trailing slash .route( - "/rpc/:rpc_key/", - get(rpc_proxy_ws::websocket_handler_with_key), + "/fastest/:rpc_key/", + get(rpc_proxy_ws::fastest_websocket_handler_with_key), ) + .route( + "/fastest/:rpc_key", + get(rpc_proxy_ws::fastest_websocket_handler_with_key), + ) + // public versus + .route( + "/versus/", + get(rpc_proxy_ws::versus_websocket_handler), + ) + .route( + "/versus", + get(rpc_proxy_ws::versus_websocket_handler), + ) + // authenticated versus with and without trailing slash + .route( + "/versus/:rpc_key/", + get(rpc_proxy_ws::versus_websocket_handler_with_key), + ) + .route( + "/versus/:rpc_key", + get(rpc_proxy_ws::versus_websocket_handler_with_key), + ) + // + // System things + // .route("/health", get(status::health)) + .route("/status", get(status::status)) + // + // User stuff + // .route("/user/login/:user_address", get(users::user_login_get)) .route( "/user/login/:user_address/:message_eip", @@ -86,9 +158,11 @@ pub async fn serve(port: u16, proxy_app: Arc) -> anyhow::Result<() ) .route("/user/stats/detailed", get(users::user_stats_detailed_get)) .route("/user/logout", post(users::user_logout_post)) - .route("/status", get(status::status)) + // + // Axum layers // layers are ordered bottom up // the last layer is first for requests and last for responses + // // Mark the `Authorization` request header as sensitive so it
doesn't show in logs .layer(SetSensitiveRequestHeadersLayer::new(once(AUTHORIZATION))) // handle cors diff --git a/web3_proxy/src/frontend/rpc_proxy_http.rs b/web3_proxy/src/frontend/rpc_proxy_http.rs index 72664812..067546db 100644 --- a/web3_proxy/src/frontend/rpc_proxy_http.rs +++ b/web3_proxy/src/frontend/rpc_proxy_http.rs @@ -2,6 +2,7 @@ use super::authorization::{ip_is_authorized, key_is_authorized}; use super::errors::FrontendResult; +use super::rpc_proxy_ws::ProxyMode; use crate::{app::Web3ProxyApp, jsonrpc::JsonRpcRequestEnum}; use axum::extract::Path; use axum::headers::{Origin, Referer, UserAgent}; @@ -18,9 +19,41 @@ use std::sync::Arc; #[debug_handler] pub async fn proxy_web3_rpc( Extension(app): Extension>, - ClientIp(ip): ClientIp, + ip: ClientIp, origin: Option>, Json(payload): Json, +) -> FrontendResult { + _proxy_web3_rpc(app, ip, origin, payload, ProxyMode::Best).await +} + +#[debug_handler] +pub async fn fastest_proxy_web3_rpc( + Extension(app): Extension>, + ip: ClientIp, + origin: Option>, + Json(payload): Json, +) -> FrontendResult { + // TODO: read the fastest number from params + // TODO: check that the app allows this without authentication + _proxy_web3_rpc(app, ip, origin, payload, ProxyMode::Fastest(0)).await +} + +#[debug_handler] +pub async fn versus_proxy_web3_rpc( + Extension(app): Extension>, + ip: ClientIp, + origin: Option>, + Json(payload): Json, +) -> FrontendResult { + _proxy_web3_rpc(app, ip, origin, payload, ProxyMode::Versus).await +} + +async fn _proxy_web3_rpc( + app: Arc, + ClientIp(ip): ClientIp, + origin: Option>, + payload: JsonRpcRequestEnum, + proxy_mode: ProxyMode, ) -> FrontendResult { // TODO: benchmark spawning this // TODO: do we care about keeping the TypedHeader wrapper? 
@@ -31,7 +64,7 @@ pub async fn proxy_web3_rpc( let authorization = Arc::new(authorization); let (response, rpcs, _semaphore) = app - .proxy_web3_rpc(authorization, payload) + .proxy_web3_rpc(authorization, payload, proxy_mode) .await .map(|(x, y)| (x, y, semaphore))?; @@ -58,12 +91,82 @@ pub async fn proxy_web3_rpc( #[debug_handler] pub async fn proxy_web3_rpc_with_key( Extension(app): Extension>, - ClientIp(ip): ClientIp, + ip: ClientIp, origin: Option>, referer: Option>, user_agent: Option>, Path(rpc_key): Path, Json(payload): Json, +) -> FrontendResult { + _proxy_web3_rpc_with_key( + app, + ip, + origin, + referer, + user_agent, + rpc_key, + payload, + ProxyMode::Best, + ) + .await +} + +#[debug_handler] +pub async fn fastest_proxy_web3_rpc_with_key( + Extension(app): Extension>, + ip: ClientIp, + origin: Option>, + referer: Option>, + user_agent: Option>, + Path(rpc_key): Path, + Json(payload): Json, +) -> FrontendResult { + _proxy_web3_rpc_with_key( + app, + ip, + origin, + referer, + user_agent, + rpc_key, + payload, + ProxyMode::Fastest(0), + ) + .await +} + +#[debug_handler] +pub async fn versus_proxy_web3_rpc_with_key( + Extension(app): Extension>, + ip: ClientIp, + origin: Option>, + referer: Option>, + user_agent: Option>, + Path(rpc_key): Path, + Json(payload): Json, +) -> FrontendResult { + _proxy_web3_rpc_with_key( + app, + ip, + origin, + referer, + user_agent, + rpc_key, + payload, + ProxyMode::Versus, + ) + .await +} + +#[allow(clippy::too_many_arguments)] +async fn _proxy_web3_rpc_with_key( + app: Arc, + ClientIp(ip): ClientIp, + origin: Option>, + referer: Option>, + user_agent: Option>, + rpc_key: String, + payload: JsonRpcRequestEnum, + proxy_mode: ProxyMode, ) -> FrontendResult { // TODO: DRY w/ proxy_web3_rpc // the request can take a while, so we spawn so that we can start serving another request @@ -82,7 +185,7 @@ pub async fn proxy_web3_rpc_with_key( let authorization = Arc::new(authorization); let (response, rpcs, _semaphore) = app - 
.proxy_web3_rpc(authorization, payload) + .proxy_web3_rpc(authorization, payload, proxy_mode) .await .map(|(x, y)| (x, y, semaphore))?; diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs index b1f70e9f..23516738 100644 --- a/web3_proxy/src/frontend/rpc_proxy_ws.rs +++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs @@ -33,10 +33,58 @@ use serde_json::value::to_raw_value; use std::sync::Arc; use std::{str::from_utf8_mut, sync::atomic::AtomicUsize}; +#[derive(Copy, Clone)] +pub enum ProxyMode { + /// send to the "best" synced server + Best, + /// send to all synced servers and return the fastest non-error response (reverts do not count as errors here) + Fastest(usize), + /// send to all servers for benchmarking. return the fastest non-error response + Versus, +} + /// Public entrypoint for WebSocket JSON-RPC requests. +/// Queries a single server at a time #[debug_handler] pub async fn websocket_handler( Extension(app): Extension>, + ip: ClientIp, + origin: Option>, + ws_upgrade: Option, +) -> FrontendResult { + _websocket_handler(ProxyMode::Fastest(1), app, ip, origin, ws_upgrade).await +} + +/// Public entrypoint for WebSocket JSON-RPC requests that uses all synced servers. +/// Queries all synced backends with every request! This might get expensive! +#[debug_handler] +pub async fn fastest_websocket_handler( + Extension(app): Extension>, + ip: ClientIp, + origin: Option>, + ws_upgrade: Option, +) -> FrontendResult { + // TODO: get the fastest number from the url params (default to 0/all) + // TODO: config to disable this + _websocket_handler(ProxyMode::Fastest(0), app, ip, origin, ws_upgrade).await +} + +/// Public entrypoint for WebSocket JSON-RPC requests that uses all synced servers. +/// Queries **all** backends with every request! This might get expensive! 
+#[debug_handler] +pub async fn versus_websocket_handler( + Extension(app): Extension>, + ip: ClientIp, + origin: Option>, + ws_upgrade: Option, +) -> FrontendResult { + // TODO: config to disable this + _websocket_handler(ProxyMode::Versus, app, ip, origin, ws_upgrade).await +} + +async fn _websocket_handler( + proxy_mode: ProxyMode, + app: Arc, ClientIp(ip): ClientIp, origin: Option>, ws_upgrade: Option, @@ -49,7 +97,7 @@ pub async fn websocket_handler( match ws_upgrade { Some(ws) => Ok(ws - .on_upgrade(|socket| proxy_web3_socket(app, authorization, socket)) + .on_upgrade(move |socket| proxy_web3_socket(app, authorization, socket, proxy_mode)) .into_response()), None => { if let Some(redirect) = &app.config.redirect_public_url { @@ -72,12 +120,84 @@ pub async fn websocket_handler( #[debug_handler] pub async fn websocket_handler_with_key( Extension(app): Extension>, - ClientIp(ip): ClientIp, + ip: ClientIp, Path(rpc_key): Path, origin: Option>, referer: Option>, user_agent: Option>, ws_upgrade: Option, +) -> FrontendResult { + // TODO: config instead of defaulting to fastest(1)? 
+ _websocket_handler_with_key( + ProxyMode::Fastest(1), + app, + ip, + rpc_key, + origin, + referer, + user_agent, + ws_upgrade, + ) + .await +} + +#[debug_handler] +pub async fn fastest_websocket_handler_with_key( + Extension(app): Extension>, + ip: ClientIp, + Path(rpc_key): Path, + origin: Option>, + referer: Option>, + user_agent: Option>, + ws_upgrade: Option, +) -> FrontendResult { + // TODO: get the fastest number from the url params (default to 0/all) + _websocket_handler_with_key( + ProxyMode::Fastest(0), + app, + ip, + rpc_key, + origin, + referer, + user_agent, + ws_upgrade, + ) + .await +} + +#[debug_handler] +pub async fn versus_websocket_handler_with_key( + Extension(app): Extension>, + ip: ClientIp, + Path(rpc_key): Path, + origin: Option>, + referer: Option>, + user_agent: Option>, + ws_upgrade: Option, +) -> FrontendResult { + _websocket_handler_with_key( + ProxyMode::Versus, + app, + ip, + rpc_key, + origin, + referer, + user_agent, + ws_upgrade, + ) + .await +} + +#[allow(clippy::too_many_arguments)] +async fn _websocket_handler_with_key( + proxy_mode: ProxyMode, + app: Arc, + ClientIp(ip): ClientIp, + rpc_key: String, + origin: Option>, + referer: Option>, + user_agent: Option>, + ws_upgrade: Option, ) -> FrontendResult { let rpc_key = rpc_key.parse()?; @@ -96,9 +216,8 @@ pub async fn websocket_handler_with_key( let authorization = Arc::new(authorization); match ws_upgrade { - Some(ws_upgrade) => { - Ok(ws_upgrade.on_upgrade(move |socket| proxy_web3_socket(app, authorization, socket))) - } + Some(ws_upgrade) => Ok(ws_upgrade + .on_upgrade(move |socket| proxy_web3_socket(app, authorization, socket, proxy_mode))), None => { // if no websocket upgrade, this is probably a user loading the url with their browser @@ -154,6 +273,7 @@ async fn proxy_web3_socket( app: Arc, authorization: Arc, socket: WebSocket, + proxy_mode: ProxyMode, ) { // split the websocket so we can read and write concurrently let (ws_tx, ws_rx) = socket.split(); @@ -162,7 +282,13 
@@ async fn proxy_web3_socket( let (response_sender, response_receiver) = flume::unbounded::(); tokio::spawn(write_web3_socket(response_receiver, ws_tx)); - tokio::spawn(read_web3_socket(app, authorization, ws_rx, response_sender)); + tokio::spawn(read_web3_socket( + app, + authorization, + ws_rx, + response_sender, + proxy_mode, + )); } /// websockets support a few more methods than http clients @@ -173,6 +299,7 @@ async fn handle_socket_payload( response_sender: &flume::Sender, subscription_count: &AtomicUsize, subscriptions: &mut HashMap, + proxy_mode: ProxyMode, ) -> Message { // TODO: do any clients send batches over websockets? let (id, response) = match serde_json::from_str::(payload) { @@ -183,6 +310,7 @@ async fn handle_socket_payload( [..] { "eth_subscribe" => { + // TODO: how can we subscribe with proxy_mode? match app .eth_subscribe( authorization.clone(), @@ -247,7 +375,7 @@ async fn handle_socket_payload( Ok(response.into()) } _ => app - .proxy_web3_rpc(authorization.clone(), json_request.into()) + .proxy_web3_rpc(authorization.clone(), json_request.into(), proxy_mode) .await .map_or_else( |err| match err { @@ -291,6 +419,7 @@ async fn read_web3_socket( authorization: Arc, mut ws_rx: SplitStream, response_sender: flume::Sender, + proxy_mode: ProxyMode, ) { let mut subscriptions = HashMap::new(); let subscription_count = AtomicUsize::new(1); @@ -307,6 +436,7 @@ async fn read_web3_socket( &response_sender, &subscription_count, &mut subscriptions, + proxy_mode, ) .await } @@ -333,6 +463,7 @@ async fn read_web3_socket( &response_sender, &subscription_count, &mut subscriptions, + proxy_mode, ) .await } diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index 456d21fd..e03cc6fd 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -165,7 +165,7 @@ impl Web3Connections { // TODO: request_metadata? maybe we should put it in the authorization? 
// TODO: don't hard code allowed lag let response = self - .try_send_best_upstream_server(60, authorization, request, None, None) + .try_send_best_consensus_head_connection(60, authorization, request, None, None) .await?; let block = response.result.context("failed fetching block")?; @@ -241,7 +241,7 @@ impl Web3Connections { // TODO: if error, retry? // TODO: request_metadata or authorization? let response = self - .try_send_best_upstream_server(60, authorization, request, None, Some(num)) + .try_send_best_consensus_head_connection(60, authorization, request, None, Some(num)) .await?; let raw_block = response.result.context("no block result")?; diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs index fbd75b3f..82dcbbe7 100644 --- a/web3_proxy/src/rpcs/connections.rs +++ b/web3_proxy/src/rpcs/connections.rs @@ -2,12 +2,13 @@ use super::blockchain::{ArcBlock, BlockHashesCache}; use super::connection::Web3Connection; use super::request::{ - OpenRequestHandle, OpenRequestHandleMetrics, OpenRequestResult, RequestErrorHandler, + OpenRequestHandle, OpenRequestHandleMetrics, OpenRequestResult, RequestRevertHandler, }; use super::synced_connections::SyncedConnections; use crate::app::{flatten_handle, AnyhowJoinHandle}; use crate::config::{BlockAndRpc, TxHashAndRpc, Web3ConnectionConfig}; use crate::frontend::authorization::{Authorization, RequestMetadata}; +use crate::frontend::rpc_proxy_ws::ProxyMode; use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest}; use crate::rpcs::transactions::TxStatus; use arc_swap::ArcSwap; @@ -406,8 +407,8 @@ impl Web3Connections { unimplemented!("this shouldn't be possible") } - /// get the best available rpc server - pub async fn best_synced_backend_connection( + /// get the best available rpc server with the consensus head block. 
it might have blocks after the consensus head + pub async fn best_consensus_head_connection( &self, allowed_lag: u64, authorization: &Arc, @@ -662,7 +663,7 @@ impl Web3Connections { /// be sure there is a timeout on this or it might loop forever /// TODO: do not take allowed_lag here. have it be on the connections struct instead - pub async fn try_send_best_upstream_server( + pub async fn try_send_best_consensus_head_connection( &self, allowed_lag: u64, authorization: &Arc, @@ -679,7 +680,7 @@ impl Web3Connections { break; } match self - .best_synced_backend_connection( + .best_consensus_head_connection( allowed_lag, authorization, request_metadata, @@ -705,7 +706,7 @@ impl Web3Connections { .request( &request.method, &json!(request.params), - RequestErrorHandler::SaveReverts, + RequestRevertHandler::Save, ) .await; @@ -818,7 +819,7 @@ impl Web3Connections { } /// be sure there is a timeout on this or it might loop forever - pub async fn try_send_all_upstream_servers( + pub async fn try_send_all_synced_connections( &self, authorization: &Arc, request: &JsonRpcRequest, @@ -887,6 +888,31 @@ impl Web3Connections { } } } + + pub async fn try_proxy_connection( + &self, + proxy_mode: ProxyMode, + allowed_lag: u64, + authorization: &Arc, + request: JsonRpcRequest, + request_metadata: Option<&Arc>, + min_block_needed: Option<&U64>, + ) -> anyhow::Result { + match proxy_mode { + ProxyMode::Best => { + self.try_send_best_consensus_head_connection( + allowed_lag, + authorization, + request, + request_metadata, + min_block_needed, + ) + .await + } + ProxyMode::Fastest(x) => todo!("Fastest"), + ProxyMode::Versus => todo!("Versus"), + } + } } impl fmt::Debug for Web3Connections { @@ -1088,7 +1114,7 @@ mod tests { // best_synced_backend_connection requires servers to be synced with the head block // TODO: don't hard code allowed_lag let x = conns - .best_synced_backend_connection(60, &authorization, None, &[], None) + .best_consensus_head_connection(60, &authorization, None, &[], 
None) .await .unwrap(); @@ -1143,21 +1169,21 @@ mod tests { assert!(matches!( conns - .best_synced_backend_connection(60, &authorization, None, &[], None) + .best_consensus_head_connection(60, &authorization, None, &[], None) .await, Ok(OpenRequestResult::Handle(_)) )); assert!(matches!( conns - .best_synced_backend_connection(60, &authorization, None, &[], Some(&0.into())) + .best_consensus_head_connection(60, &authorization, None, &[], Some(&0.into())) .await, Ok(OpenRequestResult::Handle(_)) )); assert!(matches!( conns - .best_synced_backend_connection(60, &authorization, None, &[], Some(&1.into())) + .best_consensus_head_connection(60, &authorization, None, &[], Some(&1.into())) .await, Ok(OpenRequestResult::Handle(_)) )); @@ -1165,7 +1191,7 @@ mod tests { // future block should not get a handle assert!(matches!( conns - .best_synced_backend_connection(60, &authorization, None, &[], Some(&2.into())) + .best_consensus_head_connection(60, &authorization, None, &[], Some(&2.into())) .await, Ok(OpenRequestResult::NotReady) )); @@ -1298,7 +1324,7 @@ mod tests { // best_synced_backend_connection requires servers to be synced with the head block let best_head_server = conns - .best_synced_backend_connection( + .best_consensus_head_connection( 60, &authorization, None, @@ -1313,7 +1339,7 @@ mod tests { )); let best_archive_server = conns - .best_synced_backend_connection(60, &authorization, None, &[], Some(&1.into())) + .best_consensus_head_connection(60, &authorization, None, &[], Some(&1.into())) .await; match best_archive_server { diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index 7358982c..7db16fd5 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -42,7 +42,7 @@ pub struct OpenRequestHandle { } /// Depending on the context, RPC errors can require different handling. -pub enum RequestErrorHandler { +pub enum RequestRevertHandler { /// Log at the trace level. Use when errors are expected. 
TraceLevel, /// Log at the debug level. Use when errors are expected. @@ -52,7 +52,7 @@ pub enum RequestErrorHandler { /// Log at the warn level. Use when errors do not cause problems. WarnLevel, /// Potentially save the revert. Users can tune how often this happens - SaveReverts, + Save, } // TODO: second param could be skipped since we don't need it here @@ -65,13 +65,13 @@ struct EthCallFirstParams { data: Option, } -impl From for RequestErrorHandler { +impl From for RequestRevertHandler { fn from(level: Level) -> Self { match level { - Level::Trace => RequestErrorHandler::TraceLevel, - Level::Debug => RequestErrorHandler::DebugLevel, - Level::Error => RequestErrorHandler::ErrorLevel, - Level::Warn => RequestErrorHandler::WarnLevel, + Level::Trace => RequestRevertHandler::TraceLevel, + Level::Debug => RequestRevertHandler::DebugLevel, + Level::Error => RequestRevertHandler::ErrorLevel, + Level::Warn => RequestRevertHandler::WarnLevel, _ => unimplemented!("unexpected tracing Level"), } } @@ -213,7 +213,7 @@ impl OpenRequestHandle { &self, method: &str, params: &P, - error_handler: RequestErrorHandler, + revert_handler: RequestRevertHandler, ) -> Result where // TODO: not sure about this type. would be better to not need clones, but measure and spawns combine to need it @@ -252,36 +252,36 @@ impl OpenRequestHandle { if let Err(err) = &response { // only save reverts for some types of calls // TODO: do something special for eth_sendRawTransaction too - let error_handler = if let RequestErrorHandler::SaveReverts = error_handler { + let revert_handler = if let RequestRevertHandler::Save = revert_handler { // TODO: should all these be Trace or Debug or a mix? 
if !["eth_call", "eth_estimateGas"].contains(&method) { // trace!(%method, "skipping save on revert"); - RequestErrorHandler::TraceLevel + RequestRevertHandler::TraceLevel } else if self.authorization.db_conn.is_some() { let log_revert_chance = self.authorization.checks.log_revert_chance; if log_revert_chance == 0.0 { // trace!(%method, "no chance. skipping save on revert"); - RequestErrorHandler::TraceLevel + RequestRevertHandler::TraceLevel } else if log_revert_chance == 1.0 { // trace!(%method, "gaurenteed chance. SAVING on revert"); - error_handler + revert_handler } else if thread_fast_rng::thread_fast_rng().gen_range(0.0f64..=1.0) < log_revert_chance { // trace!(%method, "missed chance. skipping save on revert"); - RequestErrorHandler::TraceLevel + RequestRevertHandler::TraceLevel } else { // trace!("Saving on revert"); // TODO: is always logging at debug level fine? - error_handler + revert_handler } } else { // trace!(%method, "no database. skipping save on revert"); - RequestErrorHandler::TraceLevel + RequestRevertHandler::TraceLevel } } else { - error_handler + revert_handler }; // check for "execution reverted" here @@ -323,8 +323,8 @@ impl OpenRequestHandle { } // TODO: think more about the method and param logs. those can be sensitive information - match error_handler { - RequestErrorHandler::DebugLevel => { + match revert_handler { + RequestRevertHandler::DebugLevel => { // TODO: think about this revert check more. sometimes we might want reverts logged so this needs a flag if !is_revert { debug!( @@ -333,7 +333,7 @@ impl OpenRequestHandle { ); } } - RequestErrorHandler::TraceLevel => { + RequestRevertHandler::TraceLevel => { trace!( "bad response from {}! method={} params={:?} err={:?}", self.conn, @@ -342,21 +342,21 @@ impl OpenRequestHandle { err ); } - RequestErrorHandler::ErrorLevel => { + RequestRevertHandler::ErrorLevel => { // TODO: include params if not running in release mode error!( "bad response from {}! 
method={} err={:?}", self.conn, method, err ); } - RequestErrorHandler::WarnLevel => { + RequestRevertHandler::WarnLevel => { // TODO: include params if not running in release mode warn!( "bad response from {}! method={} err={:?}", self.conn, method, err ); } - RequestErrorHandler::SaveReverts => { + RequestRevertHandler::Save => { trace!( "bad response from {}! method={} params={:?} err={:?}", self.conn,