improve pager duty errors for smarter deduping

This commit is contained in:
Bryan Stitt 2023-01-25 14:04:06 -08:00
parent f2260ecdba
commit d5f2d6eb18
7 changed files with 293 additions and 140 deletions

@ -18,19 +18,16 @@ mod user_import;
use anyhow::Context;
use argh::FromArgs;
use ethers::types::U256;
use gethostname::gethostname;
use log::{error, info, warn};
use log::{info, warn};
use pagerduty_rs::eventsv2async::EventsV2 as PagerdutyAsyncEventsV2;
use pagerduty_rs::eventsv2sync::EventsV2 as PagerdutySyncEventsV2;
use pagerduty_rs::types::{AlertTrigger, AlertTriggerPayload};
use pagerduty_rs::{eventsv2async::EventsV2 as PagerdutyAsyncEventsV2, types::Event};
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::{
fs, panic,
path::Path,
sync::atomic::{self, AtomicUsize},
};
use tokio::runtime;
use web3_proxy::pagerduty::panic_handler;
use web3_proxy::{
app::{get_db, get_migrated_db, APP_USER_AGENT},
config::TopConfig,
@ -237,57 +234,13 @@ fn main() -> anyhow::Result<()> {
(None, None)
};
// panic handler that sends to pagerduty
// TODO: there is a `pagerduty_panic` module that looks like it would work with minor tweaks, but ethers-rs panics when a websocket exit and that would fire too many alerts
// panic handler that sends to pagerduty.
// TODO: use the sentry handler if no pager duty. use default if no sentry
if let Some(pagerduty_sync) = pagerduty_sync {
let client = top_config
.as_ref()
.map(|top_config| format!("web3-proxy chain #{}", top_config.app.chain_id))
.unwrap_or_else(|| format!("web3-proxy w/o chain"));
let client_url = top_config
.as_ref()
.and_then(|x| x.app.redirect_public_url.clone());
let top_config = top_config.clone();
panic::set_hook(Box::new(move |x| {
let hostname = gethostname().into_string().unwrap_or("unknown".to_string());
let panic_msg = format!("{} {:?}", x, x);
if panic_msg.starts_with("panicked at 'WS Server panic") {
info!("Underlying library {}", panic_msg);
} else {
error!("sending panic to pagerduty: {}", panic_msg);
let mut s = DefaultHasher::new();
panic_msg.hash(&mut s);
panic_msg.hash(&mut s);
let dedup_key = s.finish().to_string();
let payload = AlertTriggerPayload {
severity: pagerduty_rs::types::Severity::Error,
summary: panic_msg,
source: hostname,
timestamp: None,
component: None,
group: Some("web3-proxy".to_string()),
class: Some("panic".to_string()),
custom_details: None::<()>,
};
let event = Event::AlertTrigger(AlertTrigger {
payload,
dedup_key: Some(dedup_key),
images: None,
links: None,
client: Some(client.clone()),
client_url: client_url.clone(),
});
if let Err(err) = pagerduty_sync.event(event) {
error!("Failed sending panic to pagerduty: {}", err);
}
}
panic_handler(top_config.clone(), &pagerduty_sync, x);
}));
}

@ -3,7 +3,7 @@ use log::{error, info};
use pagerduty_rs::{eventsv2async::EventsV2 as PagerdutyAsyncEventsV2, types::Event};
use web3_proxy::{
config::TopConfig,
pagerduty::{pagerduty_alert, pagerduty_event_for_config},
pagerduty::{pagerduty_alert, pagerduty_alert_for_config},
};
#[derive(FromArgs, PartialEq, Debug, Eq)]
@ -31,11 +31,6 @@ pub struct PagerdutySubCommand {
/// If there are no open incidents with this key, a new incident will be created.
/// If there is an open incident with a matching key, the new event will be appended to that incident's Alerts log as an additional Trigger log entry.
dedup_key: Option<String>,
#[argh(option, default = "\"web3-proxy\".to_string()")]
/// a cluster or grouping of sources.
/// For example, sources "ethereum-proxy" and "polygon-proxy" might both be part of "web3-proxy".
group: String,
}
impl PagerdutySubCommand {
@ -47,11 +42,10 @@ impl PagerdutySubCommand {
// TODO: allow customizing severity
let event = top_config
.map(|top_config| {
pagerduty_event_for_config(
pagerduty_alert_for_config(
self.class.clone(),
self.component.clone(),
None::<()>,
Some(self.group.clone()),
pagerduty_rs::types::Severity::Error,
self.summary.clone(),
None,
@ -62,11 +56,10 @@ impl PagerdutySubCommand {
pagerduty_alert(
self.chain_id,
self.class,
"web3-proxy".to_string(),
None,
None,
self.component,
None::<()>,
Some(self.group),
pagerduty_rs::types::Severity::Error,
None,
self.summary,

@ -7,6 +7,8 @@ use serde::{Deserialize, Serialize};
use serde_json::json;
use web3_proxy::jsonrpc::JsonRpcErrorData;
use super::{SentrydErrorBuilder, SentrydResult};
#[derive(Debug, Deserialize, Serialize)]
struct JsonRpcResponse<V> {
// pub jsonrpc: String,
@ -35,11 +37,12 @@ impl From<Block<TxHash>> for AbbreviatedBlock {
}
pub async fn main(
error_builder: SentrydErrorBuilder,
rpc: String,
others: Vec<String>,
max_age: i64,
max_lag: i64,
) -> anyhow::Result<()> {
) -> SentrydResult {
let client = reqwest::Client::new();
let block_by_number_request = json!({
@ -54,30 +57,48 @@ pub async fn main(
.json(&block_by_number_request)
.send()
.await
.context(format!("error fetching block from {}", rpc))?
.context(format!("error querying block from {}", rpc))
.map_err(|x| error_builder.build(x))?;
// TODO: capture response headers now in case of error. store them in the extra data on the pager duty alert
let headers = format!("{:#?}", a.headers());
let a = a
.json::<JsonRpcResponse<Block<TxHash>>>()
.await
.context(format!("error parsing block from {}", rpc))?
.result
.context(format!("no block from {}", rpc))?;
.context(format!("error parsing block from {}", rpc))
.map_err(|x| error_builder.build(x))?;
let a = if let Some(block) = a.result {
block
} else if let Some(err) = a.error {
return error_builder.result(
anyhow::anyhow!("headers: {:#?}. err: {:#?}", headers, err)
.context(format!("jsonrpc error from {}", rpc)),
);
} else {
return error_builder
.result(anyhow!("{:#?}", a).context(format!("empty response from {}", rpc)));
};
// check the parent because b and c might not be as fast as a
let parent_hash = a.parent_hash;
let rpc_block = check_rpc(parent_hash, client.clone(), rpc.clone())
let rpc_block = check_rpc(parent_hash, client.clone(), rpc.to_string())
.await
.context("Error while querying primary rpc")?;
.context(format!("Error while querying primary rpc: {}", rpc))
.map_err(|err| error_builder.build(err))?;
let fs = FuturesUnordered::new();
for other in others.iter() {
let f = check_rpc(parent_hash, client.clone(), other.clone());
let f = check_rpc(parent_hash, client.clone(), other.to_string());
fs.push(tokio::spawn(f));
}
let other_check: Vec<_> = fs.collect().await;
if other_check.is_empty() {
return Err(anyhow::anyhow!("No other RPCs to check!"));
return error_builder.result(anyhow::anyhow!("No other RPCs to check!"));
}
// TODO: collect into a counter instead?
@ -99,22 +120,27 @@ pub async fn main(
match duration_since.abs().cmp(&max_lag) {
std::cmp::Ordering::Less | std::cmp::Ordering::Equal => {}
std::cmp::Ordering::Greater => match duration_since.cmp(&0) {
std::cmp::Ordering::Equal => unimplemented!(),
std::cmp::Ordering::Equal => {
unimplemented!("we already checked that they are not equal")
}
std::cmp::Ordering::Less => {
return Err(anyhow::anyhow!(
return error_builder.result(anyhow::anyhow!(
"Our RPC is too far ahead ({} s)! Something might be wrong.\n{:#}\nvs\n{:#}",
duration_since.abs(),
json!(rpc_block),
json!(newest_other),
));
).context(format!("{} is too far ahead", rpc)));
}
std::cmp::Ordering::Greater => {
return Err(anyhow::anyhow!(
"Our RPC is too far behind ({} s)!\n{:#}\nvs\n{:#}",
duration_since,
json!(rpc_block),
json!(newest_other),
));
return error_builder.result(
anyhow::anyhow!(
"Behind {} s!\n{:#}\nvs\n{:#}",
duration_since,
json!(rpc_block),
json!(newest_other),
)
.context(format!("{} is too far behind", rpc)),
);
}
},
}
@ -130,25 +156,31 @@ pub async fn main(
std::cmp::Ordering::Greater => match duration_since.cmp(&0) {
std::cmp::Ordering::Equal => unimplemented!(),
std::cmp::Ordering::Less => {
return Err(anyhow::anyhow!(
"Our clock is too far behind ({} s)! Something might be wrong.\n{:#}\nvs\n{:#}",
block_age.abs(),
json!(now),
json!(newest_other),
));
return error_builder.result(
anyhow::anyhow!(
"Clock is behind {}s! Something might be wrong.\n{:#}\nvs\n{:#}",
block_age.abs(),
json!(now),
json!(newest_other),
)
.context(format!("Clock is too far behind on {}!", rpc)),
);
}
std::cmp::Ordering::Greater => {
return Err(anyhow::anyhow!(
"block is too old ({} s)!\n{:#}\nvs\n{:#}",
block_age,
json!(now),
json!(newest_other),
));
return error_builder.result(
anyhow::anyhow!(
"block is too old ({}s)!\n{:#}\nvs\n{:#}",
block_age,
json!(now),
json!(newest_other),
)
.context(format!("block is too old on {}!", rpc)),
);
}
},
}
} else {
return Err(anyhow::anyhow!("No other RPC times to check!"));
return error_builder.result(anyhow::anyhow!("No other RPC times to check!"));
}
debug!("rpc comparison ok: {:#}", json!(rpc_block));

@ -9,6 +9,7 @@ use futures::{
};
use log::{error, info};
use pagerduty_rs::{eventsv2async::EventsV2 as PagerdutyAsyncEventsV2, types::Event};
use serde_json::json;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::{interval, MissedTickBehavior};
@ -48,12 +49,41 @@ pub struct SentrydSubCommand {
}
#[derive(Debug)]
struct Error {
pub struct SentrydError {
/// The class/type of the event, for example ping failure or cpu load
class: String,
/// Errors will send a pagerduty alert. others just give log messages
level: log::Level,
/// A short summary that should be mostly static
summary: String,
/// Lots of detail about the error
extra: Option<serde_json::Value>,
}
/// helper for creating SentrydErrors
#[derive(Clone)]
pub struct SentrydErrorBuilder {
class: String,
level: log::Level,
anyhow: anyhow::Error,
}
impl SentrydErrorBuilder {
fn build(&self, err: anyhow::Error) -> SentrydError {
SentrydError {
class: self.class.to_owned(),
level: self.level.to_owned(),
summary: format!("{}", err),
extra: Some(json!(format!("{:#?}", err))),
}
}
fn result(&self, err: anyhow::Error) -> SentrydResult {
Err(self.build(err))
}
}
type SentrydResult = Result<(), SentrydError>;
impl SentrydSubCommand {
pub async fn main(
self,
@ -86,7 +116,7 @@ impl SentrydSubCommand {
let mut handles = FuturesUnordered::new();
// channels and a task for sending errors to logs/pagerduty
let (error_sender, mut error_receiver) = mpsc::channel::<Error>(10);
let (error_sender, mut error_receiver) = mpsc::channel::<SentrydError>(10);
{
let error_handler_f = async move {
@ -101,14 +131,13 @@ impl SentrydSubCommand {
let alert = pagerduty_alert(
Some(chain_id),
Some(err.class),
"web3-proxy-sentry".to_string(),
None,
None,
None::<()>,
Some("web3-proxy-sentry".to_string()),
None,
None,
err.extra,
pagerduty_rs::types::Severity::Error,
None,
format!("{:#?}", err.anyhow),
err.summary,
None,
);
@ -140,12 +169,15 @@ impl SentrydSubCommand {
let url = format!("{}/health", web3_proxy);
let error_sender = error_sender.clone();
// TODO: what timeout?
let timeout = Duration::from_secs(1);
let loop_f = a_loop(
"main /health",
seconds,
log::Level::Error,
error_sender,
move || simple::main(url.clone()),
move |error_builder| simple::main(error_builder, url.clone(), timeout),
);
handles.push(tokio::spawn(loop_f));
@ -162,12 +194,15 @@ impl SentrydSubCommand {
let error_sender = error_sender.clone();
// TODO: what timeout?
let timeout = Duration::from_secs(1);
let loop_f = a_loop(
"other /health",
seconds,
log::Level::Warn,
error_sender,
move || simple::main(url.clone()),
move |error_builder| simple::main(error_builder, url.clone(), timeout),
);
handles.push(tokio::spawn(loop_f));
@ -189,7 +224,9 @@ impl SentrydSubCommand {
seconds,
log::Level::Error,
error_sender,
move || compare::main(rpc.clone(), others.clone(), max_age, max_lag),
move |error_builder| {
compare::main(error_builder, rpc.clone(), others.clone(), max_age, max_lag)
},
);
handles.push(tokio::spawn(loop_f));
@ -209,12 +246,17 @@ async fn a_loop<T>(
class: &str,
seconds: u64,
error_level: log::Level,
error_sender: mpsc::Sender<Error>,
f: impl Fn() -> T,
error_sender: mpsc::Sender<SentrydError>,
f: impl Fn(SentrydErrorBuilder) -> T,
) -> anyhow::Result<()>
where
T: Future<Output = anyhow::Result<()>> + Send + 'static,
T: Future<Output = SentrydResult> + Send + 'static,
{
let error_builder = SentrydErrorBuilder {
class: class.to_owned(),
level: error_level,
};
let mut interval = interval(Duration::from_secs(seconds));
// TODO: should we warn if there are delays?
@ -223,13 +265,7 @@ where
loop {
interval.tick().await;
if let Err(err) = f().await {
let err = Error {
class: class.to_string(),
level: error_level,
anyhow: err,
};
if let Err(err) = f(error_builder.clone()).await {
error_sender.send(err).await?;
};
}

@ -1,22 +1,60 @@
use std::time::Duration;
use super::{SentrydErrorBuilder, SentrydResult};
use anyhow::Context;
use log::{debug, trace};
use tokio::time::Instant;
/// GET the url and return an error if it wasn't a success
pub async fn main(url: String) -> anyhow::Result<()> {
pub async fn main(
error_builder: SentrydErrorBuilder,
url: String,
timeout: Duration,
) -> SentrydResult {
let start = Instant::now();
let r = reqwest::get(&url)
.await
.context(format!("Failed GET {}", url))?;
.context(format!("Failed GET {}", &url))
.map_err(|x| error_builder.build(x))?;
let elapsed = start.elapsed();
if elapsed > timeout {
return error_builder.result(
anyhow::anyhow!(
"query took longer than {}ms ({}ms): {:#?}",
timeout.as_millis(),
elapsed.as_millis(),
r
)
.context(format!("fetching {} took too long", &url)),
);
}
// TODO: what should we do if we get rate limited here?
if r.status().is_success() {
// warn if latency is high?
debug!("{} is healthy", url);
debug!("{} is healthy", &url);
trace!("Successful {:#?}", r);
return Ok(());
}
let debug_str = format!("{:#?}", r);
// TODO: capture headers? or is that already part of r?
let detail = format!("{:#?}", r);
let body = r.text().await?;
let summary = format!("{} is unhealthy: {}", &url, r.status());
Err(anyhow::anyhow!("{}: {}", debug_str, body))
let body = r
.text()
.await
.context(detail.clone())
.context(summary.clone())
.map_err(|x| error_builder.build(x))?;
error_builder.result(
anyhow::anyhow!("body: {}", body)
.context(detail)
.context(summary),
)
}

@ -39,7 +39,7 @@ pub struct CliConfig {
pub cookie_key_filename: String,
}
#[derive(Debug, Deserialize)]
#[derive(Clone, Debug, Deserialize)]
pub struct TopConfig {
pub app: AppConfig,
pub balanced_rpcs: HashMap<String, Web3ConnectionConfig>,
@ -52,7 +52,7 @@ pub struct TopConfig {
/// shared configuration between Web3Connections
// TODO: no String, only &str
#[derive(Debug, Default, Deserialize)]
#[derive(Clone, Debug, Default, Deserialize)]
pub struct AppConfig {
/// Request limit for allowed origins for anonymous users.
/// These requests get rate limited by IP.
@ -190,7 +190,7 @@ fn default_response_cache_max_bytes() -> usize {
}
/// Configuration for a backend web3 RPC server
#[derive(Debug, Deserialize)]
#[derive(Clone, Debug, Deserialize)]
pub struct Web3ConnectionConfig {
/// simple way to disable a connection without deleting the row
#[serde(default)]

@ -1,19 +1,122 @@
use crate::config::TopConfig;
use gethostname::gethostname;
use log::{debug, error};
use pagerduty_rs::eventsv2sync::EventsV2 as PagerdutySyncEventsV2;
use pagerduty_rs::types::{AlertTrigger, AlertTriggerPayload, Event};
use serde::Serialize;
use std::{
collections::hash_map::DefaultHasher,
hash::{Hash, Hasher},
panic::PanicInfo,
};
use crate::config::TopConfig;
use gethostname::gethostname;
use pagerduty_rs::types::{AlertTrigger, AlertTriggerPayload};
use serde::Serialize;
use time::OffsetDateTime;
pub fn pagerduty_event_for_config<T: Serialize>(
/*
let client = top_config
.as_ref()
.map(|top_config| format!("web3-proxy chain #{}", top_config.app.chain_id))
.unwrap_or_else(|| format!("web3-proxy w/o chain"));
let client_url = top_config
.as_ref()
.and_then(|x| x.app.redirect_public_url.clone());
panic::set_hook(Box::new(move |x| {
let hostname = gethostname().into_string().unwrap_or("unknown".to_string());
let panic_msg = format!("{} {:?}", x, x);
if panic_msg.starts_with("panicked at 'WS Server panic") {
info!("Underlying library {}", panic_msg);
} else {
error!("sending panic to pagerduty: {}", panic_msg);
let mut s = DefaultHasher::new();
panic_msg.hash(&mut s);
panic_msg.hash(&mut s);
let dedup_key = s.finish().to_string();
let payload = AlertTriggerPayload {
severity: pagerduty_rs::types::Severity::Error,
summary: panic_msg,
source: hostname,
timestamp: None,
component: None,
group: Some("web3-proxy".to_string()),
class: Some("panic".to_string()),
custom_details: None::<()>,
};
let event = Event::AlertTrigger(AlertTrigger {
payload,
dedup_key: Some(dedup_key),
images: None,
links: None,
client: Some(client.clone()),
client_url: client_url.clone(),
});
if let Err(err) = pagerduty_sync.event(event) {
error!("Failed sending panic to pagerduty: {}", err);
}
}
}));
*/
pub fn panic_handler(
top_config: Option<TopConfig>,
pagerduty_sync: &PagerdutySyncEventsV2,
panic_info: &PanicInfo,
) {
let summary = format!("{}", panic_info);
let details = format!("{:#?}", panic_info);
if summary.starts_with("panicked at 'WS Server panic") {
// the ethers-rs library panics when websockets disconnect. this isn't a panic we care about reporting
debug!("Underlying library {}", details);
return;
}
let class = Some("panic".to_string());
let alert = if let Some(top_config) = top_config {
pagerduty_alert_for_config(
class,
None,
Some(details),
pagerduty_rs::types::Severity::Critical,
summary,
None,
top_config,
)
} else {
pagerduty_alert(
None,
class,
None,
None,
None,
Some(details),
pagerduty_rs::types::Severity::Critical,
None,
summary,
None,
)
};
let event = Event::AlertTrigger(alert);
if let Err(err) = pagerduty_sync.event(event) {
error!("Failed sending alert to pagerduty! {:#?}", err);
}
}
pub fn pagerduty_alert_for_config<T: Serialize>(
class: Option<String>,
component: Option<String>,
custom_details: Option<T>,
group: Option<String>,
severity: pagerduty_rs::types::Severity,
summary: String,
timestamp: Option<OffsetDateTime>,
@ -26,11 +129,10 @@ pub fn pagerduty_event_for_config<T: Serialize>(
pagerduty_alert(
Some(chain_id),
class,
"web3-proxy".to_string(),
None,
client_url,
component,
custom_details,
group,
severity,
None,
summary,
@ -41,19 +143,18 @@ pub fn pagerduty_event_for_config<T: Serialize>(
pub fn pagerduty_alert<T: Serialize>(
chain_id: Option<u64>,
class: Option<String>,
client: String,
client: Option<String>,
client_url: Option<String>,
component: Option<String>,
custom_details: Option<T>,
group: Option<String>,
severity: pagerduty_rs::types::Severity,
source: Option<String>,
summary: String,
timestamp: Option<OffsetDateTime>,
) -> AlertTrigger<T> {
let client = chain_id
.map(|x| format!("{} chain #{}", client, x))
.unwrap_or_else(|| format!("{} w/o chain", client));
let client = client.unwrap_or_else(|| "web3-proxy".to_string());
let group = chain_id.map(|x| format!("chain #{}", x));
let source =
source.unwrap_or_else(|| gethostname().into_string().unwrap_or("unknown".to_string()));