diff --git a/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs b/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs index 38fe591d..f46835b8 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs @@ -10,7 +10,6 @@ use tokio::sync::broadcast; use web3_proxy::app::{flatten_handle, flatten_handles, Web3ProxyApp}; use web3_proxy::config::TopConfig; use web3_proxy::{frontend, prometheus}; -use anyhow::Context; /// start the main proxy daemon #[derive(FromArgs, PartialEq, Debug, Eq)] @@ -64,6 +63,8 @@ async fn run( let app_prometheus_port = prometheus_port; // TODO: should we use a watch or broadcast for these? + // Maybe this one ? + // let mut shutdown_receiver = shutdown_sender.subscribe(); let (app_shutdown_sender, _app_shutdown_receiver) = broadcast::channel(1); let frontend_shutdown_receiver = frontend_shutdown_sender.subscribe(); @@ -74,7 +75,84 @@ async fn run( broadcast::channel(1); // start the main app - let mut spawned_app = Web3ProxyApp::spawn(top_config, num_workers, app_shutdown_sender.clone()).await?; + // let mut spawned_app = Web3ProxyApp::spawn(top_config, num_workers, app_shutdown_sender.clone()).await?; + let mut spawned_app = + Web3ProxyApp::spawn(top_config.clone(), num_workers, app_shutdown_sender.clone()).await?; + + // start thread for watching config + if let Some(top_config_path) = top_config_path { + let config_sender = spawned_app.new_top_config_sender; + /* + #[cfg(feature = "inotify")] + { + let mut inotify = Inotify::init().expect("Failed to initialize inotify"); + + inotify + .add_watch(top_config_path.clone(), WatchMask::MODIFY) + .expect("Failed to add inotify watch on config"); + + let mut buffer = [0u8; 4096]; + + // TODO: exit the app if this handle exits + thread::spawn(move || loop { + // TODO: debounce + + let events = inotify + .read_events_blocking(&mut buffer) + .expect("Failed to read inotify events"); + + for event in events { + if event.mask.contains(EventMask::MODIFY) { + info!("config changed"); + match fs::read_to_string(&top_config_path) { + Ok(top_config) => match toml::from_str(&top_config) { + Ok(top_config) => { + config_sender.send(top_config).unwrap(); + } + Err(err) => { + // TODO: panic? + error!("Unable to parse config! {:#?}", err); + } + }, + Err(err) => { + // TODO: panic? + error!("Unable to read config! {:#?}", err); + } + }; + } else { + // TODO: is "MODIFY" enough, or do we want CLOSE_WRITE? + unimplemented!(); + } + } + }); + } + */ + // #[cfg(not(feature = "inotify"))] + { + thread::spawn(move || loop { + match fs::read_to_string(&top_config_path) { + Ok(new_top_config) => match toml::from_str(&new_top_config) { + Ok(new_top_config) => { + if new_top_config != top_config { + top_config = new_top_config; + config_sender.send(top_config.clone()).unwrap(); + } + } + Err(err) => { + // TODO: panic? + error!("Unable to parse config! {:#?}", err); + } + }, + Err(err) => { + // TODO: panic? + error!("Unable to read config! {:#?}", err); + } + } + + thread::sleep(Duration::from_secs(10)); + }); + } + } // start thread for watching config // if let Some(top_config_path) = top_config_path { @@ -114,6 +192,8 @@ async fn run( // wait until the app has seen its first consensus head block // if backups were included, wait a little longer + // let _ = spawned_app.app.head_block_receiver().changed().await; + // if backups were included, wait a little longer // for _ in 0..3 { // let _ = spawned_app.consensus_connections_watcher.changed().await; // @@ -121,16 +201,19 @@ async fn run( // .consensus_connections_watcher // .borrow_and_update(); // - // if *consensus.context("Channel closed!")?.backups_needed { + // // Let's just do a super dirty unwrap to get things going + // if consensus.unwrap().backups_needed { // info!( // "waiting longer. found consensus with backups: {}", - // *consensus.context("Channel closed!")?.head_block.as_ref().unwrap(), + // consensus.unwrap().head_block.as_ref().unwrap(), // ); // } else { // // TODO: also check that we have at least one archive node connected? // break; // } // } + let _ = spawned_app.app.head_block_receiver().changed().await; + // start the frontend port let frontend_handle = tokio::spawn(frontend::serve(