Premium tier changes (#170)

* lower premium concurrency in preparation for key+IP limits

* include the IP in the user semaphore

* 3, not 5 (this is our current limit for the free tier)

* per user_id+ip rate limiting (see the sketch after this list)
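The thread running through these commits: per-user concurrency and rate limits are now keyed by (user_id, IP) instead of user_id alone, so an API key shared across many machines gets a budget per machine rather than one shared bucket. A minimal, self-contained sketch of that pattern using moka and tokio, the same crates the diff below uses (the numbers and names here are illustrative, not the project's code):

```rust
use moka::future::Cache;
use std::net::IpAddr;
use std::num::NonZeroU64;
use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // concurrency permits tracked per (user_id, ip), mirroring the new
    // `user_semaphores: Cache<(NonZeroU64, IpAddr), Arc<Semaphore>>` field below
    let user_semaphores: Cache<(NonZeroU64, IpAddr), Arc<Semaphore>> = Cache::new(10_000);

    let user_id = NonZeroU64::new(42).unwrap();
    let ip: IpAddr = "10.0.0.1".parse().unwrap();

    // each (user, ip) pair gets its own budget; 3 matches the new
    // free / "Premium Out Of Funds" concurrency limit
    let semaphore = user_semaphores
        .get_with((user_id, ip), async { Arc::new(Semaphore::new(3)) })
        .await;

    let _permit = semaphore.clone().acquire_owned().await.unwrap();
    println!(
        "permits left for {user_id}@{ip}: {}",
        semaphore.available_permits()
    );
}
```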
Bryan Stitt 2023-07-07 15:15:41 -07:00 committed by GitHub
parent 651244fbca
commit b234265458
13 changed files with 158 additions and 72 deletions

Cargo.lock (generated, 7 lines changed)

@@ -1678,13 +1678,12 @@ dependencies = [
[[package]]
name = "entities"
-version = "0.34.0"
+version = "0.35.0"
dependencies = [
"ethers",
"sea-orm",
"serde",
"ulid",
-"uuid 1.4.0",
]
[[package]]
@@ -3354,7 +3353,7 @@ dependencies = [
[[package]]
name = "migration"
-version = "0.34.0"
+version = "0.35.0"
dependencies = [
"sea-orm-migration",
"tokio",
@@ -7080,7 +7079,7 @@ dependencies = [
[[package]]
name = "web3_proxy"
-version = "0.34.0"
+version = "0.35.0"
dependencies = [
"anyhow",
"arc-swap",

@@ -2,7 +2,7 @@
use moka::future::{Cache, CacheBuilder};
use redis_rate_limiter::{RedisRateLimitResult, RedisRateLimiter};
use std::cmp::Eq;
-use std::fmt::{Debug, Display};
+use std::fmt::Display;
use std::hash::Hash;
use std::sync::atomic::Ordering;
use std::sync::{atomic::AtomicU64, Arc};
@@ -31,7 +31,7 @@ pub enum DeferredRateLimitResult {
impl<K> DeferredRateLimiter<K>
where
-K: Copy + Debug + Display + Hash + Eq + Send + Sync + 'static,
+K: Copy + Display + Hash + Eq + Send + Sync + 'static,
{
pub async fn new(
// TODO: change this to cache_size in bytes
@@ -181,7 +181,7 @@ where
Err(err) => {
// don't let redis errors block our users!
error!(
-"unable to query rate limits, but local cache is available. key={:?} err={:?}",
+"unable to query rate limits, but local cache is available. key={} err={:?}",
key,
err,
);
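The bound change above (Debug dropped, Display kept) matters because the registered-user limiter's key is about to become a composite struct rather than a bare u64, and the key only needs a human-readable form for this log line. A tiny compile-time check of the relaxed bound (the helper name is made up for illustration):

```rust
use std::fmt::Display;
use std::hash::Hash;

// mirrors the relaxed `K` bound on DeferredRateLimiter after this change
fn assert_rate_limit_key<K: Copy + Display + Hash + Eq + Send + Sync + 'static>() {}

fn main() {
    // the old bare key type still qualifies...
    assert_rate_limit_key::<u64>();
    // ...and so does IpAddr, which the new composite key will wrap
    assert_rate_limit_key::<std::net::IpAddr>();
}
```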

@@ -1,6 +1,6 @@
[package]
name = "entities"
-version = "0.34.0"
+version = "0.35.0"
edition = "2021"
[lib]
@@ -14,4 +14,3 @@ ethers = { version = "2.0.7", default-features = false }
sea-orm = "0.11.3"
serde = "1.0.166"
ulid = "1.0.0"
-uuid = "1.4.0"

@@ -1,6 +1,6 @@
[package]
name = "migration"
-version = "0.34.0"
+version = "0.35.0"
edition = "2021"
publish = false

@@ -34,6 +34,7 @@ mod m20230618_230611_longer_payload;
mod m20230619_172237_default_tracking;
mod m20230622_104142_stripe_deposits;
mod m20230705_214013_type_fixes;
+mod m20230707_211936_premium_tier_changes;
pub struct Migrator;
@@ -75,6 +76,7 @@ impl MigratorTrait for Migrator {
Box::new(m20230619_172237_default_tracking::Migration),
Box::new(m20230622_104142_stripe_deposits::Migration),
Box::new(m20230705_214013_type_fixes::Migration),
+Box::new(m20230707_211936_premium_tier_changes::Migration),
]
}
}

@@ -0,0 +1,73 @@
use sea_orm_migration::prelude::*;

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        // Replace the sample below with your own migration scripts
        let update_out_of_funds = Query::update()
            .table(UserTier::Table)
            .limit(1)
            .values([
                (UserTier::MaxRequestsPerPeriod, Some("3900").into()),
                (UserTier::MaxConcurrentRequests, Some("3").into()),
            ])
            .and_where(Expr::col(UserTier::Title).eq("Premium Out Of Funds"))
            .to_owned();

        manager.exec_stmt(update_out_of_funds).await?;

        let update_premium = Query::update()
            .table(UserTier::Table)
            .limit(1)
            .values([
                (UserTier::MaxRequestsPerPeriod, None::<&str>.into()),
                (UserTier::MaxConcurrentRequests, Some("20").into()),
            ])
            .and_where(Expr::col(UserTier::Title).eq("Premium"))
            .to_owned();

        manager.exec_stmt(update_premium).await?;

        Ok(())
    }

    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        let update_out_of_funds = Query::update()
            .table(UserTier::Table)
            .limit(1)
            .values([
                (UserTier::MaxRequestsPerPeriod, Some("6000").into()),
                (UserTier::MaxConcurrentRequests, Some("5").into()),
            ])
            .and_where(Expr::col(UserTier::Title).eq("Premium Out Of Funds"))
            .to_owned();

        manager.exec_stmt(update_out_of_funds).await?;

        let update_premium = Query::update()
            .table(UserTier::Table)
            .limit(1)
            .values([
                (UserTier::MaxRequestsPerPeriod, None::<&str>.into()),
                (UserTier::MaxConcurrentRequests, Some("100").into()),
            ])
            .and_where(Expr::col(UserTier::Title).eq("Premium"))
            .to_owned();

        manager.exec_stmt(update_premium).await?;

        Ok(())
    }
}

/// Learn more at https://docs.rs/sea-query#iden
#[derive(Iden)]
enum UserTier {
    Table,
    Title,
    MaxRequestsPerPeriod,
    MaxConcurrentRequests,
}
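To see roughly what SQL the `up` migration issues, the same statement can be rendered with sea-query's MySQL builder. This is a standalone sketch assuming a direct dependency on sea-query with its `derive` feature (the migration itself reaches the same API through the sea-orm-migration prelude):

```rust
use sea_query::{Expr, Iden, MysqlQueryBuilder, Query};

#[derive(Iden)]
enum UserTier {
    Table,
    Title,
    MaxRequestsPerPeriod,
    MaxConcurrentRequests,
}

fn main() {
    let update_out_of_funds = Query::update()
        .table(UserTier::Table)
        .limit(1)
        .values([
            (UserTier::MaxRequestsPerPeriod, Some("3900").into()),
            (UserTier::MaxConcurrentRequests, Some("3").into()),
        ])
        .and_where(Expr::col(UserTier::Title).eq("Premium Out Of Funds"))
        .to_owned();

    // prints something like:
    // UPDATE `user_tier` SET `max_requests_per_period` = '3900',
    //   `max_concurrent_requests` = '3'
    //   WHERE `title` = 'Premium Out Of Funds' LIMIT 1
    println!("{}", update_out_of_funds.to_string(MysqlQueryBuilder));
}
```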

@@ -1,6 +1,6 @@
[package]
name = "web3_proxy"
-version = "0.34.0"
+version = "0.35.0"
edition = "2021"
default-run = "web3_proxy_cli"

@@ -1,11 +1,11 @@
mod ws;
use crate::block_number::CacheMode;
+use crate::caches::{RegisteredUserRateLimitKey, RpcSecretKeyCache, UserBalanceCache};
use crate::config::{AppConfig, TopConfig};
use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult};
use crate::frontend::authorization::{
-Authorization, AuthorizationChecks, Balance, RequestMetadata, RequestOrMethod, ResponseOrBytes,
-RpcSecretKey,
+Authorization, RequestMetadata, RequestOrMethod, ResponseOrBytes,
};
use crate::frontend::rpc_proxy_ws::ProxyMode;
use crate::jsonrpc::{
@@ -38,7 +38,6 @@ use hashbrown::{HashMap, HashSet};
use migration::sea_orm::{DatabaseTransaction, EntityTrait, PaginatorTrait, TransactionTrait};
use moka::future::{Cache, CacheBuilder};
use once_cell::sync::OnceCell;
-use parking_lot::RwLock;
use redis_rate_limiter::redis::AsyncCommands;
use redis_rate_limiter::{redis, DeadpoolRuntime, RedisConfig, RedisPool, RedisRateLimiter};
use serde::Serialize;
@@ -53,7 +52,7 @@ use std::sync::{atomic, Arc};
use std::time::Duration;
use tokio::sync::{broadcast, watch, Semaphore, oneshot};
use tokio::task::JoinHandle;
-use tokio::time::{timeout};
+use tokio::time::timeout;
use tracing::{error, info, trace, warn, Level};
// TODO: make this customizable?
@@ -71,11 +70,6 @@ pub const BILLING_PERIOD_SECONDS: i64 = 60 * 60 * 24 * 7;
/// Convenience type
pub type Web3ProxyJoinHandle<T> = JoinHandle<Web3ProxyResult<T>>;
-/// Cache data from the database about rpc keys
-pub type RpcSecretKeyCache = Cache<RpcSecretKey, AuthorizationChecks>;
-/// Cache data from the database about user balances
-pub type UserBalanceCache = Cache<NonZeroU64, Arc<RwLock<Balance>>>;
/// The application
// TODO: i'm sure this is more arcs than necessary, but spawning futures makes references hard
pub struct Web3ProxyApp {
@@ -105,7 +99,7 @@ pub struct Web3ProxyApp {
/// rate limit anonymous users
pub frontend_ip_rate_limiter: Option<DeferredRateLimiter<IpAddr>>,
/// rate limit authenticated users
-pub frontend_registered_user_rate_limiter: Option<DeferredRateLimiter<u64>>,
+pub frontend_registered_user_rate_limiter: Option<DeferredRateLimiter<RegisteredUserRateLimitKey>>,
/// concurrent/parallel request limits for anonymous users
pub ip_semaphores: Cache<IpAddr, Arc<Semaphore>>,
pub kafka_producer: Option<rdkafka::producer::FutureProducer>,
@@ -125,7 +119,7 @@ pub struct Web3ProxyApp {
/// cache user balances so we don't have to check downgrade logic every single time
pub user_balance_cache: UserBalanceCache,
/// concurrent/parallel RPC request limits for authenticated users
-pub user_semaphores: Cache<NonZeroU64, Arc<Semaphore>>,
+pub user_semaphores: Cache<(NonZeroU64, IpAddr), Arc<Semaphore>>,
/// volatile cache used for rate limits
/// TODO: i think i might just delete this entirely. instead use local-only concurrency limits.
pub vredis_pool: Option<RedisPool>,
@@ -439,10 +433,10 @@ impl Web3ProxyApp {
// these are deferred rate limiters because we don't want redis network requests on the hot path
// TODO: take cache_size from config
frontend_ip_rate_limiter = Some(
-DeferredRateLimiter::<IpAddr>::new(20_000, "ip", rpc_rrl.clone(), None).await,
+DeferredRateLimiter::new(20_000, "ip", rpc_rrl.clone(), None).await,
);
frontend_registered_user_rate_limiter =
-Some(DeferredRateLimiter::<u64>::new(10_000, "key", rpc_rrl, None).await);
+Some(DeferredRateLimiter::new(20_000, "key", rpc_rrl, None).await);
}
// login rate limiter

web3_proxy/src/caches.rs (new file, 21 lines)

@@ -0,0 +1,21 @@
use crate::frontend::authorization::{AuthorizationChecks, Balance, RpcSecretKey};
use moka::future::Cache;
use parking_lot::RwLock;
use std::fmt;
use std::net::IpAddr;
use std::num::NonZeroU64;
use std::sync::Arc;

/// Cache data from the database about rpc keys
pub type RpcSecretKeyCache = Cache<RpcSecretKey, AuthorizationChecks>;

/// Cache data from the database about user balances
pub type UserBalanceCache = Cache<NonZeroU64, Arc<RwLock<Balance>>>;

#[derive(Clone, Copy, Hash, Eq, PartialEq)]
pub struct RegisteredUserRateLimitKey(pub u64, pub IpAddr);

impl std::fmt::Display for RegisteredUserRateLimitKey {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}-{}", self.0, self.1)
    }
}
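A quick illustration of how the new key behaves, assuming `web3_proxy::caches::RegisteredUserRateLimitKey` is in scope as added above (the values here are arbitrary):

```rust
use std::net::{IpAddr, Ipv4Addr};
use web3_proxy::caches::RegisteredUserRateLimitKey;

fn main() {
    let ip = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1));
    let key = RegisteredUserRateLimitKey(42, ip);

    // Display gives the human-readable form that shows up in the limiter's
    // log line: "<user_id>-<ip>"
    assert_eq!(key.to_string(), "42-10.0.0.1");

    // Copy + Hash + Eq let the same value be reused directly as a cache key
    let copied = key;
    assert!(copied == key);
}
```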

@@ -2,6 +2,7 @@
use super::rpc_proxy_ws::ProxyMode;
use crate::app::{Web3ProxyApp, APP_USER_AGENT};
+use crate::caches::RegisteredUserRateLimitKey;
use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult};
use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest};
use crate::rpcs::blockchain::Web3ProxyBlock;
@@ -965,6 +966,7 @@ impl Web3ProxyApp {
pub async fn user_semaphore(
&self,
authorization_checks: &AuthorizationChecks,
+ip: &IpAddr,
) -> Web3ProxyResult<Option<OwnedSemaphorePermit>> {
if let Some(max_concurrent_requests) = authorization_checks.max_concurrent_requests {
let user_id = authorization_checks
@@ -974,7 +976,7 @@
let semaphore = self
.user_semaphores
-.get_with_by_ref(&user_id, async move {
+.get_with_by_ref(&(user_id, *ip), async move {
let s = Semaphore::new(max_concurrent_requests as usize);
Arc::new(s)
})
@@ -1368,11 +1370,9 @@
return Ok(RateLimitResult::UnknownKey);
}
// TODO: rpc_key should have an option to rate limit by ip instead of by key
// only allow this rpc_key to run a limited amount of concurrent requests
// TODO: rate limit should be BEFORE the semaphore!
-let semaphore = self.user_semaphore(&authorization_checks).await?;
+let semaphore = self.user_semaphore(&authorization_checks, ip).await?;
let authorization = Authorization::try_new(
authorization_checks,
@@ -1384,53 +1384,49 @@
AuthorizationType::Frontend,
)?;
-let user_max_requests_per_period = match authorization.checks.max_requests_per_period {
-None => {
-return Ok(RateLimitResult::Allowed(authorization, semaphore));
-}
-Some(x) => x,
-};
// user key is valid. now check rate limits
-if let Some(rate_limiter) = &self.frontend_registered_user_rate_limiter {
-match rate_limiter
-.throttle(
-authorization.checks.user_id,
-Some(user_max_requests_per_period),
-1,
-)
-.await
-{
-Ok(DeferredRateLimitResult::Allowed) => {
-Ok(RateLimitResult::Allowed(authorization, semaphore))
-}
-Ok(DeferredRateLimitResult::RetryAt(retry_at)) => {
-// TODO: set headers so they know when they can retry
-// TODO: debug or trace?
-// this is too verbose, but a stat might be good
-// TODO: keys are secrets! use the id instead
-// TODO: emit a stat
-// // trace!(?rpc_key, "rate limit exceeded until {:?}", retry_at);
-Ok(RateLimitResult::RateLimited(authorization, Some(retry_at)))
-}
-Ok(DeferredRateLimitResult::RetryNever) => {
-// TODO: keys are secret. don't log them!
-// // trace!(?rpc_key, "rate limit is 0");
-// TODO: emit a stat
-Ok(RateLimitResult::RateLimited(authorization, None))
-}
-Err(err) => {
-// internal error, not rate limit being hit
-// TODO: i really want axum to do this for us in a single place.
-error!("rate limiter is unhappy. allowing ip. err={:?}", err);
-Ok(RateLimitResult::Allowed(authorization, semaphore))
-}
-}
-} else {
-// TODO: if no redis, rate limit with just a local cache?
-Ok(RateLimitResult::Allowed(authorization, semaphore))
-}
+if let Some(user_max_requests_per_period) = authorization.checks.max_requests_per_period {
+if let Some(rate_limiter) = &self.frontend_registered_user_rate_limiter {
+match rate_limiter
+.throttle(
+RegisteredUserRateLimitKey(authorization.checks.user_id, *ip),
+Some(user_max_requests_per_period),
+1,
+)
+.await
+{
+Ok(DeferredRateLimitResult::Allowed) => {
+return Ok(RateLimitResult::Allowed(authorization, semaphore))
+}
+Ok(DeferredRateLimitResult::RetryAt(retry_at)) => {
+// TODO: set headers so they know when they can retry
+// TODO: debug or trace?
+// this is too verbose, but a stat might be good
+// TODO: keys are secrets! use the id instead
+// TODO: emit a stat
+// trace!(?rpc_key, "rate limit exceeded until {:?}", retry_at);
+return Ok(RateLimitResult::RateLimited(authorization, Some(retry_at)));
+}
+Ok(DeferredRateLimitResult::RetryNever) => {
+// TODO: keys are secret. don't log them!
+// trace!(?rpc_key, "rate limit is 0");
+// TODO: emit a stat
+return Ok(RateLimitResult::RateLimited(authorization, None));
+}
+Err(err) => {
+// internal error, not rate limit being hit
+// TODO: i really want axum to do this for us in a single place.
+error!(?err, "rate limiter is unhappy. allowing rpc_key");
+return Ok(RateLimitResult::Allowed(authorization, semaphore));
+}
+}
+} else {
+// TODO: if no redis, rate limit with just a local cache?
+}
+}
+Ok(RateLimitResult::Allowed(authorization, semaphore))
}
}
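Because the restructured block above is hard to read as a diff, here is a simplified, self-contained sketch of the resulting decision flow (hypothetical names, none of the project's types): the per-key rate limit only applies when the tier sets max_requests_per_period and a redis-backed limiter is configured; every other path, including a redis error, falls through to Allowed.

```rust
enum Decision {
    Allowed,
    RateLimited,
}

// per-key limits apply only when the tier sets a request budget AND a
// redis-backed limiter is configured; everything else fails open
fn decide(max_requests_per_period: Option<u64>, redis_available: bool, over_limit: bool) -> Decision {
    if max_requests_per_period.is_some() {
        if redis_available {
            if over_limit {
                return Decision::RateLimited;
            }
            return Decision::Allowed;
        }
        // no redis: the diff leaves a TODO about a local-only limiter here
    }
    Decision::Allowed
}

fn main() {
    // no per-period limit on the tier -> always allowed
    // (concurrency is still capped separately by the semaphore)
    assert!(matches!(decide(None, true, true), Decision::Allowed));
    // limit set and the bucket is exhausted -> rate limited
    assert!(matches!(decide(Some(3900), true, true), Decision::RateLimited));
    // limit set but no redis -> allowed (fail open)
    assert!(matches!(decide(Some(3900), false, true), Decision::Allowed));
}
```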

@@ -4,6 +4,7 @@
pub mod admin_queries;
pub mod app;
pub mod block_number;
+pub mod caches;
pub mod compute_units;
pub mod config;
pub mod errors;

@@ -6,7 +6,7 @@ pub mod db_queries;
pub mod influxdb_queries;
use self::stat_buffer::BufferedRpcQueryStats;
-use crate::app::{RpcSecretKeyCache, UserBalanceCache};
+use crate::caches::{RpcSecretKeyCache, UserBalanceCache};
use crate::compute_units::ComputeUnit;
use crate::errors::{Web3ProxyError, Web3ProxyResult};
use crate::frontend::authorization::{Authorization, RequestMetadata};

@@ -1,5 +1,6 @@
use super::{AppStat, RpcQueryKey};
-use crate::app::{RpcSecretKeyCache, UserBalanceCache, Web3ProxyJoinHandle};
+use crate::app::Web3ProxyJoinHandle;
+use crate::caches::{RpcSecretKeyCache, UserBalanceCache};
use crate::errors::Web3ProxyResult;
use crate::frontend::authorization::Balance;
use derive_more::From;