add backend nodes to the rpc response headers

only do this in dev?
This commit is contained in:
Bryan Stitt 2022-12-19 21:37:12 -08:00
parent f27c764a07
commit 82eb449e96
6 changed files with 129 additions and 67 deletions

@ -108,7 +108,7 @@ impl RedisRateLimiter {
// do the query
.query_async(&mut *conn)
.await
.context("increment rate limit and set expiration")?;
.context("cannot increment rate limit or set expiration")?;
let new_count: u64 = *x.first().expect("check redis");

@ -5,11 +5,13 @@ use crate::app_stats::{ProxyResponseStat, StatEmitter, Web3ProxyStat};
use crate::block_number::{block_needed, BlockNeeded};
use crate::config::{AppConfig, TopConfig};
use crate::frontend::authorization::{Authorization, RequestMetadata};
use crate::frontend::errors::FrontendErrorResponse;
use crate::jsonrpc::JsonRpcForwardedResponse;
use crate::jsonrpc::JsonRpcForwardedResponseEnum;
use crate::jsonrpc::JsonRpcRequest;
use crate::jsonrpc::JsonRpcRequestEnum;
use crate::rpcs::blockchain::{ArcBlock, SavedBlock};
use crate::rpcs::connection::Web3Connection;
use crate::rpcs::connections::Web3Connections;
use crate::rpcs::request::OpenRequestHandleMetrics;
use crate::rpcs::transactions::TxStatus;
@ -21,11 +23,10 @@ use derive_more::From;
use entities::sea_orm_active_enums::LogLevel;
use ethers::core::utils::keccak256;
use ethers::prelude::{Address, Block, Bytes, TxHash, H256, U64};
use ethers::types::U256;
use futures::future::join_all;
use futures::stream::FuturesUnordered;
use futures::stream::StreamExt;
use hashbrown::HashMap;
use hashbrown::{HashMap, HashSet};
use ipnet::IpNet;
use log::{debug, error, info, warn};
use metered::{metered, ErrorCount, HitCount, ResponseTime, Throughput};
@ -708,7 +709,8 @@ impl Web3ProxyApp {
self: &Arc<Self>,
authorization: Arc<Authorization>,
request: JsonRpcRequestEnum,
) -> anyhow::Result<JsonRpcForwardedResponseEnum> {
) -> Result<(JsonRpcForwardedResponseEnum, Vec<Arc<Web3Connection>>), FrontendErrorResponse>
{
// TODO: this should probably be trace level
// // trace!(?request, "proxy_web3_rpc");
@ -718,24 +720,25 @@ impl Web3ProxyApp {
let max_time = Duration::from_secs(120);
let response = match request {
JsonRpcRequestEnum::Single(request) => JsonRpcForwardedResponseEnum::Single(
timeout(
JsonRpcRequestEnum::Single(request) => {
let (response, rpcs) = timeout(
max_time,
self.proxy_web3_rpc_request(&authorization, request),
)
.await??,
),
JsonRpcRequestEnum::Batch(requests) => JsonRpcForwardedResponseEnum::Batch(
timeout(
.await??;
(JsonRpcForwardedResponseEnum::Single(response), rpcs)
}
JsonRpcRequestEnum::Batch(requests) => {
let (responses, rpcs) = timeout(
max_time,
self.proxy_web3_rpc_requests(&authorization, requests),
)
.await??,
),
};
.await??;
// TODO: this should probably be trace level
// // trace!(?response, "Forwarding");
(JsonRpcForwardedResponseEnum::Batch(responses), rpcs)
}
};
Ok(response)
}
@ -746,12 +749,12 @@ impl Web3ProxyApp {
self: &Arc<Self>,
authorization: &Arc<Authorization>,
requests: Vec<JsonRpcRequest>,
) -> anyhow::Result<Vec<JsonRpcForwardedResponse>> {
// TODO: we should probably change ethers-rs to support this directly
) -> anyhow::Result<(Vec<JsonRpcForwardedResponse>, Vec<Arc<Web3Connection>>)> {
// TODO: we should probably change ethers-rs to support this directly. they pushed this off to v2 though
let num_requests = requests.len();
// TODO: spawn so the requests go in parallel
// TODO: i think we will need to flatten
// TODO: spawn so the requests go in parallel? need to think about rate limiting more if we do that
// TODO: improve flattening
let responses = join_all(
requests
.into_iter()
@ -760,14 +763,21 @@ impl Web3ProxyApp {
)
.await;
// TODO: i'm sure this could be done better with iterators. we could return the error earlier then, too
// TODO: i'm sure this could be done better with iterators
// TODO: stream the response?
let mut collected: Vec<JsonRpcForwardedResponse> = Vec::with_capacity(num_requests);
let mut collected_rpcs: HashSet<Arc<Web3Connection>> = HashSet::new();
for response in responses {
collected.push(response?);
// TODO: any way to attach the tried rpcs to the error? it is likely helpful
let (response, rpcs) = response?;
collected.push(response);
collected_rpcs.extend(rpcs.into_iter());
}
Ok(collected)
let collected_rpcs: Vec<_> = collected_rpcs.into_iter().collect();
Ok((collected, collected_rpcs))
}
/// TODO: i don't think we want or need this. just use app.db_conn, or maybe app.db_conn.clone() or app.db_conn.as_ref()
@ -795,7 +805,7 @@ impl Web3ProxyApp {
self: &Arc<Self>,
authorization: &Arc<Authorization>,
mut request: JsonRpcRequest,
) -> anyhow::Result<JsonRpcForwardedResponse> {
) -> anyhow::Result<(JsonRpcForwardedResponse, Vec<Arc<Web3Connection>>)> {
// trace!("Received request: {:?}", request);
let request_metadata = Arc::new(RequestMetadata::new(REQUEST_PERIOD, request.num_bytes())?);
@ -917,6 +927,8 @@ impl Web3ProxyApp {
// no stats on this. its cheap
json!(Address::zero())
}
/*
// erigon was giving bad estimates. but now it doesn't need it
"eth_estimateGas" => {
// TODO: eth_estimateGas using anvil?
// TODO: modify the block requested?
@ -937,15 +949,18 @@ impl Web3ProxyApp {
parsed_gas_estimate
} else {
// i think this is always an error response
return Ok(response);
let rpcs = request_metadata.backend_requests.lock().clone();
return Ok((response, rpcs));
};
// increase by 10.01%
// increase by 1.01%
let parsed_gas_estimate =
parsed_gas_estimate * U256::from(110_010) / U256::from(100_000);
parsed_gas_estimate * U256::from(101_010) / U256::from(100_000);
json!(parsed_gas_estimate)
}
*/
// TODO: eth_gasPrice that does awesome magic to predict the future
"eth_hashrate" => {
// no stats on this. its cheap
@ -959,16 +974,24 @@ impl Web3ProxyApp {
// broadcast transactions to all private rpcs at once
"eth_sendRawTransaction" => {
// emit stats
let rpcs = self.private_rpcs.as_ref().unwrap_or(&self.balanced_rpcs);
let private_rpcs = self.private_rpcs.as_ref().unwrap_or(&self.balanced_rpcs);
return rpcs
let mut response = private_rpcs
.try_send_all_upstream_servers(
authorization,
request,
Some(request_metadata),
Some(request_metadata.clone()),
None,
)
.await;
.await?;
response.id = request_id;
let rpcs = request_metadata.backend_requests.lock().clone();
// TODO! STATS!
return Ok((response, rpcs));
}
"eth_syncing" => {
// no stats on this. its cheap
@ -1134,6 +1157,9 @@ impl Web3ProxyApp {
// replace the id with our request's id.
response.id = request_id;
// TODO: DRY!
let rpcs = request_metadata.backend_requests.lock().clone();
if let Some(stat_sender) = self.stat_sender.as_ref() {
let response_stat = ProxyResponseStat::new(
method.to_string(),
@ -1148,12 +1174,15 @@ impl Web3ProxyApp {
.context("stat_sender sending response_stat")?;
}
return Ok(response);
return Ok((response, rpcs));
}
};
let response = JsonRpcForwardedResponse::from_value(partial_response, request_id);
// TODO: DRY
let rpcs = request_metadata.backend_requests.lock().clone();
if let Some(stat_sender) = self.stat_sender.as_ref() {
let response_stat = ProxyResponseStat::new(
request_method,
@ -1168,9 +1197,7 @@ impl Web3ProxyApp {
.context("stat_sender sending response stat")?;
}
todo!("attach a header here");
Ok(response)
Ok((response, rpcs))
}
}

@ -15,7 +15,6 @@ use log::{trace, warn};
use migration::sea_orm::DbErr;
use redis_rate_limiter::redis::RedisError;
use reqwest::header::ToStrError;
use std::error::Error;
use tokio::{sync::AcquireError, task::JoinError, time::Instant};
// TODO: take "IntoResponse" instead of Response?
@ -27,7 +26,6 @@ pub enum FrontendErrorResponse {
AccessDenied,
Anyhow(anyhow::Error),
SemaphoreAcquireError(AcquireError),
Box(Box<dyn Error>),
Database(DbErr),
HeadersError(headers::Error),
HeaderToString(ToStrError),
@ -40,6 +38,8 @@ pub enum FrontendErrorResponse {
Response(Response),
/// simple way to return an error message to the user and an anyhow to our logs
StatusCode(StatusCode, String, Option<anyhow::Error>),
/// TODO: what should be attached to the timout?
Timeout(tokio::time::error::Elapsed),
UlidDecodeError(ulid::DecodeError),
UnknownKey,
}
@ -74,18 +74,18 @@ impl IntoResponse for FrontendErrorResponse {
),
)
}
Self::Box(err) => {
warn!("boxed err={:?}", err);
(
StatusCode::INTERNAL_SERVER_ERROR,
JsonRpcForwardedResponse::from_str(
// TODO: make this better. maybe include the error type?
"boxed error!",
Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()),
None,
),
)
}
// Self::(err) => {
// warn!("boxed err={:?}", err);
// (
// StatusCode::INTERNAL_SERVER_ERROR,
// JsonRpcForwardedResponse::from_str(
// // TODO: make this better. maybe include the error type?
// "boxed error!",
// Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()),
// None,
// ),
// )
// }
Self::Database(err) => {
warn!("database err={:?}", err);
(
@ -131,12 +131,20 @@ impl IntoResponse for FrontendErrorResponse {
)
}
Self::JoinError(err) => {
warn!("JoinError. likely shutting down. err={:?}", err);
let code = if err.is_cancelled() {
trace!("JoinError. likely shutting down. err={:?}", err);
StatusCode::BAD_GATEWAY
} else {
warn!("JoinError. err={:?}", err);
StatusCode::INTERNAL_SERVER_ERROR
};
(
StatusCode::INTERNAL_SERVER_ERROR,
code,
JsonRpcForwardedResponse::from_str(
// TODO: different messages, too?
"Unable to complete request",
Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()),
Some(code.as_u16().into()),
None,
),
)
@ -226,8 +234,17 @@ impl IntoResponse for FrontendErrorResponse {
JsonRpcForwardedResponse::from_str(&err_msg, Some(code.into()), None),
)
}
Self::Timeout(x) => (
StatusCode::REQUEST_TIMEOUT,
JsonRpcForwardedResponse::from_str(
&format!("request timed out: {:?}", x),
Some(StatusCode::REQUEST_TIMEOUT.as_u16().into()),
// TODO: include the actual id!
None,
),
),
Self::HeaderToString(err) => {
// // trace!(?err, "HeaderToString");
// trace!(?err, "HeaderToString");
(
StatusCode::BAD_REQUEST,
JsonRpcForwardedResponse::from_str(
@ -238,7 +255,7 @@ impl IntoResponse for FrontendErrorResponse {
)
}
Self::UlidDecodeError(err) => {
// // trace!(?err, "UlidDecodeError");
// trace!(?err, "UlidDecodeError");
(
StatusCode::BAD_REQUEST,
JsonRpcForwardedResponse::from_str(

@ -9,6 +9,7 @@ use axum::TypedHeader;
use axum::{response::IntoResponse, Extension, Json};
use axum_client_ip::ClientIp;
use axum_macros::debug_handler;
use itertools::Itertools;
use std::sync::Arc;
/// POST /rpc -- Public entrypoint for HTTP JSON-RPC requests. Web3 wallets use this.
@ -24,16 +25,30 @@ pub async fn proxy_web3_rpc(
// TODO: do we care about keeping the TypedHeader wrapper?
let origin = origin.map(|x| x.0);
let (authorization, _semaphore) = ip_is_authorized(&app, ip, origin).await?;
// TODO: move ip_is_authorized/key_is_authorized into proxy_web3_rpc
let f = tokio::spawn(async move {
let (authorization, _semaphore) = ip_is_authorized(&app, ip, origin).await?;
let authorization = Arc::new(authorization);
let authorization = Arc::new(authorization);
// TODO: spawn earlier? i think we want ip_is_authorized in this future
let f = tokio::spawn(async move { app.proxy_web3_rpc(authorization, payload).await });
app.proxy_web3_rpc(authorization, payload).await
});
let response = f.await??;
let (response, rpcs) = f.await??;
Ok(Json(&response).into_response())
let mut response = Json(&response).into_response();
let headers = response.headers_mut();
// TODO: special string if no rpcs were used (cache hit)?
let rpcs: String = rpcs.into_iter().map(|x| x.name.clone()).join(",");
headers.insert(
"W3P-RPCs",
rpcs.parse().expect("W3P-RPCS should always parse"),
);
Ok(response)
}
/// Authenticated entrypoint for HTTP JSON-RPC requests. Web3 wallets use this.

@ -180,7 +180,6 @@ async fn handle_socket_payload(
// TODO: do any clients send batches over websockets?
let (id, response) = match serde_json::from_str::<JsonRpcRequest>(payload) {
Ok(json_request) => {
// TODO: should we use this id for the subscription id? it should be unique and means we dont need an atomic
let id = json_request.id.clone();
let response: anyhow::Result<JsonRpcForwardedResponseEnum> = match &json_request.method
@ -251,8 +250,13 @@ async fn handle_socket_payload(
Ok(response.into())
}
_ => {
app.proxy_web3_rpc(authorization.clone(), json_request.into())
let (response, _) = app
.proxy_web3_rpc(authorization.clone(), json_request.into())
.await
// TODO: DO NOT UNWRAP HERE! ANY FAILING MESSAGES WILL KEPP THE CONNECTION!
.unwrap();
Ok(response)
}
};
@ -266,15 +270,16 @@ async fn handle_socket_payload(
};
let response_str = match response {
Ok(x) => serde_json::to_string(&x),
Ok(x) => serde_json::to_string(&x).expect("to_string should always work here"),
Err(err) => {
// we have an anyhow error. turn it into a response
let response = JsonRpcForwardedResponse::from_anyhow_error(err, None, Some(id));
serde_json::to_string(&response)
serde_json::to_string(&response).expect("to_string should always work here")
}
}
};
// TODO: what error should this be?
.unwrap();
Message::Text(response_str)
}

@ -1,11 +1,9 @@
use crate::rpcs::connection::Web3Connection;
use derive_more::From;
use ethers::prelude::{HttpClientError, ProviderError, WsClientError};
use serde::de::{self, Deserializer, MapAccess, SeqAccess, Visitor};
use serde::{Deserialize, Serialize};
use serde_json::value::RawValue;
use std::fmt;
use std::sync::Arc;
// this is used by serde
#[allow(dead_code)]