From 5a16b9eed8cb3c5d0b3d7d6484bf7a7a759a0d29 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 14 Sep 2022 02:11:48 +0000 Subject: [PATCH] fix reconnect for http clients --- web3_proxy/src/rpcs/blockchain.rs | 4 +--- web3_proxy/src/rpcs/connection.rs | 33 +++++++++++++++++++------------ web3_proxy/src/rpcs/provider.rs | 6 +++--- 3 files changed, 24 insertions(+), 19 deletions(-) diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index c404955d..d71b626f 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -161,9 +161,7 @@ impl Web3Connections { // if theres multiple, use petgraph to find the one on the main chain (and remove the others if they have enough confirmations) // be sure the requested block num exists - let head_block_num = self - .head_block_num() - .ok_or_else(|| anyhow::anyhow!("no servers in sync"))?; + let head_block_num = self.head_block_num().context("no servers in sync")?; if num > &head_block_num { // TODO: i'm seeing this a lot when using ethspam. i dont know why though. i thought we delayed publishing // TODO: instead of error, maybe just sleep and try again? diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs index ceacb035..a1c1c7e4 100644 --- a/web3_proxy/src/rpcs/connection.rs +++ b/web3_proxy/src/rpcs/connection.rs @@ -28,6 +28,7 @@ pub struct Web3Connection { pub name: String, /// TODO: can we get this from the provider? do we even need it? url: String, + http_client: Option, /// keep track of currently open requests. We sort on this pub(super) active_requests: AtomicU32, /// keep track of total requests @@ -86,6 +87,7 @@ impl Web3Connection { let new_connection = Self { name, + http_client, url: url_str.clone(), active_requests: 0.into(), total_requests: 0.into(), @@ -102,7 +104,7 @@ impl Web3Connection { // connect to the server (with retries) new_connection - .retrying_reconnect(block_sender.as_ref()) + .retrying_reconnect(block_sender.as_ref(), false) .await?; // check the server's chain_id here @@ -177,12 +179,11 @@ impl Web3Connection { for block_data_limit in [u64::MAX, 90_000, 128, 64, 32] { let mut head_block_id = self.head_block_id.read().clone(); - // TODO: wait until head block is set outside the loop? if we disconnect while starting we could actually get 0 though + // TODO: subscribe to a channel instead of polling. subscribe to http_interval_sender? while head_block_id.is_none() { warn!(rpc=%self, "no head block yet. retrying"); - // TODO: subscribe to a channel instead of polling? subscribe to http_interval_sender? - sleep(Duration::from_secs(1)).await; + sleep(Duration::from_secs(13)).await; head_block_id = self.head_block_id.read().clone(); } @@ -253,6 +254,7 @@ impl Web3Connection { pub async fn retrying_reconnect( self: &Arc, block_sender: Option<&flume::Sender>, + initial_sleep: bool, ) -> anyhow::Result<()> { // there are several crates that have retry helpers, but they all seem more complex than necessary let base_ms = 500; @@ -260,11 +262,18 @@ impl Web3Connection { let range_multiplier = 3; // sleep once before the initial retry attempt - let mut sleep_ms = min( - cap_ms, - rand::thread_rng().gen_range(base_ms..(base_ms * range_multiplier)), - ); - sleep(Duration::from_millis(sleep_ms)).await; + // TODO: now that we use this method for our initial connection, do we still want this sleep? + let mut sleep_ms = if initial_sleep { + let first_sleep_ms = min( + cap_ms, + rand::thread_rng().gen_range(base_ms..(base_ms * range_multiplier)), + ); + sleep(Duration::from_millis(first_sleep_ms)).await; + + first_sleep_ms + } else { + base_ms + }; // retry until we succeed while let Err(err) = self.reconnect(block_sender).await { @@ -290,8 +299,6 @@ impl Web3Connection { ) -> anyhow::Result<()> { // TODO: no-op if this called on a http provider // websocket doesn't need the http client - let http_client = None; - info!(rpc=%self, "reconnecting"); // since this lock is held open over an await, we use tokio's locking @@ -316,7 +323,7 @@ impl Web3Connection { } // TODO: if this fails, keep retrying! otherwise it crashes and doesn't try again! - let new_provider = Web3Provider::from_str(&self.url, http_client).await?; + let new_provider = Web3Provider::from_str(&self.url, self.http_client.clone()).await?; *provider = Some(Arc::new(new_provider)); @@ -502,7 +509,7 @@ impl Web3Connection { "subscription exited", ); - self.retrying_reconnect(block_sender.as_ref()).await?; + self.retrying_reconnect(block_sender.as_ref(), true).await?; } else { error!(rpc=%self, ?err, "subscription exited"); return Err(err); diff --git a/web3_proxy/src/rpcs/provider.rs b/web3_proxy/src/rpcs/provider.rs index baf73ef2..f02f5551 100644 --- a/web3_proxy/src/rpcs/provider.rs +++ b/web3_proxy/src/rpcs/provider.rs @@ -1,6 +1,6 @@ -use std::time::Duration; - +use anyhow::Context; use derive_more::From; +use std::time::Duration; use tracing::{info_span, instrument, Instrument}; /// Use HTTP and WS providers. @@ -28,7 +28,7 @@ impl Web3Provider { let provider = if url_str.starts_with("http") { let url: url::Url = url_str.parse()?; - let http_client = http_client.ok_or_else(|| anyhow::anyhow!("no http_client"))?; + let http_client = http_client.context("no http_client")?; let provider = ethers::providers::Http::new_with_client(url, http_client);