replace subscribe_stop_rx with aborts
This commit is contained in:
parent
9786fb58b2
commit
3bf7f6c34e
@ -154,6 +154,8 @@ impl Web3Rpcs {
|
|||||||
let handle = {
|
let handle = {
|
||||||
let connections = connections.clone();
|
let connections = connections.clone();
|
||||||
|
|
||||||
|
// TODO: do we actually want this spawned? i think we already spawned to get here
|
||||||
|
// todo!(this task is waking itself ~50% of the time. that seems bad)
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
connections
|
connections
|
||||||
.process_incoming_blocks(block_and_rpc_receiver)
|
.process_incoming_blocks(block_and_rpc_receiver)
|
||||||
|
@ -87,6 +87,7 @@ pub struct Web3Rpc {
|
|||||||
/// Track in-flight requests
|
/// Track in-flight requests
|
||||||
pub(super) active_requests: AtomicUsize,
|
pub(super) active_requests: AtomicUsize,
|
||||||
/// disconnect_watch is only inside an Option so that the "Default" derive works. it will always be set.
|
/// disconnect_watch is only inside an Option so that the "Default" derive works. it will always be set.
|
||||||
|
/// todo!(qthis gets cloned a TON. probably too much. something seems wrong)
|
||||||
pub(super) disconnect_watch: Option<watch::Sender<bool>>,
|
pub(super) disconnect_watch: Option<watch::Sender<bool>>,
|
||||||
/// created_at is only inside an Option so that the "Default" derive works. it will always be set.
|
/// created_at is only inside an Option so that the "Default" derive works. it will always be set.
|
||||||
pub(super) created_at: Option<Instant>,
|
pub(super) created_at: Option<Instant>,
|
||||||
@ -739,31 +740,7 @@ impl Web3Rpc {
|
|||||||
.web3_context("failed check_provider")?;
|
.web3_context("failed check_provider")?;
|
||||||
|
|
||||||
let mut futures = Vec::new();
|
let mut futures = Vec::new();
|
||||||
|
let mut abort_handles = vec![];
|
||||||
// TODO: use this channel instead of self.disconnect_watch
|
|
||||||
let (subscribe_stop_tx, mut subscribe_stop_rx) = watch::channel(false);
|
|
||||||
|
|
||||||
// subscribe to the disconnect watch. the app uses this when shutting down or when configs change
|
|
||||||
if let Some(disconnect_watch_tx) = self.disconnect_watch.as_ref() {
|
|
||||||
let rpc = self.clone();
|
|
||||||
let mut disconnect_watch_rx = disconnect_watch_tx.subscribe();
|
|
||||||
|
|
||||||
let f = async move {
|
|
||||||
loop {
|
|
||||||
if *disconnect_watch_rx.borrow_and_update() {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
disconnect_watch_rx.changed().await?;
|
|
||||||
}
|
|
||||||
trace!("disconnect triggered on {}", rpc);
|
|
||||||
Ok(())
|
|
||||||
};
|
|
||||||
|
|
||||||
futures.push(tokio::spawn(f));
|
|
||||||
} else {
|
|
||||||
unimplemented!("there should always be a disconnect watch!");
|
|
||||||
}
|
|
||||||
|
|
||||||
// health check that runs if there haven't been any recent requests
|
// health check that runs if there haven't been any recent requests
|
||||||
if let Some(block_and_rpc_sender) = block_and_rpc_sender.clone() {
|
if let Some(block_and_rpc_sender) = block_and_rpc_sender.clone() {
|
||||||
@ -781,14 +758,6 @@ impl Web3Rpc {
|
|||||||
let mut old_total_requests = 0;
|
let mut old_total_requests = 0;
|
||||||
let mut new_total_requests;
|
let mut new_total_requests;
|
||||||
|
|
||||||
if *subscribe_stop_rx.borrow_and_update() {
|
|
||||||
trace!(
|
|
||||||
"stopping healthcheck loop before even starting it on {}",
|
|
||||||
rpc
|
|
||||||
);
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
// errors here should not cause the loop to exit!
|
// errors here should not cause the loop to exit!
|
||||||
loop {
|
loop {
|
||||||
new_total_requests = rpc.internal_requests.load(atomic::Ordering::Relaxed)
|
new_total_requests = rpc.internal_requests.load(atomic::Ordering::Relaxed)
|
||||||
@ -818,22 +787,8 @@ impl Web3Rpc {
|
|||||||
// TODO: should we count the requests done inside this health check
|
// TODO: should we count the requests done inside this health check
|
||||||
old_total_requests = new_total_requests;
|
old_total_requests = new_total_requests;
|
||||||
|
|
||||||
select! {
|
sleep(Duration::from_secs(health_sleep_seconds)).await;
|
||||||
x = subscribe_stop_rx.changed() => {
|
|
||||||
if *subscribe_stop_rx.borrow_and_update() || x.is_err() {
|
|
||||||
trace!(%rpc, "stopping http block subscription");
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
_ = sleep(Duration::from_secs(health_sleep_seconds)) => {
|
|
||||||
// time for another health check
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
trace!("healthcheck loop on {} exited", rpc);
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
};
|
};
|
||||||
|
|
||||||
// TODO: log quick_check lik
|
// TODO: log quick_check lik
|
||||||
@ -851,7 +806,11 @@ impl Web3Rpc {
|
|||||||
|
|
||||||
self.healthy.store(initial_check, atomic::Ordering::Relaxed);
|
self.healthy.store(initial_check, atomic::Ordering::Relaxed);
|
||||||
|
|
||||||
futures.push(tokio::spawn(f));
|
let h = tokio::spawn(f);
|
||||||
|
let a = h.abort_handle();
|
||||||
|
|
||||||
|
futures.push(h);
|
||||||
|
abort_handles.push(a);
|
||||||
} else {
|
} else {
|
||||||
self.healthy.store(true, atomic::Ordering::Relaxed);
|
self.healthy.store(true, atomic::Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
@ -859,46 +818,39 @@ impl Web3Rpc {
|
|||||||
// subscribe to new heads
|
// subscribe to new heads
|
||||||
if let Some(block_and_rpc_sender) = block_and_rpc_sender.clone() {
|
if let Some(block_and_rpc_sender) = block_and_rpc_sender.clone() {
|
||||||
let clone = self.clone();
|
let clone = self.clone();
|
||||||
let mut subscribe_stop_rx = subscribe_stop_tx.subscribe();
|
|
||||||
let block_map = block_map.clone();
|
let block_map = block_map.clone();
|
||||||
|
|
||||||
if *subscribe_stop_rx.borrow_and_update() {
|
|
||||||
return Err(anyhow::anyhow!(
|
|
||||||
"exiting subscribe_new_heads before even starting on {}",
|
|
||||||
self
|
|
||||||
)
|
|
||||||
.into());
|
|
||||||
}
|
|
||||||
|
|
||||||
let f = async move {
|
let f = async move {
|
||||||
clone
|
clone
|
||||||
.subscribe_new_heads(block_and_rpc_sender.clone(), block_map, subscribe_stop_rx)
|
.subscribe_new_heads(block_and_rpc_sender.clone(), block_map)
|
||||||
.await
|
.await
|
||||||
};
|
};
|
||||||
|
|
||||||
futures.push(tokio::spawn(f));
|
let h = tokio::spawn(f);
|
||||||
|
let a = h.abort_handle();
|
||||||
|
|
||||||
|
futures.push(h);
|
||||||
|
abort_handles.push(a);
|
||||||
}
|
}
|
||||||
|
|
||||||
// subscribe to new transactions
|
// subscribe to new transactions
|
||||||
|
if self.subscribe_txs && self.ws_provider.load().is_some() {
|
||||||
if let Some(pending_txid_firehose) = pending_txid_firehose.clone() {
|
if let Some(pending_txid_firehose) = pending_txid_firehose.clone() {
|
||||||
let clone = self.clone();
|
let clone = self.clone();
|
||||||
let mut subscribe_stop_rx = subscribe_stop_tx.subscribe();
|
|
||||||
|
|
||||||
if *subscribe_stop_rx.borrow_and_update() {
|
|
||||||
return Err(anyhow::anyhow!(
|
|
||||||
"exiting subscribe_new_heads before even starting on {}",
|
|
||||||
self
|
|
||||||
)
|
|
||||||
.into());
|
|
||||||
}
|
|
||||||
|
|
||||||
let f = async move {
|
let f = async move {
|
||||||
clone
|
clone
|
||||||
.subscribe_new_transactions(pending_txid_firehose, subscribe_stop_rx)
|
.subscribe_new_transactions(pending_txid_firehose)
|
||||||
.await
|
.await
|
||||||
};
|
};
|
||||||
|
|
||||||
futures.push(tokio::spawn(f));
|
// TODO: this is waking itself alot
|
||||||
|
let h = tokio::spawn(f);
|
||||||
|
let a = h.abort_handle();
|
||||||
|
|
||||||
|
futures.push(h);
|
||||||
|
abort_handles.push(a);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// exit if any of the futures exit
|
// exit if any of the futures exit
|
||||||
@ -913,9 +865,10 @@ impl Web3Rpc {
|
|||||||
.await?
|
.await?
|
||||||
};
|
};
|
||||||
|
|
||||||
subscribe_stop_tx.send_replace(true);
|
// stop the other futures
|
||||||
|
for a in abort_handles {
|
||||||
// TODO: wait for all of the futures to exit?
|
a.abort();
|
||||||
|
}
|
||||||
|
|
||||||
// TODO: tell ethers to disconnect?
|
// TODO: tell ethers to disconnect?
|
||||||
self.ws_provider.store(None);
|
self.ws_provider.store(None);
|
||||||
@ -926,21 +879,9 @@ impl Web3Rpc {
|
|||||||
async fn subscribe_new_transactions(
|
async fn subscribe_new_transactions(
|
||||||
self: &Arc<Self>,
|
self: &Arc<Self>,
|
||||||
pending_txid_firehose: Arc<DedupedBroadcaster<TxHash>>,
|
pending_txid_firehose: Arc<DedupedBroadcaster<TxHash>>,
|
||||||
mut subscribe_stop_rx: watch::Receiver<bool>,
|
|
||||||
) -> Web3ProxyResult<()> {
|
) -> Web3ProxyResult<()> {
|
||||||
trace!("subscribing to new transactions on {}", self);
|
trace!("subscribing to new transactions on {}", self);
|
||||||
|
|
||||||
// rpcs opt-into subscribing to transactions. its a lot of bandwidth
|
|
||||||
if !self.subscribe_txs {
|
|
||||||
loop {
|
|
||||||
if *subscribe_stop_rx.borrow_and_update() {
|
|
||||||
trace!("stopping ws block subscription on {}", self);
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
subscribe_stop_rx.changed().await?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(ws_provider) = self.ws_provider.load().as_ref() {
|
if let Some(ws_provider) = self.ws_provider.load().as_ref() {
|
||||||
// todo: move subscribe_blocks onto the request handle instead of having a seperate wait_for_throttle
|
// todo: move subscribe_blocks onto the request handle instead of having a seperate wait_for_throttle
|
||||||
self.wait_for_throttle(Instant::now() + Duration::from_secs(5))
|
self.wait_for_throttle(Instant::now() + Duration::from_secs(5))
|
||||||
@ -949,31 +890,14 @@ impl Web3Rpc {
|
|||||||
// TODO: only subscribe if a user has subscribed
|
// TODO: only subscribe if a user has subscribed
|
||||||
let mut pending_txs_sub = ws_provider.subscribe_pending_txs().await?;
|
let mut pending_txs_sub = ws_provider.subscribe_pending_txs().await?;
|
||||||
|
|
||||||
loop {
|
while let Some(x) = pending_txs_sub.next().await {
|
||||||
select! {
|
|
||||||
x = subscribe_stop_rx.changed() => {
|
|
||||||
if *subscribe_stop_rx.borrow_and_update() || x.is_err() {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
x = pending_txs_sub.next() => {
|
|
||||||
if let Some(x) = x {
|
|
||||||
pending_txid_firehose.send(x).await;
|
pending_txid_firehose.send(x).await;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
// only websockets subscribe to pending transactions
|
// only websockets subscribe to pending transactions
|
||||||
// its possible to do with http, but not recommended
|
// its possible to do with http, but not recommended
|
||||||
// TODO: what should we do here?
|
// TODO: what should we do here?
|
||||||
loop {
|
unimplemented!()
|
||||||
if *subscribe_stop_rx.borrow_and_update() {
|
|
||||||
trace!("stopping ws block subscription on {}", self);
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
subscribe_stop_rx.changed().await?;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
@ -984,7 +908,6 @@ impl Web3Rpc {
|
|||||||
self: &Arc<Self>,
|
self: &Arc<Self>,
|
||||||
block_sender: mpsc::UnboundedSender<BlockAndRpc>,
|
block_sender: mpsc::UnboundedSender<BlockAndRpc>,
|
||||||
block_map: BlocksByHashCache,
|
block_map: BlocksByHashCache,
|
||||||
mut subscribe_stop_rx: watch::Receiver<bool>,
|
|
||||||
) -> Web3ProxyResult<()> {
|
) -> Web3ProxyResult<()> {
|
||||||
trace!("subscribing to new heads on {}", self);
|
trace!("subscribing to new heads on {}", self);
|
||||||
|
|
||||||
@ -1018,22 +941,12 @@ impl Web3Rpc {
|
|||||||
self.send_head_block_result(latest_block, &block_sender, &block_map)
|
self.send_head_block_result(latest_block, &block_sender, &block_map)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
loop {
|
while let Some(block) = blocks.next().await {
|
||||||
select! {
|
let block = Ok(Some(Arc::new(block)));
|
||||||
x = subscribe_stop_rx.changed() => {
|
|
||||||
if *subscribe_stop_rx.borrow_and_update() || x.is_err() {
|
|
||||||
trace!(%self, "stopping websocket block subscription");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
block = blocks.next() => {
|
|
||||||
let block = block.map(Arc::new);
|
|
||||||
|
|
||||||
self.send_head_block_result(Ok(block), &block_sender, &block_map)
|
self.send_head_block_result(block, &block_sender, &block_map)
|
||||||
.await?;
|
.await?;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
|
||||||
} else if self.http_client.is_some() {
|
} else if self.http_client.is_some() {
|
||||||
// there is a "watch_blocks" function, but a lot of public nodes (including llamanodes) do not support the necessary rpc endpoints
|
// there is a "watch_blocks" function, but a lot of public nodes (including llamanodes) do not support the necessary rpc endpoints
|
||||||
// TODO: is 1/2 the block time okay?
|
// TODO: is 1/2 the block time okay?
|
||||||
@ -1054,15 +967,7 @@ impl Web3Rpc {
|
|||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
// TODO: should this select be at the start or end of the loop?
|
// TODO: should this select be at the start or end of the loop?
|
||||||
select! {
|
i.tick().await;
|
||||||
x = subscribe_stop_rx.changed() => {
|
|
||||||
if *subscribe_stop_rx.borrow_and_update() || x.is_err() {
|
|
||||||
trace!(%self, "stopping http block subscription");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_ = i.tick() => {}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
return Err(anyhow!("no ws or http provider!").into());
|
return Err(anyhow!("no ws or http provider!").into());
|
||||||
@ -1072,7 +977,7 @@ impl Web3Rpc {
|
|||||||
self.send_head_block_result(Ok(None), &block_sender, &block_map)
|
self.send_head_block_result(Ok(None), &block_sender, &block_map)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
if *subscribe_stop_rx.borrow() {
|
if self.should_disconnect() {
|
||||||
trace!(%self, "new heads subscription exited");
|
trace!(%self, "new heads subscription exited");
|
||||||
Ok(())
|
Ok(())
|
||||||
} else {
|
} else {
|
||||||
|
Loading…
Reference in New Issue
Block a user