From 3c5f973107656c6a0f0580311431503b358633c5 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Tue, 11 Apr 2023 12:04:16 -0700 Subject: [PATCH 01/12] more stats --- .../src/bin/web3_proxy_cli/rpc_accounting.rs | 65 ++++++++++++------- 1 file changed, 41 insertions(+), 24 deletions(-) diff --git a/web3_proxy/src/bin/web3_proxy_cli/rpc_accounting.rs b/web3_proxy/src/bin/web3_proxy_cli/rpc_accounting.rs index 653ecb02..6b73238b 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/rpc_accounting.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/rpc_accounting.rs @@ -42,12 +42,12 @@ impl RpcAccountingSubCommand { #[derive(Serialize, FromQueryResult)] struct SelectResult { total_frontend_requests: Decimal, - // pub total_backend_retries: Decimal, - // pub total_cache_misses: Decimal, + total_backend_retries: Decimal, + // total_cache_misses: Decimal, total_cache_hits: Decimal, total_response_bytes: Decimal, total_error_responses: Decimal, - // pub total_response_millis: Decimal, + total_response_millis: Decimal, first_period_datetime: DateTimeUtc, last_period_datetime: DateTimeUtc, } @@ -58,10 +58,10 @@ impl RpcAccountingSubCommand { rpc_accounting::Column::FrontendRequests.sum(), "total_frontend_requests", ) - // .column_as( - // rpc_accounting::Column::BackendRequests.sum(), - // "total_backend_retries", - // ) + .column_as( + rpc_accounting::Column::BackendRequests.sum(), + "total_backend_retries", + ) // .column_as( // rpc_accounting::Column::CacheMisses.sum(), // "total_cache_misses", @@ -76,10 +76,10 @@ impl RpcAccountingSubCommand { rpc_accounting::Column::ErrorResponse.sum(), "total_error_responses", ) - // .column_as( - // rpc_accounting::Column::SumResponseMillis.sum(), - // "total_response_millis", - // ) + .column_as( + rpc_accounting::Column::SumResponseMillis.sum(), + "total_response_millis", + ) .column_as( rpc_accounting::Column::PeriodDatetime.min(), "first_period_datetime", @@ -131,25 +131,42 @@ impl RpcAccountingSubCommand { q = q.filter(condition); - // TODO: make this work without into_json. i think we need to make a struct - let query_response = q + let stats = q .into_model::() .one(db_conn) .await? 
.context("no query result")?; - info!( - "query_response for chain {:?}: {:#}", - self.chain_id, - json!(query_response) - ); + if let Some(chain_id) = self.chain_id { + info!("stats for chain {}", chain_id); + } else { + info!("stats for all chains"); + } - // let query_seconds: Decimal = query_response - // .last_period_datetime - // .signed_duration_since(query_response.first_period_datetime) - // .num_seconds() - // .into(); - // info!("query seconds: {}", query_seconds); + info!("stats: {:#}", json!(&stats)); + + let query_seconds: Decimal = stats + .last_period_datetime + .signed_duration_since(stats.first_period_datetime) + .num_seconds() + .into(); + dbg!(query_seconds); + + let avg_request_per_second = (stats.total_frontend_requests / query_seconds).round_dp(2); + dbg!(avg_request_per_second); + + let cache_hit_rate = (stats.total_cache_hits / stats.total_frontend_requests + * Decimal::from(100)) + .round_dp(2); + dbg!(cache_hit_rate); + + let avg_response_millis = + (stats.total_response_millis / stats.total_frontend_requests).round_dp(3); + dbg!(avg_response_millis); + + let avg_response_bytes = + (stats.total_response_bytes / stats.total_frontend_requests).round(); + dbg!(avg_response_bytes); Ok(()) } From d035049c8f19b68324be196ccff207d01470fd0f Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Thu, 13 Apr 2023 17:15:01 -0700 Subject: [PATCH 02/12] add /backups_needed endpoint for easy alerts --- web3_proxy/src/frontend/mod.rs | 1 + web3_proxy/src/frontend/status.rs | 26 +++++++++++- web3_proxy/src/rpcs/consensus.rs | 66 +++++++++++++++++++++---------- web3_proxy/src/rpcs/many.rs | 13 +++--- web3_proxy/src/rpcs/one.rs | 11 +++--- 5 files changed, 84 insertions(+), 33 deletions(-) diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs index 0d0e5146..ecfbd5b2 100644 --- a/web3_proxy/src/frontend/mod.rs +++ b/web3_proxy/src/frontend/mod.rs @@ -158,6 +158,7 @@ pub async fn serve(port: u16, proxy_app: Arc) -> anyhow::Result<() // .route("/health", get(status::health)) .route("/status", get(status::status)) + .route("/status/backups_needed", get(status::backups_needed)) // // User stuff // diff --git a/web3_proxy/src/frontend/status.rs b/web3_proxy/src/frontend/status.rs index f678e34f..26945cfa 100644 --- a/web3_proxy/src/frontend/status.rs +++ b/web3_proxy/src/frontend/status.rs @@ -7,8 +7,6 @@ use super::{FrontendHealthCache, FrontendResponseCache, FrontendResponseCaches}; use crate::app::{Web3ProxyApp, APP_USER_AGENT}; use axum::{http::StatusCode, response::IntoResponse, Extension, Json}; use axum_macros::debug_handler; -use hashbrown::HashMap; -use http::HeaderMap; use serde_json::json; use std::sync::Arc; @@ -29,6 +27,30 @@ pub async fn health( } } +/// Easy alerting if backup servers are in use. +pub async fn backups_needed(Extension(app): Extension>) -> impl IntoResponse { + let code = { + let consensus_rpcs = app.balanced_rpcs.watch_consensus_rpcs_sender.borrow(); + + if let Some(consensus_rpcs) = consensus_rpcs.as_ref() { + if consensus_rpcs.backups_needed { + StatusCode::INTERNAL_SERVER_ERROR + } else { + StatusCode::OK + } + } else { + // if no consensus, we still "need backups". we just don't have any. which is worse + StatusCode::INTERNAL_SERVER_ERROR + } + }; + + if matches!(code, StatusCode::OK) { + (code, "no backups needed. :)") + } else { + (code, "backups needed! :(") + } +} + /// Very basic status page. 
/// /// TODO: replace this with proper stats and monitoring diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index a6f94523..ed0aa23c 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -7,7 +7,7 @@ use anyhow::Context; use ethers::prelude::{H256, U64}; use hashbrown::{HashMap, HashSet}; use itertools::{Itertools, MinMaxResult}; -use log::{trace, warn, debug}; +use log::{trace, warn}; use moka::future::Cache; use serde::Serialize; use std::cmp::Reverse; @@ -19,12 +19,12 @@ use tokio::time::Instant; /// Serialize is so we can print it on our debug endpoint #[derive(Clone, Serialize)] pub struct ConsensusWeb3Rpcs { - pub(super) tier: u64, - pub(super) head_block: Web3ProxyBlock, - pub(super) best_rpcs: Vec>, + pub(crate) tier: u64, + pub(crate) head_block: Web3ProxyBlock, + pub(crate) best_rpcs: Vec>, // TODO: functions like "compare_backup_vote()" // pub(super) backups_voted: Option, - pub(super) backups_needed: bool, + pub(crate) backups_needed: bool, } impl ConsensusWeb3Rpcs { @@ -204,9 +204,7 @@ impl ConsensusFinder { authorization: &Arc, web3_rpcs: &Web3Rpcs, ) -> anyhow::Result> { - let minmax_block = self - .rpc_heads - .values().minmax_by_key(|&x| x.number()); + let minmax_block = self.rpc_heads.values().minmax_by_key(|&x| x.number()); let (lowest_block, highest_block) = match minmax_block { MinMaxResult::NoElements => return Ok(None), @@ -220,7 +218,8 @@ impl ConsensusFinder { trace!("lowest_block_number: {}", lowest_block.number()); - let max_lag_block_number = highest_block_number.saturating_sub(self.max_block_lag.unwrap_or_else(|| U64::from(10))); + let max_lag_block_number = highest_block_number + .saturating_sub(self.max_block_lag.unwrap_or_else(|| U64::from(10))); trace!("max_lag_block_number: {}", max_lag_block_number); @@ -245,7 +244,11 @@ impl ConsensusFinder { let mut rpc_heads_by_tier: Vec<_> = self.rpc_heads.iter().collect(); rpc_heads_by_tier.sort_by_cached_key(|(rpc, _)| rpc.tier); - let current_tier = rpc_heads_by_tier.first().expect("rpc_heads_by_tier should never be empty").0.tier; + let current_tier = rpc_heads_by_tier + .first() + .expect("rpc_heads_by_tier should never be empty") + .0 + .tier; // loop over all the rpc heads (grouped by tier) and their parents to find consensus // TODO: i'm sure theres a lot of shortcuts that could be taken, but this is simplest to implement @@ -253,13 +256,13 @@ impl ConsensusFinder { if current_tier != rpc.tier { // we finished processing a tier. check for primary results if let Some(consensus) = self.count_votes(&primary_votes, web3_rpcs) { - return Ok(Some(consensus)) + return Ok(Some(consensus)); } // only set backup consensus once. we don't want it to keep checking on worse tiers if it already found consensus if backup_consensus.is_none() { if let Some(consensus) = self.count_votes(&backup_votes, web3_rpcs) { - backup_consensus =Some(consensus) + backup_consensus = Some(consensus) } } } @@ -281,7 +284,10 @@ impl ConsensusFinder { backup_entry.0.insert(&rpc.name); backup_entry.1 += rpc.soft_limit; - match web3_rpcs.block(authorization, block_to_check.parent_hash(), Some(rpc)).await { + match web3_rpcs + .block(authorization, block_to_check.parent_hash(), Some(rpc)) + .await + { Ok(parent_block) => block_to_check = parent_block, Err(err) => { warn!("Problem fetching parent block of {:#?} during consensus finding: {:#?}", block_to_check, err); @@ -293,7 +299,7 @@ impl ConsensusFinder { // we finished processing all tiers. 
check for primary results (if anything but the last tier found consensus, we already returned above) if let Some(consensus) = self.count_votes(&primary_votes, web3_rpcs) { - return Ok(Some(consensus)) + return Ok(Some(consensus)); } // only set backup consensus once. we don't want it to keep checking on worse tiers if it already found consensus @@ -301,15 +307,28 @@ impl ConsensusFinder { return Ok(Some(consensus)); } - // count votes one last time + // count votes one last time Ok(self.count_votes(&backup_votes, web3_rpcs)) } // TODO: have min_sum_soft_limit and min_head_rpcs on self instead of on Web3Rpcs - fn count_votes(&self, votes: &HashMap, u32)>, web3_rpcs: &Web3Rpcs) -> Option { + fn count_votes( + &self, + votes: &HashMap, u32)>, + web3_rpcs: &Web3Rpcs, + ) -> Option { // sort the primary votes ascending by tier and descending by block num - let mut votes: Vec<_> = votes.iter().map(|(block, (rpc_names, sum_soft_limit))| (block, sum_soft_limit, rpc_names)).collect(); - votes.sort_by_cached_key(|(block, sum_soft_limit, rpc_names)| (Reverse(*block.number()), Reverse(*sum_soft_limit), Reverse(rpc_names.len()))); + let mut votes: Vec<_> = votes + .iter() + .map(|(block, (rpc_names, sum_soft_limit))| (block, sum_soft_limit, rpc_names)) + .collect(); + votes.sort_by_cached_key(|(block, sum_soft_limit, rpc_names)| { + ( + Reverse(*block.number()), + Reverse(*sum_soft_limit), + Reverse(rpc_names.len()), + ) + }); // return the first result that exceededs confgured minimums (if any) for (maybe_head_block, sum_soft_limit, rpc_names) in votes { @@ -324,14 +343,21 @@ impl ConsensusFinder { trace!("rpc_names: {:#?}", rpc_names); // consensus likely found! load the rpcs to make sure they all have active connections - let consensus_rpcs: Vec<_> = rpc_names.into_iter().filter_map(|x| web3_rpcs.get(x)).collect(); + let consensus_rpcs: Vec<_> = rpc_names + .into_iter() + .filter_map(|x| web3_rpcs.get(x)) + .collect(); if consensus_rpcs.len() < web3_rpcs.min_head_rpcs { continue; } // consensus found! - let tier = consensus_rpcs.iter().map(|x| x.tier).max().expect("there should always be a max"); + let tier = consensus_rpcs + .iter() + .map(|x| x.tier) + .max() + .expect("there should always be a max"); let backups_needed = consensus_rpcs.iter().any(|x| x.backup); diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index ee719932..3d944ac0 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -51,7 +51,7 @@ pub struct Web3Rpcs { /// TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed /// TODO: why is watch_consensus_head_sender in an Option, but this one isn't? /// Geth's subscriptions have the same potential for skipping blocks. - pub(super) watch_consensus_rpcs_sender: watch::Sender>>, + pub(crate) watch_consensus_rpcs_sender: watch::Sender>>, /// this head receiver makes it easy to wait until there is a new block pub(super) watch_consensus_head_sender: Option>>, pub(super) pending_transaction_cache: @@ -1222,11 +1222,12 @@ impl Serialize for Web3Rpcs { /// TODO: i think we still have sorts scattered around the code that should use this /// TODO: take AsRef or something like that? 
We don't need an Arc here fn rpc_sync_status_sort_key(x: &Arc) -> (Reverse, u64, bool, OrderedFloat) { - let head_block = x.head_block - .read() - .as_ref() - .map(|x| *x.number()) - .unwrap_or_default(); + let head_block = x + .head_block + .read() + .as_ref() + .map(|x| *x.number()) + .unwrap_or_default(); let tier = x.tier; diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 8b0b4394..6c4f73a8 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -9,9 +9,8 @@ use crate::rpcs::request::RequestRevertHandler; use anyhow::{anyhow, Context}; use ethers::prelude::{Bytes, Middleware, ProviderError, TxHash, H256, U64}; use ethers::types::{Address, Transaction, U256}; -use futures::StreamExt; use futures::future::try_join_all; -use futures::stream::FuturesUnordered; +use futures::StreamExt; use log::{debug, error, info, trace, warn, Level}; use migration::sea_orm::DatabaseConnection; use ordered_float::OrderedFloat; @@ -701,7 +700,7 @@ impl Web3Rpc { } else { RequestRevertHandler::ErrorLevel }; - + let mut delay_start = false; // this does loop. just only when reconnect is enabled @@ -888,7 +887,7 @@ impl Web3Rpc { continue; } - + // reconnect is not enabled. if *disconnect_receiver.borrow() { info!("{} is disconnecting", self); @@ -1150,7 +1149,9 @@ impl Web3Rpc { if self.should_disconnect() { Ok(()) } else { - Err(anyhow!("pending_transactions subscription exited. reconnect needed")) + Err(anyhow!( + "pending_transactions subscription exited. reconnect needed" + )) } } From f3435bc6e006ff86e118c5fbae5f7bd24bbd722d Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Fri, 14 Apr 2023 00:04:35 -0700 Subject: [PATCH 03/12] add bundler_4337_rpcs --- web3_proxy/src/app/mod.rs | 109 +++++++++++++++++++- web3_proxy/src/bin/web3_proxy_cli/proxyd.rs | 1 + web3_proxy/src/config.rs | 2 +- 3 files changed, 107 insertions(+), 5 deletions(-) diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 5ccdd5c6..c697f572 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -202,6 +202,8 @@ impl DatabaseReplica { pub struct Web3ProxyApp { /// Send requests to the best server available pub balanced_rpcs: Arc, + /// Send 4337 Abstraction Bundler requests to one of these servers + pub bundler_4337_rpcs: Option>, pub http_client: Option, /// Send private requests (like eth_sendRawTransaction) to all these servers pub private_rpcs: Option>, @@ -695,11 +697,46 @@ impl Web3ProxyApp { Some(private_rpcs) }; - let hostname = hostname::get().ok().and_then(|x| x.to_str().map(|x| x.to_string())); + // prepare a Web3Rpcs to hold all our 4337 Abstraction Bundler connections + // only some chains have this, so this is optional + let bundler_4337_rpcs = if top_config.bundler_4337_rpcs.is_none() { + warn!("No bundler_4337_rpcs configured"); + None + } else { + // TODO: do something with the spawn handle + let (bundler_4337_rpcs, bundler_4337_rpcs_handle) = Web3Rpcs::spawn( + top_config.app.chain_id, + db_conn.clone(), + http_client.clone(), + // bundler_4337_rpcs don't get subscriptions, so no need for max_block_age or max_block_lag + None, + None, + 0, + 0, + pending_transactions.clone(), + None, + // subscribing to new heads here won't work well. if they are fast, they might be ahead of balanced_rpcs + // they also often have low rate limits + // however, they are well connected to miners/validators. 
so maybe using them as a safety check would be good + // TODO: but maybe we could include privates in the "backup" tier + None, + ) + .await + .context("spawning bundler_4337_rpcs")?; + + app_handles.push(bundler_4337_rpcs_handle); + + Some(bundler_4337_rpcs) + }; + + let hostname = hostname::get() + .ok() + .and_then(|x| x.to_str().map(|x| x.to_string())); let app = Self { config: top_config.app.clone(), balanced_rpcs, + bundler_4337_rpcs, http_client, kafka_producer, private_rpcs, @@ -778,6 +815,17 @@ impl Web3ProxyApp { } } + if let Some(bundler_4337_rpc_configs) = new_top_config.bundler_4337_rpcs { + if let Some(bundler_4337_rpcs) = self.bundler_4337_rpcs.as_ref() { + bundler_4337_rpcs + .apply_server_configs(self, bundler_4337_rpc_configs) + .await?; + } else { + // TODO: maybe we should have bundler_4337_rpcs just be empty instead of being None + todo!("handle toggling bundler_4337_rpcs") + } + } + Ok(()) } @@ -1074,6 +1122,7 @@ impl Web3ProxyApp { } // #[measure([ErrorCount, HitCount, ResponseTime, Throughput])] + // TODO: more robust stats and kafka logic! if we use the try operator, they aren't saved! async fn proxy_cached_request( self: &Arc, authorization: &Arc, @@ -1250,6 +1299,59 @@ impl Web3ProxyApp { vec![], )); } + method @ ("debug_bundler_sendBundleNow" + | "debug_bundler_clearState" + | "debug_bundler_dumpMempool") => { + return Ok(( + JsonRpcForwardedResponse::from_string( + // TODO: we should probably have some escaping on this. but maybe serde will protect us enough + format!("method unsupported: {}", method), + None, + Some(request_id), + ), + vec![], + )); + } + _method @ ("eth_sendUserOperation" + | "eth_estimateUserOperationGas" + | "eth_getUserOperationByHash" + | "eth_getUserOperationReceipt" + | "eth_supportedEntryPoints") => match self.bundler_4337_rpcs.as_ref() { + Some(bundler_4337_rpcs) => { + let response = bundler_4337_rpcs + .try_proxy_connection( + authorization, + request, + Some(&request_metadata), + None, + None, + ) + .await?; + + // TODO: DRY + let rpcs = request_metadata.backend_requests.lock().clone(); + + if let Some(stat_sender) = self.stat_sender.as_ref() { + let response_stat = ProxyResponseStat::new( + request_method, + authorization.clone(), + request_metadata, + response.num_bytes(), + ); + + stat_sender + .send_async(response_stat.into()) + .await + .context("stat_sender sending bundler_4337 response stat")?; + } + + return Ok((response, rpcs)); + } + None => { + // TODO: stats! + return Err(anyhow::anyhow!("no bundler_4337_rpcs available").into()); + } + }, // some commands can use local data or caches "eth_accounts" => { // no stats on this. its cheap @@ -1297,6 +1399,8 @@ impl Web3ProxyApp { // i think this is always an error response let rpcs = request_metadata.backend_requests.lock().clone(); + // TODO! save stats + return Ok((response, rpcs)); }; @@ -1492,7 +1596,6 @@ impl Web3ProxyApp { serde_json::Value::String(APP_USER_AGENT.to_string()) } "web3_sha3" => { - // emit stats // returns Keccak-256 (not the standardized SHA3-256) of the given data. match &request.params { Some(serde_json::Value::Array(params)) => { @@ -1558,8 +1661,6 @@ impl Web3ProxyApp { return Err(FrontendErrorResponse::AccessDenied); } - // emit stats - // TODO: if no servers synced, wait for them to be synced? 
probably better to error and let haproxy retry another server let head_block_num = head_block_num .or(self.balanced_rpcs.head_block_num()) diff --git a/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs b/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs index 3c16dc59..b30ef1ef 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs @@ -316,6 +316,7 @@ mod tests { ), ]), private_rpcs: None, + bundler_4337_rpcs: None, extra: Default::default(), }; diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index 5501091c..40ec8828 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -42,8 +42,8 @@ pub struct CliConfig { pub struct TopConfig { pub app: AppConfig, pub balanced_rpcs: HashMap, - // TODO: instead of an option, give it a default pub private_rpcs: Option>, + pub bundler_4337_rpcs: Option>, /// unknown config options get put here #[serde(flatten, default = "HashMap::default")] pub extra: HashMap, From 3621d71037024d3a5bded75fe6c01068a3218c2f Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Fri, 14 Apr 2023 00:15:27 -0700 Subject: [PATCH 04/12] if not watching heads, send to any server --- web3_proxy/src/rpcs/many.rs | 274 +++++++++++++++++++----------------- 1 file changed, 147 insertions(+), 127 deletions(-) diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 3d944ac0..b964a9d9 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -492,142 +492,162 @@ impl Web3Rpcs { max_block_needed: Option<&U64>, ) -> anyhow::Result { let usable_rpcs_by_tier_and_head_number: BTreeMap<(u64, Option), Vec>> = { - let synced_connections = self.watch_consensus_rpcs_sender.borrow().clone(); + if self.watch_consensus_head_sender.is_none() { + // pick any server + let mut m = BTreeMap::new(); - if synced_connections.is_none() { - return Ok(OpenRequestResult::NotReady); - } - let synced_connections = - synced_connections.expect("synced_connections can't be None here"); + let key = (0, None); - let head_block_num = synced_connections.head_block.number(); - let head_block_age = synced_connections.head_block.age(); - - // TODO: double check the logic on this. especially if only min is set - let needed_blocks_comparison = match (min_block_needed, max_block_needed) { - (None, None) => { - // no required block given. treat this like they requested the consensus head block - cmp::Ordering::Equal - } - (None, Some(max_block_needed)) => max_block_needed.cmp(head_block_num), - (Some(min_block_needed), None) => min_block_needed.cmp(head_block_num), - (Some(min_block_needed), Some(max_block_needed)) => { - match min_block_needed.cmp(max_block_needed) { - cmp::Ordering::Less | cmp::Ordering::Equal => { - min_block_needed.cmp(head_block_num) - } - cmp::Ordering::Greater => { - // TODO: force a debug log of the original request to see if our logic is wrong? - // TODO: attach the rpc_key_id so we can find the user to ask if they need help - return Err(anyhow::anyhow!( - "Invalid blocks bounds requested. 
min ({}) > max ({})", - min_block_needed, - max_block_needed - )); - } + for x in self.by_name.read().values() { + if skip.contains(x) { + trace!("skipping: {}", x); + continue; } + trace!("not skipped!"); + + m.entry(key).or_insert_with(Vec::new).push(x.clone()); } - }; - trace!("needed_blocks_comparison: {:?}", needed_blocks_comparison); + m + } else { + let synced_connections = self.watch_consensus_rpcs_sender.borrow().clone(); - // collect "usable_rpcs_by_head_num_and_weight" - // TODO: MAKE SURE None SORTS LAST? - let mut m = BTreeMap::new(); - - match needed_blocks_comparison { - cmp::Ordering::Less => { - // need an old block. check all the rpcs. ignore rpcs that are still syncing - trace!("old block needed"); - - let min_block_age = - self.max_block_age.map(|x| head_block_age.saturating_sub(x)); - let min_sync_num = self.max_block_lag.map(|x| head_block_num.saturating_sub(x)); - - // TODO: cache this somehow? - // TODO: maybe have a helper on synced_connections? that way sum_soft_limits/min_synced_rpcs will be DRY - for x in self - .by_name - .read() - .values() - .filter(|x| { - // TODO: move a bunch of this onto a rpc.is_synced function - #[allow(clippy::if_same_then_else)] - if skip.contains(x) { - // we've already tried this server or have some other reason to skip it - false - } else if max_block_needed - .map(|max_block_needed| !x.has_block_data(max_block_needed)) - .unwrap_or(false) - { - // server does not have the max block - trace!( - "{} does not have the max block ({:?})", - x, - max_block_needed - ); - false - } else { - !min_block_needed - .map(|min_block_needed| !x.has_block_data(min_block_needed)) - .unwrap_or(false) - } - }) - .cloned() - { - let x_head_block = x.head_block.read().clone(); - - if let Some(x_head) = x_head_block { - // TODO: should nodes that are ahead of the consensus block have priority? seems better to spread the load - let x_head_num = x_head.number().min(head_block_num); - - // TODO: do we really need to check head_num and age? - if let Some(min_sync_num) = min_sync_num.as_ref() { - if x_head_num < min_sync_num { - trace!("rpc is still syncing"); - continue; - } - } - if let Some(min_block_age) = min_block_age { - if x_head.age() > min_block_age { - // rpc is still syncing - trace!("server's block is too old"); - continue; - } - } - - let key = (x.tier, Some(*x_head_num)); - - m.entry(key).or_insert_with(Vec::new).push(x); - } - } - - // TODO: check min_synced_rpcs and min_sum_soft_limits? or maybe better to just try to serve the request? - } - cmp::Ordering::Equal => { - // using the consensus head block. filter the synced rpcs - - // the key doesn't matter if we are checking synced connections - // they are all at the same block and it is already sized to what we need - let key = (0, None); - - for x in synced_connections.best_rpcs.iter() { - if skip.contains(x) { - trace!("skipping: {}", x); - continue; - } - trace!("not skipped!"); - - m.entry(key).or_insert_with(Vec::new).push(x.clone()); - } - } - cmp::Ordering::Greater => { - // TODO? if the blocks is close, maybe we could wait for change on a watch_consensus_connections_receiver().subscribe() + if synced_connections.is_none() { return Ok(OpenRequestResult::NotReady); } - } + let synced_connections = + synced_connections.expect("synced_connections can't be None here"); - m + let head_block_num = synced_connections.head_block.number(); + let head_block_age = synced_connections.head_block.age(); + + // TODO: double check the logic on this. 
especially if only min is set + let needed_blocks_comparison = match (min_block_needed, max_block_needed) { + (None, None) => { + // no required block given. treat this like they requested the consensus head block + cmp::Ordering::Equal + } + (None, Some(max_block_needed)) => max_block_needed.cmp(head_block_num), + (Some(min_block_needed), None) => min_block_needed.cmp(head_block_num), + (Some(min_block_needed), Some(max_block_needed)) => { + match min_block_needed.cmp(max_block_needed) { + cmp::Ordering::Less | cmp::Ordering::Equal => { + min_block_needed.cmp(head_block_num) + } + cmp::Ordering::Greater => { + // TODO: force a debug log of the original request to see if our logic is wrong? + // TODO: attach the rpc_key_id so we can find the user to ask if they need help + return Err(anyhow::anyhow!( + "Invalid blocks bounds requested. min ({}) > max ({})", + min_block_needed, + max_block_needed + )); + } + } + } + }; + + trace!("needed_blocks_comparison: {:?}", needed_blocks_comparison); + + // collect "usable_rpcs_by_head_num_and_weight" + // TODO: MAKE SURE None SORTS LAST? + let mut m = BTreeMap::new(); + + match needed_blocks_comparison { + cmp::Ordering::Less => { + // need an old block. check all the rpcs. ignore rpcs that are still syncing + trace!("old block needed"); + + let min_block_age = + self.max_block_age.map(|x| head_block_age.saturating_sub(x)); + let min_sync_num = + self.max_block_lag.map(|x| head_block_num.saturating_sub(x)); + + // TODO: cache this somehow? + // TODO: maybe have a helper on synced_connections? that way sum_soft_limits/min_synced_rpcs will be DRY + for x in self + .by_name + .read() + .values() + .filter(|x| { + // TODO: move a bunch of this onto a rpc.is_synced function + #[allow(clippy::if_same_then_else)] + if skip.contains(x) { + // we've already tried this server or have some other reason to skip it + false + } else if max_block_needed + .map(|max_block_needed| !x.has_block_data(max_block_needed)) + .unwrap_or(false) + { + // server does not have the max block + trace!( + "{} does not have the max block ({:?})", + x, + max_block_needed + ); + false + } else { + !min_block_needed + .map(|min_block_needed| !x.has_block_data(min_block_needed)) + .unwrap_or(false) + } + }) + .cloned() + { + let x_head_block = x.head_block.read().clone(); + + if let Some(x_head) = x_head_block { + // TODO: should nodes that are ahead of the consensus block have priority? seems better to spread the load + let x_head_num = x_head.number().min(head_block_num); + + // TODO: do we really need to check head_num and age? + if let Some(min_sync_num) = min_sync_num.as_ref() { + if x_head_num < min_sync_num { + trace!("rpc is still syncing"); + continue; + } + } + if let Some(min_block_age) = min_block_age { + if x_head.age() > min_block_age { + // rpc is still syncing + trace!("server's block is too old"); + continue; + } + } + + let key = (x.tier, Some(*x_head_num)); + + m.entry(key).or_insert_with(Vec::new).push(x); + } + } + + // TODO: check min_synced_rpcs and min_sum_soft_limits? or maybe better to just try to serve the request? + } + cmp::Ordering::Equal => { + // using the consensus head block. 
filter the synced rpcs + + // the key doesn't matter if we are checking synced connections + // they are all at the same block and it is already sized to what we need + let key = (0, None); + + for x in synced_connections.best_rpcs.iter() { + if skip.contains(x) { + trace!("skipping: {}", x); + continue; + } + trace!("not skipped!"); + + m.entry(key).or_insert_with(Vec::new).push(x.clone()); + } + } + cmp::Ordering::Greater => { + // TODO? if the blocks is close, maybe we could wait for change on a watch_consensus_connections_receiver().subscribe() + return Ok(OpenRequestResult::NotReady); + } + } + + m + } }; trace!( From 8ed71e1cf1b5374809d893f92be644aaef118b79 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Fri, 14 Apr 2023 00:36:46 -0700 Subject: [PATCH 05/12] more goerli fixes --- web3_proxy/src/rpcs/many.rs | 7 ++++++- web3_proxy/src/rpcs/one.rs | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index b964a9d9..229c60ed 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -95,6 +95,8 @@ impl Web3Rpcs { let expected_block_time_ms = match chain_id { // ethereum 1 => 12_000, + // ethereum-goerli + 5 => 12_000, // polygon 137 => 2_000, // fantom @@ -103,7 +105,10 @@ impl Web3Rpcs { 42161 => 500, // anything else _ => { - warn!("unexpected chain_id. polling every {} seconds", 10); + warn!( + "unexpected chain_id ({}). polling every {} seconds", + chain_id, 10 + ); 10_000 } }; diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 6c4f73a8..ae196bd4 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -706,7 +706,7 @@ impl Web3Rpc { // this does loop. just only when reconnect is enabled #[allow(clippy::never_loop)] loop { - debug!("subscription loop started"); + trace!("subscription loop started on {}", self); let mut futures = vec![]; From df19619b77851be678686355570f9648488769b4 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Fri, 14 Apr 2023 00:38:49 -0700 Subject: [PATCH 06/12] add context to apply_server_configs --- web3_proxy/src/app/mod.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index c697f572..57e64b2d 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -802,13 +802,15 @@ impl Web3ProxyApp { // connect to the backends self.balanced_rpcs .apply_server_configs(self, new_top_config.balanced_rpcs) - .await?; + .await + .context("updating balanced rpcs")?; if let Some(private_rpc_configs) = new_top_config.private_rpcs { if let Some(private_rpcs) = self.private_rpcs.as_ref() { private_rpcs .apply_server_configs(self, private_rpc_configs) - .await?; + .await + .context("updating private_rpcs")?; } else { // TODO: maybe we should have private_rpcs just be empty instead of being None todo!("handle toggling private_rpcs") @@ -819,7 +821,8 @@ impl Web3ProxyApp { if let Some(bundler_4337_rpcs) = self.bundler_4337_rpcs.as_ref() { bundler_4337_rpcs .apply_server_configs(self, bundler_4337_rpc_configs) - .await?; + .await + .context("updating bundler_4337_rpcs")?; } else { // TODO: maybe we should have bundler_4337_rpcs just be empty instead of being None todo!("handle toggling bundler_4337_rpcs") From c2710858e142d2ab1d1d34c7ed80092dbbdac2f9 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Fri, 14 Apr 2023 00:41:51 -0700 Subject: [PATCH 07/12] only warn if rpc_configs is too short --- web3_proxy/src/rpcs/many.rs | 7 +++++-- 1 file changed, 5 
insertions(+), 2 deletions(-) diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 229c60ed..539e21dc 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -211,11 +211,14 @@ impl Web3Rpcs { ) -> anyhow::Result<()> { // safety checks if rpc_configs.len() < app.config.min_synced_rpcs { - return Err(anyhow::anyhow!( + // TODO: don't count disabled servers! + // TODO: include if this is balanced, private, or 4337 + warn!( "Only {}/{} rpcs! Add more rpcs or reduce min_synced_rpcs.", rpc_configs.len(), app.config.min_synced_rpcs - )); + ); + return Ok(()); } // safety check on sum soft limit From fffd645acf7c3b4ec7c03f91568d9ad0d3cfb145 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Fri, 14 Apr 2023 02:10:49 -0700 Subject: [PATCH 08/12] remove copypasta comment --- web3_proxy/src/app/mod.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 57e64b2d..6651bffa 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -715,10 +715,6 @@ impl Web3ProxyApp { 0, pending_transactions.clone(), None, - // subscribing to new heads here won't work well. if they are fast, they might be ahead of balanced_rpcs - // they also often have low rate limits - // however, they are well connected to miners/validators. so maybe using them as a safety check would be good - // TODO: but maybe we could include privates in the "backup" tier None, ) .await From 2652f88f52bf8f5e01cdca2f10f82bae056f27f8 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Fri, 14 Apr 2023 03:43:49 -0700 Subject: [PATCH 09/12] attach more contexts --- web3_proxy/src/rpcs/one.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index ae196bd4..51ddfe0d 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -537,7 +537,8 @@ impl Web3Rpc { // trace!("waiting on chain id for {}", self); let found_chain_id: Result = self .wait_for_request_handle(&authorization, None, unlocked_provider.clone()) - .await? + .await + .context(format!("waiting for request handle on {}", self))? 
.request( "eth_chainId", &json!(Option::None::<()>), @@ -560,18 +561,20 @@ impl Web3Rpc { } } Err(e) => { - return Err(anyhow::Error::from(e)); + return Err(anyhow::Error::from(e) + .context(format!("unable to parse eth_chainId from {}", self))); } } self.check_block_data_limit(&authorization, unlocked_provider.clone()) - .await?; + .await + .context(format!("unable to parse eth_chainId from {}", self))?; drop(unlocked_provider); info!("successfully connected to {}", self); } else if self.provider.read().await.is_none() { - return Err(anyhow!("failed waiting for client")); + return Err(anyhow!("failed waiting for client {}", self)); }; Ok(()) From 5ff28943da858112f64a121d1e219a46e73bb356 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Fri, 14 Apr 2023 03:44:32 -0700 Subject: [PATCH 10/12] fix incorrect context --- web3_proxy/src/rpcs/one.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 51ddfe0d..87ff1210 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -568,7 +568,7 @@ impl Web3Rpc { self.check_block_data_limit(&authorization, unlocked_provider.clone()) .await - .context(format!("unable to parse eth_chainId from {}", self))?; + .context(format!("unable to check_block_data_limit of {}", self))?; drop(unlocked_provider); From e921d02eb294e7a4f53655e18e0b70316e446791 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Mon, 17 Apr 2023 16:48:01 -0700 Subject: [PATCH 11/12] remove stale todo --- web3_proxy/src/rpcs/one.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 87ff1210..3717c0ea 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -185,7 +185,6 @@ impl Web3Rpc { }; let tx_id_sender = if config.subscribe_txs { - // TODO: warn if tx_id_sender is None? 
tx_id_sender } else { None From bafb6cdd8f3053489a60155fb9510775f5420364 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Tue, 18 Apr 2023 10:36:53 -0700 Subject: [PATCH 12/12] copy migrations from devel (#51) --- Cargo.lock | 2 +- migration/Cargo.toml | 2 +- migration/src/m20230125_204810_stats_v2.rs | 159 ++++++++++++++++++ ...ate_rpc_accounting_to_rpc_accounting_v2.rs | 37 ++++ 4 files changed, 198 insertions(+), 2 deletions(-) create mode 100644 migration/src/m20230125_204810_stats_v2.rs create mode 100644 migration/src/m20230307_002623_migrate_rpc_accounting_to_rpc_accounting_v2.rs diff --git a/Cargo.lock b/Cargo.lock index f5592e65..5969f100 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2938,7 +2938,7 @@ dependencies = [ [[package]] name = "migration" -version = "0.17.0" +version = "0.19.0" dependencies = [ "sea-orm-migration", "tokio", diff --git a/migration/Cargo.toml b/migration/Cargo.toml index cea86b6b..06cc2134 100644 --- a/migration/Cargo.toml +++ b/migration/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "migration" -version = "0.17.0" +version = "0.19.0" edition = "2021" publish = false diff --git a/migration/src/m20230125_204810_stats_v2.rs b/migration/src/m20230125_204810_stats_v2.rs new file mode 100644 index 00000000..7082fec0 --- /dev/null +++ b/migration/src/m20230125_204810_stats_v2.rs @@ -0,0 +1,159 @@ +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .create_table( + Table::create() + .table(RpcAccountingV2::Table) + .col( + ColumnDef::new(RpcAccountingV2::Id) + .big_unsigned() + .not_null() + .auto_increment() + .primary_key(), + ) + .col( + ColumnDef::new(RpcAccountingV2::RpcKeyId) + .big_unsigned() + .not_null() + .default(0), + ) + .col( + ColumnDef::new(RpcAccountingV2::ChainId) + .big_unsigned() + .not_null(), + ) + .col( + ColumnDef::new(RpcAccountingV2::Origin) + .string() + .not_null() + .default(""), + ) + .col( + ColumnDef::new(RpcAccountingV2::PeriodDatetime) + .timestamp() + .not_null(), + ) + .col( + ColumnDef::new(RpcAccountingV2::Method) + .string() + .not_null() + .default(""), + ) + .col( + ColumnDef::new(RpcAccountingV2::ArchiveNeeded) + .boolean() + .not_null(), + ) + .col( + ColumnDef::new(RpcAccountingV2::ErrorResponse) + .boolean() + .not_null(), + ) + .col( + ColumnDef::new(RpcAccountingV2::FrontendRequests) + .big_unsigned() + .not_null(), + ) + .col( + ColumnDef::new(RpcAccountingV2::BackendRequests) + .big_unsigned() + .not_null(), + ) + .col( + ColumnDef::new(RpcAccountingV2::BackendRetries) + .big_unsigned() + .not_null(), + ) + .col( + ColumnDef::new(RpcAccountingV2::NoServers) + .big_unsigned() + .not_null(), + ) + .col( + ColumnDef::new(RpcAccountingV2::CacheMisses) + .big_unsigned() + .not_null(), + ) + .col( + ColumnDef::new(RpcAccountingV2::CacheHits) + .big_unsigned() + .not_null(), + ) + .col( + ColumnDef::new(RpcAccountingV2::SumRequestBytes) + .big_unsigned() + .not_null(), + ) + .col( + ColumnDef::new(RpcAccountingV2::SumResponseMillis) + .big_unsigned() + .not_null(), + ) + .col( + ColumnDef::new(RpcAccountingV2::SumResponseBytes) + .big_unsigned() + .not_null(), + ) + // cannot use NULL columns for any of these because unique indexes allow duplicates on NULL + .index( + sea_query::Index::create() + .col(RpcAccountingV2::RpcKeyId) + .col(RpcAccountingV2::ChainId) + .col(RpcAccountingV2::Origin) + .col(RpcAccountingV2::PeriodDatetime) + 
.col(RpcAccountingV2::Method) + .col(RpcAccountingV2::ArchiveNeeded) + .col(RpcAccountingV2::ErrorResponse) + .unique(), + ) + // cannot use a foreign key for RpcKeyId because the UNIQUE index uses 0 instead of NULL + .index(sea_query::Index::create().col(RpcAccountingV2::RpcKeyId)) + .index(sea_query::Index::create().col(RpcAccountingV2::ChainId)) + .index(sea_query::Index::create().col(RpcAccountingV2::Origin)) + .index(sea_query::Index::create().col(RpcAccountingV2::PeriodDatetime)) + .index(sea_query::Index::create().col(RpcAccountingV2::Method)) + .index(sea_query::Index::create().col(RpcAccountingV2::ArchiveNeeded)) + .index(sea_query::Index::create().col(RpcAccountingV2::ErrorResponse)) + .to_owned(), + ) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .drop_table(Table::drop().table(RpcAccountingV2::Table).to_owned()) + .await?; + + Ok(()) + } +} + +#[derive(Iden)] +enum RpcAccountingV2 { + Table, + Id, + RpcKeyId, + ChainId, + Origin, + PeriodDatetime, + Method, + ArchiveNeeded, + ErrorResponse, + FrontendRequests, + BackendRequests, + BackendRetries, + NoServers, + CacheMisses, + CacheHits, + SumRequestBytes, + SumResponseMillis, + SumResponseBytes, +} diff --git a/migration/src/m20230307_002623_migrate_rpc_accounting_to_rpc_accounting_v2.rs b/migration/src/m20230307_002623_migrate_rpc_accounting_to_rpc_accounting_v2.rs new file mode 100644 index 00000000..46a82c93 --- /dev/null +++ b/migration/src/m20230307_002623_migrate_rpc_accounting_to_rpc_accounting_v2.rs @@ -0,0 +1,37 @@ +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // Add a nullable timestamp column to check if things were migrated in the rpc_accounting table + manager + .alter_table( + Table::alter() + .table(RpcAccounting::Table) + .add_column(ColumnDef::new(RpcAccounting::Migrated).timestamp()) + .to_owned(), + ) + .await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + Table::alter() + .table(RpcAccounting::Table) + .drop_column(RpcAccounting::Migrated) + .to_owned(), + ) + .await + } +} + +/// partial table for RpcAccounting +#[derive(Iden)] +enum RpcAccounting { + Table, + Migrated, +}
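
Note (not part of the patch series): PATCH 02 adds /status/backups_needed specifically for "easy alerting if backup servers are in use" — it returns 200 with "no backups needed. :)" when the current consensus was reached without backup rpcs, and 500 with "backups needed! :(" when backups are in use or when there is no consensus at all. A minimal external probe is sketched below as an illustration only. The listen address (localhost:8544) and the use of the reqwest and tokio crates are assumptions for this sketch, not anything defined by these patches; point the URL at whatever port web3_proxy is actually configured to serve on.

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // assumed address; substitute the port web3_proxy is configured to listen on
    let url = "http://localhost:8544/status/backups_needed";

    let response = reqwest::get(url).await?;

    if response.status().is_success() {
        // 200 OK: consensus was reached without any backup rpcs
        println!("ok: {}", response.text().await?);
        Ok(())
    } else {
        // 500: backup rpcs are in use, or there is no consensus at all
        eprintln!("ALERT: {}", response.text().await?);
        std::process::exit(1);
    }
}

Exiting non-zero on any non-200 status lets a probe like this run directly under cron or a healthcheck runner without parsing the response body.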