From 36cf8af5110d4c7682d32816af69c2734168e5ae Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Sun, 7 Aug 2022 19:33:16 +0000 Subject: [PATCH] requests_per_minute, not requests_per_second --- entities/src/user_keys.rs | 2 +- web3_proxy/src/app.rs | 2 +- .../src/bin/web3_proxy_cli/create_user.rs | 1 + web3_proxy/src/frontend/mod.rs | 66 +++++++++++++------ web3_proxy/src/frontend/ws_proxy.rs | 26 +++++--- 5 files changed, 68 insertions(+), 29 deletions(-) diff --git a/entities/src/user_keys.rs b/entities/src/user_keys.rs index 0931bde2..22683865 100644 --- a/entities/src/user_keys.rs +++ b/entities/src/user_keys.rs @@ -15,7 +15,7 @@ pub struct Model { pub description: Option, pub private_txs: bool, pub active: bool, - pub requests_per_second: u32, + pub requests_per_minute: u32, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs index 8a3c4407..1a91bd60 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app.rs @@ -27,7 +27,7 @@ use tokio::sync::{broadcast, watch}; use tokio::task::JoinHandle; use tokio::time::timeout; use tokio_stream::wrappers::{BroadcastStream, WatchStream}; -use tracing::{debug, info, info_span, instrument, trace, warn, Instrument}; +use tracing::{info, info_span, instrument, trace, warn, Instrument}; use crate::bb8_helpers; use crate::config::AppConfig; diff --git a/web3_proxy/src/bin/web3_proxy_cli/create_user.rs b/web3_proxy/src/bin/web3_proxy_cli/create_user.rs index c597adc6..5e81f351 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/create_user.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/create_user.rs @@ -43,6 +43,7 @@ impl CreateUserSubCommand { let uk = user_keys::ActiveModel { user_id: sea_orm::Set(u.id), api_key: sea_orm::Set(new_api_key()), + requests_per_minute: sea_orm::Set(6_000_000), ..Default::default() }; diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs index dac49bfc..dbb00de3 100644 --- a/web3_proxy/src/frontend/mod.rs +++ b/web3_proxy/src/frontend/mod.rs @@ -12,13 +12,14 @@ use axum::{ Extension, Router, }; use entities::user_keys; +use redis_cell_client::ThrottleResult; use reqwest::StatusCode; use sea_orm::{ ColumnTrait, DeriveColumn, EntityTrait, EnumIter, IdenStatic, QueryFilter, QuerySelect, }; use std::net::{IpAddr, SocketAddr}; use std::sync::Arc; -use tracing::info; +use tracing::{debug, info}; use uuid::Uuid; use crate::app::Web3ProxyApp; @@ -26,25 +27,38 @@ use crate::app::Web3ProxyApp; use self::errors::handle_anyhow_error; pub async fn rate_limit_by_ip(app: &Web3ProxyApp, ip: &IpAddr) -> Result<(), impl IntoResponse> { - let rate_limiter_key = format!("ip:{}", ip); + let rate_limiter_key = format!("ip-{}", ip); // TODO: dry this up with rate_limit_by_key if let Some(rate_limiter) = app.rate_limiter() { - if rate_limiter + match rate_limiter .throttle_key(&rate_limiter_key, None, None, None) .await - .is_err() { - // TODO: set headers so they know when they can retry - // warn!(?ip, "public rate limit exceeded"); // this is too verbose, but a stat might be good - // TODO: use their id if possible - return Err(handle_anyhow_error( - Some(StatusCode::TOO_MANY_REQUESTS), - None, - anyhow::anyhow!(format!("too many requests from this ip: {}", ip)), - ) - .await - .into_response()); + Ok(ThrottleResult::Allowed) => {} + Ok(ThrottleResult::RetryAt(_retry_at)) => { + // TODO: set headers so they know when they can retry + debug!(?rate_limiter_key, "rate limit exceeded"); // this is too verbose, but a stat might be good + // TODO: use their id if possible + return Err(handle_anyhow_error( + Some(StatusCode::TOO_MANY_REQUESTS), + None, + anyhow::anyhow!(format!("too many requests from this ip: {}", ip)), + ) + .await + .into_response()); + } + Err(err) => { + // internal error, not rate limit being hit + // TODO: i really want axum to do this for us in a single place. + return Err(handle_anyhow_error( + Some(StatusCode::INTERNAL_SERVER_ERROR), + None, + anyhow::anyhow!(format!("too many requests from this ip: {}", ip)), + ) + .await + .into_response()); + } } } else { // TODO: if no redis, rate limit with a local cache? @@ -61,9 +75,11 @@ pub async fn rate_limit_by_key( ) -> Result<(), impl IntoResponse> { let db = app.db_conn(); + /// query just a few columns instead of the entire table #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)] enum QueryAs { UserId, + RequestsPerMinute, } // query the db to make sure this key is active @@ -71,20 +87,22 @@ pub async fn rate_limit_by_key( match user_keys::Entity::find() .select_only() .column_as(user_keys::Column::UserId, QueryAs::UserId) + .column_as( + user_keys::Column::RequestsPerMinute, + QueryAs::RequestsPerMinute, + ) .filter(user_keys::Column::ApiKey.eq(user_key)) .filter(user_keys::Column::Active.eq(true)) .into_values::<_, QueryAs>() .one(db) .await { - Ok::, _>(Some(_)) => { + Ok::, _>(Some((_user_id, user_count_per_period))) => { // user key is valid if let Some(rate_limiter) = app.rate_limiter() { - // TODO: check the db for this? maybe add to the find above with a join? - let user_count_per_period = 100_000; // TODO: how does max burst actually work? what should it be? - let user_max_burst = user_count_per_period; - let user_period = 1; + let user_max_burst = user_count_per_period / 3; + let user_period = 60; if rate_limiter .throttle_key( @@ -164,7 +182,17 @@ pub async fn run(port: u16, proxy_app: Arc) -> anyhow::Result<()> axum::Server::bind(&addr) // TODO: option to use with_connect_info. we want it in dev, but not when running behind a proxy, but not .serve(app.into_make_service_with_connect_info::()) + .with_graceful_shutdown(signal_shutdown()) // .serve(app.into_make_service()) .await .map_err(Into::into) } + +/// Tokio signal handler that will wait for a user to press CTRL+C. +/// We use this in our hyper `Server` method `with_graceful_shutdown`. +async fn signal_shutdown() { + tokio::signal::ctrl_c() + .await + .expect("expect tokio signal ctrl-c"); + info!("signal shutdown"); +} diff --git a/web3_proxy/src/frontend/ws_proxy.rs b/web3_proxy/src/frontend/ws_proxy.rs index 03a1d803..8115762f 100644 --- a/web3_proxy/src/frontend/ws_proxy.rs +++ b/web3_proxy/src/frontend/ws_proxy.rs @@ -1,7 +1,7 @@ use axum::{ extract::ws::{Message, WebSocket, WebSocketUpgrade}, extract::Path, - response::IntoResponse, + response::{IntoResponse, Response}, Extension, }; use axum_client_ip::ClientIp; @@ -27,20 +27,30 @@ use super::{rate_limit_by_ip, rate_limit_by_key}; pub async fn public_websocket_handler( Extension(app): Extension>, ClientIp(ip): ClientIp, - ws: WebSocketUpgrade, -) -> impl IntoResponse { - if let Err(x) = rate_limit_by_ip(&app, &ip).await { - return x.into_response(); - } + ws: Option, +) -> Response { + match ws { + Some(ws) => { + if let Err(x) = rate_limit_by_ip(&app, &ip).await { + return x.into_response(); + } - ws.on_upgrade(|socket| proxy_web3_socket(app, socket)) + ws.on_upgrade(|socket| proxy_web3_socket(app, socket)) + .into_response() + } + None => { + // this is not a websocket. give a friendly page + // TODO: make a friendly page + "hello, world".into_response() + } + } } pub async fn user_websocket_handler( Extension(app): Extension>, ws: WebSocketUpgrade, Path(user_key): Path, -) -> impl IntoResponse { +) -> Response { if let Err(x) = rate_limit_by_key(&app, user_key).await { return x.into_response(); }