From 68ac25d586de907d136ad9596dabb39bd2743946 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Tue, 17 May 2022 02:26:47 +0000 Subject: [PATCH] automatically reconnect --- web3-proxy/src/app.rs | 3 +- web3-proxy/src/config.rs | 4 +- web3-proxy/src/connection.rs | 229 ++++++++++++++++++++-------------- web3-proxy/src/connections.rs | 27 ++-- web3-proxy/src/jsonrpc.rs | 4 +- web3-proxy/src/main.rs | 2 +- 6 files changed, 165 insertions(+), 104 deletions(-) diff --git a/web3-proxy/src/app.rs b/web3-proxy/src/app.rs index b50ef5a7..e83b5e7f 100644 --- a/web3-proxy/src/app.rs +++ b/web3-proxy/src/app.rs @@ -56,7 +56,7 @@ impl fmt::Debug for Web3ProxyApp { } impl Web3ProxyApp { - #[instrument(skip_all)] + #[instrument(name = "try_new_Web3ProxyApp", skip_all)] pub async fn try_new( chain_id: usize, balanced_rpcs: Vec, @@ -119,6 +119,7 @@ impl Web3ProxyApp { } }; + // TODO: i don't seem to ever see this log. why? debug!("Forwarding response: {:?}", response); Ok(warp::reply::json(&response)) diff --git a/web3-proxy/src/config.rs b/web3-proxy/src/config.rs index a88058d3..24b9787a 100644 --- a/web3-proxy/src/config.rs +++ b/web3-proxy/src/config.rs @@ -43,7 +43,7 @@ pub struct Web3ConnectionConfig { impl RpcConfig { /// Create a Web3ProxyApp from config - #[instrument(skip_all)] + #[instrument(name = "try_build_RpcConfig", skip_all)] pub async fn try_build(self) -> anyhow::Result { let balanced_rpcs = self.balanced_rpcs.into_values().collect(); @@ -59,7 +59,7 @@ impl RpcConfig { impl Web3ConnectionConfig { /// Create a Web3Connection from config - #[instrument(skip_all)] + #[instrument(name = "try_build_Web3ConnectionConfig", skip_all)] pub async fn try_build( self, clock: &QuantaClock, diff --git a/web3-proxy/src/connection.rs b/web3-proxy/src/connection.rs index 3553c42a..a853f5a8 100644 --- a/web3-proxy/src/connection.rs +++ b/web3-proxy/src/connection.rs @@ -7,13 +7,12 @@ use governor::middleware::NoOpMiddleware; use governor::state::{InMemoryState, NotKeyed}; use governor::NotUntil; use governor::RateLimiter; -use parking_lot::RwLock; use std::fmt; use std::num::NonZeroU32; use std::sync::atomic::{self, AtomicU32}; -use std::time::Duration; use std::{cmp::Ordering, sync::Arc}; -use tokio::time::{interval, sleep, timeout_at, Instant, MissedTickBehavior}; +use tokio::sync::RwLock; +use tokio::time::{interval, sleep, timeout_at, Duration, Instant, MissedTickBehavior}; use tracing::{info, instrument, trace, warn}; type Web3RateLimiter = @@ -26,6 +25,38 @@ pub enum Web3Provider { Ws(ethers::providers::Provider), } +impl Web3Provider { + #[instrument] + async fn from_str(url_str: &str, http_client: Option) -> anyhow::Result { + let provider = if url_str.starts_with("http") { + let url: url::Url = url_str.parse()?; + + let http_client = http_client.ok_or_else(|| anyhow::anyhow!("no http_client"))?; + + let provider = ethers::providers::Http::new_with_client(url, http_client); + + // TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592) + ethers::providers::Provider::new(provider) + .interval(Duration::from_secs(1)) + .into() + } else if url_str.starts_with("ws") { + // TODO: wrapper automatically reconnect + let provider = ethers::providers::Ws::connect(url_str).await?; + + // TODO: make sure this automatically reconnects + + // TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592) + ethers::providers::Provider::new(provider) + .interval(Duration::from_secs(1)) + .into() + } else { + return Err(anyhow::anyhow!("only http and ws servers are supported")); + }; + + Ok(provider) + } +} + impl fmt::Debug for Web3Provider { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { // TODO: the default Debug takes forever to write. this is too quiet though. we at least need the url @@ -40,7 +71,7 @@ pub struct Web3Connection { /// keep track of currently open requests. We sort on this active_requests: AtomicU32, // TODO: put this in a RwLock so that we can replace it if re-connecting - provider: Web3Provider, + provider: RwLock>, ratelimiter: Option, /// used for load balancing to the least loaded server soft_limit: u32, @@ -64,10 +95,30 @@ impl fmt::Display for Web3Connection { impl Web3Connection { #[instrument(skip_all)] - async fn reconnect(&self) {} + pub async fn reconnect( + self: &Arc, + block_sender: &flume::Sender<(u64, H256, Arc)>, + ) -> anyhow::Result<()> { + // websocket doesn't need the http client + let http_client = None; + + // since this lock is held open over an await, we use tokio's locking + let mut provider = self.provider.write().await; + + // TODO: tell the block subscriber that we are at 0 + block_sender + .send_async((0, H256::default(), self.clone())) + .await?; + + let new_provider = Web3Provider::from_str(&self.url, http_client).await?; + + *provider = Arc::new(new_provider); + + Ok(()) + } /// Connect to a web3 rpc and subscribe to new heads - #[instrument(skip_all)] + #[instrument(name = "try_new_Web3Connection", skip(clock, http_client))] pub async fn try_new( chain_id: usize, url_str: String, @@ -88,36 +139,13 @@ impl Web3Connection { None }; - let provider = if url_str.starts_with("http") { - let url: url::Url = url_str.parse()?; - - let http_client = http_client.ok_or_else(|| anyhow::anyhow!("no http_client"))?; - - let provider = ethers::providers::Http::new_with_client(url, http_client); - - // TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592) - ethers::providers::Provider::new(provider) - .interval(Duration::from_secs(1)) - .into() - } else if url_str.starts_with("ws") { - // TODO: wrapper automatically reconnect - let provider = ethers::providers::Ws::connect(url_str.clone()).await?; - - // TODO: make sure this automatically reconnects - - // TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592) - ethers::providers::Provider::new(provider) - .interval(Duration::from_secs(1)) - .into() - } else { - return Err(anyhow::anyhow!("only http and ws servers are supported")); - }; + let provider = Web3Provider::from_str(&url_str, http_client).await?; let connection = Web3Connection { clock: clock.clone(), url: url_str.clone(), active_requests: 0.into(), - provider, + provider: RwLock::new(Arc::new(provider)), ratelimiter: hard_rate_limiter, soft_limit, }; @@ -193,94 +221,111 @@ impl Web3Connection { } } - /// Subscribe to new blocks + /// Subscribe to new blocks. If `reconnect` is true, this runs forever. #[instrument(skip_all)] pub async fn subscribe_new_heads( self: Arc, block_sender: flume::Sender<(u64, H256, Arc)>, + reconnect: bool, ) -> anyhow::Result<()> { - info!("Watching new_heads on {}", self); + loop { + info!("Watching new_heads on {}", self); - match &self.provider { - Web3Provider::Http(provider) => { - // there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints - // TODO: what should this interval be? probably some fraction of block time. set automatically? - // TODO: maybe it would be better to have one interval for all of the http providers, but this works for now - // TODO: if there are some websocket providers, maybe have a longer interval and a channel that tells the https to update when a websocket gets a new head? if they are slow this wouldn't work well though - let mut interval = interval(Duration::from_secs(2)); - interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + // TODO: is a RwLock of Arc the right thing here? + let provider = self.provider.read().await.clone(); - let mut last_hash = Default::default(); + match &*provider { + Web3Provider::Http(provider) => { + // there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints + // TODO: what should this interval be? probably some fraction of block time. set automatically? + // TODO: maybe it would be better to have one interval for all of the http providers, but this works for now + // TODO: if there are some websocket providers, maybe have a longer interval and a channel that tells the https to update when a websocket gets a new head? if they are slow this wouldn't work well though + let mut interval = interval(Duration::from_secs(2)); + interval.set_missed_tick_behavior(MissedTickBehavior::Delay); - loop { - // wait for the interval - // TODO: if error or rate limit, increase interval? - interval.tick().await; + let mut last_hash = Default::default(); - match self.try_request_handle() { - Ok(active_request_handle) => { - // TODO: i feel like this should be easier. there is a provider.getBlock, but i don't know how to give it "latest" - let block: Result, _> = provider - .request("eth_getBlockByNumber", ("latest", false)) - .await; + loop { + // wait for the interval + // TODO: if error or rate limit, increase interval? + interval.tick().await; - drop(active_request_handle); + match self.try_request_handle() { + Ok(active_request_handle) => { + // TODO: i feel like this should be easier. there is a provider.getBlock, but i don't know how to give it "latest" + let block: Result, _> = provider + .request("eth_getBlockByNumber", ("latest", false)) + .await; - // don't send repeat blocks - if let Ok(block) = &block { - let new_hash = block.hash.unwrap(); + drop(active_request_handle); - if new_hash == last_hash { - continue; + // don't send repeat blocks + if let Ok(block) = &block { + let new_hash = block.hash.unwrap(); + + if new_hash == last_hash { + continue; + } + + last_hash = new_hash; } - last_hash = new_hash; + self.send_block(block, &block_sender).await; + } + Err(e) => { + warn!("Failed getting latest block from {}: {:?}", self, e); } - - self.send_block(block, &block_sender).await; - } - Err(e) => { - warn!("Failed getting latest block from {}: {:?}", self, e); } } } - } - Web3Provider::Ws(provider) => { - // rate limits - let active_request_handle = self.wait_for_request_handle().await; + Web3Provider::Ws(provider) => { + // rate limits + let active_request_handle = self.wait_for_request_handle().await; - // TODO: automatically reconnect? - // TODO: it would be faster to get the block number, but subscriptions don't provide that - // TODO: maybe we can do provider.subscribe("newHeads") and then parse into a custom struct that only gets the number out? - let mut stream = provider.subscribe_blocks().await?; + // TODO: automatically reconnect? + // TODO: it would be faster to get the block number, but subscriptions don't provide that + // TODO: maybe we can do provider.subscribe("newHeads") and then parse into a custom struct that only gets the number out? + let mut stream = provider.subscribe_blocks().await?; - drop(active_request_handle); - let active_request_handle = self.wait_for_request_handle().await; + drop(active_request_handle); + let active_request_handle = self.wait_for_request_handle().await; - // query the block once since the subscription doesn't send the current block - // there is a very small race condition here where the stream could send us a new block right now - // all it does is print "new block" for the same block as current block - // TODO: rate limit! - let block: Result, _> = active_request_handle - .request("eth_getBlockByNumber", ("latest", false)) - .await; + // query the block once since the subscription doesn't send the current block + // there is a very small race condition here where the stream could send us a new block right now + // all it does is print "new block" for the same block as current block + // TODO: rate limit! + let block: Result, _> = active_request_handle + .request("eth_getBlockByNumber", ("latest", false)) + .await; - self.send_block(block, &block_sender).await; + self.send_block(block, &block_sender).await; - // TODO: what should this timeout be? needs to be larger than worst case block time - // TODO: although reconnects will make this less of an issue - while let Ok(Some(new_block)) = - timeout_at(Instant::now() + Duration::from_secs(300), stream.next()).await - { - self.send_block(Ok(new_block), &block_sender).await; + // TODO: what should this timeout be? needs to be larger than worst case block time + // TODO: although reconnects will make this less of an issue + while let Ok(Some(new_block)) = + timeout_at(Instant::now() + Duration::from_secs(300), stream.next()).await + { + self.send_block(Ok(new_block), &block_sender).await; + } + + // TODO: re-connect! } + } - // TODO: re-connect! + if reconnect { + drop(provider); + + // TODO: exponential backoff + warn!("new heads subscription exited. reconnecting in 10 seconds..."); + sleep(Duration::from_secs(10)).await; + + self.reconnect(&block_sender).await?; + } else { + break; } } info!("Done watching new_heads on {}", self); - Ok(()) } @@ -359,7 +404,9 @@ impl ActiveRequestHandle { // TODO: it would be nice to have the request id on this trace!("Sending {}({:?}) to {}", method, params, self.0); - let response = match &self.0.provider { + let provider = self.0.provider.read().await.clone(); + + let response = match &*provider { Web3Provider::Http(provider) => provider.request(method, params).await, Web3Provider::Ws(provider) => provider.request(method, params).await, }; diff --git a/web3-proxy/src/connections.rs b/web3-proxy/src/connections.rs index eb3e3d2a..7c512c28 100644 --- a/web3-proxy/src/connections.rs +++ b/web3-proxy/src/connections.rs @@ -47,7 +47,7 @@ impl SyncedConnections { cmp::Ordering::Greater => { // the rpc's newest block is the new overall best block if log { - info!("new head block {} from {}", new_block_num, rpc); + info!("new head {} from {}", new_block_num, rpc); } self.inner.clear(); @@ -60,10 +60,15 @@ impl SyncedConnections { if new_block_hash != self.head_block_hash { // same height, but different chain // TODO: anything else we should do? set some "nextSafeBlockHeight" to delay sending transactions? + // TODO: sometimes a node changes its block. if that happens, a new block is probably right behind this one if log { warn!( - "chain is forked at #{}! {} has {:?}. {:?} have {:?}", - new_block_num, rpc, new_block_hash, self.inner, self.head_block_hash + "chain is forked at #{}! {} has {}. {} rpcs have {}", + new_block_num, + rpc, + new_block_hash, + self.inner.len(), + self.head_block_hash ); } return; @@ -140,7 +145,7 @@ impl fmt::Debug for Web3Connections { } impl Web3Connections { - #[instrument(skip_all)] + #[instrument(name = "try_new_Web3Connections", skip_all)] pub async fn try_new( chain_id: usize, servers: Vec, @@ -183,9 +188,17 @@ impl Web3Connections { tokio::spawn(async move { let url = connection.url().to_string(); - // TODO: instead of passing Some(connections), pass Some(channel_sender). Then listen on the receiver below to keep local heads up-to-date - if let Err(e) = connection.subscribe_new_heads(block_sender).await { - warn!("new_heads error on {}: {:?}", url, e); + // loop to automatically reconnect + // TODO: make this cancellable? + loop { + // TODO: instead of passing Some(connections), pass Some(channel_sender). Then listen on the receiver below to keep local heads up-to-date + if let Err(e) = connection + .clone() + .subscribe_new_heads(block_sender.clone(), true) + .await + { + warn!("new_heads error on {}: {:?}", url, e); + } } }); } diff --git a/web3-proxy/src/jsonrpc.rs b/web3-proxy/src/jsonrpc.rs index 43956964..59802b77 100644 --- a/web3-proxy/src/jsonrpc.rs +++ b/web3-proxy/src/jsonrpc.rs @@ -16,11 +16,11 @@ pub struct JsonRpcRequest { impl fmt::Debug for JsonRpcRequest { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { // TODO: the default formatter takes forever to write. this is too quiet though + // TODO: how should we include params in this? maybe just the length? f.debug_struct("JsonRpcRequest") .field("id", &self.id) .field("method", &self.method) - .field("params", &self.params) - .finish() + .finish_non_exhaustive() } } diff --git a/web3-proxy/src/main.rs b/web3-proxy/src/main.rs index 79f693c3..cee3b760 100644 --- a/web3-proxy/src/main.rs +++ b/web3-proxy/src/main.rs @@ -96,7 +96,7 @@ fn handle_anyhow_errors( match res { Ok(r) => r.into_response(), Err(e) => { - warn!("Responding with an error: {:?}", e); + warn!("Responding with error: {:?}", e); let e = JsonRpcForwardedResponse { jsonrpc: "2.0".to_string(),