improve reconnect logic

This commit is contained in:
Bryan Stitt 2023-03-22 16:16:15 -07:00
parent 86b4f39a75
commit 7b223efa4d
2 changed files with 52 additions and 20 deletions

@ -370,7 +370,7 @@ impl Web3Rpcs {
)
.await
{
warn!("unable to process block from rpc {}: {:#?}", rpc_name, err);
warn!("error while processing block from rpc {}: {:#?}", rpc_name, err);
}
}
Err(err) => {

@ -11,6 +11,7 @@ use ethers::prelude::{Bytes, Middleware, ProviderError, TxHash, H256, U64};
use ethers::types::{Address, Transaction, U256};
use futures::future::try_join_all;
use futures::StreamExt;
use futures::stream::FuturesUnordered;
use log::{debug, error, info, trace, warn, Level};
use migration::sea_orm::DatabaseConnection;
use ordered_float::OrderedFloat;
@ -701,8 +702,12 @@ impl Web3Rpc {
RequestRevertHandler::ErrorLevel
};
// this does loop. just only when reconnect is enabled
#[allow(clippy::never_loop)]
loop {
let mut futures = vec![];
debug!("subscription loop started");
let mut futures = FuturesUnordered::new();
let http_interval_receiver = http_interval_sender.as_ref().map(|x| x.subscribe());
@ -735,6 +740,7 @@ impl Web3Rpc {
// health check loop
loop {
// TODO: do we need this to be abortable?
if rpc.should_disconnect() {
break;
}
@ -839,6 +845,7 @@ impl Web3Rpc {
}
if let Some(block_sender) = &block_sender {
// TODO: do we need this to be abortable?
let f = self.clone().subscribe_new_heads(
authorization.clone(),
http_interval_receiver,
@ -850,6 +857,7 @@ impl Web3Rpc {
}
if let Some(tx_id_sender) = &tx_id_sender {
// TODO: do we need this to be abortable?
let f = self
.clone()
.subscribe_pending_transactions(authorization.clone(), tx_id_sender.clone());
@ -857,32 +865,48 @@ impl Web3Rpc {
futures.push(flatten_handle(tokio::spawn(f)));
}
match try_join_all(futures).await {
Ok(_) => {
// futures all exited without error. break instead of restarting subscriptions
break;
}
Err(err) => {
if self.reconnect.load(atomic::Ordering::Acquire) {
warn!("{} connection ended. err={:?}", self, err);
while let Some(x) = futures.next().await {
match x {
Ok(_) => {
// future exited without error
// TODO: think about this more. we never set it to false. this can't be right
info!("future on {} exited successfully", self)
}
Err(err) => {
if self.reconnect.load(atomic::Ordering::Acquire) {
warn!("{} connection ended. reconnecting. err={:?}", self, err);
self.clone()
.retrying_connect(
let disconnect_sender = self.disconnect_watch.as_ref().unwrap();
// TODO: i'm not sure if this is necessary, but telling everything to disconnect seems like a better idea than relying on timeouts and dropped futures.
disconnect_sender.send_replace(true);
disconnect_sender.send_replace(false);
// we call retrying_connect here with initial_delay=true. above, initial_delay=false
self.retrying_connect(
block_sender.as_ref(),
chain_id,
authorization.db_conn.as_ref(),
true,
)
.await?;
} else if *disconnect_receiver.borrow() {
info!("{} is disconnecting", self);
break;
} else {
error!("{} subscription exited. err={:?}", self, err);
return Err(err);
continue;
}
// reconnect is not enabled.
if *disconnect_receiver.borrow() {
info!("{} is disconnecting", self);
break;
} else {
error!("{} subscription exited. err={:?}", self, err);
break;
}
}
}
}
break;
}
info!("all subscriptions on {} completed", self);
@ -1070,7 +1094,11 @@ impl Web3Rpc {
self.send_head_block_result(Ok(None), &block_sender, block_map)
.await?;
Ok(())
if self.should_disconnect() {
Ok(())
} else {
Err(anyhow!("new_heads subscription exited. reconnect needed"))
}
}
/// Turn on the firehose of pending transactions
@ -1124,7 +1152,11 @@ impl Web3Rpc {
}
}
Ok(())
if self.should_disconnect() {
Ok(())
} else {
Err(anyhow!("pending_transactions subscription exited. reconnect needed"))
}
}
/// be careful with this; it might wait forever!