From 0edd11349fae739ceb6cd0001c1a50d6db62e5de Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Mon, 6 Feb 2023 14:13:15 -0800 Subject: [PATCH] web3 rpc spawn should be unordered --- web3_proxy/src/rpcs/many.rs | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index c50bab61..4222798e 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -12,7 +12,7 @@ use crate::rpcs::transactions::TxStatus; use counter::Counter; use derive_more::From; use ethers::prelude::{ProviderError, TxHash, H256, U64}; -use futures::future::{join_all, try_join_all}; +use futures::future::try_join_all; use futures::stream::FuturesUnordered; use futures::StreamExt; use hashbrown::{HashMap, HashSet}; @@ -108,9 +108,15 @@ impl Web3Rpcs { // trace!("http interval ready"); - // errors are okay. they mean that all receivers have been dropped - let _ = sender.send(()); + if let Err(_) = sender.send(()) { + // errors are okay. they mean that all receivers have been dropped. that is normal if there are only websocket backend rpcs + // TODO: if error, await a `std::future::pending::<()>`? whats the right thing here? + info!("all http receivers have been dropped"); + break; + }; } + + info!("http interval sender exited"); } }; @@ -125,7 +131,7 @@ impl Web3Rpcs { // turn configs into connections (in parallel) // TODO: move this into a helper function. then we can use it when configs change (will need a remove function too) // TODO: futures unordered? - let spawn_handles: Vec<_> = server_configs + let mut spawn_handles: FuturesUnordered<_> = server_configs .into_iter() .filter_map(|(server_name, server_config)| { if server_config.disabled { @@ -170,20 +176,21 @@ impl Web3Rpcs { let mut connections = HashMap::new(); let mut handles = vec![]; - // TODO: futures unordered? - for x in join_all(spawn_handles).await { + for x in spawn_handles.next().await { // TODO: how should we handle errors here? one rpc being down shouldn't cause the program to exit match x { Ok(Ok((connection, handle))) => { + // web3 connection worked connections.insert(connection.name.clone(), connection); handles.push(handle); } Ok(Err(err)) => { - // if we got an error here, it is not retryable + // if we got an error here, the app can continue on // TODO: include context about which connection failed error!("Unable to create connection. err={:?}", err); } Err(err) => { + // something actually bad happened. exit with an error return Err(err.into()); } }