From 7b223efa4db0641799f4f3fc742f8b0fb260d505 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 22 Mar 2023 16:16:15 -0700 Subject: [PATCH] improve reconnect logic --- web3_proxy/src/rpcs/blockchain.rs | 2 +- web3_proxy/src/rpcs/one.rs | 70 ++++++++++++++++++++++--------- 2 files changed, 52 insertions(+), 20 deletions(-) diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index 543513e6..a51a9032 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -370,7 +370,7 @@ impl Web3Rpcs { ) .await { - warn!("unable to process block from rpc {}: {:#?}", rpc_name, err); + warn!("error while processing block from rpc {}: {:#?}", rpc_name, err); } } Err(err) => { diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 2ac4a765..bb42574f 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -11,6 +11,7 @@ use ethers::prelude::{Bytes, Middleware, ProviderError, TxHash, H256, U64}; use ethers::types::{Address, Transaction, U256}; use futures::future::try_join_all; use futures::StreamExt; +use futures::stream::FuturesUnordered; use log::{debug, error, info, trace, warn, Level}; use migration::sea_orm::DatabaseConnection; use ordered_float::OrderedFloat; @@ -701,8 +702,12 @@ impl Web3Rpc { RequestRevertHandler::ErrorLevel }; + // this does loop. just only when reconnect is enabled + #[allow(clippy::never_loop)] loop { - let mut futures = vec![]; + debug!("subscription loop started"); + + let mut futures = FuturesUnordered::new(); let http_interval_receiver = http_interval_sender.as_ref().map(|x| x.subscribe()); @@ -735,6 +740,7 @@ impl Web3Rpc { // health check loop loop { + // TODO: do we need this to be abortable? if rpc.should_disconnect() { break; } @@ -839,6 +845,7 @@ impl Web3Rpc { } if let Some(block_sender) = &block_sender { + // TODO: do we need this to be abortable? let f = self.clone().subscribe_new_heads( authorization.clone(), http_interval_receiver, @@ -850,6 +857,7 @@ impl Web3Rpc { } if let Some(tx_id_sender) = &tx_id_sender { + // TODO: do we need this to be abortable? let f = self .clone() .subscribe_pending_transactions(authorization.clone(), tx_id_sender.clone()); @@ -857,32 +865,48 @@ impl Web3Rpc { futures.push(flatten_handle(tokio::spawn(f))); } - match try_join_all(futures).await { - Ok(_) => { - // futures all exited without error. break instead of restarting subscriptions - break; - } - Err(err) => { - if self.reconnect.load(atomic::Ordering::Acquire) { - warn!("{} connection ended. err={:?}", self, err); + while let Some(x) = futures.next().await { + match x { + Ok(_) => { + // future exited without error + // TODO: think about this more. we never set it to false. this can't be right + info!("future on {} exited successfully", self) + } + Err(err) => { + if self.reconnect.load(atomic::Ordering::Acquire) { + warn!("{} connection ended. reconnecting. err={:?}", self, err); - self.clone() - .retrying_connect( + let disconnect_sender = self.disconnect_watch.as_ref().unwrap(); + + // TODO: i'm not sure if this is necessary, but telling everything to disconnect seems like a better idea than relying on timeouts and dropped futures. + disconnect_sender.send_replace(true); + disconnect_sender.send_replace(false); + + // we call retrying_connect here with initial_delay=true. above, initial_delay=false + self.retrying_connect( block_sender.as_ref(), chain_id, authorization.db_conn.as_ref(), true, ) .await?; - } else if *disconnect_receiver.borrow() { - info!("{} is disconnecting", self); - break; - } else { - error!("{} subscription exited. err={:?}", self, err); - return Err(err); + + continue; + } + + // reconnect is not enabled. + if *disconnect_receiver.borrow() { + info!("{} is disconnecting", self); + break; + } else { + error!("{} subscription exited. err={:?}", self, err); + break; + } } } } + + break; } info!("all subscriptions on {} completed", self); @@ -1070,7 +1094,11 @@ impl Web3Rpc { self.send_head_block_result(Ok(None), &block_sender, block_map) .await?; - Ok(()) + if self.should_disconnect() { + Ok(()) + } else { + Err(anyhow!("new_heads subscription exited. reconnect needed")) + } } /// Turn on the firehose of pending transactions @@ -1124,7 +1152,11 @@ impl Web3Rpc { } } - Ok(()) + if self.should_disconnect() { + Ok(()) + } else { + Err(anyhow!("pending_transactions subscription exited. reconnect needed")) + } } /// be careful with this; it might wait forever!