From be68a9babb2d7668599449bf1b02b623a60cad4c Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 1 Nov 2023 22:31:00 -0700 Subject: [PATCH] less function args. more owned data --- web3_proxy/src/rpcs/many.rs | 2 +- web3_proxy/src/rpcs/one.rs | 164 +++++++++++++++--------------------- 2 files changed, 67 insertions(+), 99 deletions(-) diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 8664b136..c623f019 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -40,8 +40,8 @@ pub struct Web3Rpcs { /// TODO: hopefully this not being an async lock will be okay. if you need it across awaits, clone the arc pub(crate) by_name: RwLock>>, /// all providers with the same consensus head block. won't update if there is no `self.watch_head_block` - /// TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed /// TODO: why is watch_head_block in an Option, but this one isn't? + /// TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed /// Geth's subscriptions have the same potential for skipping blocks. pub(crate) watch_ranked_rpcs: watch::Sender>>, /// this head receiver makes it easy to wait until there is a new block diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 0082b226..4263866d 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -46,7 +46,13 @@ pub struct Web3Rpc { pub block_interval: Duration, pub display_name: Option, pub db_conn: Option, - pub subscribe_txs: bool, + + /// Track in-flight requests + pub(super) active_requests: AtomicUsize, + /// mapping of block numbers and hashes + pub(super) block_map: Option, + /// created_at is only inside an Option so that the "Default" derive works. it will always be set. + pub(super) created_at: Option, /// most all requests prefer use the http_provider pub(super) http_client: Option, pub(super) http_url: Option, @@ -66,6 +72,8 @@ pub struct Web3Rpc { pub(super) automatic_block_limit: bool, /// only use this rpc if everything else is lagging too far. this allows us to ignore fast but very low limit rpcs pub backup: bool, + /// if subscribed to new heads, blocks are sent through this channel to update a parent Web3Rpcs + pub(super) block_and_rpc_sender: Option>, /// TODO: have an enum for this so that "no limit" prints pretty? pub(super) block_data_limit: AtomicU64, /// head_block is only inside an Option so that the "Default" derive works. it will always be set. @@ -73,10 +81,12 @@ pub struct Web3Rpc { /// Track head block latency. /// TODO: This is in a sync lock, but writes are infrequent and quick. Is this actually okay? Set from a spawned task and read an atomic instead? pub(super) head_delay: RwLock, + /// false if a health check has failed + pub(super) healthy: AtomicBool, /// Track peak request latency /// peak_latency is only inside an Option so that the "Default" derive works. it will always be set. pub(super) peak_latency: Option, - /// Automatically set priority + /// Automatically set priority based on request latency and active requests pub(super) tier: AtomicU32, /// Track total internal requests served pub(super) internal_requests: AtomicUsize, @@ -87,15 +97,11 @@ pub struct Web3Rpc { /// Track time used by external requests served /// request_ms_histogram is only inside an Option so that the "Default" derive works. it will always be set. 
pub(super) median_latency: Option, - /// Track in-flight requests - pub(super) active_requests: AtomicUsize, /// disconnect_watch is only inside an Option so that the "Default" derive works. it will always be set. /// todo!(qthis gets cloned a TON. probably too much. something seems wrong) pub(super) disconnect_watch: Option>, - /// created_at is only inside an Option so that the "Default" derive works. it will always be set. - pub(super) created_at: Option, - /// false if a health check has failed - pub(super) healthy: AtomicBool, + /// if subscribed to pending transactions, transactions are sent through this channel to update a parent Web3App + pub(super) pending_txid_firehose: Option>>, } impl Web3Rpc { @@ -200,11 +206,20 @@ impl Web3Rpc { // TODO: start optimistically? let healthy = false.into(); + let pending_txid_firehose = if config.subscribe_txs { + // TODO: error if subscribe_txs but not pending_txid_firehose + pending_txid_firehose + } else { + None + }; + let new_rpc = Self { automatic_block_limit, backup, block_data_limit, block_interval, + block_map: Some(block_map), + chain_id, created_at: Some(created_at), display_name: config.display_name, hard_limit, @@ -217,7 +232,8 @@ impl Web3Rpc { peak_latency: Some(peak_latency), median_latency: Some(median_request_latency), soft_limit: config.soft_limit, - subscribe_txs: config.subscribe_txs, + pending_txid_firehose, + block_and_rpc_sender, ws_url, disconnect_watch: Some(disconnect_watch), healthy, @@ -230,16 +246,7 @@ impl Web3Rpc { // subscribing starts the connection (with retries) let handle = { let new_connection = new_connection.clone(); - tokio::spawn(async move { - new_connection - .subscribe_with_reconnect( - block_map, - block_and_rpc_sender, - pending_txid_firehose, - chain_id, - ) - .await - }) + tokio::spawn(async move { new_connection.subscribe_with_reconnect().await }) }; Ok((new_connection, handle)) @@ -492,10 +499,11 @@ impl Web3Rpc { /// query the web3 provider to confirm it is on the expected chain with the expected data available /// TODO: this currently checks only the http if both http and ws are set. it should check both and make sure they match - async fn check_provider(self: &Arc, chain_id: u64) -> Web3ProxyResult<()> { + async fn check_provider(self: &Arc) -> Web3ProxyResult<()> { // TODO: different handlers for backup vs primary let error_handler = Some(Level::TRACE.into()); + // TODO: make this configurable. voltaire bundler uses web3_bundlerVersion match self .internal_request::<_, String>( "web3_clientVersion".into(), @@ -536,10 +544,10 @@ impl Web3Rpc { trace!("found_chain_id: {:#?}", found_chain_id); - if chain_id != found_chain_id.as_u64() { + if self.chain_id != found_chain_id.as_u64() { return Err(anyhow::anyhow!( "incorrect chain id! Config has {}, but RPC has {}", - chain_id, + self.chain_id, found_chain_id ) .context(format!("failed @ {}", self)) @@ -559,8 +567,6 @@ impl Web3Rpc { pub(crate) async fn send_head_block_result( self: &Arc, new_head_block: Web3ProxyResult>, - block_and_rpc_sender: &mpsc::UnboundedSender, - block_map: &BlocksByHashCache, ) -> Web3ProxyResult<()> { let head_block_sender = self.head_block_sender.as_ref().unwrap(); @@ -590,13 +596,17 @@ impl Web3Rpc { let new_hash = *new_head_block.hash(); // if we already have this block saved, set new_head_block to that arc. 
otherwise store this copy - let new_head_block = block_map - .get_with_by_ref(&new_hash, async move { new_head_block }) + let new_head_block = self + .block_map + .as_ref() + .unwrap() + .get_with(new_hash, async move { new_head_block }) .await; // we are synced! yey! head_block_sender.send_replace(Some(new_head_block.clone())); + // TODO: checking this every time seems excessive if self.block_data_limit() == U64::zero() { if let Err(err) = self.check_block_data_limit().await { warn!( @@ -623,9 +633,11 @@ impl Web3Rpc { }; // tell web3rpcs about this rpc having this block - block_and_rpc_sender - .send((new_head_block, self.clone())) - .context("block_and_rpc_sender failed sending")?; + if let Some(block_and_rpc_sender) = &self.block_and_rpc_sender { + block_and_rpc_sender + .send((new_head_block, self.clone())) + .context("block_and_rpc_sender failed sending")?; + } Ok(()) } @@ -691,24 +703,9 @@ impl Web3Rpc { } /// TODO: this needs to be a subscribe_with_reconnect that does a retry with jitter and exponential backoff - async fn subscribe_with_reconnect( - self: Arc, - block_map: BlocksByHashCache, - block_and_rpc_sender: Option>, - pending_txid_firehose: Option>>, - chain_id: u64, - ) -> Web3ProxyResult<()> { + async fn subscribe_with_reconnect(self: Arc) -> Web3ProxyResult<()> { loop { - if let Err(err) = self - .clone() - .subscribe( - block_map.clone(), - block_and_rpc_sender.clone(), - pending_txid_firehose.clone(), - chain_id, - ) - .await - { + if let Err(err) = self.clone().subscribe().await { if self.should_disconnect() { break; } @@ -733,13 +730,7 @@ impl Web3Rpc { /// subscribe to blocks and transactions /// This should only exit when the program is exiting. /// TODO: should more of these args be on self? chain_id for sure - async fn subscribe( - self: Arc, - block_map: BlocksByHashCache, - block_and_rpc_sender: Option>, - pending_txid_firehose: Option>>, - chain_id: u64, - ) -> Web3ProxyResult<()> { + async fn subscribe(self: Arc) -> Web3ProxyResult<()> { let error_handler = if self.backup { Some(RequestErrorHandler::DebugLevel) } else { @@ -768,7 +759,7 @@ impl Web3Rpc { trace!("starting subscriptions on {}", self); if let Err(err) = self - .check_provider(chain_id) + .check_provider() .await .web3_context("failed check_provider") { @@ -780,7 +771,7 @@ impl Web3Rpc { let mut abort_handles = vec![]; // health check that runs if there haven't been any recent requests - let health_handle = if block_and_rpc_sender.is_some() { + let health_handle = if self.block_and_rpc_sender.is_some() { // TODO: move this into a proper function let rpc = self.clone(); @@ -857,7 +848,7 @@ impl Web3Rpc { } // TODO: if this fails too many times, reset the connection - if let Err(err) = rpc.check_provider(chain_id).await { + if let Err(err) = rpc.check_provider().await { rpc.healthy.store(false, atomic::Ordering::Relaxed); // TODO: if rate limit error, set "retry_at" @@ -883,15 +874,10 @@ impl Web3Rpc { futures.push(health_handle); // subscribe to new heads - if let Some(block_and_rpc_sender) = block_and_rpc_sender.clone() { + if self.block_and_rpc_sender.is_some() { let clone = self.clone(); - let block_map = block_map.clone(); - let f = async move { - clone - .subscribe_new_heads(block_and_rpc_sender, block_map) - .await - }; + let f = async move { clone.subscribe_new_heads().await }; let h = tokio::spawn(f); let a = h.abort_handle(); @@ -901,23 +887,17 @@ impl Web3Rpc { } // subscribe to new transactions - if self.subscribe_txs && self.ws_provider.load().is_some() { - if let 
Some(pending_txid_firehose) = pending_txid_firehose.clone() { - let clone = self.clone(); + if self.pending_txid_firehose.is_some() && self.ws_provider.load().is_some() { + let clone = self.clone(); - let f = async move { - clone - .subscribe_new_transactions(pending_txid_firehose) - .await - }; + let f = async move { clone.subscribe_new_transactions().await }; - // TODO: this is waking itself alot - let h = tokio::spawn(f); - let a = h.abort_handle(); + // TODO: this is waking itself alot + let h = tokio::spawn(f); + let a = h.abort_handle(); - futures.push(h); - abort_handles.push(a); - } + futures.push(h); + abort_handles.push(a); } // exit if any of the futures exit @@ -929,10 +909,7 @@ impl Web3Rpc { debug!(?first_exit, "subscriptions on {} exited", self); // clear the head block - if let Some(block_and_rpc_sender) = block_and_rpc_sender { - self.send_head_block_result(Ok(None), &block_and_rpc_sender, &block_map) - .await? - }; + self.send_head_block_result(Ok(None)).await?; // stop the other futures for a in abort_handles { @@ -945,12 +922,11 @@ impl Web3Rpc { Ok(()) } - async fn subscribe_new_transactions( - self: &Arc, - pending_txid_firehose: Arc>, - ) -> Web3ProxyResult<()> { + async fn subscribe_new_transactions(self: &Arc) -> Web3ProxyResult<()> { trace!("subscribing to new transactions on {}", self); + let pending_txid_firehose = self.pending_txid_firehose.as_ref().unwrap(); + if let Some(ws_provider) = self.ws_provider.load().as_ref() { // todo: move subscribe_blocks onto the request handle instead of having a seperate wait_for_throttle self.wait_for_throttle(Instant::now() + Duration::from_secs(5)) @@ -973,11 +949,7 @@ impl Web3Rpc { } /// Subscribe to new block headers. - async fn subscribe_new_heads( - self: &Arc, - block_sender: mpsc::UnboundedSender, - block_map: BlocksByHashCache, - ) -> Web3ProxyResult<()> { + async fn subscribe_new_heads(self: &Arc) -> Web3ProxyResult<()> { trace!("subscribing to new heads on {}", self); let error_handler = if self.backup { @@ -1004,14 +976,12 @@ impl Web3Rpc { ) .await; - self.send_head_block_result(latest_block, &block_sender, &block_map) - .await?; + self.send_head_block_result(latest_block).await?; while let Some(block) = blocks.next().await { let block = Ok(Some(Arc::new(block))); - self.send_head_block_result(block, &block_sender, &block_map) - .await?; + self.send_head_block_result(block).await?; } } else if self.http_client.is_some() { // there is a "watch_blocks" function, but a lot of public nodes (including llamanodes) do not support the necessary rpc endpoints @@ -1029,8 +999,7 @@ impl Web3Rpc { ) .await; - self.send_head_block_result(block_result, &block_sender, &block_map) - .await?; + self.send_head_block_result(block_result).await?; // TODO: should this select be at the start or end of the loop? i.tick().await; @@ -1040,8 +1009,7 @@ impl Web3Rpc { } // clear the head block. this might not be needed, but it won't hurt - self.send_head_block_result(Ok(None), &block_sender, &block_map) - .await?; + self.send_head_block_result(Ok(None)).await?; if self.should_disconnect() { trace!(%self, "new heads subscription exited");
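
The pattern this patch applies, shown as a minimal standalone sketch: values that were previously threaded through subscribe(), subscribe_new_heads(), and send_head_block_result() as arguments (block_map, block_and_rpc_sender, pending_txid_firehose, chain_id) become owned fields on the struct, so the spawned tasks only need to capture an Arc of self. The Watcher type, block_sender field, and u64 channel payload below are hypothetical stand-ins for illustration, not web3_proxy types or its actual API.

use std::sync::Arc;
use tokio::sync::mpsc;

/// Hypothetical stand-in for Web3Rpc: it owns the channel it publishes to
/// instead of receiving it as an argument on every subscribe call.
struct Watcher {
    chain_id: u64,
    /// Optional, mirroring block_and_rpc_sender: None means "don't publish".
    block_sender: Option<mpsc::UnboundedSender<u64>>,
}

impl Watcher {
    fn new(chain_id: u64, block_sender: Option<mpsc::UnboundedSender<u64>>) -> Arc<Self> {
        Arc::new(Self { chain_id, block_sender })
    }

    /// Before the refactor this signature would have carried the sender (and
    /// more) as parameters; now everything it needs comes from `self`.
    async fn subscribe(self: Arc<Self>) {
        for block_num in 0..3u64 {
            if let Some(sender) = &self.block_sender {
                // a real implementation would handle send errors; the sketch ignores them
                let _ = sender.send(block_num);
            }
        }
        println!("subscription for chain {} finished", self.chain_id);
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel();

    let watcher = Watcher::new(1, Some(tx));

    // The spawned future captures only the Arc, matching the patch's
    // `tokio::spawn(async move { new_connection.subscribe_with_reconnect().await })`.
    let handle = tokio::spawn(watcher.subscribe());

    // recv() returns None once the task finishes and drops the only sender.
    while let Some(block_num) = rx.recv().await {
        println!("got block {block_num}");
    }

    handle.await.unwrap();
}

With the dependencies owned by the struct, the reconnect loop can simply clone self and call subscribe() again, and the channels cannot drift out of sync with the rest of the connection's state.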