wait_for_provider helper function
This commit is contained in:
parent
0d07e20d7c
commit
907a147afa
@ -105,7 +105,7 @@ pub struct Web3Rpc {
|
|||||||
/// provider is in a RwLock so that we can replace it if re-connecting
|
/// provider is in a RwLock so that we can replace it if re-connecting
|
||||||
/// it is an async lock because we hold it open across awaits
|
/// it is an async lock because we hold it open across awaits
|
||||||
/// this provider is only used for new heads subscriptions
|
/// this provider is only used for new heads subscriptions
|
||||||
/// TODO: put the provider inside an arc?
|
/// TODO: watch channel instead of a lock
|
||||||
pub(super) provider: AsyncRwLock<Option<Arc<Web3Provider>>>,
|
pub(super) provider: AsyncRwLock<Option<Arc<Web3Provider>>>,
|
||||||
/// keep track of hard limits
|
/// keep track of hard limits
|
||||||
pub(super) hard_limit_until: Option<watch::Sender<Instant>>,
|
pub(super) hard_limit_until: Option<watch::Sender<Instant>>,
|
||||||
@ -889,6 +889,25 @@ impl Web3Rpc {
|
|||||||
*self.disconnect_watch.as_ref().unwrap().borrow()
|
*self.disconnect_watch.as_ref().unwrap().borrow()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn wait_for_provider(&self) -> Arc<Web3Provider> {
|
||||||
|
let mut provider = self.provider.read().await.clone();
|
||||||
|
|
||||||
|
let mut logged = false;
|
||||||
|
while provider.is_none() {
|
||||||
|
// trace!("waiting on unlocked_provider: locking...");
|
||||||
|
sleep(Duration::from_millis(100)).await;
|
||||||
|
|
||||||
|
if !logged {
|
||||||
|
debug!("waiting for provider on {}", self);
|
||||||
|
logged = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
provider = self.provider.read().await.clone();
|
||||||
|
}
|
||||||
|
|
||||||
|
provider.unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
/// Subscribe to new blocks.
|
/// Subscribe to new blocks.
|
||||||
async fn subscribe_new_heads(
|
async fn subscribe_new_heads(
|
||||||
self: Arc<Self>,
|
self: Arc<Self>,
|
||||||
@ -899,23 +918,10 @@ impl Web3Rpc {
|
|||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
trace!("watching new heads on {}", self);
|
trace!("watching new heads on {}", self);
|
||||||
|
|
||||||
let mut unlocked_provider = self.provider.read().await.clone();
|
let provider = self.wait_for_provider().await;
|
||||||
|
|
||||||
let mut logged = false;
|
match provider.as_ref() {
|
||||||
while unlocked_provider.is_none() {
|
Web3Provider::Http(_client) => {
|
||||||
// trace!("waiting on unlocked_provider: locking...");
|
|
||||||
sleep(Duration::from_millis(100)).await;
|
|
||||||
|
|
||||||
if !logged {
|
|
||||||
debug!("no provider for subscribe_new_heads on {}", self);
|
|
||||||
logged = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
unlocked_provider = self.provider.read().await.clone();
|
|
||||||
}
|
|
||||||
|
|
||||||
match unlocked_provider.as_deref() {
|
|
||||||
Some(Web3Provider::Http(_client)) => {
|
|
||||||
// there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints
|
// there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints
|
||||||
// TODO: try watch_blocks and fall back to this?
|
// TODO: try watch_blocks and fall back to this?
|
||||||
|
|
||||||
@ -925,8 +931,9 @@ impl Web3Rpc {
|
|||||||
|
|
||||||
while !self.should_disconnect() {
|
while !self.should_disconnect() {
|
||||||
// TODO: what should the max_wait be?
|
// TODO: what should the max_wait be?
|
||||||
|
// we do not pass unlocked_provider because we want to get a new one each call. otherwise we might re-use an old one
|
||||||
match self
|
match self
|
||||||
.wait_for_request_handle(&authorization, None, unlocked_provider.clone())
|
.wait_for_request_handle(&authorization, None, None)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(active_request_handle) => {
|
Ok(active_request_handle) => {
|
||||||
@ -1009,10 +1016,10 @@ impl Web3Rpc {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Some(Web3Provider::Both(_, client)) | Some(Web3Provider::Ws(client)) => {
|
Web3Provider::Both(_, client) | Web3Provider::Ws(client) => {
|
||||||
// todo: move subscribe_blocks onto the request handle?
|
// todo: move subscribe_blocks onto the request handle?
|
||||||
let active_request_handle = self
|
let active_request_handle = self
|
||||||
.wait_for_request_handle(&authorization, None, unlocked_provider.clone())
|
.wait_for_request_handle(&authorization, None, Some(provider.clone()))
|
||||||
.await;
|
.await;
|
||||||
let mut stream = client.subscribe_blocks().await?;
|
let mut stream = client.subscribe_blocks().await?;
|
||||||
drop(active_request_handle);
|
drop(active_request_handle);
|
||||||
@ -1023,13 +1030,13 @@ impl Web3Rpc {
|
|||||||
// TODO: how does this get wrapped in an arc? does ethers handle that?
|
// TODO: how does this get wrapped in an arc? does ethers handle that?
|
||||||
// TODO: do this part over http?
|
// TODO: do this part over http?
|
||||||
let block: Result<Option<ArcBlock>, _> = self
|
let block: Result<Option<ArcBlock>, _> = self
|
||||||
.wait_for_request_handle(&authorization, None, unlocked_provider.clone())
|
.wait_for_request_handle(&authorization, None, Some(provider.clone()))
|
||||||
.await?
|
.await?
|
||||||
.request(
|
.request(
|
||||||
"eth_getBlockByNumber",
|
"eth_getBlockByNumber",
|
||||||
&json!(("latest", false)),
|
&json!(("latest", false)),
|
||||||
Level::Warn.into(),
|
Level::Warn.into(),
|
||||||
unlocked_provider.clone(),
|
Some(provider.clone()),
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
@ -1073,9 +1080,8 @@ impl Web3Rpc {
|
|||||||
// TODO: we probably don't want a warn and to return error
|
// TODO: we probably don't want a warn and to return error
|
||||||
debug!("new_heads subscription to {} ended", self);
|
debug!("new_heads subscription to {} ended", self);
|
||||||
}
|
}
|
||||||
None => unimplemented!("there should always be a provider"),
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
Some(Web3Provider::Mock) => unimplemented!(),
|
Web3Provider::Mock => unimplemented!(),
|
||||||
}
|
}
|
||||||
|
|
||||||
// clear the head block. this might not be needed, but it won't hurt
|
// clear the head block. this might not be needed, but it won't hurt
|
||||||
@ -1098,6 +1104,7 @@ impl Web3Rpc {
|
|||||||
let mut logged = false;
|
let mut logged = false;
|
||||||
while provider.is_none() {
|
while provider.is_none() {
|
||||||
// trace!("waiting on provider: locking...");
|
// trace!("waiting on provider: locking...");
|
||||||
|
// TODO: can we subscribe to something instead? should self.provider be a watch channel?
|
||||||
sleep(Duration::from_millis(100)).await;
|
sleep(Duration::from_millis(100)).await;
|
||||||
|
|
||||||
if !logged {
|
if !logged {
|
||||||
@ -1115,8 +1122,7 @@ impl Web3Rpc {
|
|||||||
// TODO: does this keep the lock open for too long?
|
// TODO: does this keep the lock open for too long?
|
||||||
match provider.as_deref() {
|
match provider.as_deref() {
|
||||||
None => {
|
None => {
|
||||||
// TODO: wait for a provider
|
unimplemented!("no provider");
|
||||||
return Err(anyhow!("no provider"));
|
|
||||||
}
|
}
|
||||||
Some(Web3Provider::Http(provider)) => {
|
Some(Web3Provider::Http(provider)) => {
|
||||||
// there is a "watch_pending_transactions" function, but a lot of public nodes do not support the necessary rpc endpoints
|
// there is a "watch_pending_transactions" function, but a lot of public nodes do not support the necessary rpc endpoints
|
||||||
|
Loading…
Reference in New Issue
Block a user