forward request id for eth_sendRawTransaction

Currently looking into a single test case failure that may be due to this change.
Rory Neithinger 2023-11-07 00:57:30 -08:00
parent 6ce4796c94
commit 515d0af751
12 changed files with 157 additions and 15 deletions
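
Taken together, the diff reads the request id from the incoming x-amzn-trace-id header (falling back to a fresh Ulid), threads it through the app as request_id: Option<String>, and re-attaches the same header when forwarding eth_sendRawTransaction to a backend RPC. A minimal client-side sketch of the idea, assuming the proxy listens on a local port; the URL, trace id, and raw transaction below are placeholders:

use serde_json::json;

#[tokio::main]
async fn main() -> reqwest::Result<()> {
    let response = reqwest::Client::new()
        .post("http://localhost:8544") // hypothetical proxy endpoint
        // picked up by the RequestIdLayer middleware; if omitted, the proxy generates a new Ulid
        .header("x-amzn-trace-id", "Root=1-abcdef-example")
        .json(&json!({
            "jsonrpc": "2.0",
            "id": 1,
            "method": "eth_sendRawTransaction",
            "params": ["0x..."] // placeholder raw transaction
        }))
        .send()
        .await?;
    println!("{}", response.text().await?);
    Ok(())
}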

Cargo.lock (generated)

@ -6692,6 +6692,8 @@ dependencies = [
"tokio-stream",
"toml 0.8.6",
"tower-http",
"tower-layer",
"tower-service",
"tracing",
"tracing-subscriber",
"ulid",

@ -88,6 +88,8 @@ tokio = { version = "1.33.0", features = ["full", "tracing"] }
tokio-stream = { version = "0.1.14", features = ["sync"] }
toml = "0.8.6"
tower-http = { version = "0.4.4", features = ["cors", "normalize-path", "sensitive-headers", "trace"] }
tower-layer = "0.3.2"
tower-service = "0.3.2"
tracing = "0.1"
ulid = { version = "1.1.0", features = ["rand", "uuid", "serde"] }
url = { version = "2.4.1" }

@ -957,7 +957,8 @@ impl App {
) -> Web3ProxyResult<R> {
let authorization = Arc::new(Authorization::internal()?);
self.authorized_request(method, params, authorization).await
self.authorized_request(method, params, authorization, None)
.await
}
/// this is way more roundabout than we want, but it means stats are emitted and caches are used
@ -966,12 +967,15 @@ impl App {
method: &str,
params: P,
authorization: Arc<Authorization>,
request_id: Option<String>,
) -> Web3ProxyResult<R> {
// TODO: proper ids
let request =
SingleRequest::new(LooseId::Number(1), method.to_string().into(), json!(params))?;
let (_, response, _) = self.proxy_request(request, authorization, None).await;
let (_, response, _) = self
.proxy_request(request, authorization, None, request_id)
.await;
// TODO: error handling?
match response.parsed().await?.payload {
@ -987,20 +991,21 @@ impl App {
self: &Arc<Self>,
authorization: Arc<Authorization>,
request: JsonRpcRequestEnum,
request_id: Option<String>,
) -> Web3ProxyResult<(StatusCode, jsonrpc::Response, Vec<Arc<Web3Rpc>>)> {
// trace!(?request, "proxy_web3_rpc");
let response = match request {
JsonRpcRequestEnum::Single(request) => {
let (status_code, response, rpcs) = self
.proxy_request(request, authorization.clone(), None)
.proxy_request(request, authorization.clone(), None, request_id)
.await;
(status_code, jsonrpc::Response::Single(response), rpcs)
}
JsonRpcRequestEnum::Batch(requests) => {
let (responses, rpcs) = self
.proxy_web3_rpc_requests(&authorization, requests)
.proxy_web3_rpc_requests(&authorization, requests, request_id)
.await?;
// TODO: real status code. if an error happens, i don't think we are following the spec here
@ -1017,6 +1022,7 @@ impl App {
self: &Arc<Self>,
authorization: &Arc<Authorization>,
requests: Vec<SingleRequest>,
request_id: Option<String>,
) -> Web3ProxyResult<(Vec<jsonrpc::ParsedResponse>, Vec<Arc<Web3Rpc>>)> {
// TODO: we should probably change ethers-rs to support this directly. they pushed this off to v2 though
let num_requests = requests.len();
@ -1037,7 +1043,12 @@ impl App {
requests
.into_iter()
.map(|request| {
self.proxy_request(request, authorization.clone(), Some(head_block.clone()))
self.proxy_request(
request,
authorization.clone(),
Some(head_block.clone()),
request_id.clone(),
)
})
.collect::<Vec<_>>(),
)
@ -1237,6 +1248,7 @@ impl App {
request: SingleRequest,
authorization: Arc<Authorization>,
head_block: Option<Web3ProxyBlock>,
request_id: Option<String>,
) -> (StatusCode, jsonrpc::SingleResponse, Vec<Arc<Web3Rpc>>) {
// TODO: this clone is only for an error response. refactor to not need it
let error_id = request.id.clone();
@ -1261,6 +1273,7 @@ impl App {
None,
request.clone().into(),
head_block.clone(),
request_id.clone(),
)
.await
{

@ -90,6 +90,7 @@ impl App {
None,
RequestOrMethod::Method("eth_subscribe(newHeads)".into(), 0),
Some(new_head),
None,
)
.await;
@ -181,6 +182,7 @@ impl App {
0,
),
None,
None,
)
.await
{

@ -9,6 +9,7 @@ pub mod rpc_proxy_http;
pub mod rpc_proxy_ws;
pub mod status;
pub mod users;
pub mod request_id;
use crate::app::App;
use crate::errors::Web3ProxyResult;
@ -16,6 +17,7 @@ use axum::{
routing::{get, post},
Extension, Router,
};
use request_id::RequestId;
use http::{header::AUTHORIZATION, Request, StatusCode};
use hyper::Body;
@ -28,7 +30,6 @@ use tokio::{process::Command, sync::broadcast};
use tower_http::sensitive_headers::SetSensitiveRequestHeadersLayer;
use tower_http::{cors::CorsLayer, normalize_path::NormalizePathLayer, trace::TraceLayer};
use tracing::{error, error_span, info, trace_span};
use ulid::Ulid;
#[cfg(feature = "listenfd")]
use listenfd::ListenFd;
@ -278,12 +279,15 @@ pub async fn serve(
// We get the request id from the header
// If no header, a new Ulid is created
// TODO: move this header name to config
/*
let request_id = request
.headers()
.get("x-amzn-trace-id")
.and_then(|x| x.to_str().ok())
.map(ToString::to_string)
.unwrap_or_else(|| Ulid::new().to_string());
*/
let request_id = &request.extensions().get::<RequestId>().unwrap().0;
// And then we put it along with other information into the `request` span
// TODO: what other info should we attach? how can we attach an error and a tracing span here?
@ -305,6 +309,7 @@ pub async fn serve(
}
}), // .on_failure(|| todo!("on failure that has the request and response body so we can debug more easily")),
)
.layer(request_id::RequestIdLayer)
// 404 for any unknown routes
.fallback(errors::handler_404);

@ -0,0 +1,52 @@
use std::task::{Context, Poll};
use http::Request;
use tower_service::Service;
use ulid::Ulid;
/// RequestId from x-amzn-trace-id header or new Ulid
#[derive(Clone, Debug)]
pub struct RequestId(pub String);
/// Middleware layer for adding RequestId as an Extension
#[derive(Clone, Debug)]
pub struct RequestIdLayer;
impl<S> tower_layer::Layer<S> for RequestIdLayer {
type Service = RequestIdService<S>;
fn layer(&self, inner: S) -> Self::Service {
RequestIdService { inner }
}
}
/// Service used by RequestIdLayer to inject RequestId
#[derive(Clone, Debug)]
pub struct RequestIdService<S> {
inner: S,
}
impl<ResBody, S> Service<Request<ResBody>> for RequestIdService<S>
where
S: Service<Request<ResBody>>,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
#[inline]
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}
fn call(&mut self, mut req: Request<ResBody>) -> Self::Future {
let request_id = req
.headers()
.get("x-amzn-trace-id")
.and_then(|x| x.to_str().ok())
.map(ToString::to_string)
.unwrap_or_else(|| Ulid::new().to_string());
req.extensions_mut().insert(RequestId(request_id));
self.inner.call(req)
}
}
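
For orientation, a compact sketch of how the new middleware is meant to be used: attach RequestIdLayer to the Router (as serve() does above) and read the extension in a handler (as the proxy handlers below do). The route and handler here are illustrative only:

use axum::{routing::post, Extension, Router};

// hypothetical handler that just echoes the id inserted by the middleware
async fn echo_request_id(Extension(RequestId(request_id)): Extension<RequestId>) -> String {
    request_id
}

fn example_router() -> Router {
    Router::new()
        .route("/", post(echo_request_id))
        // RequestIdLayer wraps every request and inserts a RequestId extension
        .layer(RequestIdLayer)
}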

@ -1,6 +1,7 @@
//! Take a user's HTTP JSON-RPC requests and either respond from local data or proxy the request to a backend rpc server.
use super::authorization::{ip_is_authorized, key_is_authorized};
use super::request_id::RequestId;
use super::rpc_proxy_ws::ProxyMode;
use crate::errors::Web3ProxyError;
use crate::{app::App, jsonrpc::JsonRpcRequestEnum};
@ -26,9 +27,18 @@ pub async fn proxy_web3_rpc(
Extension(app): Extension<Arc<App>>,
InsecureClientIp(ip): InsecureClientIp,
origin: Option<TypedHeader<Origin>>,
Extension(RequestId(request_id)): Extension<RequestId>,
payload: Result<Json<JsonRpcRequestEnum>, JsonRejection>,
) -> Result<Response, Response> {
_proxy_web3_rpc(app, &ip, origin.as_deref(), payload, ProxyMode::Best).await
_proxy_web3_rpc(
app,
&ip,
origin.as_deref(),
payload,
ProxyMode::Best,
request_id,
)
.await
}
#[debug_handler]
@ -36,11 +46,20 @@ pub async fn fastest_proxy_web3_rpc(
Extension(app): Extension<Arc<App>>,
InsecureClientIp(ip): InsecureClientIp,
origin: Option<TypedHeader<Origin>>,
Extension(RequestId(request_id)): Extension<RequestId>,
payload: Result<Json<JsonRpcRequestEnum>, JsonRejection>,
) -> Result<Response, Response> {
// TODO: read the fastest number from params
// TODO: check that the app allows this without authentication
_proxy_web3_rpc(app, &ip, origin.as_deref(), payload, ProxyMode::Fastest(0)).await
_proxy_web3_rpc(
app,
&ip,
origin.as_deref(),
payload,
ProxyMode::Fastest(0),
request_id,
)
.await
}
#[debug_handler]
@ -48,9 +67,18 @@ pub async fn versus_proxy_web3_rpc(
Extension(app): Extension<Arc<App>>,
InsecureClientIp(ip): InsecureClientIp,
origin: Option<TypedHeader<Origin>>,
Extension(RequestId(request_id)): Extension<RequestId>,
payload: Result<Json<JsonRpcRequestEnum>, JsonRejection>,
) -> Result<Response, Response> {
_proxy_web3_rpc(app, &ip, origin.as_deref(), payload, ProxyMode::Versus).await
_proxy_web3_rpc(
app,
&ip,
origin.as_deref(),
payload,
ProxyMode::Versus,
request_id,
)
.await
}
async fn _proxy_web3_rpc(
@ -59,6 +87,7 @@ async fn _proxy_web3_rpc(
origin: Option<&Origin>,
payload: Result<Json<JsonRpcRequestEnum>, JsonRejection>,
proxy_mode: ProxyMode,
request_id: String,
) -> Result<Response, Response> {
// TODO: create a stat if they error. (but we haven't parsed rpc_key yet, so it needs some thought)
let payload = payload
@ -81,7 +110,7 @@ async fn _proxy_web3_rpc(
// TODO: is first_id the right thing to attach to this error?
let (status_code, response, rpcs) = app
.proxy_web3_rpc(authorization, payload)
.proxy_web3_rpc(authorization, payload, Some(request_id))
.await
.map_err(|e| e.into_response_with_id(first_id))?;
@ -132,6 +161,7 @@ pub async fn proxy_web3_rpc_with_key(
referer: Option<TypedHeader<Referer>>,
user_agent: Option<TypedHeader<UserAgent>>,
Path(rpc_key): Path<String>,
Extension(RequestId(request_id)): Extension<RequestId>,
payload: Result<Json<JsonRpcRequestEnum>, JsonRejection>,
) -> Result<Response, Response> {
_proxy_web3_rpc_with_key(
@ -143,6 +173,7 @@ pub async fn proxy_web3_rpc_with_key(
rpc_key,
payload,
ProxyMode::Best,
request_id,
)
.await
}
@ -158,6 +189,7 @@ pub async fn debug_proxy_web3_rpc_with_key(
user_agent: Option<TypedHeader<UserAgent>>,
request_headers: HeaderMap,
Path(rpc_key): Path<String>,
Extension(RequestId(request_id)): Extension<RequestId>,
payload: Result<Json<JsonRpcRequestEnum>, JsonRejection>,
) -> Result<Response, Response> {
let mut response = match _proxy_web3_rpc_with_key(
@ -169,6 +201,7 @@ pub async fn debug_proxy_web3_rpc_with_key(
rpc_key,
payload,
ProxyMode::Debug,
request_id,
)
.await
{
@ -201,6 +234,7 @@ pub async fn fastest_proxy_web3_rpc_with_key(
referer: Option<TypedHeader<Referer>>,
user_agent: Option<TypedHeader<UserAgent>>,
Path(rpc_key): Path<String>,
Extension(RequestId(request_id)): Extension<RequestId>,
payload: Result<Json<JsonRpcRequestEnum>, JsonRejection>,
) -> Result<Response, Response> {
_proxy_web3_rpc_with_key(
@ -212,6 +246,7 @@ pub async fn fastest_proxy_web3_rpc_with_key(
rpc_key,
payload,
ProxyMode::Fastest(0),
request_id,
)
.await
}
@ -224,6 +259,7 @@ pub async fn versus_proxy_web3_rpc_with_key(
referer: Option<TypedHeader<Referer>>,
user_agent: Option<TypedHeader<UserAgent>>,
Path(rpc_key): Path<String>,
Extension(RequestId(request_id)): Extension<RequestId>,
payload: Result<Json<JsonRpcRequestEnum>, JsonRejection>,
) -> Result<Response, Response> {
_proxy_web3_rpc_with_key(
@ -235,6 +271,7 @@ pub async fn versus_proxy_web3_rpc_with_key(
rpc_key,
payload,
ProxyMode::Versus,
request_id,
)
.await
}
@ -249,6 +286,7 @@ async fn _proxy_web3_rpc_with_key(
rpc_key: String,
payload: Result<Json<JsonRpcRequestEnum>, JsonRejection>,
proxy_mode: ProxyMode,
request_id: String,
) -> Result<Response, Response> {
// TODO: DRY w/ proxy_web3_rpc
// TODO: create a stat if they error. (but we haven't parsed rpc_key yet, so it needs some thought)
@ -276,7 +314,7 @@ async fn _proxy_web3_rpc_with_key(
let rpc_secret_key_id = authorization.checks.rpc_secret_key_id;
let (status_code, response, rpcs) = app
.proxy_web3_rpc(authorization, payload)
.proxy_web3_rpc(authorization, payload, Some(request_id))
.await
.map_err(|e| e.into_response_with_id(first_id))?;

@ -327,6 +327,7 @@ async fn websocket_proxy_web3_rpc(
None,
json_request.into(),
None,
None,
)
.await?;
@ -361,6 +362,7 @@ async fn websocket_proxy_web3_rpc(
None,
json_request.into(),
None,
None,
)
.await?;
@ -405,7 +407,7 @@ async fn websocket_proxy_web3_rpc(
Ok(response.into())
}
_ => app
.proxy_web3_rpc(authorization, json_request.into())
.proxy_web3_rpc(authorization, json_request.into(), None)
.await
.map(|(_, response, _)| response),
}

@ -129,6 +129,7 @@ impl JsonRpcRequestEnum {
None,
RequestOrMethod::Method("invalid_method".into(), size),
None,
None,
)
.await
.unwrap();

@ -175,6 +175,7 @@ impl RequestBuilder {
permit,
self.request_or_method.clone(),
self.head_block.clone(),
None,
)
.await;
@ -257,6 +258,9 @@ pub struct ValidatedRequest {
/// limit the number of concurrent requests from a given user.
pub permit: Option<OwnedSemaphorePermit>,
/// RequestId from x-amzn-trace-id or generated
pub request_id: Option<String>,
}
impl Display for ValidatedRequest {
@ -322,6 +326,7 @@ impl ValidatedRequest {
permit: Option<OwnedSemaphorePermit>,
mut request: RequestOrMethod,
usd_per_cu: Decimal,
request_id: Option<String>,
) -> Web3ProxyResult<Arc<Self>> {
let start_instant = Instant::now();
@ -381,6 +386,7 @@ impl ValidatedRequest {
stat_sender,
usd_per_cu,
user_error_response: false.into(),
request_id,
};
Ok(Arc::new(x))
@ -394,11 +400,16 @@ impl ValidatedRequest {
permit: Option<OwnedSemaphorePermit>,
request: RequestOrMethod,
head_block: Option<Web3ProxyBlock>,
request_id: Option<String>,
) -> Web3ProxyResult<Arc<Self>> {
#[cfg(feature = "rdkafka")]
let kafka_debug_logger = if matches!(authorization.checks.proxy_mode, ProxyMode::Debug) {
// TODO: get this out of tracing instead (where we have a String from Amazon's LB)
let request_ulid = Ulid::new();
let request_ulid = request_id
.map(|s| Ulid::from_string(&s))
.transpose()
.ok()
.flatten()
.unwrap_or_else(Ulid::new);
KafkaDebugLogger::try_new(
app,
@ -426,6 +437,7 @@ impl ValidatedRequest {
permit,
request,
usd_per_cu,
request_id,
)
.await
}
@ -452,6 +464,7 @@ impl ValidatedRequest {
None,
request.into(),
head_block,
None,
)
.await
} else {
@ -466,6 +479,7 @@ impl ValidatedRequest {
None,
request.into(),
Default::default(),
None,
)
.await
}
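
The kafka debug logger previously always minted a fresh Ulid; it now tries to reuse the forwarded request id first. A standalone sketch of that fallback, with a hypothetical function name:

use ulid::Ulid;

// mirrors the fallback in new_with_app() above
fn debug_request_ulid(request_id: Option<String>) -> Ulid {
    request_id
        .map(|s| Ulid::from_string(&s)) // Err if the forwarded id is not a valid ULID
        .transpose()
        .ok()
        .flatten()
        .unwrap_or_else(Ulid::new)
}

Note that a typical x-amzn-trace-id value is not a valid ULID, so in that case the logger still falls back to Ulid::new.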

@ -209,7 +209,17 @@ impl OpenRequestHandle {
.jsonrpc_request()
.context("there should always be a request here")?;
let response = client.post(url).json(request).send().await?;
let mut request_builder = client.post(url).json(request);
if request.method == "eth_sendRawTransaction" {
if let Some(ref request_id) = self.web3_request.request_id {
let mut headers = reqwest::header::HeaderMap::with_capacity(1);
let request_id = reqwest::header::HeaderValue::from_str(&request_id)
.expect("request id should be a valid header");
headers.insert("x-amzn-trace-id", request_id);
request_builder = request_builder.headers(headers);
}
}
let response = request_builder.send().await?;
if response.status() == StatusCode::TOO_MANY_REQUESTS {
// TODO: how much should we actually rate limit?
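
One thing to note: HeaderValue::from_str(...).expect(...) above will panic if the forwarded id contains bytes that are not valid in an HTTP header value. A sketch, not part of this commit, of a helper that only attaches the header when the value parses cleanly; the function name is hypothetical:

use reqwest::header::HeaderValue;

// attach the trace id only when it forms a valid header value
fn with_optional_trace_id(
    builder: reqwest::RequestBuilder,
    request_id: Option<&str>,
) -> reqwest::RequestBuilder {
    match request_id.and_then(|id| HeaderValue::from_str(id).ok()) {
        Some(value) => builder.header("x-amzn-trace-id", value),
        None => builder,
    }
}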

@ -217,6 +217,7 @@ impl MigrateStatsToV2SubCommand {
connect_timeout: Default::default(),
expire_timeout: Default::default(),
permit: None,
request_id: None,
};
web3_request.try_send_stat()?;