From 0b1929a8205a17ba2d8c6c339506559deec7890d Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Mon, 6 Feb 2023 15:20:36 -0800
Subject: [PATCH] add minimal config for quick testing

---
 config/minimal.toml               | 32 +++++++++++++++++++++++
 web3_proxy/src/rpcs/blockchain.rs | 42 ++++++++++++++++++-------------
 web3_proxy/src/rpcs/many.rs       | 26 ++++++++-----------
 3 files changed, 66 insertions(+), 34 deletions(-)
 create mode 100644 config/minimal.toml

diff --git a/config/minimal.toml b/config/minimal.toml
new file mode 100644
index 00000000..2225c9d1
--- /dev/null
+++ b/config/minimal.toml
@@ -0,0 +1,32 @@
+[app]
+chain_id = 1
+
+# no database
+# no influxdb
+# no redis
+# no sentry
+# no public limits means anon gets full access
+
+# no thundering herd protection
+min_sum_soft_limit = 1
+min_synced_rpcs = 1
+
+# 1GB of cache
+response_cache_max_bytes = 1_000_000_000
+
+[balanced_rpcs]
+
+    [balanced_rpcs.llama_public_wss]
+    # TODO: what should we do if all rpcs are disabled? warn and wait for a config change?
+    disabled = false
+    display_name = "LlamaNodes WSS"
+    url = "wss://eth.llamarpc.com/"
+    soft_limit = 1_000
+    tier = 0
+
+    [balanced_rpcs.llama_public_https]
+    disabled = false
+    display_name = "LlamaNodes HTTPS"
+    url = "https://eth.llamarpc.com/"
+    soft_limit = 1_000
+    tier = 0
diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs
index e5116d6d..f89ec041 100644
--- a/web3_proxy/src/rpcs/blockchain.rs
+++ b/web3_proxy/src/rpcs/blockchain.rs
@@ -288,29 +288,34 @@ impl Web3Rpcs {
         // TODO: this will grow unbounded. prune old heads on this at the same time we prune the graph?
         let mut connection_heads = ConsensusFinder::default();
 
-        while let Ok((new_block, rpc)) = block_receiver.recv_async().await {
-            let new_block = new_block.map(Into::into);
+        loop {
+            match block_receiver.recv_async().await {
+                Ok((new_block, rpc)) => {
+                    let new_block = new_block.map(Into::into);
 
-            let rpc_name = rpc.name.clone();
+                    let rpc_name = rpc.name.clone();
 
-            if let Err(err) = self
-                .process_block_from_rpc(
-                    authorization,
-                    &mut connection_heads,
-                    new_block,
-                    rpc,
-                    &head_block_sender,
-                    &pending_tx_sender,
-                )
-                .await
-            {
-                warn!("unable to process block from rpc {}: {:?}", rpc_name, err);
+                    if let Err(err) = self
+                        .process_block_from_rpc(
+                            authorization,
+                            &mut connection_heads,
+                            new_block,
+                            rpc,
+                            &head_block_sender,
+                            &pending_tx_sender,
+                        )
+                        .await
+                    {
+                        warn!("unable to process block from rpc {}: {:?}", rpc_name, err);
+                    }
+                }
+                Err(err) => {
+                    warn!("block_receiver exited! {:#?}", err);
+                    return Err(err.into());
+                }
             }
         }
-
-        // TODO: if there was an error, should we return it instead of an Ok?
-        warn!("block_receiver exited!");
-
-        Ok(())
     }
@@ -674,6 +679,7 @@ impl ConnectionsGroup {
         } else {
             // i don't think this is an error. i think its just if a reconnect is currently happening
             warn!("connection missing: {}", rpc_name);
+            debug!("web3_rpcs.conns: {:#?}", web3_rpcs.conns);
         }
     }
diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs
index 4222798e..ac88191b 100644
--- a/web3_proxy/src/rpcs/many.rs
+++ b/web3_proxy/src/rpcs/many.rs
@@ -12,7 +12,7 @@ use crate::rpcs::transactions::TxStatus;
 use counter::Counter;
 use derive_more::From;
 use ethers::prelude::{ProviderError, TxHash, H256, U64};
-use futures::future::try_join_all;
+use futures::future::{join_all, try_join_all};
 use futures::stream::FuturesUnordered;
 use futures::StreamExt;
 use hashbrown::{HashMap, HashSet};
@@ -89,9 +89,7 @@ impl Web3Rpcs {
         };
 
         let http_interval_sender = if http_client.is_some() {
-            let (sender, receiver) = broadcast::channel(1);
-
-            drop(receiver);
+            let (sender, _) = broadcast::channel(1);
 
             // TODO: what interval? follow a websocket also? maybe by watching synced connections with a timeout. will need debounce
             let mut interval = interval(Duration::from_millis(expected_block_time_ms / 2));
@@ -109,14 +107,10 @@ impl Web3Rpcs {
                     // trace!("http interval ready");
 
                     if let Err(_) = sender.send(()) {
-                        // errors are okay. they mean that all receivers have been dropped. that is normal if there are only websocket backend rpcs
-                        // TODO: if error, await a `std::future::pending::<()>`? whats the right thing here?
-                        info!("all http receivers have been dropped");
-                        break;
+                        // errors are okay. they mean that all receivers have been dropped, or the rpcs just haven't started yet
+                        trace!("no http receivers");
                     };
                 }
-
-                info!("http interval sender exited");
             }
         };
@@ -130,11 +124,11 @@ impl Web3Rpcs {
         // turn configs into connections (in parallel)
         // TODO: move this into a helper function. then we can use it when configs change (will need a remove function too)
-        // TODO: futures unordered?
-        let mut spawn_handles: FuturesUnordered<_> = server_configs
+        let mut spawn_handles: Vec<_> = server_configs
             .into_iter()
             .filter_map(|(server_name, server_config)| {
                 if server_config.disabled {
+                    info!("{} is disabled", server_name);
                     return None;
                 }
@@ -152,6 +146,8 @@ impl Web3Rpcs {
                 let pending_tx_id_sender = Some(pending_tx_id_sender.clone());
                 let block_map = block_map.clone();
 
+                debug!("spawning {}", server_name);
+
                 let handle = tokio::spawn(async move {
                     server_config
                         .spawn(
@@ -176,8 +172,8 @@ impl Web3Rpcs {
         let mut connections = HashMap::new();
         let mut handles = vec![];
 
-        for x in spawn_handles.next().await {
-            // TODO: how should we handle errors here? one rpc being down shouldn't cause the program to exit
+        // TODO: futures unordered?
+        for x in join_all(spawn_handles).await {
             match x {
                 Ok(Ok((connection, handle))) => {
                     // web3 connection worked
@@ -230,7 +226,6 @@ impl Web3Rpcs {
             let connections = connections.clone();
 
             tokio::spawn(async move {
-                // TODO: try_join_all with the other handles here
                 connections
                     .subscribe(
                         authorization,
@@ -328,7 +323,6 @@ impl Web3Rpcs {
         }
 
        info!("subscriptions over: {:?}", self);
-
         Ok(())
     }
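
A note on the blockchain.rs change: the old `while let Ok(...)` loop exited silently when the channel closed and returned `Ok(())`, while the new `loop`/`match` logs the receive error and propagates it, so the caller can tell a clean result from a dead channel. The shape of that pattern as a standalone sketch; flume provides `recv_async`, but the `u64` payload, function name, and `anyhow` error type here are stand-ins, not the crate's real types:

    use anyhow::Result;

    async fn process_blocks(block_receiver: flume::Receiver<u64>) -> Result<()> {
        loop {
            match block_receiver.recv_async().await {
                Ok(block) => {
                    // a per-block failure is only worth a warning; keep looping
                    println!("processing block {}", block);
                }
                Err(err) => {
                    // every sender is gone: bubble the error up instead of Ok(())
                    return Err(err.into());
                }
            }
        }
    }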
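On the `broadcast::channel` edit in many.rs: binding the initial receiver to `_` drops it immediately, which is equivalent to the old explicit `drop(receiver)`, and a failed `send` afterwards just means nobody is subscribed yet (or everyone has unsubscribed), which is why the patch downgrades that case from a `break` to a `trace!`. A minimal sketch of that behavior with tokio's broadcast channel; the unit payload is illustrative:

    use tokio::sync::broadcast;

    #[tokio::main]
    async fn main() {
        // create the channel and immediately drop the initial receiver,
        // mirroring `let (sender, _) = broadcast::channel(1);` in the patch
        let (sender, _) = broadcast::channel::<()>(1);

        // with no receivers alive, send() errs; that is expected and harmless
        assert!(sender.send(()).is_err());

        // a receiver can still subscribe later, and sends succeed again
        let mut rx = sender.subscribe();
        assert!(sender.send(()).is_ok());
        assert!(rx.recv().await.is_ok());
    }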
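On the switch to `join_all` for the spawn handles: unlike `try_join_all`, it awaits every handle and hands back all of the results, so one rpc that fails to connect can be logged and skipped instead of aborting the whole set, which is what the old TODO about "one rpc being down" was asking for. A rough illustration of the difference, with throwaway tasks standing in for `server_config.spawn(...)`:

    use futures::future::{join_all, try_join_all};

    #[tokio::main]
    async fn main() {
        let tasks = || {
            vec![
                tokio::spawn(async { Ok::<_, &str>("rpc a up") }),
                tokio::spawn(async { Err::<&str, _>("rpc b down") }),
                tokio::spawn(async { Ok::<_, &str>("rpc c up") }),
            ]
        };

        // join_all waits for everything and returns every result,
        // so a single failed connection can be logged and skipped
        for joined in join_all(tasks()).await {
            match joined {
                Ok(Ok(msg)) => println!("connected: {}", msg),
                Ok(Err(err)) => println!("skipping rpc: {}", err),
                Err(err) => println!("spawn panicked: {}", err),
            }
        }

        // try_join_all short-circuits on the first JoinError instead
        // (it only errs on a panicked/aborted task; Ok(Err(_)) still counts as success)
        let all = try_join_all(tasks()).await;
        assert!(all.is_ok());
    }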