automatically reconnect

Bryan Stitt 2022-05-17 02:26:47 +00:00
parent 9213e1a796
commit 68ac25d586
6 changed files with 165 additions and 104 deletions
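
In outline, this change moves each connection's provider behind an async lock so the websocket can be rebuilt and swapped in when it drops. A minimal standalone sketch of that pattern, assuming tokio as a dependency; the types below are simplified stand-ins for the real ethers providers, not code from this commit:

use std::sync::Arc;
use tokio::sync::RwLock;

// simplified stand-in for ethers' Http/Ws providers
enum Provider {
    Http(String),
    Ws(String),
}

struct Connection {
    url: String,
    // tokio's RwLock because the write guard is held across an .await while
    // reconnecting; the Arc lets readers clone the provider out and release
    // the lock before making a slow request
    provider: RwLock<Arc<Provider>>,
}

impl Connection {
    async fn reconnect(&self) {
        // hold the write lock only long enough to swap in a fresh provider;
        // a real implementation would re-dial the websocket here
        let mut guard = self.provider.write().await;
        *guard = Arc::new(Provider::Ws(self.url.clone()));
    }

    async fn request(&self) -> String {
        // clone the Arc and let the read guard drop before doing slow work
        let provider = self.provider.read().await.clone();
        match &*provider {
            Provider::Http(url) => format!("http request to {}", url),
            Provider::Ws(url) => format!("ws request to {}", url),
        }
    }
}

#[tokio::main]
async fn main() {
    let conn = Connection {
        url: "ws://localhost:8546".to_string(),
        provider: RwLock::new(Arc::new(Provider::Http("http://localhost:8545".to_string()))),
    };
    conn.reconnect().await;
    println!("{}", conn.request().await);
}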

@@ -56,7 +56,7 @@ impl fmt::Debug for Web3ProxyApp {
}
impl Web3ProxyApp {
#[instrument(skip_all)]
#[instrument(name = "try_new_Web3ProxyApp", skip_all)]
pub async fn try_new(
chain_id: usize,
balanced_rpcs: Vec<Web3ConnectionConfig>,
@@ -119,6 +119,7 @@ impl Web3ProxyApp {
}
};
// TODO: i don't seem to ever see this log. why?
debug!("Forwarding response: {:?}", response);
Ok(warp::reply::json(&response))

@@ -43,7 +43,7 @@ pub struct Web3ConnectionConfig {
impl RpcConfig {
/// Create a Web3ProxyApp from config
#[instrument(skip_all)]
#[instrument(name = "try_build_RpcConfig", skip_all)]
pub async fn try_build(self) -> anyhow::Result<Web3ProxyApp> {
let balanced_rpcs = self.balanced_rpcs.into_values().collect();
@@ -59,7 +59,7 @@ impl RpcConfig {
impl Web3ConnectionConfig {
/// Create a Web3Connection from config
#[instrument(skip_all)]
#[instrument(name = "try_build_Web3ConnectionConfig", skip_all)]
pub async fn try_build(
self,
clock: &QuantaClock,

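The #[instrument] changes above give each constructor an explicit span name; without one, every try_new/try_build in the codebase produces a span simply named try_new or try_build, which is hard to tell apart in traces. A rough illustration using the tracing crate (the function and its body here are placeholders, not code from this commit):

use tracing::instrument;

// `name` disambiguates this span from the other try_new constructors,
// and `skip_all` keeps the large argument values out of the span's fields
#[instrument(name = "try_new_Web3ProxyApp", skip_all)]
async fn try_new(chain_id: usize, balanced_rpcs: Vec<String>) -> usize {
    // placeholder body
    chain_id + balanced_rpcs.len()
}
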
@@ -7,13 +7,12 @@ use governor::middleware::NoOpMiddleware;
use governor::state::{InMemoryState, NotKeyed};
use governor::NotUntil;
use governor::RateLimiter;
use parking_lot::RwLock;
use std::fmt;
use std::num::NonZeroU32;
use std::sync::atomic::{self, AtomicU32};
use std::time::Duration;
use std::{cmp::Ordering, sync::Arc};
use tokio::time::{interval, sleep, timeout_at, Instant, MissedTickBehavior};
use tokio::sync::RwLock;
use tokio::time::{interval, sleep, timeout_at, Duration, Instant, MissedTickBehavior};
use tracing::{info, instrument, trace, warn};
type Web3RateLimiter =
@@ -26,6 +25,38 @@ pub enum Web3Provider {
Ws(ethers::providers::Provider<ethers::providers::Ws>),
}
impl Web3Provider {
#[instrument]
async fn from_str(url_str: &str, http_client: Option<reqwest::Client>) -> anyhow::Result<Self> {
let provider = if url_str.starts_with("http") {
let url: url::Url = url_str.parse()?;
let http_client = http_client.ok_or_else(|| anyhow::anyhow!("no http_client"))?;
let provider = ethers::providers::Http::new_with_client(url, http_client);
// TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592)
ethers::providers::Provider::new(provider)
.interval(Duration::from_secs(1))
.into()
} else if url_str.starts_with("ws") {
// TODO: wrap this so it automatically reconnects
let provider = ethers::providers::Ws::connect(url_str).await?;
// TODO: make sure this automatically reconnects
// TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592)
ethers::providers::Provider::new(provider)
.interval(Duration::from_secs(1))
.into()
} else {
return Err(anyhow::anyhow!("only http and ws servers are supported"));
};
Ok(provider)
}
}
impl fmt::Debug for Web3Provider {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// TODO: the default Debug takes forever to write. this is too quiet though. we at least need the url
@@ -40,7 +71,7 @@ pub struct Web3Connection {
/// keep track of currently open requests. We sort on this
active_requests: AtomicU32,
// TODO: put this in a RwLock so that we can replace it if re-connecting
provider: Web3Provider,
provider: RwLock<Arc<Web3Provider>>,
ratelimiter: Option<Web3RateLimiter>,
/// used for load balancing to the least loaded server
soft_limit: u32,
@@ -64,10 +95,30 @@ impl fmt::Display for Web3Connection {
impl Web3Connection {
#[instrument(skip_all)]
async fn reconnect(&self) {}
pub async fn reconnect(
self: &Arc<Self>,
block_sender: &flume::Sender<(u64, H256, Arc<Self>)>,
) -> anyhow::Result<()> {
// websocket doesn't need the http client
let http_client = None;
// since this lock is held open over an await, we use tokio's locking
let mut provider = self.provider.write().await;
// TODO: tell the block subscriber that we are at 0
block_sender
.send_async((0, H256::default(), self.clone()))
.await?;
let new_provider = Web3Provider::from_str(&self.url, http_client).await?;
*provider = Arc::new(new_provider);
Ok(())
}
/// Connect to a web3 rpc and subscribe to new heads
#[instrument(skip_all)]
#[instrument(name = "try_new_Web3Connection", skip(clock, http_client))]
pub async fn try_new(
chain_id: usize,
url_str: String,
@@ -88,36 +139,13 @@ impl Web3Connection {
None
};
let provider = if url_str.starts_with("http") {
let url: url::Url = url_str.parse()?;
let http_client = http_client.ok_or_else(|| anyhow::anyhow!("no http_client"))?;
let provider = ethers::providers::Http::new_with_client(url, http_client);
// TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592)
ethers::providers::Provider::new(provider)
.interval(Duration::from_secs(1))
.into()
} else if url_str.starts_with("ws") {
// TODO: wrapper automatically reconnect
let provider = ethers::providers::Ws::connect(url_str.clone()).await?;
// TODO: make sure this automatically reconnects
// TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592)
ethers::providers::Provider::new(provider)
.interval(Duration::from_secs(1))
.into()
} else {
return Err(anyhow::anyhow!("only http and ws servers are supported"));
};
let provider = Web3Provider::from_str(&url_str, http_client).await?;
let connection = Web3Connection {
clock: clock.clone(),
url: url_str.clone(),
active_requests: 0.into(),
provider,
provider: RwLock::new(Arc::new(provider)),
ratelimiter: hard_rate_limiter,
soft_limit,
};
@@ -193,94 +221,111 @@ impl Web3Connection {
}
}
/// Subscribe to new blocks
/// Subscribe to new blocks. If `reconnect` is true, this runs forever.
#[instrument(skip_all)]
pub async fn subscribe_new_heads(
self: Arc<Self>,
block_sender: flume::Sender<(u64, H256, Arc<Self>)>,
reconnect: bool,
) -> anyhow::Result<()> {
info!("Watching new_heads on {}", self);
loop {
info!("Watching new_heads on {}", self);
match &self.provider {
Web3Provider::Http(provider) => {
// there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints
// TODO: what should this interval be? probably some fraction of block time. set automatically?
// TODO: maybe it would be better to have one interval for all of the http providers, but this works for now
// TODO: if there are some websocket providers, maybe have a longer interval and a channel that tells the https to update when a websocket gets a new head? if they are slow this wouldn't work well though
let mut interval = interval(Duration::from_secs(2));
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
// TODO: is a RwLock of Arc the right thing here?
let provider = self.provider.read().await.clone();
let mut last_hash = Default::default();
match &*provider {
Web3Provider::Http(provider) => {
// there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints
// TODO: what should this interval be? probably some fraction of block time. set automatically?
// TODO: maybe it would be better to have one interval for all of the http providers, but this works for now
// TODO: if there are some websocket providers, maybe have a longer interval and a channel that tells the https to update when a websocket gets a new head? if they are slow this wouldn't work well though
let mut interval = interval(Duration::from_secs(2));
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
loop {
// wait for the interval
// TODO: if error or rate limit, increase interval?
interval.tick().await;
let mut last_hash = Default::default();
match self.try_request_handle() {
Ok(active_request_handle) => {
// TODO: i feel like this should be easier. there is a provider.getBlock, but i don't know how to give it "latest"
let block: Result<Block<TxHash>, _> = provider
.request("eth_getBlockByNumber", ("latest", false))
.await;
loop {
// wait for the interval
// TODO: if error or rate limit, increase interval?
interval.tick().await;
drop(active_request_handle);
match self.try_request_handle() {
Ok(active_request_handle) => {
// TODO: i feel like this should be easier. there is a provider.getBlock, but i don't know how to give it "latest"
let block: Result<Block<TxHash>, _> = provider
.request("eth_getBlockByNumber", ("latest", false))
.await;
// don't send repeat blocks
if let Ok(block) = &block {
let new_hash = block.hash.unwrap();
drop(active_request_handle);
if new_hash == last_hash {
continue;
// don't send repeat blocks
if let Ok(block) = &block {
let new_hash = block.hash.unwrap();
if new_hash == last_hash {
continue;
}
last_hash = new_hash;
}
last_hash = new_hash;
self.send_block(block, &block_sender).await;
}
Err(e) => {
warn!("Failed getting latest block from {}: {:?}", self, e);
}
self.send_block(block, &block_sender).await;
}
Err(e) => {
warn!("Failed getting latest block from {}: {:?}", self, e);
}
}
}
}
Web3Provider::Ws(provider) => {
// rate limits
let active_request_handle = self.wait_for_request_handle().await;
Web3Provider::Ws(provider) => {
// rate limits
let active_request_handle = self.wait_for_request_handle().await;
// TODO: automatically reconnect?
// TODO: it would be faster to get the block number, but subscriptions don't provide that
// TODO: maybe we can do provider.subscribe("newHeads") and then parse into a custom struct that only gets the number out?
let mut stream = provider.subscribe_blocks().await?;
// TODO: automatically reconnect?
// TODO: it would be faster to get the block number, but subscriptions don't provide that
// TODO: maybe we can do provider.subscribe("newHeads") and then parse into a custom struct that only gets the number out?
let mut stream = provider.subscribe_blocks().await?;
drop(active_request_handle);
let active_request_handle = self.wait_for_request_handle().await;
drop(active_request_handle);
let active_request_handle = self.wait_for_request_handle().await;
// query the block once since the subscription doesn't send the current block
// there is a very small race condition here where the stream could send us a new block right now
// all it does is print "new block" for the same block as current block
// TODO: rate limit!
let block: Result<Block<TxHash>, _> = active_request_handle
.request("eth_getBlockByNumber", ("latest", false))
.await;
// query the block once since the subscription doesn't send the current block
// there is a very small race condition here where the stream could send us a new block right now
// all it does is print "new block" for the same block as current block
// TODO: rate limit!
let block: Result<Block<TxHash>, _> = active_request_handle
.request("eth_getBlockByNumber", ("latest", false))
.await;
self.send_block(block, &block_sender).await;
self.send_block(block, &block_sender).await;
// TODO: what should this timeout be? needs to be larger than worst case block time
// TODO: although reconnects will make this less of an issue
while let Ok(Some(new_block)) =
timeout_at(Instant::now() + Duration::from_secs(300), stream.next()).await
{
self.send_block(Ok(new_block), &block_sender).await;
// TODO: what should this timeout be? needs to be larger than worst case block time
// TODO: although reconnects will make this less of an issue
while let Ok(Some(new_block)) =
timeout_at(Instant::now() + Duration::from_secs(300), stream.next()).await
{
self.send_block(Ok(new_block), &block_sender).await;
}
// TODO: re-connect!
}
}
// TODO: re-connect!
if reconnect {
drop(provider);
// TODO: exponential backoff
warn!("new heads subscription exited. reconnecting in 10 seconds...");
sleep(Duration::from_secs(10)).await;
self.reconnect(&block_sender).await?;
} else {
break;
}
}
info!("Done watching new_heads on {}", self);
Ok(())
}
@@ -359,7 +404,9 @@ impl ActiveRequestHandle {
// TODO: it would be nice to have the request id on this
trace!("Sending {}({:?}) to {}", method, params, self.0);
let response = match &self.0.provider {
let provider = self.0.provider.read().await.clone();
let response = match &*provider {
Web3Provider::Http(provider) => provider.request(method, params).await,
Web3Provider::Ws(provider) => provider.request(method, params).await,
};

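The import swap in this file (parking_lot::RwLock out, tokio::sync::RwLock in) follows from the comment in reconnect(): the write guard is held across an .await, and a blocking guard like std's or parking_lot's is not meant to be held over an await point; in particular std's guards are not Send, so tokio::spawn would reject a future holding one. A small sketch of the async-lock version, with a plain counter standing in for the provider:

use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;

// holding a blocking (std/parking_lot) write guard across the sleep below
// would be a problem; tokio's async RwLock guard is designed for this
async fn swap_while_awaiting(slot: Arc<RwLock<u32>>) {
    let mut guard = slot.write().await;
    tokio::time::sleep(Duration::from_millis(10)).await; // stands in for reconnect I/O
    *guard += 1;
}

#[tokio::main]
async fn main() {
    let slot = Arc::new(RwLock::new(0u32));
    tokio::spawn(swap_while_awaiting(slot.clone())).await.unwrap();
    assert_eq!(*slot.read().await, 1);
}
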
@@ -47,7 +47,7 @@ impl SyncedConnections {
cmp::Ordering::Greater => {
// the rpc's newest block is the new overall best block
if log {
info!("new head block {} from {}", new_block_num, rpc);
info!("new head {} from {}", new_block_num, rpc);
}
self.inner.clear();
@@ -60,10 +60,15 @@ impl SyncedConnections {
if new_block_hash != self.head_block_hash {
// same height, but different chain
// TODO: anything else we should do? set some "nextSafeBlockHeight" to delay sending transactions?
// TODO: sometimes a node changes its block. if that happens, a new block is probably right behind this one
if log {
warn!(
"chain is forked at #{}! {} has {:?}. {:?} have {:?}",
new_block_num, rpc, new_block_hash, self.inner, self.head_block_hash
"chain is forked at #{}! {} has {}. {} rpcs have {}",
new_block_num,
rpc,
new_block_hash,
self.inner.len(),
self.head_block_hash
);
}
return;
@@ -140,7 +145,7 @@ impl fmt::Debug for Web3Connections {
}
impl Web3Connections {
#[instrument(skip_all)]
#[instrument(name = "try_new_Web3Connections", skip_all)]
pub async fn try_new(
chain_id: usize,
servers: Vec<Web3ConnectionConfig>,
@@ -183,9 +188,17 @@ impl Web3Connections {
tokio::spawn(async move {
let url = connection.url().to_string();
// TODO: instead of passing Some(connections), pass Some(channel_sender). Then listen on the receiver below to keep local heads up-to-date
if let Err(e) = connection.subscribe_new_heads(block_sender).await {
warn!("new_heads error on {}: {:?}", url, e);
// loop to automatically reconnect
// TODO: make this cancellable?
loop {
// TODO: instead of passing Some(connections), pass Some(channel_sender). Then listen on the receiver below to keep local heads up-to-date
if let Err(e) = connection
.clone()
.subscribe_new_heads(block_sender.clone(), true)
.await
{
warn!("new_heads error on {}: {:?}", url, e);
}
}
});
}

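The connections manager now wraps each subscription in a loop inside its spawned task, so a failed subscription is logged and retried instead of silently ending the task; the actual 10 second sleep and reconnect happen inside subscribe_new_heads when reconnect is true. Roughly the shape of that supervisor loop, assuming tokio, tracing, and anyhow, with a stub in place of the real subscription:

use std::time::Duration;

// stub standing in for connection.subscribe_new_heads(block_sender, true)
async fn subscribe_new_heads() -> anyhow::Result<()> {
    Err(anyhow::anyhow!("subscription ended"))
}

#[tokio::main]
async fn main() {
    let url = "ws://example:8546".to_string();
    // one supervisor task per upstream rpc
    let handle = tokio::spawn(async move {
        loop {
            if let Err(e) = subscribe_new_heads().await {
                tracing::warn!("new_heads error on {}: {:?}", url, e);
            }
            // fixed delay so a failing upstream doesn't spin;
            // the diff notes exponential backoff as a TODO
            tokio::time::sleep(Duration::from_secs(10)).await;
        }
    });

    // aborting the JoinHandle is one way to make the loop cancellable,
    // which the "make this cancellable?" TODO asks about
    tokio::time::sleep(Duration::from_millis(50)).await;
    handle.abort();
}
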
@@ -16,11 +16,11 @@ pub struct JsonRpcRequest {
impl fmt::Debug for JsonRpcRequest {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// TODO: the default formatter takes forever to write. this is too quiet though
// TODO: how should we include params in this? maybe just the length?
f.debug_struct("JsonRpcRequest")
.field("id", &self.id)
.field("method", &self.method)
.field("params", &self.params)
.finish()
.finish_non_exhaustive()
}
}

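For reference, finish_non_exhaustive() ends the Debug output with `..`, making it explicit that some fields (here, params) were left out on purpose. A tiny self-contained example of the resulting output, with the struct's fields simplified:

use std::fmt;

struct JsonRpcRequest {
    id: u64,
    method: String,
    params: Vec<String>,
}

impl fmt::Debug for JsonRpcRequest {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("JsonRpcRequest")
            .field("id", &self.id)
            .field("method", &self.method)
            // the trailing `..` signals omitted fields without printing params
            .finish_non_exhaustive()
    }
}

fn main() {
    let req = JsonRpcRequest {
        id: 1,
        method: "eth_blockNumber".to_string(),
        params: Vec::new(),
    };
    // prints: JsonRpcRequest { id: 1, method: "eth_blockNumber", .. }
    println!("{:?}", req);
    // params is still on the struct, just absent from the Debug output
    assert!(req.params.is_empty());
}
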
@@ -96,7 +96,7 @@ fn handle_anyhow_errors<T: warp::Reply>(
match res {
Ok(r) => r.into_response(),
Err(e) => {
warn!("Responding with an error: {:?}", e);
warn!("Responding with error: {:?}", e);
let e = JsonRpcForwardedResponse {
jsonrpc: "2.0".to_string(),