From 36d64489d8200e539da6ac74aa7f13f5b873f278 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Tue, 24 Jan 2023 03:12:23 -0800 Subject: [PATCH] sentryd to pagerduty --- TODO.md | 1 + web3_proxy/src/bin/web3_proxy_cli/main.rs | 2 +- .../src/bin/web3_proxy_cli/pagerduty.rs | 15 +-- .../src/bin/web3_proxy_cli/sentryd/mod.rs | 92 +++++++++++++++---- 4 files changed, 86 insertions(+), 24 deletions(-) diff --git a/TODO.md b/TODO.md index 10393a3e..15ba2239 100644 --- a/TODO.md +++ b/TODO.md @@ -318,6 +318,7 @@ These are not yet ordered. There might be duplicates. We might not actually need - [x] use channels instead of arcswap - this will let us easily wait for a new head or a new synced connection - [x] broadcast transactions to more servers +- [x] send sentryd errors to pagerduty - [-] proxy mode for benchmarking all backends - [-] proxy mode for sending to multiple backends - [-] let users choose a % of reverts to log (or maybe x/second). someone like curve logging all reverts will be a BIG database very quickly diff --git a/web3_proxy/src/bin/web3_proxy_cli/main.rs b/web3_proxy/src/bin/web3_proxy_cli/main.rs index 7a3a3ee9..a0aebbaa 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/main.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/main.rs @@ -368,7 +368,7 @@ fn main() -> anyhow::Result<()> { warn!("sentry_url is not set! Logs will only show in this console"); } - x.main().await + x.main(pagerduty_async).await } SubCommand::RpcAccounting(x) => { let db_url = cli_config diff --git a/web3_proxy/src/bin/web3_proxy_cli/pagerduty.rs b/web3_proxy/src/bin/web3_proxy_cli/pagerduty.rs index 34cd5586..4fadf11a 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/pagerduty.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/pagerduty.rs @@ -3,7 +3,7 @@ use log::{error, info}; use pagerduty_rs::{eventsv2async::EventsV2 as PagerdutyAsyncEventsV2, types::Event}; use web3_proxy::{ config::TopConfig, - pagerduty::{pagerduty_event_for_config, trigger_pagerduty_alert}, + pagerduty::{pagerduty_alert, pagerduty_event_for_config}, }; #[derive(FromArgs, PartialEq, Debug, Eq)] @@ -40,30 +40,33 @@ impl PagerdutySubCommand { pagerduty_async: Option, top_config: Option, ) -> anyhow::Result<()> { + // TODO: allow customizing severity let event = top_config .map(|top_config| { pagerduty_event_for_config( - top_config, self.class.clone(), self.component.clone(), + None::<()>, Some(self.group.clone()), + pagerduty_rs::types::Severity::Error, self.summary.clone(), None, - None::<()>, + top_config, ) }) .unwrap_or_else(|| { - trigger_pagerduty_alert( - "web3-proxy".to_string(), + pagerduty_alert( None, self.class, + "web3-proxy".to_string(), None, self.component, + None::<()>, Some(self.group), + pagerduty_rs::types::Severity::Error, None, self.summary, None, - None::<()>, ) }); diff --git a/web3_proxy/src/bin/web3_proxy_cli/sentryd/mod.rs b/web3_proxy/src/bin/web3_proxy_cli/sentryd/mod.rs index f9e46cd7..d7f03da9 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/sentryd/mod.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/sentryd/mod.rs @@ -6,9 +6,12 @@ use futures::{ stream::{FuturesUnordered, StreamExt}, Future, }; +use log::{error, info}; +use pagerduty_rs::{eventsv2async::EventsV2 as PagerdutyAsyncEventsV2, types::Event}; use std::time::Duration; use tokio::sync::mpsc; use tokio::time::{interval, MissedTickBehavior}; +use web3_proxy::pagerduty::pagerduty_alert; #[derive(FromArgs, PartialEq, Debug, Eq)] /// Loop healthchecks and send pager duty alerts if any fail @@ -39,8 +42,15 @@ pub struct SentrydSubCommand { seconds: Option, } +#[derive(Debug)] +struct Error { + class: String, + level: log::Level, + anyhow: anyhow::Error, +} + impl SentrydSubCommand { - pub async fn main(self) -> anyhow::Result<()> { + pub async fn main(self, pagerduty_async: Option) -> anyhow::Result<()> { // sentry logging should already be configured let seconds = self.seconds.unwrap_or(60); @@ -48,15 +58,44 @@ impl SentrydSubCommand { let mut handles = FuturesUnordered::new(); // channels and a task for sending errors to logs/pagerduty - let (error_sender, mut error_receiver) = mpsc::channel::<(log::Level, anyhow::Error)>(10); + let (error_sender, mut error_receiver) = mpsc::channel::(10); { let error_handler_f = async move { - while let Some((error_level, err)) = error_receiver.recv().await { - log::log!(error_level, "check failed: {:?}", err); + if pagerduty_async.is_none() { + info!("set PAGERDUTY_INTEGRATION_KEY to send create alerts for errors"); + } - if matches!(error_level, log::Level::Error) { - todo!("send to pager duty if pager duty exists"); + while let Some(err) = error_receiver.recv().await { + log::log!(err.level, "check failed: {:?}", err); + + if matches!(err.level, log::Level::Error) { + let alert = pagerduty_alert( + None, + Some(err.class), + "web3-proxy-sentry".to_string(), + None, + None, + None::<()>, + Some("web3-proxy-sentry".to_string()), + pagerduty_rs::types::Severity::Error, + None, + format!("{}", err.anyhow), + None, + ); + + if let Some(pagerduty_async) = pagerduty_async.as_ref() { + info!( + "sending to pagerduty: {}", + serde_json::to_string_pretty(&alert)? + ); + + if let Err(err) = + pagerduty_async.event(Event::AlertTrigger(alert)).await + { + error!("Failed sending to pagerduty: {}", err); + } + } } } @@ -73,9 +112,13 @@ impl SentrydSubCommand { let url = format!("{}/health", self.web3_proxy); let error_sender = error_sender.clone(); - let loop_f = a_loop(seconds, log::Level::Error, error_sender, move || { - simple::main(url.clone()) - }); + let loop_f = a_loop( + "main /health", + seconds, + log::Level::Error, + error_sender, + move || simple::main(url.clone()), + ); handles.push(tokio::spawn(loop_f)); } @@ -84,9 +127,13 @@ impl SentrydSubCommand { let url = format!("{}/health", other_web3_proxy); let error_sender = error_sender.clone(); - let loop_f = a_loop(seconds, log::Level::Warn, error_sender, move || { - simple::main(url.clone()) - }); + let loop_f = a_loop( + "other /health", + seconds, + log::Level::Warn, + error_sender, + move || simple::main(url.clone()), + ); handles.push(tokio::spawn(loop_f)); } @@ -102,9 +149,13 @@ impl SentrydSubCommand { others.extend(self.other_rpc.clone()); - let loop_f = a_loop(seconds, log::Level::Error, error_sender, move || { - compare::main(rpc.clone(), others.clone(), max_age, max_lag) - }); + let loop_f = a_loop( + "head block comparison", + seconds, + log::Level::Error, + error_sender, + move || compare::main(rpc.clone(), others.clone(), max_age, max_lag), + ); handles.push(tokio::spawn(loop_f)); } @@ -120,9 +171,10 @@ impl SentrydSubCommand { } async fn a_loop( + class: &str, seconds: u64, error_level: log::Level, - error_sender: mpsc::Sender<(log::Level, anyhow::Error)>, + error_sender: mpsc::Sender, f: impl Fn() -> T, ) -> anyhow::Result<()> where @@ -137,7 +189,13 @@ where interval.tick().await; if let Err(err) = f().await { - error_sender.send((error_level, err)).await?; + let err = Error { + class: class.to_string(), + level: error_level, + anyhow: err, + }; + + error_sender.send(err).await?; }; } }