add more headers for debug endpoints and use more refs

This commit is contained in:
Bryan Stitt 2023-06-21 22:11:26 -07:00
parent 095a505017
commit c560a59ef0
3 changed files with 180 additions and 151 deletions

View File

@ -678,10 +678,10 @@ impl Authorization {
Self::try_new( Self::try_new(
authorization_checks, authorization_checks,
db_conn, db_conn,
ip, &ip,
None, None,
None, None,
user_agent, user_agent.as_ref(),
AuthorizationType::Internal, AuthorizationType::Internal,
) )
} }
@ -689,15 +689,15 @@ impl Authorization {
pub fn external( pub fn external(
allowed_origin_requests_per_period: &HashMap<String, u64>, allowed_origin_requests_per_period: &HashMap<String, u64>,
db_conn: Option<DatabaseConnection>, db_conn: Option<DatabaseConnection>,
ip: IpAddr, ip: &IpAddr,
origin: Option<Origin>, origin: Option<&Origin>,
proxy_mode: ProxyMode, proxy_mode: ProxyMode,
referer: Option<Referer>, referer: Option<&Referer>,
user_agent: Option<UserAgent>, user_agent: Option<&UserAgent>,
) -> Web3ProxyResult<Self> { ) -> Web3ProxyResult<Self> {
// some origins can override max_requests_per_period for anon users // some origins can override max_requests_per_period for anon users
// TODO: i don't like the `to_string` here
let max_requests_per_period = origin let max_requests_per_period = origin
.as_ref()
.map(|origin| { .map(|origin| {
allowed_origin_requests_per_period allowed_origin_requests_per_period
.get(&origin.to_string()) .get(&origin.to_string())
@ -726,54 +726,54 @@ impl Authorization {
pub fn try_new( pub fn try_new(
authorization_checks: AuthorizationChecks, authorization_checks: AuthorizationChecks,
db_conn: Option<DatabaseConnection>, db_conn: Option<DatabaseConnection>,
ip: IpAddr, ip: &IpAddr,
origin: Option<Origin>, origin: Option<&Origin>,
referer: Option<Referer>, referer: Option<&Referer>,
user_agent: Option<UserAgent>, user_agent: Option<&UserAgent>,
authorization_type: AuthorizationType, authorization_type: AuthorizationType,
) -> Web3ProxyResult<Self> { ) -> Web3ProxyResult<Self> {
// check ip // check ip
match &authorization_checks.allowed_ips { match &authorization_checks.allowed_ips {
None => {} None => {}
Some(allowed_ips) => { Some(allowed_ips) => {
if !allowed_ips.iter().any(|x| x.contains(&ip)) { if !allowed_ips.iter().any(|x| x.contains(ip)) {
return Err(Web3ProxyError::IpNotAllowed(ip)); return Err(Web3ProxyError::IpNotAllowed(ip.to_owned()));
} }
} }
} }
// check origin // check origin
match (&origin, &authorization_checks.allowed_origins) { match (origin, &authorization_checks.allowed_origins) {
(None, None) => {} (None, None) => {}
(Some(_), None) => {} (Some(_), None) => {}
(None, Some(_)) => return Err(Web3ProxyError::OriginRequired), (None, Some(_)) => return Err(Web3ProxyError::OriginRequired),
(Some(origin), Some(allowed_origins)) => { (Some(origin), Some(allowed_origins)) => {
if !allowed_origins.contains(origin) { if !allowed_origins.contains(origin) {
return Err(Web3ProxyError::OriginNotAllowed(origin.clone())); return Err(Web3ProxyError::OriginNotAllowed(origin.to_owned()));
} }
} }
} }
// check referer // check referer
match (&referer, &authorization_checks.allowed_referers) { match (referer, &authorization_checks.allowed_referers) {
(None, None) => {} (None, None) => {}
(Some(_), None) => {} (Some(_), None) => {}
(None, Some(_)) => return Err(Web3ProxyError::RefererRequired), (None, Some(_)) => return Err(Web3ProxyError::RefererRequired),
(Some(referer), Some(allowed_referers)) => { (Some(referer), Some(allowed_referers)) => {
if !allowed_referers.contains(referer) { if !allowed_referers.contains(referer) {
return Err(Web3ProxyError::RefererNotAllowed(referer.clone())); return Err(Web3ProxyError::RefererNotAllowed(referer.to_owned()));
} }
} }
} }
// check user_agent // check user_agent
match (&user_agent, &authorization_checks.allowed_user_agents) { match (user_agent, &authorization_checks.allowed_user_agents) {
(None, None) => {} (None, None) => {}
(Some(_), None) => {} (Some(_), None) => {}
(None, Some(_)) => return Err(Web3ProxyError::UserAgentRequired), (None, Some(_)) => return Err(Web3ProxyError::UserAgentRequired),
(Some(user_agent), Some(allowed_user_agents)) => { (Some(user_agent), Some(allowed_user_agents)) => {
if !allowed_user_agents.contains(user_agent) { if !allowed_user_agents.contains(user_agent) {
return Err(Web3ProxyError::UserAgentNotAllowed(user_agent.clone())); return Err(Web3ProxyError::UserAgentNotAllowed(user_agent.to_owned()));
} }
} }
} }
@ -781,10 +781,10 @@ impl Authorization {
Ok(Self { Ok(Self {
checks: authorization_checks, checks: authorization_checks,
db_conn, db_conn,
ip, ip: *ip,
origin, origin: origin.cloned(),
referer, referer: referer.cloned(),
user_agent, user_agent: user_agent.cloned(),
authorization_type, authorization_type,
}) })
} }
@ -809,8 +809,8 @@ pub async fn login_is_authorized(app: &Web3ProxyApp, ip: IpAddr) -> Web3ProxyRes
/// keep the semaphore alive until the user's request is entirely complete /// keep the semaphore alive until the user's request is entirely complete
pub async fn ip_is_authorized( pub async fn ip_is_authorized(
app: &Arc<Web3ProxyApp>, app: &Arc<Web3ProxyApp>,
ip: IpAddr, ip: &IpAddr,
origin: Option<Origin>, origin: Option<&Origin>,
proxy_mode: ProxyMode, proxy_mode: ProxyMode,
) -> Web3ProxyResult<(Authorization, Option<OwnedSemaphorePermit>)> { ) -> Web3ProxyResult<(Authorization, Option<OwnedSemaphorePermit>)> {
// TODO: i think we could write an `impl From` for this // TODO: i think we could write an `impl From` for this
@ -836,6 +836,8 @@ pub async fn ip_is_authorized(
// in the background, add the ip to a recent_users map // in the background, add the ip to a recent_users map
if app.config.public_recent_ips_salt.is_some() { if app.config.public_recent_ips_salt.is_some() {
let app = app.clone(); let app = app.clone();
let ip = *ip;
let f = async move { let f = async move {
let now = Utc::now().timestamp(); let now = Utc::now().timestamp();
@ -875,12 +877,12 @@ pub async fn ip_is_authorized(
/// keep the semaphore alive until the user's request is entirely complete /// keep the semaphore alive until the user's request is entirely complete
pub async fn key_is_authorized( pub async fn key_is_authorized(
app: &Arc<Web3ProxyApp>, app: &Arc<Web3ProxyApp>,
rpc_key: RpcSecretKey, rpc_key: &RpcSecretKey,
ip: IpAddr, ip: &IpAddr,
origin: Option<Origin>, origin: Option<&Origin>,
proxy_mode: ProxyMode, proxy_mode: ProxyMode,
referer: Option<Referer>, referer: Option<&Referer>,
user_agent: Option<UserAgent>, user_agent: Option<&UserAgent>,
) -> Web3ProxyResult<(Authorization, Option<OwnedSemaphorePermit>)> { ) -> Web3ProxyResult<(Authorization, Option<OwnedSemaphorePermit>)> {
// check the rate limits. error if over the limit // check the rate limits. error if over the limit
// TODO: i think this should be in an "impl From" or "impl Into" // TODO: i think this should be in an "impl From" or "impl Into"
@ -1035,7 +1037,7 @@ impl Web3ProxyApp {
let authorization = Authorization::external( let authorization = Authorization::external(
&self.config.allowed_origin_requests_per_period, &self.config.allowed_origin_requests_per_period,
self.db_conn(), self.db_conn(),
ip, &ip,
None, None,
proxy_mode, proxy_mode,
None, None,
@ -1084,8 +1086,8 @@ impl Web3ProxyApp {
pub async fn rate_limit_by_ip( pub async fn rate_limit_by_ip(
&self, &self,
allowed_origin_requests_per_period: &HashMap<String, u64>, allowed_origin_requests_per_period: &HashMap<String, u64>,
ip: IpAddr, ip: &IpAddr,
origin: Option<Origin>, origin: Option<&Origin>,
proxy_mode: ProxyMode, proxy_mode: ProxyMode,
) -> Web3ProxyResult<RateLimitResult> { ) -> Web3ProxyResult<RateLimitResult> {
if ip.is_loopback() { if ip.is_loopback() {
@ -1109,12 +1111,12 @@ impl Web3ProxyApp {
if let Some(rate_limiter) = &self.frontend_ip_rate_limiter { if let Some(rate_limiter) = &self.frontend_ip_rate_limiter {
match rate_limiter match rate_limiter
.throttle(ip, authorization.checks.max_requests_per_period, 1) .throttle(*ip, authorization.checks.max_requests_per_period, 1)
.await .await
{ {
Ok(DeferredRateLimitResult::Allowed) => { Ok(DeferredRateLimitResult::Allowed) => {
// rate limit allowed us. check concurrent request limits // rate limit allowed us. check concurrent request limits
let semaphore = self.ip_semaphore(&ip).await?; let semaphore = self.ip_semaphore(ip).await?;
Ok(RateLimitResult::Allowed(authorization, semaphore)) Ok(RateLimitResult::Allowed(authorization, semaphore))
} }
@ -1134,14 +1136,14 @@ impl Web3ProxyApp {
error!("rate limiter is unhappy. allowing ip. err={:?}", err); error!("rate limiter is unhappy. allowing ip. err={:?}", err);
// at least we can still check the semaphore // at least we can still check the semaphore
let semaphore = self.ip_semaphore(&ip).await?; let semaphore = self.ip_semaphore(ip).await?;
Ok(RateLimitResult::Allowed(authorization, semaphore)) Ok(RateLimitResult::Allowed(authorization, semaphore))
} }
} }
} else { } else {
// no redis, but we can still check the ip semaphore // no redis, but we can still check the ip semaphore
let semaphore = self.ip_semaphore(&ip).await?; let semaphore = self.ip_semaphore(ip).await?;
// TODO: if no redis, rate limit with a local cache? "warn!" probably isn't right // TODO: if no redis, rate limit with a local cache? "warn!" probably isn't right
Ok(RateLimitResult::Allowed(authorization, semaphore)) Ok(RateLimitResult::Allowed(authorization, semaphore))
@ -1217,10 +1219,10 @@ impl Web3ProxyApp {
pub(crate) async fn authorization_checks( pub(crate) async fn authorization_checks(
&self, &self,
proxy_mode: ProxyMode, proxy_mode: ProxyMode,
rpc_secret_key: RpcSecretKey, rpc_secret_key: &RpcSecretKey,
) -> Web3ProxyResult<AuthorizationChecks> { ) -> Web3ProxyResult<AuthorizationChecks> {
self.rpc_secret_key_cache self.rpc_secret_key_cache
.try_get_with_by_ref(&rpc_secret_key, async move { .try_get_with_by_ref(rpc_secret_key, async move {
// trace!(?rpc_secret_key, "user cache miss"); // trace!(?rpc_secret_key, "user cache miss");
let db_replica = self let db_replica = self
@ -1231,7 +1233,7 @@ impl Web3ProxyApp {
// TODO: join on secondary users // TODO: join on secondary users
// TODO: join on user tier // TODO: join on user tier
match rpc_key::Entity::find() match rpc_key::Entity::find()
.filter(rpc_key::Column::SecretKey.eq(<Uuid>::from(rpc_secret_key))) .filter(rpc_key::Column::SecretKey.eq(<Uuid>::from(*rpc_secret_key)))
.filter(rpc_key::Column::Active.eq(true)) .filter(rpc_key::Column::Active.eq(true))
.one(db_replica.as_ref()) .one(db_replica.as_ref())
.await? .await?
@ -1354,7 +1356,7 @@ impl Web3ProxyApp {
max_requests_per_period: user_tier_model.max_requests_per_period, max_requests_per_period: user_tier_model.max_requests_per_period,
private_txs: rpc_key_model.private_txs, private_txs: rpc_key_model.private_txs,
proxy_mode, proxy_mode,
rpc_secret_key: Some(rpc_secret_key), rpc_secret_key: Some(*rpc_secret_key),
rpc_secret_key_id: rpc_key_id, rpc_secret_key_id: rpc_key_id,
user_id: rpc_key_model.user_id, user_id: rpc_key_model.user_id,
}) })
@ -1369,12 +1371,12 @@ impl Web3ProxyApp {
/// Authorized the ip/origin/referer/useragent and rate limit and concurrency /// Authorized the ip/origin/referer/useragent and rate limit and concurrency
pub async fn rate_limit_by_rpc_key( pub async fn rate_limit_by_rpc_key(
&self, &self,
ip: IpAddr, ip: &IpAddr,
origin: Option<Origin>, origin: Option<&Origin>,
proxy_mode: ProxyMode, proxy_mode: ProxyMode,
referer: Option<Referer>, referer: Option<&Referer>,
rpc_key: RpcSecretKey, rpc_key: &RpcSecretKey,
user_agent: Option<UserAgent>, user_agent: Option<&UserAgent>,
) -> Web3ProxyResult<RateLimitResult> { ) -> Web3ProxyResult<RateLimitResult> {
let authorization_checks = self.authorization_checks(proxy_mode, rpc_key).await?; let authorization_checks = self.authorization_checks(proxy_mode, rpc_key).await?;
@ -1455,19 +1457,19 @@ impl Authorization {
app: &Arc<Web3ProxyApp>, app: &Arc<Web3ProxyApp>,
) -> Web3ProxyResult<(Arc<Self>, Option<OwnedSemaphorePermit>)> { ) -> Web3ProxyResult<(Arc<Self>, Option<OwnedSemaphorePermit>)> {
// TODO: we could probably do this without clones. but this is easy // TODO: we could probably do this without clones. but this is easy
let (a, s) = if let Some(rpc_secret_key) = self.checks.rpc_secret_key { let (a, s) = if let Some(ref rpc_secret_key) = self.checks.rpc_secret_key {
key_is_authorized( key_is_authorized(
app, app,
rpc_secret_key, rpc_secret_key,
self.ip, &self.ip,
self.origin.clone(), self.origin.as_ref(),
self.checks.proxy_mode, self.checks.proxy_mode,
self.referer.clone(), self.referer.as_ref(),
self.user_agent.clone(), self.user_agent.as_ref(),
) )
.await? .await?
} else { } else {
ip_is_authorized(app, self.ip, self.origin.clone(), self.checks.proxy_mode).await? ip_is_authorized(app, &self.ip, self.origin.as_ref(), self.checks.proxy_mode).await?
}; };
let a = Arc::new(a); let a = Arc::new(a);

View File

@ -11,7 +11,9 @@ use axum::TypedHeader;
use axum::{response::IntoResponse, Extension, Json}; use axum::{response::IntoResponse, Extension, Json};
use axum_client_ip::InsecureClientIp; use axum_client_ip::InsecureClientIp;
use axum_macros::debug_handler; use axum_macros::debug_handler;
use http::HeaderMap;
use itertools::Itertools; use itertools::Itertools;
use std::net::IpAddr;
use std::sync::Arc; use std::sync::Arc;
/// POST /rpc -- Public entrypoint for HTTP JSON-RPC requests. Web3 wallets use this. /// POST /rpc -- Public entrypoint for HTTP JSON-RPC requests. Web3 wallets use this.
@ -20,46 +22,42 @@ use std::sync::Arc;
#[debug_handler] #[debug_handler]
pub async fn proxy_web3_rpc( pub async fn proxy_web3_rpc(
Extension(app): Extension<Arc<Web3ProxyApp>>, Extension(app): Extension<Arc<Web3ProxyApp>>,
ip: InsecureClientIp, InsecureClientIp(ip): InsecureClientIp,
origin: Option<TypedHeader<Origin>>, origin: Option<TypedHeader<Origin>>,
Json(payload): Json<JsonRpcRequestEnum>, Json(payload): Json<JsonRpcRequestEnum>,
) -> Result<Response, Response> { ) -> Result<Response, Response> {
_proxy_web3_rpc(app, ip, origin, payload, ProxyMode::Best).await _proxy_web3_rpc(app, &ip, origin.as_deref(), payload, ProxyMode::Best).await
} }
#[debug_handler] #[debug_handler]
pub async fn fastest_proxy_web3_rpc( pub async fn fastest_proxy_web3_rpc(
Extension(app): Extension<Arc<Web3ProxyApp>>, Extension(app): Extension<Arc<Web3ProxyApp>>,
ip: InsecureClientIp, InsecureClientIp(ip): InsecureClientIp,
origin: Option<TypedHeader<Origin>>, origin: Option<TypedHeader<Origin>>,
Json(payload): Json<JsonRpcRequestEnum>, Json(payload): Json<JsonRpcRequestEnum>,
) -> Result<Response, Response> { ) -> Result<Response, Response> {
// TODO: read the fastest number from params // TODO: read the fastest number from params
// TODO: check that the app allows this without authentication // TODO: check that the app allows this without authentication
_proxy_web3_rpc(app, ip, origin, payload, ProxyMode::Fastest(0)).await _proxy_web3_rpc(app, &ip, origin.as_deref(), payload, ProxyMode::Fastest(0)).await
} }
#[debug_handler] #[debug_handler]
pub async fn versus_proxy_web3_rpc( pub async fn versus_proxy_web3_rpc(
Extension(app): Extension<Arc<Web3ProxyApp>>, Extension(app): Extension<Arc<Web3ProxyApp>>,
ip: InsecureClientIp, InsecureClientIp(ip): InsecureClientIp,
origin: Option<TypedHeader<Origin>>, origin: Option<TypedHeader<Origin>>,
Json(payload): Json<JsonRpcRequestEnum>, Json(payload): Json<JsonRpcRequestEnum>,
) -> Result<Response, Response> { ) -> Result<Response, Response> {
_proxy_web3_rpc(app, ip, origin, payload, ProxyMode::Versus).await _proxy_web3_rpc(app, &ip, origin.as_deref(), payload, ProxyMode::Versus).await
} }
async fn _proxy_web3_rpc( async fn _proxy_web3_rpc(
app: Arc<Web3ProxyApp>, app: Arc<Web3ProxyApp>,
InsecureClientIp(ip): InsecureClientIp, ip: &IpAddr,
origin: Option<TypedHeader<Origin>>, origin: Option<&Origin>,
payload: JsonRpcRequestEnum, payload: JsonRpcRequestEnum,
proxy_mode: ProxyMode, proxy_mode: ProxyMode,
) -> Result<Response, Response> { ) -> Result<Response, Response> {
// TODO: benchmark spawning this
// TODO: do we care about keeping the TypedHeader wrapper?
let origin = origin.map(|x| x.0);
let first_id = payload.first_id(); let first_id = payload.first_id();
let (authorization, _semaphore) = ip_is_authorized(&app, ip, origin, proxy_mode) let (authorization, _semaphore) = ip_is_authorized(&app, ip, origin, proxy_mode)
@ -78,7 +76,8 @@ async fn _proxy_web3_rpc(
let mut response = (status_code, Json(response)).into_response(); let mut response = (status_code, Json(response)).into_response();
let headers = response.headers_mut(); // TODO: DRY this up. same for public and private queries
let response_headers = response.headers_mut();
// TODO: this might be slow. think about this more // TODO: this might be slow. think about this more
// TODO: special string if no rpcs were used (cache hit)? // TODO: special string if no rpcs were used (cache hit)?
@ -94,12 +93,12 @@ async fn _proxy_web3_rpc(
}) })
.join(","); .join(",");
headers.insert( response_headers.insert(
"X-W3P-BACKEND-RPCS", "X-W3P-BACKEND-RPCS",
rpcs.parse().expect("W3P-BACKEND-RPCS should always parse"), rpcs.parse().expect("W3P-BACKEND-RPCS should always parse"),
); );
headers.insert( response_headers.insert(
"X-W3P-BACKUP-RPC", "X-W3P-BACKUP-RPC",
backup_used backup_used
.to_string() .to_string()
@ -117,7 +116,7 @@ async fn _proxy_web3_rpc(
#[debug_handler] #[debug_handler]
pub async fn proxy_web3_rpc_with_key( pub async fn proxy_web3_rpc_with_key(
Extension(app): Extension<Arc<Web3ProxyApp>>, Extension(app): Extension<Arc<Web3ProxyApp>>,
ip: InsecureClientIp, InsecureClientIp(ip): InsecureClientIp,
origin: Option<TypedHeader<Origin>>, origin: Option<TypedHeader<Origin>>,
referer: Option<TypedHeader<Referer>>, referer: Option<TypedHeader<Referer>>,
user_agent: Option<TypedHeader<UserAgent>>, user_agent: Option<TypedHeader<UserAgent>>,
@ -126,10 +125,10 @@ pub async fn proxy_web3_rpc_with_key(
) -> Result<Response, Response> { ) -> Result<Response, Response> {
_proxy_web3_rpc_with_key( _proxy_web3_rpc_with_key(
app, app,
ip, &ip,
origin, origin.as_deref(),
referer, referer.as_deref(),
user_agent, user_agent.as_deref(),
rpc_key, rpc_key,
payload, payload,
ProxyMode::Best, ProxyMode::Best,
@ -138,34 +137,54 @@ pub async fn proxy_web3_rpc_with_key(
} }
// TODO: if a /debug/ request gets rejected by an invalid request, there won't be any kafka log // TODO: if a /debug/ request gets rejected by an invalid request, there won't be any kafka log
// TODO:
#[debug_handler] #[debug_handler]
#[allow(clippy::too_many_arguments)]
pub async fn debug_proxy_web3_rpc_with_key( pub async fn debug_proxy_web3_rpc_with_key(
Extension(app): Extension<Arc<Web3ProxyApp>>, Extension(app): Extension<Arc<Web3ProxyApp>>,
ip: InsecureClientIp, InsecureClientIp(ip): InsecureClientIp,
origin: Option<TypedHeader<Origin>>, origin: Option<TypedHeader<Origin>>,
referer: Option<TypedHeader<Referer>>, referer: Option<TypedHeader<Referer>>,
user_agent: Option<TypedHeader<UserAgent>>, user_agent: Option<TypedHeader<UserAgent>>,
request_headers: HeaderMap,
Path(rpc_key): Path<String>, Path(rpc_key): Path<String>,
Json(payload): Json<JsonRpcRequestEnum>, Json(payload): Json<JsonRpcRequestEnum>,
) -> Result<Response, Response> { ) -> Result<Response, Response> {
_proxy_web3_rpc_with_key( let mut response = match _proxy_web3_rpc_with_key(
app, app,
ip, &ip,
origin, origin.as_deref(),
referer, referer.as_deref(),
user_agent, user_agent.as_deref(),
rpc_key, rpc_key,
payload, payload,
ProxyMode::Debug, ProxyMode::Debug,
) )
.await .await
{
Ok(r) => r,
Err(r) => r,
};
// add some headers that might be useful while debugging
let response_headers = response.headers_mut();
if let Some(x) = request_headers.get("x-amzn-trace-id").cloned() {
response_headers.insert("x-amzn-trace-id", x);
}
if let Some(x) = request_headers.get("x-balance-id").cloned() {
response_headers.insert("x-balance-id", x);
}
response_headers.insert("client-ip", ip.to_string().parse().unwrap());
Ok(response)
} }
#[debug_handler] #[debug_handler]
pub async fn fastest_proxy_web3_rpc_with_key( pub async fn fastest_proxy_web3_rpc_with_key(
Extension(app): Extension<Arc<Web3ProxyApp>>, Extension(app): Extension<Arc<Web3ProxyApp>>,
ip: InsecureClientIp, InsecureClientIp(ip): InsecureClientIp,
origin: Option<TypedHeader<Origin>>, origin: Option<TypedHeader<Origin>>,
referer: Option<TypedHeader<Referer>>, referer: Option<TypedHeader<Referer>>,
user_agent: Option<TypedHeader<UserAgent>>, user_agent: Option<TypedHeader<UserAgent>>,
@ -174,10 +193,10 @@ pub async fn fastest_proxy_web3_rpc_with_key(
) -> Result<Response, Response> { ) -> Result<Response, Response> {
_proxy_web3_rpc_with_key( _proxy_web3_rpc_with_key(
app, app,
ip, &ip,
origin, origin.as_deref(),
referer, referer.as_deref(),
user_agent, user_agent.as_deref(),
rpc_key, rpc_key,
payload, payload,
ProxyMode::Fastest(0), ProxyMode::Fastest(0),
@ -188,7 +207,7 @@ pub async fn fastest_proxy_web3_rpc_with_key(
#[debug_handler] #[debug_handler]
pub async fn versus_proxy_web3_rpc_with_key( pub async fn versus_proxy_web3_rpc_with_key(
Extension(app): Extension<Arc<Web3ProxyApp>>, Extension(app): Extension<Arc<Web3ProxyApp>>,
ip: InsecureClientIp, InsecureClientIp(ip): InsecureClientIp,
origin: Option<TypedHeader<Origin>>, origin: Option<TypedHeader<Origin>>,
referer: Option<TypedHeader<Referer>>, referer: Option<TypedHeader<Referer>>,
user_agent: Option<TypedHeader<UserAgent>>, user_agent: Option<TypedHeader<UserAgent>>,
@ -197,10 +216,10 @@ pub async fn versus_proxy_web3_rpc_with_key(
) -> Result<Response, Response> { ) -> Result<Response, Response> {
_proxy_web3_rpc_with_key( _proxy_web3_rpc_with_key(
app, app,
ip, &ip,
origin, origin.as_deref(),
referer, referer.as_deref(),
user_agent, user_agent.as_deref(),
rpc_key, rpc_key,
payload, payload,
ProxyMode::Versus, ProxyMode::Versus,
@ -211,10 +230,10 @@ pub async fn versus_proxy_web3_rpc_with_key(
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
async fn _proxy_web3_rpc_with_key( async fn _proxy_web3_rpc_with_key(
app: Arc<Web3ProxyApp>, app: Arc<Web3ProxyApp>,
InsecureClientIp(ip): InsecureClientIp, ip: &IpAddr,
origin: Option<TypedHeader<Origin>>, origin: Option<&Origin>,
referer: Option<TypedHeader<Referer>>, referer: Option<&Referer>,
user_agent: Option<TypedHeader<UserAgent>>, user_agent: Option<&UserAgent>,
rpc_key: String, rpc_key: String,
payload: JsonRpcRequestEnum, payload: JsonRpcRequestEnum,
proxy_mode: ProxyMode, proxy_mode: ProxyMode,
@ -227,17 +246,10 @@ async fn _proxy_web3_rpc_with_key(
.parse() .parse()
.map_err(|e: Web3ProxyError| e.into_response_with_id(first_id.clone()))?; .map_err(|e: Web3ProxyError| e.into_response_with_id(first_id.clone()))?;
let (authorization, _semaphore) = key_is_authorized( let (authorization, _semaphore) =
&app, key_is_authorized(&app, &rpc_key, ip, origin, proxy_mode, referer, user_agent)
rpc_key, .await
ip, .map_err(|e| e.into_response_with_id(first_id.clone()))?;
origin.map(|x| x.0),
proxy_mode,
referer.map(|x| x.0),
user_agent.map(|x| x.0),
)
.await
.map_err(|e| e.into_response_with_id(first_id.clone()))?;
let authorization = Arc::new(authorization); let authorization = Arc::new(authorization);

View File

@ -28,9 +28,10 @@ use futures::{
}; };
use handlebars::Handlebars; use handlebars::Handlebars;
use hashbrown::HashMap; use hashbrown::HashMap;
use http::StatusCode; use http::{HeaderMap, StatusCode};
use log::{info, trace}; use log::{info, trace};
use serde_json::json; use serde_json::json;
use std::net::IpAddr;
use std::str::from_utf8_mut; use std::str::from_utf8_mut;
use std::sync::atomic::AtomicU64; use std::sync::atomic::AtomicU64;
use std::sync::Arc; use std::sync::Arc;
@ -61,11 +62,11 @@ impl Default for ProxyMode {
#[debug_handler] #[debug_handler]
pub async fn websocket_handler( pub async fn websocket_handler(
Extension(app): Extension<Arc<Web3ProxyApp>>, Extension(app): Extension<Arc<Web3ProxyApp>>,
ip: InsecureClientIp, InsecureClientIp(ip): InsecureClientIp,
origin: Option<TypedHeader<Origin>>, origin: Option<TypedHeader<Origin>>,
ws_upgrade: Option<WebSocketUpgrade>, ws_upgrade: Option<WebSocketUpgrade>,
) -> Web3ProxyResponse { ) -> Web3ProxyResponse {
_websocket_handler(ProxyMode::Best, app, ip, origin, ws_upgrade).await _websocket_handler(ProxyMode::Best, app, &ip, origin.as_deref(), ws_upgrade).await
} }
/// Public entrypoint for WebSocket JSON-RPC requests that uses all synced servers. /// Public entrypoint for WebSocket JSON-RPC requests that uses all synced servers.
@ -73,13 +74,20 @@ pub async fn websocket_handler(
#[debug_handler] #[debug_handler]
pub async fn fastest_websocket_handler( pub async fn fastest_websocket_handler(
Extension(app): Extension<Arc<Web3ProxyApp>>, Extension(app): Extension<Arc<Web3ProxyApp>>,
ip: InsecureClientIp, InsecureClientIp(ip): InsecureClientIp,
origin: Option<TypedHeader<Origin>>, origin: Option<TypedHeader<Origin>>,
ws_upgrade: Option<WebSocketUpgrade>, ws_upgrade: Option<WebSocketUpgrade>,
) -> Web3ProxyResponse { ) -> Web3ProxyResponse {
// TODO: get the fastest number from the url params (default to 0/all) // TODO: get the fastest number from the url params (default to 0/all)
// TODO: config to disable this // TODO: config to disable this
_websocket_handler(ProxyMode::Fastest(0), app, ip, origin, ws_upgrade).await _websocket_handler(
ProxyMode::Fastest(0),
app,
&ip,
origin.as_deref(),
ws_upgrade,
)
.await
} }
/// Public entrypoint for WebSocket JSON-RPC requests that uses all synced servers. /// Public entrypoint for WebSocket JSON-RPC requests that uses all synced servers.
@ -87,23 +95,21 @@ pub async fn fastest_websocket_handler(
#[debug_handler] #[debug_handler]
pub async fn versus_websocket_handler( pub async fn versus_websocket_handler(
Extension(app): Extension<Arc<Web3ProxyApp>>, Extension(app): Extension<Arc<Web3ProxyApp>>,
ip: InsecureClientIp, InsecureClientIp(ip): InsecureClientIp,
origin: Option<TypedHeader<Origin>>, origin: Option<TypedHeader<Origin>>,
ws_upgrade: Option<WebSocketUpgrade>, ws_upgrade: Option<WebSocketUpgrade>,
) -> Web3ProxyResponse { ) -> Web3ProxyResponse {
// TODO: config to disable this // TODO: config to disable this
_websocket_handler(ProxyMode::Versus, app, ip, origin, ws_upgrade).await _websocket_handler(ProxyMode::Versus, app, &ip, origin.as_deref(), ws_upgrade).await
} }
async fn _websocket_handler( async fn _websocket_handler(
proxy_mode: ProxyMode, proxy_mode: ProxyMode,
app: Arc<Web3ProxyApp>, app: Arc<Web3ProxyApp>,
InsecureClientIp(ip): InsecureClientIp, ip: &IpAddr,
origin: Option<TypedHeader<Origin>>, origin: Option<&Origin>,
ws_upgrade: Option<WebSocketUpgrade>, ws_upgrade: Option<WebSocketUpgrade>,
) -> Web3ProxyResponse { ) -> Web3ProxyResponse {
let origin = origin.map(|x| x.0);
let (authorization, _semaphore) = ip_is_authorized(&app, ip, origin, proxy_mode).await?; let (authorization, _semaphore) = ip_is_authorized(&app, ip, origin, proxy_mode).await?;
let authorization = Arc::new(authorization); let authorization = Arc::new(authorization);
@ -129,7 +135,7 @@ async fn _websocket_handler(
#[debug_handler] #[debug_handler]
pub async fn websocket_handler_with_key( pub async fn websocket_handler_with_key(
Extension(app): Extension<Arc<Web3ProxyApp>>, Extension(app): Extension<Arc<Web3ProxyApp>>,
ip: InsecureClientIp, InsecureClientIp(ip): InsecureClientIp,
Path(rpc_key): Path<String>, Path(rpc_key): Path<String>,
origin: Option<TypedHeader<Origin>>, origin: Option<TypedHeader<Origin>>,
referer: Option<TypedHeader<Referer>>, referer: Option<TypedHeader<Referer>>,
@ -139,43 +145,60 @@ pub async fn websocket_handler_with_key(
_websocket_handler_with_key( _websocket_handler_with_key(
ProxyMode::Best, ProxyMode::Best,
app, app,
ip, &ip,
rpc_key, rpc_key,
origin, origin.as_deref(),
referer, referer.as_deref(),
user_agent, user_agent.as_deref(),
ws_upgrade, ws_upgrade,
) )
.await .await
} }
#[debug_handler] #[debug_handler]
#[allow(clippy::too_many_arguments)]
pub async fn debug_websocket_handler_with_key( pub async fn debug_websocket_handler_with_key(
Extension(app): Extension<Arc<Web3ProxyApp>>, Extension(app): Extension<Arc<Web3ProxyApp>>,
ip: InsecureClientIp, InsecureClientIp(ip): InsecureClientIp,
Path(rpc_key): Path<String>, Path(rpc_key): Path<String>,
origin: Option<TypedHeader<Origin>>, origin: Option<TypedHeader<Origin>>,
referer: Option<TypedHeader<Referer>>, referer: Option<TypedHeader<Referer>>,
user_agent: Option<TypedHeader<UserAgent>>, user_agent: Option<TypedHeader<UserAgent>>,
headers: HeaderMap,
ws_upgrade: Option<WebSocketUpgrade>, ws_upgrade: Option<WebSocketUpgrade>,
) -> Web3ProxyResponse { ) -> Web3ProxyResponse {
_websocket_handler_with_key( let mut response = _websocket_handler_with_key(
ProxyMode::Debug, ProxyMode::Debug,
app, app,
ip, &ip,
rpc_key, rpc_key,
origin, origin.as_deref(),
referer, referer.as_deref(),
user_agent, user_agent.as_deref(),
ws_upgrade, ws_upgrade,
) )
.await .await?;
// add some headers that might be useful while debugging
let response_headers = response.headers_mut();
if let Some(x) = headers.get("x-amzn-trace-id").cloned() {
response_headers.insert("x-amzn-trace-id", x);
}
if let Some(x) = headers.get("x-balance-id").cloned() {
response_headers.insert("x-balance-id", x);
}
response_headers.insert("client-ip", ip.to_string().parse().unwrap());
Ok(response)
} }
#[debug_handler] #[debug_handler]
pub async fn fastest_websocket_handler_with_key( pub async fn fastest_websocket_handler_with_key(
Extension(app): Extension<Arc<Web3ProxyApp>>, Extension(app): Extension<Arc<Web3ProxyApp>>,
ip: InsecureClientIp, InsecureClientIp(ip): InsecureClientIp,
Path(rpc_key): Path<String>, Path(rpc_key): Path<String>,
origin: Option<TypedHeader<Origin>>, origin: Option<TypedHeader<Origin>>,
referer: Option<TypedHeader<Referer>>, referer: Option<TypedHeader<Referer>>,
@ -186,11 +209,11 @@ pub async fn fastest_websocket_handler_with_key(
_websocket_handler_with_key( _websocket_handler_with_key(
ProxyMode::Fastest(0), ProxyMode::Fastest(0),
app, app,
ip, &ip,
rpc_key, rpc_key,
origin, origin.as_deref(),
referer, referer.as_deref(),
user_agent, user_agent.as_deref(),
ws_upgrade, ws_upgrade,
) )
.await .await
@ -199,7 +222,7 @@ pub async fn fastest_websocket_handler_with_key(
#[debug_handler] #[debug_handler]
pub async fn versus_websocket_handler_with_key( pub async fn versus_websocket_handler_with_key(
Extension(app): Extension<Arc<Web3ProxyApp>>, Extension(app): Extension<Arc<Web3ProxyApp>>,
ip: InsecureClientIp, InsecureClientIp(ip): InsecureClientIp,
Path(rpc_key): Path<String>, Path(rpc_key): Path<String>,
origin: Option<TypedHeader<Origin>>, origin: Option<TypedHeader<Origin>>,
referer: Option<TypedHeader<Referer>>, referer: Option<TypedHeader<Referer>>,
@ -209,11 +232,11 @@ pub async fn versus_websocket_handler_with_key(
_websocket_handler_with_key( _websocket_handler_with_key(
ProxyMode::Versus, ProxyMode::Versus,
app, app,
ip, &ip,
rpc_key, rpc_key,
origin, origin.as_deref(),
referer, referer.as_deref(),
user_agent, user_agent.as_deref(),
ws_upgrade, ws_upgrade,
) )
.await .await
@ -223,25 +246,17 @@ pub async fn versus_websocket_handler_with_key(
async fn _websocket_handler_with_key( async fn _websocket_handler_with_key(
proxy_mode: ProxyMode, proxy_mode: ProxyMode,
app: Arc<Web3ProxyApp>, app: Arc<Web3ProxyApp>,
InsecureClientIp(ip): InsecureClientIp, ip: &IpAddr,
rpc_key: String, rpc_key: String,
origin: Option<TypedHeader<Origin>>, origin: Option<&Origin>,
referer: Option<TypedHeader<Referer>>, referer: Option<&Referer>,
user_agent: Option<TypedHeader<UserAgent>>, user_agent: Option<&UserAgent>,
ws_upgrade: Option<WebSocketUpgrade>, ws_upgrade: Option<WebSocketUpgrade>,
) -> Web3ProxyResponse { ) -> Web3ProxyResponse {
let rpc_key = rpc_key.parse()?; let rpc_key = rpc_key.parse()?;
let (authorization, _semaphore) = key_is_authorized( let (authorization, _semaphore) =
&app, key_is_authorized(&app, &rpc_key, ip, origin, proxy_mode, referer, user_agent).await?;
rpc_key,
ip,
origin.map(|x| x.0),
proxy_mode,
referer.map(|x| x.0),
user_agent.map(|x| x.0),
)
.await?;
trace!("websocket_handler_with_key {:?}", authorization); trace!("websocket_handler_with_key {:?}", authorization);