From 39f5fb262805e6128119a8ed4c3ecd3d511b1d34 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 22 Nov 2023 22:33:39 -0400 Subject: [PATCH] use an app-level semaphore to limit tx subscriptions (#238) * use an app-level semaphore to limit tx subscriptions * acquire permit in spawned future or it blocks the rest of subscribe --------- Co-authored-by: Rory Neithinger --- web3_proxy/src/app/mod.rs | 5 +++++ web3_proxy/src/rpcs/one.rs | 12 +++++++++++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 51ff946a..6589e5d6 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -127,6 +127,8 @@ pub struct App { pub stat_sender: Option>, /// when the app started pub start: Instant, + /// limit the number of tx subscriptions + pub tx_subscriptions: Semaphore, /// Optional time series database for making pretty graphs that load quickly influxdb_client: Option, @@ -490,6 +492,8 @@ impl App { .name("jsonrpc_response_failed_cache_keys") .build(); + let tx_subscriptions = Semaphore::new(1); + let app = Self { balanced_rpcs, bonus_frontend_public_rate_limiter, @@ -522,6 +526,7 @@ impl App { user_semaphores, vredis_pool, watch_consensus_head_receiver, + tx_subscriptions, }; let app = Arc::new(app); diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index ca62b049..f6d748cd 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -5,6 +5,7 @@ use super::request::{OpenRequestHandle, OpenRequestResult}; use crate::app::Web3ProxyJoinHandle; use crate::config::{BlockAndRpc, Web3RpcConfig}; use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult}; +use crate::globals; use crate::jsonrpc::ValidatedRequest; use crate::jsonrpc::{self, JsonRpcParams, JsonRpcResultData}; use crate::rpcs::request::RequestErrorHandler; @@ -895,7 +896,16 @@ impl Web3Rpc { if self.pending_txid_firehose.is_some() && self.ws_provider.load().is_some() { let clone = self.clone(); - let f = async move { clone.subscribe_new_transactions().await }; + let f = async move { + let app = globals::APP.get().unwrap(); + let permit = app.tx_subscriptions.acquire().await?; + + let result = clone.subscribe_new_transactions().await; + + std::mem::drop(permit); + + result + }; // TODO: this is waking itself alot let h = tokio::spawn(f);