use select_all more

This commit is contained in:
Bryan Stitt 2023-10-12 19:23:22 -07:00
parent 6bd5ca3277
commit 9786fb58b2
5 changed files with 35 additions and 52 deletions

@ -169,6 +169,8 @@ USER llama
ENTRYPOINT ["web3_proxy_cli"]
CMD [ "--config", "/web3-proxy.toml", "proxyd" ]
ENV PATH "/root/.cargo/bin:${PATH}"
# TODO: lower log level when done with prototyping
ENV RUST_LOG "warn,ethers_providers::rpc=off,web3_proxy=debug,web3_proxy::rpcs::consensus=info,web3_proxy_cli=debug"

@ -27,7 +27,7 @@ use ethers::core::utils::keccak256;
use ethers::prelude::{Address, Bytes, Transaction, TxHash, H256, U256, U64};
use ethers::utils::rlp::{Decodable, Rlp};
use futures::future::join_all;
use futures::stream::{FuturesUnordered, StreamExt};
use futures::stream::FuturesUnordered;
use hashbrown::{HashMap, HashSet};
use migration::sea_orm::{EntityTrait, PaginatorTrait};
use moka::future::{Cache, CacheBuilder};
@ -134,30 +134,6 @@ pub struct App {
internal_provider: OnceCell<Arc<EthersHttpProvider>>,
}
/// flatten a JoinError into an anyhow error
/// Useful when joining multiple futures.
pub async fn flatten_handle<T>(handle: Web3ProxyJoinHandle<T>) -> Web3ProxyResult<T> {
match handle.await {
Ok(Ok(result)) => Ok(result),
Ok(Err(err)) => Err(err),
Err(err) => Err(err.into()),
}
}
/// return the first error, or Ok if everything worked
pub async fn flatten_handles<T>(
mut handles: FuturesUnordered<Web3ProxyJoinHandle<T>>,
) -> Web3ProxyResult<()> {
while let Some(x) = handles.next().await {
match x {
Err(e) => return Err(e.into()),
Ok(Err(e)) => return Err(e),
Ok(Ok(_)) => continue,
}
}
Ok(())
}
/// starting an app creates many tasks
pub struct Web3ProxyAppSpawn {
/// the app. probably clone this to use in other groups of handles

@ -9,8 +9,8 @@ use async_stream::stream;
use base64::engine::general_purpose;
use derive_more::Constructor;
use ethers::prelude::{H256, U64};
use futures::stream::FuturesUnordered;
use futures::{Stream, StreamExt};
use futures::future::select_all;
use futures::Stream;
use hashbrown::{HashMap, HashSet};
use hdrhistogram::serialization::{Serializer, V2DeflateSerializer};
use hdrhistogram::Histogram;
@ -878,7 +878,7 @@ impl RpcsForRequest {
}
let mut earliest_retry_at = None;
let mut wait_for_sync = FuturesUnordered::new();
let mut wait_for_sync = Vec::new();
// TODO: we used to do a neat power of 2 random choices here, but it had bugs. bring that back
for best_rpc in self.inner.iter() {
@ -938,26 +938,21 @@ impl RpcsForRequest {
sleep_until(retry_at).await;
} else {
select!{
x = wait_for_sync.next() => {
(x, _, _) = select_all(wait_for_sync) => {
match x {
Some(Ok(rpc)) => {
Ok(rpc) => {
trace!(%rpc, "rpc ready. it might be used on the next loop");
// TODO: i don't think this sleep should be necessary. but i just want the cpus to cool down
sleep_until(min_wait_until).await;
},
Some(Err(err)) => {
Err(err) => {
error!(?err, "problem while waiting for an rpc for a request");
// TODO: break or continue?
// TODO: i don't think this sleep should be necessary. but i just want the cpus to cool down
sleep_until(min_wait_until).await;
},
None => {
// this would only happen if we got to the end of wait_for_sync. but we stop after the first result
warn!("wait_for_sync is empty. how'd this happen?");
break;
}
}
},
_ = sleep_until(retry_at) => {

@ -2,7 +2,7 @@
use super::blockchain::{ArcBlock, BlocksByHashCache, Web3ProxyBlock};
use super::provider::{connect_ws, EthersWsProvider};
use super::request::{OpenRequestHandle, OpenRequestResult};
use crate::app::{flatten_handle, Web3ProxyJoinHandle};
use crate::app::Web3ProxyJoinHandle;
use crate::config::{BlockAndRpc, Web3RpcConfig};
use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult};
use crate::jsonrpc::ValidatedRequest;
@ -12,7 +12,7 @@ use anyhow::{anyhow, Context};
use arc_swap::ArcSwapOption;
use deduped_broadcast::DedupedBroadcaster;
use ethers::prelude::{Address, Bytes, Middleware, Transaction, TxHash, U256, U64};
use futures::stream::FuturesUnordered;
use futures::future::select_all;
use futures::StreamExt;
use latency::{EwmaLatency, PeakEwmaLatency, RollingQuantileLatency};
use migration::sea_orm::DatabaseConnection;
@ -738,7 +738,7 @@ impl Web3Rpc {
.await
.web3_context("failed check_provider")?;
let mut futures = FuturesUnordered::new();
let mut futures = Vec::new();
// TODO: use this channel instead of self.disconnect_watch
let (subscribe_stop_tx, mut subscribe_stop_rx) = watch::channel(false);
@ -760,7 +760,7 @@ impl Web3Rpc {
Ok(())
};
futures.push(flatten_handle(tokio::spawn(f)));
futures.push(tokio::spawn(f));
} else {
unimplemented!("there should always be a disconnect watch!");
}
@ -851,7 +851,7 @@ impl Web3Rpc {
self.healthy.store(initial_check, atomic::Ordering::Relaxed);
futures.push(flatten_handle(tokio::spawn(f)));
futures.push(tokio::spawn(f));
} else {
self.healthy.store(true, atomic::Ordering::Relaxed);
}
@ -876,7 +876,7 @@ impl Web3Rpc {
.await
};
futures.push(flatten_handle(tokio::spawn(f)));
futures.push(tokio::spawn(f));
}
// subscribe to new transactions
@ -898,12 +898,12 @@ impl Web3Rpc {
.await
};
futures.push(flatten_handle(tokio::spawn(f)));
futures.push(tokio::spawn(f));
}
// exit if any of the futures exit
// TODO: have an enum for which one exited?
let first_exit = futures.next().await;
let (first_exit, _, _) = select_all(futures).await;
debug!(?first_exit, "subscriptions on {} exited", self);

@ -4,7 +4,7 @@ use std::sync::Arc;
use std::time::Duration;
use std::{fs, thread};
use tracing::{error, info, trace, warn};
use web3_proxy::app::{flatten_handle, App};
use web3_proxy::app::App;
use web3_proxy::config::TopConfig;
use web3_proxy::globals::global_db_conn;
use web3_proxy::prelude::anyhow;
@ -145,7 +145,7 @@ impl ProxydSubCommand {
// TODO: wait for SIGHUP instead?
// TODO: wait for file to change instead of polling. file notifications are really fragile depending on the system and setup though
thread::sleep(Duration::from_secs(10));
thread::sleep(Duration::from_secs(30));
});
}
}
@ -227,23 +227,33 @@ impl ProxydSubCommand {
// }
// }
// }
x = flatten_handle(frontend_handle) => {
x = frontend_handle => {
frontend_exited = true;
match x {
Ok(_) => info!("frontend exited"),
Err(e) => {
Ok(Ok(_)) => info!("frontend exited"),
Ok(Err(e)) => {
error!("frontend exited: {:#?}", e);
exited_with_err = true;
}
Err(e) => {
error!(?e, "join on frontend failed");
exited_with_err = true;
}
}
}
x = flatten_handle(prometheus_handle) => {
x = prometheus_handle => {
match x {
Ok(_) => info!("prometheus exited"),
Err(e) => {
Ok(Ok(_)) => info!("prometheus exited"),
Ok(Err(e)) => {
error!("prometheus exited: {:#?}", e);
exited_with_err = true;
}
Err(e) => {
error!(?e, "join on prometheus failed");
exited_with_err = true;
}
}
}
x = tokio::signal::ctrl_c() => {