diff --git a/Cargo.lock b/Cargo.lock index d5543075..5c7130c6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1256,6 +1256,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "deferred-rate-limiter" +version = "0.2.0" +dependencies = [ + "anyhow", + "moka", + "redis-rate-limiter", + "tokio", + "tracing", +] + [[package]] name = "der" version = "0.5.1" @@ -3752,7 +3763,7 @@ dependencies = [ ] [[package]] -name = "redis-rate-limit" +name = "redis-rate-limiter" version = "0.2.0" dependencies = [ "anyhow", @@ -5529,6 +5540,7 @@ dependencies = [ "axum-macros", "counter", "dashmap", + "deferred-rate-limiter", "derive_more", "dotenv", "entities", @@ -5548,7 +5560,7 @@ dependencies = [ "petgraph", "proctitle", "rand 0.8.5", - "redis-rate-limit", + "redis-rate-limiter", "regex", "reqwest", "rustc-hash", diff --git a/Cargo.toml b/Cargo.toml index 01a2095c..bde23c3c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,8 +1,9 @@ [workspace] members = [ + "deferred-rate-limiter", "entities", "migration", - "redis-rate-limit", + "redis-rate-limiter", "web3_proxy", ] diff --git a/TODO.md b/TODO.md index 5589f988..35ab4c59 100644 --- a/TODO.md +++ b/TODO.md @@ -206,6 +206,8 @@ These are not yet ordered. - [ ] on ETH, we no longer use total difficulty, but other chains might - if total difficulty is not on the block and we aren't on ETH, fetch the full block instead of just the header - if total difficulty is set and non-zero, use it for consensus instead of just the number +- [ ] if we subscribe to a server that is syncing, it gives us null block_data_limit. when it catches up, we don't ever send queries to it. we need to recheck block_data_limit + new endpoints for users (not totally sure about the exact paths, but these features are all needed): - [x] GET /u/:api_key diff --git a/deferred-rate-limiter/Cargo.toml b/deferred-rate-limiter/Cargo.toml new file mode 100644 index 00000000..44078934 --- /dev/null +++ b/deferred-rate-limiter/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "deferred-rate-limiter" +version = "0.2.0" +authors = ["Bryan Stitt "] +edition = "2021" + +[dependencies] +redis-rate-limiter = { path = "../redis-rate-limiter" } + +anyhow = "1.0.65" +moka = { version = "0.9.4", default-features = false, features = ["future"] } +tokio = "1.21.1" +tracing = "0.1.36" diff --git a/deferred-rate-limiter/src/lib.rs b/deferred-rate-limiter/src/lib.rs new file mode 100644 index 00000000..2dc586c1 --- /dev/null +++ b/deferred-rate-limiter/src/lib.rs @@ -0,0 +1,119 @@ +//#![warn(missing_docs)] +use moka::future::Cache; +use redis_rate_limiter::{RedisRateLimitResult, RedisRateLimiter}; +use std::cell::Cell; +use std::cmp::Eq; +use std::fmt::Display; +use std::hash::Hash; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{atomic::AtomicU64, Arc}; +use tokio::time::Instant; + +/// A local cache that sits in front of a RedisRateLimiter +/// Generic accross the key so it is simple to use with IPs or user keys +pub struct DeferredRateLimiter +where + K: Send + Sync, +{ + local_cache: Cache>, + prefix: String, + rrl: RedisRateLimiter, +} + +pub enum DeferredRateLimitResult { + Allowed, + RetryAt(Instant), + RetryNever, +} + +impl DeferredRateLimiter +where + K: Copy + Display + Hash + Eq + Send + Sync + 'static, +{ + pub fn new(cache_size: u64, prefix: &str, rrl: RedisRateLimiter) -> Self { + Self { + local_cache: Cache::new(cache_size), + prefix: prefix.to_string(), + rrl, + } + } + + /// if setting max_per_period, be sure to keep the period the same for all requests to this label + pub async fn throttle( + &self, + key: &K, + max_per_period: Option, + count: u64, + ) -> anyhow::Result { + let max_per_period = max_per_period.unwrap_or(self.rrl.max_requests_per_period); + + if max_per_period == 0 { + return Ok(DeferredRateLimitResult::RetryNever); + } + + let arc_new_entry = Arc::new(AtomicBool::new(false)); + let arc_retry_at = Arc::new(Cell::new(None)); + + // TODO: DO NOT UNWRAP HERE. figure out how to handle anyhow error being wrapped in an Arc + // TODO: i'm sure this could be a lot better. but race conditions make this hard to think through. brain needs sleep + let key_count = { + let arc_new_entry = arc_new_entry.clone(); + let arc_retry_at = arc_retry_at.clone(); + + self.local_cache + .try_get_with(*key, async move { + arc_new_entry.store(true, Ordering::Release); + + let label = format!("{}:{}", self.prefix, key); + + let redis_count = match self + .rrl + .throttle_label(&label, Some(max_per_period), count) + .await? + { + RedisRateLimitResult::Allowed(count) => count, + RedisRateLimitResult::RetryAt(retry_at, count) => { + arc_retry_at.set(Some(retry_at)); + count + } + RedisRateLimitResult::RetryNever => unimplemented!(), + }; + + Ok::<_, anyhow::Error>(Arc::new(AtomicU64::new(redis_count))) + }) + .await + .unwrap() + }; + + if arc_new_entry.load(Ordering::Acquire) { + // new entry + if let Some(retry_at) = arc_retry_at.get() { + Ok(DeferredRateLimitResult::RetryAt(retry_at)) + } else { + Ok(DeferredRateLimitResult::Allowed) + } + } else { + // we have a cached amount here + + // increment our local count if + + let f = async move { + let label = format!("{}:{}", self.prefix, key); + + let redis_count = match self + .rrl + .throttle_label(&label, Some(max_per_period), count) + .await? + { + RedisRateLimitResult::Allowed(count) => todo!("do something with allow"), + RedisRateLimitResult::RetryAt(retry_at, count) => todo!("do something with retry at") + RedisRateLimitResult::RetryNever => unimplemented!(), + }; + + Ok::<_, anyhow::Error>(()) + }; + + todo!("write more"); + } + } +} diff --git a/redis-rate-limit/src/errors.rs b/redis-rate-limit/src/errors.rs deleted file mode 100644 index 43edf9dd..00000000 --- a/redis-rate-limit/src/errors.rs +++ /dev/null @@ -1,17 +0,0 @@ -// pub use bb8_redis::bb8::ErrorSink as Bb8ErrorSync; -// pub use bb8_redis::redis::RedisError; - -// use tracing::error; - -// #[derive(Debug, Clone)] -// pub struct RedisErrorSink; - -// impl Bb8ErrorSync for RedisErrorSink { -// fn sink(&self, err: RedisError) { -// error!(?err, "redis error"); -// } - -// fn boxed_clone(&self) -> Box> { -// Box::new(self.clone()) -// } -// } diff --git a/redis-rate-limit/Cargo.toml b/redis-rate-limiter/Cargo.toml similarity index 89% rename from redis-rate-limit/Cargo.toml rename to redis-rate-limiter/Cargo.toml index c610ab60..cb580f6a 100644 --- a/redis-rate-limit/Cargo.toml +++ b/redis-rate-limiter/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "redis-rate-limit" +name = "redis-rate-limiter" version = "0.2.0" authors = ["Bryan Stitt "] edition = "2021" diff --git a/redis-rate-limit/src/lib.rs b/redis-rate-limiter/src/lib.rs similarity index 72% rename from redis-rate-limit/src/lib.rs rename to redis-rate-limiter/src/lib.rs index ecca74b9..465101b2 100644 --- a/redis-rate-limit/src/lib.rs +++ b/redis-rate-limiter/src/lib.rs @@ -1,41 +1,39 @@ //#![warn(missing_docs)] -mod errors; - use anyhow::Context; -use deadpool_redis::redis::pipe; use std::ops::Add; use std::time::{SystemTime, UNIX_EPOCH}; use tokio::time::{Duration, Instant}; use tracing::{debug, trace}; pub use deadpool_redis::redis; -pub use deadpool_redis::{Config, Connection, Manager, Pool, Runtime}; +pub use deadpool_redis::{ + Config as RedisConfig, Connection as RedisConnection, Manager as RedisManager, + Pool as RedisPool, Runtime as DeadpoolRuntime, +}; -// pub use crate::errors::{RedisError, RedisErrorSink}; -// pub use bb8_redis::{bb8, redis, RedisConnectionManager}; - -pub struct RedisRateLimit { - pool: Pool, +#[derive(Clone)] +pub struct RedisRateLimiter { key_prefix: String, /// The default maximum requests allowed in a period. - max_requests_per_period: u64, + pub max_requests_per_period: u64, /// seconds - period: f32, + pub period: f32, + pool: RedisPool, } -pub enum ThrottleResult { - Allowed, - RetryAt(Instant), +pub enum RedisRateLimitResult { + Allowed(u64), + RetryAt(Instant, u64), RetryNever, } -impl RedisRateLimit { +impl RedisRateLimiter { pub fn new( - pool: Pool, app: &str, label: &str, max_requests_per_period: u64, period: f32, + pool: RedisPool, ) -> Self { let key_prefix = format!("{}:rrl:{}", app, label); @@ -49,17 +47,16 @@ impl RedisRateLimit { /// label might be an ip address or a user_key id. /// if setting max_per_period, be sure to keep the period the same for all requests to this label - /// TODO: pub async fn throttle_label( &self, label: &str, max_per_period: Option, count: u64, - ) -> anyhow::Result { + ) -> anyhow::Result { let max_per_period = max_per_period.unwrap_or(self.max_requests_per_period); if max_per_period == 0 { - return Ok(ThrottleResult::RetryNever); + return Ok(RedisRateLimitResult::RetryNever); } let now = SystemTime::now() @@ -70,13 +67,14 @@ impl RedisRateLimit { // if self.period is 60, period_id will be the minute of the current time let period_id = (now / self.period) % self.period; + // TODO: include max per period in the throttle key? let throttle_key = format!("{}:{}:{}", self.key_prefix, label, period_id); let mut conn = self.pool.get().await.context("throttle")?; // TODO: at high concurency, i think this is giving errors // TODO: i'm starting to think that bb8 has a bug - let x: Vec = pipe() + let x: Vec = redis::pipe() // we could get the key first, but that means an extra redis call for every check. this seems better .incr(&throttle_key, count) // set expiration each time we set the key. ignore the result @@ -89,28 +87,25 @@ impl RedisRateLimit { .await .context("increment rate limit")?; - let new_count = x.first().context("check rate limit result")?; + // TODO: is there a better way to do this? + let new_count = *x.first().context("check rate limit result")?; - if new_count > &max_per_period { + if new_count > max_per_period { let seconds_left_in_period = self.period - (now % self.period); let retry_at = Instant::now().add(Duration::from_secs_f32(seconds_left_in_period)); debug!(%label, ?retry_at, "rate limited: {}/{}", new_count, max_per_period); - Ok(ThrottleResult::RetryAt(retry_at)) + Ok(RedisRateLimitResult::RetryAt(retry_at, new_count)) } else { trace!(%label, "NOT rate limited: {}/{}", new_count, max_per_period); - Ok(ThrottleResult::Allowed) + Ok(RedisRateLimitResult::Allowed(new_count)) } } #[inline] - pub async fn throttle(&self) -> anyhow::Result { + pub async fn throttle(&self) -> anyhow::Result { self.throttle_label("", None, 1).await } - - pub fn max_requests_per_period(&self) -> u64 { - self.max_requests_per_period - } } diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index ed668774..a62913d0 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -11,10 +11,13 @@ default = ["deadlock_detection", "verbose_db"] deadlock_detection = ["parking_lot/deadlock_detection"] verbose_db = ["sea-orm/debug-print"] +# TODO: turn tokio-console on with a feature. console-subscriber = { version = "0.1.7" } + [dependencies] +deferred-rate-limiter = { path = "../deferred-rate-limiter" } entities = { path = "../entities" } migration = { path = "../migration" } -redis-rate-limit = { path = "../redis-rate-limit" } +redis-rate-limiter = { path = "../redis-rate-limiter" } anyhow = { version = "1.0.65", features = ["backtrace"] } arc-swap = "1.5.1" @@ -23,7 +26,6 @@ axum = { version = "0.5.16", features = ["headers", "serde_json", "tokio-tungste axum-auth = "0.3.0" axum-client-ip = "0.2.0" axum-macros = "0.2.3" -# TODO: turn tokio-console on with a feature. console-subscriber = { version = "0.1.7" } counter = "0.5.6" dashmap = "5.4.0" derive_more = "0.99.17" diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs index d19554a1..57343792 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app.rs @@ -12,6 +12,7 @@ use crate::rpcs::request::OpenRequestHandleMetrics; use crate::rpcs::transactions::TxStatus; use anyhow::Context; use axum::extract::ws::Message; +use deferred_rate_limiter::DeferredRateLimiter; use derive_more::From; use ethers::core::utils::keccak256; use ethers::prelude::{Address, Block, Bytes, TxHash, H256, U64}; @@ -24,11 +25,12 @@ use hashbrown::HashMap; use metered::{metered, ErrorCount, HitCount, ResponseTime, Throughput}; use migration::{Migrator, MigratorTrait}; use moka::future::Cache; -use redis_rate_limit::{Config as RedisConfig, Pool as RedisPool, RedisRateLimit, Runtime}; +use redis_rate_limiter::{DeadpoolRuntime, RedisConfig, RedisPool, RedisRateLimiter}; use sea_orm::DatabaseConnection; use serde::Serialize; use serde_json::json; use std::fmt; +use std::net::IpAddr; use std::pin::Pin; use std::str::FromStr; use std::sync::atomic::{self, AtomicUsize}; @@ -83,7 +85,8 @@ pub struct Web3ProxyApp { open_request_handle_metrics: Arc, /// store pending transactions that we've seen so that we don't send duplicates to subscribers pub pending_transactions: Cache, - pub frontend_rate_limiter: Option, + pub frontend_ip_rate_limiter: Option>, + pub frontend_key_rate_limiter: Option>, pub redis_pool: Option, pub user_cache: Cache, } @@ -224,7 +227,7 @@ impl Web3ProxyApp { .create_timeout(Some(Duration::from_secs(5))) .max_size(redis_max_connections) .recycle_timeout(Some(Duration::from_secs(5))) - .runtime(Runtime::Tokio1) + .runtime(DeadpoolRuntime::Tokio1) .build()?; // test the pool @@ -306,18 +309,29 @@ impl Web3ProxyApp { Some(private_rpcs) }; - let frontend_rate_limiter = redis_pool.as_ref().map(|redis_pool| { - RedisRateLimit::new( - redis_pool.clone(), + let mut frontend_ip_rate_limiter = None; + let mut frontend_key_rate_limiter = None; + if let Some(redis_pool) = redis_pool.as_ref() { + let rrl = RedisRateLimiter::new( "web3_proxy", "frontend", top_config.app.public_rate_limit_per_minute, 60.0, - ) - }); + redis_pool.clone(), + ); - // TODO: change this to a sized cache + // TODO: take cache_size from config + frontend_ip_rate_limiter = Some(DeferredRateLimiter::::new( + 10_000, + "ip", + rrl.clone(), + )); + frontend_key_rate_limiter = Some(DeferredRateLimiter::::new(10_000, "key", rrl)); + } + + // TODO: change this to a sized cache. theres some potentially giant responses that will break things let response_cache = Cache::new(10_000); + let user_cache = Cache::new(10_000); let app = Self { @@ -328,7 +342,8 @@ impl Web3ProxyApp { head_block_receiver, pending_tx_sender, pending_transactions, - frontend_rate_limiter, + frontend_ip_rate_limiter, + frontend_key_rate_limiter, db_conn, redis_pool, app_metrics, @@ -610,7 +625,7 @@ impl Web3ProxyApp { } #[instrument(skip_all)] - pub async fn redis_conn(&self) -> anyhow::Result { + pub async fn redis_conn(&self) -> anyhow::Result { match self.redis_pool.as_ref() { None => Err(anyhow::anyhow!("no redis server configured")), Some(redis_pool) => { diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index 1f3477d1..0eab0cae 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -116,7 +116,7 @@ impl Web3ConnectionConfig { pub async fn spawn( self, name: String, - redis_pool: Option, + redis_pool: Option, chain_id: u64, http_client: Option, http_interval_sender: Option>>, diff --git a/web3_proxy/src/frontend/errors.rs b/web3_proxy/src/frontend/errors.rs index a92f9085..52308036 100644 --- a/web3_proxy/src/frontend/errors.rs +++ b/web3_proxy/src/frontend/errors.rs @@ -5,7 +5,7 @@ use axum::{ Json, }; use derive_more::From; -use redis_rate_limit::redis::RedisError; +use redis_rate_limiter::redis::RedisError; use sea_orm::DbErr; use std::{error::Error, net::IpAddr}; use tokio::time::Instant; @@ -18,9 +18,7 @@ pub type FrontendResult = Result; pub enum FrontendErrorResponse { Anyhow(anyhow::Error), Box(Box), - // TODO: update these for Redis Redis(RedisError), - // RedisRun(RunError), Response(Response), Database(DbErr), RateLimitedUser(u64, Option), diff --git a/web3_proxy/src/frontend/rate_limit.rs b/web3_proxy/src/frontend/rate_limit.rs index 0e290ec6..8049fa47 100644 --- a/web3_proxy/src/frontend/rate_limit.rs +++ b/web3_proxy/src/frontend/rate_limit.rs @@ -1,14 +1,14 @@ use super::errors::FrontendErrorResponse; use crate::app::{UserCacheValue, Web3ProxyApp}; use anyhow::Context; +use deferred_rate_limiter::DeferredRateLimitResult; use entities::user_keys; -use redis_rate_limit::ThrottleResult; use sea_orm::{ ColumnTrait, DeriveColumn, EntityTrait, EnumIter, IdenStatic, QueryFilter, QuerySelect, }; use std::{net::IpAddr, time::Duration}; use tokio::time::Instant; -use tracing::{debug, error, trace}; +use tracing::{error, trace}; use uuid::Uuid; #[derive(Debug)] @@ -55,28 +55,19 @@ impl Web3ProxyApp { // TODO: dry this up with rate_limit_by_key // TODO: have a local cache because if we hit redis too hard we get errors // TODO: query redis in the background so that users don't have to wait on this network request - if let Some(rate_limiter) = &self.frontend_rate_limiter { - let rate_limiter_label = format!("ip-{}", ip); - - match rate_limiter - .throttle_label(&rate_limiter_label, None, 1) - .await - { - Ok(ThrottleResult::Allowed) => Ok(RateLimitResult::AllowedIp(ip)), - Ok(ThrottleResult::RetryAt(retry_at)) => { + if let Some(rate_limiter) = &self.frontend_ip_rate_limiter { + match rate_limiter.throttle(&ip, None, 1).await { + Ok(DeferredRateLimitResult::Allowed) => Ok(RateLimitResult::AllowedIp(ip)), + Ok(DeferredRateLimitResult::RetryAt(retry_at)) => { // TODO: set headers so they know when they can retry // TODO: debug or trace? // this is too verbose, but a stat might be good - trace!( - ?rate_limiter_label, - "rate limit exceeded until {:?}", - retry_at - ); + trace!(?ip, "rate limit exceeded until {:?}", retry_at); Ok(RateLimitResult::RateLimitedIp(ip, Some(retry_at))) } - Ok(ThrottleResult::RetryNever) => { + Ok(DeferredRateLimitResult::RetryNever) => { // TODO: i don't think we'll get here. maybe if we ban an IP forever? seems unlikely - debug!(?rate_limiter_label, "rate limit exceeded"); + trace!(?ip, "rate limit is 0"); Ok(RateLimitResult::RateLimitedIp(ip, None)) } Err(err) => { @@ -175,38 +166,32 @@ impl Web3ProxyApp { // user key is valid. now check rate limits // TODO: this is throwing errors when curve-api hits us with high concurrency. investigate i think its bb8's fault - if let Some(rate_limiter) = &self.frontend_rate_limiter { + if let Some(rate_limiter) = &self.frontend_key_rate_limiter { // TODO: query redis in the background so that users don't have to wait on this network request // TODO: better key? have a prefix so its easy to delete all of these // TODO: we should probably hash this or something - let rate_limiter_label = format!("user-{}", user_key); - match rate_limiter - .throttle_label( - &rate_limiter_label, - Some(user_data.user_count_per_period), - 1, - ) + .throttle(&user_key, Some(user_data.user_count_per_period), 1) .await { - Ok(ThrottleResult::Allowed) => Ok(RateLimitResult::AllowedUser(user_data.user_id)), - Ok(ThrottleResult::RetryAt(retry_at)) => { + Ok(DeferredRateLimitResult::Allowed) => { + Ok(RateLimitResult::AllowedUser(user_data.user_id)) + } + Ok(DeferredRateLimitResult::RetryAt(retry_at)) => { // TODO: set headers so they know when they can retry // TODO: debug or trace? // this is too verbose, but a stat might be good - trace!( - ?rate_limiter_label, - "rate limit exceeded until {:?}", - retry_at - ); + // TODO: keys are secrets! use the id instead + trace!(?user_key, "rate limit exceeded until {:?}", retry_at); Ok(RateLimitResult::RateLimitedUser( user_data.user_id, Some(retry_at), )) } - Ok(ThrottleResult::RetryNever) => { + Ok(DeferredRateLimitResult::RetryNever) => { // TODO: i don't think we'll get here. maybe if we ban an IP forever? seems unlikely - debug!(?rate_limiter_label, "rate limit exceeded"); + // TODO: keys are secret. don't log them! + trace!(?user_key, "rate limit is 0"); Ok(RateLimitResult::RateLimitedUser(user_data.user_id, None)) } Err(err) => { diff --git a/web3_proxy/src/frontend/users.rs b/web3_proxy/src/frontend/users.rs index ea9f6254..0feb2c52 100644 --- a/web3_proxy/src/frontend/users.rs +++ b/web3_proxy/src/frontend/users.rs @@ -19,13 +19,11 @@ use axum::{ use axum_auth::AuthBearer; use axum_client_ip::ClientIp; use axum_macros::debug_handler; -use http::StatusCode; -use uuid::Uuid; -// use entities::sea_orm_active_enums::Role; use entities::{user, user_keys}; use ethers::{prelude::Address, types::Bytes}; use hashbrown::HashMap; -use redis_rate_limit::redis::AsyncCommands; +use http::StatusCode; +use redis_rate_limiter::redis::AsyncCommands; use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, TransactionTrait}; use serde::{Deserialize, Serialize}; use siwe::Message; @@ -33,6 +31,7 @@ use std::ops::Add; use std::sync::Arc; use time::{Duration, OffsetDateTime}; use ulid::Ulid; +use uuid::Uuid; // TODO: how do we customize axum's error response? I think we probably want an enum that implements IntoResponse instead #[debug_handler] diff --git a/web3_proxy/src/lib.rs b/web3_proxy/src/lib.rs index d0c8e076..a2da1712 100644 --- a/web3_proxy/src/lib.rs +++ b/web3_proxy/src/lib.rs @@ -4,6 +4,5 @@ pub mod config; pub mod frontend; pub mod jsonrpc; pub mod metrics; -pub mod rate_limiter; pub mod rpcs; pub mod users; diff --git a/web3_proxy/src/rate_limiter.rs b/web3_proxy/src/rate_limiter.rs deleted file mode 100644 index e69de29b..00000000 diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs index 79e3e1af..f186996f 100644 --- a/web3_proxy/src/rpcs/connection.rs +++ b/web3_proxy/src/rpcs/connection.rs @@ -10,7 +10,7 @@ use futures::future::try_join_all; use futures::StreamExt; use parking_lot::RwLock; use rand::Rng; -use redis_rate_limit::{Pool as RedisPool, RedisRateLimit, ThrottleResult}; +use redis_rate_limiter::{RedisPool, RedisRateLimitResult, RedisRateLimiter}; use serde::ser::{SerializeStruct, Serializer}; use serde::Serialize; use std::cmp::min; @@ -23,11 +23,12 @@ use tokio::sync::RwLock as AsyncRwLock; use tokio::time::{interval, sleep, sleep_until, Duration, MissedTickBehavior}; use tracing::{debug, error, info, instrument, trace, warn}; -/// An active connection to a Web3Rpc +/// An active connection to a Web3 RPC server like geth or erigon. pub struct Web3Connection { pub name: String, /// TODO: can we get this from the provider? do we even need it? url: String, + /// Some connections use an http_client. we keep a clone for reconnecting http_client: Option, /// keep track of currently open requests. We sort on this pub(super) active_requests: AtomicU32, @@ -39,7 +40,8 @@ pub struct Web3Connection { /// it is an async lock because we hold it open across awaits pub(super) provider: AsyncRwLock>>, /// rate limits are stored in a central redis so that multiple proxies can share their rate limits - hard_limit: Option, + /// We do not use the deferred rate limiter because going over limits would cause errors + hard_limit: Option, /// used for load balancing to the least loaded server pub(super) soft_limit: u32, /// TODO: have an enum for this so that "no limit" prints pretty? @@ -74,14 +76,14 @@ impl Web3Connection { weight: u32, open_request_handle_metrics: Arc, ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { - let hard_limit = hard_limit.map(|(hard_rate_limit, redis_conection)| { - // TODO: allow configurable period and max_burst - RedisRateLimit::new( - redis_conection, + let hard_limit = hard_limit.map(|(hard_rate_limit, redis_pool)| { + // TODO: is cache size 1 okay? i think we need + RedisRateLimiter::new( "web3_proxy", &format!("{}:{}", chain_id, url_str), hard_rate_limit, 60.0, + redis_pool, ) }); @@ -755,10 +757,10 @@ impl Web3Connection { // check rate limits if let Some(ratelimiter) = self.hard_limit.as_ref() { match ratelimiter.throttle().await? { - ThrottleResult::Allowed => { + RedisRateLimitResult::Allowed(_) => { trace!("rate limit succeeded") } - ThrottleResult::RetryAt(retry_at) => { + RedisRateLimitResult::RetryAt(retry_at, _) => { // rate limit failed // save the smallest retry_after. if nothing succeeds, return an Err with retry_after in it // TODO: use tracing better @@ -767,7 +769,7 @@ impl Web3Connection { return Ok(OpenRequestResult::RetryAt(retry_at)); } - ThrottleResult::RetryNever => { + RedisRateLimitResult::RetryNever => { return Ok(OpenRequestResult::RetryNever); } } diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs index 2cfaa3a7..dea9bed1 100644 --- a/web3_proxy/src/rpcs/connections.rs +++ b/web3_proxy/src/rpcs/connections.rs @@ -56,7 +56,7 @@ impl Web3Connections { chain_id: u64, server_configs: HashMap, http_client: Option, - redis_pool: Option, + redis_pool: Option, block_map: BlockHashesMap, head_block_sender: Option>, min_sum_soft_limit: u32,