DRYer pagerduty code

This commit is contained in:
Bryan Stitt 2023-01-24 02:45:48 -08:00
parent a242244a35
commit 54d190acfc
3 changed files with 71 additions and 41 deletions

@ -1,11 +1,10 @@
use argh::FromArgs;
use gethostname::gethostname;
use log::{error, info};
use pagerduty_rs::{
eventsv2async::EventsV2 as PagerdutyAsyncEventsV2,
types::{AlertTrigger, AlertTriggerPayload, Event},
use pagerduty_rs::{eventsv2async::EventsV2 as PagerdutyAsyncEventsV2, types::Event};
use web3_proxy::{
config::TopConfig,
pagerduty::{pagerduty_event_for_config, trigger_pagerduty_alert},
};
use web3_proxy::config::TopConfig;
#[derive(FromArgs, PartialEq, Debug, Eq)]
/// Quickly create a pagerduty alert
@ -19,6 +18,10 @@ pub struct PagerdutySubCommand {
/// the class/type of the event
class: Option<String>,
#[argh(option)]
/// the component of the event
component: Option<String>,
#[argh(option)]
/// deduplicate alerts based on this key.
/// If there are no open incidents with this key, a new incident will be created.
@ -33,40 +36,36 @@ pub struct PagerdutySubCommand {
impl PagerdutySubCommand {
pub async fn main(
&self,
self,
pagerduty_async: Option<PagerdutyAsyncEventsV2>,
top_config: Option<TopConfig>,
) -> anyhow::Result<()> {
let client = top_config
.as_ref()
.map(|top_config| format!("web3-proxy chain #{}", top_config.app.chain_id))
.unwrap_or_else(|| format!("web3-proxy w/o chain"));
let client_url = top_config
.as_ref()
.and_then(|x| x.app.redirect_public_url.clone());
let hostname = gethostname().into_string().unwrap_or("unknown".to_string());
let payload = AlertTriggerPayload {
severity: pagerduty_rs::types::Severity::Error,
summary: self.summary.clone(),
source: hostname,
timestamp: None,
component: None,
group: Some(self.group.clone()),
class: self.class.clone(),
custom_details: None::<()>,
};
let event = AlertTrigger {
payload,
dedup_key: None,
images: None,
links: None,
client: Some(client),
client_url: client_url,
};
let event = top_config
.map(|top_config| {
pagerduty_event_for_config(
top_config,
self.class.clone(),
self.component.clone(),
Some(self.group.clone()),
self.summary.clone(),
None,
None::<()>,
)
})
.unwrap_or_else(|| {
trigger_pagerduty_alert(
"web3-proxy".to_string(),
None,
self.class,
None,
self.component,
Some(self.group),
None,
self.summary,
None,
None::<()>,
)
});
if let Some(pagerduty_async) = pagerduty_async {
info!(

@ -7,6 +7,7 @@ use futures::{
Future,
};
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::{interval, MissedTickBehavior};
#[derive(FromArgs, PartialEq, Debug, Eq)]
@ -46,13 +47,33 @@ impl SentrydSubCommand {
let mut handles = FuturesUnordered::new();
// channels and a task for sending errors to logs/pagerduty
let (error_sender, mut error_receiver) = mpsc::channel::<(log::Level, anyhow::Error)>(10);
// Consumer side of the error channel: one task drains the (level, error)
// pairs that the health-check loops send and logs each failure at the level
// that was attached to it.
{
let error_handler_f = async move {
// recv() returns None once the channel is closed and drained, which ends
// the loop and lets the task finish with Ok(()).
while let Some((error_level, err)) = error_receiver.recv().await {
log::log!(error_level, "check failed: {:?}", err);
if matches!(error_level, log::Level::Error) {
// NOTE(review): todo!() panics when reached, so the first Error-level
// failure aborts this task right after being logged. Replace it with the
// real pagerduty call (or a no-op when pagerduty is not configured)
// before relying on this handler.
todo!("send to pager duty if pager duty exists");
}
}
Ok(())
};
// The handler is spawned into the same `handles` set as the check loops.
handles.push(tokio::spawn(error_handler_f));
}
// spawn a bunch of health check loops that do their checks on an interval
// check the main rpc's /health endpoint
{
let url = format!("{}/health", self.web3_proxy);
let error_sender = error_sender.clone();
let loop_f = a_loop(seconds, log::Level::Error, move || {
let loop_f = a_loop(seconds, log::Level::Error, error_sender, move || {
simple::main(url.clone())
});
@ -61,8 +82,11 @@ impl SentrydSubCommand {
// check any other web3-proxy /health endpoints
for other_web3_proxy in self.other_proxy.iter() {
let url = format!("{}/health", other_web3_proxy);
let error_sender = error_sender.clone();
let loop_f = a_loop(seconds, log::Level::Warn, move || simple::main(url.clone()));
let loop_f = a_loop(seconds, log::Level::Warn, error_sender, move || {
simple::main(url.clone())
});
handles.push(tokio::spawn(loop_f));
}
@ -72,12 +96,13 @@ impl SentrydSubCommand {
let max_age = self.max_age;
let max_lag = self.max_lag;
let rpc = self.web3_proxy.clone();
let error_sender = error_sender.clone();
let mut others = self.other_proxy.clone();
others.extend(self.other_rpc.clone());
let loop_f = a_loop(seconds, log::Level::Error, move || {
let loop_f = a_loop(seconds, log::Level::Error, error_sender, move || {
compare::main(rpc.clone(), others.clone(), max_age, max_lag)
});
@ -94,7 +119,12 @@ impl SentrydSubCommand {
}
}
async fn a_loop<T>(seconds: u64, error_level: log::Level, f: impl Fn() -> T) -> anyhow::Result<()>
async fn a_loop<T>(
seconds: u64,
error_level: log::Level,
error_sender: mpsc::Sender<(log::Level, anyhow::Error)>,
f: impl Fn() -> T,
) -> anyhow::Result<()>
where
T: Future<Output = anyhow::Result<()>> + Send + 'static,
{
@ -107,7 +137,7 @@ where
interval.tick().await;
if let Err(err) = f().await {
log::log!(error_level, "check failed: {:?}", err);
error_sender.send((error_level, err)).await?;
};
}
}

@ -6,6 +6,7 @@ pub mod frontend;
pub mod jsonrpc;
pub mod metered;
pub mod metrics_frontend;
pub mod pagerduty;
pub mod rpcs;
pub mod user_queries;
pub mod user_token;