move subcommands so integration tests are easier to write

This commit is contained in:
Bryan Stitt 2023-06-29 12:24:36 -07:00
parent b4f02b24f3
commit c0a658c6b4
33 changed files with 575 additions and 537 deletions

@ -1,25 +1,3 @@
mod change_admin_status;
mod change_user_address;
mod change_user_tier;
mod change_user_tier_by_address;
mod change_user_tier_by_key;
mod check_config;
mod count_users;
mod create_key;
mod create_user;
mod drop_migration_lock;
mod list_user_tier;
mod migrate_stats_to_v2;
mod pagerduty;
mod popularity_contest;
mod proxyd;
mod rpc_accounting;
mod search_kafka;
mod sentryd;
mod transfer_key;
mod user_export;
mod user_import;
use anyhow::Context;
use argh::FromArgs;
use ethers::types::U256;
@ -36,6 +14,7 @@ use tokio::runtime;
use tracing::{info, warn};
use tracing_subscriber::{prelude::*, EnvFilter};
use web3_proxy::pagerduty::panic_handler;
use web3_proxy::sub_commands;
use web3_proxy::{
app::APP_USER_AGENT,
config::TopConfig,
@ -79,26 +58,26 @@ pub struct Web3ProxyCli {
#[derive(FromArgs, PartialEq, Debug)]
#[argh(subcommand)]
enum SubCommand {
ChangeAdminStatus(change_admin_status::ChangeAdminStatusSubCommand),
ChangeUserAddress(change_user_address::ChangeUserAddressSubCommand),
ChangeUserTier(change_user_tier::ChangeUserTierSubCommand),
ChangeUserTierByAddress(change_user_tier_by_address::ChangeUserTierByAddressSubCommand),
ChangeUserTierByKey(change_user_tier_by_key::ChangeUserTierByKeySubCommand),
CheckConfig(check_config::CheckConfigSubCommand),
CountUsers(count_users::CountUsersSubCommand),
CreateKey(create_key::CreateKeySubCommand),
CreateUser(create_user::CreateUserSubCommand),
DropMigrationLock(drop_migration_lock::DropMigrationLockSubCommand),
MigrateStatsToV2(migrate_stats_to_v2::MigrateStatsToV2),
Pagerduty(pagerduty::PagerdutySubCommand),
PopularityContest(popularity_contest::PopularityContestSubCommand),
Proxyd(proxyd::ProxydSubCommand),
RpcAccounting(rpc_accounting::RpcAccountingSubCommand),
SearchKafka(search_kafka::SearchKafkaSubCommand),
Sentryd(sentryd::SentrydSubCommand),
TransferKey(transfer_key::TransferKeySubCommand),
UserExport(user_export::UserExportSubCommand),
UserImport(user_import::UserImportSubCommand),
ChangeAdminStatus(sub_commands::ChangeAdminStatusSubCommand),
ChangeUserAddress(sub_commands::ChangeUserAddressSubCommand),
ChangeUserTier(sub_commands::ChangeUserTierSubCommand),
ChangeUserTierByAddress(sub_commands::ChangeUserTierByAddressSubCommand),
ChangeUserTierByKey(sub_commands::ChangeUserTierByKeySubCommand),
CheckConfig(sub_commands::CheckConfigSubCommand),
CountUsers(sub_commands::CountUsersSubCommand),
CreateKey(sub_commands::CreateKeySubCommand),
CreateUser(sub_commands::CreateUserSubCommand),
DropMigrationLock(sub_commands::DropMigrationLockSubCommand),
MigrateStatsToV2(sub_commands::MigrateStatsToV2SubCommand),
Pagerduty(sub_commands::PagerdutySubCommand),
PopularityContest(sub_commands::PopularityContestSubCommand),
Proxyd(sub_commands::ProxydSubCommand),
RpcAccounting(sub_commands::RpcAccountingSubCommand),
SearchKafka(sub_commands::SearchKafkaSubCommand),
Sentryd(sub_commands::SentrydSubCommand),
TransferKey(sub_commands::TransferKeySubCommand),
UserExport(sub_commands::UserExportSubCommand),
UserImport(sub_commands::UserImportSubCommand),
// TODO: sub command to downgrade migrations? sea-orm has this but doing downgrades here would be easier+safer
// TODO: sub command to add new api keys to an existing user?
// TODO: sub command to change a user's tier

@ -1 +0,0 @@
//! TODO: write this

@ -1,474 +0,0 @@
#![forbid(unsafe_code)]
use argh::FromArgs;
use futures::StreamExt;
use num::Zero;
use std::path::PathBuf;
use std::sync::atomic::AtomicU16;
use std::sync::Arc;
use std::time::Duration;
use std::{fs, thread};
use tokio::sync::broadcast;
use tracing::{error, info, trace, warn};
use web3_proxy::app::{flatten_handle, flatten_handles, Web3ProxyApp};
use web3_proxy::config::TopConfig;
use web3_proxy::{frontend, prometheus};
/// start the main proxy daemon
#[derive(FromArgs, PartialEq, Debug, Eq)]
#[argh(subcommand, name = "proxyd")]
pub struct ProxydSubCommand {
/// path to a toml of rpc servers
/// what port the proxy should listen on
#[argh(option, default = "8544")]
pub port: u16,
/// what port the proxy should expose prometheus stats on
#[argh(option, default = "8543")]
pub prometheus_port: u16,
}
impl ProxydSubCommand {
pub async fn main(
self,
top_config: TopConfig,
top_config_path: PathBuf,
num_workers: usize,
) -> anyhow::Result<()> {
let (shutdown_sender, _) = broadcast::channel(1);
// TODO: i think there is a small race. if config_path changes
let frontend_port = Arc::new(self.port.into());
let prometheus_port = Arc::new(self.prometheus_port.into());
run(
top_config,
Some(top_config_path),
frontend_port,
prometheus_port,
num_workers,
shutdown_sender,
)
.await
}
}
async fn run(
top_config: TopConfig,
top_config_path: Option<PathBuf>,
frontend_port: Arc<AtomicU16>,
prometheus_port: Arc<AtomicU16>,
num_workers: usize,
frontend_shutdown_sender: broadcast::Sender<()>,
) -> anyhow::Result<()> {
// tokio has code for catching ctrl+c so we use that
// this shutdown sender is currently only used in tests, but we might make a /shutdown endpoint or something
// we do not need this receiver. new receivers are made by `shutdown_sender.subscribe()`
// TODO: should we use a watch or broadcast for these?
// Maybe this one ?
// let mut shutdown_receiver = shutdown_sender.subscribe();
let (app_shutdown_sender, _app_shutdown_receiver) = broadcast::channel(1);
let frontend_shutdown_receiver = frontend_shutdown_sender.subscribe();
let prometheus_shutdown_receiver = app_shutdown_sender.subscribe();
// TODO: should we use a watch or broadcast for these?
let (frontend_shutdown_complete_sender, mut frontend_shutdown_complete_receiver) =
broadcast::channel(1);
// start the main app
let mut spawned_app = Web3ProxyApp::spawn(
frontend_port,
prometheus_port,
top_config.clone(),
num_workers,
app_shutdown_sender.clone(),
)
.await?;
// start thread for watching config
if let Some(top_config_path) = top_config_path {
let config_sender = spawned_app.new_top_config;
{
let mut current_config = config_sender.borrow().clone();
thread::spawn(move || loop {
match fs::read_to_string(&top_config_path) {
Ok(new_top_config) => match toml::from_str::<TopConfig>(&new_top_config) {
Ok(new_top_config) => {
if new_top_config != current_config {
// TODO: print the differences
// TODO: first run seems to always see differences. why?
info!("config @ {:?} changed", top_config_path);
config_sender.send(new_top_config.clone()).unwrap();
current_config = new_top_config;
}
}
Err(err) => {
// TODO: panic?
error!("Unable to parse config! {:#?}", err);
}
},
Err(err) => {
// TODO: panic?
error!("Unable to read config! {:#?}", err);
}
}
thread::sleep(Duration::from_secs(10));
});
}
}
// start the prometheus metrics port
let prometheus_handle = tokio::spawn(prometheus::serve(
spawned_app.app.clone(),
prometheus_shutdown_receiver,
));
info!("waiting for head block");
loop {
spawned_app.app.head_block_receiver().changed().await?;
if spawned_app
.app
.head_block_receiver()
.borrow_and_update()
.is_some()
{
break;
} else {
info!("no head block yet!");
}
}
// start the frontend port
let frontend_handle = tokio::spawn(frontend::serve(
spawned_app.app,
frontend_shutdown_receiver,
frontend_shutdown_complete_sender,
));
let frontend_handle = flatten_handle(frontend_handle);
// if everything is working, these should all run forever
let mut exited_with_err = false;
let mut frontend_exited = false;
tokio::select! {
x = flatten_handles(spawned_app.app_handles) => {
match x {
Ok(_) => info!("app_handle exited"),
Err(e) => {
error!("app_handle exited: {:#?}", e);
exited_with_err = true;
}
}
}
x = frontend_handle => {
frontend_exited = true;
match x {
Ok(_) => info!("frontend exited"),
Err(e) => {
error!("frontend exited: {:#?}", e);
exited_with_err = true;
}
}
}
x = flatten_handle(prometheus_handle) => {
match x {
Ok(_) => info!("prometheus exited"),
Err(e) => {
error!("prometheus exited: {:#?}", e);
exited_with_err = true;
}
}
}
x = tokio::signal::ctrl_c() => {
// TODO: unix terminate signal, too
match x {
Ok(_) => info!("quiting from ctrl-c"),
Err(e) => {
// TODO: i don't think this is possible
error!("error quiting from ctrl-c: {:#?}", e);
exited_with_err = true;
}
}
}
// TODO: This seems to have been removed on the main branch
// TODO: how can we properly watch background handles here? this returns None immediatly and the app exits. i think the bug is somewhere else though
x = spawned_app.background_handles.next() => {
match x {
Some(Ok(_)) => info!("quiting from background handles"),
Some(Err(e)) => {
error!("quiting from background handle error: {:#?}", e);
exited_with_err = true;
}
None => {
// TODO: is this an error?
warn!("background handles exited");
}
}
}
};
// TODO: This is also not there on the main branch
// if a future above completed, make sure the frontend knows to start turning off
if !frontend_exited {
if let Err(err) = frontend_shutdown_sender.send(()) {
// TODO: this is actually expected if the frontend is already shut down
warn!(?err, "shutdown sender");
};
}
// TODO: Also not there on main branch
// TODO: wait until the frontend completes
if let Err(err) = frontend_shutdown_complete_receiver.recv().await {
warn!(?err, "shutdown completition");
} else {
info!("frontend exited gracefully");
}
// now that the frontend is complete, tell all the other futures to finish
if let Err(err) = app_shutdown_sender.send(()) {
warn!(?err, "backend sender");
};
info!(
"waiting on {} important background tasks",
spawned_app.background_handles.len()
);
let mut background_errors = 0;
while let Some(x) = spawned_app.background_handles.next().await {
match x {
Err(e) => {
error!("{:?}", e);
background_errors += 1;
}
Ok(Err(e)) => {
error!("{:?}", e);
background_errors += 1;
}
Ok(Ok(_)) => {
// TODO: how can we know which handle exited?
trace!("a background handle exited");
continue;
}
}
}
if background_errors.is_zero() && !exited_with_err {
info!("finished");
Ok(())
} else {
// TODO: collect all the errors here instead?
Err(anyhow::anyhow!("finished with errors!"))
}
}
#[cfg(test)]
mod tests {
use super::*;
use ethers::{
prelude::{Http, Provider, U256},
types::Address,
utils::{Anvil, AnvilInstance},
};
use hashbrown::HashMap;
use parking_lot::Mutex;
use std::{
env,
str::FromStr,
sync::atomic::{AtomicU16, Ordering},
};
use tokio::{
sync::broadcast::error::SendError,
task::JoinHandle,
time::{sleep, Instant},
};
use web3_proxy::{
config::{AppConfig, Web3RpcConfig},
rpcs::blockchain::ArcBlock,
};
// TODO: put it in a thread?
struct TestApp {
_anvil: AnvilInstance,
handle: Mutex<Option<JoinHandle<anyhow::Result<()>>>>,
anvil_provider: Provider<Http>,
proxy_provider: Provider<Http>,
shutdown_sender: broadcast::Sender<()>,
}
impl TestApp {
async fn spawn() -> Self {
// TODO: move basic setup into a test fixture
let path = env::var("PATH").unwrap();
info!("path: {}", path);
// TODO: configurable rpc and block
let anvil = Anvil::new()
// .fork("https://polygon.llamarpc.com@44300000")
.spawn();
info!("Anvil running at `{}`", anvil.endpoint());
let anvil_provider = Provider::<Http>::try_from(anvil.endpoint()).unwrap();
// make a test TopConfig
// TODO: load TopConfig from a file? CliConfig could have `cli_config.load_top_config`. would need to inject our endpoint ports
let top_config = TopConfig {
app: AppConfig {
chain_id: 31337,
default_user_max_requests_per_period: Some(6_000_000),
deposit_factory_contract: Address::from_str(
"4e3BC2054788De923A04936C6ADdB99A05B0Ea36",
)
.ok(),
min_sum_soft_limit: 1,
min_synced_rpcs: 1,
public_requests_per_period: Some(1_000_000),
response_cache_max_bytes: 10_u64.pow(7),
..Default::default()
},
balanced_rpcs: HashMap::from([(
"anvil_both".to_string(),
Web3RpcConfig {
http_url: Some(anvil.endpoint()),
ws_url: Some(anvil.ws_endpoint()),
..Default::default()
},
)]),
private_rpcs: None,
bundler_4337_rpcs: None,
extra: Default::default(),
};
let (shutdown_sender, _shutdown_receiver) = broadcast::channel(1);
let frontend_port_arc = Arc::new(AtomicU16::new(0));
let prometheus_port_arc = Arc::new(AtomicU16::new(0));
// spawn another thread for running the app
// TODO: allow launching into the local tokio runtime instead of creating a new one?
let handle = {
tokio::spawn(run(
top_config,
None,
frontend_port_arc.clone(),
prometheus_port_arc,
2,
shutdown_sender.clone(),
))
};
let mut frontend_port = frontend_port_arc.load(Ordering::Relaxed);
let start = Instant::now();
while frontend_port == 0 {
if start.elapsed() > Duration::from_secs(1) {
panic!("took too long to start!");
}
sleep(Duration::from_millis(10)).await;
frontend_port = frontend_port_arc.load(Ordering::Relaxed);
}
let proxy_endpoint = format!("http://127.0.0.1:{}", frontend_port);
let proxy_provider = Provider::<Http>::try_from(proxy_endpoint).unwrap();
Self {
handle: Mutex::new(Some(handle)),
anvil_provider,
proxy_provider,
shutdown_sender,
_anvil: anvil,
}
}
fn stop(&self) -> Result<usize, SendError<()>> {
self.shutdown_sender.send(())
}
async fn wait(&self) {
// TODO: lock+take feels weird, but it works
let handle = self.handle.lock().take();
if let Some(handle) = handle {
let _ = self.stop();
info!("waiting for the app to stop...");
handle.await.unwrap().unwrap();
}
}
}
impl Drop for TestApp {
fn drop(&mut self) {
let _ = self.stop();
}
}
#[test_log::test(tokio::test)]
async fn it_works() {
let x = TestApp::spawn().await;
let anvil_provider = &x.anvil_provider;
let proxy_provider = &x.proxy_provider;
let anvil_result = anvil_provider
.request::<_, Option<ArcBlock>>("eth_getBlockByNumber", ("latest", false))
.await
.unwrap()
.unwrap();
let proxy_result = proxy_provider
.request::<_, Option<ArcBlock>>("eth_getBlockByNumber", ("latest", false))
.await
.unwrap()
.unwrap();
assert_eq!(anvil_result, proxy_result);
let first_block_num = anvil_result.number.unwrap();
// mine a block
let _: U256 = anvil_provider.request("evm_mine", ()).await.unwrap();
// make sure the block advanced
let anvil_result = anvil_provider
.request::<_, Option<ArcBlock>>("eth_getBlockByNumber", ("latest", false))
.await
.unwrap()
.unwrap();
let second_block_num = anvil_result.number.unwrap();
assert_eq!(first_block_num, second_block_num - 1);
let mut proxy_result;
let start = Instant::now();
loop {
if start.elapsed() > Duration::from_secs(1) {
panic!("took too long to sync!");
}
proxy_result = proxy_provider
.request::<_, Option<ArcBlock>>("eth_getBlockByNumber", ("latest", false))
.await
.unwrap();
if let Some(ref proxy_result) = proxy_result {
if proxy_result.number != Some(first_block_num) {
break;
}
}
sleep(Duration::from_millis(10)).await;
}
assert_eq!(anvil_result, proxy_result.unwrap());
x.wait().await;
}
}

@ -17,6 +17,7 @@ pub mod relational_db;
pub mod response_cache;
pub mod rpcs;
pub mod stats;
pub mod sub_commands;
pub mod user_token;
use serde::Deserialize;

@ -1,3 +1,4 @@
use crate::frontend::authorization::RpcSecretKey;
use anyhow::Context;
use argh::FromArgs;
use entities::{rpc_key, user, user_tier};
@ -8,7 +9,6 @@ use migration::sea_orm::{
use serde_json::json;
use tracing::{debug, info};
use uuid::Uuid;
use web3_proxy::frontend::authorization::RpcSecretKey;
/// change a user's tier.
#[derive(FromArgs, PartialEq, Eq, Debug)]

@ -1,7 +1,7 @@
use crate::config::TopConfig;
use argh::FromArgs;
use std::fs;
use tracing::{error, info, warn};
use web3_proxy::config::TopConfig;
#[derive(FromArgs, PartialEq, Eq, Debug)]
/// Check the config for any problems.

@ -1,3 +1,4 @@
use crate::frontend::authorization::RpcSecretKey;
use anyhow::Context;
use argh::FromArgs;
use entities::{rpc_key, user};
@ -6,7 +7,6 @@ use migration::sea_orm::{self, ActiveModelTrait, ColumnTrait, EntityTrait, Query
use tracing::info;
use ulid::Ulid;
use uuid::Uuid;
use web3_proxy::frontend::authorization::RpcSecretKey;
#[derive(FromArgs, PartialEq, Debug, Eq)]
/// Create a new user and api key

@ -1,3 +1,4 @@
use crate::frontend::authorization::RpcSecretKey;
use anyhow::Context;
use argh::FromArgs;
use entities::{rpc_key, user};
@ -6,7 +7,6 @@ use migration::sea_orm::{self, ActiveModelTrait, TransactionTrait};
use tracing::info;
use ulid::Ulid;
use uuid::Uuid;
use web3_proxy::frontend::authorization::RpcSecretKey;
#[derive(FromArgs, PartialEq, Debug, Eq)]
/// Create a new user and api key

@ -1,6 +1,6 @@
use crate::relational_db::{drop_migration_lock, migrate_db};
use argh::FromArgs;
use migration::sea_orm::DatabaseConnection;
use web3_proxy::relational_db::{drop_migration_lock, migrate_db};
#[derive(FromArgs, PartialEq, Debug, Eq)]
/// In case of emergency, break glass.

@ -1,3 +1,8 @@
use crate::app::BILLING_PERIOD_SECONDS;
use crate::config::TopConfig;
use crate::frontend::authorization::{Authorization, RequestMetadata, RpcSecretKey};
use crate::rpcs::one::Web3Rpc;
use crate::stats::StatBuffer;
use anyhow::{anyhow, Context};
use argh::FromArgs;
use entities::{rpc_accounting, rpc_key};
@ -15,18 +20,13 @@ use tokio::sync::broadcast;
use tokio::time::Instant;
use tracing::{error, info};
use ulid::Ulid;
use web3_proxy::app::BILLING_PERIOD_SECONDS;
use web3_proxy::config::TopConfig;
use web3_proxy::frontend::authorization::{Authorization, RequestMetadata, RpcSecretKey};
use web3_proxy::rpcs::one::Web3Rpc;
use web3_proxy::stats::StatBuffer;
#[derive(FromArgs, PartialEq, Eq, Debug)]
/// Migrate towards influxdb and rpc_accounting_v2 from rpc_accounting
#[argh(subcommand, name = "migrate_stats_to_v2")]
pub struct MigrateStatsToV2 {}
pub struct MigrateStatsToV2SubCommand {}
impl MigrateStatsToV2 {
impl MigrateStatsToV2SubCommand {
pub async fn main(
self,
top_config: TopConfig,

@ -0,0 +1,41 @@
mod change_admin_status;
mod change_user_address;
mod change_user_tier;
mod change_user_tier_by_address;
mod change_user_tier_by_key;
mod check_config;
mod count_users;
mod create_key;
mod create_user;
mod drop_migration_lock;
mod migrate_stats_to_v2;
mod pagerduty;
mod popularity_contest;
mod proxyd;
mod rpc_accounting;
mod search_kafka;
mod sentryd;
mod transfer_key;
mod user_export;
mod user_import;
pub use self::change_admin_status::ChangeAdminStatusSubCommand;
pub use self::change_user_address::ChangeUserAddressSubCommand;
pub use self::change_user_tier::ChangeUserTierSubCommand;
pub use self::change_user_tier_by_address::ChangeUserTierByAddressSubCommand;
pub use self::change_user_tier_by_key::ChangeUserTierByKeySubCommand;
pub use self::check_config::CheckConfigSubCommand;
pub use self::count_users::CountUsersSubCommand;
pub use self::create_key::CreateKeySubCommand;
pub use self::create_user::CreateUserSubCommand;
pub use self::drop_migration_lock::DropMigrationLockSubCommand;
pub use self::migrate_stats_to_v2::MigrateStatsToV2SubCommand;
pub use self::pagerduty::PagerdutySubCommand;
pub use self::popularity_contest::PopularityContestSubCommand;
pub use self::proxyd::ProxydSubCommand;
pub use self::rpc_accounting::RpcAccountingSubCommand;
pub use self::search_kafka::SearchKafkaSubCommand;
pub use self::sentryd::SentrydSubCommand;
pub use self::transfer_key::TransferKeySubCommand;
pub use self::user_export::UserExportSubCommand;
pub use self::user_import::UserImportSubCommand;

@ -1,11 +1,11 @@
use crate::{
config::TopConfig,
pagerduty::{pagerduty_alert, pagerduty_alert_for_config},
};
use argh::FromArgs;
use pagerduty_rs::{eventsv2async::EventsV2 as PagerdutyAsyncEventsV2, types::Event};
use serde_json::json;
use tracing::{error, info};
use web3_proxy::{
config::TopConfig,
pagerduty::{pagerduty_alert, pagerduty_alert_for_config},
};
#[derive(FromArgs, PartialEq, Debug, Eq)]
/// Quickly create a pagerduty alert

@ -0,0 +1,268 @@
#![forbid(unsafe_code)]
use crate::app::{flatten_handle, flatten_handles, Web3ProxyApp};
use crate::config::TopConfig;
use crate::{frontend, prometheus};
use argh::FromArgs;
use futures::StreamExt;
use num::Zero;
use std::path::PathBuf;
use std::sync::atomic::AtomicU16;
use std::sync::Arc;
use std::time::Duration;
use std::{fs, thread};
use tokio::sync::broadcast;
use tracing::{error, info, trace, warn};
/// start the main proxy daemon
#[derive(FromArgs, PartialEq, Debug, Eq)]
#[argh(subcommand, name = "proxyd")]
pub struct ProxydSubCommand {
/// path to a toml of rpc servers
/// what port the proxy should listen on
#[argh(option, default = "8544")]
pub port: u16,
/// what port the proxy should expose prometheus stats on
#[argh(option, default = "8543")]
pub prometheus_port: u16,
}
impl ProxydSubCommand {
pub async fn main(
self,
top_config: TopConfig,
top_config_path: PathBuf,
num_workers: usize,
) -> anyhow::Result<()> {
let (shutdown_sender, _) = broadcast::channel(1);
// TODO: i think there is a small race. if config_path changes
let frontend_port = Arc::new(self.port.into());
let prometheus_port = Arc::new(self.prometheus_port.into());
Self::_main(
top_config,
Some(top_config_path),
frontend_port,
prometheus_port,
num_workers,
shutdown_sender,
)
.await
}
/// this shouldn't really be pub except it makes test fixtures easier
pub async fn _main(
top_config: TopConfig,
top_config_path: Option<PathBuf>,
frontend_port: Arc<AtomicU16>,
prometheus_port: Arc<AtomicU16>,
num_workers: usize,
frontend_shutdown_sender: broadcast::Sender<()>,
) -> anyhow::Result<()> {
// tokio has code for catching ctrl+c so we use that
// this shutdown sender is currently only used in tests, but we might make a /shutdown endpoint or something
// we do not need this receiver. new receivers are made by `shutdown_sender.subscribe()`
// TODO: should we use a watch or broadcast for these?
// Maybe this one ?
// let mut shutdown_receiver = shutdown_sender.subscribe();
let (app_shutdown_sender, _app_shutdown_receiver) = broadcast::channel(1);
let frontend_shutdown_receiver = frontend_shutdown_sender.subscribe();
let prometheus_shutdown_receiver = app_shutdown_sender.subscribe();
// TODO: should we use a watch or broadcast for these?
let (frontend_shutdown_complete_sender, mut frontend_shutdown_complete_receiver) =
broadcast::channel(1);
// start the main app
let mut spawned_app = Web3ProxyApp::spawn(
frontend_port,
prometheus_port,
top_config.clone(),
num_workers,
app_shutdown_sender.clone(),
)
.await?;
// start thread for watching config
if let Some(top_config_path) = top_config_path {
let config_sender = spawned_app.new_top_config;
{
let mut current_config = config_sender.borrow().clone();
thread::spawn(move || loop {
match fs::read_to_string(&top_config_path) {
Ok(new_top_config) => match toml::from_str::<TopConfig>(&new_top_config) {
Ok(new_top_config) => {
if new_top_config != current_config {
// TODO: print the differences
// TODO: first run seems to always see differences. why?
info!("config @ {:?} changed", top_config_path);
config_sender.send(new_top_config.clone()).unwrap();
current_config = new_top_config;
}
}
Err(err) => {
// TODO: panic?
error!("Unable to parse config! {:#?}", err);
}
},
Err(err) => {
// TODO: panic?
error!("Unable to read config! {:#?}", err);
}
}
thread::sleep(Duration::from_secs(10));
});
}
}
// start the prometheus metrics port
let prometheus_handle = tokio::spawn(prometheus::serve(
spawned_app.app.clone(),
prometheus_shutdown_receiver,
));
info!("waiting for head block");
loop {
spawned_app.app.head_block_receiver().changed().await?;
if spawned_app
.app
.head_block_receiver()
.borrow_and_update()
.is_some()
{
break;
} else {
info!("no head block yet!");
}
}
// start the frontend port
let frontend_handle = tokio::spawn(frontend::serve(
spawned_app.app,
frontend_shutdown_receiver,
frontend_shutdown_complete_sender,
));
let frontend_handle = flatten_handle(frontend_handle);
// if everything is working, these should all run forever
let mut exited_with_err = false;
let mut frontend_exited = false;
tokio::select! {
x = flatten_handles(spawned_app.app_handles) => {
match x {
Ok(_) => info!("app_handle exited"),
Err(e) => {
error!("app_handle exited: {:#?}", e);
exited_with_err = true;
}
}
}
x = frontend_handle => {
frontend_exited = true;
match x {
Ok(_) => info!("frontend exited"),
Err(e) => {
error!("frontend exited: {:#?}", e);
exited_with_err = true;
}
}
}
x = flatten_handle(prometheus_handle) => {
match x {
Ok(_) => info!("prometheus exited"),
Err(e) => {
error!("prometheus exited: {:#?}", e);
exited_with_err = true;
}
}
}
x = tokio::signal::ctrl_c() => {
// TODO: unix terminate signal, too
match x {
Ok(_) => info!("quiting from ctrl-c"),
Err(e) => {
// TODO: i don't think this is possible
error!("error quiting from ctrl-c: {:#?}", e);
exited_with_err = true;
}
}
}
// TODO: This seems to have been removed on the main branch
// TODO: how can we properly watch background handles here? this returns None immediatly and the app exits. i think the bug is somewhere else though
x = spawned_app.background_handles.next() => {
match x {
Some(Ok(_)) => info!("quiting from background handles"),
Some(Err(e)) => {
error!("quiting from background handle error: {:#?}", e);
exited_with_err = true;
}
None => {
// TODO: is this an error?
warn!("background handles exited");
}
}
}
};
// TODO: This is also not there on the main branch
// if a future above completed, make sure the frontend knows to start turning off
if !frontend_exited {
if let Err(err) = frontend_shutdown_sender.send(()) {
// TODO: this is actually expected if the frontend is already shut down
warn!(?err, "shutdown sender");
};
}
// TODO: Also not there on main branch
// TODO: wait until the frontend completes
if let Err(err) = frontend_shutdown_complete_receiver.recv().await {
warn!(?err, "shutdown completition");
} else {
info!("frontend exited gracefully");
}
// now that the frontend is complete, tell all the other futures to finish
if let Err(err) = app_shutdown_sender.send(()) {
warn!(?err, "backend sender");
};
info!(
"waiting on {} important background tasks",
spawned_app.background_handles.len()
);
let mut background_errors = 0;
while let Some(x) = spawned_app.background_handles.next().await {
match x {
Err(e) => {
error!("{:?}", e);
background_errors += 1;
}
Ok(Err(e)) => {
error!("{:?}", e);
background_errors += 1;
}
Ok(Ok(_)) => {
// TODO: how can we know which handle exited?
trace!("a background handle exited");
continue;
}
}
}
if background_errors.is_zero() && !exited_with_err {
info!("finished");
Ok(())
} else {
// TODO: collect all the errors here instead?
Err(anyhow::anyhow!("finished with errors!"))
}
}
}

@ -1,3 +1,4 @@
use crate::{config::TopConfig, frontend::authorization::RpcSecretKey, relational_db::get_db};
use anyhow::Context;
use argh::FromArgs;
use entities::rpc_key;
@ -10,7 +11,6 @@ use rdkafka::{
use std::num::NonZeroU64;
use tracing::info;
use uuid::Uuid;
use web3_proxy::{config::TopConfig, frontend::authorization::RpcSecretKey, relational_db::get_db};
/// Second subcommand.
#[derive(FromArgs, PartialEq, Debug, Eq)]

@ -1,3 +1,4 @@
use crate::jsonrpc::JsonRpcErrorData;
use anyhow::{anyhow, Context};
use chrono::{DateTime, Utc};
use ethers::types::{Block, TxHash, H256};
@ -5,7 +6,6 @@ use futures::{stream::FuturesUnordered, StreamExt};
use serde::{Deserialize, Serialize};
use serde_json::json;
use tracing::{debug, warn};
use web3_proxy::jsonrpc::JsonRpcErrorData;
use super::{SentrydErrorBuilder, SentrydResult};

@ -1,6 +1,7 @@
mod compare;
mod simple;
use crate::{config::TopConfig, pagerduty::pagerduty_alert};
use anyhow::Context;
use argh::FromArgs;
use futures::{
@ -13,7 +14,6 @@ use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::{interval, MissedTickBehavior};
use tracing::{debug, error, info, warn, Level};
use web3_proxy::{config::TopConfig, pagerduty::pagerduty_alert};
#[derive(FromArgs, PartialEq, Debug, Eq)]
/// Loop healthchecks and send pager duty alerts if any fail

@ -1,3 +1,4 @@
use crate::frontend::authorization::RpcSecretKey;
use anyhow::Context;
use argh::FromArgs;
use entities::{rpc_key, user};
@ -8,7 +9,6 @@ use migration::sea_orm::{
};
use sea_orm::prelude::Uuid;
use tracing::{debug, info};
use web3_proxy::frontend::authorization::RpcSecretKey;
/// change a key's owner.
#[derive(FromArgs, PartialEq, Eq, Debug)]

@ -0,0 +1,155 @@
use ethers::{
prelude::{Http, Provider},
types::Address,
utils::{Anvil, AnvilInstance},
};
use hashbrown::HashMap;
use parking_lot::Mutex;
use std::{
env,
str::FromStr,
sync::atomic::{AtomicU16, Ordering},
};
use std::{sync::Arc, time::Duration};
use tokio::{
sync::broadcast::{self, error::SendError},
task::JoinHandle,
time::{sleep, Instant},
};
use tracing::info;
use web3_proxy::{
config::{AppConfig, TopConfig, Web3RpcConfig},
sub_commands::ProxydSubCommand,
};
pub struct TestApp {
/// anvil shuts down when this guard is dropped.
_anvil: AnvilInstance,
/// spawn handle for the proxy.
handle: Mutex<Option<JoinHandle<anyhow::Result<()>>>>,
/// tell the app to shut down (use `self.stop()`).
shutdown_sender: broadcast::Sender<()>,
/// connection to anvil.
pub anvil_provider: Provider<Http>,
/// connection to the proxy that is connected to anil.
pub proxy_provider: Provider<Http>,
}
impl TestApp {
pub async fn spawn() -> Self {
let num_workers = 2;
// TODO: move basic setup into a test fixture
let path = env::var("PATH").unwrap();
info!("path: {}", path);
// TODO: configurable rpc and block
let anvil = Anvil::new()
// .fork("https://polygon.llamarpc.com@44300000")
.spawn();
info!("Anvil running at `{}`", anvil.endpoint());
let anvil_provider = Provider::<Http>::try_from(anvil.endpoint()).unwrap();
// make a test TopConfig
// TODO: load TopConfig from a file? CliConfig could have `cli_config.load_top_config`. would need to inject our endpoint ports
let top_config = TopConfig {
app: AppConfig {
chain_id: 31337,
db_url: Some("sqlite://:memory:".into()),
default_user_max_requests_per_period: Some(6_000_000),
deposit_factory_contract: Address::from_str(
"4e3BC2054788De923A04936C6ADdB99A05B0Ea36",
)
.ok(),
min_sum_soft_limit: 1,
min_synced_rpcs: 1,
public_requests_per_period: Some(1_000_000),
response_cache_max_bytes: 10_u64.pow(7),
..Default::default()
},
balanced_rpcs: HashMap::from([(
"anvil_both".to_string(),
Web3RpcConfig {
http_url: Some(anvil.endpoint()),
ws_url: Some(anvil.ws_endpoint()),
..Default::default()
},
)]),
private_rpcs: None,
bundler_4337_rpcs: None,
extra: Default::default(),
};
let (shutdown_sender, _shutdown_receiver) = broadcast::channel(1);
let frontend_port_arc = Arc::new(AtomicU16::new(0));
let prometheus_port_arc = Arc::new(AtomicU16::new(0));
// spawn the app
// TODO: spawn in a thread so we can run from non-async tests and so the Drop impl can wait for it to stop
let handle = {
tokio::spawn(ProxydSubCommand::_main(
top_config,
None,
frontend_port_arc.clone(),
prometheus_port_arc,
num_workers,
shutdown_sender.clone(),
))
};
let mut frontend_port = frontend_port_arc.load(Ordering::Relaxed);
let start = Instant::now();
while frontend_port == 0 {
if start.elapsed() > Duration::from_secs(1) {
panic!("took too long to start!");
}
sleep(Duration::from_millis(10)).await;
frontend_port = frontend_port_arc.load(Ordering::Relaxed);
}
let proxy_endpoint = format!("http://127.0.0.1:{}", frontend_port);
let proxy_provider = Provider::<Http>::try_from(proxy_endpoint).unwrap();
Self {
handle: Mutex::new(Some(handle)),
anvil_provider,
proxy_provider,
shutdown_sender,
_anvil: anvil,
}
}
pub fn stop(&self) -> Result<usize, SendError<()>> {
self.shutdown_sender.send(())
}
pub async fn wait(&self) {
// TODO: lock+take feels weird, but it works
let handle = self.handle.lock().take();
if let Some(handle) = handle {
let _ = self.stop();
info!("waiting for the app to stop...");
handle.await.unwrap().unwrap();
}
}
}
impl Drop for TestApp {
fn drop(&mut self) {
let _ = self.stop();
// TODO: do we care about waiting for it to stop? it will slow our tests down so we probably only care about waiting in some tests
}
}

@ -0,0 +1,69 @@
mod common;
use crate::common::TestApp;
use ethers::prelude::U256;
use std::time::Duration;
use tokio::time::{sleep, Instant};
use web3_proxy::rpcs::blockchain::ArcBlock;
#[test_log::test(tokio::test)]
async fn it_works() {
let x = TestApp::spawn().await;
let anvil_provider = &x.anvil_provider;
let proxy_provider = &x.proxy_provider;
let anvil_result = anvil_provider
.request::<_, Option<ArcBlock>>("eth_getBlockByNumber", ("latest", false))
.await
.unwrap()
.unwrap();
let proxy_result = proxy_provider
.request::<_, Option<ArcBlock>>("eth_getBlockByNumber", ("latest", false))
.await
.unwrap()
.unwrap();
assert_eq!(anvil_result, proxy_result);
let first_block_num = anvil_result.number.unwrap();
// mine a block
let _: U256 = anvil_provider.request("evm_mine", ()).await.unwrap();
// make sure the block advanced
let anvil_result = anvil_provider
.request::<_, Option<ArcBlock>>("eth_getBlockByNumber", ("latest", false))
.await
.unwrap()
.unwrap();
let second_block_num = anvil_result.number.unwrap();
assert_eq!(first_block_num, second_block_num - 1);
let mut proxy_result;
let start = Instant::now();
loop {
if start.elapsed() > Duration::from_secs(1) {
panic!("took too long to sync!");
}
proxy_result = proxy_provider
.request::<_, Option<ArcBlock>>("eth_getBlockByNumber", ("latest", false))
.await
.unwrap();
if let Some(ref proxy_result) = proxy_result {
if proxy_result.number != Some(first_block_num) {
break;
}
}
sleep(Duration::from_millis(10)).await;
}
assert_eq!(anvil_result, proxy_result.unwrap());
x.wait().await;
}