rate limiting on user key

This commit is contained in:
Bryan Stitt 2022-08-04 01:10:27 +00:00
parent dede79fde1
commit be13cb0ff9
8 changed files with 131 additions and 42 deletions

12
Cargo.lock generated
View File

@ -3999,9 +3999,9 @@ checksum = "930c0acf610d3fdb5e2ab6213019aaa04e227ebe9547b0649ba599b16d788bd7"
[[package]] [[package]]
name = "serde" name = "serde"
version = "1.0.141" version = "1.0.142"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7af873f2c95b99fcb0bd0fe622a43e29514658873c8ceba88c4cb88833a22500" checksum = "e590c437916fb6b221e1d00df6e3294f3fccd70ca7e92541c475d6ed6ef5fee2"
dependencies = [ dependencies = [
"serde_derive", "serde_derive",
] ]
@ -4028,9 +4028,9 @@ dependencies = [
[[package]] [[package]]
name = "serde_derive" name = "serde_derive"
version = "1.0.141" version = "1.0.142"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75743a150d003dd863b51dc809bcad0d73f2102c53632f1e954e738192a3413f" checksum = "34b5b8d809babe02f538c2cfec6f2c1ed10804c0e5a6a041a049a4f5588ccc2e"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -4039,9 +4039,9 @@ dependencies = [
[[package]] [[package]]
name = "serde_json" name = "serde_json"
version = "1.0.82" version = "1.0.83"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "82c2c1fdcd807d1098552c5b9a36e425e42e9fbd7c6a37a8425f390f781f7fa7" checksum = "38dd04e3c8279e75b31ef29dbdceebfe5ad89f4d0937213c53f7d49d01b3d5a7"
dependencies = [ dependencies = [
"itoa 1.0.2", "itoa 1.0.2",
"ryu", "ryu",

View File

@ -10,4 +10,4 @@ path = "src/mod.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
sea-orm = { version = "0.9.1" } sea-orm = "0.9.1"

View File

@ -43,8 +43,8 @@ reqwest = { version = "0.11.11", default-features = false, features = ["json", "
rustc-hash = "1.1.0" rustc-hash = "1.1.0"
siwe = "0.4.1" siwe = "0.4.1"
sea-orm = { version = "0.9.1", features = ["macros"] } sea-orm = { version = "0.9.1", features = ["macros"] }
serde = { version = "1.0.141", features = [] } serde = { version = "1.0.142", features = [] }
serde_json = { version = "1.0.82", default-features = false, features = ["alloc", "raw_value"] } serde_json = { version = "1.0.83", default-features = false, features = ["alloc", "raw_value"] }
tokio = { version = "1.20.1", features = ["full", "tracing"] } tokio = { version = "1.20.1", features = ["full", "tracing"] }
async-std = { version = "1.12.0", features = ["attributes", "tokio1"] } async-std = { version = "1.12.0", features = ["attributes", "tokio1"] }
toml = "0.5.9" toml = "0.5.9"

View File

@ -262,7 +262,7 @@ pub struct Web3ProxyApp {
head_block_receiver: watch::Receiver<Arc<Block<TxHash>>>, head_block_receiver: watch::Receiver<Arc<Block<TxHash>>>,
pending_tx_sender: broadcast::Sender<TxState>, pending_tx_sender: broadcast::Sender<TxState>,
pending_transactions: Arc<DashMap<TxHash, TxState>>, pending_transactions: Arc<DashMap<TxHash, TxState>>,
public_rate_limiter: Option<RedisCellClient>, rate_limiter: Option<RedisCellClient>,
db_conn: Option<sea_orm::DatabaseConnection>, db_conn: Option<sea_orm::DatabaseConnection>,
} }
@ -274,12 +274,16 @@ impl fmt::Debug for Web3ProxyApp {
} }
impl Web3ProxyApp { impl Web3ProxyApp {
pub fn db_conn(&self) -> &sea_orm::DatabaseConnection {
self.db_conn.as_ref().unwrap()
}
pub fn pending_transactions(&self) -> &DashMap<TxHash, TxState> { pub fn pending_transactions(&self) -> &DashMap<TxHash, TxState> {
&self.pending_transactions &self.pending_transactions
} }
pub fn public_rate_limiter(&self) -> Option<&RedisCellClient> { pub fn rate_limiter(&self) -> Option<&RedisCellClient> {
self.public_rate_limiter.as_ref() self.rate_limiter.as_ref()
} }
// TODO: should we just take the rpc config as the only arg instead? // TODO: should we just take the rpc config as the only arg instead?
@ -441,7 +445,7 @@ impl Web3ProxyApp {
head_block_receiver, head_block_receiver,
pending_tx_sender, pending_tx_sender,
pending_transactions, pending_transactions,
public_rate_limiter, rate_limiter: public_rate_limiter,
db_conn, db_conn,
}; };

View File

@ -3,6 +3,7 @@ use axum_client_ip::ClientIp;
use std::sync::Arc; use std::sync::Arc;
use super::errors::handle_anyhow_error; use super::errors::handle_anyhow_error;
use super::{rate_limit_by_ip, rate_limit_by_key};
use crate::{app::Web3ProxyApp, jsonrpc::JsonRpcRequestEnum}; use crate::{app::Web3ProxyApp, jsonrpc::JsonRpcRequestEnum};
pub async fn proxy_web3_rpc( pub async fn proxy_web3_rpc(
@ -10,23 +11,23 @@ pub async fn proxy_web3_rpc(
Extension(app): Extension<Arc<Web3ProxyApp>>, Extension(app): Extension<Arc<Web3ProxyApp>>,
ClientIp(ip): ClientIp, ClientIp(ip): ClientIp,
) -> impl IntoResponse { ) -> impl IntoResponse {
if let Some(rate_limiter) = app.public_rate_limiter() { if let Err(x) = rate_limit_by_ip(&app, &ip).await {
let rate_limiter_key = format!("{}", ip); return x.into_response();
}
if rate_limiter.throttle_key(&rate_limiter_key).await.is_err() {
// TODO: set headers so they know when they can retry match app.proxy_web3_rpc(payload).await {
// warn!(?ip, "public rate limit exceeded"); Ok(response) => (StatusCode::OK, Json(&response)).into_response(),
// TODO: use their id if possible Err(err) => handle_anyhow_error(None, None, err).await.into_response(),
return handle_anyhow_error( }
Some(StatusCode::TOO_MANY_REQUESTS), }
None,
anyhow::anyhow!("too many requests"), pub async fn user_proxy_web3_rpc(
) Json(payload): Json<JsonRpcRequestEnum>,
.await Extension(app): Extension<Arc<Web3ProxyApp>>,
.into_response(); key: String,
} ) -> impl IntoResponse {
} else { if let Err(x) = rate_limit_by_key(&app, &key).await {
// TODO: if no redis, rate limit with a local cache? return x.into_response();
} }
match app.proxy_web3_rpc(payload).await { match app.proxy_web3_rpc(payload).await {

View File

@ -7,15 +7,53 @@ mod ws_proxy;
use axum::{ use axum::{
handler::Handler, handler::Handler,
response::IntoResponse,
routing::{get, post}, routing::{get, post},
Extension, Router, Extension, Router,
}; };
use std::net::SocketAddr; use reqwest::StatusCode;
use std::net::{IpAddr, SocketAddr};
use std::sync::Arc; use std::sync::Arc;
use tracing::debug; use tracing::debug;
use crate::app::Web3ProxyApp; use crate::app::Web3ProxyApp;
use self::errors::handle_anyhow_error;
pub async fn rate_limit_by_ip(app: &Web3ProxyApp, ip: &IpAddr) -> Result<(), impl IntoResponse> {
let rate_limiter_key = format!("ip:{}", ip);
rate_limit_by_key(app, &rate_limiter_key).await
}
pub async fn rate_limit_by_key(
app: &Web3ProxyApp,
user_key: &str,
) -> Result<(), impl IntoResponse> {
let db = app.db_conn();
// TODO: query the db to make sure this key is active
if let Some(rate_limiter) = app.rate_limiter() {
if rate_limiter.throttle_key(user_key).await.is_err() {
// TODO: set headers so they know when they can retry
// warn!(?ip, "public rate limit exceeded");
// TODO: use their id if possible
return Err(handle_anyhow_error(
Some(StatusCode::TOO_MANY_REQUESTS),
None,
anyhow::anyhow!("too many requests"),
)
.await
.into_response());
}
} else {
// TODO: if no redis, rate limit with a local cache?
}
Ok(())
}
pub async fn run(port: u16, proxy_app: Arc<Web3ProxyApp>) -> anyhow::Result<()> { pub async fn run(port: u16, proxy_app: Arc<Web3ProxyApp>) -> anyhow::Result<()> {
// TODO: check auth (from authp?) here // TODO: check auth (from authp?) here
// build our application with a route // build our application with a route
@ -25,6 +63,10 @@ pub async fn run(port: u16, proxy_app: Arc<Web3ProxyApp>) -> anyhow::Result<()>
.route("/", post(http_proxy::proxy_web3_rpc)) .route("/", post(http_proxy::proxy_web3_rpc))
// `websocket /` goes to `proxy_web3_ws` // `websocket /` goes to `proxy_web3_ws`
.route("/", get(ws_proxy::websocket_handler)) .route("/", get(ws_proxy::websocket_handler))
// `POST /rpc/:key` goes to `proxy_web3_rpc`
.route("/rpc/:key", post(http_proxy::user_proxy_web3_rpc))
// `websocket /` goes to `proxy_web3_ws`
.route("/rpc/:key", get(ws_proxy::user_websocket_handler))
// `GET /health` goes to `health` // `GET /health` goes to `health`
.route("/health", get(http::health)) .route("/health", get(http::health))
// `GET /status` goes to `status` // `GET /status` goes to `status`

View File

@ -7,32 +7,54 @@
// I wonder how we handle payment // I wonder how we handle payment
// probably have to do manual withdrawals // probably have to do manual withdrawals
use axum::{http::StatusCode, response::IntoResponse, Json}; use axum::{response::IntoResponse, Extension, Json};
use ethers::prelude::{Address, Bytes}; use axum_client_ip::ClientIp;
use serde::{Deserialize, Serialize};
use entities::user; use entities::user;
use ethers::{prelude::Address, types::Bytes};
use sea_orm::ActiveModelTrait;
use serde::Deserialize;
use std::sync::Arc;
// use entities::user::User; use crate::{app::Web3ProxyApp, frontend::rate_limit_by_ip};
pub async fn create_user( pub async fn create_user(
// this argument tells axum to parse the request body // this argument tells axum to parse the request body
// as JSON into a `CreateUser` type // as JSON into a `CreateUser` type
Json(payload): Json<CreateUser>, Json(payload): Json<CreateUser>,
Extension(app): Extension<Arc<Web3ProxyApp>>,
ClientIp(ip): ClientIp,
) -> impl IntoResponse { ) -> impl IntoResponse {
// TODO: rate limit by ip if let Err(x) = rate_limit_by_ip(&app, &ip).await {
// TODO: insert your application logic here return x;
}
// TODO: check invite_code against the app's config
if payload.invite_code != "llam4n0des!" {
todo!("proper error message")
}
// TODO: dont unwrap. proper error
let signature: [u8; 65] = payload.signature.as_ref().try_into().unwrap();
// TODO: calculate the expected message for the current user. include domain and a nonce. let timestamp be automatic
let message: siwe::Message = "abc123".parse().unwrap();
if let Err(e) = message.verify(signature, None, None, None) {
// message cannot be correctly authenticated
todo!("proper error message: {}", e)
}
let user = user::ActiveModel { let user = user::ActiveModel {
address: sea_orm::Set(payload.address.to_string()), address: sea_orm::Set(payload.address.to_string()),
email: sea_orm::Set(payload.email),
..Default::default() ..Default::default()
}; };
// TODO: optional email let db = app.db_conn();
todo!(); // TODO: proper error message
let user = user.insert(db).await.unwrap();
// this will be converted into a JSON response todo!("serialize and return the user: {:?}", user)
// with a status code of `201 Created`
// (StatusCode::CREATED, Json(user)) // (StatusCode::CREATED, Json(user))
} }

View File

@ -3,6 +3,7 @@ use axum::{
response::IntoResponse, response::IntoResponse,
Extension, Extension,
}; };
use axum_client_ip::ClientIp;
use futures::SinkExt; use futures::SinkExt;
use futures::{ use futures::{
future::AbortHandle, future::AbortHandle,
@ -19,10 +20,29 @@ use crate::{
jsonrpc::{JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest}, jsonrpc::{JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest},
}; };
use super::{rate_limit_by_ip, rate_limit_by_key};
pub async fn websocket_handler( pub async fn websocket_handler(
Extension(app): Extension<Arc<Web3ProxyApp>>, Extension(app): Extension<Arc<Web3ProxyApp>>,
ClientIp(ip): ClientIp,
ws: WebSocketUpgrade, ws: WebSocketUpgrade,
) -> impl IntoResponse { ) -> impl IntoResponse {
if let Err(x) = rate_limit_by_ip(&app, &ip).await {
return x.into_response();
}
ws.on_upgrade(|socket| proxy_web3_socket(app, socket))
}
pub async fn user_websocket_handler(
Extension(app): Extension<Arc<Web3ProxyApp>>,
ws: WebSocketUpgrade,
key: String,
) -> impl IntoResponse {
if let Err(x) = rate_limit_by_key(&app, &key).await {
return x.into_response();
}
ws.on_upgrade(|socket| proxy_web3_socket(app, socket)) ws.on_upgrade(|socket| proxy_web3_socket(app, socket))
} }