From 907a147afab683ed3df37211aa46ba4cbdc830ab Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Thu, 2 Mar 2023 18:14:17 +0000 Subject: [PATCH] wait_for_provider helper function --- web3_proxy/src/rpcs/one.rs | 58 +++++++++++++++++++++----------------- 1 file changed, 32 insertions(+), 26 deletions(-) diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 2d79ac3d..86ee7019 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -105,7 +105,7 @@ pub struct Web3Rpc { /// provider is in a RwLock so that we can replace it if re-connecting /// it is an async lock because we hold it open across awaits /// this provider is only used for new heads subscriptions - /// TODO: put the provider inside an arc? + /// TODO: watch channel instead of a lock pub(super) provider: AsyncRwLock>>, /// keep track of hard limits pub(super) hard_limit_until: Option>, @@ -889,6 +889,25 @@ impl Web3Rpc { *self.disconnect_watch.as_ref().unwrap().borrow() } + async fn wait_for_provider(&self) -> Arc { + let mut provider = self.provider.read().await.clone(); + + let mut logged = false; + while provider.is_none() { + // trace!("waiting on unlocked_provider: locking..."); + sleep(Duration::from_millis(100)).await; + + if !logged { + debug!("waiting for provider on {}", self); + logged = true; + } + + provider = self.provider.read().await.clone(); + } + + provider.unwrap() + } + /// Subscribe to new blocks. async fn subscribe_new_heads( self: Arc, @@ -899,23 +918,10 @@ impl Web3Rpc { ) -> anyhow::Result<()> { trace!("watching new heads on {}", self); - let mut unlocked_provider = self.provider.read().await.clone(); + let provider = self.wait_for_provider().await; - let mut logged = false; - while unlocked_provider.is_none() { - // trace!("waiting on unlocked_provider: locking..."); - sleep(Duration::from_millis(100)).await; - - if !logged { - debug!("no provider for subscribe_new_heads on {}", self); - logged = true; - } - - unlocked_provider = self.provider.read().await.clone(); - } - - match unlocked_provider.as_deref() { - Some(Web3Provider::Http(_client)) => { + match provider.as_ref() { + Web3Provider::Http(_client) => { // there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints // TODO: try watch_blocks and fall back to this? @@ -925,8 +931,9 @@ impl Web3Rpc { while !self.should_disconnect() { // TODO: what should the max_wait be? + // we do not pass unlocked_provider because we want to get a new one each call. otherwise we might re-use an old one match self - .wait_for_request_handle(&authorization, None, unlocked_provider.clone()) + .wait_for_request_handle(&authorization, None, None) .await { Ok(active_request_handle) => { @@ -1009,10 +1016,10 @@ impl Web3Rpc { } } } - Some(Web3Provider::Both(_, client)) | Some(Web3Provider::Ws(client)) => { + Web3Provider::Both(_, client) | Web3Provider::Ws(client) => { // todo: move subscribe_blocks onto the request handle? let active_request_handle = self - .wait_for_request_handle(&authorization, None, unlocked_provider.clone()) + .wait_for_request_handle(&authorization, None, Some(provider.clone())) .await; let mut stream = client.subscribe_blocks().await?; drop(active_request_handle); @@ -1023,13 +1030,13 @@ impl Web3Rpc { // TODO: how does this get wrapped in an arc? does ethers handle that? // TODO: do this part over http? let block: Result, _> = self - .wait_for_request_handle(&authorization, None, unlocked_provider.clone()) + .wait_for_request_handle(&authorization, None, Some(provider.clone())) .await? .request( "eth_getBlockByNumber", &json!(("latest", false)), Level::Warn.into(), - unlocked_provider.clone(), + Some(provider.clone()), ) .await; @@ -1073,9 +1080,8 @@ impl Web3Rpc { // TODO: we probably don't want a warn and to return error debug!("new_heads subscription to {} ended", self); } - None => unimplemented!("there should always be a provider"), #[cfg(test)] - Some(Web3Provider::Mock) => unimplemented!(), + Web3Provider::Mock => unimplemented!(), } // clear the head block. this might not be needed, but it won't hurt @@ -1098,6 +1104,7 @@ impl Web3Rpc { let mut logged = false; while provider.is_none() { // trace!("waiting on provider: locking..."); + // TODO: can we subscribe to something instead? should self.provider be a watch channel? sleep(Duration::from_millis(100)).await; if !logged { @@ -1115,8 +1122,7 @@ impl Web3Rpc { // TODO: does this keep the lock open for too long? match provider.as_deref() { None => { - // TODO: wait for a provider - return Err(anyhow!("no provider")); + unimplemented!("no provider"); } Some(Web3Provider::Http(provider)) => { // there is a "watch_pending_transactions" function, but a lot of public nodes do not support the necessary rpc endpoints