From be13cb0ff98ff6302a482a25f6c7ef5a56a1a532 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Thu, 4 Aug 2022 01:10:27 +0000 Subject: [PATCH] rate limiting on user key --- Cargo.lock | 12 ++++---- entities/Cargo.toml | 2 +- web3-proxy/Cargo.toml | 4 +-- web3-proxy/src/app.rs | 12 +++++--- web3-proxy/src/frontend/http_proxy.rs | 35 ++++++++++----------- web3-proxy/src/frontend/mod.rs | 44 ++++++++++++++++++++++++++- web3-proxy/src/frontend/users.rs | 44 ++++++++++++++++++++------- web3-proxy/src/frontend/ws_proxy.rs | 20 ++++++++++++ 8 files changed, 131 insertions(+), 42 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 43b45c2f..614cfa77 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3999,9 +3999,9 @@ checksum = "930c0acf610d3fdb5e2ab6213019aaa04e227ebe9547b0649ba599b16d788bd7" [[package]] name = "serde" -version = "1.0.141" +version = "1.0.142" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7af873f2c95b99fcb0bd0fe622a43e29514658873c8ceba88c4cb88833a22500" +checksum = "e590c437916fb6b221e1d00df6e3294f3fccd70ca7e92541c475d6ed6ef5fee2" dependencies = [ "serde_derive", ] @@ -4028,9 +4028,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.141" +version = "1.0.142" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75743a150d003dd863b51dc809bcad0d73f2102c53632f1e954e738192a3413f" +checksum = "34b5b8d809babe02f538c2cfec6f2c1ed10804c0e5a6a041a049a4f5588ccc2e" dependencies = [ "proc-macro2", "quote", @@ -4039,9 +4039,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.82" +version = "1.0.83" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "82c2c1fdcd807d1098552c5b9a36e425e42e9fbd7c6a37a8425f390f781f7fa7" +checksum = "38dd04e3c8279e75b31ef29dbdceebfe5ad89f4d0937213c53f7d49d01b3d5a7" dependencies = [ "itoa 1.0.2", "ryu", diff --git a/entities/Cargo.toml b/entities/Cargo.toml index 65563b06..be40c529 100644 --- a/entities/Cargo.toml +++ b/entities/Cargo.toml @@ -10,4 +10,4 @@ path = "src/mod.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -sea-orm = { version = "0.9.1" } +sea-orm = "0.9.1" diff --git a/web3-proxy/Cargo.toml b/web3-proxy/Cargo.toml index 7a70728f..3411f338 100644 --- a/web3-proxy/Cargo.toml +++ b/web3-proxy/Cargo.toml @@ -43,8 +43,8 @@ reqwest = { version = "0.11.11", default-features = false, features = ["json", " rustc-hash = "1.1.0" siwe = "0.4.1" sea-orm = { version = "0.9.1", features = ["macros"] } -serde = { version = "1.0.141", features = [] } -serde_json = { version = "1.0.82", default-features = false, features = ["alloc", "raw_value"] } +serde = { version = "1.0.142", features = [] } +serde_json = { version = "1.0.83", default-features = false, features = ["alloc", "raw_value"] } tokio = { version = "1.20.1", features = ["full", "tracing"] } async-std = { version = "1.12.0", features = ["attributes", "tokio1"] } toml = "0.5.9" diff --git a/web3-proxy/src/app.rs b/web3-proxy/src/app.rs index 138bc26f..87e1c428 100644 --- a/web3-proxy/src/app.rs +++ b/web3-proxy/src/app.rs @@ -262,7 +262,7 @@ pub struct Web3ProxyApp { head_block_receiver: watch::Receiver>>, pending_tx_sender: broadcast::Sender, pending_transactions: Arc>, - public_rate_limiter: Option, + rate_limiter: Option, db_conn: Option, } @@ -274,12 +274,16 @@ impl fmt::Debug for Web3ProxyApp { } impl Web3ProxyApp { + pub fn db_conn(&self) -> &sea_orm::DatabaseConnection { + self.db_conn.as_ref().unwrap() + } + pub fn pending_transactions(&self) -> &DashMap { &self.pending_transactions } - pub fn public_rate_limiter(&self) -> Option<&RedisCellClient> { - self.public_rate_limiter.as_ref() + pub fn rate_limiter(&self) -> Option<&RedisCellClient> { + self.rate_limiter.as_ref() } // TODO: should we just take the rpc config as the only arg instead? @@ -441,7 +445,7 @@ impl Web3ProxyApp { head_block_receiver, pending_tx_sender, pending_transactions, - public_rate_limiter, + rate_limiter: public_rate_limiter, db_conn, }; diff --git a/web3-proxy/src/frontend/http_proxy.rs b/web3-proxy/src/frontend/http_proxy.rs index bd69e8ba..b47a9c9d 100644 --- a/web3-proxy/src/frontend/http_proxy.rs +++ b/web3-proxy/src/frontend/http_proxy.rs @@ -3,6 +3,7 @@ use axum_client_ip::ClientIp; use std::sync::Arc; use super::errors::handle_anyhow_error; +use super::{rate_limit_by_ip, rate_limit_by_key}; use crate::{app::Web3ProxyApp, jsonrpc::JsonRpcRequestEnum}; pub async fn proxy_web3_rpc( @@ -10,23 +11,23 @@ pub async fn proxy_web3_rpc( Extension(app): Extension>, ClientIp(ip): ClientIp, ) -> impl IntoResponse { - if let Some(rate_limiter) = app.public_rate_limiter() { - let rate_limiter_key = format!("{}", ip); - - if rate_limiter.throttle_key(&rate_limiter_key).await.is_err() { - // TODO: set headers so they know when they can retry - // warn!(?ip, "public rate limit exceeded"); - // TODO: use their id if possible - return handle_anyhow_error( - Some(StatusCode::TOO_MANY_REQUESTS), - None, - anyhow::anyhow!("too many requests"), - ) - .await - .into_response(); - } - } else { - // TODO: if no redis, rate limit with a local cache? + if let Err(x) = rate_limit_by_ip(&app, &ip).await { + return x.into_response(); + } + + match app.proxy_web3_rpc(payload).await { + Ok(response) => (StatusCode::OK, Json(&response)).into_response(), + Err(err) => handle_anyhow_error(None, None, err).await.into_response(), + } +} + +pub async fn user_proxy_web3_rpc( + Json(payload): Json, + Extension(app): Extension>, + key: String, +) -> impl IntoResponse { + if let Err(x) = rate_limit_by_key(&app, &key).await { + return x.into_response(); } match app.proxy_web3_rpc(payload).await { diff --git a/web3-proxy/src/frontend/mod.rs b/web3-proxy/src/frontend/mod.rs index f04c733a..24c4165c 100644 --- a/web3-proxy/src/frontend/mod.rs +++ b/web3-proxy/src/frontend/mod.rs @@ -7,15 +7,53 @@ mod ws_proxy; use axum::{ handler::Handler, + response::IntoResponse, routing::{get, post}, Extension, Router, }; -use std::net::SocketAddr; +use reqwest::StatusCode; +use std::net::{IpAddr, SocketAddr}; use std::sync::Arc; use tracing::debug; use crate::app::Web3ProxyApp; +use self::errors::handle_anyhow_error; + +pub async fn rate_limit_by_ip(app: &Web3ProxyApp, ip: &IpAddr) -> Result<(), impl IntoResponse> { + let rate_limiter_key = format!("ip:{}", ip); + + rate_limit_by_key(app, &rate_limiter_key).await +} + +pub async fn rate_limit_by_key( + app: &Web3ProxyApp, + user_key: &str, +) -> Result<(), impl IntoResponse> { + let db = app.db_conn(); + + // TODO: query the db to make sure this key is active + + if let Some(rate_limiter) = app.rate_limiter() { + if rate_limiter.throttle_key(user_key).await.is_err() { + // TODO: set headers so they know when they can retry + // warn!(?ip, "public rate limit exceeded"); + // TODO: use their id if possible + return Err(handle_anyhow_error( + Some(StatusCode::TOO_MANY_REQUESTS), + None, + anyhow::anyhow!("too many requests"), + ) + .await + .into_response()); + } + } else { + // TODO: if no redis, rate limit with a local cache? + } + + Ok(()) +} + pub async fn run(port: u16, proxy_app: Arc) -> anyhow::Result<()> { // TODO: check auth (from authp?) here // build our application with a route @@ -25,6 +63,10 @@ pub async fn run(port: u16, proxy_app: Arc) -> anyhow::Result<()> .route("/", post(http_proxy::proxy_web3_rpc)) // `websocket /` goes to `proxy_web3_ws` .route("/", get(ws_proxy::websocket_handler)) + // `POST /rpc/:key` goes to `proxy_web3_rpc` + .route("/rpc/:key", post(http_proxy::user_proxy_web3_rpc)) + // `websocket /` goes to `proxy_web3_ws` + .route("/rpc/:key", get(ws_proxy::user_websocket_handler)) // `GET /health` goes to `health` .route("/health", get(http::health)) // `GET /status` goes to `status` diff --git a/web3-proxy/src/frontend/users.rs b/web3-proxy/src/frontend/users.rs index c80769ea..540e335b 100644 --- a/web3-proxy/src/frontend/users.rs +++ b/web3-proxy/src/frontend/users.rs @@ -7,32 +7,54 @@ // I wonder how we handle payment // probably have to do manual withdrawals -use axum::{http::StatusCode, response::IntoResponse, Json}; -use ethers::prelude::{Address, Bytes}; -use serde::{Deserialize, Serialize}; - +use axum::{response::IntoResponse, Extension, Json}; +use axum_client_ip::ClientIp; use entities::user; +use ethers::{prelude::Address, types::Bytes}; +use sea_orm::ActiveModelTrait; +use serde::Deserialize; +use std::sync::Arc; -// use entities::user::User; +use crate::{app::Web3ProxyApp, frontend::rate_limit_by_ip}; pub async fn create_user( // this argument tells axum to parse the request body // as JSON into a `CreateUser` type Json(payload): Json, + Extension(app): Extension>, + ClientIp(ip): ClientIp, ) -> impl IntoResponse { - // TODO: rate limit by ip - // TODO: insert your application logic here + if let Err(x) = rate_limit_by_ip(&app, &ip).await { + return x; + } + + // TODO: check invite_code against the app's config + if payload.invite_code != "llam4n0des!" { + todo!("proper error message") + } + + // TODO: dont unwrap. proper error + let signature: [u8; 65] = payload.signature.as_ref().try_into().unwrap(); + + // TODO: calculate the expected message for the current user. include domain and a nonce. let timestamp be automatic + let message: siwe::Message = "abc123".parse().unwrap(); + if let Err(e) = message.verify(signature, None, None, None) { + // message cannot be correctly authenticated + todo!("proper error message: {}", e) + } + let user = user::ActiveModel { address: sea_orm::Set(payload.address.to_string()), + email: sea_orm::Set(payload.email), ..Default::default() }; - // TODO: optional email + let db = app.db_conn(); - todo!(); + // TODO: proper error message + let user = user.insert(db).await.unwrap(); - // this will be converted into a JSON response - // with a status code of `201 Created` + todo!("serialize and return the user: {:?}", user) // (StatusCode::CREATED, Json(user)) } diff --git a/web3-proxy/src/frontend/ws_proxy.rs b/web3-proxy/src/frontend/ws_proxy.rs index 16f66c58..59871531 100644 --- a/web3-proxy/src/frontend/ws_proxy.rs +++ b/web3-proxy/src/frontend/ws_proxy.rs @@ -3,6 +3,7 @@ use axum::{ response::IntoResponse, Extension, }; +use axum_client_ip::ClientIp; use futures::SinkExt; use futures::{ future::AbortHandle, @@ -19,10 +20,29 @@ use crate::{ jsonrpc::{JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest}, }; +use super::{rate_limit_by_ip, rate_limit_by_key}; + pub async fn websocket_handler( Extension(app): Extension>, + ClientIp(ip): ClientIp, ws: WebSocketUpgrade, ) -> impl IntoResponse { + if let Err(x) = rate_limit_by_ip(&app, &ip).await { + return x.into_response(); + } + + ws.on_upgrade(|socket| proxy_web3_socket(app, socket)) +} + +pub async fn user_websocket_handler( + Extension(app): Extension>, + ws: WebSocketUpgrade, + key: String, +) -> impl IntoResponse { + if let Err(x) = rate_limit_by_key(&app, &key).await { + return x.into_response(); + } + ws.on_upgrade(|socket| proxy_web3_socket(app, socket)) }