sentryd to pagerduty

This commit is contained in:
Bryan Stitt 2023-01-24 03:12:23 -08:00
parent 54d190acfc
commit 36d64489d8
4 changed files with 86 additions and 24 deletions

View File

@ -318,6 +318,7 @@ These are not yet ordered. There might be duplicates. We might not actually need
- [x] use channels instead of arcswap
- this will let us easily wait for a new head or a new synced connection
- [x] broadcast transactions to more servers
- [x] send sentryd errors to pagerduty
- [-] proxy mode for benchmarking all backends
- [-] proxy mode for sending to multiple backends
- [-] let users choose a % of reverts to log (or maybe x/second). someone like curve logging all reverts will be a BIG database very quickly

View File

@ -368,7 +368,7 @@ fn main() -> anyhow::Result<()> {
warn!("sentry_url is not set! Logs will only show in this console");
}
x.main().await
x.main(pagerduty_async).await
}
SubCommand::RpcAccounting(x) => {
let db_url = cli_config

View File

@ -3,7 +3,7 @@ use log::{error, info};
use pagerduty_rs::{eventsv2async::EventsV2 as PagerdutyAsyncEventsV2, types::Event};
use web3_proxy::{
config::TopConfig,
pagerduty::{pagerduty_event_for_config, trigger_pagerduty_alert},
pagerduty::{pagerduty_alert, pagerduty_event_for_config},
};
#[derive(FromArgs, PartialEq, Debug, Eq)]
@ -40,30 +40,33 @@ impl PagerdutySubCommand {
pagerduty_async: Option<PagerdutyAsyncEventsV2>,
top_config: Option<TopConfig>,
) -> anyhow::Result<()> {
// TODO: allow customizing severity
let event = top_config
.map(|top_config| {
pagerduty_event_for_config(
top_config,
self.class.clone(),
self.component.clone(),
None::<()>,
Some(self.group.clone()),
pagerduty_rs::types::Severity::Error,
self.summary.clone(),
None,
None::<()>,
top_config,
)
})
.unwrap_or_else(|| {
trigger_pagerduty_alert(
"web3-proxy".to_string(),
pagerduty_alert(
None,
self.class,
"web3-proxy".to_string(),
None,
self.component,
None::<()>,
Some(self.group),
pagerduty_rs::types::Severity::Error,
None,
self.summary,
None,
None::<()>,
)
});

View File

@ -6,9 +6,12 @@ use futures::{
stream::{FuturesUnordered, StreamExt},
Future,
};
use log::{error, info};
use pagerduty_rs::{eventsv2async::EventsV2 as PagerdutyAsyncEventsV2, types::Event};
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::{interval, MissedTickBehavior};
use web3_proxy::pagerduty::pagerduty_alert;
#[derive(FromArgs, PartialEq, Debug, Eq)]
/// Loop healthchecks and send pager duty alerts if any fail
@ -39,8 +42,15 @@ pub struct SentrydSubCommand {
seconds: Option<u64>,
}
#[derive(Debug)]
struct Error {
class: String,
level: log::Level,
anyhow: anyhow::Error,
}
impl SentrydSubCommand {
pub async fn main(self) -> anyhow::Result<()> {
pub async fn main(self, pagerduty_async: Option<PagerdutyAsyncEventsV2>) -> anyhow::Result<()> {
// sentry logging should already be configured
let seconds = self.seconds.unwrap_or(60);
@ -48,15 +58,44 @@ impl SentrydSubCommand {
let mut handles = FuturesUnordered::new();
// channels and a task for sending errors to logs/pagerduty
let (error_sender, mut error_receiver) = mpsc::channel::<(log::Level, anyhow::Error)>(10);
let (error_sender, mut error_receiver) = mpsc::channel::<Error>(10);
{
let error_handler_f = async move {
while let Some((error_level, err)) = error_receiver.recv().await {
log::log!(error_level, "check failed: {:?}", err);
if pagerduty_async.is_none() {
info!("set PAGERDUTY_INTEGRATION_KEY to send create alerts for errors");
}
if matches!(error_level, log::Level::Error) {
todo!("send to pager duty if pager duty exists");
while let Some(err) = error_receiver.recv().await {
log::log!(err.level, "check failed: {:?}", err);
if matches!(err.level, log::Level::Error) {
let alert = pagerduty_alert(
None,
Some(err.class),
"web3-proxy-sentry".to_string(),
None,
None,
None::<()>,
Some("web3-proxy-sentry".to_string()),
pagerduty_rs::types::Severity::Error,
None,
format!("{}", err.anyhow),
None,
);
if let Some(pagerduty_async) = pagerduty_async.as_ref() {
info!(
"sending to pagerduty: {}",
serde_json::to_string_pretty(&alert)?
);
if let Err(err) =
pagerduty_async.event(Event::AlertTrigger(alert)).await
{
error!("Failed sending to pagerduty: {}", err);
}
}
}
}
@ -73,9 +112,13 @@ impl SentrydSubCommand {
let url = format!("{}/health", self.web3_proxy);
let error_sender = error_sender.clone();
let loop_f = a_loop(seconds, log::Level::Error, error_sender, move || {
simple::main(url.clone())
});
let loop_f = a_loop(
"main /health",
seconds,
log::Level::Error,
error_sender,
move || simple::main(url.clone()),
);
handles.push(tokio::spawn(loop_f));
}
@ -84,9 +127,13 @@ impl SentrydSubCommand {
let url = format!("{}/health", other_web3_proxy);
let error_sender = error_sender.clone();
let loop_f = a_loop(seconds, log::Level::Warn, error_sender, move || {
simple::main(url.clone())
});
let loop_f = a_loop(
"other /health",
seconds,
log::Level::Warn,
error_sender,
move || simple::main(url.clone()),
);
handles.push(tokio::spawn(loop_f));
}
@ -102,9 +149,13 @@ impl SentrydSubCommand {
others.extend(self.other_rpc.clone());
let loop_f = a_loop(seconds, log::Level::Error, error_sender, move || {
compare::main(rpc.clone(), others.clone(), max_age, max_lag)
});
let loop_f = a_loop(
"head block comparison",
seconds,
log::Level::Error,
error_sender,
move || compare::main(rpc.clone(), others.clone(), max_age, max_lag),
);
handles.push(tokio::spawn(loop_f));
}
@ -120,9 +171,10 @@ impl SentrydSubCommand {
}
async fn a_loop<T>(
class: &str,
seconds: u64,
error_level: log::Level,
error_sender: mpsc::Sender<(log::Level, anyhow::Error)>,
error_sender: mpsc::Sender<Error>,
f: impl Fn() -> T,
) -> anyhow::Result<()>
where
@ -137,7 +189,13 @@ where
interval.tick().await;
if let Err(err) = f().await {
error_sender.send((error_level, err)).await?;
let err = Error {
class: class.to_string(),
level: error_level,
anyhow: err,
};
error_sender.send(err).await?;
};
}
}