only select on the balanced handle

This commit is contained in:
Bryan Stitt 2023-10-12 16:52:39 -07:00
parent 926003edd9
commit 6bd5ca3277
2 changed files with 35 additions and 21 deletions

View File

@ -162,8 +162,12 @@ pub async fn flatten_handles<T>(
pub struct Web3ProxyAppSpawn { pub struct Web3ProxyAppSpawn {
/// the app. probably clone this to use in other groups of handles /// the app. probably clone this to use in other groups of handles
pub app: Arc<App>, pub app: Arc<App>,
/// handles for the balanced and private rpcs /// handle for some rpcs
pub app_handles: FuturesUnordered<Web3ProxyJoinHandle<()>>, pub balanced_handle: Web3ProxyJoinHandle<()>,
/// handle for some rpcs
pub private_handle: Web3ProxyJoinHandle<()>,
/// handle for some rpcs
pub bundler_4337_rpcs_handle: Web3ProxyJoinHandle<()>,
/// these are important and must be allowed to finish /// these are important and must be allowed to finish
pub background_handles: FuturesUnordered<Web3ProxyJoinHandle<()>>, pub background_handles: FuturesUnordered<Web3ProxyJoinHandle<()>>,
/// config changes are sent here /// config changes are sent here
@ -207,11 +211,8 @@ impl App {
); );
} }
// these futures are key parts of the app. if they stop running, the app has encountered an irrecoverable error
// TODO: this is a small enough group, that a vec with try_join_all is probably fine
let app_handles: FuturesUnordered<Web3ProxyJoinHandle<()>> = FuturesUnordered::new();
// we must wait for these to end on their own (and they need to subscribe to shutdown_sender) // we must wait for these to end on their own (and they need to subscribe to shutdown_sender)
// TODO: is FuturesUnordered what we need? I want to return when the first one returns
let important_background_handles: FuturesUnordered<Web3ProxyJoinHandle<()>> = let important_background_handles: FuturesUnordered<Web3ProxyJoinHandle<()>> =
FuturesUnordered::new(); FuturesUnordered::new();
@ -453,8 +454,6 @@ impl App {
.await .await
.web3_context("spawning balanced rpcs")?; .web3_context("spawning balanced rpcs")?;
app_handles.push(balanced_handle);
// prepare a Web3Rpcs to hold all our private connections // prepare a Web3Rpcs to hold all our private connections
// only some chains have this, so this might be empty // only some chains have this, so this might be empty
// TODO: set min_sum_soft_limit > 0 if any private rpcs are configured. this way we don't accidently leak to the public mempool if they are all offline // TODO: set min_sum_soft_limit > 0 if any private rpcs are configured. this way we don't accidently leak to the public mempool if they are all offline
@ -475,8 +474,6 @@ impl App {
.await .await
.web3_context("spawning private_rpcs")?; .web3_context("spawning private_rpcs")?;
app_handles.push(private_handle);
// prepare a Web3Rpcs to hold all our 4337 Abstraction Bundler connections (if any) // prepare a Web3Rpcs to hold all our 4337 Abstraction Bundler connections (if any)
let (bundler_4337_rpcs, bundler_4337_rpcs_handle, _) = Web3Rpcs::spawn( let (bundler_4337_rpcs, bundler_4337_rpcs_handle, _) = Web3Rpcs::spawn(
chain_id, chain_id,
@ -491,8 +488,6 @@ impl App {
.await .await
.web3_context("spawning bundler_4337_rpcs")?; .web3_context("spawning bundler_4337_rpcs")?;
app_handles.push(bundler_4337_rpcs_handle);
let hostname = hostname::get() let hostname = hostname::get()
.ok() .ok()
.and_then(|x| x.to_str().map(|x| x.to_string())); .and_then(|x| x.to_str().map(|x| x.to_string()));
@ -609,7 +604,9 @@ impl App {
Ok(Web3ProxyAppSpawn { Ok(Web3ProxyAppSpawn {
app, app,
app_handles, balanced_handle,
private_handle,
bundler_4337_rpcs_handle,
background_handles: important_background_handles, background_handles: important_background_handles,
new_top_config: Arc::new(new_top_config_sender), new_top_config: Arc::new(new_top_config_sender),
ranked_rpcs: consensus_connections_watcher, ranked_rpcs: consensus_connections_watcher,

View File

@ -3,9 +3,8 @@ use std::sync::atomic::AtomicU16;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use std::{fs, thread}; use std::{fs, thread};
use tokio::time::sleep;
use tracing::{error, info, trace, warn}; use tracing::{error, info, trace, warn};
use web3_proxy::app::{flatten_handle, flatten_handles, App}; use web3_proxy::app::{flatten_handle, App};
use web3_proxy::config::TopConfig; use web3_proxy::config::TopConfig;
use web3_proxy::globals::global_db_conn; use web3_proxy::globals::global_db_conn;
use web3_proxy::prelude::anyhow; use web3_proxy::prelude::anyhow;
@ -193,24 +192,42 @@ impl ProxydSubCommand {
frontend_shutdown_complete_sender, frontend_shutdown_complete_sender,
)); ));
let frontend_handle = flatten_handle(frontend_handle);
let mut terminate_stream = signal::unix::signal(SignalKind::terminate())?; let mut terminate_stream = signal::unix::signal(SignalKind::terminate())?;
// if everything is working, these should all run forever // if everything is working, these should all run forever
let mut exited_with_err = false; let mut exited_with_err = false;
let mut frontend_exited = false; let mut frontend_exited = false;
select! { select! {
x = flatten_handles(spawned_app.app_handles) => { x = spawned_app.balanced_handle => {
match x { match x {
Ok(_) => info!("app_handle exited"), Ok(_) => info!("balanced_handle exited"),
Err(e) => { Err(e) => {
error!("app_handle exited: {:#?}", e); error!("balanced_handle exited: {:#?}", e);
exited_with_err = true; exited_with_err = true;
} }
} }
} }
x = frontend_handle => { // // TODO: this handle always exits right away because it doesn't subscribe to any blocks
// x = spawned_app.private_handle => {
// match x {
// Ok(_) => info!("private_handle exited"),
// Err(e) => {
// error!("private_handle exited: {:#?}", e);
// exited_with_err = true;
// }
// }
// }
// // TODO: this handle always exits right away because it doesn't subscribe to any blocks
// x = spawned_app.bundler_4337_rpcs_handle => {
// match x {
// Ok(_) => info!("bundler_4337_rpcs_handle exited"),
// Err(e) => {
// error!("bundler_4337_rpcs_handle exited: {:#?}", e);
// exited_with_err = true;
// }
// }
// }
x = flatten_handle(frontend_handle) => {
frontend_exited = true; frontend_exited = true;
match x { match x {
Ok(_) => info!("frontend exited"), Ok(_) => info!("frontend exited"),