From 515d0af751f8a38139f0014633e09856c890790a Mon Sep 17 00:00:00 2001 From: Rory Neithinger Date: Tue, 7 Nov 2023 00:57:30 -0800 Subject: [PATCH 1/2] forward request id for eth_sendRawTransaction currently looking into a single test case failure which may be due to this change. --- Cargo.lock | 2 + web3_proxy/Cargo.toml | 2 + web3_proxy/src/app/mod.rs | 23 ++++++-- web3_proxy/src/app/ws.rs | 2 + web3_proxy/src/frontend/mod.rs | 7 ++- web3_proxy/src/frontend/request_id.rs | 52 +++++++++++++++++++ web3_proxy/src/frontend/rpc_proxy_http.rs | 48 +++++++++++++++-- web3_proxy/src/frontend/rpc_proxy_ws.rs | 4 +- web3_proxy/src/jsonrpc/request.rs | 1 + web3_proxy/src/jsonrpc/request_builder.rs | 18 ++++++- web3_proxy/src/rpcs/request.rs | 12 ++++- .../src/sub_commands/migrate_stats_to_v2.rs | 1 + 12 files changed, 157 insertions(+), 15 deletions(-) create mode 100644 web3_proxy/src/frontend/request_id.rs diff --git a/Cargo.lock b/Cargo.lock index 4251482a..f5fc5907 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6692,6 +6692,8 @@ dependencies = [ "tokio-stream", "toml 0.8.6", "tower-http", + "tower-layer", + "tower-service", "tracing", "tracing-subscriber", "ulid", diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index 3300a192..4f3934e3 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -88,6 +88,8 @@ tokio = { version = "1.33.0", features = ["full", "tracing"] } tokio-stream = { version = "0.1.14", features = ["sync"] } toml = "0.8.6" tower-http = { version = "0.4.4", features = ["cors", "normalize-path", "sensitive-headers", "trace"] } +tower-layer = "0.3.2" +tower-service = "0.3.2" tracing = "0.1" ulid = { version = "1.1.0", features = ["rand", "uuid", "serde"] } url = { version = "2.4.1" } diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 29aed76f..24b34c72 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -957,7 +957,8 @@ impl App { ) -> Web3ProxyResult { let authorization = Arc::new(Authorization::internal()?); - self.authorized_request(method, params, authorization).await + self.authorized_request(method, params, authorization, None) + .await } /// this is way more round-a-bout than we want, but it means stats are emitted and caches are used @@ -966,12 +967,15 @@ impl App { method: &str, params: P, authorization: Arc, + request_id: Option, ) -> Web3ProxyResult { // TODO: proper ids let request = SingleRequest::new(LooseId::Number(1), method.to_string().into(), json!(params))?; - let (_, response, _) = self.proxy_request(request, authorization, None).await; + let (_, response, _) = self + .proxy_request(request, authorization, None, request_id) + .await; // TODO: error handling? match response.parsed().await?.payload { @@ -987,20 +991,21 @@ impl App { self: &Arc, authorization: Arc, request: JsonRpcRequestEnum, + request_id: Option, ) -> Web3ProxyResult<(StatusCode, jsonrpc::Response, Vec>)> { // trace!(?request, "proxy_web3_rpc"); let response = match request { JsonRpcRequestEnum::Single(request) => { let (status_code, response, rpcs) = self - .proxy_request(request, authorization.clone(), None) + .proxy_request(request, authorization.clone(), None, request_id) .await; (status_code, jsonrpc::Response::Single(response), rpcs) } JsonRpcRequestEnum::Batch(requests) => { let (responses, rpcs) = self - .proxy_web3_rpc_requests(&authorization, requests) + .proxy_web3_rpc_requests(&authorization, requests, request_id) .await?; // TODO: real status code. if an error happens, i don't think we are following the spec here @@ -1017,6 +1022,7 @@ impl App { self: &Arc, authorization: &Arc, requests: Vec, + request_id: Option, ) -> Web3ProxyResult<(Vec, Vec>)> { // TODO: we should probably change ethers-rs to support this directly. they pushed this off to v2 though let num_requests = requests.len(); @@ -1037,7 +1043,12 @@ impl App { requests .into_iter() .map(|request| { - self.proxy_request(request, authorization.clone(), Some(head_block.clone())) + self.proxy_request( + request, + authorization.clone(), + Some(head_block.clone()), + request_id.clone(), + ) }) .collect::>(), ) @@ -1237,6 +1248,7 @@ impl App { request: SingleRequest, authorization: Arc, head_block: Option, + request_id: Option, ) -> (StatusCode, jsonrpc::SingleResponse, Vec>) { // TODO: this clone is only for an error response. refactor to not need it let error_id = request.id.clone(); @@ -1261,6 +1273,7 @@ impl App { None, request.clone().into(), head_block.clone(), + request_id.clone(), ) .await { diff --git a/web3_proxy/src/app/ws.rs b/web3_proxy/src/app/ws.rs index 7aaef131..9ac39ff9 100644 --- a/web3_proxy/src/app/ws.rs +++ b/web3_proxy/src/app/ws.rs @@ -90,6 +90,7 @@ impl App { None, RequestOrMethod::Method("eth_subscribe(newHeads)".into(), 0), Some(new_head), + None, ) .await; @@ -181,6 +182,7 @@ impl App { 0, ), None, + None, ) .await { diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs index 7b159f34..d41313c9 100644 --- a/web3_proxy/src/frontend/mod.rs +++ b/web3_proxy/src/frontend/mod.rs @@ -9,6 +9,7 @@ pub mod rpc_proxy_http; pub mod rpc_proxy_ws; pub mod status; pub mod users; +pub mod request_id; use crate::app::App; use crate::errors::Web3ProxyResult; @@ -16,6 +17,7 @@ use axum::{ routing::{get, post}, Extension, Router, }; +use request_id::RequestId; use http::{header::AUTHORIZATION, Request, StatusCode}; use hyper::Body; @@ -28,7 +30,6 @@ use tokio::{process::Command, sync::broadcast}; use tower_http::sensitive_headers::SetSensitiveRequestHeadersLayer; use tower_http::{cors::CorsLayer, normalize_path::NormalizePathLayer, trace::TraceLayer}; use tracing::{error, error_span, info, trace_span}; -use ulid::Ulid; #[cfg(feature = "listenfd")] use listenfd::ListenFd; @@ -278,12 +279,15 @@ pub async fn serve( // We get the request id from the header // If no header, a new Ulid is created // TODO: move this header name to config + /* let request_id = request .headers() .get("x-amzn-trace-id") .and_then(|x| x.to_str().ok()) .map(ToString::to_string) .unwrap_or_else(|| Ulid::new().to_string()); + */ + let request_id = &request.extensions().get::().unwrap().0; // And then we put it along with other information into the `request` span // TODO: what other info should we attach? how can we attach an error and a tracing span here? @@ -305,6 +309,7 @@ pub async fn serve( } }), // .on_failure(|| todo!("on failure that has the request and response body so we can debug more easily")), ) + .layer(request_id::RequestIdLayer) // 404 for any unknown routes .fallback(errors::handler_404); diff --git a/web3_proxy/src/frontend/request_id.rs b/web3_proxy/src/frontend/request_id.rs new file mode 100644 index 00000000..61982f70 --- /dev/null +++ b/web3_proxy/src/frontend/request_id.rs @@ -0,0 +1,52 @@ +use std::task::{Context, Poll}; + +use http::Request; +use tower_service::Service; +use ulid::Ulid; + +/// RequestId from x-amzn-trace-id header or new Ulid +#[derive(Clone, Debug)] +pub struct RequestId(pub String); + +/// Middleware layer for adding RequestId as an Extension +#[derive(Clone, Debug)] +pub struct RequestIdLayer; + +impl tower_layer::Layer for RequestIdLayer { + type Service = RequestIdService; + + fn layer(&self, inner: S) -> Self::Service { + RequestIdService { inner } + } +} + +/// Service used by RequestIdLayer to inject RequestId +#[derive(Clone, Debug)] +pub struct RequestIdService { + inner: S, +} + +impl Service> for RequestIdService +where + S: Service>, +{ + type Response = S::Response; + type Error = S::Error; + type Future = S::Future; + + #[inline] + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, mut req: Request) -> Self::Future { + let request_id = req + .headers() + .get("x-amzn-trace-id") + .and_then(|x| x.to_str().ok()) + .map(ToString::to_string) + .unwrap_or_else(|| Ulid::new().to_string()); + req.extensions_mut().insert(RequestId(request_id)); + self.inner.call(req) + } +} diff --git a/web3_proxy/src/frontend/rpc_proxy_http.rs b/web3_proxy/src/frontend/rpc_proxy_http.rs index 4dc86b47..27279df1 100644 --- a/web3_proxy/src/frontend/rpc_proxy_http.rs +++ b/web3_proxy/src/frontend/rpc_proxy_http.rs @@ -1,6 +1,7 @@ //! Take a user's HTTP JSON-RPC requests and either respond from local data or proxy the request to a backend rpc server. use super::authorization::{ip_is_authorized, key_is_authorized}; +use super::request_id::RequestId; use super::rpc_proxy_ws::ProxyMode; use crate::errors::Web3ProxyError; use crate::{app::App, jsonrpc::JsonRpcRequestEnum}; @@ -26,9 +27,18 @@ pub async fn proxy_web3_rpc( Extension(app): Extension>, InsecureClientIp(ip): InsecureClientIp, origin: Option>, + Extension(RequestId(request_id)): Extension, payload: Result, JsonRejection>, ) -> Result { - _proxy_web3_rpc(app, &ip, origin.as_deref(), payload, ProxyMode::Best).await + _proxy_web3_rpc( + app, + &ip, + origin.as_deref(), + payload, + ProxyMode::Best, + request_id, + ) + .await } #[debug_handler] @@ -36,11 +46,20 @@ pub async fn fastest_proxy_web3_rpc( Extension(app): Extension>, InsecureClientIp(ip): InsecureClientIp, origin: Option>, + Extension(RequestId(request_id)): Extension, payload: Result, JsonRejection>, ) -> Result { // TODO: read the fastest number from params // TODO: check that the app allows this without authentication - _proxy_web3_rpc(app, &ip, origin.as_deref(), payload, ProxyMode::Fastest(0)).await + _proxy_web3_rpc( + app, + &ip, + origin.as_deref(), + payload, + ProxyMode::Fastest(0), + request_id, + ) + .await } #[debug_handler] @@ -48,9 +67,18 @@ pub async fn versus_proxy_web3_rpc( Extension(app): Extension>, InsecureClientIp(ip): InsecureClientIp, origin: Option>, + Extension(RequestId(request_id)): Extension, payload: Result, JsonRejection>, ) -> Result { - _proxy_web3_rpc(app, &ip, origin.as_deref(), payload, ProxyMode::Versus).await + _proxy_web3_rpc( + app, + &ip, + origin.as_deref(), + payload, + ProxyMode::Versus, + request_id, + ) + .await } async fn _proxy_web3_rpc( @@ -59,6 +87,7 @@ async fn _proxy_web3_rpc( origin: Option<&Origin>, payload: Result, JsonRejection>, proxy_mode: ProxyMode, + request_id: String, ) -> Result { // TODO: create a stat if they error. (but we haven't parsed rpc_key yet, so it needs some thought) let payload = payload @@ -81,7 +110,7 @@ async fn _proxy_web3_rpc( // TODO: is first_id the right thing to attach to this error? let (status_code, response, rpcs) = app - .proxy_web3_rpc(authorization, payload) + .proxy_web3_rpc(authorization, payload, Some(request_id)) .await .map_err(|e| e.into_response_with_id(first_id))?; @@ -132,6 +161,7 @@ pub async fn proxy_web3_rpc_with_key( referer: Option>, user_agent: Option>, Path(rpc_key): Path, + Extension(RequestId(request_id)): Extension, payload: Result, JsonRejection>, ) -> Result { _proxy_web3_rpc_with_key( @@ -143,6 +173,7 @@ pub async fn proxy_web3_rpc_with_key( rpc_key, payload, ProxyMode::Best, + request_id, ) .await } @@ -158,6 +189,7 @@ pub async fn debug_proxy_web3_rpc_with_key( user_agent: Option>, request_headers: HeaderMap, Path(rpc_key): Path, + Extension(RequestId(request_id)): Extension, payload: Result, JsonRejection>, ) -> Result { let mut response = match _proxy_web3_rpc_with_key( @@ -169,6 +201,7 @@ pub async fn debug_proxy_web3_rpc_with_key( rpc_key, payload, ProxyMode::Debug, + request_id, ) .await { @@ -201,6 +234,7 @@ pub async fn fastest_proxy_web3_rpc_with_key( referer: Option>, user_agent: Option>, Path(rpc_key): Path, + Extension(RequestId(request_id)): Extension, payload: Result, JsonRejection>, ) -> Result { _proxy_web3_rpc_with_key( @@ -212,6 +246,7 @@ pub async fn fastest_proxy_web3_rpc_with_key( rpc_key, payload, ProxyMode::Fastest(0), + request_id, ) .await } @@ -224,6 +259,7 @@ pub async fn versus_proxy_web3_rpc_with_key( referer: Option>, user_agent: Option>, Path(rpc_key): Path, + Extension(RequestId(request_id)): Extension, payload: Result, JsonRejection>, ) -> Result { _proxy_web3_rpc_with_key( @@ -235,6 +271,7 @@ pub async fn versus_proxy_web3_rpc_with_key( rpc_key, payload, ProxyMode::Versus, + request_id, ) .await } @@ -249,6 +286,7 @@ async fn _proxy_web3_rpc_with_key( rpc_key: String, payload: Result, JsonRejection>, proxy_mode: ProxyMode, + request_id: String, ) -> Result { // TODO: DRY w/ proxy_web3_rpc // TODO: create a stat if they error. (but we haven't parsed rpc_key yet, so it needs some thought) @@ -276,7 +314,7 @@ async fn _proxy_web3_rpc_with_key( let rpc_secret_key_id = authorization.checks.rpc_secret_key_id; let (status_code, response, rpcs) = app - .proxy_web3_rpc(authorization, payload) + .proxy_web3_rpc(authorization, payload, Some(request_id)) .await .map_err(|e| e.into_response_with_id(first_id))?; diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs index 39686ed3..5fc56960 100644 --- a/web3_proxy/src/frontend/rpc_proxy_ws.rs +++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs @@ -327,6 +327,7 @@ async fn websocket_proxy_web3_rpc( None, json_request.into(), None, + None, ) .await?; @@ -361,6 +362,7 @@ async fn websocket_proxy_web3_rpc( None, json_request.into(), None, + None, ) .await?; @@ -405,7 +407,7 @@ async fn websocket_proxy_web3_rpc( Ok(response.into()) } _ => app - .proxy_web3_rpc(authorization, json_request.into()) + .proxy_web3_rpc(authorization, json_request.into(), None) .await .map(|(_, response, _)| response), } diff --git a/web3_proxy/src/jsonrpc/request.rs b/web3_proxy/src/jsonrpc/request.rs index 69c3123b..151b5afa 100644 --- a/web3_proxy/src/jsonrpc/request.rs +++ b/web3_proxy/src/jsonrpc/request.rs @@ -129,6 +129,7 @@ impl JsonRpcRequestEnum { None, RequestOrMethod::Method("invalid_method".into(), size), None, + None, ) .await .unwrap(); diff --git a/web3_proxy/src/jsonrpc/request_builder.rs b/web3_proxy/src/jsonrpc/request_builder.rs index af45509f..ef3c4a8e 100644 --- a/web3_proxy/src/jsonrpc/request_builder.rs +++ b/web3_proxy/src/jsonrpc/request_builder.rs @@ -175,6 +175,7 @@ impl RequestBuilder { permit, self.request_or_method.clone(), self.head_block.clone(), + None, ) .await; @@ -257,6 +258,9 @@ pub struct ValidatedRequest { /// limit the number of concurrent requests from a given user. pub permit: Option, + + /// RequestId from x-amzn-trace-id or generated + pub request_id: Option, } impl Display for ValidatedRequest { @@ -322,6 +326,7 @@ impl ValidatedRequest { permit: Option, mut request: RequestOrMethod, usd_per_cu: Decimal, + request_id: Option, ) -> Web3ProxyResult> { let start_instant = Instant::now(); @@ -381,6 +386,7 @@ impl ValidatedRequest { stat_sender, usd_per_cu, user_error_response: false.into(), + request_id, }; Ok(Arc::new(x)) @@ -394,11 +400,16 @@ impl ValidatedRequest { permit: Option, request: RequestOrMethod, head_block: Option, + request_id: Option, ) -> Web3ProxyResult> { #[cfg(feature = "rdkafka")] let kafka_debug_logger = if matches!(authorization.checks.proxy_mode, ProxyMode::Debug) { - // TODO: get this out of tracing instead (where we have a String from Amazon's LB) - let request_ulid = Ulid::new(); + let request_ulid = request_id + .map(|s| Ulid::from_string(&s)) + .transpose() + .ok() + .flatten() + .unwrap_or_else(Ulid::new); KafkaDebugLogger::try_new( app, @@ -426,6 +437,7 @@ impl ValidatedRequest { permit, request, usd_per_cu, + request_id, ) .await } @@ -452,6 +464,7 @@ impl ValidatedRequest { None, request.into(), head_block, + None, ) .await } else { @@ -466,6 +479,7 @@ impl ValidatedRequest { None, request.into(), Default::default(), + None, ) .await } diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index b47d82b0..f74ca358 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -209,7 +209,17 @@ impl OpenRequestHandle { .jsonrpc_request() .context("there should always be a request here")?; - let response = client.post(url).json(request).send().await?; + let mut request_builder = client.post(url).json(request); + if request.method == "eth_sendRawTransaction" { + if let Some(ref request_id) = self.web3_request.request_id { + let mut headers = reqwest::header::HeaderMap::with_capacity(1); + let request_id = reqwest::header::HeaderValue::from_str(&request_id) + .expect("request id should be a valid header"); + headers.insert("x-amzn-trace-id", request_id); + request_builder = request_builder.headers(headers); + } + } + let response = request_builder.send().await?; if response.status() == StatusCode::TOO_MANY_REQUESTS { // TODO: how much should we actually rate limit? diff --git a/web3_proxy_cli/src/sub_commands/migrate_stats_to_v2.rs b/web3_proxy_cli/src/sub_commands/migrate_stats_to_v2.rs index 34e30f7e..afea6891 100644 --- a/web3_proxy_cli/src/sub_commands/migrate_stats_to_v2.rs +++ b/web3_proxy_cli/src/sub_commands/migrate_stats_to_v2.rs @@ -217,6 +217,7 @@ impl MigrateStatsToV2SubCommand { connect_timeout: Default::default(), expire_timeout: Default::default(), permit: None, + request_id: None, }; web3_request.try_send_stat()?; From 9ea712dfd0c04996483d575ee61d2ab6aef81ef8 Mon Sep 17 00:00:00 2001 From: Rory Neithinger Date: Thu, 9 Nov 2023 16:53:25 -0800 Subject: [PATCH 2/2] always use x-amzn-trace-id even if not ulid --- web3_proxy/src/jsonrpc/request_builder.rs | 9 +-------- web3_proxy/src/kafka.rs | 4 ++-- 2 files changed, 3 insertions(+), 10 deletions(-) diff --git a/web3_proxy/src/jsonrpc/request_builder.rs b/web3_proxy/src/jsonrpc/request_builder.rs index ef3c4a8e..b5716ac4 100644 --- a/web3_proxy/src/jsonrpc/request_builder.rs +++ b/web3_proxy/src/jsonrpc/request_builder.rs @@ -404,19 +404,12 @@ impl ValidatedRequest { ) -> Web3ProxyResult> { #[cfg(feature = "rdkafka")] let kafka_debug_logger = if matches!(authorization.checks.proxy_mode, ProxyMode::Debug) { - let request_ulid = request_id - .map(|s| Ulid::from_string(&s)) - .transpose() - .ok() - .flatten() - .unwrap_or_else(Ulid::new); - KafkaDebugLogger::try_new( app, authorization.clone(), head_block.as_ref().map(|x| x.number()), "web3_proxy:rpc", - request_ulid, + &request_id, ) } else { None diff --git a/web3_proxy/src/kafka.rs b/web3_proxy/src/kafka.rs index d4c34ed1..fa26bd33 100644 --- a/web3_proxy/src/kafka.rs +++ b/web3_proxy/src/kafka.rs @@ -37,7 +37,7 @@ impl KafkaDebugLogger { authorization: Arc, head_block_num: Option, kafka_topic: &str, - request_ulid: Ulid, + request_id: &str, ) -> Option> { let kafka_producer = app.kafka_producer.clone()?; @@ -74,7 +74,7 @@ impl KafkaDebugLogger { }) .insert(KafkaHeader { key: "request_ulid", - value: Some(&request_ulid.to_string()), + value: Some(request_id), }) .insert(KafkaHeader { key: "head_block_num",