From e9000d1f61c51990620db477e65080cdda9f78a4 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Mon, 15 Aug 2022 22:50:56 +0000 Subject: [PATCH] drop redis-cell in favor of simpler query --- Cargo.lock | 41 +++--- Cargo.toml | 4 +- linkedhashmap/Cargo.toml | 2 +- redis-cell-client/src/lib.rs | 130 ------------------ redis-cell-server/Dockerfile | 18 --- .../Cargo.toml | 5 +- redis-rate-limit/src/errors.rs | 17 +++ redis-rate-limit/src/lib.rs | 100 ++++++++++++++ web3_proxy/Cargo.toml | 4 +- web3_proxy/src/app.rs | 20 ++- web3_proxy/src/bb8_helpers.rs | 17 --- web3_proxy/src/bin/web3_proxy.rs | 2 +- .../src/bin/web3_proxy_cli/check_config.rs | 17 ++- web3_proxy/src/config.rs | 10 +- web3_proxy/src/connection.rs | 13 +- web3_proxy/src/connections.rs | 2 +- web3_proxy/src/frontend/rate_limit.rs | 22 ++- web3_proxy/src/lib.rs | 1 - 18 files changed, 192 insertions(+), 233 deletions(-) delete mode 100644 redis-cell-client/src/lib.rs delete mode 100644 redis-cell-server/Dockerfile rename {redis-cell-client => redis-rate-limit}/Cargo.toml (68%) create mode 100644 redis-rate-limit/src/errors.rs create mode 100644 redis-rate-limit/src/lib.rs delete mode 100644 web3_proxy/src/bb8_helpers.rs diff --git a/Cargo.lock b/Cargo.lock index 053ad76f..9d8dde23 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1915,9 +1915,9 @@ checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c" [[package]] name = "futures" -version = "0.3.21" +version = "0.3.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f73fe65f54d1e12b726f517d3e2135ca3125a437b6d998caf1962961f7172d9e" +checksum = "ab30e97ab6aacfe635fad58f22c2bb06c8b685f7421eb1e064a729e2a5f481fa" dependencies = [ "futures-channel", "futures-core", @@ -1930,9 +1930,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.21" +version = "0.3.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3083ce4b914124575708913bca19bfe887522d6e2e6d0952943f5eac4a74010" +checksum = "2bfc52cbddcfd745bf1740338492bb0bd83d76c67b445f91c5fb29fae29ecaa1" dependencies = [ "futures-core", "futures-sink", @@ -1940,15 +1940,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.21" +version = "0.3.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3" +checksum = "d2acedae88d38235936c3922476b10fced7b2b68136f5e3c03c2d5be348a1115" [[package]] name = "futures-executor" -version = "0.3.21" +version = "0.3.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9420b90cfa29e327d0429f19be13e7ddb68fa1cccb09d65e5706b8c7a749b8a6" +checksum = "1d11aa21b5b587a64682c0094c2bdd4df0076c5324961a40cc3abd7f37930528" dependencies = [ "futures-core", "futures-task", @@ -1969,9 +1969,9 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.21" +version = "0.3.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b" +checksum = "93a66fc6d035a26a3ae255a6d2bca35eda63ae4c5512bef54449113f7a1228e5" [[package]] name = "futures-lite" @@ -2001,9 +2001,9 @@ dependencies = [ [[package]] name = "futures-macro" -version = "0.3.21" +version = "0.3.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33c1e13800337f4d4d7a316bf45a567dbcb6ffe087f16424852d97e97a91f512" +checksum = "0db9cce532b0eae2ccf2766ab246f114b56b9cf6d445e00c2549fbc100ca045d" dependencies = [ "proc-macro2", "quote", @@ -2012,15 +2012,15 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.21" +version = "0.3.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21163e139fa306126e6eedaf49ecdb4588f939600f0b1e770f4205ee4b7fa868" +checksum = "ca0bae1fe9752cf7fd9b0064c674ae63f97b37bc714d745cbde0afb7ec4e6765" [[package]] name = "futures-task" -version = "0.3.21" +version = "0.3.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57c66a976bf5909d801bbef33416c41372779507e7a6b3a5e25e4749c58f776a" +checksum = "842fc63b931f4056a24d59de13fb1272134ce261816e063e634ad0c15cdc5306" [[package]] name = "futures-timer" @@ -2030,9 +2030,9 @@ checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" [[package]] name = "futures-util" -version = "0.3.21" +version = "0.3.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a" +checksum = "f0828a5471e340229c11c77ca80017937ce3c58cb788a17e5f1c2d5c485a9577" dependencies = [ "futures-channel", "futures-core", @@ -3646,11 +3646,12 @@ dependencies = [ ] [[package]] -name = "redis-cell-client" +name = "redis-rate-limit" version = "0.2.0" dependencies = [ "anyhow", "bb8-redis", + "tracing", ] [[package]] @@ -5336,7 +5337,7 @@ dependencies = [ "proctitle", "prometheus-client", "rand", - "redis-cell-client", + "redis-rate-limit", "regex", "reqwest", "rustc-hash", diff --git a/Cargo.toml b/Cargo.toml index 9cd9e09b..6c860d9d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,10 +1,10 @@ [workspace] members = [ "entities", - "migration", "fifomap", "linkedhashmap", - "redis-cell-client", + "migration", + "redis-rate-limit", "web3_proxy", ] diff --git a/linkedhashmap/Cargo.toml b/linkedhashmap/Cargo.toml index 304c8fd5..009ed907 100644 --- a/linkedhashmap/Cargo.toml +++ b/linkedhashmap/Cargo.toml @@ -2,7 +2,7 @@ name = "linkedhashmap" version = "0.2.0" authors = ["quininer "] -edition = "2018" +edition = "2021" [features] inline-more = [ "hashbrown" ] diff --git a/redis-cell-client/src/lib.rs b/redis-cell-client/src/lib.rs deleted file mode 100644 index 059db113..00000000 --- a/redis-cell-client/src/lib.rs +++ /dev/null @@ -1,130 +0,0 @@ -//#![warn(missing_docs)] - -use bb8_redis::redis::cmd; - -pub use bb8_redis::redis::RedisError; -pub use bb8_redis::{bb8, RedisConnectionManager}; - -use std::ops::Add; -use std::time::{Duration, Instant}; - -pub type RedisPool = bb8::Pool; - -pub struct RedisCell { - pool: RedisPool, - key: String, - default_max_burst: u32, - default_count_per_period: u32, - default_period: u32, -} - -pub enum ThrottleResult { - Allowed, - RetryAt(Instant), -} - -impl RedisCell { - // todo: seems like this could be derived - // TODO: take something generic for conn - // TODO: use r2d2 for connection pooling? - pub fn new( - pool: RedisPool, - app: &str, - key: &str, - default_max_burst: u32, - default_count_per_period: u32, - default_period: u32, - ) -> Self { - let key = format!("{}:redis-cell:{}", app, key); - - Self { - pool, - key, - default_max_burst, - default_count_per_period, - default_period, - } - } - - #[inline] - async fn _throttle( - &self, - key: &str, - max_burst: Option, - count_per_period: Option, - period: Option, - quantity: u32, - ) -> anyhow::Result { - let mut conn = self.pool.get().await?; - - let count_per_period = count_per_period.unwrap_or(self.default_count_per_period); - - if count_per_period == 0 { - return Ok(ThrottleResult::Allowed); - } - - let max_burst = max_burst.unwrap_or(self.default_max_burst); - let period = period.unwrap_or(self.default_period); - - /* - https://github.com/brandur/redis-cell#response - - CL.THROTTLE [] - - 0. Whether the action was limited: - 0 indicates the action is allowed. - 1 indicates that the action was limited/blocked. - 1. The total limit of the key (max_burst + 1). This is equivalent to the common X-RateLimit-Limit HTTP header. - 2. The remaining limit of the key. Equivalent to X-RateLimit-Remaining. - 3. The number of seconds until the user should retry, and always -1 if the action was allowed. Equivalent to Retry-After. - 4. The number of seconds until the limit will reset to its maximum capacity. Equivalent to X-RateLimit-Reset. - */ - // TODO: don't unwrap. maybe return anyhow::Result> - // TODO: should we return more error info? - let x: Vec = cmd("CL.THROTTLE") - .arg(&(key, max_burst, count_per_period, period, quantity)) - .query_async(&mut *conn) - .await?; - - // TODO: trace log the result? - - if x.len() != 5 { - return Err(anyhow::anyhow!("unexpected redis result")); - } - - let retry_after = *x.get(3).expect("index exists above"); - - if retry_after == -1 { - Ok(ThrottleResult::Allowed) - } else { - // don't return a duration, return an instant - let retry_at = Instant::now().add(Duration::from_secs(retry_after as u64)); - - Ok(ThrottleResult::RetryAt(retry_at)) - } - } - - #[inline] - pub async fn throttle(&self) -> anyhow::Result { - self._throttle(&self.key, None, None, None, 1).await - } - - #[inline] - pub async fn throttle_key( - &self, - key: &str, - max_burst: Option, - count_per_period: Option, - period: Option, - ) -> anyhow::Result { - let key = format!("{}:{}", self.key, key); - - self._throttle(key.as_ref(), max_burst, count_per_period, period, 1) - .await - } - - #[inline] - pub async fn throttle_quantity(&self, quantity: u32) -> anyhow::Result { - self._throttle(&self.key, None, None, None, quantity).await - } -} diff --git a/redis-cell-server/Dockerfile b/redis-cell-server/Dockerfile deleted file mode 100644 index 5eab503b..00000000 --- a/redis-cell-server/Dockerfile +++ /dev/null @@ -1,18 +0,0 @@ -# A redis server with the libredis_cell module installed -FROM rust:1-bullseye as builder - -WORKDIR /usr/src/redis-cell -RUN \ - --mount=type=cache,target=/usr/local/cargo/registry \ - --mount=type=cache,target=/usr/src/web3_proxy/target \ - { \ - set -eux; \ - git clone -b v0.3.0 https://github.com/brandur/redis-cell .; \ - cargo build --release; \ - } - -FROM redis:bullseye - -COPY --from=builder /usr/src/redis-cell/target/release/libredis_cell.so /usr/lib/redis/modules/ - -CMD ["redis-server", "--loadmodule", "/usr/lib/redis/modules/libredis_cell.so"] diff --git a/redis-cell-client/Cargo.toml b/redis-rate-limit/Cargo.toml similarity index 68% rename from redis-cell-client/Cargo.toml rename to redis-rate-limit/Cargo.toml index e4bf8e95..990d8504 100644 --- a/redis-cell-client/Cargo.toml +++ b/redis-rate-limit/Cargo.toml @@ -1,9 +1,10 @@ [package] -name = "redis-cell-client" +name = "redis-rate-limit" version = "0.2.0" authors = ["Bryan Stitt "] -edition = "2018" +edition = "2021" [dependencies] anyhow = "1.0.61" bb8-redis = "0.11.0" +tracing = "0.1.36" diff --git a/redis-rate-limit/src/errors.rs b/redis-rate-limit/src/errors.rs new file mode 100644 index 00000000..0f4be3e8 --- /dev/null +++ b/redis-rate-limit/src/errors.rs @@ -0,0 +1,17 @@ +pub use bb8_redis::bb8::ErrorSink as Bb8ErrorSync; +pub use bb8_redis::redis::RedisError; + +use tracing::warn; + +#[derive(Debug, Clone)] +pub struct RedisErrorSink; + +impl Bb8ErrorSync for RedisErrorSink { + fn sink(&self, err: RedisError) { + warn!(?err, "redis error"); + } + + fn boxed_clone(&self) -> Box> { + Box::new(self.clone()) + } +} diff --git a/redis-rate-limit/src/lib.rs b/redis-rate-limit/src/lib.rs new file mode 100644 index 00000000..fbefc05e --- /dev/null +++ b/redis-rate-limit/src/lib.rs @@ -0,0 +1,100 @@ +//#![warn(missing_docs)] +mod errors; + +use anyhow::Context; +use bb8_redis::redis::pipe; +use std::ops::Add; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; + +pub use crate::errors::{RedisError, RedisErrorSink}; +pub use bb8_redis::{bb8, RedisConnectionManager}; + +pub type RedisPool = bb8::Pool; + +pub struct RedisRateLimit { + pool: RedisPool, + key_prefix: String, + default_max_per_period: u64, + period: u64, +} + +pub enum ThrottleResult { + Allowed, + RetryAt(Instant), + RetryNever, +} + +impl RedisRateLimit { + pub fn new( + pool: RedisPool, + app: &str, + label: &str, + default_max_per_period: u64, + period: u64, + ) -> Self { + let key_prefix = format!("{}:rrl:{}", app, label); + + Self { + pool, + key_prefix, + default_max_per_period, + period, + } + } + + /// label might be an ip address or a user_key id + pub async fn throttle_label( + &self, + label: &str, + max_per_period: Option, + count: u64, + ) -> anyhow::Result { + let max_per_period = max_per_period.unwrap_or(self.default_max_per_period); + + if max_per_period == 0 { + return Ok(ThrottleResult::RetryNever); + } + + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .context("cannot tell the time")? + .as_secs(); + + let period_id = now % self.period; + + let throttle_key = format!("{}:{}:{}", self.key_prefix, label, period_id); + + let mut conn = self.pool.get().await?; + + let x: Vec = pipe() + // we could get the key first, but that means an extra redis call for every check. this seems better + .incr(&throttle_key, count) + // set expiration the first time we set the key. ignore the result + .expire(&throttle_key, self.period.try_into().unwrap()) + .arg("NX") + .ignore() + // do the query + .query_async(&mut *conn) + .await + .context("increment rate limit")?; + + let new_count = x + .first() + .ok_or_else(|| anyhow::anyhow!("check rate limit result"))?; + + if new_count > &max_per_period { + let seconds_left_in_period = self.period - now / self.period; + + let retry_at = Instant::now().add(Duration::from_secs(seconds_left_in_period)); + + return Ok(ThrottleResult::RetryAt(retry_at)); + } + + Ok(ThrottleResult::Allowed) + } + + #[inline] + pub async fn throttle(&self) -> anyhow::Result { + self.throttle_label("", None, 1).await + } +} diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index 880c66b3..7bdd5b42 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -12,9 +12,9 @@ deadlock_detection = ["parking_lot/deadlock_detection"] verbose_db = ["sea-orm/debug-print"] [dependencies] -redis-cell-client = { path = "../redis-cell-client" } entities = { path = "../entities" } migration = { path = "../migration" } +redis-rate-limit = { path = "../redis-rate-limit" } anyhow = { version = "1.0.61", features = ["backtrace"] } arc-swap = "1.5.1" @@ -28,7 +28,7 @@ dotenv = "0.15.0" ethers = { version = "0.17.0", features = ["rustls", "ws"] } fdlimit = "0.2.1" flume = "0.10.14" -futures = { version = "0.3.21", features = ["thread-pool"] } +futures = { version = "0.3.23", features = ["thread-pool"] } hashbrown = { version = "0.12.3", features = ["serde"] } indexmap = "1.9.1" fifomap = { path = "../fifomap" } diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs index 4e5424b7..c1eeded9 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app.rs @@ -15,8 +15,10 @@ use futures::stream::StreamExt; use futures::Future; use migration::{Migrator, MigratorTrait}; use parking_lot::RwLock; -use redis_cell_client::bb8::ErrorSink; -use redis_cell_client::{bb8, RedisCell, RedisConnectionManager, RedisPool}; +use redis_rate_limit::{ + bb8::{self, ErrorSink}, + RedisConnectionManager, RedisErrorSink, RedisPool, RedisRateLimit, +}; use sea_orm::DatabaseConnection; use serde_json::json; use std::fmt; @@ -32,7 +34,6 @@ use tokio_stream::wrappers::{BroadcastStream, WatchStream}; use tracing::{info, info_span, instrument, trace, warn, Instrument}; use uuid::Uuid; -use crate::bb8_helpers; use crate::block_helpers::block_needed; use crate::config::{AppConfig, TopConfig}; use crate::connections::Web3Connections; @@ -71,7 +72,7 @@ pub enum TxState { pub struct UserCacheValue { pub expires_at: Instant, pub user_id: u64, - pub user_count_per_period: u32, + pub user_count_per_period: u64, } /// flatten a JoinError into an anyhow error @@ -143,7 +144,7 @@ pub struct Web3ProxyApp { pub config: AppConfig, pub db_conn: Option, pub pending_transactions: Arc>, - pub rate_limiter: Option, + pub rate_limiter: Option, pub redis_pool: Option, pub stats: AppStats, pub user_cache: RwLock>, @@ -217,7 +218,7 @@ impl Web3ProxyApp { // TODO: min_idle? // TODO: set max_size based on max expected concurrent connections? set based on num_workers? let builder = bb8::Pool::builder() - .error_sink(bb8_helpers::RedisErrorSink.boxed_clone()) + .error_sink(RedisErrorSink.boxed_clone()) .min_idle(Some(min_size)) .max_size(max_size); @@ -284,16 +285,11 @@ impl Web3ProxyApp { private_rpcs }; - // TODO: how much should we allow? - // TODO: im seeing errors in redis. just use the redis FAQ on rate limiting. its really simple - let public_max_burst = top_config.app.public_rate_limit_per_minute / 3; - let frontend_rate_limiter = redis_pool.as_ref().map(|redis_pool| { - RedisCell::new( + RedisRateLimit::new( redis_pool.clone(), "web3_proxy", "frontend", - public_max_burst, top_config.app.public_rate_limit_per_minute, 60, ) diff --git a/web3_proxy/src/bb8_helpers.rs b/web3_proxy/src/bb8_helpers.rs deleted file mode 100644 index 847364e7..00000000 --- a/web3_proxy/src/bb8_helpers.rs +++ /dev/null @@ -1,17 +0,0 @@ -// TODO: move this into redis-cell-client - -use redis_cell_client::bb8; -use tracing::warn; - -#[derive(Debug, Clone)] -pub struct RedisErrorSink; - -impl bb8::ErrorSink for RedisErrorSink { - fn sink(&self, err: redis_cell_client::RedisError) { - warn!(?err, "redis error"); - } - - fn boxed_clone(&self) -> Box> { - Box::new(self.clone()) - } -} diff --git a/web3_proxy/src/bin/web3_proxy.rs b/web3_proxy/src/bin/web3_proxy.rs index 09758b04..7abe05b8 100644 --- a/web3_proxy/src/bin/web3_proxy.rs +++ b/web3_proxy/src/bin/web3_proxy.rs @@ -145,7 +145,7 @@ fn main() -> anyhow::Result<()> { let cli_config: CliConfig = argh::from_env(); // advanced configuration is on disk - info!("Loading rpc config @ {}", cli_config.config); + info!("Loading config @ {}", cli_config.config); let top_config: String = fs::read_to_string(cli_config.config.clone())?; let top_config: TopConfig = toml::from_str(&top_config)?; diff --git a/web3_proxy/src/bin/web3_proxy_cli/check_config.rs b/web3_proxy/src/bin/web3_proxy_cli/check_config.rs index da6e74c6..700a6287 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/check_config.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/check_config.rs @@ -1,16 +1,25 @@ use argh::FromArgs; +use std::fs; +use tracing::info; +use web3_proxy::config::TopConfig; #[derive(FromArgs, PartialEq, Eq, Debug)] /// Second subcommand. #[argh(subcommand, name = "check_config")] pub struct CheckConfigSubCommand { - #[argh(switch)] - /// whether to fooey - fooey: bool, + #[argh(option)] + /// path to the configuration toml. + path: String, } impl CheckConfigSubCommand { pub async fn main(self) -> anyhow::Result<()> { - todo!() + info!("Loading config @ {}", self.path); + let top_config: String = fs::read_to_string(self.path)?; + let top_config: TopConfig = toml::from_str(&top_config)?; + + info!("config: {:?}", top_config); + + Ok(()) } } diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index b4602d9d..1277b192 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -46,7 +46,7 @@ pub struct AppConfig { pub db_url: Option, pub redis_url: Option, #[serde(default = "default_public_rate_limit_per_minute")] - pub public_rate_limit_per_minute: u32, + pub public_rate_limit_per_minute: u64, #[serde(default = "default_response_cache_max_bytes")] pub response_cache_max_bytes: usize, /// the stats page url for an anonymous user. @@ -55,7 +55,7 @@ pub struct AppConfig { pub redirect_user_url: String, } -fn default_public_rate_limit_per_minute() -> u32 { +fn default_public_rate_limit_per_minute() -> u64 { 0 } @@ -69,7 +69,7 @@ fn default_response_cache_max_bytes() -> usize { pub struct Web3ConnectionConfig { url: String, soft_limit: u32, - hard_limit: Option, + hard_limit: Option, weight: u32, } @@ -80,14 +80,14 @@ impl Web3ConnectionConfig { pub async fn spawn( self, name: String, - redis_client_pool: Option, + redis_pool: Option, chain_id: u64, http_client: Option, http_interval_sender: Option>>, block_sender: Option>, tx_id_sender: Option)>>, ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { - let hard_limit = match (self.hard_limit, redis_client_pool) { + let hard_limit = match (self.hard_limit, redis_pool) { (None, None) => None, (Some(hard_limit), Some(redis_client_pool)) => Some((hard_limit, redis_client_pool)), (None, Some(_)) => None, diff --git a/web3_proxy/src/connection.rs b/web3_proxy/src/connection.rs index 763c0446..45f6ab05 100644 --- a/web3_proxy/src/connection.rs +++ b/web3_proxy/src/connection.rs @@ -5,7 +5,7 @@ use ethers::prelude::{Block, Bytes, Middleware, ProviderError, TxHash, H256, U64 use futures::future::try_join_all; use futures::StreamExt; use parking_lot::RwLock; -use redis_cell_client::{RedisCell, ThrottleResult}; +use redis_rate_limit::{RedisPool, RedisRateLimit, ThrottleResult}; use serde::ser::{SerializeStruct, Serializer}; use serde::Serialize; use std::fmt; @@ -82,7 +82,7 @@ pub struct Web3Connection { /// it is an async lock because we hold it open across awaits provider: AsyncRwLock>>, /// rate limits are stored in a central redis so that multiple proxies can share their rate limits - hard_limit: Option, + hard_limit: Option, /// used for load balancing to the least loaded server pub soft_limit: u32, block_data_limit: AtomicU64, @@ -102,7 +102,8 @@ impl Web3Connection { // optional because this is only used for http providers. websocket providers don't use it http_client: Option, http_interval_sender: Option>>, - hard_limit: Option<(u32, redis_cell_client::RedisPool)>, + // TODO: have a builder struct for this. + hard_limit: Option<(u64, RedisPool)>, // TODO: think more about this type soft_limit: u32, block_sender: Option>, @@ -113,12 +114,11 @@ impl Web3Connection { let hard_limit = hard_limit.map(|(hard_rate_limit, redis_conection)| { // TODO: allow configurable period and max_burst let period = 1; - RedisCell::new( + RedisRateLimit::new( redis_conection, "web3_proxy", &format!("{}:{}", chain_id, url_str), hard_rate_limit, - hard_rate_limit, period, ) }); @@ -614,6 +614,9 @@ impl Web3Connection { return Ok(HandleResult::RetryAt(retry_at.into())); } + Ok(ThrottleResult::RetryNever) => { + return Err(anyhow::anyhow!("Rate limit failed.")); + } Err(err) => { return Err(err); } diff --git a/web3_proxy/src/connections.rs b/web3_proxy/src/connections.rs index 7c57df03..380978bc 100644 --- a/web3_proxy/src/connections.rs +++ b/web3_proxy/src/connections.rs @@ -137,7 +137,7 @@ impl Web3Connections { chain_id: u64, server_configs: HashMap, http_client: Option, - redis_client_pool: Option, + redis_client_pool: Option, head_block_sender: Option>>>, pending_tx_sender: Option>, pending_transactions: Arc>, diff --git a/web3_proxy/src/frontend/rate_limit.rs b/web3_proxy/src/frontend/rate_limit.rs index 5e3b2328..b2509377 100644 --- a/web3_proxy/src/frontend/rate_limit.rs +++ b/web3_proxy/src/frontend/rate_limit.rs @@ -1,6 +1,6 @@ use axum::response::Response; use entities::user_keys; -use redis_cell_client::ThrottleResult; +use redis_rate_limit::ThrottleResult; use reqwest::StatusCode; use sea_orm::{ ColumnTrait, DeriveColumn, EntityTrait, EnumIter, IdenStatic, QueryFilter, QuerySelect, @@ -52,21 +52,24 @@ impl RateLimitResult { impl Web3ProxyApp { pub async fn rate_limit_by_ip(&self, ip: IpAddr) -> anyhow::Result { - let rate_limiter_key = format!("ip-{}", ip); + let rate_limiter_label = format!("ip-{}", ip); // TODO: dry this up with rate_limit_by_key if let Some(rate_limiter) = &self.rate_limiter { match rate_limiter - .throttle_key(&rate_limiter_key, None, None, None) + .throttle_label(&rate_limiter_label, None, 1) .await { Ok(ThrottleResult::Allowed) => {} Ok(ThrottleResult::RetryAt(_retry_at)) => { // TODO: set headers so they know when they can retry - debug!(?rate_limiter_key, "rate limit exceeded"); // this is too verbose, but a stat might be good - // TODO: use their id if possible + debug!(?rate_limiter_label, "rate limit exceeded"); // this is too verbose, but a stat might be good + // TODO: use their id if possible return Ok(RateLimitResult::IpRateLimitExceeded(ip)); } + Ok(ThrottleResult::RetryNever) => { + return Err(anyhow::anyhow!("blocked by rate limiter")) + } Err(err) => { // internal error, not rate limit being hit // TODO: i really want axum to do this for us in a single place. @@ -148,16 +151,11 @@ impl Web3ProxyApp { // user key is valid. now check rate limits if let Some(rate_limiter) = &self.rate_limiter { - // TODO: how does max burst actually work? what should it be? - let user_max_burst = user_data.user_count_per_period / 3; - let user_period = 60; - if rate_limiter - .throttle_key( + .throttle_label( &user_key.to_string(), - Some(user_max_burst), Some(user_data.user_count_per_period), - Some(user_period), + 1, ) .await .is_err() diff --git a/web3_proxy/src/lib.rs b/web3_proxy/src/lib.rs index 001567f7..5da543cf 100644 --- a/web3_proxy/src/lib.rs +++ b/web3_proxy/src/lib.rs @@ -1,5 +1,4 @@ pub mod app; -pub mod bb8_helpers; pub mod block_helpers; pub mod config; pub mod connection;