diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 589c9b9d..351b035e 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -6,10 +6,9 @@ use crate::block_number::{block_needed, BlockNeeded}; use crate::config::{AppConfig, TopConfig}; use crate::frontend::authorization::{Authorization, RequestMetadata}; use crate::frontend::errors::FrontendErrorResponse; -use crate::jsonrpc::JsonRpcForwardedResponse; -use crate::jsonrpc::JsonRpcForwardedResponseEnum; -use crate::jsonrpc::JsonRpcRequest; -use crate::jsonrpc::JsonRpcRequestEnum; +use crate::jsonrpc::{ + JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest, JsonRpcRequestEnum, +}; use crate::rpcs::blockchain::{ArcBlock, SavedBlock}; use crate::rpcs::connection::Web3Connection; use crate::rpcs::connections::Web3Connections; @@ -23,23 +22,19 @@ use deferred_rate_limiter::DeferredRateLimiter; use derive_more::From; use entities::sea_orm_active_enums::LogLevel; use ethers::core::utils::keccak256; -use ethers::prelude::{Address, Block, Bytes, TxHash, H256, U64}; -use ethers::types::Transaction; +use ethers::prelude::{Address, Block, Bytes, Transaction, TxHash, H256, U64}; use ethers::utils::rlp::{Decodable, Rlp}; use futures::future::join_all; -use futures::stream::FuturesUnordered; -use futures::stream::StreamExt; +use futures::stream::{FuturesUnordered, StreamExt}; use hashbrown::{HashMap, HashSet}; use ipnet::IpNet; -use log::Level; -use log::{debug, error, info, warn}; +use log::{debug, error, info, warn, Level}; use metered::{metered, ErrorCount, HitCount, ResponseTime, Throughput}; use migration::sea_orm::{self, ConnectionTrait, Database, DatabaseConnection}; use migration::sea_query::table::ColumnDef; use migration::{Alias, DbErr, Migrator, MigratorTrait, Table}; use moka::future::Cache; -use redis_rate_limiter::redis::AsyncCommands; -use redis_rate_limiter::{DeadpoolRuntime, RedisConfig, RedisPool, RedisRateLimiter}; +use redis_rate_limiter::{redis, DeadpoolRuntime, RedisConfig, RedisPool, RedisRateLimiter}; use serde::Serialize; use serde_json::json; use serde_json::value::to_raw_value; @@ -48,8 +43,7 @@ use std::hash::{Hash, Hasher}; use std::net::IpAddr; use std::num::NonZeroU64; use std::str::FromStr; -use std::sync::atomic; -use std::sync::Arc; +use std::sync::{atomic, Arc}; use std::time::Duration; use tokio::sync::{broadcast, watch, Semaphore}; use tokio::task::JoinHandle; @@ -194,14 +188,13 @@ pub struct Web3ProxyApp { /// store pending transactions that we've seen so that we don't send duplicates to subscribers pub pending_transactions: Cache, pub frontend_ip_rate_limiter: Option>, - // TODO: this key should be our RpcSecretKey class, not Ulid - pub frontend_key_rate_limiter: Option>, + pub frontend_registered_user_rate_limiter: Option>, pub login_rate_limiter: Option, pub vredis_pool: Option, // TODO: this key should be our RpcSecretKey class, not Ulid pub rpc_secret_key_cache: Cache, - pub rpc_key_semaphores: + pub registered_user_semaphores: Cache, hashbrown::hash_map::DefaultHashBuilder>, pub ip_semaphores: Cache, hashbrown::hash_map::DefaultHashBuilder>, pub bearer_token_semaphores: @@ -346,11 +339,17 @@ impl Web3ProxyApp { } if !top_config.extra.is_empty() { - warn!("unknown TopConfig fields!: {:?}", top_config.app.extra.keys()); + warn!( + "unknown TopConfig fields!: {:?}", + top_config.app.extra.keys() + ); } if !top_config.app.extra.is_empty() { - warn!("unknown Web3ProxyAppConfig fields!: {:?}", top_config.app.extra.keys()); + warn!( + "unknown Web3ProxyAppConfig fields!: {:?}", + top_config.app.extra.keys() + ); } // setup metrics @@ -598,7 +597,7 @@ impl Web3ProxyApp { // create rate limiters // these are optional. they require redis let mut frontend_ip_rate_limiter = None; - let mut frontend_key_rate_limiter = None; + let mut frontend_registered_user_rate_limiter = None; let mut login_rate_limiter = None; if let Some(redis_pool) = vredis_pool.as_ref() { @@ -623,7 +622,7 @@ impl Web3ProxyApp { rpc_rrl.clone(), None, )); - frontend_key_rate_limiter = Some(DeferredRateLimiter::::new( + frontend_registered_user_rate_limiter = Some(DeferredRateLimiter::::new( 10_000, "key", rpc_rrl, None, )); @@ -673,7 +672,7 @@ impl Web3ProxyApp { let ip_semaphores = Cache::builder() .time_to_idle(Duration::from_secs(120)) .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); - let rpc_key_semaphores = Cache::builder() + let registered_user_semaphores = Cache::builder() .time_to_idle(Duration::from_secs(120)) .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); @@ -686,7 +685,7 @@ impl Web3ProxyApp { pending_tx_sender, pending_transactions, frontend_ip_rate_limiter, - frontend_key_rate_limiter, + frontend_registered_user_rate_limiter, login_rate_limiter, db_conn, db_replica, @@ -696,7 +695,7 @@ impl Web3ProxyApp { rpc_secret_key_cache, bearer_token_semaphores, ip_semaphores, - rpc_key_semaphores, + registered_user_semaphores, stat_sender, }; @@ -711,47 +710,103 @@ impl Web3ProxyApp { // globals.insert("service", "web3_proxy"); #[derive(Serialize)] - struct RecentIps(i64); + struct RecentCounts { + one_day: i64, + one_hour: i64, + one_minute: i64, + } - let recent_ips = if let Ok(mut redis_conn) = self.redis_conn().await { - // TODO: delete any hash entries where - const ONE_DAY: i64 = 86400; - - let oldest = Utc::now().timestamp() - ONE_DAY; - - let recent_ips_key = format!("recent_ips:{}", self.config.chain_id); - - // delete any entries that are too old - // TODO: pipe - if let Err(err) = redis_conn - .zrembyscore::<_, _, _, u64>(recent_ips_key.clone(), i64::MIN, oldest) - .await - { - warn!("unable to clear recent_ips: {}", err); - } - - match redis_conn.zcount(recent_ips_key, i64::MIN, i64::MAX).await { - Ok(count) => RecentIps(count), - Err(err) => { - warn!("unable to count recent_ips: {}", err); - RecentIps(-1) + impl RecentCounts { + fn for_err() -> Self { + Self { + one_day: -1, + one_hour: -1, + one_minute: -1, } } - } else { - RecentIps(-1) - }; + } + + let (recent_ip_counts, recent_user_counts): (RecentCounts, RecentCounts) = + match self.redis_conn().await { + Ok(mut redis_conn) => { + // TODO: delete any hash entries where + const ONE_MINUTE: i64 = 60; + const ONE_HOUR: i64 = ONE_MINUTE * 60; + const ONE_DAY: i64 = ONE_HOUR * 24; + + let one_day_ago = Utc::now().timestamp() - ONE_DAY; + let one_hour_ago = Utc::now().timestamp() - ONE_HOUR; + let one_minute_ago = Utc::now().timestamp() - ONE_MINUTE; + + let recent_users_by_user = + format!("recent_users:registered:{}", self.config.chain_id); + let recent_users_by_ip = format!("recent_users:ip:{}", self.config.chain_id); + + match redis::pipe() + .atomic() + // delete any entries older than 24 hours + .zrembyscore(&recent_users_by_user, i64::MIN, one_day_ago) + .ignore() + .zrembyscore(&recent_users_by_ip, i64::MIN, one_day_ago) + .ignore() + // get count for last day + .zcount(&recent_users_by_user, one_day_ago, i64::MAX) + .zcount(&recent_users_by_ip, one_day_ago, i64::MAX) + // get count for last hour + .zcount(&recent_users_by_user, one_hour_ago, i64::MAX) + .zcount(&recent_users_by_ip, one_hour_ago, i64::MAX) + // get count for last minute + .zcount(&recent_users_by_user, one_minute_ago, i64::MAX) + .zcount(&recent_users_by_ip, one_minute_ago, i64::MAX) + .query_async(&mut redis_conn) + .await + { + Ok(( + day_by_user, + day_by_ip, + hour_by_user, + hour_by_ip, + minute_by_user, + minute_by_ip, + )) => { + let recent_ip_counts = RecentCounts { + one_day: day_by_ip, + one_hour: hour_by_ip, + one_minute: minute_by_ip, + }; + let recent_user_counts = RecentCounts { + one_day: day_by_user, + one_hour: hour_by_user, + one_minute: minute_by_user, + }; + + (recent_ip_counts, recent_user_counts) + } + Err(err) => { + warn!("unable to count recent users: {}", err); + (RecentCounts::for_err(), RecentCounts::for_err()) + } + } + } + Err(err) => { + warn!("unable to connect to redis while counting users: {:?}", err); + (RecentCounts::for_err(), RecentCounts::for_err()) + } + }; #[derive(Serialize)] struct CombinedMetrics<'a> { app: &'a Web3ProxyAppMetrics, backend_rpc: &'a OpenRequestHandleMetrics, - recent_ips: RecentIps, + recent_ip_counts: RecentCounts, + recent_user_counts: RecentCounts, } let metrics = CombinedMetrics { app: &self.app_metrics, backend_rpc: &self.open_request_handle_metrics, - recent_ips, + recent_ip_counts, + recent_user_counts, }; serde_prometheus::to_string(&metrics, Some("web3_proxy"), globals) diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index 71461fd6..bab4124e 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -45,7 +45,7 @@ pub struct TopConfig { // TODO: instead of an option, give it a default pub private_rpcs: Option>, /// unknown config options get put here - #[serde(flatten, default="HashMap::default")] + #[serde(flatten, default = "HashMap::default")] pub extra: HashMap, } @@ -148,7 +148,7 @@ pub struct AppConfig { pub volatile_redis_max_connections: Option, /// unknown config options get put here - #[serde(flatten, default="HashMap::default")] + #[serde(flatten, default = "HashMap::default")] pub extra: HashMap, } @@ -206,7 +206,7 @@ pub struct Web3ConnectionConfig { #[serde(default)] pub subscribe_txs: Option, /// unknown config options get put here - #[serde(flatten, default="HashMap::default")] + #[serde(flatten, default = "HashMap::default")] pub extra: HashMap, } @@ -232,7 +232,10 @@ impl Web3ConnectionConfig { open_request_handle_metrics: Arc, ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { if !self.extra.is_empty() { - warn!("unknown Web3ConnectionConfig fields!: {:?}", self.extra.keys()); + warn!( + "unknown Web3ConnectionConfig fields!: {:?}", + self.extra.keys() + ); } let hard_limit = match (self.hard_limit, redis_pool) { diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index 85460701..051c8bb6 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -342,10 +342,9 @@ pub async fn ip_is_authorized( x => unimplemented!("rate_limit_by_ip shouldn't ever see these: {:?}", x), }; - let app = app.clone(); - - // TODO: in the background, add the ip to a recent_ips map + // in the background, add the ip to a recent_users map if app.config.public_recent_ips_salt.is_some() { + let app = app.clone(); let f = async move { let now = Utc::now().timestamp(); @@ -357,21 +356,20 @@ pub async fn ip_is_authorized( .as_ref() .expect("public_recent_ips_salt must exist in here"); - // TODO: how should we salt and hash the ips? do it faster? let salted_ip = format!("{}:{}", salt, ip); let hashed_ip = Bytes::from(keccak256(salted_ip.as_bytes())); - let recent_ips_key = format!("recent_ips:{}", app.config.chain_id); + let recent_ip_key = format!("recent_users:ip:{}", app.config.chain_id); redis_conn - .zadd(recent_ips_key, hashed_ip.to_string(), now) + .zadd(recent_ip_key, hashed_ip.to_string(), now) .await?; Ok::<_, anyhow::Error>(()) } .map_err(|err| { - warn!("background update of recent_ips failed: {}", err); + warn!("background update of recent_users:ip failed: {}", err); err }); @@ -384,7 +382,7 @@ pub async fn ip_is_authorized( /// like app.rate_limit_by_rpc_key but converts to a FrontendErrorResponse; pub async fn key_is_authorized( - app: &Web3ProxyApp, + app: &Arc, rpc_key: RpcSecretKey, ip: IpAddr, origin: Option, @@ -404,6 +402,43 @@ pub async fn key_is_authorized( RateLimitResult::UnknownKey => return Err(FrontendErrorResponse::UnknownKey), }; + // TODO: DRY and maybe optimize the hashing + // in the background, add the ip to a recent_users map + if app.config.public_recent_ips_salt.is_some() { + let app = app.clone(); + let user_id = authorization.checks.user_id; + let f = async move { + let now = Utc::now().timestamp(); + + let mut redis_conn = app.redis_conn().await?; + + let salt = app + .config + .public_recent_ips_salt + .as_ref() + .expect("public_recent_ips_salt must exist in here"); + + let salted_user_id = format!("{}:{}", salt, user_id); + + let hashed_user_id = Bytes::from(keccak256(salted_user_id.as_bytes())); + + let recent_user_id_key = format!("recent_users:registered:{}", app.config.chain_id); + + redis_conn + .zadd(recent_user_id_key, hashed_user_id.to_string(), now) + .await?; + + Ok::<_, anyhow::Error>(()) + } + .map_err(|err| { + warn!("background update of recent_users:ip failed: {}", err); + + err + }); + + tokio::spawn(f); + } + Ok((authorization, semaphore)) } @@ -434,18 +469,21 @@ impl Web3ProxyApp { } /// Limit the number of concurrent requests from the given rpc key. - pub async fn rpc_key_semaphore( + pub async fn registered_user_semaphore( &self, authorization_checks: &AuthorizationChecks, ) -> anyhow::Result> { if let Some(max_concurrent_requests) = authorization_checks.max_concurrent_requests { - let rpc_key_id = authorization_checks.rpc_key_id.context("no rpc_key_id")?; + let user_id = authorization_checks + .user_id + .try_into() + .context("user ids should always be non-zero")?; let semaphore = self - .rpc_key_semaphores - .get_with(rpc_key_id, async move { + .registered_user_semaphores + .get_with(user_id, async move { let s = Semaphore::new(max_concurrent_requests as usize); - // // trace!("new semaphore for rpc_key_id {}", rpc_key_id); + // trace!("new semaphore for user_id {}", user_id); Arc::new(s) }) .await; @@ -739,9 +777,13 @@ impl Web3ProxyApp { return Ok(RateLimitResult::UnknownKey); } + // TODO: rpc_key should have an option to rate limit by ip instead of by key + // only allow this rpc_key to run a limited amount of concurrent requests // TODO: rate limit should be BEFORE the semaphore! - let semaphore = self.rpc_key_semaphore(&authorization_checks).await?; + let semaphore = self + .registered_user_semaphore(&authorization_checks) + .await?; let authorization = Authorization::try_new( authorization_checks, @@ -761,9 +803,13 @@ impl Web3ProxyApp { }; // user key is valid. now check rate limits - if let Some(rate_limiter) = &self.frontend_key_rate_limiter { + if let Some(rate_limiter) = &self.frontend_registered_user_rate_limiter { match rate_limiter - .throttle(rpc_key.into(), Some(user_max_requests_per_period), 1) + .throttle( + authorization.checks.user_id, + Some(user_max_requests_per_period), + 1, + ) .await { Ok(DeferredRateLimitResult::Allowed) => {