diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 86ee7019..2ac4a765 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -134,6 +134,7 @@ pub struct Web3Rpc { pub(super) total_requests: AtomicUsize, pub(super) active_requests: AtomicUsize, pub(super) reconnect: AtomicBool, + /// this is only inside an Option so that the "Default" derive works. it will always be set. pub(super) disconnect_watch: Option>, pub(super) created_at: Option, } @@ -676,6 +677,10 @@ impl Web3Rpc { Ok(()) } + fn should_disconnect(&self) -> bool { + *self.disconnect_watch.as_ref().unwrap().borrow() + } + /// subscribe to blocks and transactions with automatic reconnects /// This should only exit when the program is exiting. /// TODO: should more of these args be on self? @@ -885,29 +890,6 @@ impl Web3Rpc { Ok(()) } - fn should_disconnect(&self) -> bool { - *self.disconnect_watch.as_ref().unwrap().borrow() - } - - async fn wait_for_provider(&self) -> Arc { - let mut provider = self.provider.read().await.clone(); - - let mut logged = false; - while provider.is_none() { - // trace!("waiting on unlocked_provider: locking..."); - sleep(Duration::from_millis(100)).await; - - if !logged { - debug!("waiting for provider on {}", self); - logged = true; - } - - provider = self.provider.read().await.clone(); - } - - provider.unwrap() - } - /// Subscribe to new blocks. async fn subscribe_new_heads( self: Arc, @@ -1099,41 +1081,19 @@ impl Web3Rpc { ) -> anyhow::Result<()> { // TODO: give this a separate client. don't use new_head_client for everything. especially a firehose this big // TODO: timeout - let mut provider = self.provider.read().await.clone(); - - let mut logged = false; - while provider.is_none() { - // trace!("waiting on provider: locking..."); - // TODO: can we subscribe to something instead? should self.provider be a watch channel? - sleep(Duration::from_millis(100)).await; - - if !logged { - debug!( - "no provider for subscribe_pending_transactions handle on {}", - self - ); - logged = true; - } - - provider = self.provider.read().await.clone(); - } + let provider = self.wait_for_provider().await; trace!("watching pending transactions on {}", self); // TODO: does this keep the lock open for too long? - match provider.as_deref() { - None => { - unimplemented!("no provider"); - } - Some(Web3Provider::Http(provider)) => { + match provider.as_ref() { + Web3Provider::Http(provider) => { // there is a "watch_pending_transactions" function, but a lot of public nodes do not support the necessary rpc endpoints - // TODO: maybe subscribe to self.head_block? - // TODO: this keeps a read lock guard open on provider_state forever. is that okay for an http client? - futures::future::pending::<()>().await; + self.wait_for_disconnect().await?; } - Some(Web3Provider::Both(_, client)) | Some(Web3Provider::Ws(client)) => { + Web3Provider::Both(_, client) | Web3Provider::Ws(client) => { // TODO: maybe the subscribe_pending_txs function should be on the active_request_handle let active_request_handle = self - .wait_for_request_handle(&authorization, None, provider.clone()) + .wait_for_request_handle(&authorization, None, Some(provider.clone())) .await?; let mut stream = client.subscribe_pending_txs().await?; @@ -1159,13 +1119,8 @@ impl Web3Rpc { debug!("pending_transactions subscription ended on {}", self); } #[cfg(test)] - Some(Web3Provider::Mock) => { - let mut disconnect_watch = self.disconnect_watch.as_ref().unwrap().subscribe(); - - if !*disconnect_watch.borrow_and_update() { - // wait for disconnect_watch to change - disconnect_watch.changed().await?; - } + Web3Provider::Mock => { + self.wait_for_disconnect().await?; } } @@ -1293,6 +1248,39 @@ impl Web3Rpc { Ok(OpenRequestResult::Handle(handle)) } + async fn wait_for_disconnect(&self) -> Result<(), tokio::sync::watch::error::RecvError> { + let mut disconnect_watch = self.disconnect_watch.as_ref().unwrap().subscribe(); + + loop { + if *disconnect_watch.borrow_and_update() { + // disconnect watch is set to "true" + return Ok(()); + } + + // wait for disconnect_watch to change + disconnect_watch.changed().await?; + } + } + + async fn wait_for_provider(&self) -> Arc { + let mut provider = self.provider.read().await.clone(); + + let mut logged = false; + while provider.is_none() { + // trace!("waiting on unlocked_provider: locking..."); + sleep(Duration::from_millis(100)).await; + + if !logged { + debug!("waiting for provider on {}", self); + logged = true; + } + + provider = self.provider.read().await.clone(); + } + + provider.unwrap() + } + pub async fn wait_for_query( self: &Arc, method: &str,