From d035049c8f19b68324be196ccff207d01470fd0f Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Thu, 13 Apr 2023 17:15:01 -0700
Subject: [PATCH] add /backups_needed endpoint for easy alerts

---
 web3_proxy/src/frontend/mod.rs    |  1 +
 web3_proxy/src/frontend/status.rs | 26 +++++++++++-
 web3_proxy/src/rpcs/consensus.rs  | 66 +++++++++++++++++++++----------
 web3_proxy/src/rpcs/many.rs       | 13 +++---
 web3_proxy/src/rpcs/one.rs        | 11 +++---
 5 files changed, 84 insertions(+), 33 deletions(-)

diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs
index 0d0e5146..ecfbd5b2 100644
--- a/web3_proxy/src/frontend/mod.rs
+++ b/web3_proxy/src/frontend/mod.rs
@@ -158,6 +158,7 @@ pub async fn serve(port: u16, proxy_app: Arc<Web3ProxyApp>) -> anyhow::Result<()
         //
         .route("/health", get(status::health))
         .route("/status", get(status::status))
+        .route("/status/backups_needed", get(status::backups_needed))
         //
         // User stuff
         //
diff --git a/web3_proxy/src/frontend/status.rs b/web3_proxy/src/frontend/status.rs
index f678e34f..26945cfa 100644
--- a/web3_proxy/src/frontend/status.rs
+++ b/web3_proxy/src/frontend/status.rs
@@ -7,8 +7,6 @@ use super::{FrontendHealthCache, FrontendResponseCache, FrontendResponseCaches};
 use crate::app::{Web3ProxyApp, APP_USER_AGENT};
 use axum::{http::StatusCode, response::IntoResponse, Extension, Json};
 use axum_macros::debug_handler;
-use hashbrown::HashMap;
-use http::HeaderMap;
 use serde_json::json;
 use std::sync::Arc;
 
@@ -29,6 +27,30 @@ pub async fn health(
     }
 }
 
+/// Easy alerting if backup servers are in use.
+pub async fn backups_needed(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse {
+    let code = {
+        let consensus_rpcs = app.balanced_rpcs.watch_consensus_rpcs_sender.borrow();
+
+        if let Some(consensus_rpcs) = consensus_rpcs.as_ref() {
+            if consensus_rpcs.backups_needed {
+                StatusCode::INTERNAL_SERVER_ERROR
+            } else {
+                StatusCode::OK
+            }
+        } else {
+            // if no consensus, we still "need backups". we just don't have any. which is worse
+            StatusCode::INTERNAL_SERVER_ERROR
+        }
+    };
+
+    if matches!(code, StatusCode::OK) {
+        (code, "no backups needed. :)")
+    } else {
+        (code, "backups needed! :(")
+    }
+}
+
 /// Very basic status page.
 ///
 /// TODO: replace this with proper stats and monitoring
diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs
index a6f94523..ed0aa23c 100644
--- a/web3_proxy/src/rpcs/consensus.rs
+++ b/web3_proxy/src/rpcs/consensus.rs
@@ -7,7 +7,7 @@ use anyhow::Context;
 use ethers::prelude::{H256, U64};
 use hashbrown::{HashMap, HashSet};
 use itertools::{Itertools, MinMaxResult};
-use log::{trace, warn, debug};
+use log::{trace, warn};
 use moka::future::Cache;
 use serde::Serialize;
 use std::cmp::Reverse;
@@ -19,12 +19,12 @@ use tokio::time::Instant;
 /// Serialize is so we can print it on our debug endpoint
 #[derive(Clone, Serialize)]
 pub struct ConsensusWeb3Rpcs {
-    pub(super) tier: u64,
-    pub(super) head_block: Web3ProxyBlock,
-    pub(super) best_rpcs: Vec<Arc<Web3Rpc>>,
+    pub(crate) tier: u64,
+    pub(crate) head_block: Web3ProxyBlock,
+    pub(crate) best_rpcs: Vec<Arc<Web3Rpc>>,
     // TODO: functions like "compare_backup_vote()"
     // pub(super) backups_voted: Option<Web3ProxyBlock>,
-    pub(super) backups_needed: bool,
+    pub(crate) backups_needed: bool,
 }
 
 impl ConsensusWeb3Rpcs {
@@ -204,9 +204,7 @@ impl ConsensusFinder {
         authorization: &Arc<Authorization>,
         web3_rpcs: &Web3Rpcs,
     ) -> anyhow::Result<Option<ConsensusWeb3Rpcs>> {
-        let minmax_block = self
-            .rpc_heads
-            .values().minmax_by_key(|&x| x.number());
+        let minmax_block = self.rpc_heads.values().minmax_by_key(|&x| x.number());
 
         let (lowest_block, highest_block) = match minmax_block {
             MinMaxResult::NoElements => return Ok(None),
@@ -220,7 +218,8 @@ impl ConsensusFinder {
 
         trace!("lowest_block_number: {}", lowest_block.number());
 
-        let max_lag_block_number = highest_block_number.saturating_sub(self.max_block_lag.unwrap_or_else(|| U64::from(10)));
+        let max_lag_block_number = highest_block_number
+            .saturating_sub(self.max_block_lag.unwrap_or_else(|| U64::from(10)));
 
         trace!("max_lag_block_number: {}", max_lag_block_number);
 
@@ -245,7 +244,11 @@ impl ConsensusFinder {
         let mut rpc_heads_by_tier: Vec<_> = self.rpc_heads.iter().collect();
         rpc_heads_by_tier.sort_by_cached_key(|(rpc, _)| rpc.tier);
 
-        let current_tier = rpc_heads_by_tier.first().expect("rpc_heads_by_tier should never be empty").0.tier;
+        let current_tier = rpc_heads_by_tier
+            .first()
+            .expect("rpc_heads_by_tier should never be empty")
+            .0
+            .tier;
 
         // loop over all the rpc heads (grouped by tier) and their parents to find consensus
         // TODO: i'm sure theres a lot of shortcuts that could be taken, but this is simplest to implement
@@ -253,13 +256,13 @@ impl ConsensusFinder {
             if current_tier != rpc.tier {
                 // we finished processing a tier. check for primary results
                 if let Some(consensus) = self.count_votes(&primary_votes, web3_rpcs) {
-                    return Ok(Some(consensus))
+                    return Ok(Some(consensus));
                 }
 
                 // only set backup consensus once. we don't want it to keep checking on worse tiers if it already found consensus
                 if backup_consensus.is_none() {
                     if let Some(consensus) = self.count_votes(&backup_votes, web3_rpcs) {
-                        backup_consensus =Some(consensus)
+                        backup_consensus = Some(consensus)
                     }
                 }
             }
@@ -281,7 +284,10 @@ impl ConsensusFinder {
             backup_entry.0.insert(&rpc.name);
             backup_entry.1 += rpc.soft_limit;
 
-            match web3_rpcs.block(authorization, block_to_check.parent_hash(), Some(rpc)).await {
+            match web3_rpcs
+                .block(authorization, block_to_check.parent_hash(), Some(rpc))
+                .await
+            {
                 Ok(parent_block) => block_to_check = parent_block,
                 Err(err) => {
                     warn!("Problem fetching parent block of {:#?} during consensus finding: {:#?}", block_to_check, err);
@@ -293,7 +299,7 @@ impl ConsensusFinder {
 
         // we finished processing all tiers. check for primary results (if anything but the last tier found consensus, we already returned above)
        if let Some(consensus) = self.count_votes(&primary_votes, web3_rpcs) {
-            return Ok(Some(consensus))
+            return Ok(Some(consensus));
        }
 
        // only set backup consensus once. we don't want it to keep checking on worse tiers if it already found consensus
@@ -301,15 +307,28 @@ impl ConsensusFinder {
            return Ok(Some(consensus));
        }
 
-    // count votes one last time
+        // count votes one last time
        Ok(self.count_votes(&backup_votes, web3_rpcs))
    }
 
    // TODO: have min_sum_soft_limit and min_head_rpcs on self instead of on Web3Rpcs
-    fn count_votes(&self, votes: &HashMap<&Web3ProxyBlock, (HashSet<&str>, u32)>, web3_rpcs: &Web3Rpcs) -> Option<ConsensusWeb3Rpcs> {
+    fn count_votes(
+        &self,
+        votes: &HashMap<&Web3ProxyBlock, (HashSet<&str>, u32)>,
+        web3_rpcs: &Web3Rpcs,
+    ) -> Option<ConsensusWeb3Rpcs> {
        // sort the primary votes ascending by tier and descending by block num
-        let mut votes: Vec<_> = votes.iter().map(|(block, (rpc_names, sum_soft_limit))| (block, sum_soft_limit, rpc_names)).collect();
-        votes.sort_by_cached_key(|(block, sum_soft_limit, rpc_names)| (Reverse(*block.number()), Reverse(*sum_soft_limit), Reverse(rpc_names.len())));
+        let mut votes: Vec<_> = votes
+            .iter()
+            .map(|(block, (rpc_names, sum_soft_limit))| (block, sum_soft_limit, rpc_names))
+            .collect();
+        votes.sort_by_cached_key(|(block, sum_soft_limit, rpc_names)| {
+            (
+                Reverse(*block.number()),
+                Reverse(*sum_soft_limit),
+                Reverse(rpc_names.len()),
+            )
+        });
 
        // return the first result that exceededs confgured minimums (if any)
        for (maybe_head_block, sum_soft_limit, rpc_names) in votes {
@@ -324,14 +343,21 @@ impl ConsensusFinder {
 
            trace!("rpc_names: {:#?}", rpc_names);
            // consensus likely found! load the rpcs to make sure they all have active connections
-            let consensus_rpcs: Vec<_> = rpc_names.into_iter().filter_map(|x| web3_rpcs.get(x)).collect();
+            let consensus_rpcs: Vec<_> = rpc_names
+                .into_iter()
+                .filter_map(|x| web3_rpcs.get(x))
+                .collect();
 
            if consensus_rpcs.len() < web3_rpcs.min_head_rpcs {
                continue;
            }
 
            // consensus found!
-            let tier = consensus_rpcs.iter().map(|x| x.tier).max().expect("there should always be a max");
+            let tier = consensus_rpcs
+                .iter()
+                .map(|x| x.tier)
+                .max()
+                .expect("there should always be a max");
 
            let backups_needed = consensus_rpcs.iter().any(|x| x.backup);
 
diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs
index ee719932..3d944ac0 100644
--- a/web3_proxy/src/rpcs/many.rs
+++ b/web3_proxy/src/rpcs/many.rs
@@ -51,7 +51,7 @@ pub struct Web3Rpcs {
     /// TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed
     /// TODO: why is watch_consensus_head_sender in an Option, but this one isn't?
     /// Geth's subscriptions have the same potential for skipping blocks.
-    pub(super) watch_consensus_rpcs_sender: watch::Sender<Option<Arc<ConsensusWeb3Rpcs>>>,
+    pub(crate) watch_consensus_rpcs_sender: watch::Sender<Option<Arc<ConsensusWeb3Rpcs>>>,
     /// this head receiver makes it easy to wait until there is a new block
     pub(super) watch_consensus_head_sender: Option<watch::Sender<Option<Web3ProxyBlock>>>,
     pub(super) pending_transaction_cache:
@@ -1222,11 +1222,12 @@ impl Serialize for Web3Rpcs {
 /// TODO: i think we still have sorts scattered around the code that should use this
 /// TODO: take AsRef or something like that? We don't need an Arc here
 fn rpc_sync_status_sort_key(x: &Arc<Web3Rpc>) -> (Reverse<U64>, u64, bool, OrderedFloat<f64>) {
-    let head_block = x.head_block
-        .read()
-        .as_ref()
-        .map(|x| *x.number())
-        .unwrap_or_default();
+    let head_block = x
+        .head_block
+        .read()
+        .as_ref()
+        .map(|x| *x.number())
+        .unwrap_or_default();
 
     let tier = x.tier;
 
diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs
index 8b0b4394..6c4f73a8 100644
--- a/web3_proxy/src/rpcs/one.rs
+++ b/web3_proxy/src/rpcs/one.rs
@@ -9,9 +9,8 @@ use crate::rpcs::request::RequestRevertHandler;
 use anyhow::{anyhow, Context};
 use ethers::prelude::{Bytes, Middleware, ProviderError, TxHash, H256, U64};
 use ethers::types::{Address, Transaction, U256};
-use futures::StreamExt;
 use futures::future::try_join_all;
-use futures::stream::FuturesUnordered;
+use futures::StreamExt;
 use log::{debug, error, info, trace, warn, Level};
 use migration::sea_orm::DatabaseConnection;
 use ordered_float::OrderedFloat;
@@ -701,7 +700,7 @@ impl Web3Rpc {
         } else {
             RequestRevertHandler::ErrorLevel
         };
-        
+
         let mut delay_start = false;
 
         // this does loop. just only when reconnect is enabled
@@ -888,7 +887,7 @@ impl Web3Rpc {
 
                     continue;
                 }
-                
+
                 // reconnect is not enabled.
                 if *disconnect_receiver.borrow() {
                     info!("{} is disconnecting", self);
@@ -1150,7 +1149,9 @@ impl Web3Rpc {
         if self.should_disconnect() {
             Ok(())
         } else {
-            Err(anyhow!("pending_transactions subscription exited. reconnect needed"))
+            Err(anyhow!(
+                "pending_transactions subscription exited. reconnect needed"
+            ))
         }
     }
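
Usage note (not part of the commit above): a minimal sketch of an external probe that could feed an alerting system from the new endpoint. The handler returns 200 when consensus was reached without any backup rpcs and 500 otherwise (including when there is no consensus at all), so a monitor only has to watch the status code. The localhost URL, the port number, and the use of the `reqwest` crate (with its "blocking" feature) are assumptions for illustration, not part of web3_proxy.

    // hypothetical alert probe; adjust the URL for your deployment
    use std::process::exit;

    const URL: &str = "http://localhost:8544/status/backups_needed";

    fn main() {
        match reqwest::blocking::get(URL) {
            // 200 OK: consensus was found without using any backup rpcs
            Ok(resp) if resp.status().is_success() => {
                println!("ok: {}", resp.text().unwrap_or_default());
            }
            // any other status (the handler sends 500): raise an alert
            Ok(resp) => {
                eprintln!("ALERT: {}", resp.text().unwrap_or_default());
                exit(1);
            }
            // a failed request is also worth an alert
            Err(err) => {
                eprintln!("ALERT: request failed: {err}");
                exit(1);
            }
        }
    }

The pub(super) -> pub(crate) changes in consensus.rs and many.rs exist to support this: they are what let the frontend handler read watch_consensus_rpcs_sender and the backups_needed flag directly.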