diff --git a/web3_proxy/src/bin/web3_proxy_cli/main.rs b/web3_proxy/src/bin/web3_proxy_cli.rs similarity index 90% rename from web3_proxy/src/bin/web3_proxy_cli/main.rs rename to web3_proxy/src/bin/web3_proxy_cli.rs index 16d21f1a..24dc3f5e 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/main.rs +++ b/web3_proxy/src/bin/web3_proxy_cli.rs @@ -1,25 +1,3 @@ -mod change_admin_status; -mod change_user_address; -mod change_user_tier; -mod change_user_tier_by_address; -mod change_user_tier_by_key; -mod check_config; -mod count_users; -mod create_key; -mod create_user; -mod drop_migration_lock; -mod list_user_tier; -mod migrate_stats_to_v2; -mod pagerduty; -mod popularity_contest; -mod proxyd; -mod rpc_accounting; -mod search_kafka; -mod sentryd; -mod transfer_key; -mod user_export; -mod user_import; - use anyhow::Context; use argh::FromArgs; use ethers::types::U256; @@ -36,6 +14,7 @@ use tokio::runtime; use tracing::{info, warn}; use tracing_subscriber::{prelude::*, EnvFilter}; use web3_proxy::pagerduty::panic_handler; +use web3_proxy::sub_commands; use web3_proxy::{ app::APP_USER_AGENT, config::TopConfig, @@ -79,26 +58,26 @@ pub struct Web3ProxyCli { #[derive(FromArgs, PartialEq, Debug)] #[argh(subcommand)] enum SubCommand { - ChangeAdminStatus(change_admin_status::ChangeAdminStatusSubCommand), - ChangeUserAddress(change_user_address::ChangeUserAddressSubCommand), - ChangeUserTier(change_user_tier::ChangeUserTierSubCommand), - ChangeUserTierByAddress(change_user_tier_by_address::ChangeUserTierByAddressSubCommand), - ChangeUserTierByKey(change_user_tier_by_key::ChangeUserTierByKeySubCommand), - CheckConfig(check_config::CheckConfigSubCommand), - CountUsers(count_users::CountUsersSubCommand), - CreateKey(create_key::CreateKeySubCommand), - CreateUser(create_user::CreateUserSubCommand), - DropMigrationLock(drop_migration_lock::DropMigrationLockSubCommand), - MigrateStatsToV2(migrate_stats_to_v2::MigrateStatsToV2), - Pagerduty(pagerduty::PagerdutySubCommand), - PopularityContest(popularity_contest::PopularityContestSubCommand), - Proxyd(proxyd::ProxydSubCommand), - RpcAccounting(rpc_accounting::RpcAccountingSubCommand), - SearchKafka(search_kafka::SearchKafkaSubCommand), - Sentryd(sentryd::SentrydSubCommand), - TransferKey(transfer_key::TransferKeySubCommand), - UserExport(user_export::UserExportSubCommand), - UserImport(user_import::UserImportSubCommand), + ChangeAdminStatus(sub_commands::ChangeAdminStatusSubCommand), + ChangeUserAddress(sub_commands::ChangeUserAddressSubCommand), + ChangeUserTier(sub_commands::ChangeUserTierSubCommand), + ChangeUserTierByAddress(sub_commands::ChangeUserTierByAddressSubCommand), + ChangeUserTierByKey(sub_commands::ChangeUserTierByKeySubCommand), + CheckConfig(sub_commands::CheckConfigSubCommand), + CountUsers(sub_commands::CountUsersSubCommand), + CreateKey(sub_commands::CreateKeySubCommand), + CreateUser(sub_commands::CreateUserSubCommand), + DropMigrationLock(sub_commands::DropMigrationLockSubCommand), + MigrateStatsToV2(sub_commands::MigrateStatsToV2SubCommand), + Pagerduty(sub_commands::PagerdutySubCommand), + PopularityContest(sub_commands::PopularityContestSubCommand), + Proxyd(sub_commands::ProxydSubCommand), + RpcAccounting(sub_commands::RpcAccountingSubCommand), + SearchKafka(sub_commands::SearchKafkaSubCommand), + Sentryd(sub_commands::SentrydSubCommand), + TransferKey(sub_commands::TransferKeySubCommand), + UserExport(sub_commands::UserExportSubCommand), + UserImport(sub_commands::UserImportSubCommand), // TODO: sub command to downgrade migrations? sea-orm has this but doing downgrades here would be easier+safer // TODO: sub command to add new api keys to an existing user? // TODO: sub command to change a user's tier diff --git a/web3_proxy/src/bin/web3_proxy_cli/list_user_tier.rs b/web3_proxy/src/bin/web3_proxy_cli/list_user_tier.rs deleted file mode 100644 index 0228f6b1..00000000 --- a/web3_proxy/src/bin/web3_proxy_cli/list_user_tier.rs +++ /dev/null @@ -1 +0,0 @@ -//! TODO: write this diff --git a/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs b/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs deleted file mode 100644 index beb76bd3..00000000 --- a/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs +++ /dev/null @@ -1,474 +0,0 @@ -#![forbid(unsafe_code)] - -use argh::FromArgs; -use futures::StreamExt; -use num::Zero; -use std::path::PathBuf; -use std::sync::atomic::AtomicU16; -use std::sync::Arc; -use std::time::Duration; -use std::{fs, thread}; -use tokio::sync::broadcast; -use tracing::{error, info, trace, warn}; -use web3_proxy::app::{flatten_handle, flatten_handles, Web3ProxyApp}; -use web3_proxy::config::TopConfig; -use web3_proxy::{frontend, prometheus}; - -/// start the main proxy daemon -#[derive(FromArgs, PartialEq, Debug, Eq)] -#[argh(subcommand, name = "proxyd")] -pub struct ProxydSubCommand { - /// path to a toml of rpc servers - /// what port the proxy should listen on - #[argh(option, default = "8544")] - pub port: u16, - - /// what port the proxy should expose prometheus stats on - #[argh(option, default = "8543")] - pub prometheus_port: u16, -} - -impl ProxydSubCommand { - pub async fn main( - self, - top_config: TopConfig, - top_config_path: PathBuf, - num_workers: usize, - ) -> anyhow::Result<()> { - let (shutdown_sender, _) = broadcast::channel(1); - // TODO: i think there is a small race. if config_path changes - - let frontend_port = Arc::new(self.port.into()); - let prometheus_port = Arc::new(self.prometheus_port.into()); - - run( - top_config, - Some(top_config_path), - frontend_port, - prometheus_port, - num_workers, - shutdown_sender, - ) - .await - } -} - -async fn run( - top_config: TopConfig, - top_config_path: Option, - frontend_port: Arc, - prometheus_port: Arc, - num_workers: usize, - frontend_shutdown_sender: broadcast::Sender<()>, -) -> anyhow::Result<()> { - // tokio has code for catching ctrl+c so we use that - // this shutdown sender is currently only used in tests, but we might make a /shutdown endpoint or something - // we do not need this receiver. new receivers are made by `shutdown_sender.subscribe()` - - // TODO: should we use a watch or broadcast for these? - // Maybe this one ? - // let mut shutdown_receiver = shutdown_sender.subscribe(); - let (app_shutdown_sender, _app_shutdown_receiver) = broadcast::channel(1); - - let frontend_shutdown_receiver = frontend_shutdown_sender.subscribe(); - let prometheus_shutdown_receiver = app_shutdown_sender.subscribe(); - - // TODO: should we use a watch or broadcast for these? - let (frontend_shutdown_complete_sender, mut frontend_shutdown_complete_receiver) = - broadcast::channel(1); - - // start the main app - let mut spawned_app = Web3ProxyApp::spawn( - frontend_port, - prometheus_port, - top_config.clone(), - num_workers, - app_shutdown_sender.clone(), - ) - .await?; - - // start thread for watching config - if let Some(top_config_path) = top_config_path { - let config_sender = spawned_app.new_top_config; - { - let mut current_config = config_sender.borrow().clone(); - - thread::spawn(move || loop { - match fs::read_to_string(&top_config_path) { - Ok(new_top_config) => match toml::from_str::(&new_top_config) { - Ok(new_top_config) => { - if new_top_config != current_config { - // TODO: print the differences - // TODO: first run seems to always see differences. why? - info!("config @ {:?} changed", top_config_path); - config_sender.send(new_top_config.clone()).unwrap(); - current_config = new_top_config; - } - } - Err(err) => { - // TODO: panic? - error!("Unable to parse config! {:#?}", err); - } - }, - Err(err) => { - // TODO: panic? - error!("Unable to read config! {:#?}", err); - } - } - - thread::sleep(Duration::from_secs(10)); - }); - } - } - - // start the prometheus metrics port - let prometheus_handle = tokio::spawn(prometheus::serve( - spawned_app.app.clone(), - prometheus_shutdown_receiver, - )); - - info!("waiting for head block"); - loop { - spawned_app.app.head_block_receiver().changed().await?; - - if spawned_app - .app - .head_block_receiver() - .borrow_and_update() - .is_some() - { - break; - } else { - info!("no head block yet!"); - } - } - - // start the frontend port - let frontend_handle = tokio::spawn(frontend::serve( - spawned_app.app, - frontend_shutdown_receiver, - frontend_shutdown_complete_sender, - )); - - let frontend_handle = flatten_handle(frontend_handle); - - // if everything is working, these should all run forever - let mut exited_with_err = false; - let mut frontend_exited = false; - tokio::select! { - x = flatten_handles(spawned_app.app_handles) => { - match x { - Ok(_) => info!("app_handle exited"), - Err(e) => { - error!("app_handle exited: {:#?}", e); - exited_with_err = true; - } - } - } - x = frontend_handle => { - frontend_exited = true; - match x { - Ok(_) => info!("frontend exited"), - Err(e) => { - error!("frontend exited: {:#?}", e); - exited_with_err = true; - } - } - } - x = flatten_handle(prometheus_handle) => { - match x { - Ok(_) => info!("prometheus exited"), - Err(e) => { - error!("prometheus exited: {:#?}", e); - exited_with_err = true; - } - } - } - x = tokio::signal::ctrl_c() => { - // TODO: unix terminate signal, too - match x { - Ok(_) => info!("quiting from ctrl-c"), - Err(e) => { - // TODO: i don't think this is possible - error!("error quiting from ctrl-c: {:#?}", e); - exited_with_err = true; - } - } - } - // TODO: This seems to have been removed on the main branch - // TODO: how can we properly watch background handles here? this returns None immediatly and the app exits. i think the bug is somewhere else though - x = spawned_app.background_handles.next() => { - match x { - Some(Ok(_)) => info!("quiting from background handles"), - Some(Err(e)) => { - error!("quiting from background handle error: {:#?}", e); - exited_with_err = true; - } - None => { - // TODO: is this an error? - warn!("background handles exited"); - } - } - } - }; - - // TODO: This is also not there on the main branch - // if a future above completed, make sure the frontend knows to start turning off - if !frontend_exited { - if let Err(err) = frontend_shutdown_sender.send(()) { - // TODO: this is actually expected if the frontend is already shut down - warn!(?err, "shutdown sender"); - }; - } - - // TODO: Also not there on main branch - // TODO: wait until the frontend completes - if let Err(err) = frontend_shutdown_complete_receiver.recv().await { - warn!(?err, "shutdown completition"); - } else { - info!("frontend exited gracefully"); - } - - // now that the frontend is complete, tell all the other futures to finish - if let Err(err) = app_shutdown_sender.send(()) { - warn!(?err, "backend sender"); - }; - - info!( - "waiting on {} important background tasks", - spawned_app.background_handles.len() - ); - let mut background_errors = 0; - while let Some(x) = spawned_app.background_handles.next().await { - match x { - Err(e) => { - error!("{:?}", e); - background_errors += 1; - } - Ok(Err(e)) => { - error!("{:?}", e); - background_errors += 1; - } - Ok(Ok(_)) => { - // TODO: how can we know which handle exited? - trace!("a background handle exited"); - continue; - } - } - } - - if background_errors.is_zero() && !exited_with_err { - info!("finished"); - Ok(()) - } else { - // TODO: collect all the errors here instead? - Err(anyhow::anyhow!("finished with errors!")) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use ethers::{ - prelude::{Http, Provider, U256}, - types::Address, - utils::{Anvil, AnvilInstance}, - }; - use hashbrown::HashMap; - use parking_lot::Mutex; - use std::{ - env, - str::FromStr, - sync::atomic::{AtomicU16, Ordering}, - }; - use tokio::{ - sync::broadcast::error::SendError, - task::JoinHandle, - time::{sleep, Instant}, - }; - use web3_proxy::{ - config::{AppConfig, Web3RpcConfig}, - rpcs::blockchain::ArcBlock, - }; - - // TODO: put it in a thread? - struct TestApp { - _anvil: AnvilInstance, - handle: Mutex>>>, - anvil_provider: Provider, - proxy_provider: Provider, - shutdown_sender: broadcast::Sender<()>, - } - - impl TestApp { - async fn spawn() -> Self { - // TODO: move basic setup into a test fixture - let path = env::var("PATH").unwrap(); - - info!("path: {}", path); - - // TODO: configurable rpc and block - let anvil = Anvil::new() - // .fork("https://polygon.llamarpc.com@44300000") - .spawn(); - - info!("Anvil running at `{}`", anvil.endpoint()); - - let anvil_provider = Provider::::try_from(anvil.endpoint()).unwrap(); - - // make a test TopConfig - // TODO: load TopConfig from a file? CliConfig could have `cli_config.load_top_config`. would need to inject our endpoint ports - let top_config = TopConfig { - app: AppConfig { - chain_id: 31337, - default_user_max_requests_per_period: Some(6_000_000), - deposit_factory_contract: Address::from_str( - "4e3BC2054788De923A04936C6ADdB99A05B0Ea36", - ) - .ok(), - min_sum_soft_limit: 1, - min_synced_rpcs: 1, - public_requests_per_period: Some(1_000_000), - response_cache_max_bytes: 10_u64.pow(7), - ..Default::default() - }, - balanced_rpcs: HashMap::from([( - "anvil_both".to_string(), - Web3RpcConfig { - http_url: Some(anvil.endpoint()), - ws_url: Some(anvil.ws_endpoint()), - ..Default::default() - }, - )]), - private_rpcs: None, - bundler_4337_rpcs: None, - extra: Default::default(), - }; - - let (shutdown_sender, _shutdown_receiver) = broadcast::channel(1); - - let frontend_port_arc = Arc::new(AtomicU16::new(0)); - let prometheus_port_arc = Arc::new(AtomicU16::new(0)); - - // spawn another thread for running the app - // TODO: allow launching into the local tokio runtime instead of creating a new one? - let handle = { - tokio::spawn(run( - top_config, - None, - frontend_port_arc.clone(), - prometheus_port_arc, - 2, - shutdown_sender.clone(), - )) - }; - - let mut frontend_port = frontend_port_arc.load(Ordering::Relaxed); - let start = Instant::now(); - while frontend_port == 0 { - if start.elapsed() > Duration::from_secs(1) { - panic!("took too long to start!"); - } - - sleep(Duration::from_millis(10)).await; - frontend_port = frontend_port_arc.load(Ordering::Relaxed); - } - - let proxy_endpoint = format!("http://127.0.0.1:{}", frontend_port); - - let proxy_provider = Provider::::try_from(proxy_endpoint).unwrap(); - - Self { - handle: Mutex::new(Some(handle)), - anvil_provider, - proxy_provider, - shutdown_sender, - _anvil: anvil, - } - } - - fn stop(&self) -> Result> { - self.shutdown_sender.send(()) - } - - async fn wait(&self) { - // TODO: lock+take feels weird, but it works - let handle = self.handle.lock().take(); - - if let Some(handle) = handle { - let _ = self.stop(); - - info!("waiting for the app to stop..."); - handle.await.unwrap().unwrap(); - } - } - } - - impl Drop for TestApp { - fn drop(&mut self) { - let _ = self.stop(); - } - } - - #[test_log::test(tokio::test)] - async fn it_works() { - let x = TestApp::spawn().await; - - let anvil_provider = &x.anvil_provider; - let proxy_provider = &x.proxy_provider; - - let anvil_result = anvil_provider - .request::<_, Option>("eth_getBlockByNumber", ("latest", false)) - .await - .unwrap() - .unwrap(); - let proxy_result = proxy_provider - .request::<_, Option>("eth_getBlockByNumber", ("latest", false)) - .await - .unwrap() - .unwrap(); - - assert_eq!(anvil_result, proxy_result); - - let first_block_num = anvil_result.number.unwrap(); - - // mine a block - let _: U256 = anvil_provider.request("evm_mine", ()).await.unwrap(); - - // make sure the block advanced - let anvil_result = anvil_provider - .request::<_, Option>("eth_getBlockByNumber", ("latest", false)) - .await - .unwrap() - .unwrap(); - - let second_block_num = anvil_result.number.unwrap(); - - assert_eq!(first_block_num, second_block_num - 1); - - let mut proxy_result; - let start = Instant::now(); - loop { - if start.elapsed() > Duration::from_secs(1) { - panic!("took too long to sync!"); - } - - proxy_result = proxy_provider - .request::<_, Option>("eth_getBlockByNumber", ("latest", false)) - .await - .unwrap(); - - if let Some(ref proxy_result) = proxy_result { - if proxy_result.number != Some(first_block_num) { - break; - } - } - - sleep(Duration::from_millis(10)).await; - } - - assert_eq!(anvil_result, proxy_result.unwrap()); - - x.wait().await; - } -} diff --git a/web3_proxy/src/lib.rs b/web3_proxy/src/lib.rs index 56c5da5e..aee5e56f 100644 --- a/web3_proxy/src/lib.rs +++ b/web3_proxy/src/lib.rs @@ -17,6 +17,7 @@ pub mod relational_db; pub mod response_cache; pub mod rpcs; pub mod stats; +pub mod sub_commands; pub mod user_token; use serde::Deserialize; diff --git a/web3_proxy/src/bin/web3_proxy_cli/change_admin_status.rs b/web3_proxy/src/sub_commands/change_admin_status.rs similarity index 100% rename from web3_proxy/src/bin/web3_proxy_cli/change_admin_status.rs rename to web3_proxy/src/sub_commands/change_admin_status.rs diff --git a/web3_proxy/src/bin/web3_proxy_cli/change_user_address.rs b/web3_proxy/src/sub_commands/change_user_address.rs similarity index 100% rename from web3_proxy/src/bin/web3_proxy_cli/change_user_address.rs rename to web3_proxy/src/sub_commands/change_user_address.rs diff --git a/web3_proxy/src/bin/web3_proxy_cli/change_user_tier.rs b/web3_proxy/src/sub_commands/change_user_tier.rs similarity index 100% rename from web3_proxy/src/bin/web3_proxy_cli/change_user_tier.rs rename to web3_proxy/src/sub_commands/change_user_tier.rs diff --git a/web3_proxy/src/bin/web3_proxy_cli/change_user_tier_by_address.rs b/web3_proxy/src/sub_commands/change_user_tier_by_address.rs similarity index 100% rename from web3_proxy/src/bin/web3_proxy_cli/change_user_tier_by_address.rs rename to web3_proxy/src/sub_commands/change_user_tier_by_address.rs diff --git a/web3_proxy/src/bin/web3_proxy_cli/change_user_tier_by_key.rs b/web3_proxy/src/sub_commands/change_user_tier_by_key.rs similarity index 97% rename from web3_proxy/src/bin/web3_proxy_cli/change_user_tier_by_key.rs rename to web3_proxy/src/sub_commands/change_user_tier_by_key.rs index a113e4e5..c6106aa8 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/change_user_tier_by_key.rs +++ b/web3_proxy/src/sub_commands/change_user_tier_by_key.rs @@ -1,3 +1,4 @@ +use crate::frontend::authorization::RpcSecretKey; use anyhow::Context; use argh::FromArgs; use entities::{rpc_key, user, user_tier}; @@ -8,7 +9,6 @@ use migration::sea_orm::{ use serde_json::json; use tracing::{debug, info}; use uuid::Uuid; -use web3_proxy::frontend::authorization::RpcSecretKey; /// change a user's tier. #[derive(FromArgs, PartialEq, Eq, Debug)] diff --git a/web3_proxy/src/bin/web3_proxy_cli/check_config.rs b/web3_proxy/src/sub_commands/check_config.rs similarity index 99% rename from web3_proxy/src/bin/web3_proxy_cli/check_config.rs rename to web3_proxy/src/sub_commands/check_config.rs index fcdda0c3..6e6721eb 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/check_config.rs +++ b/web3_proxy/src/sub_commands/check_config.rs @@ -1,7 +1,7 @@ +use crate::config::TopConfig; use argh::FromArgs; use std::fs; use tracing::{error, info, warn}; -use web3_proxy::config::TopConfig; #[derive(FromArgs, PartialEq, Eq, Debug)] /// Check the config for any problems. diff --git a/web3_proxy/src/bin/web3_proxy_cli/count_users.rs b/web3_proxy/src/sub_commands/count_users.rs similarity index 100% rename from web3_proxy/src/bin/web3_proxy_cli/count_users.rs rename to web3_proxy/src/sub_commands/count_users.rs diff --git a/web3_proxy/src/bin/web3_proxy_cli/create_key.rs b/web3_proxy/src/sub_commands/create_key.rs similarity index 97% rename from web3_proxy/src/bin/web3_proxy_cli/create_key.rs rename to web3_proxy/src/sub_commands/create_key.rs index 58bae44d..cfe10f6a 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/create_key.rs +++ b/web3_proxy/src/sub_commands/create_key.rs @@ -1,3 +1,4 @@ +use crate::frontend::authorization::RpcSecretKey; use anyhow::Context; use argh::FromArgs; use entities::{rpc_key, user}; @@ -6,7 +7,6 @@ use migration::sea_orm::{self, ActiveModelTrait, ColumnTrait, EntityTrait, Query use tracing::info; use ulid::Ulid; use uuid::Uuid; -use web3_proxy::frontend::authorization::RpcSecretKey; #[derive(FromArgs, PartialEq, Debug, Eq)] /// Create a new user and api key diff --git a/web3_proxy/src/bin/web3_proxy_cli/create_user.rs b/web3_proxy/src/sub_commands/create_user.rs similarity index 98% rename from web3_proxy/src/bin/web3_proxy_cli/create_user.rs rename to web3_proxy/src/sub_commands/create_user.rs index feda9005..3020d479 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/create_user.rs +++ b/web3_proxy/src/sub_commands/create_user.rs @@ -1,3 +1,4 @@ +use crate::frontend::authorization::RpcSecretKey; use anyhow::Context; use argh::FromArgs; use entities::{rpc_key, user}; @@ -6,7 +7,6 @@ use migration::sea_orm::{self, ActiveModelTrait, TransactionTrait}; use tracing::info; use ulid::Ulid; use uuid::Uuid; -use web3_proxy::frontend::authorization::RpcSecretKey; #[derive(FromArgs, PartialEq, Debug, Eq)] /// Create a new user and api key diff --git a/web3_proxy/src/bin/web3_proxy_cli/delete_user.rs b/web3_proxy/src/sub_commands/delete_user.rs similarity index 100% rename from web3_proxy/src/bin/web3_proxy_cli/delete_user.rs rename to web3_proxy/src/sub_commands/delete_user.rs diff --git a/web3_proxy/src/bin/web3_proxy_cli/drop_migration_lock.rs b/web3_proxy/src/sub_commands/drop_migration_lock.rs similarity index 90% rename from web3_proxy/src/bin/web3_proxy_cli/drop_migration_lock.rs rename to web3_proxy/src/sub_commands/drop_migration_lock.rs index a5b7996b..72bd3a6b 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/drop_migration_lock.rs +++ b/web3_proxy/src/sub_commands/drop_migration_lock.rs @@ -1,6 +1,6 @@ +use crate::relational_db::{drop_migration_lock, migrate_db}; use argh::FromArgs; use migration::sea_orm::DatabaseConnection; -use web3_proxy::relational_db::{drop_migration_lock, migrate_db}; #[derive(FromArgs, PartialEq, Debug, Eq)] /// In case of emergency, break glass. diff --git a/web3_proxy/src/bin/web3_proxy_cli/example.rs b/web3_proxy/src/sub_commands/example.rs similarity index 100% rename from web3_proxy/src/bin/web3_proxy_cli/example.rs rename to web3_proxy/src/sub_commands/example.rs diff --git a/web3_proxy/src/bin/web3_proxy_cli/list_recent_users.rs b/web3_proxy/src/sub_commands/list_recent_users.rs similarity index 100% rename from web3_proxy/src/bin/web3_proxy_cli/list_recent_users.rs rename to web3_proxy/src/sub_commands/list_recent_users.rs diff --git a/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs b/web3_proxy/src/sub_commands/migrate_stats_to_v2.rs similarity index 97% rename from web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs rename to web3_proxy/src/sub_commands/migrate_stats_to_v2.rs index 08d7b1ac..6c3a98d8 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs +++ b/web3_proxy/src/sub_commands/migrate_stats_to_v2.rs @@ -1,3 +1,8 @@ +use crate::app::BILLING_PERIOD_SECONDS; +use crate::config::TopConfig; +use crate::frontend::authorization::{Authorization, RequestMetadata, RpcSecretKey}; +use crate::rpcs::one::Web3Rpc; +use crate::stats::StatBuffer; use anyhow::{anyhow, Context}; use argh::FromArgs; use entities::{rpc_accounting, rpc_key}; @@ -15,18 +20,13 @@ use tokio::sync::broadcast; use tokio::time::Instant; use tracing::{error, info}; use ulid::Ulid; -use web3_proxy::app::BILLING_PERIOD_SECONDS; -use web3_proxy::config::TopConfig; -use web3_proxy::frontend::authorization::{Authorization, RequestMetadata, RpcSecretKey}; -use web3_proxy::rpcs::one::Web3Rpc; -use web3_proxy::stats::StatBuffer; #[derive(FromArgs, PartialEq, Eq, Debug)] /// Migrate towards influxdb and rpc_accounting_v2 from rpc_accounting #[argh(subcommand, name = "migrate_stats_to_v2")] -pub struct MigrateStatsToV2 {} +pub struct MigrateStatsToV2SubCommand {} -impl MigrateStatsToV2 { +impl MigrateStatsToV2SubCommand { pub async fn main( self, top_config: TopConfig, diff --git a/web3_proxy/src/sub_commands/mod.rs b/web3_proxy/src/sub_commands/mod.rs new file mode 100644 index 00000000..5ff80938 --- /dev/null +++ b/web3_proxy/src/sub_commands/mod.rs @@ -0,0 +1,41 @@ +mod change_admin_status; +mod change_user_address; +mod change_user_tier; +mod change_user_tier_by_address; +mod change_user_tier_by_key; +mod check_config; +mod count_users; +mod create_key; +mod create_user; +mod drop_migration_lock; +mod migrate_stats_to_v2; +mod pagerduty; +mod popularity_contest; +mod proxyd; +mod rpc_accounting; +mod search_kafka; +mod sentryd; +mod transfer_key; +mod user_export; +mod user_import; + +pub use self::change_admin_status::ChangeAdminStatusSubCommand; +pub use self::change_user_address::ChangeUserAddressSubCommand; +pub use self::change_user_tier::ChangeUserTierSubCommand; +pub use self::change_user_tier_by_address::ChangeUserTierByAddressSubCommand; +pub use self::change_user_tier_by_key::ChangeUserTierByKeySubCommand; +pub use self::check_config::CheckConfigSubCommand; +pub use self::count_users::CountUsersSubCommand; +pub use self::create_key::CreateKeySubCommand; +pub use self::create_user::CreateUserSubCommand; +pub use self::drop_migration_lock::DropMigrationLockSubCommand; +pub use self::migrate_stats_to_v2::MigrateStatsToV2SubCommand; +pub use self::pagerduty::PagerdutySubCommand; +pub use self::popularity_contest::PopularityContestSubCommand; +pub use self::proxyd::ProxydSubCommand; +pub use self::rpc_accounting::RpcAccountingSubCommand; +pub use self::search_kafka::SearchKafkaSubCommand; +pub use self::sentryd::SentrydSubCommand; +pub use self::transfer_key::TransferKeySubCommand; +pub use self::user_export::UserExportSubCommand; +pub use self::user_import::UserImportSubCommand; diff --git a/web3_proxy/src/bin/web3_proxy_cli/pagerduty.rs b/web3_proxy/src/sub_commands/pagerduty.rs similarity index 99% rename from web3_proxy/src/bin/web3_proxy_cli/pagerduty.rs rename to web3_proxy/src/sub_commands/pagerduty.rs index ca5f2710..400b7ed6 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/pagerduty.rs +++ b/web3_proxy/src/sub_commands/pagerduty.rs @@ -1,11 +1,11 @@ +use crate::{ + config::TopConfig, + pagerduty::{pagerduty_alert, pagerduty_alert_for_config}, +}; use argh::FromArgs; use pagerduty_rs::{eventsv2async::EventsV2 as PagerdutyAsyncEventsV2, types::Event}; use serde_json::json; use tracing::{error, info}; -use web3_proxy::{ - config::TopConfig, - pagerduty::{pagerduty_alert, pagerduty_alert_for_config}, -}; #[derive(FromArgs, PartialEq, Debug, Eq)] /// Quickly create a pagerduty alert diff --git a/web3_proxy/src/bin/web3_proxy_cli/popularity_contest.rs b/web3_proxy/src/sub_commands/popularity_contest.rs similarity index 100% rename from web3_proxy/src/bin/web3_proxy_cli/popularity_contest.rs rename to web3_proxy/src/sub_commands/popularity_contest.rs diff --git a/web3_proxy/src/sub_commands/proxyd.rs b/web3_proxy/src/sub_commands/proxyd.rs new file mode 100644 index 00000000..44b310d4 --- /dev/null +++ b/web3_proxy/src/sub_commands/proxyd.rs @@ -0,0 +1,268 @@ +#![forbid(unsafe_code)] + +use crate::app::{flatten_handle, flatten_handles, Web3ProxyApp}; +use crate::config::TopConfig; +use crate::{frontend, prometheus}; +use argh::FromArgs; +use futures::StreamExt; +use num::Zero; +use std::path::PathBuf; +use std::sync::atomic::AtomicU16; +use std::sync::Arc; +use std::time::Duration; +use std::{fs, thread}; +use tokio::sync::broadcast; +use tracing::{error, info, trace, warn}; + +/// start the main proxy daemon +#[derive(FromArgs, PartialEq, Debug, Eq)] +#[argh(subcommand, name = "proxyd")] +pub struct ProxydSubCommand { + /// path to a toml of rpc servers + /// what port the proxy should listen on + #[argh(option, default = "8544")] + pub port: u16, + + /// what port the proxy should expose prometheus stats on + #[argh(option, default = "8543")] + pub prometheus_port: u16, +} + +impl ProxydSubCommand { + pub async fn main( + self, + top_config: TopConfig, + top_config_path: PathBuf, + num_workers: usize, + ) -> anyhow::Result<()> { + let (shutdown_sender, _) = broadcast::channel(1); + // TODO: i think there is a small race. if config_path changes + + let frontend_port = Arc::new(self.port.into()); + let prometheus_port = Arc::new(self.prometheus_port.into()); + + Self::_main( + top_config, + Some(top_config_path), + frontend_port, + prometheus_port, + num_workers, + shutdown_sender, + ) + .await + } + + /// this shouldn't really be pub except it makes test fixtures easier + pub async fn _main( + top_config: TopConfig, + top_config_path: Option, + frontend_port: Arc, + prometheus_port: Arc, + num_workers: usize, + frontend_shutdown_sender: broadcast::Sender<()>, + ) -> anyhow::Result<()> { + // tokio has code for catching ctrl+c so we use that + // this shutdown sender is currently only used in tests, but we might make a /shutdown endpoint or something + // we do not need this receiver. new receivers are made by `shutdown_sender.subscribe()` + + // TODO: should we use a watch or broadcast for these? + // Maybe this one ? + // let mut shutdown_receiver = shutdown_sender.subscribe(); + let (app_shutdown_sender, _app_shutdown_receiver) = broadcast::channel(1); + + let frontend_shutdown_receiver = frontend_shutdown_sender.subscribe(); + let prometheus_shutdown_receiver = app_shutdown_sender.subscribe(); + + // TODO: should we use a watch or broadcast for these? + let (frontend_shutdown_complete_sender, mut frontend_shutdown_complete_receiver) = + broadcast::channel(1); + + // start the main app + let mut spawned_app = Web3ProxyApp::spawn( + frontend_port, + prometheus_port, + top_config.clone(), + num_workers, + app_shutdown_sender.clone(), + ) + .await?; + + // start thread for watching config + if let Some(top_config_path) = top_config_path { + let config_sender = spawned_app.new_top_config; + { + let mut current_config = config_sender.borrow().clone(); + + thread::spawn(move || loop { + match fs::read_to_string(&top_config_path) { + Ok(new_top_config) => match toml::from_str::(&new_top_config) { + Ok(new_top_config) => { + if new_top_config != current_config { + // TODO: print the differences + // TODO: first run seems to always see differences. why? + info!("config @ {:?} changed", top_config_path); + config_sender.send(new_top_config.clone()).unwrap(); + current_config = new_top_config; + } + } + Err(err) => { + // TODO: panic? + error!("Unable to parse config! {:#?}", err); + } + }, + Err(err) => { + // TODO: panic? + error!("Unable to read config! {:#?}", err); + } + } + + thread::sleep(Duration::from_secs(10)); + }); + } + } + + // start the prometheus metrics port + let prometheus_handle = tokio::spawn(prometheus::serve( + spawned_app.app.clone(), + prometheus_shutdown_receiver, + )); + + info!("waiting for head block"); + loop { + spawned_app.app.head_block_receiver().changed().await?; + + if spawned_app + .app + .head_block_receiver() + .borrow_and_update() + .is_some() + { + break; + } else { + info!("no head block yet!"); + } + } + + // start the frontend port + let frontend_handle = tokio::spawn(frontend::serve( + spawned_app.app, + frontend_shutdown_receiver, + frontend_shutdown_complete_sender, + )); + + let frontend_handle = flatten_handle(frontend_handle); + + // if everything is working, these should all run forever + let mut exited_with_err = false; + let mut frontend_exited = false; + tokio::select! { + x = flatten_handles(spawned_app.app_handles) => { + match x { + Ok(_) => info!("app_handle exited"), + Err(e) => { + error!("app_handle exited: {:#?}", e); + exited_with_err = true; + } + } + } + x = frontend_handle => { + frontend_exited = true; + match x { + Ok(_) => info!("frontend exited"), + Err(e) => { + error!("frontend exited: {:#?}", e); + exited_with_err = true; + } + } + } + x = flatten_handle(prometheus_handle) => { + match x { + Ok(_) => info!("prometheus exited"), + Err(e) => { + error!("prometheus exited: {:#?}", e); + exited_with_err = true; + } + } + } + x = tokio::signal::ctrl_c() => { + // TODO: unix terminate signal, too + match x { + Ok(_) => info!("quiting from ctrl-c"), + Err(e) => { + // TODO: i don't think this is possible + error!("error quiting from ctrl-c: {:#?}", e); + exited_with_err = true; + } + } + } + // TODO: This seems to have been removed on the main branch + // TODO: how can we properly watch background handles here? this returns None immediatly and the app exits. i think the bug is somewhere else though + x = spawned_app.background_handles.next() => { + match x { + Some(Ok(_)) => info!("quiting from background handles"), + Some(Err(e)) => { + error!("quiting from background handle error: {:#?}", e); + exited_with_err = true; + } + None => { + // TODO: is this an error? + warn!("background handles exited"); + } + } + } + }; + + // TODO: This is also not there on the main branch + // if a future above completed, make sure the frontend knows to start turning off + if !frontend_exited { + if let Err(err) = frontend_shutdown_sender.send(()) { + // TODO: this is actually expected if the frontend is already shut down + warn!(?err, "shutdown sender"); + }; + } + + // TODO: Also not there on main branch + // TODO: wait until the frontend completes + if let Err(err) = frontend_shutdown_complete_receiver.recv().await { + warn!(?err, "shutdown completition"); + } else { + info!("frontend exited gracefully"); + } + + // now that the frontend is complete, tell all the other futures to finish + if let Err(err) = app_shutdown_sender.send(()) { + warn!(?err, "backend sender"); + }; + + info!( + "waiting on {} important background tasks", + spawned_app.background_handles.len() + ); + let mut background_errors = 0; + while let Some(x) = spawned_app.background_handles.next().await { + match x { + Err(e) => { + error!("{:?}", e); + background_errors += 1; + } + Ok(Err(e)) => { + error!("{:?}", e); + background_errors += 1; + } + Ok(Ok(_)) => { + // TODO: how can we know which handle exited? + trace!("a background handle exited"); + continue; + } + } + } + + if background_errors.is_zero() && !exited_with_err { + info!("finished"); + Ok(()) + } else { + // TODO: collect all the errors here instead? + Err(anyhow::anyhow!("finished with errors!")) + } + } +} diff --git a/web3_proxy/src/bin/web3_proxy_cli/rpc_accounting.rs b/web3_proxy/src/sub_commands/rpc_accounting.rs similarity index 100% rename from web3_proxy/src/bin/web3_proxy_cli/rpc_accounting.rs rename to web3_proxy/src/sub_commands/rpc_accounting.rs diff --git a/web3_proxy/src/bin/web3_proxy_cli/search_kafka.rs b/web3_proxy/src/sub_commands/search_kafka.rs similarity index 96% rename from web3_proxy/src/bin/web3_proxy_cli/search_kafka.rs rename to web3_proxy/src/sub_commands/search_kafka.rs index caf5fe97..0c53c861 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/search_kafka.rs +++ b/web3_proxy/src/sub_commands/search_kafka.rs @@ -1,3 +1,4 @@ +use crate::{config::TopConfig, frontend::authorization::RpcSecretKey, relational_db::get_db}; use anyhow::Context; use argh::FromArgs; use entities::rpc_key; @@ -10,7 +11,6 @@ use rdkafka::{ use std::num::NonZeroU64; use tracing::info; use uuid::Uuid; -use web3_proxy::{config::TopConfig, frontend::authorization::RpcSecretKey, relational_db::get_db}; /// Second subcommand. #[derive(FromArgs, PartialEq, Debug, Eq)] diff --git a/web3_proxy/src/bin/web3_proxy_cli/sentryd/compare.rs b/web3_proxy/src/sub_commands/sentryd/compare.rs similarity index 99% rename from web3_proxy/src/bin/web3_proxy_cli/sentryd/compare.rs rename to web3_proxy/src/sub_commands/sentryd/compare.rs index 3752a421..2416e159 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/sentryd/compare.rs +++ b/web3_proxy/src/sub_commands/sentryd/compare.rs @@ -1,3 +1,4 @@ +use crate::jsonrpc::JsonRpcErrorData; use anyhow::{anyhow, Context}; use chrono::{DateTime, Utc}; use ethers::types::{Block, TxHash, H256}; @@ -5,7 +6,6 @@ use futures::{stream::FuturesUnordered, StreamExt}; use serde::{Deserialize, Serialize}; use serde_json::json; use tracing::{debug, warn}; -use web3_proxy::jsonrpc::JsonRpcErrorData; use super::{SentrydErrorBuilder, SentrydResult}; diff --git a/web3_proxy/src/bin/web3_proxy_cli/sentryd/mod.rs b/web3_proxy/src/sub_commands/sentryd/mod.rs similarity index 99% rename from web3_proxy/src/bin/web3_proxy_cli/sentryd/mod.rs rename to web3_proxy/src/sub_commands/sentryd/mod.rs index 1867911c..62750590 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/sentryd/mod.rs +++ b/web3_proxy/src/sub_commands/sentryd/mod.rs @@ -1,6 +1,7 @@ mod compare; mod simple; +use crate::{config::TopConfig, pagerduty::pagerduty_alert}; use anyhow::Context; use argh::FromArgs; use futures::{ @@ -13,7 +14,6 @@ use std::time::Duration; use tokio::sync::mpsc; use tokio::time::{interval, MissedTickBehavior}; use tracing::{debug, error, info, warn, Level}; -use web3_proxy::{config::TopConfig, pagerduty::pagerduty_alert}; #[derive(FromArgs, PartialEq, Debug, Eq)] /// Loop healthchecks and send pager duty alerts if any fail diff --git a/web3_proxy/src/bin/web3_proxy_cli/sentryd/simple.rs b/web3_proxy/src/sub_commands/sentryd/simple.rs similarity index 100% rename from web3_proxy/src/bin/web3_proxy_cli/sentryd/simple.rs rename to web3_proxy/src/sub_commands/sentryd/simple.rs diff --git a/web3_proxy/src/bin/web3_proxy_cli/stat_age.rs b/web3_proxy/src/sub_commands/stat_age.rs similarity index 100% rename from web3_proxy/src/bin/web3_proxy_cli/stat_age.rs rename to web3_proxy/src/sub_commands/stat_age.rs diff --git a/web3_proxy/src/bin/web3_proxy_cli/transfer_key.rs b/web3_proxy/src/sub_commands/transfer_key.rs similarity index 96% rename from web3_proxy/src/bin/web3_proxy_cli/transfer_key.rs rename to web3_proxy/src/sub_commands/transfer_key.rs index c44b8fe1..f4fa4862 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/transfer_key.rs +++ b/web3_proxy/src/sub_commands/transfer_key.rs @@ -1,3 +1,4 @@ +use crate::frontend::authorization::RpcSecretKey; use anyhow::Context; use argh::FromArgs; use entities::{rpc_key, user}; @@ -8,7 +9,6 @@ use migration::sea_orm::{ }; use sea_orm::prelude::Uuid; use tracing::{debug, info}; -use web3_proxy::frontend::authorization::RpcSecretKey; /// change a key's owner. #[derive(FromArgs, PartialEq, Eq, Debug)] diff --git a/web3_proxy/src/bin/web3_proxy_cli/user_export.rs b/web3_proxy/src/sub_commands/user_export.rs similarity index 100% rename from web3_proxy/src/bin/web3_proxy_cli/user_export.rs rename to web3_proxy/src/sub_commands/user_export.rs diff --git a/web3_proxy/src/bin/web3_proxy_cli/user_import.rs b/web3_proxy/src/sub_commands/user_import.rs similarity index 100% rename from web3_proxy/src/bin/web3_proxy_cli/user_import.rs rename to web3_proxy/src/sub_commands/user_import.rs diff --git a/web3_proxy/tests/common/mod.rs b/web3_proxy/tests/common/mod.rs new file mode 100644 index 00000000..92a42101 --- /dev/null +++ b/web3_proxy/tests/common/mod.rs @@ -0,0 +1,155 @@ +use ethers::{ + prelude::{Http, Provider}, + types::Address, + utils::{Anvil, AnvilInstance}, +}; +use hashbrown::HashMap; +use parking_lot::Mutex; +use std::{ + env, + str::FromStr, + sync::atomic::{AtomicU16, Ordering}, +}; +use std::{sync::Arc, time::Duration}; +use tokio::{ + sync::broadcast::{self, error::SendError}, + task::JoinHandle, + time::{sleep, Instant}, +}; +use tracing::info; +use web3_proxy::{ + config::{AppConfig, TopConfig, Web3RpcConfig}, + sub_commands::ProxydSubCommand, +}; + +pub struct TestApp { + /// anvil shuts down when this guard is dropped. + _anvil: AnvilInstance, + + /// spawn handle for the proxy. + handle: Mutex>>>, + + /// tell the app to shut down (use `self.stop()`). + shutdown_sender: broadcast::Sender<()>, + + /// connection to anvil. + pub anvil_provider: Provider, + + /// connection to the proxy that is connected to anil. + pub proxy_provider: Provider, +} + +impl TestApp { + pub async fn spawn() -> Self { + let num_workers = 2; + + // TODO: move basic setup into a test fixture + let path = env::var("PATH").unwrap(); + + info!("path: {}", path); + + // TODO: configurable rpc and block + let anvil = Anvil::new() + // .fork("https://polygon.llamarpc.com@44300000") + .spawn(); + + info!("Anvil running at `{}`", anvil.endpoint()); + + let anvil_provider = Provider::::try_from(anvil.endpoint()).unwrap(); + + // make a test TopConfig + // TODO: load TopConfig from a file? CliConfig could have `cli_config.load_top_config`. would need to inject our endpoint ports + let top_config = TopConfig { + app: AppConfig { + chain_id: 31337, + db_url: Some("sqlite://:memory:".into()), + default_user_max_requests_per_period: Some(6_000_000), + deposit_factory_contract: Address::from_str( + "4e3BC2054788De923A04936C6ADdB99A05B0Ea36", + ) + .ok(), + min_sum_soft_limit: 1, + min_synced_rpcs: 1, + public_requests_per_period: Some(1_000_000), + response_cache_max_bytes: 10_u64.pow(7), + ..Default::default() + }, + balanced_rpcs: HashMap::from([( + "anvil_both".to_string(), + Web3RpcConfig { + http_url: Some(anvil.endpoint()), + ws_url: Some(anvil.ws_endpoint()), + ..Default::default() + }, + )]), + private_rpcs: None, + bundler_4337_rpcs: None, + extra: Default::default(), + }; + + let (shutdown_sender, _shutdown_receiver) = broadcast::channel(1); + + let frontend_port_arc = Arc::new(AtomicU16::new(0)); + let prometheus_port_arc = Arc::new(AtomicU16::new(0)); + + // spawn the app + // TODO: spawn in a thread so we can run from non-async tests and so the Drop impl can wait for it to stop + let handle = { + tokio::spawn(ProxydSubCommand::_main( + top_config, + None, + frontend_port_arc.clone(), + prometheus_port_arc, + num_workers, + shutdown_sender.clone(), + )) + }; + + let mut frontend_port = frontend_port_arc.load(Ordering::Relaxed); + let start = Instant::now(); + while frontend_port == 0 { + if start.elapsed() > Duration::from_secs(1) { + panic!("took too long to start!"); + } + + sleep(Duration::from_millis(10)).await; + frontend_port = frontend_port_arc.load(Ordering::Relaxed); + } + + let proxy_endpoint = format!("http://127.0.0.1:{}", frontend_port); + + let proxy_provider = Provider::::try_from(proxy_endpoint).unwrap(); + + Self { + handle: Mutex::new(Some(handle)), + anvil_provider, + proxy_provider, + shutdown_sender, + _anvil: anvil, + } + } + + pub fn stop(&self) -> Result> { + self.shutdown_sender.send(()) + } + + pub async fn wait(&self) { + // TODO: lock+take feels weird, but it works + let handle = self.handle.lock().take(); + + if let Some(handle) = handle { + let _ = self.stop(); + + info!("waiting for the app to stop..."); + handle.await.unwrap().unwrap(); + } + } +} + +impl Drop for TestApp { + fn drop(&mut self) { + let _ = self.stop(); + + // TODO: do we care about waiting for it to stop? it will slow our tests down so we probably only care about waiting in some tests + } +} diff --git a/web3_proxy/tests/test_proxy.rs b/web3_proxy/tests/test_proxy.rs new file mode 100644 index 00000000..4234502b --- /dev/null +++ b/web3_proxy/tests/test_proxy.rs @@ -0,0 +1,69 @@ +mod common; + +use crate::common::TestApp; +use ethers::prelude::U256; +use std::time::Duration; +use tokio::time::{sleep, Instant}; +use web3_proxy::rpcs::blockchain::ArcBlock; + +#[test_log::test(tokio::test)] +async fn it_works() { + let x = TestApp::spawn().await; + + let anvil_provider = &x.anvil_provider; + let proxy_provider = &x.proxy_provider; + + let anvil_result = anvil_provider + .request::<_, Option>("eth_getBlockByNumber", ("latest", false)) + .await + .unwrap() + .unwrap(); + let proxy_result = proxy_provider + .request::<_, Option>("eth_getBlockByNumber", ("latest", false)) + .await + .unwrap() + .unwrap(); + + assert_eq!(anvil_result, proxy_result); + + let first_block_num = anvil_result.number.unwrap(); + + // mine a block + let _: U256 = anvil_provider.request("evm_mine", ()).await.unwrap(); + + // make sure the block advanced + let anvil_result = anvil_provider + .request::<_, Option>("eth_getBlockByNumber", ("latest", false)) + .await + .unwrap() + .unwrap(); + + let second_block_num = anvil_result.number.unwrap(); + + assert_eq!(first_block_num, second_block_num - 1); + + let mut proxy_result; + let start = Instant::now(); + loop { + if start.elapsed() > Duration::from_secs(1) { + panic!("took too long to sync!"); + } + + proxy_result = proxy_provider + .request::<_, Option>("eth_getBlockByNumber", ("latest", false)) + .await + .unwrap(); + + if let Some(ref proxy_result) = proxy_result { + if proxy_result.number != Some(first_block_num) { + break; + } + } + + sleep(Duration::from_millis(10)).await; + } + + assert_eq!(anvil_result, proxy_result.unwrap()); + + x.wait().await; +}