#![forbid(unsafe_code)]

use anyhow::Context;
use argh::FromArgs;
use futures::StreamExt;
use log::{error, info, trace, warn};
use num::Zero;
use std::path::PathBuf;
use std::time::Duration;
use std::{fs, thread};
use tokio::sync::broadcast;
use web3_proxy::app::{flatten_handle, flatten_handles, Web3ProxyApp};
use web3_proxy::config::TopConfig;
use web3_proxy::{frontend, prometheus};

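// with the defaults below, this subcommand is invoked roughly as `<binary> proxyd
// --port 8544 --prometheus-port 8543`, plus whatever top-level flags the parent command
// uses to supply the TopConfig path (the binary name and top-level flags are assumptions;
// they depend on the rest of the CLI).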
/// start the main proxy daemon
#[derive(FromArgs, PartialEq, Debug, Eq)]
#[argh(subcommand, name = "proxyd")]
pub struct ProxydSubCommand {
    /// what port the proxy should listen on
    #[argh(option, default = "8544")]
    pub port: u16,

    /// what port the proxy should expose prometheus stats on
    #[argh(option, default = "8543")]
    pub prometheus_port: u16,
}

impl ProxydSubCommand {
    pub async fn main(
        self,
        top_config: TopConfig,
        top_config_path: PathBuf,
        num_workers: usize,
    ) -> anyhow::Result<()> {
        let (shutdown_sender, _) = broadcast::channel(1);

        // TODO: i think there is a small race. if config_path changes
        run(
            top_config,
            Some(top_config_path),
            self.port,
            self.prometheus_port,
            num_workers,
            shutdown_sender,
        )
        .await
    }
}

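/// Spawn the Web3ProxyApp, the prometheus exporter, and the frontend, then wait for any
/// of them (or ctrl-c) to exit before running the graceful shutdown sequence.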
async fn run(
    mut top_config: TopConfig,
    top_config_path: Option<PathBuf>,
    frontend_port: u16,
    prometheus_port: u16,
    num_workers: usize,
    frontend_shutdown_sender: broadcast::Sender<()>,
) -> anyhow::Result<()> {
    // tokio has code for catching ctrl+c so we use that
    // this shutdown sender is currently only used in tests, but we might make a /shutdown endpoint or something
    // we do not need this receiver. new receivers are made by `shutdown_sender.subscribe()`

    let app_frontend_port = frontend_port;
    let app_prometheus_port = prometheus_port;

    // TODO: should we use a watch or broadcast for these?
    let (app_shutdown_sender, _app_shutdown_receiver) = broadcast::channel(1);

    let frontend_shutdown_receiver = frontend_shutdown_sender.subscribe();
    let prometheus_shutdown_receiver = app_shutdown_sender.subscribe();

    // TODO: should we use a watch or broadcast for these?
    let (frontend_shutdown_complete_sender, mut frontend_shutdown_complete_receiver) =
        broadcast::channel(1);
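
    // shutdown happens in two phases (see the bottom of this function): the frontend is
    // told to stop first via `frontend_shutdown_sender`, we wait on
    // `frontend_shutdown_complete_receiver`, and only then does `app_shutdown_sender`
    // stop prometheus and the remaining background tasks.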

    // start the main app
    let mut spawned_app =
        Web3ProxyApp::spawn(top_config, num_workers, app_shutdown_sender.clone()).await?;
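    // `spawned_app` carries the app itself plus the handles for its main tasks and its
    // background tasks, which we select over and drain below.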

    // start thread for watching config
    // if let Some(top_config_path) = top_config_path {
    //     let config_sender = spawned_app.new_top_config_sender;
    //     {
    //         thread::spawn(move || loop {
    //             match fs::read_to_string(&top_config_path) {
    //                 Ok(new_top_config) => match toml::from_str(&new_top_config) {
    //                     Ok(new_top_config) => {
    //                         if new_top_config != top_config {
    //                             top_config = new_top_config;
    //                             config_sender.send(top_config.clone()).unwrap();
    //                         }
    //                     }
    //                     Err(err) => {
    //                         // TODO: panic?
    //                         error!("Unable to parse config! {:#?}", err);
    //                     }
    //                 },
    //                 Err(err) => {
    //                     // TODO: panic?
    //                     error!("Unable to read config! {:#?}", err);
    //                 }
    //             }
    //
    //             thread::sleep(Duration::from_secs(10));
    //         });
    //     }
    // }

    // start the prometheus metrics port
    let prometheus_handle = tokio::spawn(prometheus::serve(
        spawned_app.app.clone(),
        app_prometheus_port,
        prometheus_shutdown_receiver,
    ));

    // wait until the app has seen its first consensus head block
    // if backups were included, wait a little longer
    // for _ in 0..3 {
    //     let _ = spawned_app.consensus_connections_watcher.changed().await;
    //
    //     let consensus = spawned_app
    //         .consensus_connections_watcher
    //         .borrow_and_update();
    //
    //     if *consensus.context("Channel closed!")?.backups_needed {
    //         info!(
    //             "waiting longer. found consensus with backups: {}",
    //             *consensus.context("Channel closed!")?.head_block.as_ref().unwrap(),
    //         );
    //     } else {
    //         // TODO: also check that we have at least one archive node connected?
    //         break;
    //     }
    // }

    // start the frontend port
    let frontend_handle = tokio::spawn(frontend::serve(
        app_frontend_port,
        spawned_app.app.clone(),
        frontend_shutdown_receiver,
        frontend_shutdown_complete_sender,
    ));

    let frontend_handle = flatten_handle(frontend_handle);
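
    // race all of the long-running tasks against ctrl-c; whichever finishes (or fails)
    // first kicks off the shutdown sequence below.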

    // if everything is working, these should all run forever
    let mut exited_with_err = false;
    let mut frontend_exited = false;
    tokio::select! {
        x = flatten_handles(spawned_app.app_handles) => {
            match x {
                Ok(_) => info!("app_handle exited"),
                Err(e) => {
                    error!("app_handle exited: {:#?}", e);
                    exited_with_err = true;
                }
            }
        }
        x = frontend_handle => {
            frontend_exited = true;
            match x {
                Ok(_) => info!("frontend exited"),
                Err(e) => {
                    error!("frontend exited: {:#?}", e);
                    exited_with_err = true;
                }
            }
        }
        x = flatten_handle(prometheus_handle) => {
            match x {
                Ok(_) => info!("prometheus exited"),
                Err(e) => {
                    error!("prometheus exited: {:#?}", e);
                    exited_with_err = true;
                }
            }
        }
        x = tokio::signal::ctrl_c() => {
            // TODO: unix terminate signal, too
            match x {
                Ok(_) => info!("quitting from ctrl-c"),
                Err(e) => {
                    // TODO: i don't think this is possible
                    error!("error quitting from ctrl-c: {:#?}", e);
                    exited_with_err = true;
                }
            }
        }
        // TODO: how can we properly watch background handles here? this returns None immediately and the app exits. i think the bug is somewhere else though
        x = spawned_app.background_handles.next() => {
            match x {
                Some(Ok(_)) => info!("quitting from background handles"),
                Some(Err(e)) => {
                    error!("quitting from background handle error: {:#?}", e);
                    exited_with_err = true;
                }
                None => {
                    // TODO: is this an error?
                    warn!("background handles exited");
                }
            }
        }
    };

    // if a future above completed, make sure the frontend knows to start turning off
    if !frontend_exited {
        if let Err(err) = frontend_shutdown_sender.send(()) {
            // TODO: this is actually expected if the frontend is already shut down
            warn!("shutdown sender err={:?}", err);
        };
    }

    // TODO: wait until the frontend completes
    if let Err(err) = frontend_shutdown_complete_receiver.recv().await {
        warn!("shutdown completion err={:?}", err);
    } else {
        info!("frontend exited gracefully");
    }

    // now that the frontend is complete, tell all the other futures to finish
    if let Err(err) = app_shutdown_sender.send(()) {
        warn!("backend sender err={:?}", err);
    };
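
    // drain whatever background tasks remain, counting any that ended in an error.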
    info!(
        "waiting on {} important background tasks",
        spawned_app.background_handles.len()
    );
    let mut background_errors = 0;
    while let Some(x) = spawned_app.background_handles.next().await {
        match x {
            Err(e) => {
                error!("{:?}", e);
                background_errors += 1;
            }
            Ok(Err(e)) => {
                error!("{:?}", e);
                background_errors += 1;
            }
            Ok(Ok(_)) => {
                // TODO: how can we know which handle exited?
                trace!("a background handle exited");
                continue;
            }
        }
    }

    if background_errors.is_zero() && !exited_with_err {
        info!("finished");
        Ok(())
    } else {
        // TODO: collect all the errors here instead?
        Err(anyhow::anyhow!("finished with errors!"))
    }
}

#[cfg(test)]
mod tests {
    use ethers::{
        prelude::{Http, Provider, U256},
        utils::Anvil,
    };
    use hashbrown::HashMap;
    use std::env;

    use web3_proxy::{
        config::{AppConfig, Web3RpcConfig},
        rpcs::blockchain::ArcBlock,
    };

    use super::*;

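    // end-to-end smoke test: spin up an anvil dev node, run the proxy against it, and
    // compare `eth_getBlockByNumber` results before and after mining a block.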
    #[tokio::test]
    async fn it_works() {
        // TODO: move basic setup into a test fixture
        let path = env::var("PATH").unwrap();

        println!("path: {}", path);

        // TODO: how should we handle logs in this?
        // TODO: option for super verbose logs
        std::env::set_var("RUST_LOG", "info,web3_proxy=debug");

        let _ = env_logger::builder().is_test(true).try_init();

        let anvil = Anvil::new().spawn();

        println!("Anvil running at `{}`", anvil.endpoint());

        let anvil_provider = Provider::<Http>::try_from(anvil.endpoint()).unwrap();

        // mine a block because my code doesn't like being on block 0
        // TODO: make block 0 okay? is it okay now?
        let _: U256 = anvil_provider
            .request("evm_mine", None::<()>)
            .await
            .unwrap();

        // make a test TopConfig
        // TODO: load TopConfig from a file? CliConfig could have `cli_config.load_top_config`. would need to inject our endpoint ports
        let top_config = TopConfig {
            app: AppConfig {
                chain_id: 31337,
                default_user_max_requests_per_period: Some(6_000_000),
                min_sum_soft_limit: 1,
                min_synced_rpcs: 1,
                public_requests_per_period: Some(1_000_000),
                response_cache_max_bytes: 10_u64.pow(7),
                redirect_public_url: Some("example.com/".to_string()),
                redirect_rpc_key_url: Some("example.com/{{rpc_key_id}}".to_string()),
                ..Default::default()
            },
            balanced_rpcs: HashMap::from([
                (
                    "anvil".to_string(),
                    Web3RpcConfig {
                        http_url: Some(anvil.endpoint()),
                        soft_limit: 100,
                        tier: 0,
                        ..Default::default()
                    },
                ),
                (
                    "anvil_ws".to_string(),
                    Web3RpcConfig {
                        ws_url: Some(anvil.ws_endpoint()),
                        soft_limit: 100,
                        tier: 0,
                        ..Default::default()
                    },
                ),
                (
                    "anvil_both".to_string(),
                    Web3RpcConfig {
                        http_url: Some(anvil.endpoint()),
                        ws_url: Some(anvil.ws_endpoint()),
                        ..Default::default()
                    },
                ),
            ]),
            private_rpcs: None,
            extra: Default::default(),
        };

        let (shutdown_sender, _shutdown_receiver) = broadcast::channel(1);

        // spawn another thread for running the app
        // TODO: allow launching into the local tokio runtime instead of creating a new one?
        let handle = {
            let frontend_port = 0;
            let prometheus_port = 0;
            let shutdown_sender = shutdown_sender.clone();

            tokio::spawn(async move {
                run(
                    top_config,
                    None,
                    frontend_port,
                    prometheus_port,
                    2,
                    shutdown_sender,
                )
                .await
            })
        };
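
        // NOTE: frontend_port is 0 above, so the proxy's actual listen port isn't known
        // to this test; the "proxy_provider" below therefore points at anvil's endpoint
        // directly.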

        // TODO: do something to the node. query latest block, mine another block, query again
        let proxy_provider = Provider::<Http>::try_from(anvil.endpoint()).unwrap();

        let anvil_result = anvil_provider
            .request::<_, Option<ArcBlock>>("eth_getBlockByNumber", ("latest", true))
            .await
            .unwrap()
            .unwrap();
        let proxy_result = proxy_provider
            .request::<_, Option<ArcBlock>>("eth_getBlockByNumber", ("latest", true))
            .await
            .unwrap()
            .unwrap();

        assert_eq!(anvil_result, proxy_result);

        let first_block_num = anvil_result.number.unwrap();

        let _: U256 = anvil_provider
            .request("evm_mine", None::<()>)
            .await
            .unwrap();

        let anvil_result = anvil_provider
            .request::<_, Option<ArcBlock>>("eth_getBlockByNumber", ("latest", true))
            .await
            .unwrap()
            .unwrap();
        let proxy_result = proxy_provider
            .request::<_, Option<ArcBlock>>("eth_getBlockByNumber", ("latest", true))
            .await
            .unwrap()
            .unwrap();

        assert_eq!(anvil_result, proxy_result);

        let second_block_num = anvil_result.number.unwrap();

        assert_eq!(first_block_num, second_block_num - 1);

        // tell the test app to shut down
        shutdown_sender.send(()).unwrap();

        println!("waiting for shutdown...");
        // TODO: panic if a timeout is reached
        handle.await.unwrap().unwrap();
    }
}