From b16aa8d813d0adbbf5e399d712c5c7e8a8aaa462 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Sun, 21 Aug 2022 09:39:38 +0000 Subject: [PATCH] dry rate_limit_by_x --- web3_proxy/src/frontend/errors.rs | 4 ++ web3_proxy/src/frontend/http_proxy.rs | 42 +++++--------- web3_proxy/src/frontend/rate_limit.rs | 80 ++++++++++++++++++++++++--- web3_proxy/src/frontend/users.rs | 31 +++-------- web3_proxy/src/frontend/ws_proxy.rs | 41 +++++--------- 5 files changed, 111 insertions(+), 87 deletions(-) diff --git a/web3_proxy/src/frontend/errors.rs b/web3_proxy/src/frontend/errors.rs index 022aa5ff..b75df8bd 100644 --- a/web3_proxy/src/frontend/errors.rs +++ b/web3_proxy/src/frontend/errors.rs @@ -19,6 +19,7 @@ pub enum FrontendErrorResponse { // TODO: should we box these instead? Redis(RedisError), RedisRunError(RunError), + Response(Response), } impl IntoResponse for FrontendErrorResponse { @@ -31,6 +32,9 @@ impl IntoResponse for FrontendErrorResponse { Self::Box(err) => anyhow::anyhow!("Boxed error: {:?}", err), Self::Redis(err) => err.into(), Self::RedisRunError(err) => err.into(), + Self::Response(r) => { + return r; + } }; let err = JsonRpcForwardedResponse::from_anyhow_error(err, null_id); diff --git a/web3_proxy/src/frontend/http_proxy.rs b/web3_proxy/src/frontend/http_proxy.rs index bffc5441..42e708b7 100644 --- a/web3_proxy/src/frontend/http_proxy.rs +++ b/web3_proxy/src/frontend/http_proxy.rs @@ -1,11 +1,12 @@ -use super::errors::anyhow_error_into_response; -use super::rate_limit::RateLimitResult; +use super::errors::{anyhow_error_into_response, FrontendResult}; +use super::rate_limit::{rate_limit_by_ip, rate_limit_by_user_key, RateLimitResult}; use crate::stats::{Protocol, ProxyRequestLabels}; use crate::{app::Web3ProxyApp, jsonrpc::JsonRpcRequestEnum}; use axum::extract::Path; use axum::response::Response; use axum::{http::StatusCode, response::IntoResponse, Extension, Json}; use axum_client_ip::ClientIp; +use std::net::IpAddr; use std::sync::Arc; use tracing::{error_span, Instrument}; use uuid::Uuid; @@ -14,16 +15,8 @@ pub async fn public_proxy_web3_rpc( Json(payload): Json, Extension(app): Extension>, ClientIp(ip): ClientIp, -) -> Response { - // TODO: dry this up a lot - let _ip = match app.rate_limit_by_ip(ip).await { - Ok(x) => match x.try_into_response().await { - Ok(RateLimitResult::AllowedIp(x)) => x, - Err(err_response) => return err_response, - _ => unimplemented!(), - }, - Err(err) => return anyhow_error_into_response(None, None, err), - }; +) -> FrontendResult { + let _ip: IpAddr = rate_limit_by_ip(&app, ip).await?.try_into()?; let protocol = Protocol::HTTP; let user_id = 0; @@ -65,32 +58,23 @@ pub async fn public_proxy_web3_rpc( }; */ - match app.proxy_web3_rpc(payload).instrument(user_span).await { - Ok(response) => (StatusCode::OK, Json(&response)).into_response(), - Err(err) => anyhow_error_into_response(None, None, err), - } + let response = app.proxy_web3_rpc(payload).instrument(user_span).await?; + + Ok((StatusCode::OK, Json(&response)).into_response()) } pub async fn user_proxy_web3_rpc( Json(payload): Json, Extension(app): Extension>, Path(user_key): Path, -) -> Response { - let user_id = match app.rate_limit_by_key(user_key).await { - Ok(x) => match x.try_into_response().await { - Ok(RateLimitResult::AllowedUser(x)) => x, - Err(err_response) => return err_response, - _ => unimplemented!(), - }, - Err(err) => return anyhow_error_into_response(None, None, err), - }; +) -> FrontendResult { + let user_id: u64 = rate_limit_by_user_key(&app, user_key).await?.try_into()?; let protocol = Protocol::HTTP; let user_span = error_span!("user", user_id, ?protocol); - match app.proxy_web3_rpc(payload).instrument(user_span).await { - Ok(response) => (StatusCode::OK, Json(&response)).into_response(), - Err(err) => anyhow_error_into_response(None, None, err), - } + let response = app.proxy_web3_rpc(payload).instrument(user_span).await?; + + Ok((StatusCode::OK, Json(&response)).into_response()) } diff --git a/web3_proxy/src/frontend/rate_limit.rs b/web3_proxy/src/frontend/rate_limit.rs index 2cecbbc0..1dc4aff4 100644 --- a/web3_proxy/src/frontend/rate_limit.rs +++ b/web3_proxy/src/frontend/rate_limit.rs @@ -1,6 +1,7 @@ -use super::errors::anyhow_error_into_response; +use super::errors::{anyhow_error_into_response, FrontendErrorResponse, FrontendResult}; use crate::app::{UserCacheValue, Web3ProxyApp}; use axum::response::Response; +use derive_more::{From, TryInto}; use entities::user_keys; use redis_rate_limit::ThrottleResult; use reqwest::StatusCode; @@ -20,12 +21,77 @@ pub enum RateLimitResult { UnknownKey, } -impl RateLimitResult { - // TODO: i think this should be a function on RateLimitResult - pub async fn try_into_response(self) -> Result { - match self { - RateLimitResult::AllowedIp(_) => Ok(self), - RateLimitResult::AllowedUser(_) => Ok(self), +#[derive(From)] +pub enum RequestFrom { + Ip(IpAddr), + // TODO: fetch the actual user? + User(u64), +} + +pub type RateLimitFrontendResult = Result; + +impl TryFrom for IpAddr { + type Error = anyhow::Error; + + fn try_from(value: RequestFrom) -> Result { + match value { + RequestFrom::Ip(x) => Ok(x), + _ => Err(anyhow::anyhow!("not an ip")), + } + } +} + +impl TryFrom for u64 { + type Error = anyhow::Error; + + fn try_from(value: RequestFrom) -> Result { + match value { + RequestFrom::User(x) => Ok(x), + _ => Err(anyhow::anyhow!("not a user")), + } + } +} + +pub async fn rate_limit_by_ip(app: &Web3ProxyApp, ip: IpAddr) -> RateLimitFrontendResult { + let rate_limit_result = app.rate_limit_by_ip(ip).await?; + + match rate_limit_result { + RateLimitResult::AllowedIp(x) => Ok(x.into()), + RateLimitResult::AllowedUser(_) => panic!("only ips or errors are expected here"), + rate_limit_result => { + let _: RequestFrom = rate_limit_result.try_into()?; + + panic!("try_into should have failed") + } + } +} + +pub async fn rate_limit_by_user_key( + app: &Web3ProxyApp, + // TODO: change this to a Ulid + user_key: Uuid, +) -> RateLimitFrontendResult { + let rate_limit_result = app.rate_limit_by_key(user_key).await?.into(); + + match rate_limit_result { + RateLimitResult::AllowedIp(x) => panic!("only user keys or errors are expected here"), + RateLimitResult::AllowedUser(x) => Ok(x.into()), + rate_limit_result => { + let _: RequestFrom = rate_limit_result.try_into()?; + + panic!("try_into should have failed") + } + } +} + +impl TryFrom for RequestFrom { + // TODO: return an error that has its own IntoResponse? + type Error = Response; + + fn try_from(value: RateLimitResult) -> Result { + match value { + RateLimitResult::AllowedIp(x) => Ok(RequestFrom::Ip(x)), + RateLimitResult::AllowedUser(x) => Ok(RequestFrom::User(x)), RateLimitResult::IpRateLimitExceeded(ip) => Err(anyhow_error_into_response( Some(StatusCode::TOO_MANY_REQUESTS), None, diff --git a/web3_proxy/src/frontend/users.rs b/web3_proxy/src/frontend/users.rs index 62f7f256..6ee06867 100644 --- a/web3_proxy/src/frontend/users.rs +++ b/web3_proxy/src/frontend/users.rs @@ -7,6 +7,7 @@ // I wonder how we handle payment // probably have to do manual withdrawals +use super::rate_limit::rate_limit_by_ip; use super::{ errors::{anyhow_error_into_response, FrontendResult}, rate_limit::RateLimitResult, @@ -27,8 +28,8 @@ use reqwest::StatusCode; use sea_orm::ActiveModelTrait; use serde::Deserialize; use siwe::Message; -use std::ops::Add; use std::sync::Arc; +use std::{net::IpAddr, ops::Add}; use time::{Duration, OffsetDateTime}; use ulid::Ulid; @@ -44,15 +45,7 @@ pub async fn get_login( // TODO: allow ENS names here? Path(mut params): Path>, ) -> FrontendResult { - // TODO: refactor this to use the try operator - let _ip = match app.rate_limit_by_ip(ip).await { - Ok(x) => match x.try_into_response().await { - Ok(RateLimitResult::AllowedIp(x)) => x, - Err(err_response) => return Ok(err_response), - _ => unimplemented!(), - }, - Err(err) => return Ok(anyhow_error_into_response(None, None, err)), - }; + let _ip: IpAddr = rate_limit_by_ip(&app, ip).await?.try_into()?; // at first i thought about checking that user_address is in our db // but theres no need to separate the registration and login flows @@ -135,8 +128,9 @@ pub struct PostLogin { address: Address, msg: String, sig: Bytes, - version: String, - signer: String, + // TODO: do we care about these? we should probably check the version is something we expect + // version: String, + // signer: String, } #[debug_handler] @@ -146,17 +140,8 @@ pub async fn post_login( Extension(app): Extension>, Json(payload): Json, Query(query): Query, -) -> Response { - // TODO: return a Result instead - // TODO: dry this up ip checking up - let _ip = match app.rate_limit_by_ip(ip).await { - Ok(x) => match x.try_into_response().await { - Ok(RateLimitResult::AllowedIp(x)) => x, - Err(err_response) => return err_response, - _ => unimplemented!(), - }, - Err(err) => return anyhow_error_into_response(None, None, err), - }; +) -> FrontendResult { + let _ip: IpAddr = rate_limit_by_ip(&app, ip).await?.try_into()?; let mut new_user = true; // TODO: check the database diff --git a/web3_proxy/src/frontend/ws_proxy.rs b/web3_proxy/src/frontend/ws_proxy.rs index 51db5acc..f1f72e2c 100644 --- a/web3_proxy/src/frontend/ws_proxy.rs +++ b/web3_proxy/src/frontend/ws_proxy.rs @@ -1,3 +1,5 @@ +use super::errors::FrontendResult; +use super::rate_limit::{rate_limit_by_ip, rate_limit_by_user_key}; use axum::{ extract::ws::{Message, WebSocket, WebSocketUpgrade}, extract::Path, @@ -14,7 +16,7 @@ use futures::{ use handlebars::Handlebars; use hashbrown::HashMap; use serde_json::{json, value::RawValue}; -use std::sync::Arc; +use std::{net::IpAddr, sync::Arc}; use std::{str::from_utf8_mut, sync::atomic::AtomicUsize}; use tracing::{error, error_span, info, trace, Instrument}; use uuid::Uuid; @@ -25,22 +27,13 @@ use crate::{ stats::Protocol, }; -use super::{errors::anyhow_error_into_response, rate_limit::RateLimitResult}; - #[debug_handler] pub async fn public_websocket_handler( Extension(app): Extension>, ClientIp(ip): ClientIp, ws_upgrade: Option, -) -> Response { - let _ip = match app.rate_limit_by_ip(ip).await { - Ok(x) => match x.try_into_response().await { - Ok(RateLimitResult::AllowedIp(x)) => x, - Err(err_response) => return err_response, - _ => unimplemented!(), - }, - Err(err) => return anyhow_error_into_response(None, None, err).into_response(), - }; +) -> FrontendResult { + let _ip: IpAddr = rate_limit_by_ip(&app, ip).await?.try_into()?; let user_id = 0; let protocol = Protocol::Websocket; @@ -48,12 +41,12 @@ pub async fn public_websocket_handler( let user_span = error_span!("user", user_id, ?protocol); match ws_upgrade { - Some(ws) => ws + Some(ws) => Ok(ws .on_upgrade(|socket| proxy_web3_socket(app, socket).instrument(user_span)) - .into_response(), + .into_response()), None => { // this is not a websocket. redirect to a friendly page - Redirect::to(&app.config.redirect_public_url).into_response() + Ok(Redirect::to(&app.config.redirect_public_url).into_response()) } } } @@ -63,16 +56,8 @@ pub async fn user_websocket_handler( Extension(app): Extension>, Path(user_key): Path, ws_upgrade: Option, -) -> Response { - // TODO: dry this up. maybe a rate_limit_by_key_response function? - let user_id = match app.rate_limit_by_key(user_key).await { - Ok(x) => match x.try_into_response().await { - Ok(RateLimitResult::AllowedUser(x)) => x, - Err(err_response) => return err_response, - _ => unimplemented!(), - }, - Err(err) => return anyhow_error_into_response(None, None, err).into_response(), - }; +) -> FrontendResult { + let user_id: u64 = rate_limit_by_user_key(&app, user_key).await?.try_into()?; let protocol = Protocol::Websocket; @@ -81,8 +66,8 @@ pub async fn user_websocket_handler( let user_span = error_span!("user", user_id, ?protocol); match ws_upgrade { - Some(ws_upgrade) => ws_upgrade - .on_upgrade(move |socket| proxy_web3_socket(app, socket).instrument(user_span)), + Some(ws_upgrade) => Ok(ws_upgrade + .on_upgrade(move |socket| proxy_web3_socket(app, socket).instrument(user_span))), None => { // TODO: store this on the app and use register_template? let reg = Handlebars::new(); @@ -97,7 +82,7 @@ pub async fn user_websocket_handler( .unwrap(); // this is not a websocket. redirect to a page for this user - Redirect::to(&user_url).into_response() + Ok(Redirect::to(&user_url).into_response()) } } }