diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs index aef181fb..dd9e62e9 100644 --- a/web3_proxy/src/rpcs/connection.rs +++ b/web3_proxy/src/rpcs/connection.rs @@ -16,10 +16,10 @@ use std::fmt; use std::hash::{Hash, Hasher}; use std::sync::atomic::{self, AtomicU32, AtomicU64}; use std::{cmp::Ordering, sync::Arc}; -use tokio::sync::broadcast; use tokio::sync::RwLock as AsyncRwLock; +use tokio::sync::{broadcast, oneshot}; use tokio::time::{interval, sleep, sleep_until, Duration, MissedTickBehavior}; -use tracing::{error, info, instrument, trace, warn}; +use tracing::{debug, error, info, instrument, trace, warn}; /// An active connection to a Web3Rpc pub struct Web3Connection { @@ -385,6 +385,8 @@ impl Web3Connection { reconnect: bool, ) -> anyhow::Result<()> { loop { + debug!(rpc=%self, "subscribing"); + let http_interval_receiver = http_interval_sender.as_ref().map(|x| x.subscribe()); let mut futures = vec![]; @@ -407,10 +409,20 @@ impl Web3Connection { futures.push(flatten_handle(tokio::spawn(f))); } + let mut never_shot = None; + if futures.is_empty() { - // TODO: is there a better way to make a channel that is never ready? info!(rpc=%self, "no-op subscription"); - return Ok(()); + + // TODO: is there a better way to make a channel that is never ready? + // TODO: this is wrong! we still need retries! have this do a health check on an interval instead + let (tx, rx) = oneshot::channel(); + + never_shot = Some(tx); + + let f = async move { rx.await.map_err(Into::into) }; + + futures.push(flatten_handle(tokio::spawn(f))); } match try_join_all(futures).await { @@ -436,8 +448,12 @@ impl Web3Connection { } } } + + drop(never_shot); } + info!(rpc=%self, "subscription complete"); + Ok(()) }