From 3098791ad91019dc2706a4f4bb9e3966afc0725f Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Fri, 3 Mar 2023 01:39:50 +0000
Subject: [PATCH] add optional kafka feature

---
 Cargo.lock                                |  93 +++++++++++++++
 Dockerfile                                |   3 +
 TODO.md                                   |   1 +
 web3_proxy/Cargo.toml                     |   2 +
 web3_proxy/src/app/mod.rs                 | 132 +++++++++++++++++++---
 web3_proxy/src/config.rs                  |   6 +
 web3_proxy/src/frontend/authorization.rs  |  34 +++++-
 web3_proxy/src/frontend/errors.rs         |  20 +++-
 web3_proxy/src/frontend/mod.rs            |  24 +++-
 web3_proxy/src/frontend/rpc_proxy_http.rs |  30 ++++-
 web3_proxy/src/frontend/rpc_proxy_ws.rs   |  58 +++++++---
 web3_proxy/src/rpcs/many.rs               |   5 +-
 12 files changed, 356 insertions(+), 52 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 8a91a56e..2e67ec28 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2708,6 +2708,18 @@ version = "0.2.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "348108ab3fba42ec82ff6e9564fc4ca0247bdccdc68dd8af9764bbc79c3c8ffb"
 
+[[package]]
+name = "libz-sys"
+version = "1.1.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9702761c3935f8cc2f101793272e202c72b99da8f4224a19ddcf1279a6450bbf"
+dependencies = [
+ "cc",
+ "libc",
+ "pkg-config",
+ "vcpkg",
+]
+
 [[package]]
 name = "link-cplusplus"
 version = "1.0.8"
@@ -2997,6 +3009,27 @@ dependencies = [
  "libc",
 ]
 
+[[package]]
+name = "num_enum"
+version = "0.5.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1f646caf906c20226733ed5b1374287eb97e3c2a5c227ce668c1f2ce20ae57c9"
+dependencies = [
+ "num_enum_derive",
+]
+
+[[package]]
+name = "num_enum_derive"
+version = "0.5.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dcbff9bc912032c62bf65ef1d5aea88983b420f4f839db1e9b0c281a25c9c799"
+dependencies = [
+ "proc-macro-crate 1.3.0",
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
 [[package]]
 name = "number_prefix"
 version = "0.4.0"
@@ -3677,6 +3710,36 @@ dependencies = [
  "num_cpus",
 ]
 
+[[package]]
+name = "rdkafka"
+version = "0.29.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bd7c5d6d17442bcb9f943aae96d67d98c6d36af60442dd5da62aaa7fcbb25c48"
+dependencies = [
+ "futures-channel",
+ "futures-util",
+ "libc",
+ "log",
+ "rdkafka-sys",
+ "serde",
+ "serde_derive",
+ "serde_json",
+ "slab",
+ "tokio",
+]
+
+[[package]]
+name = "rdkafka-sys"
+version = "4.3.0+1.9.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d222a401698c7f2010e3967353eae566d9934dcda49c29910da922414ab4e3f4"
+dependencies = [
+ "libc",
+ "libz-sys",
+ "num_enum",
+ "pkg-config",
+]
+
 [[package]]
 name = "redis"
 version = "0.22.3"
@@ -3895,6 +3958,28 @@ dependencies = [
  "syn",
 ]
 
+[[package]]
+name = "rmp"
+version = "0.8.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "44519172358fd6d58656c86ab8e7fbc9e1490c3e8f14d35ed78ca0dd07403c9f"
+dependencies = [
+ "byteorder",
+ "num-traits",
+ "paste",
+]
+
+[[package]]
+name = "rmp-serde"
+version = "1.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c5b13be192e0220b8afb7222aa5813cb62cc269ebb5cac346ca6487681d2913e"
+dependencies = [
+ "byteorder",
+ "rmp",
+ "serde",
+]
+
 [[package]]
 name = "rsa"
 version = "0.6.1"
@@ -5579,6 +5664,12 @@ dependencies = [
  "serde",
 ]
 
+[[package]]
+name = "vcpkg"
+version = "0.2.15"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
+
 [[package]]
 name = "version_check"
version = "0.9.4" @@ -5755,9 +5846,11 @@ dependencies = [ "parking_lot 0.12.1", "prettytable", "proctitle", + "rdkafka", "redis-rate-limiter", "regex", "reqwest", + "rmp-serde", "rustc-hash", "sentry", "serde", diff --git a/Dockerfile b/Dockerfile index cc9baf00..bb2800c1 100644 --- a/Dockerfile +++ b/Dockerfile @@ -17,6 +17,8 @@ RUN --mount=type=cache,target=/usr/local/cargo/registry \ ENV PATH /root/.foundry/bin:$PATH RUN curl -L https://foundry.paradigm.xyz | bash && foundryup +RUN apt-get update && apt-get install librdkafka && rm -rf /var/lib/apt/lists/* + # copy the application COPY . . @@ -32,6 +34,7 @@ RUN --mount=type=cache,target=/usr/local/cargo/registry \ cargo install \ --locked \ --no-default-features \ + --features rdkafka \ --profile faster_release \ --root /opt/bin \ --path ./web3_proxy diff --git a/TODO.md b/TODO.md index e3b3bab9..67730a38 100644 --- a/TODO.md +++ b/TODO.md @@ -393,6 +393,7 @@ These are not yet ordered. There might be duplicates. We might not actually need - [ ] rename "concurrent" requests to "parallel" requests - [ ] minimum allowed query_start on query_user_stats - [ ] setting request limits to None is broken. it does maxu64 and then internal deferred rate limiter counts try to *99/100 +- [ ] if kafka fails to connect at the start, automatically reconnect - [ ] during shutdown, mark the proxy unhealthy and send unsubscribe responses for any open websocket subscriptions - [ ] some chains still use total_difficulty. have total_difficulty be used only if the chain needs it - if total difficulty is not on the block and we aren't on ETH, fetch the full block instead of just the header diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index df420e3c..46fef651 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -56,8 +56,10 @@ pagerduty-rs = { version = "0.1.6", default-features = false, features = ["async parking_lot = { version = "0.12.1", features = ["arc_lock"] } prettytable = "*" proctitle = "0.1.1" +rdkafka = { version = "0.29.0", optional = true } regex = "1.7.1" reqwest = { version = "0.11.14", default-features = false, features = ["json", "tokio-rustls"] } +rmp-serde = "1.1.1" rustc-hash = "1.1.0" sentry = { version = "0.30.0", default-features = false, features = ["backtrace", "contexts", "panic", "anyhow", "reqwest", "rustls", "log", "sentry-log"] } serde = { version = "1.0.152", features = [] } diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 31700b68..0e08ad32 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -37,6 +37,10 @@ use migration::sea_orm::{ use migration::sea_query::table::ColumnDef; use migration::{Alias, DbErr, Migrator, MigratorTrait, Table}; use moka::future::Cache; +#[cfg(rdkafka)] +use rdkafka::message::{Header, OwnedHeaders}; +#[cfg(rdkafka)] +use rdkafka::producer::FutureRecord; use redis_rate_limiter::redis::AsyncCommands; use redis_rate_limiter::{redis, DeadpoolRuntime, RedisConfig, RedisPool, RedisRateLimiter}; use serde::Serialize; @@ -179,6 +183,7 @@ pub struct AuthorizationChecks { pub log_revert_chance: f64, /// if true, transactions are broadcast to private mempools. They will still be public on the blockchain! pub private_txs: bool, + pub proxy_mode: ProxyMode, } /// Simple wrapper so that we can keep track of read only connections. 
@@ -225,6 +230,11 @@ pub struct Web3ProxyApp {
     pub bearer_token_semaphores:
         Cache<UserBearerToken, Arc<Semaphore>, hashbrown::hash_map::DefaultHashBuilder>,
     pub stat_sender: Option<flume::Sender<Web3ProxyStat>>,
+
+    #[cfg(feature = "rdkafka")]
+    pub kafka_producer: Option<rdkafka::producer::FutureProducer>,
+    #[cfg(not(feature = "rdkafka"))]
+    pub kafka_producer: Option<()>,
 }
 
 /// flatten a JoinError into an anyhow error
@@ -457,6 +467,29 @@ impl Web3ProxyApp {
             warn!("no database. some features will be disabled");
         };
 
+        // connect to kafka for logging requests from the /debug/ urls
+
+        #[cfg(feature = "rdkafka")]
+        let mut kafka_producer: Option<rdkafka::producer::FutureProducer> = None;
+        #[cfg(not(feature = "rdkafka"))]
+        let kafka_producer: Option<()> = None;
+
+        #[cfg(feature = "rdkafka")]
+        if let Some(kafka_brokers) = top_config.app.kafka_urls.clone() {
+            match rdkafka::ClientConfig::new()
+                .set("bootstrap.servers", kafka_brokers)
+                .set("message.timeout.ms", "5000")
+                .create()
+            {
+                Ok(k) => kafka_producer = Some(k),
+                Err(err) => error!("Failed connecting to kafka. This will not retry. {:?}", err),
+            }
+        }
+
+        #[cfg(not(feature = "rdkafka"))]
+        if top_config.app.kafka_urls.is_some() {
+            warn!("rdkafka rust feature is not enabled!");
+        }
+
         // TODO: do this during apply_config so that we can change redis url while running
         // create a connection pool for redis
         // a failure to connect does NOT block the application from starting
@@ -680,6 +713,7 @@ impl Web3ProxyApp {
             config: top_config.app.clone(),
             balanced_rpcs,
             http_client,
+            kafka_producer,
             private_rpcs,
             response_cache,
             watch_consensus_head_receiver,
@@ -943,7 +977,6 @@ impl Web3ProxyApp {
         self: &Arc<Self>,
         authorization: Arc<Authorization>,
         request: JsonRpcRequestEnum,
-        proxy_mode: ProxyMode,
     ) -> Result<(JsonRpcForwardedResponseEnum, Vec<Arc<Web3Rpc>>), FrontendErrorResponse> {
         // trace!(?request, "proxy_web3_rpc");
 
@@ -956,7 +989,7 @@
             JsonRpcRequestEnum::Single(request) => {
                 let (response, rpcs) = timeout(
                     max_time,
-                    self.proxy_cached_request(&authorization, request, proxy_mode, None),
+                    self.proxy_cached_request(&authorization, request, None),
                 )
                 .await??;
 
@@ -965,7 +998,7 @@
             JsonRpcRequestEnum::Batch(requests) => {
                 let (responses, rpcs) = timeout(
                     max_time,
-                    self.proxy_web3_rpc_requests(&authorization, requests, proxy_mode),
+                    self.proxy_web3_rpc_requests(&authorization, requests),
                 )
                 .await??;
 
@@ -982,7 +1015,6 @@ impl Web3ProxyApp {
         self: &Arc<Self>,
         authorization: &Arc<Authorization>,
         requests: Vec<JsonRpcRequest>,
-        proxy_mode: ProxyMode,
     ) -> Result<(Vec<JsonRpcForwardedResponse>, Vec<Arc<Web3Rpc>>), FrontendErrorResponse> {
         // TODO: we should probably change ethers-rs to support this directly. they pushed this off to v2 though
         let num_requests = requests.len();
@@ -1002,12 +1034,7 @@
             requests
                 .into_iter()
                 .map(|request| {
-                    self.proxy_cached_request(
-                        authorization,
-                        request,
-                        proxy_mode,
-                        Some(head_block_num),
-                    )
+                    self.proxy_cached_request(authorization, request, Some(head_block_num))
                 })
                 .collect::<Vec<_>>(),
         )
@@ -1062,13 +1089,62 @@
         self: &Arc<Self>,
         authorization: &Arc<Authorization>,
         mut request: JsonRpcRequest,
-        proxy_mode: ProxyMode,
         head_block_num: Option<U64>,
     ) -> Result<(JsonRpcForwardedResponse, Vec<Arc<Web3Rpc>>), FrontendErrorResponse> {
         // trace!("Received request: {:?}", request);
 
         let request_metadata = Arc::new(RequestMetadata::new(REQUEST_PERIOD, request.num_bytes())?);
 
+        #[cfg(feature = "rdkafka")]
+        let mut kafka_stuff = None;
+
+        #[cfg(feature = "rdkafka")]
+        if let Some(kafka_producer) = self.kafka_producer.clone() {
+            let request_bytes = rmp_serde::to_vec(&request)?;
+
+            let rpc_secret_key_id = authorization
+                .checks
+                .rpc_secret_key_id
+                .map(|x| x.get())
+                .unwrap_or_default();
+
+            let kafka_key = rmp_serde::to_vec(&rpc_secret_key_id)
+                .context("failed serializing kafka key")
+                .unwrap();
+
+            let request_hash = Some(keccak256(&request_bytes));
+
+            // the third item is added with the response
+            let kafka_headers = OwnedHeaders::new_with_capacity(3)
+                .insert(Header {
+                    key: "request_hash",
+                    value: request_hash.as_ref(),
+                })
+                .insert(Header {
+                    key: "head_block_num",
+                    value: head_block_num.map(|x| x.to_string()).as_ref(),
+                });
+
+            // save the key and headers for when we log the response
+            kafka_stuff = Some((kafka_key.clone(), kafka_headers.clone()));
+
+            let f = async move {
+                let produce_future = kafka_producer.send(
+                    FutureRecord::to("proxy_rpc_request")
+                        .key(&kafka_key)
+                        .payload(&request_bytes)
+                        .headers(kafka_headers),
+                    Duration::from_secs(0),
+                );
+
+                if let Err((err, msg)) = produce_future.await {
+                    error!("produce kafka request log: {}. {:#?}", err, msg);
+                }
+            };
+
+            tokio::spawn(f);
+        }
+
         // save the id so we can attach it to the response
         // TODO: instead of cloning, take the id out?
         let request_id = request.id.clone();
@@ -1206,7 +1282,6 @@
                 let mut response = self
                     .balanced_rpcs
                     .try_proxy_connection(
-                        proxy_mode,
                         authorization,
                         request,
                         Some(&request_metadata),
@@ -1253,9 +1328,9 @@
             // broadcast transactions to all private rpcs at once
             "eth_sendRawTransaction" => {
                 // TODO: how should we handle private_mode here?
-                let default_num = match proxy_mode {
+                let default_num = match authorization.checks.proxy_mode {
                     // TODO: how many balanced rpcs should we send to? configurable? percentage of total?
-                    ProxyMode::Best => Some(4),
+                    ProxyMode::Best | ProxyMode::Debug => Some(4),
                     ProxyMode::Fastest(0) => None,
                     // TODO: how many balanced rpcs should we send to? configurable? percentage of total?
                     // TODO: what if we do 2 per tier? we want to blast the third party rpcs
@@ -1595,7 +1670,6 @@
                 let mut response = self
                     .balanced_rpcs
                     .try_proxy_connection(
-                        proxy_mode,
                         &authorization,
                         request,
                         Some(&request_metadata),
@@ -1624,7 +1698,6 @@
                 } else {
                     self.balanced_rpcs
                         .try_proxy_connection(
-                            proxy_mode,
                             &authorization,
                             request,
                             Some(&request_metadata),
@@ -1679,6 +1752,33 @@
                 .context("stat_sender sending response stat")?;
         }
 
+        #[cfg(feature = "rdkafka")]
+        if let Some((kafka_key, kafka_headers)) = kafka_stuff {
+            let kafka_producer = self
+                .kafka_producer
+                .clone()
+                .expect("if headers are set, producer must exist");
+
+            let response_bytes =
+                rmp_serde::to_vec(&response).context("failed msgpack serialize response")?;
+
+            let f = async move {
+                let produce_future = kafka_producer.send(
+                    FutureRecord::to("proxy_rpc_response")
+                        .key(&kafka_key)
+                        .payload(&response_bytes)
+                        .headers(kafka_headers),
+                    Duration::from_secs(0),
+                );
+
+                if let Err((err, msg)) = produce_future.await {
+                    error!("produce kafka response log: {}. {:#?}", err, msg);
+                }
+            };
+
+            tokio::spawn(f);
+        }
+
         Ok((response, rpcs))
     }
 }
diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs
index 14bfd81c..5501091c 100644
--- a/web3_proxy/src/config.rs
+++ b/web3_proxy/src/config.rs
@@ -103,6 +103,12 @@
     /// Restrict user registration.
     /// None = no code needed
     pub invite_code: Option<String>,
+
+    /// Optional kafka brokers
+    /// Used by /debug/:rpc_key urls for logging requests and responses. No other endpoints log request/response data.
+    pub kafka_urls: Option<String>,
+
+    /// domain in sign-in-with-ethereum messages
     pub login_domain: Option<String>,
     /// do not serve any requests if the best known block is older than this many seconds.
diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs
index 6d014ebd..b0890878 100644
--- a/web3_proxy/src/frontend/authorization.rs
+++ b/web3_proxy/src/frontend/authorization.rs
@@ -1,6 +1,7 @@
 //! Utilities for authorization of logged in and anonymous users.
 
 use super::errors::FrontendErrorResponse;
+use super::rpc_proxy_ws::ProxyMode;
 use crate::app::{AuthorizationChecks, Web3ProxyApp, APP_USER_AGENT};
 use crate::rpcs::one::Web3Rpc;
 use crate::user_token::UserBearerToken;
@@ -205,6 +206,7 @@ impl Authorization {
         db_conn: Option<DatabaseConnection>,
         ip: IpAddr,
         origin: Option<Origin>,
+        proxy_mode: ProxyMode,
         referer: Option<Referer>,
         user_agent: Option<UserAgent>,
     ) -> anyhow::Result<Self> {
@@ -221,6 +223,7 @@
         // TODO: default or None?
         let authorization_checks = AuthorizationChecks {
             max_requests_per_period,
+            proxy_mode,
             ..Default::default()
         };
 
@@ -235,6 +238,7 @@
         )
     }
 
+    #[allow(clippy::too_many_arguments)]
     pub fn try_new(
         authorization_checks: AuthorizationChecks,
         db_conn: Option<DatabaseConnection>,
@@ -311,7 +315,7 @@ pub async fn login_is_authorized(
     app: &Web3ProxyApp,
     ip: IpAddr,
 ) -> Result<Authorization, FrontendErrorResponse> {
-    let authorization = match app.rate_limit_login(ip).await? {
+    let authorization = match app.rate_limit_login(ip, ProxyMode::Best).await? {
         RateLimitResult::Allowed(authorization, None) => authorization,
         RateLimitResult::RateLimited(authorization, retry_at) => {
             return Err(FrontendErrorResponse::RateLimited(authorization, retry_at));
@@ -328,11 +332,17 @@ pub async fn ip_is_authorized(
     app: &Arc<Web3ProxyApp>,
     ip: IpAddr,
     origin: Option<Origin>,
+    proxy_mode: ProxyMode,
 ) -> Result<(Authorization, Option<OwnedSemaphorePermit>), FrontendErrorResponse> {
     // TODO: i think we could write an `impl From` for this
     // TODO: move this to an AuthorizedUser extractor
     let (authorization, semaphore) = match app
-        .rate_limit_by_ip(&app.config.allowed_origin_requests_per_period, ip, origin)
+        .rate_limit_by_ip(
+            &app.config.allowed_origin_requests_per_period,
+            ip,
+            origin,
+            proxy_mode,
+        )
         .await?
     {
         RateLimitResult::Allowed(authorization, semaphore) => (authorization, semaphore),
@@ -388,13 +398,14 @@ pub async fn key_is_authorized(
     rpc_key: RpcSecretKey,
     ip: IpAddr,
     origin: Option<Origin>,
+    proxy_mode: ProxyMode,
     referer: Option<Referer>,
     user_agent: Option<UserAgent>,
 ) -> Result<(Authorization, Option<OwnedSemaphorePermit>), FrontendErrorResponse> {
     // check the rate limits. error if over the limit
     // TODO: i think this should be in an "impl From" or "impl Into"
     let (authorization, semaphore) = match app
-        .rate_limit_by_rpc_key(ip, origin, referer, rpc_key, user_agent)
+        .rate_limit_by_rpc_key(ip, origin, proxy_mode, referer, rpc_key, user_agent)
        .await?
     {
         RateLimitResult::Allowed(authorization, semaphore) => (authorization, semaphore),
@@ -542,7 +553,11 @@ impl Web3ProxyApp {
         Ok((user, semaphore_permit))
     }
 
-    pub async fn rate_limit_login(&self, ip: IpAddr) -> anyhow::Result<RateLimitResult> {
+    pub async fn rate_limit_login(
+        &self,
+        ip: IpAddr,
+        proxy_mode: ProxyMode,
+    ) -> anyhow::Result<RateLimitResult> {
         // TODO: dry this up with rate_limit_by_rpc_key?
 
         // we don't care about user agent or origin or referer
@@ -551,6 +566,7 @@
             self.db_conn(),
             ip,
             None,
+            proxy_mode,
             None,
             None,
         )?;
@@ -597,6 +613,7 @@
         allowed_origin_requests_per_period: &HashMap<String, u64>,
         ip: IpAddr,
         origin: Option<Origin>,
+        proxy_mode: ProxyMode,
     ) -> anyhow::Result<RateLimitResult> {
         // ip rate limits don't check referer or user agent
         // they do check
@@ -605,6 +622,7 @@
             self.db_conn.clone(),
             ip,
             origin,
+            proxy_mode,
             None,
             None,
         )?;
@@ -653,6 +671,7 @@
     // check the local cache for user data, or query the database
     pub(crate) async fn authorization_checks(
         &self,
+        proxy_mode: ProxyMode,
         rpc_secret_key: RpcSecretKey,
     ) -> anyhow::Result<AuthorizationChecks> {
         let authorization_checks: Result<_, Arc<anyhow::Error>> = self
@@ -752,6 +771,7 @@
                             max_concurrent_requests: user_tier_model.max_concurrent_requests,
                             max_requests_per_period: user_tier_model.max_requests_per_period,
                             private_txs: rpc_key_model.private_txs,
+                            proxy_mode,
                         })
                     }
                     None => Ok(AuthorizationChecks::default()),
@@ -768,11 +788,12 @@
         &self,
         ip: IpAddr,
         origin: Option<Origin>,
+        proxy_mode: ProxyMode,
         referer: Option<Referer>,
         rpc_key: RpcSecretKey,
         user_agent: Option<UserAgent>,
     ) -> anyhow::Result<RateLimitResult> {
-        let authorization_checks = self.authorization_checks(rpc_key).await?;
+        let authorization_checks = self.authorization_checks(proxy_mode, rpc_key).await?;
 
         // if no rpc_key_id matching the given rpc was found, then we can't rate limit by key
         if authorization_checks.rpc_secret_key_id.is_none() {
@@ -859,12 +880,13 @@
                 rpc_secret_key,
                 self.ip,
                 self.origin.clone(),
+                self.checks.proxy_mode,
                 self.referer.clone(),
                 self.user_agent.clone(),
             )
             .await?
         } else {
-            ip_is_authorized(app, self.ip, self.origin.clone()).await?
+            ip_is_authorized(app, self.ip, self.origin.clone(), self.checks.proxy_mode).await?
         };
 
         let a = Arc::new(a);
diff --git a/web3_proxy/src/frontend/errors.rs b/web3_proxy/src/frontend/errors.rs
index 162bf255..01c3cd13 100644
--- a/web3_proxy/src/frontend/errors.rs
+++ b/web3_proxy/src/frontend/errors.rs
@@ -28,11 +28,12 @@ pub enum FrontendErrorResponse {
     BadRequest(String),
     SemaphoreAcquireError(AcquireError),
     Database(DbErr),
-    HeadersError(headers::Error),
+    Headers(headers::Error),
     HeaderToString(ToStrError),
     InvalidHeaderValue(InvalidHeaderValue),
     IpAddrParse(AddrParseError),
     JoinError(JoinError),
+    MsgPackEncode(rmp_serde::encode::Error),
     NotFound,
     RateLimited(Authorization, Option<Instant>),
     Redis(RedisError),
@@ -40,7 +41,7 @@
     StatusCode(StatusCode, String, Option<anyhow::Error>),
     /// TODO: what should be attached to the timeout?
     Timeout(tokio::time::error::Elapsed),
-    UlidDecodeError(ulid::DecodeError),
+    UlidDecode(ulid::DecodeError),
     UnknownKey,
 }
 
@@ -94,7 +95,7 @@
                 ),
             )
         }
-            Self::HeadersError(err) => {
+            Self::Headers(err) => {
                 warn!("HeadersError {:?}", err);
                 (
                     StatusCode::BAD_REQUEST,
@@ -146,6 +147,17 @@
                 ),
             )
         }
+            Self::MsgPackEncode(err) => {
+                debug!("MsgPackEncode Error: {}", err);
+                (
+                    StatusCode::BAD_REQUEST,
+                    JsonRpcForwardedResponse::from_str(
+                        &format!("msgpack encode error: {}", err),
+                        Some(StatusCode::BAD_REQUEST.as_u16().into()),
+                        None,
+                    ),
+                )
+            }
             Self::NotFound => {
                 // TODO: emit a stat?
                 // TODO: instead of an error, show a normal html page for 404
@@ -247,7 +259,7 @@
                 ),
             )
         }
-            Self::UlidDecodeError(err) => {
+            Self::UlidDecode(err) => {
                 // trace!(?err, "UlidDecodeError");
                 (
                     StatusCode::BAD_REQUEST,
diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs
index 1d9f5981..bfa7256d 100644
--- a/web3_proxy/src/frontend/mod.rs
+++ b/web3_proxy/src/frontend/mod.rs
@@ -67,6 +67,15 @@ pub async fn serve(port: u16, proxy_app: Arc<Web3ProxyApp>) -> anyhow::Result<()
         .route(
             "/rpc/:rpc_key",
             post(rpc_proxy_http::proxy_web3_rpc_with_key),
         )
+        // authenticated debug route with and without trailing slash
+        .route(
+            "/debug/:rpc_key/",
+            post(rpc_proxy_http::debug_proxy_web3_rpc_with_key),
+        )
+        .route(
+            "/debug/:rpc_key",
+            post(rpc_proxy_http::debug_proxy_web3_rpc_with_key),
+        )
         // public fastest with and without trailing slash
         .route("/fastest/", post(rpc_proxy_http::fastest_proxy_web3_rpc))
         .route("/fastest", post(rpc_proxy_http::fastest_proxy_web3_rpc))
@@ -106,7 +115,15 @@
         .route(
             "/rpc/:rpc_key",
             get(rpc_proxy_ws::websocket_handler_with_key),
         )
-        // public fastest with and without trailing slash
+        // debug with and without trailing slash
+        .route(
+            "/debug/:rpc_key/",
+            get(rpc_proxy_ws::debug_websocket_handler_with_key),
+        )
+        .route(
+            "/debug/:rpc_key",
+            get(rpc_proxy_ws::debug_websocket_handler_with_key),
+        )
+        // public fastest with and without trailing slash
         .route("/fastest/", get(rpc_proxy_ws::fastest_websocket_handler))
         .route("/fastest", get(rpc_proxy_ws::fastest_websocket_handler))
         // authenticated fastest with and without trailing slash
@@ -169,7 +186,10 @@
         .route("/user/stats/detailed", get(users::user_stats_detailed_get))
         .route("/user/logout", post(users::user_logout_post))
         .route("/admin/modify_role", get(admin::admin_change_user_roles))
-        .route("/admin/imitate-login/:admin_address/:user_address", get(admin::admin_login_get))
+        .route(
+            "/admin/imitate-login/:admin_address/:user_address",
+            get(admin::admin_login_get),
+        )
         .route(
             "/admin/imitate-login/:admin_address/:user_address/:message_eip",
             get(admin::admin_login_get),
diff --git a/web3_proxy/src/frontend/rpc_proxy_http.rs b/web3_proxy/src/frontend/rpc_proxy_http.rs
index 781df406..1ddd93f5 100644
--- a/web3_proxy/src/frontend/rpc_proxy_http.rs
+++ b/web3_proxy/src/frontend/rpc_proxy_http.rs
@@ -59,12 +59,12 @@ async fn _proxy_web3_rpc(
     // TODO: do we care about keeping the TypedHeader wrapper?
     let origin = origin.map(|x| x.0);
 
-    let (authorization, semaphore) = ip_is_authorized(&app, ip, origin).await?;
+    let (authorization, semaphore) = ip_is_authorized(&app, ip, origin, proxy_mode).await?;
 
     let authorization = Arc::new(authorization);
 
     let (response, rpcs, _semaphore) = app
-        .proxy_web3_rpc(authorization, payload, proxy_mode)
+        .proxy_web3_rpc(authorization, payload)
         .await
         .map(|(x, y)| (x, y, semaphore))?;
 
@@ -138,6 +138,29 @@ pub async fn proxy_web3_rpc_with_key(
     .await
 }
 
+#[debug_handler]
+pub async fn debug_proxy_web3_rpc_with_key(
+    Extension(app): Extension<Arc<Web3ProxyApp>>,
+    ip: InsecureClientIp,
+    origin: Option<TypedHeader<Origin>>,
+    referer: Option<TypedHeader<Referer>>,
+    user_agent: Option<TypedHeader<UserAgent>>,
+    Path(rpc_key): Path<String>,
+    Json(payload): Json<JsonRpcRequestEnum>,
+) -> FrontendResult {
+    _proxy_web3_rpc_with_key(
+        app,
+        ip,
+        origin,
+        referer,
+        user_agent,
+        rpc_key,
+        payload,
+        ProxyMode::Debug,
+    )
+    .await
+}
+
 #[debug_handler]
 pub async fn fastest_proxy_web3_rpc_with_key(
     Extension(app): Extension<Arc<Web3ProxyApp>>,
@@ -204,6 +227,7 @@ async fn _proxy_web3_rpc_with_key(
         rpc_key,
         ip,
         origin.map(|x| x.0),
+        proxy_mode,
         referer.map(|x| x.0),
         user_agent.map(|x| x.0),
     )
@@ -214,7 +238,7 @@
     let rpc_secret_key_id = authorization.checks.rpc_secret_key_id;
 
     let (response, rpcs, _semaphore) = app
-        .proxy_web3_rpc(authorization, payload, proxy_mode)
+        .proxy_web3_rpc(authorization, payload)
         .await
         .map(|(x, y)| (x, y, semaphore))?;
 
diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs
index 4de01bce..2676f9e5 100644
--- a/web3_proxy/src/frontend/rpc_proxy_ws.rs
+++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs
@@ -34,7 +34,7 @@ use std::sync::Arc;
 use std::{str::from_utf8_mut, sync::atomic::AtomicUsize};
 use tokio::sync::{broadcast, OwnedSemaphorePermit, RwLock};
 
-#[derive(Copy, Clone)]
+#[derive(Copy, Clone, Debug)]
 pub enum ProxyMode {
     /// send to the "best" synced server
     Best,
     ///
     Fastest(usize),
     /// send to all servers for benchmarking. return the fastest non-error response
     Versus,
+    /// send all requests and responses to kafka
+    Debug,
+}
+
+impl Default for ProxyMode {
+    fn default() -> Self {
+        Self::Best
+    }
 }
 
 /// Public entrypoint for WebSocket JSON-RPC requests.
@@ -92,13 +100,13 @@ async fn _websocket_handler(
 ) -> FrontendResult {
     let origin = origin.map(|x| x.0);
 
-    let (authorization, _semaphore) = ip_is_authorized(&app, ip, origin).await?;
+    let (authorization, _semaphore) = ip_is_authorized(&app, ip, origin, proxy_mode).await?;
 
     let authorization = Arc::new(authorization);
 
     match ws_upgrade {
         Some(ws) => Ok(ws
-            .on_upgrade(move |socket| proxy_web3_socket(app, authorization, socket, proxy_mode))
+            .on_upgrade(move |socket| proxy_web3_socket(app, authorization, socket))
             .into_response()),
         None => {
             if let Some(redirect) = &app.config.redirect_public_url {
@@ -141,6 +149,29 @@ pub async fn websocket_handler_with_key(
     .await
 }
 
+#[debug_handler]
+pub async fn debug_websocket_handler_with_key(
+    Extension(app): Extension<Arc<Web3ProxyApp>>,
+    ip: InsecureClientIp,
+    Path(rpc_key): Path<String>,
+    origin: Option<TypedHeader<Origin>>,
+    referer: Option<TypedHeader<Referer>>,
+    user_agent: Option<TypedHeader<UserAgent>>,
+    ws_upgrade: Option<WebSocketUpgrade>,
+) -> FrontendResult {
+    _websocket_handler_with_key(
+        ProxyMode::Debug,
+        app,
+        ip,
+        rpc_key,
+        origin,
+        referer,
+        user_agent,
+        ws_upgrade,
+    )
+    .await
+}
+
 #[debug_handler]
 pub async fn fastest_websocket_handler_with_key(
     Extension(app): Extension<Arc<Web3ProxyApp>>,
@@ -206,6 +237,7 @@ async fn _websocket_handler_with_key(
         rpc_key,
         ip,
         origin.map(|x| x.0),
+        proxy_mode,
         referer.map(|x| x.0),
         user_agent.map(|x| x.0),
     )
@@ -216,8 +248,9 @@
     let authorization = Arc::new(authorization);
 
     match ws_upgrade {
-        Some(ws_upgrade) => Ok(ws_upgrade
-            .on_upgrade(move |socket| proxy_web3_socket(app, authorization, socket, proxy_mode))),
+        Some(ws_upgrade) => {
+            Ok(ws_upgrade.on_upgrade(move |socket| proxy_web3_socket(app, authorization, socket)))
+        }
         None => {
             // if no websocket upgrade, this is probably a user loading the url with their browser
 
@@ -273,7 +306,6 @@ async fn proxy_web3_socket(
     app: Arc<Web3ProxyApp>,
     authorization: Arc<Authorization>,
     socket: WebSocket,
-    proxy_mode: ProxyMode,
 ) {
     // split the websocket so we can read and write concurrently
     let (ws_tx, ws_rx) = socket.split();
@@ -282,13 +314,7 @@
     let (response_sender, response_receiver) = flume::unbounded::<Message>();
 
     tokio::spawn(write_web3_socket(response_receiver, ws_tx));
-    tokio::spawn(read_web3_socket(
-        app,
-        authorization,
-        ws_rx,
-        response_sender,
-        proxy_mode,
-    ));
+    tokio::spawn(read_web3_socket(app, authorization, ws_rx, response_sender));
 }
 
 /// websockets support a few more methods than http clients
@@ -299,7 +325,6 @@ async fn handle_socket_payload(
     response_sender: &flume::Sender<Message>,
     subscription_count: &AtomicUsize,
     subscriptions: Arc<RwLock<HashMap<String, AbortHandle>>>,
-    proxy_mode: ProxyMode,
 ) -> (Message, Option<OwnedSemaphorePermit>) {
     let (authorization, semaphore) = match authorization.check_again(&app).await {
         Ok((a, s)) => (a, s),
@@ -392,7 +417,7 @@
                 Ok(response.into())
             }
             _ => app
-                .proxy_web3_rpc(authorization.clone(), json_request.into(), proxy_mode)
+                .proxy_web3_rpc(authorization.clone(), json_request.into())
                 .await
                 .map_or_else(
                     |err| match err {
@@ -434,7 +459,6 @@ async fn read_web3_socket(
     authorization: Arc<Authorization>,
     mut ws_rx: SplitStream<WebSocket>,
     response_sender: flume::Sender<Message>,
-    proxy_mode: ProxyMode,
 ) {
     // TODO: need a concurrent hashmap
     let subscriptions = Arc::new(RwLock::new(HashMap::new()));
@@ -468,7 +492,6 @@
                         &response_sender,
                         &subscription_count,
                         subscriptions,
-                        proxy_mode,
                     )
                     .await;
@@ -500,7 +523,6 @@
                         &response_sender,
                         &subscription_count,
                         subscriptions,
-                        proxy_mode,
                     )
                     .await;
diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs
index ea95ad63..64706a1b 100644
--- a/web3_proxy/src/rpcs/many.rs
+++ b/web3_proxy/src/rpcs/many.rs
@@ -1136,15 +1136,14 @@ impl Web3Rpcs {
     pub async fn try_proxy_connection(
         &self,
-        proxy_mode: ProxyMode,
         authorization: &Arc<Authorization>,
         request: JsonRpcRequest,
         request_metadata: Option<&Arc<RequestMetadata>>,
         min_block_needed: Option<&U64>,
         max_block_needed: Option<&U64>,
     ) -> anyhow::Result<JsonRpcForwardedResponse> {
-        match proxy_mode {
-            ProxyMode::Best => {
+        match authorization.checks.proxy_mode {
+            ProxyMode::Debug | ProxyMode::Best => {
                 self.try_send_best_consensus_head_connection(
                     authorization,
                     request,
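
A note on reading these logs back: the producer code above publishes MessagePack-encoded (`rmp-serde`) payloads to the `proxy_rpc_request` and `proxy_rpc_response` topics, keyed by the MessagePack-encoded `rpc_secret_key_id` (0 for anonymous callers), with `request_hash` and `head_block_num` headers. The sketch below is one way a consumer might decode them; it is not part of this patch. The broker address and group id are placeholders (the address should match the proxy's `kafka_urls` setting, e.g. `kafka_urls = "localhost:9092"` in the TOML config), and it assumes `rdkafka` 0.29, `rmp-serde`, `serde_json`, `anyhow`, and `tokio` as dependencies.

```rust
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::message::{Headers, Message};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // placeholder broker list; should match the proxy's `kafka_urls` config
    let consumer: StreamConsumer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("group.id", "web3-proxy-debug-reader") // hypothetical group id
        .set("auto.offset.reset", "earliest")
        .create()?;

    // the two topics produced by proxy_cached_request above
    consumer.subscribe(&["proxy_rpc_request", "proxy_rpc_response"])?;

    loop {
        let msg = consumer.recv().await?;

        // the key is the rpc_secret_key_id, MessagePack-encoded (0 = anonymous)
        let caller: Option<u64> = msg.key().and_then(|k| rmp_serde::from_slice(k).ok());

        // the payload is the MessagePack-encoded JSON-RPC request or response;
        // decode into a generic value since the proxy's types are not public
        let body: serde_json::Value = match msg.payload() {
            Some(p) => rmp_serde::from_slice(p)?,
            None => continue,
        };

        println!("{} key={:?} body={}", msg.topic(), caller, body);

        if let Some(headers) = msg.headers() {
            for header in headers.iter() {
                // request_hash is raw keccak bytes; head_block_num is a decimal string
                println!("  {} = {:?}", header.key, header.value.map(String::from_utf8_lossy));
            }
        }
    }
}
```

Because the producer sends with a zero queue timeout and only logs delivery errors, a consumer like this sees records on a best-effort basis; the TODO above about reconnecting when kafka is down at startup applies here too.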