diff --git a/Cargo.lock b/Cargo.lock
index 70c12bba..d9d9701d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6703,6 +6703,8 @@ dependencies = [
  "tokio-stream",
  "toml 0.8.8",
  "tower-http",
+ "tower-layer",
+ "tower-service",
  "tracing",
  "tracing-subscriber",
  "ulid",
diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml
index 75fdce07..d4e83a0f 100644
--- a/web3_proxy/Cargo.toml
+++ b/web3_proxy/Cargo.toml
@@ -88,6 +88,8 @@ tokio = { version = "1.33.0", features = ["full", "tracing"] }
 tokio-stream = { version = "0.1.14", features = ["sync"] }
 toml = "0.8.8"
 tower-http = { version = "0.4.4", features = ["cors", "normalize-path", "sensitive-headers", "trace"] }
+tower-layer = "0.3.2"
+tower-service = "0.3.2"
 tracing = "0.1"
 ulid = { version = "1.1.0", features = ["rand", "uuid", "serde"] }
 url = { version = "2.4.1" }
diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs
index 313ae2dc..1b42588b 100644
--- a/web3_proxy/src/app/mod.rs
+++ b/web3_proxy/src/app/mod.rs
@@ -957,7 +957,8 @@ impl App {
     ) -> Web3ProxyResult<R> {
         let authorization = Arc::new(Authorization::internal()?);
 
-        self.authorized_request(method, params, authorization).await
+        self.authorized_request(method, params, authorization, None)
+            .await
     }
 
     /// this is way more round-a-bout than we want, but it means stats are emitted and caches are used
@@ -966,12 +967,15 @@ impl App {
         method: &str,
         params: P,
         authorization: Arc<Authorization>,
+        request_id: Option<String>,
     ) -> Web3ProxyResult<R> {
         // TODO: proper ids
         let request =
             SingleRequest::new(LooseId::Number(1), method.to_string().into(), json!(params))?;
 
-        let (_, response, _) = self.proxy_request(request, authorization, None).await;
+        let (_, response, _) = self
+            .proxy_request(request, authorization, None, request_id)
+            .await;
 
         // TODO: error handling?
         match response.parsed().await?.payload {
@@ -987,20 +991,21 @@ impl App {
         self: &Arc<Self>,
         authorization: Arc<Authorization>,
         request: JsonRpcRequestEnum,
+        request_id: Option<String>,
     ) -> Web3ProxyResult<(StatusCode, jsonrpc::Response, Vec<Arc<Web3Rpc>>)> {
         // trace!(?request, "proxy_web3_rpc");
 
         let response = match request {
             JsonRpcRequestEnum::Single(request) => {
                 let (status_code, response, rpcs) = self
-                    .proxy_request(request, authorization.clone(), None)
+                    .proxy_request(request, authorization.clone(), None, request_id)
                     .await;
 
                 (status_code, jsonrpc::Response::Single(response), rpcs)
             }
             JsonRpcRequestEnum::Batch(requests) => {
                 let (responses, rpcs) = self
-                    .proxy_web3_rpc_requests(&authorization, requests)
+                    .proxy_web3_rpc_requests(&authorization, requests, request_id)
                     .await?;
 
                 // TODO: real status code. if an error happens, i don't think we are following the spec here
@@ -1017,6 +1022,7 @@ impl App {
         self: &Arc<Self>,
         authorization: &Arc<Authorization>,
         requests: Vec<SingleRequest>,
+        request_id: Option<String>,
     ) -> Web3ProxyResult<(Vec<jsonrpc::SingleResponse>, Vec<Arc<Web3Rpc>>)> {
         // TODO: we should probably change ethers-rs to support this directly. they pushed this off to v2 though
         let num_requests = requests.len();
@@ -1037,7 +1043,12 @@ impl App {
             requests
                 .into_iter()
                 .map(|request| {
-                    self.proxy_request(request, authorization.clone(), Some(head_block.clone()))
+                    self.proxy_request(
+                        request,
+                        authorization.clone(),
+                        Some(head_block.clone()),
+                        request_id.clone(),
+                    )
                 })
                 .collect::<Vec<_>>(),
         )
@@ -1237,6 +1248,7 @@ impl App {
         request: SingleRequest,
         authorization: Arc<Authorization>,
         head_block: Option<Web3ProxyBlock>,
+        request_id: Option<String>,
     ) -> (StatusCode, jsonrpc::SingleResponse, Vec<Arc<Web3Rpc>>) {
         // TODO: this clone is only for an error response. refactor to not need it
         let error_id = request.id.clone();
@@ -1251,6 +1263,7 @@ impl App {
             None,
             request.into(),
             head_block.clone(),
+            request_id,
         )
         .await
         {
diff --git a/web3_proxy/src/app/ws.rs b/web3_proxy/src/app/ws.rs
index 7aaef131..9ac39ff9 100644
--- a/web3_proxy/src/app/ws.rs
+++ b/web3_proxy/src/app/ws.rs
@@ -90,6 +90,7 @@ impl App {
                     None,
                     RequestOrMethod::Method("eth_subscribe(newHeads)".into(), 0),
                     Some(new_head),
+                    None,
                 )
                 .await;
 
@@ -181,6 +182,7 @@ impl App {
                         0,
                     ),
                     None,
+                    None,
                 )
                 .await
                 {
diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs
index b389696e..2f078818 100644
--- a/web3_proxy/src/frontend/mod.rs
+++ b/web3_proxy/src/frontend/mod.rs
@@ -11,6 +11,7 @@ pub mod rpc_proxy_http;
 pub mod rpc_proxy_ws;
 pub mod status;
 pub mod users;
+pub mod request_id;
 
 use crate::app::App;
 use crate::errors::Web3ProxyResult;
@@ -18,6 +19,7 @@ use axum::{
     routing::{get, post},
     Extension, Router,
 };
+use request_id::RequestId;
 use http::{header::AUTHORIZATION, Request, StatusCode};
 use hyper::Body;
 
@@ -30,7 +32,6 @@ use tokio::{process::Command, sync::broadcast};
 use tower_http::sensitive_headers::SetSensitiveRequestHeadersLayer;
 use tower_http::{cors::CorsLayer, normalize_path::NormalizePathLayer, trace::TraceLayer};
 use tracing::{error, error_span, info, trace_span};
-use ulid::Ulid;
 
 #[cfg(feature = "listenfd")]
 use listenfd::ListenFd;
@@ -283,12 +284,15 @@ pub fn make_router(app: Arc<App>) -> Router<()> {
                 // We get the request id from the header
                 // If no header, a new Ulid is created
                 // TODO: move this header name to config
+                /*
                 let request_id = request
                     .headers()
                     .get("x-amzn-trace-id")
                     .and_then(|x| x.to_str().ok())
                     .map(ToString::to_string)
                     .unwrap_or_else(|| Ulid::new().to_string());
+                */
+                let request_id = &request.extensions().get::<RequestId>().unwrap().0;
 
                 // And then we put it along with other information into the `request` span
                 // TODO: what other info should we attach? how can we attach an error and a tracing span here?
@@ -310,6 +314,7 @@ pub fn make_router(app: Arc<App>) -> Router<()> {
                     }
                 }), // .on_failure(|| todo!("on failure that has the request and response body so we can debug more easily")),
         )
+        .layer(request_id::RequestIdLayer)
         // 404 for any unknown routes
         .fallback(errors::handler_404)
         .with_state(app);
diff --git a/web3_proxy/src/frontend/request_id.rs b/web3_proxy/src/frontend/request_id.rs
new file mode 100644
index 00000000..61982f70
--- /dev/null
+++ b/web3_proxy/src/frontend/request_id.rs
@@ -0,0 +1,52 @@
+use std::task::{Context, Poll};
+
+use http::Request;
+use tower_service::Service;
+use ulid::Ulid;
+
+/// RequestId from x-amzn-trace-id header or new Ulid
+#[derive(Clone, Debug)]
+pub struct RequestId(pub String);
+
+/// Middleware layer for adding RequestId as an Extension
+#[derive(Clone, Debug)]
+pub struct RequestIdLayer;
+
+impl<S> tower_layer::Layer<S> for RequestIdLayer {
+    type Service = RequestIdService<S>;
+
+    fn layer(&self, inner: S) -> Self::Service {
+        RequestIdService { inner }
+    }
+}
+
+/// Service used by RequestIdLayer to inject RequestId
+#[derive(Clone, Debug)]
+pub struct RequestIdService<S> {
+    inner: S,
+}
+
+impl<S, B> Service<Request<B>> for RequestIdService<S>
+where
+    S: Service<Request<B>>,
+{
+    type Response = S::Response;
+    type Error = S::Error;
+    type Future = S::Future;
+
+    #[inline]
+    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        self.inner.poll_ready(cx)
+    }
+
+    fn call(&mut self, mut req: Request<B>) -> Self::Future {
+        let request_id = req
+            .headers()
+            .get("x-amzn-trace-id")
+            .and_then(|x| x.to_str().ok())
+            .map(ToString::to_string)
+            .unwrap_or_else(|| Ulid::new().to_string());
+        req.extensions_mut().insert(RequestId(request_id));
+        self.inner.call(req)
+    }
+}
diff --git a/web3_proxy/src/frontend/rpc_proxy_http.rs b/web3_proxy/src/frontend/rpc_proxy_http.rs
index 2479096a..767a5cad 100644
--- a/web3_proxy/src/frontend/rpc_proxy_http.rs
+++ b/web3_proxy/src/frontend/rpc_proxy_http.rs
@@ -1,6 +1,7 @@
 //! Take a user's HTTP JSON-RPC requests and either respond from local data or proxy the request to a backend rpc server.
 use super::authorization::{ip_is_authorized, key_is_authorized};
+use super::request_id::RequestId;
 use super::rpc_proxy_ws::ProxyMode;
 use crate::errors::Web3ProxyError;
 use crate::{app::App, jsonrpc::JsonRpcRequestEnum};
@@ -26,9 +27,18 @@ pub async fn proxy_web3_rpc(
     State(app): State<Arc<App>>,
     InsecureClientIp(ip): InsecureClientIp,
     origin: Option<TypedHeader<Origin>>,
+    Extension(RequestId(request_id)): Extension<RequestId>,
     payload: Result<Json<JsonRpcRequestEnum>, JsonRejection>,
 ) -> Result {
-    _proxy_web3_rpc(app, &ip, origin.as_deref(), payload, ProxyMode::Best).await
+    _proxy_web3_rpc(
+        app,
+        &ip,
+        origin.as_deref(),
+        payload,
+        ProxyMode::Best,
+        request_id,
+    )
+    .await
 }
 
 #[debug_handler]
@@ -36,11 +46,20 @@ pub async fn fastest_proxy_web3_rpc(
     State(app): State<Arc<App>>,
     InsecureClientIp(ip): InsecureClientIp,
     origin: Option<TypedHeader<Origin>>,
+    Extension(RequestId(request_id)): Extension<RequestId>,
     payload: Result<Json<JsonRpcRequestEnum>, JsonRejection>,
 ) -> Result {
     // TODO: read the fastest number from params
     // TODO: check that the app allows this without authentication
-    _proxy_web3_rpc(app, &ip, origin.as_deref(), payload, ProxyMode::Fastest(0)).await
+    _proxy_web3_rpc(
+        app,
+        &ip,
+        origin.as_deref(),
+        payload,
+        ProxyMode::Fastest(0),
+        request_id,
+    )
+    .await
 }
 
 #[debug_handler]
@@ -48,9 +67,18 @@ pub async fn versus_proxy_web3_rpc(
     State(app): State<Arc<App>>,
     InsecureClientIp(ip): InsecureClientIp,
     origin: Option<TypedHeader<Origin>>,
+    Extension(RequestId(request_id)): Extension<RequestId>,
     payload: Result<Json<JsonRpcRequestEnum>, JsonRejection>,
 ) -> Result {
-    _proxy_web3_rpc(app, &ip, origin.as_deref(), payload, ProxyMode::Versus).await
+    _proxy_web3_rpc(
+        app,
+        &ip,
+        origin.as_deref(),
+        payload,
+        ProxyMode::Versus,
+        request_id,
+    )
+    .await
 }
 
 async fn _proxy_web3_rpc(
@@ -59,6 +87,7 @@ async fn _proxy_web3_rpc(
     origin: Option<&Origin>,
     payload: Result<Json<JsonRpcRequestEnum>, JsonRejection>,
     proxy_mode: ProxyMode,
+    request_id: String,
 ) -> Result {
     // TODO: create a stat if they error. (but we haven't parsed rpc_key yet, so it needs some thought)
     let payload = payload
@@ -81,7 +110,7 @@ async fn _proxy_web3_rpc(
     // TODO: is first_id the right thing to attach to this error?
     let (status_code, response, rpcs) = app
-        .proxy_web3_rpc(authorization, payload)
+        .proxy_web3_rpc(authorization, payload, Some(request_id))
         .await
         .map_err(|e| e.into_response_with_id(first_id))?;
 
@@ -132,6 +161,7 @@ pub async fn proxy_web3_rpc_with_key(
     referer: Option<TypedHeader<Referer>>,
     user_agent: Option<TypedHeader<UserAgent>>,
     Path(rpc_key): Path<String>,
+    Extension(RequestId(request_id)): Extension<RequestId>,
     payload: Result<Json<JsonRpcRequestEnum>, JsonRejection>,
 ) -> Result {
     _proxy_web3_rpc_with_key(
@@ -143,6 +173,7 @@ pub async fn proxy_web3_rpc_with_key(
         rpc_key,
         payload,
         ProxyMode::Best,
+        request_id,
     )
     .await
 }
@@ -158,6 +189,7 @@ pub async fn debug_proxy_web3_rpc_with_key(
     user_agent: Option<TypedHeader<UserAgent>>,
     request_headers: HeaderMap,
     Path(rpc_key): Path<String>,
+    Extension(RequestId(request_id)): Extension<RequestId>,
     payload: Result<Json<JsonRpcRequestEnum>, JsonRejection>,
 ) -> Result {
     let mut response = match _proxy_web3_rpc_with_key(
@@ -169,6 +201,7 @@ pub async fn debug_proxy_web3_rpc_with_key(
         rpc_key,
         payload,
         ProxyMode::Debug,
+        request_id,
     )
     .await
     {
@@ -201,6 +234,7 @@ pub async fn fastest_proxy_web3_rpc_with_key(
     referer: Option<TypedHeader<Referer>>,
     user_agent: Option<TypedHeader<UserAgent>>,
     Path(rpc_key): Path<String>,
+    Extension(RequestId(request_id)): Extension<RequestId>,
     payload: Result<Json<JsonRpcRequestEnum>, JsonRejection>,
 ) -> Result {
     _proxy_web3_rpc_with_key(
@@ -212,6 +246,7 @@ pub async fn fastest_proxy_web3_rpc_with_key(
         rpc_key,
         payload,
         ProxyMode::Fastest(0),
+        request_id,
     )
     .await
 }
@@ -224,6 +259,7 @@ pub async fn versus_proxy_web3_rpc_with_key(
     referer: Option<TypedHeader<Referer>>,
     user_agent: Option<TypedHeader<UserAgent>>,
     Path(rpc_key): Path<String>,
+    Extension(RequestId(request_id)): Extension<RequestId>,
     payload: Result<Json<JsonRpcRequestEnum>, JsonRejection>,
 ) -> Result {
     _proxy_web3_rpc_with_key(
@@ -235,6 +271,7 @@ pub async fn versus_proxy_web3_rpc_with_key(
         rpc_key,
         payload,
         ProxyMode::Versus,
+        request_id,
     )
     .await
 }
@@ -249,6 +286,7 @@ async fn _proxy_web3_rpc_with_key(
     rpc_key: String,
     payload: Result<Json<JsonRpcRequestEnum>, JsonRejection>,
     proxy_mode: ProxyMode,
+    request_id: String,
 ) -> Result {
     // TODO: DRY w/ proxy_web3_rpc
     // TODO: create a stat if they error. (but we haven't parsed rpc_key yet, so it needs some thought)
@@ -276,7 +314,7 @@ async fn _proxy_web3_rpc_with_key(
     let rpc_secret_key_id = authorization.checks.rpc_secret_key_id;
 
     let (status_code, response, rpcs) = app
-        .proxy_web3_rpc(authorization, payload)
+        .proxy_web3_rpc(authorization, payload, Some(request_id))
         .await
         .map_err(|e| e.into_response_with_id(first_id))?;
 
diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs
index 7680c762..6a199627 100644
--- a/web3_proxy/src/frontend/rpc_proxy_ws.rs
+++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs
@@ -327,6 +327,7 @@ async fn websocket_proxy_web3_rpc(
                 None,
                 json_request.into(),
                 None,
+                None,
             )
             .await?;
 
@@ -361,6 +362,7 @@ async fn websocket_proxy_web3_rpc(
                 None,
                 json_request.into(),
                 None,
+                None,
             )
             .await?;
 
@@ -405,7 +407,7 @@ async fn websocket_proxy_web3_rpc(
             Ok(response.into())
         }
         _ => app
-            .proxy_web3_rpc(authorization, json_request.into())
+            .proxy_web3_rpc(authorization, json_request.into(), None)
             .await
             .map(|(_, response, _)| response),
     }
diff --git a/web3_proxy/src/jsonrpc/request.rs b/web3_proxy/src/jsonrpc/request.rs
index 69c3123b..151b5afa 100644
--- a/web3_proxy/src/jsonrpc/request.rs
+++ b/web3_proxy/src/jsonrpc/request.rs
@@ -129,6 +129,7 @@ impl JsonRpcRequestEnum {
             None,
             RequestOrMethod::Method("invalid_method".into(), size),
             None,
+            None,
         )
         .await
         .unwrap();
diff --git a/web3_proxy/src/jsonrpc/request_builder.rs b/web3_proxy/src/jsonrpc/request_builder.rs
index 5b473339..4ecdd34e 100644
--- a/web3_proxy/src/jsonrpc/request_builder.rs
+++ b/web3_proxy/src/jsonrpc/request_builder.rs
@@ -175,6 +175,7 @@ impl RequestBuilder {
                 permit,
                 self.request_or_method.clone(),
                 self.head_block.clone(),
+                None,
             )
             .await;
 
@@ -257,6 +258,9 @@ pub struct ValidatedRequest {
     /// limit the number of concurrent requests from a given user.
     pub permit: Option<OwnedSemaphorePermit>,
+
+    /// RequestId from x-amzn-trace-id or generated
+    pub request_id: Option<String>,
 }
 
 impl Display for ValidatedRequest {
@@ -322,6 +326,7 @@ impl ValidatedRequest {
         permit: Option<OwnedSemaphorePermit>,
         mut request: RequestOrMethod,
         usd_per_cu: Decimal,
+        request_id: Option<String>,
     ) -> Web3ProxyResult<Arc<Self>> {
         let start_instant = Instant::now();
 
@@ -390,6 +395,7 @@ impl ValidatedRequest {
             stat_sender,
             usd_per_cu,
             user_error_response: false.into(),
+            request_id,
         };
 
         Ok(Arc::new(x))
@@ -403,18 +409,16 @@ impl ValidatedRequest {
         permit: Option<OwnedSemaphorePermit>,
         request: RequestOrMethod,
         head_block: Option<Web3ProxyBlock>,
+        request_id: Option<String>,
     ) -> Web3ProxyResult<Arc<Self>> {
         #[cfg(feature = "rdkafka")]
         let kafka_debug_logger = if matches!(authorization.checks.proxy_mode, ProxyMode::Debug) {
-            // TODO: get this out of tracing instead (where we have a String from Amazon's LB)
-            let request_ulid = Ulid::new();
-
             KafkaDebugLogger::try_new(
                 app,
                 authorization.clone(),
                 head_block.as_ref().map(|x| x.number()),
                 "web3_proxy:rpc",
-                request_ulid,
+                &request_id,
             )
         } else {
             None
@@ -435,6 +439,7 @@ impl ValidatedRequest {
             permit,
             request,
             usd_per_cu,
+            request_id,
         )
         .await
     }
@@ -461,6 +466,7 @@ impl ValidatedRequest {
             None,
             request.into(),
             head_block,
+            None,
         )
         .await
     } else {
@@ -475,6 +481,7 @@ impl ValidatedRequest {
             None,
             request.into(),
             Default::default(),
+            None,
         )
         .await
     }
diff --git a/web3_proxy/src/kafka.rs b/web3_proxy/src/kafka.rs
index d4c34ed1..fa26bd33 100644
--- a/web3_proxy/src/kafka.rs
+++ b/web3_proxy/src/kafka.rs
@@ -37,7 +37,7 @@ impl KafkaDebugLogger {
         authorization: Arc<Authorization>,
         head_block_num: Option,
         kafka_topic: &str,
-        request_ulid: Ulid,
+        request_id: &str,
     ) -> Option<Arc<Self>> {
         let kafka_producer = app.kafka_producer.clone()?;
 
@@ -74,7 +74,7 @@ impl KafkaDebugLogger {
             })
             .insert(KafkaHeader {
                 key: "request_ulid",
-                value: Some(&request_ulid.to_string()),
+                value: Some(request_id),
             })
             .insert(KafkaHeader {
                 key: "head_block_num",
diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs
index d4c8d421..28ef9b6f 100644
--- a/web3_proxy/src/rpcs/request.rs
+++ b/web3_proxy/src/rpcs/request.rs
@@ -209,7 +209,17 @@ impl OpenRequestHandle {
             .jsonrpc_request()
             .context("there should always be a request here")?;
 
-        let response = client.post(url).json(request).send().await?;
+        let mut request_builder = client.post(url).json(request);
+        if request.method == "eth_sendRawTransaction" {
+            if let Some(ref request_id) = self.web3_request.request_id {
+                let mut headers = reqwest::header::HeaderMap::with_capacity(1);
+                let request_id = reqwest::header::HeaderValue::from_str(&request_id)
+                    .expect("request id should be a valid header");
+                headers.insert("x-amzn-trace-id", request_id);
+                request_builder = request_builder.headers(headers);
+            }
+        }
+        let response = request_builder.send().await?;
 
         if response.status() == StatusCode::TOO_MANY_REQUESTS {
             // TODO: how much should we actually rate limit?
diff --git a/web3_proxy_cli/src/sub_commands/migrate_stats_to_v2.rs b/web3_proxy_cli/src/sub_commands/migrate_stats_to_v2.rs
index 34e30f7e..afea6891 100644
--- a/web3_proxy_cli/src/sub_commands/migrate_stats_to_v2.rs
+++ b/web3_proxy_cli/src/sub_commands/migrate_stats_to_v2.rs
@@ -217,6 +217,7 @@ impl MigrateStatsToV2SubCommand {
                 connect_timeout: Default::default(),
                 expire_timeout: Default::default(),
                 permit: None,
+                request_id: None,
             };
 
             web3_request.try_send_stat()?;
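
For reviewers who want to see the new middleware in isolation, here is a minimal sketch (not part of the diff) that wires `RequestIdLayer` onto a bare axum `Router` and reads the injected `RequestId` extension in a handler, mirroring what `make_router` and `proxy_web3_rpc` now do. It assumes the axum 0.6 / hyper 0.14 stack the crate already uses; the import path, route, and port are illustrative only.

```rust
// Standalone sketch of the RequestIdLayer wiring (assumed import path; adjust to
// wherever the request_id module is visible from your crate).
use axum::{routing::get, Extension, Router};
use web3_proxy::frontend::request_id::{RequestId, RequestIdLayer};

// The extractor works because RequestIdService inserted the extension before routing.
async fn show_request_id(Extension(RequestId(request_id)): Extension<RequestId>) -> String {
    // either the caller's x-amzn-trace-id value or a freshly generated Ulid
    format!("request id: {request_id}")
}

#[tokio::main]
async fn main() {
    let app = Router::new()
        .route("/", get(show_request_id))
        // the layer must wrap the routes so the extension exists before extraction
        .layer(RequestIdLayer);

    // axum 0.6-style server; the port is arbitrary for this example
    axum::Server::bind(&"127.0.0.1:3000".parse().unwrap())
        .serve(app.into_make_service())
        .await
        .unwrap();
}
```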
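A unit test along these lines could live in a `#[cfg(test)]` module at the bottom of `request_id.rs`. It is a sketch, not part of the diff, and assumes the `tower` utility crate (with its `util` feature, for `service_fn` and `ServiceExt::oneshot`) is available as a dev-dependency.

```rust
#[cfg(test)]
mod tests {
    use super::{RequestId, RequestIdLayer};
    use http::Request;
    use hyper::Body;
    use tower::{service_fn, Layer, ServiceExt};

    #[tokio::test]
    async fn copies_x_amzn_trace_id_into_extensions() {
        // the inner service just echoes the RequestId extension back out
        let svc = RequestIdLayer.layer(service_fn(|req: Request<Body>| async move {
            Ok::<_, std::convert::Infallible>(req.extensions().get::<RequestId>().cloned())
        }));

        let req = Request::builder()
            .header("x-amzn-trace-id", "Root=1-test")
            .body(Body::empty())
            .unwrap();

        let id = svc.oneshot(req).await.unwrap().expect("extension was inserted");
        assert_eq!(id.0, "Root=1-test");
    }
}
```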
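End to end, the id now travels like this: a caller may tag a JSON-RPC request with `x-amzn-trace-id`; the proxy stores it as `RequestId`, threads it through `proxy_web3_rpc` into the `ValidatedRequest`, and, for `eth_sendRawTransaction` only, re-attaches the same header on the outgoing request to the backend RPC. The hedged client sketch below shows the caller side; the URL, trace-id value, and raw transaction are placeholders.

```rust
// Hypothetical caller tagging a transaction broadcast with a trace id.
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let client = reqwest::Client::new();

    let body = json!({
        "jsonrpc": "2.0",
        "id": 1,
        "method": "eth_sendRawTransaction",
        // placeholder raw transaction bytes
        "params": ["0x02f870018203e8..."]
    });

    let response = client
        .post("http://localhost:8544") // placeholder proxy URL
        .header("x-amzn-trace-id", "Root=1-example-trace") // forwarded to the backend by this PR
        .json(&body)
        .send()
        .await?;

    println!("{}", response.text().await?);
    Ok(())
}
```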