tests seem to run successfully; should double-check the huge merge

yenicelik 2023-03-04 22:31:39 +01:00
parent 5a54fc5e99
commit ffd63444b2

@@ -10,7 +10,6 @@ use tokio::sync::broadcast;
use web3_proxy::app::{flatten_handle, flatten_handles, Web3ProxyApp};
use web3_proxy::config::TopConfig;
use web3_proxy::{frontend, prometheus};
use anyhow::Context;
/// start the main proxy daemon
#[derive(FromArgs, PartialEq, Debug, Eq)]
@@ -64,6 +63,8 @@ async fn run(
let app_prometheus_port = prometheus_port;
// TODO: should we use a watch or broadcast for these?
// Maybe this one?
// let mut shutdown_receiver = shutdown_sender.subscribe();
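// note: a tokio broadcast channel delivers each message to every active subscriber, while a
// watch channel only exposes the most recent value; for a one-shot shutdown signal either
// would do, broadcast just makes handing out many receivers straightforward.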
let (app_shutdown_sender, _app_shutdown_receiver) = broadcast::channel(1);
let frontend_shutdown_receiver = frontend_shutdown_sender.subscribe();
@@ -74,7 +75,84 @@ async fn run(
broadcast::channel(1);
// start the main app
let mut spawned_app = Web3ProxyApp::spawn(top_config, num_workers, app_shutdown_sender.clone()).await?;
// let mut spawned_app = Web3ProxyApp::spawn(top_config, num_workers, app_shutdown_sender.clone()).await?;
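// top_config is cloned into spawn() so the original value can still be moved into the
// config-watching thread below, which compares it against freshly parsed configs.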
let mut spawned_app =
    Web3ProxyApp::spawn(top_config.clone(), num_workers, app_shutdown_sender.clone()).await?;
// start thread for watching config
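// config_sender is presumably a channel sender that pushes a re-parsed TopConfig into the
// running app. The inotify-based watcher is commented out below; the active fallback is a
// plain polling thread.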
if let Some(top_config_path) = top_config_path {
let config_sender = spawned_app.new_top_config_sender;
/*
#[cfg(feature = "inotify")]
{
    let mut inotify = Inotify::init().expect("Failed to initialize inotify");
    inotify
        .add_watch(top_config_path.clone(), WatchMask::MODIFY)
        .expect("Failed to add inotify watch on config");
    let mut buffer = [0u8; 4096];
    // TODO: exit the app if this handle exits
    thread::spawn(move || loop {
        // TODO: debounce
        let events = inotify
            .read_events_blocking(&mut buffer)
            .expect("Failed to read inotify events");
        for event in events {
            if event.mask.contains(EventMask::MODIFY) {
                info!("config changed");
                match fs::read_to_string(&top_config_path) {
                    Ok(top_config) => match toml::from_str(&top_config) {
                        Ok(top_config) => {
                            config_sender.send(top_config).unwrap();
                        }
                        Err(err) => {
                            // TODO: panic?
                            error!("Unable to parse config! {:#?}", err);
                        }
                    },
                    Err(err) => {
                        // TODO: panic?
                        error!("Unable to read config! {:#?}", err);
                    }
                };
            } else {
                // TODO: is "MODIFY" enough, or do we want CLOSE_WRITE?
                unimplemented!();
            }
        }
    });
}
*/
// #[cfg(not(feature = "inotify"))]
{
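// polling fallback: re-read and re-parse the TOML roughly every 10 seconds and only send
// when the parsed config actually differs from the last one we saw.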
    thread::spawn(move || loop {
        match fs::read_to_string(&top_config_path) {
            Ok(new_top_config) => match toml::from_str(&new_top_config) {
                Ok(new_top_config) => {
                    if new_top_config != top_config {
                        top_config = new_top_config;
                        config_sender.send(top_config.clone()).unwrap();
                    }
                }
                Err(err) => {
                    // TODO: panic?
                    error!("Unable to parse config! {:#?}", err);
                }
            },
            Err(err) => {
                // TODO: panic?
                error!("Unable to read config! {:#?}", err);
            }
        }
        thread::sleep(Duration::from_secs(10));
    });
}
}
// start thread for watching config
// if let Some(top_config_path) = top_config_path {
@@ -114,6 +192,8 @@ async fn run(
// wait until the app has seen its first consensus head block
// if backups were included, wait a little longer
// let _ = spawned_app.app.head_block_receiver().changed().await;
// if backups were included, wait a little longer
// for _ in 0..3 {
// let _ = spawned_app.consensus_connections_watcher.changed().await;
//
@@ -121,16 +201,19 @@ async fn run(
// .consensus_connections_watcher
// .borrow_and_update();
//
// if *consensus.context("Channel closed!")?.backups_needed {
// // Let's just do a super dirty unwrap to get things going
// if consensus.unwrap().backups_needed {
// info!(
// "waiting longer. found consensus with backups: {}",
// *consensus.context("Channel closed!")?.head_block.as_ref().unwrap(),
// consensus.unwrap().head_block.as_ref().unwrap(),
// );
// } else {
// // TODO: also check that we have at least one archive node connected?
// break;
// }
// }
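// for now the backup-aware wait above stays commented out; just block until the head
// block receiver reports its first change.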
let _ = spawned_app.app.head_block_receiver().changed().await;
// start the frontend port
let frontend_handle = tokio::spawn(frontend::serve(