add /backups_needed endpoint for easy alerts

This commit is contained in:
Bryan Stitt 2023-04-13 17:15:01 -07:00
parent 3c5f973107
commit d035049c8f
5 changed files with 84 additions and 33 deletions

@ -158,6 +158,7 @@ pub async fn serve(port: u16, proxy_app: Arc<Web3ProxyApp>) -> anyhow::Result<()
//
.route("/health", get(status::health))
.route("/status", get(status::status))
.route("/status/backups_needed", get(status::backups_needed))
//
// User stuff
//

@ -7,8 +7,6 @@ use super::{FrontendHealthCache, FrontendResponseCache, FrontendResponseCaches};
use crate::app::{Web3ProxyApp, APP_USER_AGENT};
use axum::{http::StatusCode, response::IntoResponse, Extension, Json};
use axum_macros::debug_handler;
use hashbrown::HashMap;
use http::HeaderMap;
use serde_json::json;
use std::sync::Arc;
@ -29,6 +27,30 @@ pub async fn health(
}
}
/// Easy alerting if backup servers are in use.
pub async fn backups_needed(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse {
let code = {
let consensus_rpcs = app.balanced_rpcs.watch_consensus_rpcs_sender.borrow();
if let Some(consensus_rpcs) = consensus_rpcs.as_ref() {
if consensus_rpcs.backups_needed {
StatusCode::INTERNAL_SERVER_ERROR
} else {
StatusCode::OK
}
} else {
// if no consensus, we still "need backups". we just don't have any. which is worse
StatusCode::INTERNAL_SERVER_ERROR
}
};
if matches!(code, StatusCode::OK) {
(code, "no backups needed. :)")
} else {
(code, "backups needed! :(")
}
}
/// Very basic status page.
///
/// TODO: replace this with proper stats and monitoring

@ -7,7 +7,7 @@ use anyhow::Context;
use ethers::prelude::{H256, U64};
use hashbrown::{HashMap, HashSet};
use itertools::{Itertools, MinMaxResult};
use log::{trace, warn, debug};
use log::{trace, warn};
use moka::future::Cache;
use serde::Serialize;
use std::cmp::Reverse;
@ -19,12 +19,12 @@ use tokio::time::Instant;
/// Serialize is so we can print it on our debug endpoint
#[derive(Clone, Serialize)]
pub struct ConsensusWeb3Rpcs {
pub(super) tier: u64,
pub(super) head_block: Web3ProxyBlock,
pub(super) best_rpcs: Vec<Arc<Web3Rpc>>,
pub(crate) tier: u64,
pub(crate) head_block: Web3ProxyBlock,
pub(crate) best_rpcs: Vec<Arc<Web3Rpc>>,
// TODO: functions like "compare_backup_vote()"
// pub(super) backups_voted: Option<Web3ProxyBlock>,
pub(super) backups_needed: bool,
pub(crate) backups_needed: bool,
}
impl ConsensusWeb3Rpcs {
@ -204,9 +204,7 @@ impl ConsensusFinder {
authorization: &Arc<Authorization>,
web3_rpcs: &Web3Rpcs,
) -> anyhow::Result<Option<ConsensusWeb3Rpcs>> {
let minmax_block = self
.rpc_heads
.values().minmax_by_key(|&x| x.number());
let minmax_block = self.rpc_heads.values().minmax_by_key(|&x| x.number());
let (lowest_block, highest_block) = match minmax_block {
MinMaxResult::NoElements => return Ok(None),
@ -220,7 +218,8 @@ impl ConsensusFinder {
trace!("lowest_block_number: {}", lowest_block.number());
let max_lag_block_number = highest_block_number.saturating_sub(self.max_block_lag.unwrap_or_else(|| U64::from(10)));
let max_lag_block_number = highest_block_number
.saturating_sub(self.max_block_lag.unwrap_or_else(|| U64::from(10)));
trace!("max_lag_block_number: {}", max_lag_block_number);
@ -245,7 +244,11 @@ impl ConsensusFinder {
let mut rpc_heads_by_tier: Vec<_> = self.rpc_heads.iter().collect();
rpc_heads_by_tier.sort_by_cached_key(|(rpc, _)| rpc.tier);
let current_tier = rpc_heads_by_tier.first().expect("rpc_heads_by_tier should never be empty").0.tier;
let current_tier = rpc_heads_by_tier
.first()
.expect("rpc_heads_by_tier should never be empty")
.0
.tier;
// loop over all the rpc heads (grouped by tier) and their parents to find consensus
// TODO: i'm sure theres a lot of shortcuts that could be taken, but this is simplest to implement
@ -253,13 +256,13 @@ impl ConsensusFinder {
if current_tier != rpc.tier {
// we finished processing a tier. check for primary results
if let Some(consensus) = self.count_votes(&primary_votes, web3_rpcs) {
return Ok(Some(consensus))
return Ok(Some(consensus));
}
// only set backup consensus once. we don't want it to keep checking on worse tiers if it already found consensus
if backup_consensus.is_none() {
if let Some(consensus) = self.count_votes(&backup_votes, web3_rpcs) {
backup_consensus =Some(consensus)
backup_consensus = Some(consensus)
}
}
}
@ -281,7 +284,10 @@ impl ConsensusFinder {
backup_entry.0.insert(&rpc.name);
backup_entry.1 += rpc.soft_limit;
match web3_rpcs.block(authorization, block_to_check.parent_hash(), Some(rpc)).await {
match web3_rpcs
.block(authorization, block_to_check.parent_hash(), Some(rpc))
.await
{
Ok(parent_block) => block_to_check = parent_block,
Err(err) => {
warn!("Problem fetching parent block of {:#?} during consensus finding: {:#?}", block_to_check, err);
@ -293,7 +299,7 @@ impl ConsensusFinder {
// we finished processing all tiers. check for primary results (if anything but the last tier found consensus, we already returned above)
if let Some(consensus) = self.count_votes(&primary_votes, web3_rpcs) {
return Ok(Some(consensus))
return Ok(Some(consensus));
}
// only set backup consensus once. we don't want it to keep checking on worse tiers if it already found consensus
@ -301,15 +307,28 @@ impl ConsensusFinder {
return Ok(Some(consensus));
}
// count votes one last time
// count votes one last time
Ok(self.count_votes(&backup_votes, web3_rpcs))
}
// TODO: have min_sum_soft_limit and min_head_rpcs on self instead of on Web3Rpcs
fn count_votes(&self, votes: &HashMap<Web3ProxyBlock, (HashSet<&str>, u32)>, web3_rpcs: &Web3Rpcs) -> Option<ConsensusWeb3Rpcs> {
fn count_votes(
&self,
votes: &HashMap<Web3ProxyBlock, (HashSet<&str>, u32)>,
web3_rpcs: &Web3Rpcs,
) -> Option<ConsensusWeb3Rpcs> {
// sort the primary votes ascending by tier and descending by block num
let mut votes: Vec<_> = votes.iter().map(|(block, (rpc_names, sum_soft_limit))| (block, sum_soft_limit, rpc_names)).collect();
votes.sort_by_cached_key(|(block, sum_soft_limit, rpc_names)| (Reverse(*block.number()), Reverse(*sum_soft_limit), Reverse(rpc_names.len())));
let mut votes: Vec<_> = votes
.iter()
.map(|(block, (rpc_names, sum_soft_limit))| (block, sum_soft_limit, rpc_names))
.collect();
votes.sort_by_cached_key(|(block, sum_soft_limit, rpc_names)| {
(
Reverse(*block.number()),
Reverse(*sum_soft_limit),
Reverse(rpc_names.len()),
)
});
// return the first result that exceededs confgured minimums (if any)
for (maybe_head_block, sum_soft_limit, rpc_names) in votes {
@ -324,14 +343,21 @@ impl ConsensusFinder {
trace!("rpc_names: {:#?}", rpc_names);
// consensus likely found! load the rpcs to make sure they all have active connections
let consensus_rpcs: Vec<_> = rpc_names.into_iter().filter_map(|x| web3_rpcs.get(x)).collect();
let consensus_rpcs: Vec<_> = rpc_names
.into_iter()
.filter_map(|x| web3_rpcs.get(x))
.collect();
if consensus_rpcs.len() < web3_rpcs.min_head_rpcs {
continue;
}
// consensus found!
let tier = consensus_rpcs.iter().map(|x| x.tier).max().expect("there should always be a max");
let tier = consensus_rpcs
.iter()
.map(|x| x.tier)
.max()
.expect("there should always be a max");
let backups_needed = consensus_rpcs.iter().any(|x| x.backup);

@ -51,7 +51,7 @@ pub struct Web3Rpcs {
/// TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed
/// TODO: why is watch_consensus_head_sender in an Option, but this one isn't?
/// Geth's subscriptions have the same potential for skipping blocks.
pub(super) watch_consensus_rpcs_sender: watch::Sender<Option<Arc<ConsensusWeb3Rpcs>>>,
pub(crate) watch_consensus_rpcs_sender: watch::Sender<Option<Arc<ConsensusWeb3Rpcs>>>,
/// this head receiver makes it easy to wait until there is a new block
pub(super) watch_consensus_head_sender: Option<watch::Sender<Option<Web3ProxyBlock>>>,
pub(super) pending_transaction_cache:
@ -1222,11 +1222,12 @@ impl Serialize for Web3Rpcs {
/// TODO: i think we still have sorts scattered around the code that should use this
/// TODO: take AsRef or something like that? We don't need an Arc here
fn rpc_sync_status_sort_key(x: &Arc<Web3Rpc>) -> (Reverse<U64>, u64, bool, OrderedFloat<f64>) {
let head_block = x.head_block
.read()
.as_ref()
.map(|x| *x.number())
.unwrap_or_default();
let head_block = x
.head_block
.read()
.as_ref()
.map(|x| *x.number())
.unwrap_or_default();
let tier = x.tier;

@ -9,9 +9,8 @@ use crate::rpcs::request::RequestRevertHandler;
use anyhow::{anyhow, Context};
use ethers::prelude::{Bytes, Middleware, ProviderError, TxHash, H256, U64};
use ethers::types::{Address, Transaction, U256};
use futures::StreamExt;
use futures::future::try_join_all;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use log::{debug, error, info, trace, warn, Level};
use migration::sea_orm::DatabaseConnection;
use ordered_float::OrderedFloat;
@ -701,7 +700,7 @@ impl Web3Rpc {
} else {
RequestRevertHandler::ErrorLevel
};
let mut delay_start = false;
// this does loop. just only when reconnect is enabled
@ -888,7 +887,7 @@ impl Web3Rpc {
continue;
}
// reconnect is not enabled.
if *disconnect_receiver.borrow() {
info!("{} is disconnecting", self);
@ -1150,7 +1149,9 @@ impl Web3Rpc {
if self.should_disconnect() {
Ok(())
} else {
Err(anyhow!("pending_transactions subscription exited. reconnect needed"))
Err(anyhow!(
"pending_transactions subscription exited. reconnect needed"
))
}
}