fix reconnect for http clients
This commit is contained in:
parent
7eaf6f3540
commit
5a16b9eed8
@ -161,9 +161,7 @@ impl Web3Connections {
|
|||||||
// if theres multiple, use petgraph to find the one on the main chain (and remove the others if they have enough confirmations)
|
// if theres multiple, use petgraph to find the one on the main chain (and remove the others if they have enough confirmations)
|
||||||
|
|
||||||
// be sure the requested block num exists
|
// be sure the requested block num exists
|
||||||
let head_block_num = self
|
let head_block_num = self.head_block_num().context("no servers in sync")?;
|
||||||
.head_block_num()
|
|
||||||
.ok_or_else(|| anyhow::anyhow!("no servers in sync"))?;
|
|
||||||
if num > &head_block_num {
|
if num > &head_block_num {
|
||||||
// TODO: i'm seeing this a lot when using ethspam. i dont know why though. i thought we delayed publishing
|
// TODO: i'm seeing this a lot when using ethspam. i dont know why though. i thought we delayed publishing
|
||||||
// TODO: instead of error, maybe just sleep and try again?
|
// TODO: instead of error, maybe just sleep and try again?
|
||||||
|
@ -28,6 +28,7 @@ pub struct Web3Connection {
|
|||||||
pub name: String,
|
pub name: String,
|
||||||
/// TODO: can we get this from the provider? do we even need it?
|
/// TODO: can we get this from the provider? do we even need it?
|
||||||
url: String,
|
url: String,
|
||||||
|
http_client: Option<reqwest::Client>,
|
||||||
/// keep track of currently open requests. We sort on this
|
/// keep track of currently open requests. We sort on this
|
||||||
pub(super) active_requests: AtomicU32,
|
pub(super) active_requests: AtomicU32,
|
||||||
/// keep track of total requests
|
/// keep track of total requests
|
||||||
@ -86,6 +87,7 @@ impl Web3Connection {
|
|||||||
|
|
||||||
let new_connection = Self {
|
let new_connection = Self {
|
||||||
name,
|
name,
|
||||||
|
http_client,
|
||||||
url: url_str.clone(),
|
url: url_str.clone(),
|
||||||
active_requests: 0.into(),
|
active_requests: 0.into(),
|
||||||
total_requests: 0.into(),
|
total_requests: 0.into(),
|
||||||
@ -102,7 +104,7 @@ impl Web3Connection {
|
|||||||
|
|
||||||
// connect to the server (with retries)
|
// connect to the server (with retries)
|
||||||
new_connection
|
new_connection
|
||||||
.retrying_reconnect(block_sender.as_ref())
|
.retrying_reconnect(block_sender.as_ref(), false)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
// check the server's chain_id here
|
// check the server's chain_id here
|
||||||
@ -177,12 +179,11 @@ impl Web3Connection {
|
|||||||
for block_data_limit in [u64::MAX, 90_000, 128, 64, 32] {
|
for block_data_limit in [u64::MAX, 90_000, 128, 64, 32] {
|
||||||
let mut head_block_id = self.head_block_id.read().clone();
|
let mut head_block_id = self.head_block_id.read().clone();
|
||||||
|
|
||||||
// TODO: wait until head block is set outside the loop? if we disconnect while starting we could actually get 0 though
|
// TODO: subscribe to a channel instead of polling. subscribe to http_interval_sender?
|
||||||
while head_block_id.is_none() {
|
while head_block_id.is_none() {
|
||||||
warn!(rpc=%self, "no head block yet. retrying");
|
warn!(rpc=%self, "no head block yet. retrying");
|
||||||
|
|
||||||
// TODO: subscribe to a channel instead of polling? subscribe to http_interval_sender?
|
sleep(Duration::from_secs(13)).await;
|
||||||
sleep(Duration::from_secs(1)).await;
|
|
||||||
|
|
||||||
head_block_id = self.head_block_id.read().clone();
|
head_block_id = self.head_block_id.read().clone();
|
||||||
}
|
}
|
||||||
@ -253,6 +254,7 @@ impl Web3Connection {
|
|||||||
pub async fn retrying_reconnect(
|
pub async fn retrying_reconnect(
|
||||||
self: &Arc<Self>,
|
self: &Arc<Self>,
|
||||||
block_sender: Option<&flume::Sender<BlockAndRpc>>,
|
block_sender: Option<&flume::Sender<BlockAndRpc>>,
|
||||||
|
initial_sleep: bool,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
// there are several crates that have retry helpers, but they all seem more complex than necessary
|
// there are several crates that have retry helpers, but they all seem more complex than necessary
|
||||||
let base_ms = 500;
|
let base_ms = 500;
|
||||||
@ -260,11 +262,18 @@ impl Web3Connection {
|
|||||||
let range_multiplier = 3;
|
let range_multiplier = 3;
|
||||||
|
|
||||||
// sleep once before the initial retry attempt
|
// sleep once before the initial retry attempt
|
||||||
let mut sleep_ms = min(
|
// TODO: now that we use this method for our initial connection, do we still want this sleep?
|
||||||
cap_ms,
|
let mut sleep_ms = if initial_sleep {
|
||||||
rand::thread_rng().gen_range(base_ms..(base_ms * range_multiplier)),
|
let first_sleep_ms = min(
|
||||||
);
|
cap_ms,
|
||||||
sleep(Duration::from_millis(sleep_ms)).await;
|
rand::thread_rng().gen_range(base_ms..(base_ms * range_multiplier)),
|
||||||
|
);
|
||||||
|
sleep(Duration::from_millis(first_sleep_ms)).await;
|
||||||
|
|
||||||
|
first_sleep_ms
|
||||||
|
} else {
|
||||||
|
base_ms
|
||||||
|
};
|
||||||
|
|
||||||
// retry until we succeed
|
// retry until we succeed
|
||||||
while let Err(err) = self.reconnect(block_sender).await {
|
while let Err(err) = self.reconnect(block_sender).await {
|
||||||
@ -290,8 +299,6 @@ impl Web3Connection {
|
|||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
// TODO: no-op if this called on a http provider
|
// TODO: no-op if this called on a http provider
|
||||||
// websocket doesn't need the http client
|
// websocket doesn't need the http client
|
||||||
let http_client = None;
|
|
||||||
|
|
||||||
info!(rpc=%self, "reconnecting");
|
info!(rpc=%self, "reconnecting");
|
||||||
|
|
||||||
// since this lock is held open over an await, we use tokio's locking
|
// since this lock is held open over an await, we use tokio's locking
|
||||||
@ -316,7 +323,7 @@ impl Web3Connection {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// TODO: if this fails, keep retrying! otherwise it crashes and doesn't try again!
|
// TODO: if this fails, keep retrying! otherwise it crashes and doesn't try again!
|
||||||
let new_provider = Web3Provider::from_str(&self.url, http_client).await?;
|
let new_provider = Web3Provider::from_str(&self.url, self.http_client.clone()).await?;
|
||||||
|
|
||||||
*provider = Some(Arc::new(new_provider));
|
*provider = Some(Arc::new(new_provider));
|
||||||
|
|
||||||
@ -502,7 +509,7 @@ impl Web3Connection {
|
|||||||
"subscription exited",
|
"subscription exited",
|
||||||
);
|
);
|
||||||
|
|
||||||
self.retrying_reconnect(block_sender.as_ref()).await?;
|
self.retrying_reconnect(block_sender.as_ref(), true).await?;
|
||||||
} else {
|
} else {
|
||||||
error!(rpc=%self, ?err, "subscription exited");
|
error!(rpc=%self, ?err, "subscription exited");
|
||||||
return Err(err);
|
return Err(err);
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
use std::time::Duration;
|
use anyhow::Context;
|
||||||
|
|
||||||
use derive_more::From;
|
use derive_more::From;
|
||||||
|
use std::time::Duration;
|
||||||
use tracing::{info_span, instrument, Instrument};
|
use tracing::{info_span, instrument, Instrument};
|
||||||
|
|
||||||
/// Use HTTP and WS providers.
|
/// Use HTTP and WS providers.
|
||||||
@ -28,7 +28,7 @@ impl Web3Provider {
|
|||||||
let provider = if url_str.starts_with("http") {
|
let provider = if url_str.starts_with("http") {
|
||||||
let url: url::Url = url_str.parse()?;
|
let url: url::Url = url_str.parse()?;
|
||||||
|
|
||||||
let http_client = http_client.ok_or_else(|| anyhow::anyhow!("no http_client"))?;
|
let http_client = http_client.context("no http_client")?;
|
||||||
|
|
||||||
let provider = ethers::providers::Http::new_with_client(url, http_client);
|
let provider = ethers::providers::Http::new_with_client(url, http_client);
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user