From 5fb3298cd0064804724954233a117e416e646777 Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Tue, 19 Jul 2022 04:21:32 +0000
Subject: [PATCH] survive an rpc being offline

i still want retries on some of these
---
 web3-proxy/src/app.rs         |  8 ++--
 web3-proxy/src/config.rs      | 10 ++---
 web3-proxy/src/connection.rs  | 42 +++++++++----------
 web3-proxy/src/connections.rs | 77 +++++++++++++++++++++--------------
 4 files changed, 75 insertions(+), 62 deletions(-)

diff --git a/web3-proxy/src/app.rs b/web3-proxy/src/app.rs
index b126ae8f..df7cad95 100644
--- a/web3-proxy/src/app.rs
+++ b/web3-proxy/src/app.rs
@@ -346,8 +346,8 @@ impl Web3ProxyApp {
         let (balanced_rpcs, balanced_handle) = Web3Connections::spawn(
             app_config.shared.chain_id,
             balanced_rpcs,
-            http_client.as_ref(),
-            redis_client_pool.as_ref(),
+            http_client.clone(),
+            redis_client_pool.clone(),
             Some(head_block_sender),
             Some(pending_tx_sender.clone()),
             pending_transactions.clone(),
@@ -364,8 +364,8 @@ impl Web3ProxyApp {
         let (private_rpcs, private_handle) = Web3Connections::spawn(
             app_config.shared.chain_id,
             private_rpcs,
-            http_client.as_ref(),
-            redis_client_pool.as_ref(),
+            http_client.clone(),
+            redis_client_pool.clone(),
             // subscribing to new heads here won't work well
             None,
             // TODO: subscribe to pending transactions on the private rpcs?
diff --git a/web3-proxy/src/config.rs b/web3-proxy/src/config.rs
index c73ff52c..8d57c061 100644
--- a/web3-proxy/src/config.rs
+++ b/web3-proxy/src/config.rs
@@ -34,8 +34,8 @@ pub struct AppConfig {
 /// shared configuration between Web3Connections
 #[derive(Debug, Deserialize)]
 pub struct RpcSharedConfig {
-    /// TODO: what type for chain_id? TODO: this isn't at the right level. this is inside a "Config"
-    pub chain_id: usize,
+    // TODO: better type for chain_id? max of `u64::MAX / 2 - 36` https://github.com/ethereum/EIPs/issues/2294
+    pub chain_id: u64,
     pub rate_limit_redis: Option<String>,
     // TODO: serde default for development?
     // TODO: allow no limit?
@@ -54,9 +54,9 @@ impl Web3ConnectionConfig {
     // #[instrument(name = "try_build_Web3ConnectionConfig", skip_all)]
     pub async fn spawn(
         self,
-        redis_client_pool: Option<&redis_cell_client::RedisClientPool>,
-        chain_id: usize,
-        http_client: Option<&reqwest::Client>,
+        redis_client_pool: Option<redis_cell_client::RedisClientPool>,
+        chain_id: u64,
+        http_client: Option<reqwest::Client>,
         http_interval_sender: Option<Arc<broadcast::Sender<()>>>,
         block_sender: Option<flume::Sender<(Block<TxHash>, Arc<Web3Connection>)>>,
         tx_id_sender: Option<flume::Sender<(TxHash, Arc<Web3Connection>)>>,
diff --git a/web3-proxy/src/connection.rs b/web3-proxy/src/connection.rs
index e318319a..135e81aa 100644
--- a/web3-proxy/src/connection.rs
+++ b/web3-proxy/src/connection.rs
@@ -27,16 +27,11 @@ pub enum Web3Provider {
 impl Web3Provider {
     #[instrument]
-    async fn from_str(
-        url_str: &str,
-        http_client: Option<&reqwest::Client>,
-    ) -> anyhow::Result<Self> {
+    async fn from_str(url_str: &str, http_client: Option<reqwest::Client>) -> anyhow::Result<Self> {
         let provider = if url_str.starts_with("http") {
             let url: url::Url = url_str.parse()?;

-            let http_client = http_client
-                .ok_or_else(|| anyhow::anyhow!("no http_client"))?
-                .clone();
+            let http_client = http_client.ok_or_else(|| anyhow::anyhow!("no http_client"))?;

             let provider = ethers::providers::Http::new_with_client(url, http_client);
@@ -114,9 +109,9 @@ impl fmt::Debug for Web3Connection {

         let block_data_limit = self.block_data_limit.load(atomic::Ordering::Relaxed);
         if block_data_limit == u64::MAX {
-            f.field("limit", &"archive");
+            f.field("data", &"archive");
         } else {
-            f.field("limit", &block_data_limit);
+            f.field("data", &block_data_limit);
         }

         f.finish_non_exhaustive()
@@ -132,15 +127,15 @@ impl fmt::Display for Web3Connection {
 impl Web3Connection {
     /// Connect to a web3 rpc
     // #[instrument(name = "spawn_Web3Connection", skip(hard_limit, http_client))]
-    // TODO: have this take a builder (which will have senders attached)
+    // TODO: have this take a builder (which will have channels attached)
     #[allow(clippy::too_many_arguments)]
     pub async fn spawn(
-        chain_id: usize,
+        chain_id: u64,
         url_str: String,
         // optional because this is only used for http providers. websocket providers don't use it
-        http_client: Option<&reqwest::Client>,
+        http_client: Option<reqwest::Client>,
         http_interval_sender: Option<Arc<broadcast::Sender<()>>>,
-        hard_limit: Option<(u32, &redis_cell_client::RedisClientPool)>,
+        hard_limit: Option<(u32, redis_cell_client::RedisClientPool)>,
         // TODO: think more about this type
         soft_limit: u32,
         block_sender: Option<flume::Sender<(Block<TxHash>, Arc<Web3Connection>)>>,
@@ -149,9 +144,10 @@ impl Web3Connection {
     ) -> anyhow::Result<(Arc<Web3Connection>, AnyhowJoinHandle<()>)> {
         let hard_limit = hard_limit.map(|(hard_rate_limit, redis_conection)| {
             // TODO: allow different max_burst and count_per_period and period
+            // TODO: if this url rate limits by IP instead of api key, we want to include our public ip in this key
             let period = 1;
             RedisCellClient::new(
-                redis_conection.clone(),
+                redis_conection,
                 format!("{},{}", chain_id, url_str),
                 hard_rate_limit,
                 hard_rate_limit,
@@ -177,7 +173,7 @@ impl Web3Connection {
         // TODO: move this outside the `new` function and into a `start` function or something. that way we can do retries from there
         // TODO: some public rpcs (on bsc and fantom) do not return an id and so this ends up being an error
         // TODO: this will wait forever. do we want that?
-        let found_chain_id: Result<String, _> = new_connection
+        let found_chain_id: Result<U64, _> = new_connection
             .wait_for_request_handle()
             .await
             .request("eth_chainId", Option::None::<()>)
@@ -186,19 +182,18 @@ impl Web3Connection {
         match found_chain_id {
             Ok(found_chain_id) => {
                 // TODO: there has to be a cleaner way to do this
-                let found_chain_id =
-                    usize::from_str_radix(found_chain_id.trim_start_matches("0x"), 16).unwrap();
-
-                if chain_id != found_chain_id {
+                if chain_id != found_chain_id.as_u64() {
                     return Err(anyhow::anyhow!(
                         "incorrect chain id! Expected {}. Found {}",
                         chain_id,
                         found_chain_id
-                    ));
+                    )
+                    .context(format!("failed spawning {}", new_connection)));
                 }
             }
             Err(e) => {
-                let e = anyhow::Error::from(e).context(format!("{}", new_connection));
+                let e =
+                    anyhow::Error::from(e).context(format!("failed spawning {}", new_connection));
                 return Err(e);
             }
         }
@@ -225,7 +220,7 @@ impl Web3Connection {
         for block_data_limit in [u64::MAX, 90_000, 128, 64, 32] {
             let mut head_block_num = new_connection.head_block.read().1;

-            // TODO: wait until head block is set outside the loop?
+            // TODO: wait until head block is set outside the loop? if we disconnect while starting we could actually get 0 though
             while head_block_num == U64::zero() {
                 info!(?new_connection, "no head block");

                 head_block_num = new_connection.head_block.read().1;
             }

+            // TODO: subtract 1 from block_data_limit for safety?
             let maybe_archive_block = head_block_num
-                .saturating_sub(block_data_limit.into())
+                .saturating_sub((block_data_limit).into())
                 .max(U64::one());

             let archive_result: Result<Bytes, _> = new_connection
diff --git a/web3-proxy/src/connections.rs b/web3-proxy/src/connections.rs
index 7f1de3dc..c2e3ddaa 100644
--- a/web3-proxy/src/connections.rs
+++ b/web3-proxy/src/connections.rs
@@ -5,7 +5,7 @@ use counter::Counter;
 use dashmap::DashMap;
 use derive_more::From;
 use ethers::prelude::{Block, ProviderError, Transaction, TxHash, H256, U64};
-use futures::future::try_join_all;
+use futures::future::{join_all, try_join_all};
 use futures::stream::FuturesUnordered;
 use futures::StreamExt;
 use hashbrown::HashMap;
@@ -94,20 +94,14 @@ impl fmt::Debug for Web3Connections {
 impl Web3Connections {
     // #[instrument(name = "spawn_Web3Connections", skip_all)]
     pub async fn spawn(
-        chain_id: usize,
+        chain_id: u64,
         server_configs: Vec<Web3ConnectionConfig>,
-        http_client: Option<&reqwest::Client>,
-        redis_client_pool: Option<&redis_cell_client::RedisClientPool>,
+        http_client: Option<reqwest::Client>,
+        redis_client_pool: Option<redis_cell_client::RedisClientPool>,
         head_block_sender: Option<watch::Sender<Block<TxHash>>>,
         pending_tx_sender: Option<broadcast::Sender<TxState>>,
         pending_transactions: Arc<DashMap<TxHash, TxState>>,
     ) -> anyhow::Result<(Arc<Self>, AnyhowJoinHandle<()>)> {
-        let num_connections = server_configs.len();
-
-        // TODO: try_join_all
-        let mut handles = vec![];
-
-        // TODO: only create these if head_block_sender and pending_tx_sender are set
         let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
         let (block_sender, block_receiver) = flume::unbounded();

@@ -116,7 +110,7 @@ impl Web3Connections {

             drop(receiver);

-            // TODO: what interval? follow a websocket instead?
+            // TODO: what interval? follow a websocket also? maybe by watching synced connections with a timeout. will need debounce
             let mut interval = interval(Duration::from_secs(13));
             interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

@@ -146,26 +140,49 @@ impl Web3Connections {
             None
         };

-        // turn configs into connections
-        let mut connections = Vec::with_capacity(num_connections);
-        for server_config in server_configs.into_iter() {
-            match server_config
-                .spawn(
-                    redis_client_pool,
-                    chain_id,
-                    http_client,
-                    http_interval_sender.clone(),
-                    Some(block_sender.clone()),
-                    Some(pending_tx_id_sender.clone()),
-                )
-                .await
-            {
-                Ok((connection, connection_handle)) => {
-                    handles.push(flatten_handle(connection_handle));
-                    connections.push(connection)
+        // turn configs into connections (in parallel)
+        let spawn_handles: Vec<_> = server_configs
+            .into_iter()
+            .map(|server_config| {
+                let http_client = http_client.clone();
+                let redis_client_pool = redis_client_pool.clone();
+                let http_interval_sender = http_interval_sender.clone();
+                let block_sender = Some(block_sender.clone());
+                let pending_tx_id_sender = Some(pending_tx_id_sender.clone());
+
+                tokio::spawn(async move {
+                    server_config
+                        .spawn(
+                            redis_client_pool,
+                            chain_id,
+                            http_client,
+                            http_interval_sender,
+                            block_sender,
+                            pending_tx_id_sender,
+                        )
+                        .await
+                })
+            })
+            .collect();
+
+        let mut connections = vec![];
+        let mut handles = vec![];
+
+        // TODO: futures unordered?
+        for x in join_all(spawn_handles).await {
+            // TODO: how should we handle errors here? one rpc being down shouldn't cause the program to exit
+            match x {
+                Ok(Ok((connection, handle))) => {
+                    connections.push(connection);
+                    handles.push(handle);
+                }
+                Ok(Err(err)) => {
+                    // TODO: some of these are probably retry-able
+                    error!(?err);
+                }
+                Err(err) => {
+                    return Err(err.into());
                 }
-                // TODO: include the server url in this
-                Err(e) => warn!("Unable to connect to a server! {:?}", e),
             }
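
The important change above is the new startup path in connections.rs: each server config is spawned on its own tokio task, the tasks are collected with futures::future::join_all, and a connection that fails to come up only logs an error instead of aborting the whole spawn; only a panicked or cancelled task (a JoinError) stays fatal. Below is a rough, standalone sketch of that pattern. It assumes tokio, futures and anyhow as dependencies, and the Connection struct and connect function are made-up stand-ins, not the proxy's real types.

// spawn every connection attempt on its own task, join them all,
// and keep whatever succeeded. one offline rpc only produces a
// logged error instead of failing startup.
use futures::future::join_all;

// stand-in for Web3Connection; purely illustrative
#[derive(Debug)]
struct Connection {
    url: String,
}

async fn connect(url: String) -> anyhow::Result<Connection> {
    // a real implementation would dial the rpc and verify its chain id here
    if url.contains("offline") {
        anyhow::bail!("unable to connect to {}", url);
    }
    Ok(Connection { url })
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let urls = vec![
        "http://rpc-a.example".to_string(),
        "http://offline.example".to_string(),
        "http://rpc-b.example".to_string(),
    ];

    // spawn every connection attempt concurrently
    let spawn_handles: Vec<_> = urls
        .into_iter()
        .map(|url| tokio::spawn(connect(url)))
        .collect();

    let mut connections = vec![];

    for joined in join_all(spawn_handles).await {
        match joined {
            // the task ran and the connection succeeded
            Ok(Ok(connection)) => connections.push(connection),
            // the task ran but this one rpc is offline; log it and keep going
            Ok(Err(err)) => eprintln!("skipping rpc: {:?}", err),
            // the task itself panicked or was cancelled; treat that as fatal
            Err(join_err) => return Err(join_err.into()),
        }
    }

    println!("connected to {} rpcs: {:?}", connections.len(), connections);
    Ok(())
}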