clean up wait_for_ functions

This commit is contained in:
Bryan Stitt 2023-03-02 18:27:32 +00:00
parent 907a147afa
commit d1b955275d

@ -134,6 +134,7 @@ pub struct Web3Rpc {
pub(super) total_requests: AtomicUsize,
pub(super) active_requests: AtomicUsize,
pub(super) reconnect: AtomicBool,
/// this is only inside an Option so that the "Default" derive works. it will always be set.
pub(super) disconnect_watch: Option<watch::Sender<bool>>,
pub(super) created_at: Option<Instant>,
}
@ -676,6 +677,10 @@ impl Web3Rpc {
Ok(())
}
fn should_disconnect(&self) -> bool {
*self.disconnect_watch.as_ref().unwrap().borrow()
}
/// subscribe to blocks and transactions with automatic reconnects
/// This should only exit when the program is exiting.
/// TODO: should more of these args be on self?
@ -885,29 +890,6 @@ impl Web3Rpc {
Ok(())
}
fn should_disconnect(&self) -> bool {
*self.disconnect_watch.as_ref().unwrap().borrow()
}
async fn wait_for_provider(&self) -> Arc<Web3Provider> {
let mut provider = self.provider.read().await.clone();
let mut logged = false;
while provider.is_none() {
// trace!("waiting on unlocked_provider: locking...");
sleep(Duration::from_millis(100)).await;
if !logged {
debug!("waiting for provider on {}", self);
logged = true;
}
provider = self.provider.read().await.clone();
}
provider.unwrap()
}
/// Subscribe to new blocks.
async fn subscribe_new_heads(
self: Arc<Self>,
@ -1099,41 +1081,19 @@ impl Web3Rpc {
) -> anyhow::Result<()> {
// TODO: give this a separate client. don't use new_head_client for everything. especially a firehose this big
// TODO: timeout
let mut provider = self.provider.read().await.clone();
let mut logged = false;
while provider.is_none() {
// trace!("waiting on provider: locking...");
// TODO: can we subscribe to something instead? should self.provider be a watch channel?
sleep(Duration::from_millis(100)).await;
if !logged {
debug!(
"no provider for subscribe_pending_transactions handle on {}",
self
);
logged = true;
}
provider = self.provider.read().await.clone();
}
let provider = self.wait_for_provider().await;
trace!("watching pending transactions on {}", self);
// TODO: does this keep the lock open for too long?
match provider.as_deref() {
None => {
unimplemented!("no provider");
}
Some(Web3Provider::Http(provider)) => {
match provider.as_ref() {
Web3Provider::Http(provider) => {
// there is a "watch_pending_transactions" function, but a lot of public nodes do not support the necessary rpc endpoints
// TODO: maybe subscribe to self.head_block?
// TODO: this keeps a read lock guard open on provider_state forever. is that okay for an http client?
futures::future::pending::<()>().await;
self.wait_for_disconnect().await?;
}
Some(Web3Provider::Both(_, client)) | Some(Web3Provider::Ws(client)) => {
Web3Provider::Both(_, client) | Web3Provider::Ws(client) => {
// TODO: maybe the subscribe_pending_txs function should be on the active_request_handle
let active_request_handle = self
.wait_for_request_handle(&authorization, None, provider.clone())
.wait_for_request_handle(&authorization, None, Some(provider.clone()))
.await?;
let mut stream = client.subscribe_pending_txs().await?;
@ -1159,13 +1119,8 @@ impl Web3Rpc {
debug!("pending_transactions subscription ended on {}", self);
}
#[cfg(test)]
Some(Web3Provider::Mock) => {
let mut disconnect_watch = self.disconnect_watch.as_ref().unwrap().subscribe();
if !*disconnect_watch.borrow_and_update() {
// wait for disconnect_watch to change
disconnect_watch.changed().await?;
}
Web3Provider::Mock => {
self.wait_for_disconnect().await?;
}
}
@ -1293,6 +1248,39 @@ impl Web3Rpc {
Ok(OpenRequestResult::Handle(handle))
}
async fn wait_for_disconnect(&self) -> Result<(), tokio::sync::watch::error::RecvError> {
let mut disconnect_watch = self.disconnect_watch.as_ref().unwrap().subscribe();
loop {
if *disconnect_watch.borrow_and_update() {
// disconnect watch is set to "true"
return Ok(());
}
// wait for disconnect_watch to change
disconnect_watch.changed().await?;
}
}
async fn wait_for_provider(&self) -> Arc<Web3Provider> {
let mut provider = self.provider.read().await.clone();
let mut logged = false;
while provider.is_none() {
// trace!("waiting on unlocked_provider: locking...");
sleep(Duration::from_millis(100)).await;
if !logged {
debug!("waiting for provider on {}", self);
logged = true;
}
provider = self.provider.read().await.clone();
}
provider.unwrap()
}
pub async fn wait_for_query<P, R>(
self: &Arc<Self>,
method: &str,