diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index ebf7f483..41084ab2 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -154,6 +154,8 @@ impl Web3Rpcs { let handle = { let connections = connections.clone(); + // TODO: do we actually want this spawned? i think we already spawned to get here + // todo!(this task is waking itself ~50% of the time. that seems bad) tokio::spawn(async move { connections .process_incoming_blocks(block_and_rpc_receiver) diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index cd81bcd1..280bcb2b 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -87,6 +87,7 @@ pub struct Web3Rpc { /// Track in-flight requests pub(super) active_requests: AtomicUsize, /// disconnect_watch is only inside an Option so that the "Default" derive works. it will always be set. + /// todo!(qthis gets cloned a TON. probably too much. something seems wrong) pub(super) disconnect_watch: Option>, /// created_at is only inside an Option so that the "Default" derive works. it will always be set. pub(super) created_at: Option, @@ -739,31 +740,7 @@ impl Web3Rpc { .web3_context("failed check_provider")?; let mut futures = Vec::new(); - - // TODO: use this channel instead of self.disconnect_watch - let (subscribe_stop_tx, mut subscribe_stop_rx) = watch::channel(false); - - // subscribe to the disconnect watch. the app uses this when shutting down or when configs change - if let Some(disconnect_watch_tx) = self.disconnect_watch.as_ref() { - let rpc = self.clone(); - let mut disconnect_watch_rx = disconnect_watch_tx.subscribe(); - - let f = async move { - loop { - if *disconnect_watch_rx.borrow_and_update() { - break; - } - - disconnect_watch_rx.changed().await?; - } - trace!("disconnect triggered on {}", rpc); - Ok(()) - }; - - futures.push(tokio::spawn(f)); - } else { - unimplemented!("there should always be a disconnect watch!"); - } + let mut abort_handles = vec![]; // health check that runs if there haven't been any recent requests if let Some(block_and_rpc_sender) = block_and_rpc_sender.clone() { @@ -781,14 +758,6 @@ impl Web3Rpc { let mut old_total_requests = 0; let mut new_total_requests; - if *subscribe_stop_rx.borrow_and_update() { - trace!( - "stopping healthcheck loop before even starting it on {}", - rpc - ); - return Ok(()); - } - // errors here should not cause the loop to exit! loop { new_total_requests = rpc.internal_requests.load(atomic::Ordering::Relaxed) @@ -818,22 +787,8 @@ impl Web3Rpc { // TODO: should we count the requests done inside this health check old_total_requests = new_total_requests; - select! { - x = subscribe_stop_rx.changed() => { - if *subscribe_stop_rx.borrow_and_update() || x.is_err() { - trace!(%rpc, "stopping http block subscription"); - break; - } - } - _ = sleep(Duration::from_secs(health_sleep_seconds)) => { - // time for another health check - } - } + sleep(Duration::from_secs(health_sleep_seconds)).await; } - - trace!("healthcheck loop on {} exited", rpc); - - Ok(()) }; // TODO: log quick_check lik @@ -851,7 +806,11 @@ impl Web3Rpc { self.healthy.store(initial_check, atomic::Ordering::Relaxed); - futures.push(tokio::spawn(f)); + let h = tokio::spawn(f); + let a = h.abort_handle(); + + futures.push(h); + abort_handles.push(a); } else { self.healthy.store(true, atomic::Ordering::Relaxed); } @@ -859,46 +818,39 @@ impl Web3Rpc { // subscribe to new heads if let Some(block_and_rpc_sender) = block_and_rpc_sender.clone() { let clone = self.clone(); - let mut subscribe_stop_rx = subscribe_stop_tx.subscribe(); let block_map = block_map.clone(); - if *subscribe_stop_rx.borrow_and_update() { - return Err(anyhow::anyhow!( - "exiting subscribe_new_heads before even starting on {}", - self - ) - .into()); - } - let f = async move { clone - .subscribe_new_heads(block_and_rpc_sender.clone(), block_map, subscribe_stop_rx) + .subscribe_new_heads(block_and_rpc_sender.clone(), block_map) .await }; - futures.push(tokio::spawn(f)); + let h = tokio::spawn(f); + let a = h.abort_handle(); + + futures.push(h); + abort_handles.push(a); } // subscribe to new transactions - if let Some(pending_txid_firehose) = pending_txid_firehose.clone() { - let clone = self.clone(); - let mut subscribe_stop_rx = subscribe_stop_tx.subscribe(); + if self.subscribe_txs && self.ws_provider.load().is_some() { + if let Some(pending_txid_firehose) = pending_txid_firehose.clone() { + let clone = self.clone(); - if *subscribe_stop_rx.borrow_and_update() { - return Err(anyhow::anyhow!( - "exiting subscribe_new_heads before even starting on {}", - self - ) - .into()); + let f = async move { + clone + .subscribe_new_transactions(pending_txid_firehose) + .await + }; + + // TODO: this is waking itself alot + let h = tokio::spawn(f); + let a = h.abort_handle(); + + futures.push(h); + abort_handles.push(a); } - - let f = async move { - clone - .subscribe_new_transactions(pending_txid_firehose, subscribe_stop_rx) - .await - }; - - futures.push(tokio::spawn(f)); } // exit if any of the futures exit @@ -913,9 +865,10 @@ impl Web3Rpc { .await? }; - subscribe_stop_tx.send_replace(true); - - // TODO: wait for all of the futures to exit? + // stop the other futures + for a in abort_handles { + a.abort(); + } // TODO: tell ethers to disconnect? self.ws_provider.store(None); @@ -926,21 +879,9 @@ impl Web3Rpc { async fn subscribe_new_transactions( self: &Arc, pending_txid_firehose: Arc>, - mut subscribe_stop_rx: watch::Receiver, ) -> Web3ProxyResult<()> { trace!("subscribing to new transactions on {}", self); - // rpcs opt-into subscribing to transactions. its a lot of bandwidth - if !self.subscribe_txs { - loop { - if *subscribe_stop_rx.borrow_and_update() { - trace!("stopping ws block subscription on {}", self); - return Ok(()); - } - subscribe_stop_rx.changed().await?; - } - } - if let Some(ws_provider) = self.ws_provider.load().as_ref() { // todo: move subscribe_blocks onto the request handle instead of having a seperate wait_for_throttle self.wait_for_throttle(Instant::now() + Duration::from_secs(5)) @@ -949,31 +890,14 @@ impl Web3Rpc { // TODO: only subscribe if a user has subscribed let mut pending_txs_sub = ws_provider.subscribe_pending_txs().await?; - loop { - select! { - x = subscribe_stop_rx.changed() => { - if *subscribe_stop_rx.borrow_and_update() || x.is_err() { - break; - } - } - x = pending_txs_sub.next() => { - if let Some(x) = x { - pending_txid_firehose.send(x).await; - } - } - } + while let Some(x) = pending_txs_sub.next().await { + pending_txid_firehose.send(x).await; } } else { // only websockets subscribe to pending transactions // its possible to do with http, but not recommended // TODO: what should we do here? - loop { - if *subscribe_stop_rx.borrow_and_update() { - trace!("stopping ws block subscription on {}", self); - return Ok(()); - } - subscribe_stop_rx.changed().await?; - } + unimplemented!() } Ok(()) @@ -984,7 +908,6 @@ impl Web3Rpc { self: &Arc, block_sender: mpsc::UnboundedSender, block_map: BlocksByHashCache, - mut subscribe_stop_rx: watch::Receiver, ) -> Web3ProxyResult<()> { trace!("subscribing to new heads on {}", self); @@ -1018,21 +941,11 @@ impl Web3Rpc { self.send_head_block_result(latest_block, &block_sender, &block_map) .await?; - loop { - select! { - x = subscribe_stop_rx.changed() => { - if *subscribe_stop_rx.borrow_and_update() || x.is_err() { - trace!(%self, "stopping websocket block subscription"); - break; - } - } - block = blocks.next() => { - let block = block.map(Arc::new); + while let Some(block) = blocks.next().await { + let block = Ok(Some(Arc::new(block))); - self.send_head_block_result(Ok(block), &block_sender, &block_map) - .await?; - } - } + self.send_head_block_result(block, &block_sender, &block_map) + .await?; } } else if self.http_client.is_some() { // there is a "watch_blocks" function, but a lot of public nodes (including llamanodes) do not support the necessary rpc endpoints @@ -1054,15 +967,7 @@ impl Web3Rpc { .await?; // TODO: should this select be at the start or end of the loop? - select! { - x = subscribe_stop_rx.changed() => { - if *subscribe_stop_rx.borrow_and_update() || x.is_err() { - trace!(%self, "stopping http block subscription"); - break; - } - } - _ = i.tick() => {} - } + i.tick().await; } } else { return Err(anyhow!("no ws or http provider!").into()); @@ -1072,7 +977,7 @@ impl Web3Rpc { self.send_head_block_result(Ok(None), &block_sender, &block_map) .await?; - if *subscribe_stop_rx.borrow() { + if self.should_disconnect() { trace!(%self, "new heads subscription exited"); Ok(()) } else {