add a shared pool of bonus concurrency (#218)
This commit is contained in:
parent
e1f922dc57
commit
f79c8adbf9
|
@ -99,6 +99,9 @@ pub struct Web3ProxyApp {
|
||||||
Option<DeferredRateLimiter<RegisteredUserRateLimitKey>>,
|
Option<DeferredRateLimiter<RegisteredUserRateLimitKey>>,
|
||||||
/// concurrent/parallel request limits for anonymous users
|
/// concurrent/parallel request limits for anonymous users
|
||||||
pub ip_semaphores: Cache<IpAddr, Arc<Semaphore>>,
|
pub ip_semaphores: Cache<IpAddr, Arc<Semaphore>>,
|
||||||
|
/// give some bonus capacity to public users
|
||||||
|
pub bonus_ip_concurrency: Arc<Semaphore>,
|
||||||
|
/// the /debug/ rpc endpoints send detailed logging to kafka
|
||||||
pub kafka_producer: Option<rdkafka::producer::FutureProducer>,
|
pub kafka_producer: Option<rdkafka::producer::FutureProducer>,
|
||||||
/// rate limit the login endpoint
|
/// rate limit the login endpoint
|
||||||
/// we do this because each pending login is a row in the database
|
/// we do this because each pending login is a row in the database
|
||||||
|
@ -114,6 +117,8 @@ pub struct Web3ProxyApp {
|
||||||
pub user_balance_cache: UserBalanceCache,
|
pub user_balance_cache: UserBalanceCache,
|
||||||
/// concurrent/parallel RPC request limits for authenticated users
|
/// concurrent/parallel RPC request limits for authenticated users
|
||||||
pub user_semaphores: Cache<(NonZeroU64, IpAddr), Arc<Semaphore>>,
|
pub user_semaphores: Cache<(NonZeroU64, IpAddr), Arc<Semaphore>>,
|
||||||
|
/// give some bonus capacity to premium users
|
||||||
|
pub bonus_user_concurrency: Arc<Semaphore>,
|
||||||
/// volatile cache used for rate limits
|
/// volatile cache used for rate limits
|
||||||
/// TODO: i think i might just delete this entirely. instead use local-only concurrency limits.
|
/// TODO: i think i might just delete this entirely. instead use local-only concurrency limits.
|
||||||
pub vredis_pool: Option<RedisPool>,
|
pub vredis_pool: Option<RedisPool>,
|
||||||
|
@ -488,8 +493,15 @@ impl Web3ProxyApp {
|
||||||
.ok()
|
.ok()
|
||||||
.and_then(|x| x.to_str().map(|x| x.to_string()));
|
.and_then(|x| x.to_str().map(|x| x.to_string()));
|
||||||
|
|
||||||
|
// TODO: get the size out of the config
|
||||||
|
let bonus_ip_concurrency = Arc::new(Semaphore::new(top_config.app.bonus_ip_concurrency));
|
||||||
|
let bonus_user_concurrency =
|
||||||
|
Arc::new(Semaphore::new(top_config.app.bonus_user_concurrency));
|
||||||
|
|
||||||
let app = Self {
|
let app = Self {
|
||||||
balanced_rpcs,
|
balanced_rpcs,
|
||||||
|
bonus_ip_concurrency,
|
||||||
|
bonus_user_concurrency,
|
||||||
bundler_4337_rpcs,
|
bundler_4337_rpcs,
|
||||||
config: top_config.app.clone(),
|
config: top_config.app.clone(),
|
||||||
frontend_ip_rate_limiter,
|
frontend_ip_rate_limiter,
|
||||||
|
|
|
@ -83,6 +83,13 @@ pub struct AppConfig {
|
||||||
#[serde_inline_default(90_000u64)]
|
#[serde_inline_default(90_000u64)]
|
||||||
pub archive_depth: u64,
|
pub archive_depth: u64,
|
||||||
|
|
||||||
|
/// pool of extra connections allowed for authenticated users
|
||||||
|
#[serde_inline_default(0usize)]
|
||||||
|
pub bonus_user_concurrency: usize,
|
||||||
|
/// pool of extra connections allowed for anonymous users
|
||||||
|
#[serde_inline_default(0usize)]
|
||||||
|
pub bonus_ip_concurrency: usize,
|
||||||
|
|
||||||
/// EVM chain id. 1 for ETH
|
/// EVM chain id. 1 for ETH
|
||||||
/// TODO: better type for chain_id? max of `u64::MAX / 2 - 36` <https://github.com/ethereum/EIPs/issues/2294>
|
/// TODO: better type for chain_id? max of `u64::MAX / 2 - 36` <https://github.com/ethereum/EIPs/issues/2294>
|
||||||
#[serde_inline_default(1u64)]
|
#[serde_inline_default(1u64)]
|
||||||
|
|
|
@ -955,7 +955,16 @@ impl Web3ProxyApp {
|
||||||
})
|
})
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
let semaphore_permit = semaphore.acquire_owned().await?;
|
let semaphore_permit = tokio::select! {
|
||||||
|
biased;
|
||||||
|
|
||||||
|
p = semaphore.acquire_owned() => {
|
||||||
|
p
|
||||||
|
}
|
||||||
|
p = self.bonus_ip_concurrency.clone().acquire_owned() => {
|
||||||
|
p
|
||||||
|
}
|
||||||
|
}?;
|
||||||
|
|
||||||
Ok(Some(semaphore_permit))
|
Ok(Some(semaphore_permit))
|
||||||
} else {
|
} else {
|
||||||
|
@ -984,7 +993,19 @@ impl Web3ProxyApp {
|
||||||
})
|
})
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
let semaphore_permit = semaphore.acquire_owned().await?;
|
let semaphore_permit = tokio::select! {
|
||||||
|
biased;
|
||||||
|
|
||||||
|
p = semaphore.acquire_owned() => {
|
||||||
|
p
|
||||||
|
}
|
||||||
|
p = self.bonus_user_concurrency.clone().acquire_owned() => {
|
||||||
|
p
|
||||||
|
}
|
||||||
|
p = self.bonus_ip_concurrency.clone().acquire_owned() => {
|
||||||
|
p
|
||||||
|
}
|
||||||
|
}?;
|
||||||
|
|
||||||
Ok(Some(semaphore_permit))
|
Ok(Some(semaphore_permit))
|
||||||
} else {
|
} else {
|
||||||
|
|
Loading…
Reference in New Issue