From 9786fb58b2c9281ce32b42bfcd67ba68d49ff189 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Thu, 12 Oct 2023 19:23:22 -0700 Subject: [PATCH] use select_all more --- Dockerfile | 2 ++ web3_proxy/src/app/mod.rs | 26 +---------------------- web3_proxy/src/rpcs/consensus.rs | 17 ++++++--------- web3_proxy/src/rpcs/one.rs | 16 +++++++------- web3_proxy_cli/src/sub_commands/proxyd.rs | 26 ++++++++++++++++------- 5 files changed, 35 insertions(+), 52 deletions(-) diff --git a/Dockerfile b/Dockerfile index 96dd708e..b8da4f01 100644 --- a/Dockerfile +++ b/Dockerfile @@ -169,6 +169,8 @@ USER llama ENTRYPOINT ["web3_proxy_cli"] CMD [ "--config", "/web3-proxy.toml", "proxyd" ] +ENV PATH "/root/.cargo/bin:${PATH}" + # TODO: lower log level when done with prototyping ENV RUST_LOG "warn,ethers_providers::rpc=off,web3_proxy=debug,web3_proxy::rpcs::consensus=info,web3_proxy_cli=debug" diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 047b56fa..fe9033d6 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -27,7 +27,7 @@ use ethers::core::utils::keccak256; use ethers::prelude::{Address, Bytes, Transaction, TxHash, H256, U256, U64}; use ethers::utils::rlp::{Decodable, Rlp}; use futures::future::join_all; -use futures::stream::{FuturesUnordered, StreamExt}; +use futures::stream::FuturesUnordered; use hashbrown::{HashMap, HashSet}; use migration::sea_orm::{EntityTrait, PaginatorTrait}; use moka::future::{Cache, CacheBuilder}; @@ -134,30 +134,6 @@ pub struct App { internal_provider: OnceCell>, } -/// flatten a JoinError into an anyhow error -/// Useful when joining multiple futures. -pub async fn flatten_handle(handle: Web3ProxyJoinHandle) -> Web3ProxyResult { - match handle.await { - Ok(Ok(result)) => Ok(result), - Ok(Err(err)) => Err(err), - Err(err) => Err(err.into()), - } -} - -/// return the first error, or Ok if everything worked -pub async fn flatten_handles( - mut handles: FuturesUnordered>, -) -> Web3ProxyResult<()> { - while let Some(x) = handles.next().await { - match x { - Err(e) => return Err(e.into()), - Ok(Err(e)) => return Err(e), - Ok(Ok(_)) => continue, - } - } - Ok(()) -} - /// starting an app creates many tasks pub struct Web3ProxyAppSpawn { /// the app. probably clone this to use in other groups of handles diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index cacd2691..cee32503 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -9,8 +9,8 @@ use async_stream::stream; use base64::engine::general_purpose; use derive_more::Constructor; use ethers::prelude::{H256, U64}; -use futures::stream::FuturesUnordered; -use futures::{Stream, StreamExt}; +use futures::future::select_all; +use futures::Stream; use hashbrown::{HashMap, HashSet}; use hdrhistogram::serialization::{Serializer, V2DeflateSerializer}; use hdrhistogram::Histogram; @@ -878,7 +878,7 @@ impl RpcsForRequest { } let mut earliest_retry_at = None; - let mut wait_for_sync = FuturesUnordered::new(); + let mut wait_for_sync = Vec::new(); // TODO: we used to do a neat power of 2 random choices here, but it had bugs. bring that back for best_rpc in self.inner.iter() { @@ -938,26 +938,21 @@ impl RpcsForRequest { sleep_until(retry_at).await; } else { select!{ - x = wait_for_sync.next() => { + (x, _, _) = select_all(wait_for_sync) => { match x { - Some(Ok(rpc)) => { + Ok(rpc) => { trace!(%rpc, "rpc ready. it might be used on the next loop"); // TODO: i don't think this sleep should be necessary. but i just want the cpus to cool down sleep_until(min_wait_until).await; }, - Some(Err(err)) => { + Err(err) => { error!(?err, "problem while waiting for an rpc for a request"); // TODO: break or continue? // TODO: i don't think this sleep should be necessary. but i just want the cpus to cool down sleep_until(min_wait_until).await; }, - None => { - // this would only happen if we got to the end of wait_for_sync. but we stop after the first result - warn!("wait_for_sync is empty. how'd this happen?"); - break; - } } }, _ = sleep_until(retry_at) => { diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 425aa286..cd81bcd1 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -2,7 +2,7 @@ use super::blockchain::{ArcBlock, BlocksByHashCache, Web3ProxyBlock}; use super::provider::{connect_ws, EthersWsProvider}; use super::request::{OpenRequestHandle, OpenRequestResult}; -use crate::app::{flatten_handle, Web3ProxyJoinHandle}; +use crate::app::Web3ProxyJoinHandle; use crate::config::{BlockAndRpc, Web3RpcConfig}; use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult}; use crate::jsonrpc::ValidatedRequest; @@ -12,7 +12,7 @@ use anyhow::{anyhow, Context}; use arc_swap::ArcSwapOption; use deduped_broadcast::DedupedBroadcaster; use ethers::prelude::{Address, Bytes, Middleware, Transaction, TxHash, U256, U64}; -use futures::stream::FuturesUnordered; +use futures::future::select_all; use futures::StreamExt; use latency::{EwmaLatency, PeakEwmaLatency, RollingQuantileLatency}; use migration::sea_orm::DatabaseConnection; @@ -738,7 +738,7 @@ impl Web3Rpc { .await .web3_context("failed check_provider")?; - let mut futures = FuturesUnordered::new(); + let mut futures = Vec::new(); // TODO: use this channel instead of self.disconnect_watch let (subscribe_stop_tx, mut subscribe_stop_rx) = watch::channel(false); @@ -760,7 +760,7 @@ impl Web3Rpc { Ok(()) }; - futures.push(flatten_handle(tokio::spawn(f))); + futures.push(tokio::spawn(f)); } else { unimplemented!("there should always be a disconnect watch!"); } @@ -851,7 +851,7 @@ impl Web3Rpc { self.healthy.store(initial_check, atomic::Ordering::Relaxed); - futures.push(flatten_handle(tokio::spawn(f))); + futures.push(tokio::spawn(f)); } else { self.healthy.store(true, atomic::Ordering::Relaxed); } @@ -876,7 +876,7 @@ impl Web3Rpc { .await }; - futures.push(flatten_handle(tokio::spawn(f))); + futures.push(tokio::spawn(f)); } // subscribe to new transactions @@ -898,12 +898,12 @@ impl Web3Rpc { .await }; - futures.push(flatten_handle(tokio::spawn(f))); + futures.push(tokio::spawn(f)); } // exit if any of the futures exit // TODO: have an enum for which one exited? - let first_exit = futures.next().await; + let (first_exit, _, _) = select_all(futures).await; debug!(?first_exit, "subscriptions on {} exited", self); diff --git a/web3_proxy_cli/src/sub_commands/proxyd.rs b/web3_proxy_cli/src/sub_commands/proxyd.rs index 8fef91dd..50af3c1e 100644 --- a/web3_proxy_cli/src/sub_commands/proxyd.rs +++ b/web3_proxy_cli/src/sub_commands/proxyd.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use std::time::Duration; use std::{fs, thread}; use tracing::{error, info, trace, warn}; -use web3_proxy::app::{flatten_handle, App}; +use web3_proxy::app::App; use web3_proxy::config::TopConfig; use web3_proxy::globals::global_db_conn; use web3_proxy::prelude::anyhow; @@ -145,7 +145,7 @@ impl ProxydSubCommand { // TODO: wait for SIGHUP instead? // TODO: wait for file to change instead of polling. file notifications are really fragile depending on the system and setup though - thread::sleep(Duration::from_secs(10)); + thread::sleep(Duration::from_secs(30)); }); } } @@ -227,23 +227,33 @@ impl ProxydSubCommand { // } // } // } - x = flatten_handle(frontend_handle) => { + x = frontend_handle => { frontend_exited = true; match x { - Ok(_) => info!("frontend exited"), - Err(e) => { + Ok(Ok(_)) => info!("frontend exited"), + Ok(Err(e)) => { error!("frontend exited: {:#?}", e); exited_with_err = true; } + Err(e) => { + error!(?e, "join on frontend failed"); + exited_with_err = true; + + } } } - x = flatten_handle(prometheus_handle) => { + x = prometheus_handle => { match x { - Ok(_) => info!("prometheus exited"), - Err(e) => { + Ok(Ok(_)) => info!("prometheus exited"), + Ok(Err(e)) => { error!("prometheus exited: {:#?}", e); exited_with_err = true; } + Err(e) => { + error!(?e, "join on prometheus failed"); + exited_with_err = true; + + } } } x = tokio::signal::ctrl_c() => {