diff --git a/Dockerfile b/Dockerfile index c1487de7..2f35c3bf 100644 --- a/Dockerfile +++ b/Dockerfile @@ -13,7 +13,9 @@ RUN --mount=type=cache,target=/usr/local/cargo/registry \ FROM debian:bullseye-slim COPY --from=builder /opt/bin/* /usr/local/bin/ -ENTRYPOINT ["web3_proxy"] + +# TODO: be careful changing this to just web3_proxy_cli. if you don't do it correctly, there will be a production outage! +ENTRYPOINT ["web3_proxy_cli", "proxyd"] # TODO: lower log level when done with prototyping ENV RUST_LOG "web3_proxy=debug" diff --git a/README.md b/README.md index d16a3c66..6f2e67c0 100644 --- a/README.md +++ b/README.md @@ -37,7 +37,7 @@ Options: Start the server with the defaults (listen on `http://localhost:8544` and use `./config/development.toml` which uses the database and cache running under docker and proxies to a bunch of public nodes: ``` -cargo run --release +cargo run --release -- daemon ``` ## Common commands @@ -45,7 +45,7 @@ cargo run --release Create a user: ``` -cargo run --bin web3_proxy_cli -- --db-url "$YOUR_DB_URL" create_user --address "$USER_ADDRESS_0x" +cargo run -- --db-url "$YOUR_DB_URL" create_user --address "$USER_ADDRESS_0x" ``` Check that the proxy is working: diff --git a/TODO.md b/TODO.md index 5b10936b..081844ac 100644 --- a/TODO.md +++ b/TODO.md @@ -300,6 +300,12 @@ These are not yet ordered. There might be duplicates. We might not actually need - [x] if private txs are disabled, only send trasactions to some of our servers. we were DOSing ourselves with transactions and slowing down sync - [x] retry if we get "the method X is not available" - [x] remove weight. we don't use it anymore. tiers are what we use now +- [x] make deadlock feature optional +- [x] standalone healthcheck daemon (sentryd) +- [x] status page should show version +- [x] combine the proxy and cli into one bin +- [-] proxy mode for benchmarking all backends +- [-] proxy mode for sending to multiple backends - [-] let users choose a % of reverts to log (or maybe x/second). someone like curve logging all reverts will be a BIG database very quickly - this must be opt-in and spawned in the background since it will slow things down and will make their calls less private - [ ] automatic pruning of old revert logs once too many are collected @@ -578,7 +584,6 @@ in another repo: event subscriber - [ ] sentry profiling - [ ] support alchemy_minedTransactions - [ ] debug print of user::Model's address is a big vec of numbers. make that hex somehow -- [ ] should we combine the proxy and cli into one bin? - [ ] make it so you can put a string like "LN arbitrum" into the create_user script, and have it automatically turn it into 0x4c4e20617262697472756d000000000000000000. - [ ] if --address not given, use the --description - [ ] if it is too long, (the last 4 bytes must be zero), give an error so descriptions like this stand out diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index 9da390b1..e0c64087 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -2,7 +2,7 @@ name = "web3_proxy" version = "0.12.0" edition = "2021" -default-run = "web3_proxy" +default-run = "web3_proxy_cli" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/web3_proxy/src/bin/web3_proxy.rs b/web3_proxy/src/bin/web3_proxy.rs deleted file mode 100644 index f1461a22..00000000 --- a/web3_proxy/src/bin/web3_proxy.rs +++ /dev/null @@ -1,408 +0,0 @@ -//! Web3_proxy is a fast caching and load balancing proxy for web3 (Ethereum or similar) JsonRPC servers. -//! -//! Signed transactions (eth_sendRawTransaction) are sent in parallel to the configured private RPCs (eden, ethermine, flashbots, etc.). -//! -//! All other requests are sent to an RPC server on the latest block (alchemy, moralis, rivet, your own node, or one of many other providers). -//! If multiple servers are in sync, the fastest server is prioritized. Since the fastest server is most likely to serve requests, slow servers are unlikely to ever get any requests. - -//#![warn(missing_docs)] -#![forbid(unsafe_code)] - -use anyhow::Context; -use futures::StreamExt; -use log::{debug, error, info, warn}; -use num::Zero; -use std::fs; -use std::path::Path; -use std::sync::atomic::{self, AtomicUsize}; -use tokio::runtime; -use tokio::sync::broadcast; -use web3_proxy::app::{flatten_handle, flatten_handles, Web3ProxyApp, APP_USER_AGENT}; -use web3_proxy::config::{CliConfig, TopConfig}; -use web3_proxy::{frontend, metrics_frontend}; - -#[cfg(feature = "deadlock")] -use parking_lot::deadlock; -#[cfg(feature = "deadlock")] -use std::thread; -#[cfg(feature = "deadlock")] -use tokio::time::Duration; - -fn run( - shutdown_sender: broadcast::Sender<()>, - cli_config: CliConfig, - top_config: TopConfig, -) -> anyhow::Result<()> { - debug!("{:?}", cli_config); - debug!("{:?}", top_config); - - let mut shutdown_receiver = shutdown_sender.subscribe(); - - #[cfg(feature = "deadlock")] - { - // spawn a thread for deadlock detection - thread::spawn(move || loop { - thread::sleep(Duration::from_secs(10)); - let deadlocks = deadlock::check_deadlock(); - if deadlocks.is_empty() { - continue; - } - - println!("{} deadlocks detected", deadlocks.len()); - for (i, threads) in deadlocks.iter().enumerate() { - println!("Deadlock #{}", i); - for t in threads { - println!("Thread Id {:#?}", t.thread_id()); - println!("{:#?}", t.backtrace()); - } - } - }); - } - - // set up tokio's async runtime - let mut rt_builder = runtime::Builder::new_multi_thread(); - - let chain_id = top_config.app.chain_id; - rt_builder.enable_all().thread_name_fn(move || { - static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); - // TODO: what ordering? i think we want seqcst so that these all happen in order, but that might be stricter than we really need - let worker_id = ATOMIC_ID.fetch_add(1, atomic::Ordering::SeqCst); - // TODO: i think these max at 15 characters - format!("web3-{}-{}", chain_id, worker_id) - }); - - if cli_config.workers > 0 { - rt_builder.worker_threads(cli_config.workers); - } - - // start tokio's async runtime - let rt = rt_builder.build()?; - - let num_workers = rt.metrics().num_workers(); - info!("num_workers: {}", num_workers); - - rt.block_on(async { - let app_frontend_port = cli_config.port; - let app_prometheus_port = cli_config.prometheus_port; - - // start the main app - let mut spawned_app = - Web3ProxyApp::spawn(top_config, num_workers, shutdown_sender.subscribe()).await?; - - let frontend_handle = - tokio::spawn(frontend::serve(app_frontend_port, spawned_app.app.clone())); - - // TODO: should we put this in a dedicated thread? - let prometheus_handle = tokio::spawn(metrics_frontend::serve( - spawned_app.app.clone(), - app_prometheus_port, - )); - - // if everything is working, these should both run forever - tokio::select! { - x = flatten_handles(spawned_app.app_handles) => { - match x { - Ok(_) => info!("app_handle exited"), - Err(e) => { - return Err(e); - } - } - } - x = flatten_handle(frontend_handle) => { - match x { - Ok(_) => info!("frontend exited"), - Err(e) => { - return Err(e); - } - } - } - x = flatten_handle(prometheus_handle) => { - match x { - Ok(_) => info!("prometheus exited"), - Err(e) => { - return Err(e); - } - } - } - x = tokio::signal::ctrl_c() => { - match x { - Ok(_) => info!("quiting from ctrl-c"), - Err(e) => { - return Err(e.into()); - } - } - } - x = shutdown_receiver.recv() => { - match x { - Ok(_) => info!("quiting from shutdown receiver"), - Err(e) => { - return Err(e.into()); - } - } - } - }; - - // one of the handles stopped. send a value so the others know to shut down - if let Err(err) = shutdown_sender.send(()) { - warn!("shutdown sender err={:?}", err); - }; - - // wait for things like saving stats to the database to complete - info!("waiting on important background tasks"); - let mut background_errors = 0; - while let Some(x) = spawned_app.background_handles.next().await { - match x { - Err(e) => { - error!("{:?}", e); - background_errors += 1; - } - Ok(Err(e)) => { - error!("{:?}", e); - background_errors += 1; - } - Ok(Ok(_)) => continue, - } - } - - if background_errors.is_zero() { - info!("finished"); - Ok(()) - } else { - // TODO: collect instead? - Err(anyhow::anyhow!("finished with errors!")) - } - }) -} - -fn main() -> anyhow::Result<()> { - // if RUST_LOG isn't set, configure a default - let rust_log = match std::env::var("RUST_LOG") { - Ok(x) => x, - Err(_) => "info,ethers=debug,redis_rate_limit=debug,web3_proxy=debug".to_string(), - }; - - // this probably won't matter for us in docker, but better safe than sorry - fdlimit::raise_fd_limit(); - - // initial configuration from flags - let cli_config: CliConfig = argh::from_env(); - - // convert to absolute path so error logging is most helpful - let config_path = Path::new(&cli_config.config) - .canonicalize() - .context(format!( - "checking full path of {} and {}", - ".", // TODO: get cwd somehow - cli_config.config - ))?; - - // advanced configuration is on disk - let top_config: String = fs::read_to_string(config_path.clone()) - .context(format!("reading config at {}", config_path.display()))?; - let top_config: TopConfig = toml::from_str(&top_config) - .context(format!("parsing config at {}", config_path.display()))?; - - // TODO: this doesn't seem to do anything - proctitle::set_title(format!("web3_proxy-{}", top_config.app.chain_id)); - - let logger = env_logger::builder().parse_filters(&rust_log).build(); - - let max_level = logger.filter(); - - // connect to sentry for error reporting - // if no sentry, only log to stdout - let _sentry_guard = if let Some(sentry_url) = top_config.app.sentry_url.clone() { - let logger = sentry::integrations::log::SentryLogger::with_dest(logger); - - log::set_boxed_logger(Box::new(logger)).unwrap(); - - let guard = sentry::init(( - sentry_url, - sentry::ClientOptions { - release: sentry::release_name!(), - // TODO: Set this a to lower value (from config) in production - traces_sample_rate: 1.0, - ..Default::default() - }, - )); - - Some(guard) - } else { - log::set_boxed_logger(Box::new(logger)).unwrap(); - - None - }; - - log::set_max_level(max_level); - - info!("{}", APP_USER_AGENT); - - // we used to do this earlier, but now we attach sentry - debug!("CLI config @ {:#?}", cli_config.config); - - // tokio has code for catching ctrl+c so we use that - // this shutdown sender is currently only used in tests, but we might make a /shutdown endpoint or something - // we do not need this receiver. new receivers are made by `shutdown_sender.subscribe()` - let (shutdown_sender, _) = broadcast::channel(1); - - run(shutdown_sender, cli_config, top_config) -} - -#[cfg(test)] -mod tests { - use ethers::{ - prelude::{Http, Provider, U256}, - utils::Anvil, - }; - use hashbrown::HashMap; - use std::env; - use std::thread; - - use web3_proxy::{ - config::{AppConfig, Web3ConnectionConfig}, - rpcs::blockchain::ArcBlock, - }; - - use super::*; - - #[tokio::test] - async fn it_works() { - // TODO: move basic setup into a test fixture - let path = env::var("PATH").unwrap(); - - println!("path: {}", path); - - // TODO: how should we handle logs in this? - // TODO: option for super verbose logs - std::env::set_var("RUST_LOG", "info,web3_proxy=debug"); - - let _ = env_logger::builder().is_test(true).try_init(); - - let anvil = Anvil::new().spawn(); - - println!("Anvil running at `{}`", anvil.endpoint()); - - let anvil_provider = Provider::::try_from(anvil.endpoint()).unwrap(); - - // mine a block because my code doesn't like being on block 0 - // TODO: make block 0 okay? is it okay now? - let _: U256 = anvil_provider - .request("evm_mine", None::<()>) - .await - .unwrap(); - - // make a test CliConfig - let cli_config = CliConfig { - port: 0, - prometheus_port: 0, - workers: 4, - config: "./does/not/exist/test.toml".to_string(), - cookie_key_filename: "./does/not/exist/development_cookie_key".to_string(), - }; - - // make a test TopConfig - // TODO: load TopConfig from a file? CliConfig could have `cli_config.load_top_config`. would need to inject our endpoint ports - let top_config = TopConfig { - app: AppConfig { - chain_id: 31337, - default_user_max_requests_per_period: Some(6_000_000), - min_sum_soft_limit: 1, - min_synced_rpcs: 1, - public_requests_per_period: Some(1_000_000), - response_cache_max_bytes: 10_usize.pow(7), - redirect_public_url: Some("example.com/".to_string()), - redirect_rpc_key_url: Some("example.com/{{rpc_key_id}}".to_string()), - ..Default::default() - }, - balanced_rpcs: HashMap::from([ - ( - "anvil".to_string(), - Web3ConnectionConfig { - disabled: false, - display_name: None, - url: anvil.endpoint(), - block_data_limit: None, - soft_limit: 100, - hard_limit: None, - tier: 0, - subscribe_txs: Some(false), - extra: Default::default(), - }, - ), - ( - "anvil_ws".to_string(), - Web3ConnectionConfig { - disabled: false, - display_name: None, - url: anvil.ws_endpoint(), - block_data_limit: None, - soft_limit: 100, - hard_limit: None, - tier: 0, - subscribe_txs: Some(false), - extra: Default::default(), - }, - ), - ]), - private_rpcs: None, - extra: Default::default(), - }; - - let (shutdown_sender, _) = broadcast::channel(1); - - // spawn another thread for running the app - // TODO: allow launching into the local tokio runtime instead of creating a new one? - let handle = { - let shutdown_sender = shutdown_sender.clone(); - - thread::spawn(move || run(shutdown_sender, cli_config, top_config)) - }; - - // TODO: do something to the node. query latest block, mine another block, query again - let proxy_provider = Provider::::try_from(anvil.endpoint()).unwrap(); - - let anvil_result = anvil_provider - .request::<_, Option>("eth_getBlockByNumber", ("latest", true)) - .await - .unwrap() - .unwrap(); - let proxy_result = proxy_provider - .request::<_, Option>("eth_getBlockByNumber", ("latest", true)) - .await - .unwrap() - .unwrap(); - - assert_eq!(anvil_result, proxy_result); - - let first_block_num = anvil_result.number.unwrap(); - - let _: U256 = anvil_provider - .request("evm_mine", None::<()>) - .await - .unwrap(); - - let anvil_result = anvil_provider - .request::<_, Option>("eth_getBlockByNumber", ("latest", true)) - .await - .unwrap() - .unwrap(); - let proxy_result = proxy_provider - .request::<_, Option>("eth_getBlockByNumber", ("latest", true)) - .await - .unwrap() - .unwrap(); - - assert_eq!(anvil_result, proxy_result); - - let second_block_num = anvil_result.number.unwrap(); - - assert_eq!(first_block_num, second_block_num - 1); - - // tell the test app to shut down - shutdown_sender.send(()).unwrap(); - - println!("waiting for shutdown..."); - // TODO: panic if a timeout is reached - handle.join().unwrap().unwrap(); - } -} diff --git a/web3_proxy/src/bin/web3_proxy_cli/daemon.rs b/web3_proxy/src/bin/web3_proxy_cli/daemon.rs new file mode 100644 index 00000000..09998ea4 --- /dev/null +++ b/web3_proxy/src/bin/web3_proxy_cli/daemon.rs @@ -0,0 +1,305 @@ +#![forbid(unsafe_code)] + +use argh::FromArgs; +use futures::StreamExt; +use log::{error, info, warn}; +use num::Zero; +use tokio::sync::broadcast; +use web3_proxy::app::{flatten_handle, flatten_handles, Web3ProxyApp}; +use web3_proxy::config::TopConfig; +use web3_proxy::{frontend, metrics_frontend}; + +/// count requests +#[derive(FromArgs, PartialEq, Debug, Eq)] +#[argh(subcommand, name = "proxyd")] +pub struct ProxydSubCommand { + /// path to a toml of rpc servers + /// what port the proxy should listen on + #[argh(option, default = "8544")] + pub port: u16, + + /// what port the proxy should expose prometheus stats on + #[argh(option, default = "8543")] + pub prometheus_port: u16, +} + +impl ProxydSubCommand { + pub async fn main(self, top_config: TopConfig, num_workers: usize) -> anyhow::Result<()> { + let (shutdown_sender, _) = broadcast::channel(1); + + run( + top_config, + self.port, + self.prometheus_port, + num_workers, + shutdown_sender, + ) + .await + } +} + +async fn run( + top_config: TopConfig, + frontend_port: u16, + prometheus_port: u16, + num_workers: usize, + shutdown_sender: broadcast::Sender<()>, +) -> anyhow::Result<()> { + // tokio has code for catching ctrl+c so we use that + // this shutdown sender is currently only used in tests, but we might make a /shutdown endpoint or something + // we do not need this receiver. new receivers are made by `shutdown_sender.subscribe()` + + let app_frontend_port = frontend_port; + let app_prometheus_port = prometheus_port; + + // start the main app + let mut spawned_app = + Web3ProxyApp::spawn(top_config, num_workers, shutdown_sender.subscribe()).await?; + + let frontend_handle = tokio::spawn(frontend::serve(app_frontend_port, spawned_app.app.clone())); + + // TODO: should we put this in a dedicated thread? + let prometheus_handle = tokio::spawn(metrics_frontend::serve( + spawned_app.app.clone(), + app_prometheus_port, + )); + + let mut shutdown_receiver = shutdown_sender.subscribe(); + + // if everything is working, these should both run forever + tokio::select! { + x = flatten_handles(spawned_app.app_handles) => { + match x { + Ok(_) => info!("app_handle exited"), + Err(e) => { + return Err(e); + } + } + } + x = flatten_handle(frontend_handle) => { + match x { + Ok(_) => info!("frontend exited"), + Err(e) => { + return Err(e); + } + } + } + x = flatten_handle(prometheus_handle) => { + match x { + Ok(_) => info!("prometheus exited"), + Err(e) => { + return Err(e); + } + } + } + x = tokio::signal::ctrl_c() => { + match x { + Ok(_) => info!("quiting from ctrl-c"), + Err(e) => { + return Err(e.into()); + } + } + } + x = shutdown_receiver.recv() => { + match x { + Ok(_) => info!("quiting from shutdown receiver"), + Err(e) => { + return Err(e.into()); + } + } + } + }; + + // one of the handles stopped. send a value so the others know to shut down + if let Err(err) = shutdown_sender.send(()) { + warn!("shutdown sender err={:?}", err); + }; + + // wait for things like saving stats to the database to complete + info!("waiting on important background tasks"); + let mut background_errors = 0; + while let Some(x) = spawned_app.background_handles.next().await { + match x { + Err(e) => { + error!("{:?}", e); + background_errors += 1; + } + Ok(Err(e)) => { + error!("{:?}", e); + background_errors += 1; + } + Ok(Ok(_)) => continue, + } + } + + if background_errors.is_zero() { + info!("finished"); + Ok(()) + } else { + // TODO: collect instead? + Err(anyhow::anyhow!("finished with errors!")) + } +} + +#[cfg(test)] +mod tests { + use ethers::{ + prelude::{Http, Provider, U256}, + utils::Anvil, + }; + use hashbrown::HashMap; + use std::env; + + use web3_proxy::{ + config::{AppConfig, Web3ConnectionConfig}, + rpcs::blockchain::ArcBlock, + }; + + use super::*; + + #[tokio::test] + async fn it_works() { + // TODO: move basic setup into a test fixture + let path = env::var("PATH").unwrap(); + + println!("path: {}", path); + + // TODO: how should we handle logs in this? + // TODO: option for super verbose logs + std::env::set_var("RUST_LOG", "info,web3_proxy=debug"); + + let _ = env_logger::builder().is_test(true).try_init(); + + let anvil = Anvil::new().spawn(); + + println!("Anvil running at `{}`", anvil.endpoint()); + + let anvil_provider = Provider::::try_from(anvil.endpoint()).unwrap(); + + // mine a block because my code doesn't like being on block 0 + // TODO: make block 0 okay? is it okay now? + let _: U256 = anvil_provider + .request("evm_mine", None::<()>) + .await + .unwrap(); + + // make a test TopConfig + // TODO: load TopConfig from a file? CliConfig could have `cli_config.load_top_config`. would need to inject our endpoint ports + let top_config = TopConfig { + app: AppConfig { + chain_id: 31337, + default_user_max_requests_per_period: Some(6_000_000), + min_sum_soft_limit: 1, + min_synced_rpcs: 1, + public_requests_per_period: Some(1_000_000), + response_cache_max_bytes: 10_usize.pow(7), + redirect_public_url: Some("example.com/".to_string()), + redirect_rpc_key_url: Some("example.com/{{rpc_key_id}}".to_string()), + ..Default::default() + }, + balanced_rpcs: HashMap::from([ + ( + "anvil".to_string(), + Web3ConnectionConfig { + disabled: false, + display_name: None, + url: anvil.endpoint(), + backup: None, + block_data_limit: None, + soft_limit: 100, + hard_limit: None, + tier: 0, + subscribe_txs: Some(false), + extra: Default::default(), + }, + ), + ( + "anvil_ws".to_string(), + Web3ConnectionConfig { + disabled: false, + display_name: None, + url: anvil.ws_endpoint(), + backup: None, + block_data_limit: None, + soft_limit: 100, + hard_limit: None, + tier: 0, + subscribe_txs: Some(false), + extra: Default::default(), + }, + ), + ]), + private_rpcs: None, + extra: Default::default(), + }; + + let (shutdown_sender, _) = broadcast::channel(1); + + // spawn another thread for running the app + // TODO: allow launching into the local tokio runtime instead of creating a new one? + let handle = { + let shutdown_sender = shutdown_sender.clone(); + + let frontend_port = 0; + let prometheus_port = 0; + + tokio::spawn(async move { + run( + top_config, + frontend_port, + prometheus_port, + 2, + shutdown_sender, + ) + .await + }) + }; + + // TODO: do something to the node. query latest block, mine another block, query again + let proxy_provider = Provider::::try_from(anvil.endpoint()).unwrap(); + + let anvil_result = anvil_provider + .request::<_, Option>("eth_getBlockByNumber", ("latest", true)) + .await + .unwrap() + .unwrap(); + let proxy_result = proxy_provider + .request::<_, Option>("eth_getBlockByNumber", ("latest", true)) + .await + .unwrap() + .unwrap(); + + assert_eq!(anvil_result, proxy_result); + + let first_block_num = anvil_result.number.unwrap(); + + let _: U256 = anvil_provider + .request("evm_mine", None::<()>) + .await + .unwrap(); + + let anvil_result = anvil_provider + .request::<_, Option>("eth_getBlockByNumber", ("latest", true)) + .await + .unwrap() + .unwrap(); + let proxy_result = proxy_provider + .request::<_, Option>("eth_getBlockByNumber", ("latest", true)) + .await + .unwrap() + .unwrap(); + + assert_eq!(anvil_result, proxy_result); + + let second_block_num = anvil_result.number.unwrap(); + + assert_eq!(first_block_num, second_block_num - 1); + + // tell the test app to shut down + shutdown_sender.send(()).unwrap(); + + println!("waiting for shutdown..."); + // TODO: panic if a timeout is reached + handle.await.unwrap().unwrap(); + } +} diff --git a/web3_proxy/src/bin/web3_proxy_cli/main.rs b/web3_proxy/src/bin/web3_proxy_cli/main.rs index 85dd901b..a99ea54d 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/main.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/main.rs @@ -5,6 +5,7 @@ mod change_user_tier_by_key; mod check_config; mod count_users; mod create_user; +mod daemon; mod drop_migration_lock; mod list_user_tier; mod rpc_accounting; @@ -13,27 +14,41 @@ mod transfer_key; mod user_export; mod user_import; +use anyhow::Context; use argh::FromArgs; use log::{info, warn}; -use std::fs; +use std::{ + fs, + path::Path, + sync::atomic::{self, AtomicUsize}, +}; +use tokio::runtime; use web3_proxy::{ app::{get_db, get_migrated_db, APP_USER_AGENT}, config::TopConfig, }; +#[cfg(feature = "deadlock")] +use parking_lot::deadlock; +#[cfg(feature = "deadlock")] +use std::thread; +#[cfg(feature = "deadlock")] +use tokio::time::Duration; + #[derive(Debug, FromArgs)] /// Command line interface for admins to interact with web3_proxy -pub struct CliConfig { - /// path to the application config (optional). +pub struct Web3ProxyCli { + /// path to the application config (only required for some commands; defaults to dev config). #[argh(option)] pub config: Option, - /// if no config, what database the client should connect to. Defaults to dev db - #[argh( - option, - default = "\"mysql://root:dev_web3_proxy@127.0.0.1:13306/dev_web3_proxy\".to_string()" - )] - pub db_url: String, + /// number of worker threads. Defaults to the number of logical processors + #[argh(option, default = "0")] + pub workers: usize, + + /// if no config, what database the client should connect to (only required for some commands; Defaults to dev db) + #[argh(option)] + pub db_url: Option, /// if no config, what sentry url should the client should connect to #[argh(option)] @@ -55,6 +70,7 @@ enum SubCommand { CountUsers(count_users::CountUsersSubCommand), CreateUser(create_user::CreateUserSubCommand), DropMigrationLock(drop_migration_lock::DropMigrationLockSubCommand), + Proxyd(daemon::ProxydSubCommand), RpcAccounting(rpc_accounting::RpcAccountingSubCommand), Sentryd(sentryd::SentrydSubCommand), TransferKey(transfer_key::TransferKeySubCommand), @@ -65,32 +81,65 @@ enum SubCommand { // TODO: sub command to change a user's tier } -#[tokio::main] -async fn main() -> anyhow::Result<()> { +fn main() -> anyhow::Result<()> { + #[cfg(feature = "deadlock")] + { + // spawn a thread for deadlock detection + thread::spawn(move || loop { + thread::sleep(Duration::from_secs(10)); + let deadlocks = deadlock::check_deadlock(); + if deadlocks.is_empty() { + continue; + } + + println!("{} deadlocks detected", deadlocks.len()); + for (i, threads) in deadlocks.iter().enumerate() { + println!("Deadlock #{}", i); + for t in threads { + println!("Thread Id {:#?}", t.thread_id()); + println!("{:#?}", t.backtrace()); + } + } + }); + } + // if RUST_LOG isn't set, configure a default // TODO: is there a better way to do this? let rust_log = match std::env::var("RUST_LOG") { Ok(x) => x, - Err(_) => "info,web3_proxy=debug,web3_proxy_cli=debug".to_string(), + Err(_) => "info,ethers=debug,redis_rate_limit=debug,web3_proxy=debug,web3_proxy_cli=debug" + .to_string(), }; // this probably won't matter for us in docker, but better safe than sorry fdlimit::raise_fd_limit(); - let mut cli_config: CliConfig = argh::from_env(); + let mut cli_config: Web3ProxyCli = argh::from_env(); + + if cli_config.config.is_none() && cli_config.db_url.is_none() { + info!("defaulting to development config"); + cli_config.config = Some("./config/development.toml".to_string()); + } + + let top_config = if let Some(top_config_path) = cli_config.config.clone() { + let top_config_path = Path::new(&top_config_path) + .canonicalize() + .context(format!("checking for config at {}", top_config_path))?; - let _top_config = if let Some(top_config_path) = cli_config.config.clone() { let top_config: String = fs::read_to_string(top_config_path)?; let top_config: TopConfig = toml::from_str(&top_config)?; - if let Some(db_url) = top_config.app.db_url.clone() { - cli_config.db_url = db_url; + if cli_config.db_url.is_none() { + cli_config.db_url = top_config.app.db_url.clone(); } if let Some(sentry_url) = top_config.app.sentry_url.clone() { cli_config.sentry_url = Some(sentry_url); } + // TODO: this doesn't seem to do anything + proctitle::set_title(format!("web3_proxy-{}", top_config.app.chain_id)); + Some(top_config) } else { None @@ -128,70 +177,141 @@ async fn main() -> anyhow::Result<()> { info!("{}", APP_USER_AGENT); - match cli_config.sub_command { - SubCommand::ChangeUserAddress(x) => { - let db_conn = get_db(cli_config.db_url, 1, 1).await?; + // set up tokio's async runtime + let mut rt_builder = runtime::Builder::new_multi_thread(); - x.main(&db_conn).await - } - SubCommand::ChangeUserTier(x) => { - let db_conn = get_db(cli_config.db_url, 1, 1).await?; + if let Some(top_config) = top_config.as_ref() { + let chain_id = top_config.app.chain_id; - x.main(&db_conn).await - } - SubCommand::ChangeUserTierByAddress(x) => { - let db_conn = get_db(cli_config.db_url, 1, 1).await?; - - x.main(&db_conn).await - } - SubCommand::ChangeUserTierByKey(x) => { - let db_conn = get_db(cli_config.db_url, 1, 1).await?; - - x.main(&db_conn).await - } - SubCommand::CheckConfig(x) => x.main().await, - SubCommand::CreateUser(x) => { - let db_conn = get_migrated_db(cli_config.db_url, 1, 1).await?; - - x.main(&db_conn).await - } - SubCommand::CountUsers(x) => { - let db_conn = get_db(cli_config.db_url, 1, 1).await?; - - x.main(&db_conn).await - } - SubCommand::DropMigrationLock(x) => { - // very intentionally, do NOT run migrations here - let db_conn = get_db(cli_config.db_url, 1, 1).await?; - - x.main(&db_conn).await - } - SubCommand::Sentryd(x) => { - if cli_config.sentry_url.is_none() { - warn!("sentry_url is not set! Logs will only show in this console"); - } - - x.main().await - } - SubCommand::RpcAccounting(x) => { - let db_conn = get_migrated_db(cli_config.db_url, 1, 1).await?; - - x.main(&db_conn).await - } - SubCommand::TransferKey(x) => { - let db_conn = get_db(cli_config.db_url, 1, 1).await?; - - x.main(&db_conn).await - } - SubCommand::UserExport(x) => { - let db_conn = get_migrated_db(cli_config.db_url, 1, 1).await?; - - x.main(&db_conn).await - } - SubCommand::UserImport(x) => { - let db_conn = get_migrated_db(cli_config.db_url, 1, 1).await?; - - x.main(&db_conn).await - } + rt_builder.enable_all().thread_name_fn(move || { + static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); + // TODO: what ordering? i think we want seqcst so that these all happen in order, but that might be stricter than we really need + let worker_id = ATOMIC_ID.fetch_add(1, atomic::Ordering::SeqCst); + // TODO: i think these max at 15 characters + format!("web3-{}-{}", chain_id, worker_id) + }); } + + // start tokio's async runtime + let rt = rt_builder.build()?; + + let num_workers = rt.metrics().num_workers(); + info!("num_workers: {}", num_workers); + + rt.block_on(async { + match cli_config.sub_command { + SubCommand::ChangeUserAddress(x) => { + let db_url = cli_config + .db_url + .expect("'--config' (with a db) or '--db-url' is required to run proxyd"); + + let db_conn = get_db(db_url, 1, 1).await?; + + x.main(&db_conn).await + } + SubCommand::ChangeUserTier(x) => { + let db_url = cli_config + .db_url + .expect("'--config' (with a db) or '--db-url' is required to run proxyd"); + + let db_conn = get_db(db_url, 1, 1).await?; + + x.main(&db_conn).await + } + SubCommand::ChangeUserTierByAddress(x) => { + let db_url = cli_config + .db_url + .expect("'--config' (with a db) or '--db-url' is required to run proxyd"); + + let db_conn = get_db(db_url, 1, 1).await?; + + x.main(&db_conn).await + } + SubCommand::ChangeUserTierByKey(x) => { + let db_url = cli_config + .db_url + .expect("'--config' (with a db) or '--db-url' is required to run proxyd"); + + let db_conn = get_db(db_url, 1, 1).await?; + + x.main(&db_conn).await + } + SubCommand::CheckConfig(x) => x.main().await, + SubCommand::CreateUser(x) => { + let db_url = cli_config + .db_url + .expect("'--config' (with a db) or '--db-url' is required to run proxyd"); + + let db_conn = get_migrated_db(db_url, 1, 1).await?; + + x.main(&db_conn).await + } + SubCommand::CountUsers(x) => { + let db_url = cli_config + .db_url + .expect("'--config' (with a db) or '--db-url' is required to run proxyd"); + + let db_conn = get_db(db_url, 1, 1).await?; + + x.main(&db_conn).await + } + SubCommand::Proxyd(x) => { + let top_config = top_config.expect("--config is required to run proxyd"); + + x.main(top_config, num_workers).await + } + SubCommand::DropMigrationLock(x) => { + let db_url = cli_config + .db_url + .expect("'--config' (with a db) or '--db-url' is required to run proxyd"); + + // very intentionally, do NOT run migrations here + let db_conn = get_db(db_url, 1, 1).await?; + + x.main(&db_conn).await + } + SubCommand::Sentryd(x) => { + if cli_config.sentry_url.is_none() { + warn!("sentry_url is not set! Logs will only show in this console"); + } + + x.main().await + } + SubCommand::RpcAccounting(x) => { + let db_url = cli_config + .db_url + .expect("'--config' (with a db) or '--db-url' is required to run proxyd"); + + let db_conn = get_migrated_db(db_url, 1, 1).await?; + + x.main(&db_conn).await + } + SubCommand::TransferKey(x) => { + let db_url = cli_config + .db_url + .expect("'--config' (with a db) or '--db-url' is required to run proxyd"); + let db_conn = get_db(db_url, 1, 1).await?; + + x.main(&db_conn).await + } + SubCommand::UserExport(x) => { + let db_url = cli_config + .db_url + .expect("'--config' (with a db) or '--db-url' is required to run proxyd"); + + let db_conn = get_migrated_db(db_url, 1, 1).await?; + + x.main(&db_conn).await + } + SubCommand::UserImport(x) => { + let db_url = cli_config + .db_url + .expect("'--config' (with a db) or '--db-url' is required to run proxyd"); + + let db_conn = get_migrated_db(db_url, 1, 1).await?; + + x.main(&db_conn).await + } + } + }) }