web3 rpc spawn should be unordered

This commit is contained in:
Bryan Stitt 2023-02-06 14:13:15 -08:00
parent 19e51ce328
commit 0edd11349f

View File

@ -12,7 +12,7 @@ use crate::rpcs::transactions::TxStatus;
use counter::Counter; use counter::Counter;
use derive_more::From; use derive_more::From;
use ethers::prelude::{ProviderError, TxHash, H256, U64}; use ethers::prelude::{ProviderError, TxHash, H256, U64};
use futures::future::{join_all, try_join_all}; use futures::future::try_join_all;
use futures::stream::FuturesUnordered; use futures::stream::FuturesUnordered;
use futures::StreamExt; use futures::StreamExt;
use hashbrown::{HashMap, HashSet}; use hashbrown::{HashMap, HashSet};
@ -108,9 +108,15 @@ impl Web3Rpcs {
// trace!("http interval ready"); // trace!("http interval ready");
// errors are okay. they mean that all receivers have been dropped if let Err(_) = sender.send(()) {
let _ = sender.send(()); // errors are okay. they mean that all receivers have been dropped. that is normal if there are only websocket backend rpcs
// TODO: if error, await a `std::future::pending::<()>`? whats the right thing here?
info!("all http receivers have been dropped");
break;
};
} }
info!("http interval sender exited");
} }
}; };
@ -125,7 +131,7 @@ impl Web3Rpcs {
// turn configs into connections (in parallel) // turn configs into connections (in parallel)
// TODO: move this into a helper function. then we can use it when configs change (will need a remove function too) // TODO: move this into a helper function. then we can use it when configs change (will need a remove function too)
// TODO: futures unordered? // TODO: futures unordered?
let spawn_handles: Vec<_> = server_configs let mut spawn_handles: FuturesUnordered<_> = server_configs
.into_iter() .into_iter()
.filter_map(|(server_name, server_config)| { .filter_map(|(server_name, server_config)| {
if server_config.disabled { if server_config.disabled {
@ -170,20 +176,21 @@ impl Web3Rpcs {
let mut connections = HashMap::new(); let mut connections = HashMap::new();
let mut handles = vec![]; let mut handles = vec![];
// TODO: futures unordered? for x in spawn_handles.next().await {
for x in join_all(spawn_handles).await {
// TODO: how should we handle errors here? one rpc being down shouldn't cause the program to exit // TODO: how should we handle errors here? one rpc being down shouldn't cause the program to exit
match x { match x {
Ok(Ok((connection, handle))) => { Ok(Ok((connection, handle))) => {
// web3 connection worked
connections.insert(connection.name.clone(), connection); connections.insert(connection.name.clone(), connection);
handles.push(handle); handles.push(handle);
} }
Ok(Err(err)) => { Ok(Err(err)) => {
// if we got an error here, it is not retryable // if we got an error here, the app can continue on
// TODO: include context about which connection failed // TODO: include context about which connection failed
error!("Unable to create connection. err={:?}", err); error!("Unable to create connection. err={:?}", err);
} }
Err(err) => { Err(err) => {
// something actually bad happened. exit with an error
return Err(err.into()); return Err(err.into());
} }
} }