From 54d190acfc0ce82f70e4e209abc9ba2203dbc70b Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Tue, 24 Jan 2023 02:45:48 -0800 Subject: [PATCH] dryer pagerduty code --- .../src/bin/web3_proxy_cli/pagerduty.rs | 71 +++++++++---------- .../src/bin/web3_proxy_cli/sentryd/mod.rs | 40 +++++++++-- web3_proxy/src/lib.rs | 1 + 3 files changed, 71 insertions(+), 41 deletions(-) diff --git a/web3_proxy/src/bin/web3_proxy_cli/pagerduty.rs b/web3_proxy/src/bin/web3_proxy_cli/pagerduty.rs index 2097068d..34cd5586 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/pagerduty.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/pagerduty.rs @@ -1,11 +1,10 @@ use argh::FromArgs; -use gethostname::gethostname; use log::{error, info}; -use pagerduty_rs::{ - eventsv2async::EventsV2 as PagerdutyAsyncEventsV2, - types::{AlertTrigger, AlertTriggerPayload, Event}, +use pagerduty_rs::{eventsv2async::EventsV2 as PagerdutyAsyncEventsV2, types::Event}; +use web3_proxy::{ + config::TopConfig, + pagerduty::{pagerduty_event_for_config, trigger_pagerduty_alert}, }; -use web3_proxy::config::TopConfig; #[derive(FromArgs, PartialEq, Debug, Eq)] /// Quickly create a pagerduty alert @@ -19,6 +18,10 @@ pub struct PagerdutySubCommand { /// the class/type of the event class: Option, + #[argh(option)] + /// the component of the event + component: Option, + #[argh(option)] /// deduplicate alerts based on this key. /// If there are no open incidents with this key, a new incident will be created. @@ -33,40 +36,36 @@ pub struct PagerdutySubCommand { impl PagerdutySubCommand { pub async fn main( - &self, + self, pagerduty_async: Option, top_config: Option, ) -> anyhow::Result<()> { - let client = top_config - .as_ref() - .map(|top_config| format!("web3-proxy chain #{}", top_config.app.chain_id)) - .unwrap_or_else(|| format!("web3-proxy w/o chain")); - - let client_url = top_config - .as_ref() - .and_then(|x| x.app.redirect_public_url.clone()); - - let hostname = gethostname().into_string().unwrap_or("unknown".to_string()); - - let payload = AlertTriggerPayload { - severity: pagerduty_rs::types::Severity::Error, - summary: self.summary.clone(), - source: hostname, - timestamp: None, - component: None, - group: Some(self.group.clone()), - class: self.class.clone(), - custom_details: None::<()>, - }; - - let event = AlertTrigger { - payload, - dedup_key: None, - images: None, - links: None, - client: Some(client), - client_url: client_url, - }; + let event = top_config + .map(|top_config| { + pagerduty_event_for_config( + top_config, + self.class.clone(), + self.component.clone(), + Some(self.group.clone()), + self.summary.clone(), + None, + None::<()>, + ) + }) + .unwrap_or_else(|| { + trigger_pagerduty_alert( + "web3-proxy".to_string(), + None, + self.class, + None, + self.component, + Some(self.group), + None, + self.summary, + None, + None::<()>, + ) + }); if let Some(pagerduty_async) = pagerduty_async { info!( diff --git a/web3_proxy/src/bin/web3_proxy_cli/sentryd/mod.rs b/web3_proxy/src/bin/web3_proxy_cli/sentryd/mod.rs index 8199bfc3..f9e46cd7 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/sentryd/mod.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/sentryd/mod.rs @@ -7,6 +7,7 @@ use futures::{ Future, }; use std::time::Duration; +use tokio::sync::mpsc; use tokio::time::{interval, MissedTickBehavior}; #[derive(FromArgs, PartialEq, Debug, Eq)] @@ -46,13 +47,33 @@ impl SentrydSubCommand { let mut handles = FuturesUnordered::new(); + // channels and a task for sending errors to logs/pagerduty + let (error_sender, mut error_receiver) = mpsc::channel::<(log::Level, anyhow::Error)>(10); + + { + let error_handler_f = async move { + while let Some((error_level, err)) = error_receiver.recv().await { + log::log!(error_level, "check failed: {:?}", err); + + if matches!(error_level, log::Level::Error) { + todo!("send to pager duty if pager duty exists"); + } + } + + Ok(()) + }; + + handles.push(tokio::spawn(error_handler_f)); + } + // spawn a bunch of health check loops that do their checks on an interval // check the main rpc's /health endpoint { let url = format!("{}/health", self.web3_proxy); + let error_sender = error_sender.clone(); - let loop_f = a_loop(seconds, log::Level::Error, move || { + let loop_f = a_loop(seconds, log::Level::Error, error_sender, move || { simple::main(url.clone()) }); @@ -61,8 +82,11 @@ impl SentrydSubCommand { // check any other web3-proxy /health endpoints for other_web3_proxy in self.other_proxy.iter() { let url = format!("{}/health", other_web3_proxy); + let error_sender = error_sender.clone(); - let loop_f = a_loop(seconds, log::Level::Warn, move || simple::main(url.clone())); + let loop_f = a_loop(seconds, log::Level::Warn, error_sender, move || { + simple::main(url.clone()) + }); handles.push(tokio::spawn(loop_f)); } @@ -72,12 +96,13 @@ impl SentrydSubCommand { let max_age = self.max_age; let max_lag = self.max_lag; let rpc = self.web3_proxy.clone(); + let error_sender = error_sender.clone(); let mut others = self.other_proxy.clone(); others.extend(self.other_rpc.clone()); - let loop_f = a_loop(seconds, log::Level::Error, move || { + let loop_f = a_loop(seconds, log::Level::Error, error_sender, move || { compare::main(rpc.clone(), others.clone(), max_age, max_lag) }); @@ -94,7 +119,12 @@ impl SentrydSubCommand { } } -async fn a_loop(seconds: u64, error_level: log::Level, f: impl Fn() -> T) -> anyhow::Result<()> +async fn a_loop( + seconds: u64, + error_level: log::Level, + error_sender: mpsc::Sender<(log::Level, anyhow::Error)>, + f: impl Fn() -> T, +) -> anyhow::Result<()> where T: Future> + Send + 'static, { @@ -107,7 +137,7 @@ where interval.tick().await; if let Err(err) = f().await { - log::log!(error_level, "check failed: {:?}", err); + error_sender.send((error_level, err)).await?; }; } } diff --git a/web3_proxy/src/lib.rs b/web3_proxy/src/lib.rs index 0ae97055..571e245f 100644 --- a/web3_proxy/src/lib.rs +++ b/web3_proxy/src/lib.rs @@ -6,6 +6,7 @@ pub mod frontend; pub mod jsonrpc; pub mod metered; pub mod metrics_frontend; +pub mod pagerduty; pub mod rpcs; pub mod user_queries; pub mod user_token;