From b23426545842c9b99749446860e9b8f54f572644 Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Fri, 7 Jul 2023 15:15:41 -0700
Subject: [PATCH] Premium tier changes (#170)

* lower premium concurrency in preparation for key+IP limits

* include the ip in the user semaphore

* 3, not 5

this is our current limit for free

* per user_id+ip rate limiting
---
 Cargo.lock                                       |  7 +-
 deferred-rate-limiter/src/lib.rs                 |  6 +-
 entities/Cargo.toml                              |  3 +-
 migration/Cargo.toml                             |  2 +-
 migration/src/lib.rs                             |  2 +
 .../m20230707_211936_premium_tier_changes.rs     | 73 +++++++++++++++
 web3_proxy/Cargo.toml                            |  2 +-
 web3_proxy/src/app/mod.rs                        | 20 ++---
 web3_proxy/src/caches.rs                         | 21 +++++
 web3_proxy/src/frontend/authorization.rs         | 88 +++++++++----------
 web3_proxy/src/lib.rs                            |  1 +
 web3_proxy/src/stats/mod.rs                      |  2 +-
 web3_proxy/src/stats/stat_buffer.rs              |  3 +-
 13 files changed, 158 insertions(+), 72 deletions(-)
 create mode 100644 migration/src/m20230707_211936_premium_tier_changes.rs
 create mode 100644 web3_proxy/src/caches.rs

diff --git a/Cargo.lock b/Cargo.lock
index b7862c09..0096270c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1678,13 +1678,12 @@ dependencies = [

 [[package]]
 name = "entities"
-version = "0.34.0"
+version = "0.35.0"
 dependencies = [
  "ethers",
  "sea-orm",
  "serde",
  "ulid",
- "uuid 1.4.0",
 ]

 [[package]]
@@ -3354,7 +3353,7 @@ dependencies = [

 [[package]]
 name = "migration"
-version = "0.34.0"
+version = "0.35.0"
 dependencies = [
  "sea-orm-migration",
  "tokio",
@@ -7080,7 +7079,7 @@ dependencies = [

 [[package]]
 name = "web3_proxy"
-version = "0.34.0"
+version = "0.35.0"
 dependencies = [
  "anyhow",
  "arc-swap",
diff --git a/deferred-rate-limiter/src/lib.rs b/deferred-rate-limiter/src/lib.rs
index b4ccd035..626afa3e 100644
--- a/deferred-rate-limiter/src/lib.rs
+++ b/deferred-rate-limiter/src/lib.rs
@@ -2,7 +2,7 @@
 use moka::future::{Cache, CacheBuilder};
 use redis_rate_limiter::{RedisRateLimitResult, RedisRateLimiter};
 use std::cmp::Eq;
-use std::fmt::{Debug, Display};
+use std::fmt::Display;
 use std::hash::Hash;
 use std::sync::atomic::Ordering;
 use std::sync::{atomic::AtomicU64, Arc};
@@ -31,7 +31,7 @@ pub enum DeferredRateLimitResult {

 impl<K> DeferredRateLimiter<K>
 where
-    K: Copy + Debug + Display + Hash + Eq + Send + Sync + 'static,
+    K: Copy + Display + Hash + Eq + Send + Sync + 'static,
 {
     pub async fn new(
         // TODO: change this to cache_size in bytes
@@ -181,7 +181,7 @@ where
             Err(err) => {
                 // don't let redis errors block our users!
                 error!(
-                    "unable to query rate limits, but local cache is available. key={:?} err={:?}",
+                    "unable to query rate limits, but local cache is available. key={} err={:?}",
key={} err={:?}", key, err, ); diff --git a/entities/Cargo.toml b/entities/Cargo.toml index 17de0a73..030ca788 100644 --- a/entities/Cargo.toml +++ b/entities/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "entities" -version = "0.34.0" +version = "0.35.0" edition = "2021" [lib] @@ -14,4 +14,3 @@ ethers = { version = "2.0.7", default-features = false } sea-orm = "0.11.3" serde = "1.0.166" ulid = "1.0.0" -uuid = "1.4.0" diff --git a/migration/Cargo.toml b/migration/Cargo.toml index 5e104f20..d9458c70 100644 --- a/migration/Cargo.toml +++ b/migration/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "migration" -version = "0.34.0" +version = "0.35.0" edition = "2021" publish = false diff --git a/migration/src/lib.rs b/migration/src/lib.rs index 57c09c97..bfab2ca8 100644 --- a/migration/src/lib.rs +++ b/migration/src/lib.rs @@ -34,6 +34,7 @@ mod m20230618_230611_longer_payload; mod m20230619_172237_default_tracking; mod m20230622_104142_stripe_deposits; mod m20230705_214013_type_fixes; +mod m20230707_211936_premium_tier_changes; pub struct Migrator; @@ -75,6 +76,7 @@ impl MigratorTrait for Migrator { Box::new(m20230619_172237_default_tracking::Migration), Box::new(m20230622_104142_stripe_deposits::Migration), Box::new(m20230705_214013_type_fixes::Migration), + Box::new(m20230707_211936_premium_tier_changes::Migration), ] } } diff --git a/migration/src/m20230707_211936_premium_tier_changes.rs b/migration/src/m20230707_211936_premium_tier_changes.rs new file mode 100644 index 00000000..3052c0a9 --- /dev/null +++ b/migration/src/m20230707_211936_premium_tier_changes.rs @@ -0,0 +1,73 @@ +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // Replace the sample below with your own migration scripts + let update_out_of_funds = Query::update() + .table(UserTier::Table) + .limit(1) + .values([ + (UserTier::MaxRequestsPerPeriod, Some("3900").into()), + (UserTier::MaxConcurrentRequests, Some("3").into()), + ]) + .and_where(Expr::col(UserTier::Title).eq("Premium Out Of Funds")) + .to_owned(); + + manager.exec_stmt(update_out_of_funds).await?; + + let update_premium = Query::update() + .table(UserTier::Table) + .limit(1) + .values([ + (UserTier::MaxRequestsPerPeriod, None::<&str>.into()), + (UserTier::MaxConcurrentRequests, Some("20").into()), + ]) + .and_where(Expr::col(UserTier::Title).eq("Premium")) + .to_owned(); + + manager.exec_stmt(update_premium).await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + let update_out_of_funds = Query::update() + .table(UserTier::Table) + .limit(1) + .values([ + (UserTier::MaxRequestsPerPeriod, Some("6000").into()), + (UserTier::MaxConcurrentRequests, Some("5").into()), + ]) + .and_where(Expr::col(UserTier::Title).eq("Premium Out Of Funds")) + .to_owned(); + + manager.exec_stmt(update_out_of_funds).await?; + + let update_premium = Query::update() + .table(UserTier::Table) + .limit(1) + .values([ + (UserTier::MaxRequestsPerPeriod, None::<&str>.into()), + (UserTier::MaxConcurrentRequests, Some("100").into()), + ]) + .and_where(Expr::col(UserTier::Title).eq("Premium")) + .to_owned(); + + manager.exec_stmt(update_premium).await?; + + Ok(()) + } +} + +/// Learn more at https://docs.rs/sea-query#iden +#[derive(Iden)] +enum UserTier { + Table, + Title, + MaxRequestsPerPeriod, + MaxConcurrentRequests, +} diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml 
index e40ca55c..5b13b7f7 100644
--- a/web3_proxy/Cargo.toml
+++ b/web3_proxy/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "web3_proxy"
-version = "0.34.0"
+version = "0.35.0"
 edition = "2021"
 default-run = "web3_proxy_cli"

diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs
index b8564cae..dc4d6b66 100644
--- a/web3_proxy/src/app/mod.rs
+++ b/web3_proxy/src/app/mod.rs
@@ -1,11 +1,11 @@
 mod ws;

 use crate::block_number::CacheMode;
+use crate::caches::{RegisteredUserRateLimitKey, RpcSecretKeyCache, UserBalanceCache};
 use crate::config::{AppConfig, TopConfig};
 use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult};
 use crate::frontend::authorization::{
-    Authorization, AuthorizationChecks, Balance, RequestMetadata, RequestOrMethod, ResponseOrBytes,
-    RpcSecretKey,
+    Authorization, RequestMetadata, RequestOrMethod, ResponseOrBytes,
 };
 use crate::frontend::rpc_proxy_ws::ProxyMode;
 use crate::jsonrpc::{
@@ -38,7 +38,6 @@ use hashbrown::{HashMap, HashSet};
 use migration::sea_orm::{DatabaseTransaction, EntityTrait, PaginatorTrait, TransactionTrait};
 use moka::future::{Cache, CacheBuilder};
 use once_cell::sync::OnceCell;
-use parking_lot::RwLock;
 use redis_rate_limiter::redis::AsyncCommands;
 use redis_rate_limiter::{redis, DeadpoolRuntime, RedisConfig, RedisPool, RedisRateLimiter};
 use serde::Serialize;
@@ -53,7 +52,7 @@ use std::sync::{atomic, Arc};
 use std::time::Duration;
 use tokio::sync::{broadcast, watch, Semaphore, oneshot};
 use tokio::task::JoinHandle;
-use tokio::time::{timeout};
+use tokio::time::timeout;
 use tracing::{error, info, trace, warn, Level};

 // TODO: make this customizable?
@@ -71,11 +70,6 @@ pub const BILLING_PERIOD_SECONDS: i64 = 60 * 60 * 24 * 7;
 /// Convenience type
 pub type Web3ProxyJoinHandle = JoinHandle>;

-/// Cache data from the database about rpc keys
-pub type RpcSecretKeyCache = Cache<RpcSecretKey, AuthorizationChecks>;
-/// Cache data from the database about user balances
-pub type UserBalanceCache = Cache<NonZeroU64, Arc<RwLock<Balance>>>;
-
 /// The application
 // TODO: i'm sure this is more arcs than necessary, but spawning futures makes references hard
 pub struct Web3ProxyApp {
@@ -105,7 +99,7 @@ pub struct Web3ProxyApp {
     /// rate limit anonymous users
     pub frontend_ip_rate_limiter: Option<DeferredRateLimiter<IpAddr>>,
     /// rate limit authenticated users
-    pub frontend_registered_user_rate_limiter: Option<DeferredRateLimiter<u64>>,
+    pub frontend_registered_user_rate_limiter: Option<DeferredRateLimiter<RegisteredUserRateLimitKey>>,
     /// concurrent/parallel request limits for anonymous users
     pub ip_semaphores: Cache<IpAddr, Arc<Semaphore>>,
     pub kafka_producer: Option,
@@ -125,7 +119,7 @@ pub struct Web3ProxyApp {
     /// cache user balances so we don't have to check downgrade logic every single time
     pub user_balance_cache: UserBalanceCache,
     /// concurrent/parallel RPC request limits for authenticated users
-    pub user_semaphores: Cache<NonZeroU64, Arc<Semaphore>>,
+    pub user_semaphores: Cache<(NonZeroU64, IpAddr), Arc<Semaphore>>,
     /// volatile cache used for rate limits
     /// TODO: i think i might just delete this entirely. instead use local-only concurrency limits.
     pub vredis_pool: Option<RedisPool>,
@@ -439,10 +433,10 @@ impl Web3ProxyApp {
             // these are deferred rate limiters because we don't want redis network requests on the hot path
             // TODO: take cache_size from config
             frontend_ip_rate_limiter = Some(
-                DeferredRateLimiter::<IpAddr>::new(20_000, "ip", rpc_rrl.clone(), None).await,
+                DeferredRateLimiter::new(20_000, "ip", rpc_rrl.clone(), None).await,
             );
             frontend_registered_user_rate_limiter =
-                Some(DeferredRateLimiter::<u64>::new(10_000, "key", rpc_rrl, None).await);
+                Some(DeferredRateLimiter::new(20_000, "key", rpc_rrl, None).await);
         }

         // login rate limiter
diff --git a/web3_proxy/src/caches.rs b/web3_proxy/src/caches.rs
new file mode 100644
index 00000000..bf30f47e
--- /dev/null
+++ b/web3_proxy/src/caches.rs
@@ -0,0 +1,21 @@
+use crate::frontend::authorization::{AuthorizationChecks, Balance, RpcSecretKey};
+use moka::future::Cache;
+use parking_lot::RwLock;
+use std::fmt;
+use std::net::IpAddr;
+use std::num::NonZeroU64;
+use std::sync::Arc;
+
+/// Cache data from the database about rpc keys
+pub type RpcSecretKeyCache = Cache<RpcSecretKey, AuthorizationChecks>;
+/// Cache data from the database about user balances
+pub type UserBalanceCache = Cache<NonZeroU64, Arc<RwLock<Balance>>>;
+
+#[derive(Clone, Copy, Hash, Eq, PartialEq)]
+pub struct RegisteredUserRateLimitKey(pub u64, pub IpAddr);
+
+impl std::fmt::Display for RegisteredUserRateLimitKey {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{}-{}", self.0, self.1)
+    }
+}
diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs
index 80cf8f63..2c82c94f 100644
--- a/web3_proxy/src/frontend/authorization.rs
+++ b/web3_proxy/src/frontend/authorization.rs
@@ -2,6 +2,7 @@
 use super::rpc_proxy_ws::ProxyMode;
 use crate::app::{Web3ProxyApp, APP_USER_AGENT};
+use crate::caches::RegisteredUserRateLimitKey;
 use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult};
 use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest};
 use crate::rpcs::blockchain::Web3ProxyBlock;
@@ -965,6 +966,7 @@ impl Web3ProxyApp {
     pub async fn user_semaphore(
         &self,
         authorization_checks: &AuthorizationChecks,
+        ip: &IpAddr,
     ) -> Web3ProxyResult> {
         if let Some(max_concurrent_requests) = authorization_checks.max_concurrent_requests {
             let user_id = authorization_checks
@@ -974,7 +976,7 @@ impl Web3ProxyApp {
             let semaphore = self
                 .user_semaphores
-                .get_with_by_ref(&user_id, async move {
+                .get_with_by_ref(&(user_id, *ip), async move {
                     let s = Semaphore::new(max_concurrent_requests as usize);
                     Arc::new(s)
                 })
@@ -1368,11 +1370,9 @@ impl Web3ProxyApp {
            return Ok(RateLimitResult::UnknownKey);
        }

-        // TODO: rpc_key should have an option to rate limit by ip instead of by key
-
        // only allow this rpc_key to run a limited amount of concurrent requests
        // TODO: rate limit should be BEFORE the semaphore!
-        let semaphore = self.user_semaphore(&authorization_checks).await?;
+        let semaphore = self.user_semaphore(&authorization_checks, ip).await?;

        let authorization = Authorization::try_new(
            authorization_checks,
@@ -1384,53 +1384,49 @@ impl Web3ProxyApp {
            AuthorizationType::Frontend,
        )?;

-        let user_max_requests_per_period = match authorization.checks.max_requests_per_period {
-            None => {
-                return Ok(RateLimitResult::Allowed(authorization, semaphore));
-            }
-            Some(x) => x,
-        };
-
        // user key is valid. now check rate limits
-        if let Some(rate_limiter) = &self.frontend_registered_user_rate_limiter {
-            match rate_limiter
-                .throttle(
-                    authorization.checks.user_id,
-                    Some(user_max_requests_per_period),
-                    1,
-                )
-                .await
-            {
-                Ok(DeferredRateLimitResult::Allowed) => {
-                    Ok(RateLimitResult::Allowed(authorization, semaphore))
-                }
-                Ok(DeferredRateLimitResult::RetryAt(retry_at)) => {
-                    // TODO: set headers so they know when they can retry
-                    // TODO: debug or trace?
-                    // this is too verbose, but a stat might be good
-                    // TODO: keys are secrets! use the id instead
-                    // TODO: emit a stat
-                    // // trace!(?rpc_key, "rate limit exceeded until {:?}", retry_at);
-                    Ok(RateLimitResult::RateLimited(authorization, Some(retry_at)))
-                }
-                Ok(DeferredRateLimitResult::RetryNever) => {
-                    // TODO: keys are secret. don't log them!
-                    // // trace!(?rpc_key, "rate limit is 0");
-                    // TODO: emit a stat
-                    Ok(RateLimitResult::RateLimited(authorization, None))
-                }
-                Err(err) => {
-                    // internal error, not rate limit being hit
-                    // TODO: i really want axum to do this for us in a single place.
-                    error!("rate limiter is unhappy. allowing ip. err={:?}", err);
+        if let Some(user_max_requests_per_period) = authorization.checks.max_requests_per_period {
+            if let Some(rate_limiter) = &self.frontend_registered_user_rate_limiter {
+                match rate_limiter
+                    .throttle(
+                        RegisteredUserRateLimitKey(authorization.checks.user_id, *ip),
+                        Some(user_max_requests_per_period),
+                        1,
+                    )
+                    .await
+                {
+                    Ok(DeferredRateLimitResult::Allowed) => {
+                        return Ok(RateLimitResult::Allowed(authorization, semaphore))
+                    }
+                    Ok(DeferredRateLimitResult::RetryAt(retry_at)) => {
+                        // TODO: set headers so they know when they can retry
+                        // TODO: debug or trace?
+                        // this is too verbose, but a stat might be good
+                        // TODO: keys are secrets! use the id instead
+                        // TODO: emit a stat
+                        // trace!(?rpc_key, "rate limit exceeded until {:?}", retry_at);
+                        return Ok(RateLimitResult::RateLimited(authorization, Some(retry_at)));
+                    }
+                    Ok(DeferredRateLimitResult::RetryNever) => {
+                        // TODO: keys are secret. don't log them!
+                        // trace!(?rpc_key, "rate limit is 0");
+                        // TODO: emit a stat
+                        return Ok(RateLimitResult::RateLimited(authorization, None));
+                    }
+                    Err(err) => {
+                        // internal error, not rate limit being hit
+                        // TODO: i really want axum to do this for us in a single place.
+                        error!(?err, "rate limiter is unhappy. allowing rpc_key");

-                    Ok(RateLimitResult::Allowed(authorization, semaphore))
+                        return Ok(RateLimitResult::Allowed(authorization, semaphore));
+                    }
                 }
+            } else {
+                // TODO: if no redis, rate limit with just a local cache?
             }
-        } else {
-            // TODO: if no redis, rate limit with just a local cache?
-            Ok(RateLimitResult::Allowed(authorization, semaphore))
         }
+
+        Ok(RateLimitResult::Allowed(authorization, semaphore))
     }
 }
diff --git a/web3_proxy/src/lib.rs b/web3_proxy/src/lib.rs
index c6212141..48dccd1a 100644
--- a/web3_proxy/src/lib.rs
+++ b/web3_proxy/src/lib.rs
@@ -4,6 +4,7 @@
 pub mod admin_queries;
 pub mod app;
 pub mod block_number;
+pub mod caches;
 pub mod compute_units;
 pub mod config;
 pub mod errors;
diff --git a/web3_proxy/src/stats/mod.rs b/web3_proxy/src/stats/mod.rs
index b8c6a6bc..0a1cb899 100644
--- a/web3_proxy/src/stats/mod.rs
+++ b/web3_proxy/src/stats/mod.rs
@@ -6,7 +6,7 @@ pub mod db_queries;
 pub mod influxdb_queries;

 use self::stat_buffer::BufferedRpcQueryStats;
-use crate::app::{RpcSecretKeyCache, UserBalanceCache};
+use crate::caches::{RpcSecretKeyCache, UserBalanceCache};
 use crate::compute_units::ComputeUnit;
 use crate::errors::{Web3ProxyError, Web3ProxyResult};
 use crate::frontend::authorization::{Authorization, RequestMetadata};
diff --git a/web3_proxy/src/stats/stat_buffer.rs b/web3_proxy/src/stats/stat_buffer.rs
index c510b8ac..210bc40b 100644
--- a/web3_proxy/src/stats/stat_buffer.rs
+++ b/web3_proxy/src/stats/stat_buffer.rs
@@ -1,5 +1,6 @@
 use super::{AppStat, RpcQueryKey};
-use crate::app::{RpcSecretKeyCache, UserBalanceCache, Web3ProxyJoinHandle};
+use crate::app::Web3ProxyJoinHandle;
+use crate::caches::{RpcSecretKeyCache, UserBalanceCache};
 use crate::errors::Web3ProxyResult;
 use crate::frontend::authorization::Balance;
 use derive_more::From;
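
Note on the "per user_id+ip rate limiting" bullet: registered users are now throttled per user_id+IP instead of per user_id alone, and the DeferredRateLimiter key bound drops Debug (keeping Display), presumably so the limiter can render any key type into a stable string for its redis-backed counters. The sketch below is not part of the patch: RegisteredUserRateLimitKey mirrors web3_proxy/src/caches.rs above, but redis_key_for and the "web3_proxy" prefix are hypothetical stand-ins, since the actual key layout inside RedisRateLimiter is not shown in this diff.

// Sketch only. The struct and Display impl mirror the patch; redis_key_for is a
// hypothetical helper that illustrates why a Display bound is enough for the limiter.
use std::net::{IpAddr, Ipv4Addr};

#[derive(Clone, Copy, Hash, Eq, PartialEq)]
pub struct RegisteredUserRateLimitKey(pub u64, pub IpAddr);

impl std::fmt::Display for RegisteredUserRateLimitKey {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // same "user_id-ip" rendering as caches.rs in the patch
        write!(f, "{}-{}", self.0, self.1)
    }
}

// Hypothetical: build the string a redis-backed limiter could use as its counter key.
fn redis_key_for(prefix: &str, label: &str, key: &impl std::fmt::Display) -> String {
    format!("{}:{}:{}", prefix, label, key)
}

fn main() {
    let key = RegisteredUserRateLimitKey(42, IpAddr::V4(Ipv4Addr::new(203, 0, 113, 7)));
    // prints something like "web3_proxy:key:42-203.0.113.7"
    println!("{}", redis_key_for("web3_proxy", "key", &key));
}

Because the key hashes and compares by (user_id, ip), two requests from the same rpc key but different IPs land in different buckets, which is what the commit message describes.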
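The same pairing is applied to the concurrency limit: user_semaphores is now keyed by (NonZeroU64, IpAddr), so a premium key shared across many machines gets a separate in-flight budget per IP. Below is a minimal sketch of that idea, assuming tokio with the "full" feature set; a HashMap behind a tokio Mutex stands in for the moka Cache the proxy uses (so there is no TTL or eviction here), a plain u64 stands in for NonZeroU64, and MAX_CONCURRENT uses the free-tier value of 3 set by the migration.

// Sketch of per-(user_id, IP) concurrency limiting. Not the proxy's code:
// the real app calls user_semaphores.get_with_by_ref(&(user_id, *ip), ...) on a moka Cache.
use std::collections::HashMap;
use std::net::IpAddr;
use std::sync::Arc;
use tokio::sync::{Mutex, OwnedSemaphorePermit, Semaphore};

const MAX_CONCURRENT: usize = 3; // matches the new free-tier limit in the migration

#[derive(Default)]
struct UserSemaphores {
    inner: Mutex<HashMap<(u64, IpAddr), Arc<Semaphore>>>,
}

impl UserSemaphores {
    // One semaphore per (user_id, ip): the same pair can never hold more than
    // MAX_CONCURRENT in-flight requests, but separate IPs get separate budgets.
    async fn acquire(&self, user_id: u64, ip: IpAddr) -> OwnedSemaphorePermit {
        let semaphore = {
            let mut map = self.inner.lock().await;
            map.entry((user_id, ip))
                .or_insert_with(|| Arc::new(Semaphore::new(MAX_CONCURRENT)))
                .clone()
        };
        // waiting for a permit happens outside the map lock
        semaphore
            .acquire_owned()
            .await
            .expect("semaphore is never closed")
    }
}

#[tokio::main]
async fn main() {
    let semaphores = UserSemaphores::default();
    let ip: IpAddr = "203.0.113.7".parse().unwrap();

    // Holding a permit models an in-flight request for this user_id + ip pair.
    let _permit = semaphores.acquire(42, ip).await;

    let map = semaphores.inner.lock().await;
    let remaining = map.get(&(42, ip)).unwrap().available_permits();
    println!("permits left for (42, {ip}): {remaining}");
}

A second lookup for the same (user_id, ip) pair returns the same Semaphore, so concurrent requests share one budget; the moka cache in the patch plays the same role while also evicting idle entries.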