dry rate_limit_by_x

This commit is contained in:
Bryan Stitt 2022-08-21 09:39:38 +00:00
parent 748674fe7a
commit b16aa8d813
5 changed files with 111 additions and 87 deletions

@ -19,6 +19,7 @@ pub enum FrontendErrorResponse {
// TODO: should we box these instead?
Redis(RedisError),
RedisRunError(RunError<RedisError>),
Response(Response),
}
impl IntoResponse for FrontendErrorResponse {
@ -31,6 +32,9 @@ impl IntoResponse for FrontendErrorResponse {
Self::Box(err) => anyhow::anyhow!("Boxed error: {:?}", err),
Self::Redis(err) => err.into(),
Self::RedisRunError(err) => err.into(),
Self::Response(r) => {
return r;
}
};
let err = JsonRpcForwardedResponse::from_anyhow_error(err, null_id);

@ -1,11 +1,12 @@
use super::errors::anyhow_error_into_response;
use super::rate_limit::RateLimitResult;
use super::errors::{anyhow_error_into_response, FrontendResult};
use super::rate_limit::{rate_limit_by_ip, rate_limit_by_user_key, RateLimitResult};
use crate::stats::{Protocol, ProxyRequestLabels};
use crate::{app::Web3ProxyApp, jsonrpc::JsonRpcRequestEnum};
use axum::extract::Path;
use axum::response::Response;
use axum::{http::StatusCode, response::IntoResponse, Extension, Json};
use axum_client_ip::ClientIp;
use std::net::IpAddr;
use std::sync::Arc;
use tracing::{error_span, Instrument};
use uuid::Uuid;
@ -14,16 +15,8 @@ pub async fn public_proxy_web3_rpc(
Json(payload): Json<JsonRpcRequestEnum>,
Extension(app): Extension<Arc<Web3ProxyApp>>,
ClientIp(ip): ClientIp,
) -> Response {
// TODO: dry this up a lot
let _ip = match app.rate_limit_by_ip(ip).await {
Ok(x) => match x.try_into_response().await {
Ok(RateLimitResult::AllowedIp(x)) => x,
Err(err_response) => return err_response,
_ => unimplemented!(),
},
Err(err) => return anyhow_error_into_response(None, None, err),
};
) -> FrontendResult {
let _ip: IpAddr = rate_limit_by_ip(&app, ip).await?.try_into()?;
let protocol = Protocol::HTTP;
let user_id = 0;
@ -65,32 +58,23 @@ pub async fn public_proxy_web3_rpc(
};
*/
match app.proxy_web3_rpc(payload).instrument(user_span).await {
Ok(response) => (StatusCode::OK, Json(&response)).into_response(),
Err(err) => anyhow_error_into_response(None, None, err),
}
let response = app.proxy_web3_rpc(payload).instrument(user_span).await?;
Ok((StatusCode::OK, Json(&response)).into_response())
}
pub async fn user_proxy_web3_rpc(
Json(payload): Json<JsonRpcRequestEnum>,
Extension(app): Extension<Arc<Web3ProxyApp>>,
Path(user_key): Path<Uuid>,
) -> Response {
let user_id = match app.rate_limit_by_key(user_key).await {
Ok(x) => match x.try_into_response().await {
Ok(RateLimitResult::AllowedUser(x)) => x,
Err(err_response) => return err_response,
_ => unimplemented!(),
},
Err(err) => return anyhow_error_into_response(None, None, err),
};
) -> FrontendResult {
let user_id: u64 = rate_limit_by_user_key(&app, user_key).await?.try_into()?;
let protocol = Protocol::HTTP;
let user_span = error_span!("user", user_id, ?protocol);
match app.proxy_web3_rpc(payload).instrument(user_span).await {
Ok(response) => (StatusCode::OK, Json(&response)).into_response(),
Err(err) => anyhow_error_into_response(None, None, err),
}
let response = app.proxy_web3_rpc(payload).instrument(user_span).await?;
Ok((StatusCode::OK, Json(&response)).into_response())
}

@ -1,6 +1,7 @@
use super::errors::anyhow_error_into_response;
use super::errors::{anyhow_error_into_response, FrontendErrorResponse, FrontendResult};
use crate::app::{UserCacheValue, Web3ProxyApp};
use axum::response::Response;
use derive_more::{From, TryInto};
use entities::user_keys;
use redis_rate_limit::ThrottleResult;
use reqwest::StatusCode;
@ -20,12 +21,77 @@ pub enum RateLimitResult {
UnknownKey,
}
impl RateLimitResult {
// TODO: i think this should be a function on RateLimitResult
pub async fn try_into_response(self) -> Result<RateLimitResult, Response> {
match self {
RateLimitResult::AllowedIp(_) => Ok(self),
RateLimitResult::AllowedUser(_) => Ok(self),
#[derive(From)]
pub enum RequestFrom {
Ip(IpAddr),
// TODO: fetch the actual user?
User(u64),
}
pub type RateLimitFrontendResult = Result<RequestFrom, FrontendErrorResponse>;
impl TryFrom<RequestFrom> for IpAddr {
type Error = anyhow::Error;
fn try_from(value: RequestFrom) -> Result<Self, Self::Error> {
match value {
RequestFrom::Ip(x) => Ok(x),
_ => Err(anyhow::anyhow!("not an ip")),
}
}
}
impl TryFrom<RequestFrom> for u64 {
type Error = anyhow::Error;
fn try_from(value: RequestFrom) -> Result<Self, Self::Error> {
match value {
RequestFrom::User(x) => Ok(x),
_ => Err(anyhow::anyhow!("not a user")),
}
}
}
pub async fn rate_limit_by_ip(app: &Web3ProxyApp, ip: IpAddr) -> RateLimitFrontendResult {
let rate_limit_result = app.rate_limit_by_ip(ip).await?;
match rate_limit_result {
RateLimitResult::AllowedIp(x) => Ok(x.into()),
RateLimitResult::AllowedUser(_) => panic!("only ips or errors are expected here"),
rate_limit_result => {
let _: RequestFrom = rate_limit_result.try_into()?;
panic!("try_into should have failed")
}
}
}
pub async fn rate_limit_by_user_key(
app: &Web3ProxyApp,
// TODO: change this to a Ulid
user_key: Uuid,
) -> RateLimitFrontendResult {
let rate_limit_result = app.rate_limit_by_key(user_key).await?.into();
match rate_limit_result {
RateLimitResult::AllowedIp(x) => panic!("only user keys or errors are expected here"),
RateLimitResult::AllowedUser(x) => Ok(x.into()),
rate_limit_result => {
let _: RequestFrom = rate_limit_result.try_into()?;
panic!("try_into should have failed")
}
}
}
impl TryFrom<RateLimitResult> for RequestFrom {
// TODO: return an error that has its own IntoResponse?
type Error = Response;
fn try_from(value: RateLimitResult) -> Result<Self, Self::Error> {
match value {
RateLimitResult::AllowedIp(x) => Ok(RequestFrom::Ip(x)),
RateLimitResult::AllowedUser(x) => Ok(RequestFrom::User(x)),
RateLimitResult::IpRateLimitExceeded(ip) => Err(anyhow_error_into_response(
Some(StatusCode::TOO_MANY_REQUESTS),
None,

@ -7,6 +7,7 @@
// I wonder how we handle payment
// probably have to do manual withdrawals
use super::rate_limit::rate_limit_by_ip;
use super::{
errors::{anyhow_error_into_response, FrontendResult},
rate_limit::RateLimitResult,
@ -27,8 +28,8 @@ use reqwest::StatusCode;
use sea_orm::ActiveModelTrait;
use serde::Deserialize;
use siwe::Message;
use std::ops::Add;
use std::sync::Arc;
use std::{net::IpAddr, ops::Add};
use time::{Duration, OffsetDateTime};
use ulid::Ulid;
@ -44,15 +45,7 @@ pub async fn get_login(
// TODO: allow ENS names here?
Path(mut params): Path<HashMap<String, String>>,
) -> FrontendResult {
// TODO: refactor this to use the try operator
let _ip = match app.rate_limit_by_ip(ip).await {
Ok(x) => match x.try_into_response().await {
Ok(RateLimitResult::AllowedIp(x)) => x,
Err(err_response) => return Ok(err_response),
_ => unimplemented!(),
},
Err(err) => return Ok(anyhow_error_into_response(None, None, err)),
};
let _ip: IpAddr = rate_limit_by_ip(&app, ip).await?.try_into()?;
// at first i thought about checking that user_address is in our db
// but theres no need to separate the registration and login flows
@ -135,8 +128,9 @@ pub struct PostLogin {
address: Address,
msg: String,
sig: Bytes,
version: String,
signer: String,
// TODO: do we care about these? we should probably check the version is something we expect
// version: String,
// signer: String,
}
#[debug_handler]
@ -146,17 +140,8 @@ pub async fn post_login(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Json(payload): Json<PostLogin>,
Query(query): Query<PostLoginQuery>,
) -> Response {
// TODO: return a Result instead
// TODO: dry this up ip checking up
let _ip = match app.rate_limit_by_ip(ip).await {
Ok(x) => match x.try_into_response().await {
Ok(RateLimitResult::AllowedIp(x)) => x,
Err(err_response) => return err_response,
_ => unimplemented!(),
},
Err(err) => return anyhow_error_into_response(None, None, err),
};
) -> FrontendResult {
let _ip: IpAddr = rate_limit_by_ip(&app, ip).await?.try_into()?;
let mut new_user = true; // TODO: check the database

@ -1,3 +1,5 @@
use super::errors::FrontendResult;
use super::rate_limit::{rate_limit_by_ip, rate_limit_by_user_key};
use axum::{
extract::ws::{Message, WebSocket, WebSocketUpgrade},
extract::Path,
@ -14,7 +16,7 @@ use futures::{
use handlebars::Handlebars;
use hashbrown::HashMap;
use serde_json::{json, value::RawValue};
use std::sync::Arc;
use std::{net::IpAddr, sync::Arc};
use std::{str::from_utf8_mut, sync::atomic::AtomicUsize};
use tracing::{error, error_span, info, trace, Instrument};
use uuid::Uuid;
@ -25,22 +27,13 @@ use crate::{
stats::Protocol,
};
use super::{errors::anyhow_error_into_response, rate_limit::RateLimitResult};
#[debug_handler]
pub async fn public_websocket_handler(
Extension(app): Extension<Arc<Web3ProxyApp>>,
ClientIp(ip): ClientIp,
ws_upgrade: Option<WebSocketUpgrade>,
) -> Response {
let _ip = match app.rate_limit_by_ip(ip).await {
Ok(x) => match x.try_into_response().await {
Ok(RateLimitResult::AllowedIp(x)) => x,
Err(err_response) => return err_response,
_ => unimplemented!(),
},
Err(err) => return anyhow_error_into_response(None, None, err).into_response(),
};
) -> FrontendResult {
let _ip: IpAddr = rate_limit_by_ip(&app, ip).await?.try_into()?;
let user_id = 0;
let protocol = Protocol::Websocket;
@ -48,12 +41,12 @@ pub async fn public_websocket_handler(
let user_span = error_span!("user", user_id, ?protocol);
match ws_upgrade {
Some(ws) => ws
Some(ws) => Ok(ws
.on_upgrade(|socket| proxy_web3_socket(app, socket).instrument(user_span))
.into_response(),
.into_response()),
None => {
// this is not a websocket. redirect to a friendly page
Redirect::to(&app.config.redirect_public_url).into_response()
Ok(Redirect::to(&app.config.redirect_public_url).into_response())
}
}
}
@ -63,16 +56,8 @@ pub async fn user_websocket_handler(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Path(user_key): Path<Uuid>,
ws_upgrade: Option<WebSocketUpgrade>,
) -> Response {
// TODO: dry this up. maybe a rate_limit_by_key_response function?
let user_id = match app.rate_limit_by_key(user_key).await {
Ok(x) => match x.try_into_response().await {
Ok(RateLimitResult::AllowedUser(x)) => x,
Err(err_response) => return err_response,
_ => unimplemented!(),
},
Err(err) => return anyhow_error_into_response(None, None, err).into_response(),
};
) -> FrontendResult {
let user_id: u64 = rate_limit_by_user_key(&app, user_key).await?.try_into()?;
let protocol = Protocol::Websocket;
@ -81,8 +66,8 @@ pub async fn user_websocket_handler(
let user_span = error_span!("user", user_id, ?protocol);
match ws_upgrade {
Some(ws_upgrade) => ws_upgrade
.on_upgrade(move |socket| proxy_web3_socket(app, socket).instrument(user_span)),
Some(ws_upgrade) => Ok(ws_upgrade
.on_upgrade(move |socket| proxy_web3_socket(app, socket).instrument(user_span))),
None => {
// TODO: store this on the app and use register_template?
let reg = Handlebars::new();
@ -97,7 +82,7 @@ pub async fn user_websocket_handler(
.unwrap();
// this is not a websocket. redirect to a page for this user
Redirect::to(&user_url).into_response()
Ok(Redirect::to(&user_url).into_response())
}
}
}