improve no-op subscription
can still be improved more
This commit is contained in:
parent
bb96757452
commit
83a7b03dea
@ -16,10 +16,10 @@ use std::fmt;
|
|||||||
use std::hash::{Hash, Hasher};
|
use std::hash::{Hash, Hasher};
|
||||||
use std::sync::atomic::{self, AtomicU32, AtomicU64};
|
use std::sync::atomic::{self, AtomicU32, AtomicU64};
|
||||||
use std::{cmp::Ordering, sync::Arc};
|
use std::{cmp::Ordering, sync::Arc};
|
||||||
use tokio::sync::broadcast;
|
|
||||||
use tokio::sync::RwLock as AsyncRwLock;
|
use tokio::sync::RwLock as AsyncRwLock;
|
||||||
|
use tokio::sync::{broadcast, oneshot};
|
||||||
use tokio::time::{interval, sleep, sleep_until, Duration, MissedTickBehavior};
|
use tokio::time::{interval, sleep, sleep_until, Duration, MissedTickBehavior};
|
||||||
use tracing::{error, info, instrument, trace, warn};
|
use tracing::{debug, error, info, instrument, trace, warn};
|
||||||
|
|
||||||
/// An active connection to a Web3Rpc
|
/// An active connection to a Web3Rpc
|
||||||
pub struct Web3Connection {
|
pub struct Web3Connection {
|
||||||
@ -385,6 +385,8 @@ impl Web3Connection {
|
|||||||
reconnect: bool,
|
reconnect: bool,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
loop {
|
loop {
|
||||||
|
debug!(rpc=%self, "subscribing");
|
||||||
|
|
||||||
let http_interval_receiver = http_interval_sender.as_ref().map(|x| x.subscribe());
|
let http_interval_receiver = http_interval_sender.as_ref().map(|x| x.subscribe());
|
||||||
|
|
||||||
let mut futures = vec![];
|
let mut futures = vec![];
|
||||||
@ -407,10 +409,20 @@ impl Web3Connection {
|
|||||||
futures.push(flatten_handle(tokio::spawn(f)));
|
futures.push(flatten_handle(tokio::spawn(f)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let mut never_shot = None;
|
||||||
|
|
||||||
if futures.is_empty() {
|
if futures.is_empty() {
|
||||||
// TODO: is there a better way to make a channel that is never ready?
|
|
||||||
info!(rpc=%self, "no-op subscription");
|
info!(rpc=%self, "no-op subscription");
|
||||||
return Ok(());
|
|
||||||
|
// TODO: is there a better way to make a channel that is never ready?
|
||||||
|
// TODO: this is wrong! we still need retries! have this do a health check on an interval instead
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
|
||||||
|
never_shot = Some(tx);
|
||||||
|
|
||||||
|
let f = async move { rx.await.map_err(Into::into) };
|
||||||
|
|
||||||
|
futures.push(flatten_handle(tokio::spawn(f)));
|
||||||
}
|
}
|
||||||
|
|
||||||
match try_join_all(futures).await {
|
match try_join_all(futures).await {
|
||||||
@ -436,8 +448,12 @@ impl Web3Connection {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
drop(never_shot);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
info!(rpc=%self, "subscription complete");
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user