diff --git a/web3_proxy/src/bin/web3_proxy_cli/health_compass.rs b/web3_proxy/src/bin/web3_proxy_cli/health_compass.rs deleted file mode 100644 index 4bdffbe9..00000000 --- a/web3_proxy/src/bin/web3_proxy_cli/health_compass.rs +++ /dev/null @@ -1,137 +0,0 @@ -use argh::FromArgs; -use ethers::types::{Block, TxHash, H256}; -use log::{error, info, warn}; -use serde::{Deserialize, Serialize}; -use serde_json::json; -use web3_proxy::jsonrpc::JsonRpcErrorData; - -#[derive(FromArgs, PartialEq, Debug, Eq)] -/// Never bring only 2 compasses to sea. -#[argh(subcommand, name = "health_compass")] -pub struct HealthCompassSubCommand { - #[argh(positional)] - /// first rpc - rpc_a: String, - - #[argh(positional)] - /// second rpc - rpc_b: String, - - #[argh(positional)] - /// third rpc - rpc_c: String, -} - -#[derive(Debug, Deserialize, Serialize)] -struct JsonRpcResponse { - // pub jsonrpc: String, - // pub id: Box, - #[serde(skip_serializing_if = "Option::is_none")] - pub result: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub error: Option, -} - -impl HealthCompassSubCommand { - pub async fn main(self) -> anyhow::Result<()> { - let client = reqwest::Client::new(); - - let block_by_number_request = json!({ - "jsonrpc": "2.0", - "id": "1", - "method": "eth_getBlockByNumber", - "params": ["latest", false], - }); - - let a = client - .post(&self.rpc_a) - .json(&block_by_number_request) - .send() - .await? - .json::>>() - .await? - .result - .unwrap(); - - // check the parent because b and c might not be as fast as a - let parent_hash = a.parent_hash; - - let a = check_rpc(&parent_hash, &client, &self.rpc_a).await; - let b = check_rpc(&parent_hash, &client, &self.rpc_b).await; - let c = check_rpc(&parent_hash, &client, &self.rpc_c).await; - - match (a, b, c) { - (Ok(Ok(a)), Ok(Ok(b)), Ok(Ok(c))) => { - if a != b { - error!("A: {:?}\n\nB: {:?}\n\nC: {:?}", a, b, c); - return Err(anyhow::anyhow!("difference detected!")); - } - - if b != c { - error!("\nA: {:?}\n\nB: {:?}\n\nC: {:?}", a, b, c); - return Err(anyhow::anyhow!("difference detected!")); - } - - // all three rpcs agree - } - (Ok(Ok(a)), Ok(Ok(b)), c) => { - // not all successes! but still enough to compare - warn!("C failed: {:?}", c); - - if a != b { - error!("\nA: {:?}\n\nB: {:?}", a, b); - return Err(anyhow::anyhow!("difference detected!")); - } - } - (Ok(Ok(a)), b, Ok(Ok(c))) => { - // not all successes! but still enough to compare - warn!("B failed: {:?}", b); - - if a != c { - error!("\nA: {:?}\n\nC: {:?}", a, c); - return Err(anyhow::anyhow!("difference detected!")); - } - } - (a, b, c) => { - // not enough successes - error!("A: {:?}\n\nB: {:?}\n\nC: {:?}", a, b, c); - return Err(anyhow::anyhow!("All are failing!")); - } - } - - info!("OK"); - - Ok(()) - } -} - -// i don't think we need a whole provider. a simple http request is easiest -async fn check_rpc( - block_hash: &H256, - client: &reqwest::Client, - rpc: &str, -) -> anyhow::Result, JsonRpcErrorData>> { - let block_by_hash_request = json!({ - "jsonrpc": "2.0", - "id": "1", - "method": "eth_getBlockByHash", - "params": [block_hash, false], - }); - - // TODO: don't unwrap! don't use the try operator - let response: JsonRpcResponse> = client - .post(rpc) - .json(&block_by_hash_request) - .send() - .await? - .json() - .await?; - - if let Some(result) = response.result { - Ok(Ok(result)) - } else if let Some(result) = response.error { - Ok(Err(result)) - } else { - unimplemented!("{:?}", response) - } -} diff --git a/web3_proxy/src/bin/web3_proxy_cli/main.rs b/web3_proxy/src/bin/web3_proxy_cli/main.rs index c60b9446..e1241908 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/main.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/main.rs @@ -6,14 +6,15 @@ mod check_config; mod count_users; mod create_user; mod drop_migration_lock; -mod health_compass; mod list_user_tier; mod rpc_accounting; +mod sentryd; mod transfer_key; mod user_export; mod user_import; use argh::FromArgs; +use log::warn; use std::fs; use web3_proxy::{ app::{get_db, get_migrated_db}, @@ -27,13 +28,17 @@ pub struct CliConfig { #[argh(option)] pub config: Option, - /// if no config, what database the client should connect to. Defaults to dev db. + /// if no config, what database the client should connect to. Defaults to dev db #[argh( option, default = "\"mysql://root:dev_web3_proxy@127.0.0.1:13306/dev_web3_proxy\".to_string()" )] pub db_url: String, + /// if no config, what sentry url should the client should connect to + #[argh(option)] + pub sentry_url: Option, + /// this one cli can do multiple things #[argh(subcommand)] sub_command: SubCommand, @@ -50,8 +55,8 @@ enum SubCommand { CountUsers(count_users::CountUsersSubCommand), CreateUser(create_user::CreateUserSubCommand), DropMigrationLock(drop_migration_lock::DropMigrationLockSubCommand), - HealthCompass(health_compass::HealthCompassSubCommand), RpcAccounting(rpc_accounting::RpcAccountingSubCommand), + Sentryd(sentryd::SentrydSubCommand), TransferKey(transfer_key::TransferKeySubCommand), UserExport(user_export::UserExportSubCommand), UserImport(user_import::UserImportSubCommand), @@ -64,12 +69,10 @@ enum SubCommand { async fn main() -> anyhow::Result<()> { // if RUST_LOG isn't set, configure a default // TODO: is there a better way to do this? - if std::env::var("RUST_LOG").is_err() { - // std::env::set_var("RUST_LOG", "info,web3_proxy=debug,web3_proxy_cli=debug"); - std::env::set_var("RUST_LOG", "info,web3_proxy=debug,web3_proxy_cli=debug"); - } - - env_logger::init(); + let rust_log = match std::env::var("RUST_LOG") { + Ok(x) => x, + Err(_) => "info,web3_proxy=debug,web3_proxy_cli=debug".to_string(), + }; // this probably won't matter for us in docker, but better safe than sorry fdlimit::raise_fd_limit(); @@ -80,8 +83,12 @@ async fn main() -> anyhow::Result<()> { let top_config: String = fs::read_to_string(top_config_path)?; let top_config: TopConfig = toml::from_str(&top_config)?; - if let Some(top_config_db_url) = top_config.app.db_url.clone() { - cli_config.db_url = top_config_db_url; + if let Some(db_url) = top_config.app.db_url.clone() { + cli_config.db_url = db_url; + } + + if let Some(sentry_url) = top_config.app.sentry_url.clone() { + cli_config.sentry_url = Some(sentry_url); } Some(top_config) @@ -89,6 +96,36 @@ async fn main() -> anyhow::Result<()> { None }; + let logger = env_logger::builder().parse_filters(&rust_log).build(); + + let max_level = logger.filter(); + + // connect to sentry for error reporting + // if no sentry, only log to stdout + let _sentry_guard = if let Some(sentry_url) = cli_config.sentry_url.clone() { + let logger = sentry::integrations::log::SentryLogger::with_dest(logger); + + log::set_boxed_logger(Box::new(logger)).unwrap(); + + let guard = sentry::init(( + sentry_url, + sentry::ClientOptions { + release: sentry::release_name!(), + // TODO: Set this a to lower value (from config) in production + traces_sample_rate: 1.0, + ..Default::default() + }, + )); + + Some(guard) + } else { + log::set_boxed_logger(Box::new(logger)).unwrap(); + + None + }; + + log::set_max_level(max_level); + match cli_config.sub_command { SubCommand::ChangeUserAddress(x) => { let db_conn = get_db(cli_config.db_url, 1, 1).await?; @@ -127,7 +164,13 @@ async fn main() -> anyhow::Result<()> { x.main(&db_conn).await } - SubCommand::HealthCompass(x) => x.main().await, + SubCommand::Sentryd(x) => { + if cli_config.sentry_url.is_none() { + warn!("sentry_url is not set! Logs will only show in this console"); + } + + x.main().await + } SubCommand::RpcAccounting(x) => { let db_conn = get_migrated_db(cli_config.db_url, 1, 1).await?; diff --git a/web3_proxy/src/bin/web3_proxy_cli/sentryd/compare.rs b/web3_proxy/src/bin/web3_proxy_cli/sentryd/compare.rs new file mode 100644 index 00000000..50fcea7a --- /dev/null +++ b/web3_proxy/src/bin/web3_proxy_cli/sentryd/compare.rs @@ -0,0 +1,183 @@ +use anyhow::{anyhow, Context}; +use chrono::{DateTime, Utc}; +use ethers::types::{Block, TxHash, H256, U64}; +use futures::{stream::FuturesUnordered, StreamExt}; +use log::{debug, warn}; +use serde::{Deserialize, Serialize}; +use serde_json::json; +use web3_proxy::jsonrpc::JsonRpcErrorData; + +#[derive(Debug, Deserialize, Serialize)] +struct JsonRpcResponse { + // pub jsonrpc: String, + // pub id: Box, + #[serde(skip_serializing_if = "Option::is_none")] + pub result: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub error: Option, +} + +#[derive(Serialize, Ord, PartialEq, PartialOrd, Eq)] +struct AbbreviatedBlock { + pub num: u64, + pub time: DateTime, + pub hash: H256, +} + +impl From> for AbbreviatedBlock { + fn from(x: Block) -> Self { + Self { + num: x.number.unwrap().as_u64(), + hash: x.hash.unwrap(), + time: x.time().unwrap(), + } + } +} + +pub async fn main(rpc: String, others: Vec, max_lag: i64) -> anyhow::Result<()> { + let client = reqwest::Client::new(); + + let block_by_number_request = json!({ + "jsonrpc": "2.0", + "id": "1", + "method": "eth_getBlockByNumber", + "params": ["latest", false], + }); + + let a = client + .post(&rpc) + .json(&block_by_number_request) + .send() + .await? + .json::>>() + .await? + .result + .unwrap(); + + // check the parent because b and c might not be as fast as a + let parent_hash = a.parent_hash; + + let rpc_block = check_rpc(parent_hash, client.clone(), rpc.clone()) + .await + .context("Error while querying primary rpc")?; + + let fs = FuturesUnordered::new(); + for other in others.iter() { + let f = check_rpc(parent_hash, client.clone(), other.clone()); + + fs.push(tokio::spawn(f)); + } + let other_check: Vec<_> = fs.collect().await; + + if other_check.is_empty() { + return Err(anyhow::anyhow!("No other RPCs to check!")); + } + + // TODO: collect into a counter instead? + let mut newest_other = None; + for oc in other_check.iter() { + match oc { + Ok(Ok(x)) => newest_other = newest_other.max(Some(x)), + Ok(Err(err)) => warn!("failed checking other rpc: {:?}", err), + Err(err) => warn!("internal error checking other rpc: {:?}", err), + } + } + + if let Some(newest_other) = newest_other { + let duration_since = newest_other + .time + .signed_duration_since(rpc_block.time) + .num_seconds(); + + match duration_since.abs().cmp(&max_lag) { + std::cmp::Ordering::Less | std::cmp::Ordering::Equal => {} + std::cmp::Ordering::Greater => match duration_since.cmp(&0) { + std::cmp::Ordering::Equal => unimplemented!(), + std::cmp::Ordering::Less => { + return Err(anyhow::anyhow!( + "Our RPC is too far ahead ({} s)! Something might be wrong.\n{:#}\nvs\n{:#}", + duration_since.abs(), + json!(rpc_block), + json!(newest_other), + )); + } + std::cmp::Ordering::Greater => { + return Err(anyhow::anyhow!( + "Our RPC is too far behind ({} s)!\n{:#}\nvs\n{:#}", + duration_since, + json!(rpc_block), + json!(newest_other), + )); + } + }, + } + + let now = Utc::now(); + + let block_age = now + .signed_duration_since(newest_other.max(&rpc_block).time) + .num_seconds(); + + match block_age.abs().cmp(&max_lag) { + std::cmp::Ordering::Less | std::cmp::Ordering::Equal => {} + std::cmp::Ordering::Greater => match duration_since.cmp(&0) { + std::cmp::Ordering::Equal => unimplemented!(), + std::cmp::Ordering::Less => { + return Err(anyhow::anyhow!( + "Our clock is too far behind ({} s)! Something might be wrong.\n{:#}\nvs\n{:#}", + block_age.abs(), + json!(now), + json!(newest_other), + )); + } + std::cmp::Ordering::Greater => { + return Err(anyhow::anyhow!( + "block is too old ({} s)!\n{:#}\nvs\n{:#}", + block_age, + json!(now), + json!(newest_other), + )); + } + }, + } + } else { + return Err(anyhow::anyhow!("No other RPC times to check!")); + } + + debug!("rpc comparison ok: {:#}", json!(rpc_block)); + + Ok(()) +} + +// i don't think we need a whole provider. a simple http request is easiest +async fn check_rpc( + block_hash: H256, + client: reqwest::Client, + rpc: String, +) -> anyhow::Result { + let block_by_hash_request = json!({ + "jsonrpc": "2.0", + "id": "1", + "method": "eth_getBlockByHash", + "params": [block_hash, false], + }); + + // TODO: don't unwrap! don't use the try operator + let response: JsonRpcResponse> = client + .post(rpc) + .json(&block_by_hash_request) + .send() + .await? + .json() + .await?; + + if let Some(result) = response.result { + let abbreviated = AbbreviatedBlock::from(result); + + Ok(abbreviated) + } else if let Some(result) = response.error { + Err(anyhow!("Failed parsing response as JSON: {:?}", result)) + } else { + unimplemented!("{:?}", response) + } +} diff --git a/web3_proxy/src/bin/web3_proxy_cli/sentryd/mod.rs b/web3_proxy/src/bin/web3_proxy_cli/sentryd/mod.rs new file mode 100644 index 00000000..22b6aab1 --- /dev/null +++ b/web3_proxy/src/bin/web3_proxy_cli/sentryd/mod.rs @@ -0,0 +1,112 @@ +mod compare; +mod simple; + +use argh::FromArgs; +use futures::{ + stream::{FuturesUnordered, StreamExt}, + Future, +}; +use std::time::Duration; +use tokio::time::{interval, MissedTickBehavior}; + +#[derive(FromArgs, PartialEq, Debug, Eq)] +/// Loop healthchecks and send pager duty alerts if any fail +#[argh(subcommand, name = "sentryd")] +pub struct SentrydSubCommand { + #[argh(positional)] + /// a descriptive name for this node (probably the hostname) + location: String, + + #[argh(positional)] + /// the main (HTTP only) web3-proxy being checked. + web3_proxy: String, + + #[argh(option)] + /// warning threshold for seconds between the rpc and best other_rpc's head blocks + max_lag: i64, + + #[argh(option)] + /// other (HTTP only) rpcs to compare the main rpc to + other_rpc: Vec, + + #[argh(option)] + /// other (HTTP only) web3-proxies to compare the main rpc to + other_proxy: Vec, + + #[argh(option)] + /// how many seconds between running checks + seconds: Option, +} + +impl SentrydSubCommand { + pub async fn main(self) -> anyhow::Result<()> { + // sentry logging should already be configured + + let seconds = self.seconds.unwrap_or(60); + + let mut handles = FuturesUnordered::new(); + + // spawn a bunch of health check loops that do their checks on an interval + + // check the main rpc's /health endpoint + { + let url = format!("{}/health", self.web3_proxy); + + let loop_f = a_loop(seconds, log::Level::Error, move || { + simple::main(url.clone()) + }); + + handles.push(tokio::spawn(loop_f)); + } + // check any other web3-proxy /health endpoints + for other_web3_proxy in self.other_proxy.iter() { + let url = format!("{}/health", other_web3_proxy); + + let loop_f = a_loop(seconds, log::Level::Warn, move || simple::main(url.clone())); + + handles.push(tokio::spawn(loop_f)); + } + + // compare the main web3-proxy head block to all web3-proxies and rpcs + { + let max_lag = self.max_lag; + let rpc = self.web3_proxy.clone(); + + let mut others = self.other_proxy.clone(); + + others.extend(self.other_rpc.clone()); + + let loop_f = a_loop(seconds, log::Level::Error, move || { + compare::main(rpc.clone(), others.clone(), max_lag) + }); + + handles.push(tokio::spawn(loop_f)); + } + + // wait for any returned values (if everything is working, they will all run forever) + while let Some(x) = handles.next().await { + // any errors that make it here will end the program + x??; + } + + Ok(()) + } +} + +async fn a_loop(seconds: u64, error_level: log::Level, f: impl Fn() -> T) -> anyhow::Result<()> +where + T: Future> + Send + 'static, +{ + let mut interval = interval(Duration::from_secs(seconds)); + + // TODO: should we warn if there are delays? + interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + + loop { + interval.tick().await; + + if let Err(err) = f().await { + log::log!(error_level, "check failed: {:?}", err); + }; + } +} diff --git a/web3_proxy/src/bin/web3_proxy_cli/sentryd/simple.rs b/web3_proxy/src/bin/web3_proxy_cli/sentryd/simple.rs new file mode 100644 index 00000000..1904553d --- /dev/null +++ b/web3_proxy/src/bin/web3_proxy_cli/sentryd/simple.rs @@ -0,0 +1,22 @@ +use anyhow::Context; +use log::{debug, trace}; + +/// GET the url and return an error if it wasn't a success +pub async fn main(url: String) -> anyhow::Result<()> { + let r = reqwest::get(&url) + .await + .context(format!("Failed GET {}", url))?; + + if r.status().is_success() { + // warn if latency is high? + debug!("{} is healthy", url); + trace!("Successful {:#?}", r); + return Ok(()); + } + + let debug_str = format!("{:#?}", r); + + let body = r.text().await?; + + Err(anyhow::anyhow!("{}: {}", debug_str, body)) +} diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs index 1507a835..f4005963 100644 --- a/web3_proxy/src/frontend/mod.rs +++ b/web3_proxy/src/frontend/mod.rs @@ -40,6 +40,8 @@ pub async fn serve(port: u16, proxy_app: Arc) -> anyhow::Result<() .time_to_live(Duration::from_secs(1)) .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); + // TODO: read config for if fastest/versus should be available publicly. default off + // build our axum Router let app = Router::new() // TODO: i think these routes could be done a lot better