improve pooling more

This commit is contained in:
Bryan Stitt 2022-07-09 00:00:31 +00:00
parent 035680a203
commit 89439d015f
2 changed files with 23 additions and 16 deletions

View File

@ -91,7 +91,7 @@ impl RedisCellClient {
#[inline] #[inline]
pub async fn throttle_key(&self, key: &str) -> Result<(), Duration> { pub async fn throttle_key(&self, key: &str) -> Result<(), Duration> {
let key = format!("{}:{}", KEY_PREFIX, key); let key = format!("{}:{}", self.key, key);
self._throttle(key.as_ref(), 1).await self._throttle(key.as_ref(), 1).await
} }

View File

@ -146,17 +146,21 @@ impl Web3ProxyApp {
.build()?, .build()?,
); );
let rate_limiter_pool = match app_config.shared.rate_limit_redis { let redis_client_pool = match app_config.shared.rate_limit_redis {
Some(redis_address) => { Some(redis_address) => {
info!("Connecting to redis on {}", redis_address); info!("Connecting to redis on {}", redis_address);
let manager = RedisConnectionManager::new(redis_address)?; let manager = RedisConnectionManager::new(redis_address)?;
let min_size = num_workers as u32;
let max_size = min_size * 4;
// TODO: min_idle? // TODO: min_idle?
// TODO: set max_size based on max expected concurrent connections? set based on num_workers? // TODO: set max_size based on max expected concurrent connections? set based on num_workers?
let builder = bb8::Pool::builder() let builder = bb8::Pool::builder()
.error_sink(bb8_helpers::RedisErrorSink.boxed_clone()) .error_sink(bb8_helpers::RedisErrorSink.boxed_clone())
.max_size(1024); .min_idle(Some(min_size))
.max_size(max_size);
let pool = builder.build(manager).await?; let pool = builder.build(manager).await?;
@ -168,9 +172,8 @@ impl Web3ProxyApp {
} }
}; };
// TODO: subscribe to pending transactions on the private rpcs, too?
let (head_block_sender, head_block_receiver) = watch::channel(Block::default()); let (head_block_sender, head_block_receiver) = watch::channel(Block::default());
// TODO: will one receiver lagging be okay? // TODO: will one receiver lagging be okay? how big should this be?
let (pending_tx_sender, pending_tx_receiver) = broadcast::channel(16); let (pending_tx_sender, pending_tx_receiver) = broadcast::channel(16);
let pending_transactions = Arc::new(DashMap::new()); let pending_transactions = Arc::new(DashMap::new());
@ -184,7 +187,7 @@ impl Web3ProxyApp {
app_config.shared.chain_id, app_config.shared.chain_id,
balanced_rpcs, balanced_rpcs,
http_client.as_ref(), http_client.as_ref(),
rate_limiter_pool.as_ref(), redis_client_pool.as_ref(),
Some(head_block_sender), Some(head_block_sender),
Some(pending_tx_sender.clone()), Some(pending_tx_sender.clone()),
pending_transactions.clone(), pending_transactions.clone(),
@ -202,7 +205,7 @@ impl Web3ProxyApp {
app_config.shared.chain_id, app_config.shared.chain_id,
private_rpcs, private_rpcs,
http_client.as_ref(), http_client.as_ref(),
rate_limiter_pool.as_ref(), redis_client_pool.as_ref(),
// subscribing to new heads here won't work well // subscribing to new heads here won't work well
None, None,
// TODO: subscribe to pending transactions on the private rpcs? // TODO: subscribe to pending transactions on the private rpcs?
@ -222,15 +225,19 @@ impl Web3ProxyApp {
// TODO: how much should we allow? // TODO: how much should we allow?
let public_max_burst = app_config.shared.public_rate_limit_per_minute / 3; let public_max_burst = app_config.shared.public_rate_limit_per_minute / 3;
let public_rate_limiter = rate_limiter_pool.as_ref().map(|redis_client_pool| { let public_rate_limiter = if app_config.shared.public_rate_limit_per_minute == 0 {
RedisCellClient::new( None
redis_client_pool.clone(), } else {
"public".to_string(), redis_client_pool.as_ref().map(|redis_client_pool| {
public_max_burst, RedisCellClient::new(
app_config.shared.public_rate_limit_per_minute, redis_client_pool.clone(),
60, "public".to_string(),
) public_max_burst,
}); app_config.shared.public_rate_limit_per_minute,
60,
)
})
};
let app = Self { let app = Self {
balanced_rpcs, balanced_rpcs,