From d5f2d6eb18961c960d11460fd0539b0c8efa7f21 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 25 Jan 2023 14:04:06 -0800 Subject: [PATCH] improve pager duty errors for smarter deduping --- web3_proxy/src/bin/web3_proxy_cli/main.rs | 61 +-------- .../src/bin/web3_proxy_cli/pagerduty.rs | 13 +- .../src/bin/web3_proxy_cli/sentryd/compare.rs | 94 ++++++++----- .../src/bin/web3_proxy_cli/sentryd/mod.rs | 78 ++++++++--- .../src/bin/web3_proxy_cli/sentryd/simple.rs | 52 ++++++- web3_proxy/src/config.rs | 6 +- web3_proxy/src/pagerduty.rs | 129 ++++++++++++++++-- 7 files changed, 293 insertions(+), 140 deletions(-) diff --git a/web3_proxy/src/bin/web3_proxy_cli/main.rs b/web3_proxy/src/bin/web3_proxy_cli/main.rs index 70cbf73c..852334f9 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/main.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/main.rs @@ -18,19 +18,16 @@ mod user_import; use anyhow::Context; use argh::FromArgs; use ethers::types::U256; -use gethostname::gethostname; -use log::{error, info, warn}; +use log::{info, warn}; +use pagerduty_rs::eventsv2async::EventsV2 as PagerdutyAsyncEventsV2; use pagerduty_rs::eventsv2sync::EventsV2 as PagerdutySyncEventsV2; -use pagerduty_rs::types::{AlertTrigger, AlertTriggerPayload}; -use pagerduty_rs::{eventsv2async::EventsV2 as PagerdutyAsyncEventsV2, types::Event}; -use std::collections::hash_map::DefaultHasher; -use std::hash::{Hash, Hasher}; use std::{ fs, panic, path::Path, sync::atomic::{self, AtomicUsize}, }; use tokio::runtime; +use web3_proxy::pagerduty::panic_handler; use web3_proxy::{ app::{get_db, get_migrated_db, APP_USER_AGENT}, config::TopConfig, @@ -237,57 +234,13 @@ fn main() -> anyhow::Result<()> { (None, None) }; - // panic handler that sends to pagerduty - // TODO: there is a `pagerduty_panic` module that looks like it would work with minor tweaks, but ethers-rs panics when a websocket exit and that would fire too many alerts - + // panic handler that sends to pagerduty. + // TODO: use the sentry handler if no pager duty. use default if no sentry if let Some(pagerduty_sync) = pagerduty_sync { - let client = top_config - .as_ref() - .map(|top_config| format!("web3-proxy chain #{}", top_config.app.chain_id)) - .unwrap_or_else(|| format!("web3-proxy w/o chain")); - - let client_url = top_config - .as_ref() - .and_then(|x| x.app.redirect_public_url.clone()); + let top_config = top_config.clone(); panic::set_hook(Box::new(move |x| { - let hostname = gethostname().into_string().unwrap_or("unknown".to_string()); - let panic_msg = format!("{} {:?}", x, x); - - if panic_msg.starts_with("panicked at 'WS Server panic") { - info!("Underlying library {}", panic_msg); - } else { - error!("sending panic to pagerduty: {}", panic_msg); - - let mut s = DefaultHasher::new(); - panic_msg.hash(&mut s); - panic_msg.hash(&mut s); - let dedup_key = s.finish().to_string(); - - let payload = AlertTriggerPayload { - severity: pagerduty_rs::types::Severity::Error, - summary: panic_msg, - source: hostname, - timestamp: None, - component: None, - group: Some("web3-proxy".to_string()), - class: Some("panic".to_string()), - custom_details: None::<()>, - }; - - let event = Event::AlertTrigger(AlertTrigger { - payload, - dedup_key: Some(dedup_key), - images: None, - links: None, - client: Some(client.clone()), - client_url: client_url.clone(), - }); - - if let Err(err) = pagerduty_sync.event(event) { - error!("Failed sending panic to pagerduty: {}", err); - } - } + panic_handler(top_config.clone(), &pagerduty_sync, x); })); } diff --git a/web3_proxy/src/bin/web3_proxy_cli/pagerduty.rs b/web3_proxy/src/bin/web3_proxy_cli/pagerduty.rs index 3c43daa3..7e55103a 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/pagerduty.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/pagerduty.rs @@ -3,7 +3,7 @@ use log::{error, info}; use pagerduty_rs::{eventsv2async::EventsV2 as PagerdutyAsyncEventsV2, types::Event}; use web3_proxy::{ config::TopConfig, - pagerduty::{pagerduty_alert, pagerduty_event_for_config}, + pagerduty::{pagerduty_alert, pagerduty_alert_for_config}, }; #[derive(FromArgs, PartialEq, Debug, Eq)] @@ -31,11 +31,6 @@ pub struct PagerdutySubCommand { /// If there are no open incidents with this key, a new incident will be created. /// If there is an open incident with a matching key, the new event will be appended to that incident's Alerts log as an additional Trigger log entry. dedup_key: Option, - - #[argh(option, default = "\"web3-proxy\".to_string()")] - /// a cluster or grouping of sources. - /// For example, sources "ethereum-proxy" and "polygon-proxy" might both be part of "web3-proxy". - group: String, } impl PagerdutySubCommand { @@ -47,11 +42,10 @@ impl PagerdutySubCommand { // TODO: allow customizing severity let event = top_config .map(|top_config| { - pagerduty_event_for_config( + pagerduty_alert_for_config( self.class.clone(), self.component.clone(), None::<()>, - Some(self.group.clone()), pagerduty_rs::types::Severity::Error, self.summary.clone(), None, @@ -62,11 +56,10 @@ impl PagerdutySubCommand { pagerduty_alert( self.chain_id, self.class, - "web3-proxy".to_string(), + None, None, self.component, None::<()>, - Some(self.group), pagerduty_rs::types::Severity::Error, None, self.summary, diff --git a/web3_proxy/src/bin/web3_proxy_cli/sentryd/compare.rs b/web3_proxy/src/bin/web3_proxy_cli/sentryd/compare.rs index 1421afd3..5333c738 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/sentryd/compare.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/sentryd/compare.rs @@ -7,6 +7,8 @@ use serde::{Deserialize, Serialize}; use serde_json::json; use web3_proxy::jsonrpc::JsonRpcErrorData; +use super::{SentrydErrorBuilder, SentrydResult}; + #[derive(Debug, Deserialize, Serialize)] struct JsonRpcResponse { // pub jsonrpc: String, @@ -35,11 +37,12 @@ impl From> for AbbreviatedBlock { } pub async fn main( + error_builder: SentrydErrorBuilder, rpc: String, others: Vec, max_age: i64, max_lag: i64, -) -> anyhow::Result<()> { +) -> SentrydResult { let client = reqwest::Client::new(); let block_by_number_request = json!({ @@ -54,30 +57,48 @@ pub async fn main( .json(&block_by_number_request) .send() .await - .context(format!("error fetching block from {}", rpc))? + .context(format!("error querying block from {}", rpc)) + .map_err(|x| error_builder.build(x))?; + + // TODO: capture response headers now in case of error. store them in the extra data on the pager duty alert + let headers = format!("{:#?}", a.headers()); + + let a = a .json::>>() .await - .context(format!("error parsing block from {}", rpc))? - .result - .context(format!("no block from {}", rpc))?; + .context(format!("error parsing block from {}", rpc)) + .map_err(|x| error_builder.build(x))?; + + let a = if let Some(block) = a.result { + block + } else if let Some(err) = a.error { + return error_builder.result( + anyhow::anyhow!("headers: {:#?}. err: {:#?}", headers, err) + .context(format!("jsonrpc error from {}", rpc)), + ); + } else { + return error_builder + .result(anyhow!("{:#?}", a).context(format!("empty response from {}", rpc))); + }; // check the parent because b and c might not be as fast as a let parent_hash = a.parent_hash; - let rpc_block = check_rpc(parent_hash, client.clone(), rpc.clone()) + let rpc_block = check_rpc(parent_hash, client.clone(), rpc.to_string()) .await - .context("Error while querying primary rpc")?; + .context(format!("Error while querying primary rpc: {}", rpc)) + .map_err(|err| error_builder.build(err))?; let fs = FuturesUnordered::new(); for other in others.iter() { - let f = check_rpc(parent_hash, client.clone(), other.clone()); + let f = check_rpc(parent_hash, client.clone(), other.to_string()); fs.push(tokio::spawn(f)); } let other_check: Vec<_> = fs.collect().await; if other_check.is_empty() { - return Err(anyhow::anyhow!("No other RPCs to check!")); + return error_builder.result(anyhow::anyhow!("No other RPCs to check!")); } // TODO: collect into a counter instead? @@ -99,22 +120,27 @@ pub async fn main( match duration_since.abs().cmp(&max_lag) { std::cmp::Ordering::Less | std::cmp::Ordering::Equal => {} std::cmp::Ordering::Greater => match duration_since.cmp(&0) { - std::cmp::Ordering::Equal => unimplemented!(), + std::cmp::Ordering::Equal => { + unimplemented!("we already checked that they are not equal") + } std::cmp::Ordering::Less => { - return Err(anyhow::anyhow!( + return error_builder.result(anyhow::anyhow!( "Our RPC is too far ahead ({} s)! Something might be wrong.\n{:#}\nvs\n{:#}", duration_since.abs(), json!(rpc_block), json!(newest_other), - )); + ).context(format!("{} is too far ahead", rpc))); } std::cmp::Ordering::Greater => { - return Err(anyhow::anyhow!( - "Our RPC is too far behind ({} s)!\n{:#}\nvs\n{:#}", - duration_since, - json!(rpc_block), - json!(newest_other), - )); + return error_builder.result( + anyhow::anyhow!( + "Behind {} s!\n{:#}\nvs\n{:#}", + duration_since, + json!(rpc_block), + json!(newest_other), + ) + .context(format!("{} is too far behind", rpc)), + ); } }, } @@ -130,25 +156,31 @@ pub async fn main( std::cmp::Ordering::Greater => match duration_since.cmp(&0) { std::cmp::Ordering::Equal => unimplemented!(), std::cmp::Ordering::Less => { - return Err(anyhow::anyhow!( - "Our clock is too far behind ({} s)! Something might be wrong.\n{:#}\nvs\n{:#}", - block_age.abs(), - json!(now), - json!(newest_other), - )); + return error_builder.result( + anyhow::anyhow!( + "Clock is behind {}s! Something might be wrong.\n{:#}\nvs\n{:#}", + block_age.abs(), + json!(now), + json!(newest_other), + ) + .context(format!("Clock is too far behind on {}!", rpc)), + ); } std::cmp::Ordering::Greater => { - return Err(anyhow::anyhow!( - "block is too old ({} s)!\n{:#}\nvs\n{:#}", - block_age, - json!(now), - json!(newest_other), - )); + return error_builder.result( + anyhow::anyhow!( + "block is too old ({}s)!\n{:#}\nvs\n{:#}", + block_age, + json!(now), + json!(newest_other), + ) + .context(format!("block is too old on {}!", rpc)), + ); } }, } } else { - return Err(anyhow::anyhow!("No other RPC times to check!")); + return error_builder.result(anyhow::anyhow!("No other RPC times to check!")); } debug!("rpc comparison ok: {:#}", json!(rpc_block)); diff --git a/web3_proxy/src/bin/web3_proxy_cli/sentryd/mod.rs b/web3_proxy/src/bin/web3_proxy_cli/sentryd/mod.rs index b9a77937..3f32e68b 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/sentryd/mod.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/sentryd/mod.rs @@ -9,6 +9,7 @@ use futures::{ }; use log::{error, info}; use pagerduty_rs::{eventsv2async::EventsV2 as PagerdutyAsyncEventsV2, types::Event}; +use serde_json::json; use std::time::Duration; use tokio::sync::mpsc; use tokio::time::{interval, MissedTickBehavior}; @@ -48,12 +49,41 @@ pub struct SentrydSubCommand { } #[derive(Debug)] -struct Error { +pub struct SentrydError { + /// The class/type of the event, for example ping failure or cpu load + class: String, + /// Errors will send a pagerduty alert. others just give log messages + level: log::Level, + /// A short summary that should be mostly static + summary: String, + /// Lots of detail about the error + extra: Option, +} + +/// helper for creating SentrydErrors +#[derive(Clone)] +pub struct SentrydErrorBuilder { class: String, level: log::Level, - anyhow: anyhow::Error, } +impl SentrydErrorBuilder { + fn build(&self, err: anyhow::Error) -> SentrydError { + SentrydError { + class: self.class.to_owned(), + level: self.level.to_owned(), + summary: format!("{}", err), + extra: Some(json!(format!("{:#?}", err))), + } + } + + fn result(&self, err: anyhow::Error) -> SentrydResult { + Err(self.build(err)) + } +} + +type SentrydResult = Result<(), SentrydError>; + impl SentrydSubCommand { pub async fn main( self, @@ -86,7 +116,7 @@ impl SentrydSubCommand { let mut handles = FuturesUnordered::new(); // channels and a task for sending errors to logs/pagerduty - let (error_sender, mut error_receiver) = mpsc::channel::(10); + let (error_sender, mut error_receiver) = mpsc::channel::(10); { let error_handler_f = async move { @@ -101,14 +131,13 @@ impl SentrydSubCommand { let alert = pagerduty_alert( Some(chain_id), Some(err.class), - "web3-proxy-sentry".to_string(), - None, - None, - None::<()>, Some("web3-proxy-sentry".to_string()), + None, + None, + err.extra, pagerduty_rs::types::Severity::Error, None, - format!("{:#?}", err.anyhow), + err.summary, None, ); @@ -140,12 +169,15 @@ impl SentrydSubCommand { let url = format!("{}/health", web3_proxy); let error_sender = error_sender.clone(); + // TODO: what timeout? + let timeout = Duration::from_secs(1); + let loop_f = a_loop( "main /health", seconds, log::Level::Error, error_sender, - move || simple::main(url.clone()), + move |error_builder| simple::main(error_builder, url.clone(), timeout), ); handles.push(tokio::spawn(loop_f)); @@ -162,12 +194,15 @@ impl SentrydSubCommand { let error_sender = error_sender.clone(); + // TODO: what timeout? + let timeout = Duration::from_secs(1); + let loop_f = a_loop( "other /health", seconds, log::Level::Warn, error_sender, - move || simple::main(url.clone()), + move |error_builder| simple::main(error_builder, url.clone(), timeout), ); handles.push(tokio::spawn(loop_f)); @@ -189,7 +224,9 @@ impl SentrydSubCommand { seconds, log::Level::Error, error_sender, - move || compare::main(rpc.clone(), others.clone(), max_age, max_lag), + move |error_builder| { + compare::main(error_builder, rpc.clone(), others.clone(), max_age, max_lag) + }, ); handles.push(tokio::spawn(loop_f)); @@ -209,12 +246,17 @@ async fn a_loop( class: &str, seconds: u64, error_level: log::Level, - error_sender: mpsc::Sender, - f: impl Fn() -> T, + error_sender: mpsc::Sender, + f: impl Fn(SentrydErrorBuilder) -> T, ) -> anyhow::Result<()> where - T: Future> + Send + 'static, + T: Future + Send + 'static, { + let error_builder = SentrydErrorBuilder { + class: class.to_owned(), + level: error_level, + }; + let mut interval = interval(Duration::from_secs(seconds)); // TODO: should we warn if there are delays? @@ -223,13 +265,7 @@ where loop { interval.tick().await; - if let Err(err) = f().await { - let err = Error { - class: class.to_string(), - level: error_level, - anyhow: err, - }; - + if let Err(err) = f(error_builder.clone()).await { error_sender.send(err).await?; }; } diff --git a/web3_proxy/src/bin/web3_proxy_cli/sentryd/simple.rs b/web3_proxy/src/bin/web3_proxy_cli/sentryd/simple.rs index 1904553d..54dffde4 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/sentryd/simple.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/sentryd/simple.rs @@ -1,22 +1,60 @@ +use std::time::Duration; + +use super::{SentrydErrorBuilder, SentrydResult}; use anyhow::Context; use log::{debug, trace}; +use tokio::time::Instant; /// GET the url and return an error if it wasn't a success -pub async fn main(url: String) -> anyhow::Result<()> { +pub async fn main( + error_builder: SentrydErrorBuilder, + url: String, + timeout: Duration, +) -> SentrydResult { + let start = Instant::now(); + let r = reqwest::get(&url) .await - .context(format!("Failed GET {}", url))?; + .context(format!("Failed GET {}", &url)) + .map_err(|x| error_builder.build(x))?; + + let elapsed = start.elapsed(); + + if elapsed > timeout { + return error_builder.result( + anyhow::anyhow!( + "query took longer than {}ms ({}ms): {:#?}", + timeout.as_millis(), + elapsed.as_millis(), + r + ) + .context(format!("fetching {} took too long", &url)), + ); + } + + // TODO: what should we do if we get rate limited here? if r.status().is_success() { - // warn if latency is high? - debug!("{} is healthy", url); + debug!("{} is healthy", &url); trace!("Successful {:#?}", r); return Ok(()); } - let debug_str = format!("{:#?}", r); + // TODO: capture headers? or is that already part of r? + let detail = format!("{:#?}", r); - let body = r.text().await?; + let summary = format!("{} is unhealthy: {}", &url, r.status()); - Err(anyhow::anyhow!("{}: {}", debug_str, body)) + let body = r + .text() + .await + .context(detail.clone()) + .context(summary.clone()) + .map_err(|x| error_builder.build(x))?; + + error_builder.result( + anyhow::anyhow!("body: {}", body) + .context(detail) + .context(summary), + ) } diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index 67f48870..9e40db5a 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -39,7 +39,7 @@ pub struct CliConfig { pub cookie_key_filename: String, } -#[derive(Debug, Deserialize)] +#[derive(Clone, Debug, Deserialize)] pub struct TopConfig { pub app: AppConfig, pub balanced_rpcs: HashMap, @@ -52,7 +52,7 @@ pub struct TopConfig { /// shared configuration between Web3Connections // TODO: no String, only &str -#[derive(Debug, Default, Deserialize)] +#[derive(Clone, Debug, Default, Deserialize)] pub struct AppConfig { /// Request limit for allowed origins for anonymous users. /// These requests get rate limited by IP. @@ -190,7 +190,7 @@ fn default_response_cache_max_bytes() -> usize { } /// Configuration for a backend web3 RPC server -#[derive(Debug, Deserialize)] +#[derive(Clone, Debug, Deserialize)] pub struct Web3ConnectionConfig { /// simple way to disable a connection without deleting the row #[serde(default)] diff --git a/web3_proxy/src/pagerduty.rs b/web3_proxy/src/pagerduty.rs index 75675ec8..9eacbff9 100644 --- a/web3_proxy/src/pagerduty.rs +++ b/web3_proxy/src/pagerduty.rs @@ -1,19 +1,122 @@ +use crate::config::TopConfig; +use gethostname::gethostname; +use log::{debug, error}; +use pagerduty_rs::eventsv2sync::EventsV2 as PagerdutySyncEventsV2; +use pagerduty_rs::types::{AlertTrigger, AlertTriggerPayload, Event}; +use serde::Serialize; use std::{ collections::hash_map::DefaultHasher, hash::{Hash, Hasher}, + panic::PanicInfo, }; - -use crate::config::TopConfig; -use gethostname::gethostname; -use pagerduty_rs::types::{AlertTrigger, AlertTriggerPayload}; -use serde::Serialize; use time::OffsetDateTime; -pub fn pagerduty_event_for_config( +/* + + let client = top_config + .as_ref() + .map(|top_config| format!("web3-proxy chain #{}", top_config.app.chain_id)) + .unwrap_or_else(|| format!("web3-proxy w/o chain")); + + let client_url = top_config + .as_ref() + .and_then(|x| x.app.redirect_public_url.clone()); + + panic::set_hook(Box::new(move |x| { + let hostname = gethostname().into_string().unwrap_or("unknown".to_string()); + let panic_msg = format!("{} {:?}", x, x); + + if panic_msg.starts_with("panicked at 'WS Server panic") { + info!("Underlying library {}", panic_msg); + } else { + error!("sending panic to pagerduty: {}", panic_msg); + + let mut s = DefaultHasher::new(); + panic_msg.hash(&mut s); + panic_msg.hash(&mut s); + let dedup_key = s.finish().to_string(); + + let payload = AlertTriggerPayload { + severity: pagerduty_rs::types::Severity::Error, + summary: panic_msg, + source: hostname, + timestamp: None, + component: None, + group: Some("web3-proxy".to_string()), + class: Some("panic".to_string()), + custom_details: None::<()>, + }; + + let event = Event::AlertTrigger(AlertTrigger { + payload, + dedup_key: Some(dedup_key), + images: None, + links: None, + client: Some(client.clone()), + client_url: client_url.clone(), + }); + + if let Err(err) = pagerduty_sync.event(event) { + error!("Failed sending panic to pagerduty: {}", err); + } + } + })); + +*/ + +pub fn panic_handler( + top_config: Option, + pagerduty_sync: &PagerdutySyncEventsV2, + panic_info: &PanicInfo, +) { + let summary = format!("{}", panic_info); + + let details = format!("{:#?}", panic_info); + + if summary.starts_with("panicked at 'WS Server panic") { + // the ethers-rs library panics when websockets disconnect. this isn't a panic we care about reporting + debug!("Underlying library {}", details); + return; + } + + let class = Some("panic".to_string()); + + let alert = if let Some(top_config) = top_config { + pagerduty_alert_for_config( + class, + None, + Some(details), + pagerduty_rs::types::Severity::Critical, + summary, + None, + top_config, + ) + } else { + pagerduty_alert( + None, + class, + None, + None, + None, + Some(details), + pagerduty_rs::types::Severity::Critical, + None, + summary, + None, + ) + }; + + let event = Event::AlertTrigger(alert); + + if let Err(err) = pagerduty_sync.event(event) { + error!("Failed sending alert to pagerduty! {:#?}", err); + } +} + +pub fn pagerduty_alert_for_config( class: Option, component: Option, custom_details: Option, - group: Option, severity: pagerduty_rs::types::Severity, summary: String, timestamp: Option, @@ -26,11 +129,10 @@ pub fn pagerduty_event_for_config( pagerduty_alert( Some(chain_id), class, - "web3-proxy".to_string(), + None, client_url, component, custom_details, - group, severity, None, summary, @@ -41,19 +143,18 @@ pub fn pagerduty_event_for_config( pub fn pagerduty_alert( chain_id: Option, class: Option, - client: String, + client: Option, client_url: Option, component: Option, custom_details: Option, - group: Option, severity: pagerduty_rs::types::Severity, source: Option, summary: String, timestamp: Option, ) -> AlertTrigger { - let client = chain_id - .map(|x| format!("{} chain #{}", client, x)) - .unwrap_or_else(|| format!("{} w/o chain", client)); + let client = client.unwrap_or_else(|| "web3-proxy".to_string()); + + let group = chain_id.map(|x| format!("chain #{}", x)); let source = source.unwrap_or_else(|| gethostname().into_string().unwrap_or("unknown".to_string()));