From f79c8adbf9e7e9ee8aca5e9723c375163f994e84 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Mon, 25 Sep 2023 12:34:00 -0700 Subject: [PATCH] add a shared pool of bonus concurrency (#218) --- web3_proxy/src/app/mod.rs | 12 ++++++++++++ web3_proxy/src/config.rs | 7 +++++++ web3_proxy/src/frontend/authorization.rs | 25 ++++++++++++++++++++++-- 3 files changed, 42 insertions(+), 2 deletions(-) diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 4c356b96..f83ca022 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -99,6 +99,9 @@ pub struct Web3ProxyApp { Option>, /// concurrent/parallel request limits for anonymous users pub ip_semaphores: Cache>, + /// give some bonus capacity to public users + pub bonus_ip_concurrency: Arc, + /// the /debug/ rpc endpoints send detailed logging to kafka pub kafka_producer: Option, /// rate limit the login endpoint /// we do this because each pending login is a row in the database @@ -114,6 +117,8 @@ pub struct Web3ProxyApp { pub user_balance_cache: UserBalanceCache, /// concurrent/parallel RPC request limits for authenticated users pub user_semaphores: Cache<(NonZeroU64, IpAddr), Arc>, + /// give some bonus capacity to premium users + pub bonus_user_concurrency: Arc, /// volatile cache used for rate limits /// TODO: i think i might just delete this entirely. instead use local-only concurrency limits. pub vredis_pool: Option, @@ -488,8 +493,15 @@ impl Web3ProxyApp { .ok() .and_then(|x| x.to_str().map(|x| x.to_string())); + // TODO: get the size out of the config + let bonus_ip_concurrency = Arc::new(Semaphore::new(top_config.app.bonus_ip_concurrency)); + let bonus_user_concurrency = + Arc::new(Semaphore::new(top_config.app.bonus_user_concurrency)); + let app = Self { balanced_rpcs, + bonus_ip_concurrency, + bonus_user_concurrency, bundler_4337_rpcs, config: top_config.app.clone(), frontend_ip_rate_limiter, diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index 5bbd13fc..dface507 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -83,6 +83,13 @@ pub struct AppConfig { #[serde_inline_default(90_000u64)] pub archive_depth: u64, + /// pool of extra connections allowed for authenticated users + #[serde_inline_default(0usize)] + pub bonus_user_concurrency: usize, + /// pool of extra connections allowed for anonymous users + #[serde_inline_default(0usize)] + pub bonus_ip_concurrency: usize, + /// EVM chain id. 1 for ETH /// TODO: better type for chain_id? max of `u64::MAX / 2 - 36` #[serde_inline_default(1u64)] diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index 46a599c3..0a346867 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -955,7 +955,16 @@ impl Web3ProxyApp { }) .await; - let semaphore_permit = semaphore.acquire_owned().await?; + let semaphore_permit = tokio::select! { + biased; + + p = semaphore.acquire_owned() => { + p + } + p = self.bonus_ip_concurrency.clone().acquire_owned() => { + p + } + }?; Ok(Some(semaphore_permit)) } else { @@ -984,7 +993,19 @@ impl Web3ProxyApp { }) .await; - let semaphore_permit = semaphore.acquire_owned().await?; + let semaphore_permit = tokio::select! { + biased; + + p = semaphore.acquire_owned() => { + p + } + p = self.bonus_user_concurrency.clone().acquire_owned() => { + p + } + p = self.bonus_ip_concurrency.clone().acquire_owned() => { + p + } + }?; Ok(Some(semaphore_permit)) } else {