From 3c2f95de1093c60b3176277ad6fed52a21f74114 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Tue, 20 Jun 2023 14:22:14 -0700 Subject: [PATCH] Improve max wait and timeouts (#137) * pass max wait with more functions * move rpc_head_str higher up to use it in all the logs * refresh consensus at double the block time * new pricing that also includes archive check * lint * don't clone method * put caches into one list they serialize with their name, so it will still be easy to tell them apart * more consistent waiting. still waits more than I want right now * clean up default max waits * more timeouts --- web3_proxy/src/app/mod.rs | 23 ++- web3_proxy/src/block_number.rs | 2 +- web3_proxy/src/compute_units.rs | 9 +- web3_proxy/src/frontend/mod.rs | 2 +- web3_proxy/src/frontend/status.rs | 14 +- web3_proxy/src/rpcs/blockchain.rs | 282 ++++++-------------------- web3_proxy/src/rpcs/consensus.rs | 238 +++++++++++++++++++++- web3_proxy/src/rpcs/many.rs | 315 ++++++++++++++++++------------ web3_proxy/src/rpcs/one.rs | 41 ++-- web3_proxy/src/stats/mod.rs | 18 +- web3_proxy/src/user_token.rs | 3 +- 11 files changed, 545 insertions(+), 402 deletions(-) diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 79e6cd0f..8690efb9 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -1071,6 +1071,7 @@ impl Web3ProxyApp { Some(request_metadata), None, None, + Some(Duration::from_secs(30)), Some(Level::Trace.into()), None, true, @@ -1101,6 +1102,7 @@ impl Web3ProxyApp { Some(request_metadata), None, None, + Some(Duration::from_secs(30)), Some(Level::Trace.into()), num_public_rpcs, true, @@ -1271,6 +1273,7 @@ impl Web3ProxyApp { method, ¶ms, Some(request_metadata), + Some(Duration::from_secs(30)), None, None, ) @@ -1311,6 +1314,7 @@ impl Web3ProxyApp { method, ¶ms, Some(request_metadata), + Some(Duration::from_secs(30)), None, None, ) @@ -1343,6 +1347,7 @@ impl Web3ProxyApp { method, ¶ms, Some(request_metadata), + Some(Duration::from_secs(30)), None, None, ) @@ -1366,6 +1371,7 @@ impl Web3ProxyApp { method, ¶ms, Some(request_metadata), + Some(Duration::from_secs(30)), Some(&U64::one()), None, ) @@ -1610,7 +1616,7 @@ impl Web3ProxyApp { let request_block = self .balanced_rpcs - .block(&authorization, &request_block_hash, None) + .block(&authorization, &request_block_hash, None, None) .await? .block; @@ -1640,7 +1646,7 @@ impl Web3ProxyApp { let from_block = self .balanced_rpcs - .block(&authorization, &from_block_hash, None) + .block(&authorization, &from_block_hash, None, None) .await? .block; @@ -1651,7 +1657,7 @@ impl Web3ProxyApp { let to_block = self .balanced_rpcs - .block(&authorization, &to_block_hash, None) + .block(&authorization, &to_block_hash, None, None) .await? .block; @@ -1666,7 +1672,7 @@ impl Web3ProxyApp { }; // TODO: different timeouts for different user tiers. 
get the duration out of the request_metadata - let duration = Duration::from_secs(240); + let max_wait = Duration::from_secs(240); if let Some(cache_key) = cache_key { let from_block_num = cache_key.from_block_num(); @@ -1679,16 +1685,16 @@ impl Web3ProxyApp { .jsonrpc_response_cache .try_get_with::<_, Web3ProxyError>(cache_key.hash(), async { let response_data = timeout( - duration, + max_wait + Duration::from_millis(10), self.balanced_rpcs .try_proxy_connection::<_, Arc>( method, ¶ms, Some(request_metadata), + Some(max_wait), from_block_num.as_ref(), to_block_num.as_ref(), - ) - ) + )) .await?; if !cache_jsonrpc_errors && let Err(err) = response_data { @@ -1705,12 +1711,13 @@ impl Web3ProxyApp { }).await? } else { let x = timeout( - duration, + max_wait + Duration::from_millis(10), self.balanced_rpcs .try_proxy_connection::<_, Arc>( method, ¶ms, Some(request_metadata), + Some(max_wait), None, None, ) diff --git a/web3_proxy/src/block_number.rs b/web3_proxy/src/block_number.rs index 999daf78..1f71cf54 100644 --- a/web3_proxy/src/block_number.rs +++ b/web3_proxy/src/block_number.rs @@ -76,7 +76,7 @@ pub async fn clean_block_number( serde_json::from_value(block_hash).context("decoding blockHash")?; let block = rpcs - .block(authorization, &block_hash, None) + .block(authorization, &block_hash, None, None) .await .context("fetching block number from hash")?; diff --git a/web3_proxy/src/compute_units.rs b/web3_proxy/src/compute_units.rs index 628c83dd..ead9da11 100644 --- a/web3_proxy/src/compute_units.rs +++ b/web3_proxy/src/compute_units.rs @@ -8,6 +8,7 @@ use log::warn; use migration::sea_orm::prelude::Decimal; +use std::str::FromStr; pub struct ComputeUnit(Decimal); @@ -121,12 +122,16 @@ impl ComputeUnit { /// Compute cost per request /// All methods cost the same /// The number of bytes are based on input, and output bytes - pub fn cost(&self, cache_hit: bool, usd_per_cu: Decimal) -> Decimal { + pub fn cost(&self, archive_request: bool, cache_hit: bool, usd_per_cu: Decimal) -> Decimal { let mut cost = self.0 * usd_per_cu; + if archive_request { + cost *= Decimal::from_str("2.5").unwrap(); + } + // cache hits get a 50% discount if cache_hit { - cost /= Decimal::from(2) + cost *= Decimal::from_str("0.75").unwrap() } cost diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs index 54259b22..b9b5168f 100644 --- a/web3_proxy/src/frontend/mod.rs +++ b/web3_proxy/src/frontend/mod.rs @@ -17,7 +17,7 @@ use axum::{ }; use http::{header::AUTHORIZATION, StatusCode}; use listenfd::ListenFd; -use log::{info}; +use log::info; use moka::future::{Cache, CacheBuilder}; use std::net::SocketAddr; use std::sync::Arc; diff --git a/web3_proxy/src/frontend/status.rs b/web3_proxy/src/frontend/status.rs index b0e7b1e5..35a595f0 100644 --- a/web3_proxy/src/frontend/status.rs +++ b/web3_proxy/src/frontend/status.rs @@ -138,17 +138,19 @@ async fn _status(app: Arc) -> (StatusCode, &'static str, Bytes) { // TODO: the hostname is probably not going to change. only get once at the start? 
let body = json!({ "balanced_rpcs": app.balanced_rpcs, - "bearer_token_semaphores": MokaCacheSerializer(&app.bearer_token_semaphores), "bundler_4337_rpcs": app.bundler_4337_rpcs, + "caches": [ + MokaCacheSerializer(&app.bearer_token_semaphores), + MokaCacheSerializer(&app.ip_semaphores), + MokaCacheSerializer(&app.jsonrpc_response_cache), + MokaCacheSerializer(&app.rpc_secret_key_cache), + MokaCacheSerializer(&app.user_balance_cache), + MokaCacheSerializer(&app.user_semaphores), + ], "chain_id": app.config.chain_id, "hostname": app.hostname, - "ip_semaphores": MokaCacheSerializer(&app.ip_semaphores), - "jsonrpc_response_cache": MokaCacheSerializer(&app.jsonrpc_response_cache), "payment_factory_address": app.config.deposit_factory_contract, "private_rpcs": app.private_rpcs, - "rpc_secret_key_cache": MokaCacheSerializer(&app.rpc_secret_key_cache), - "user_balance_cache": MokaCacheSerializer(&app.user_balance_cache), - "user_semaphores": MokaCacheSerializer(&app.user_semaphores), "version": APP_USER_AGENT, }); diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index 09b76c87..3e1d7459 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -3,20 +3,21 @@ use super::consensus::ConsensusFinder; use super::many::Web3Rpcs; use super::one::Web3Rpc; use super::transactions::TxStatus; -use crate::config::BlockAndRpc; +use crate::config::{average_block_interval, BlockAndRpc}; use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult}; use crate::frontend::authorization::Authorization; use derive_more::From; use ethers::prelude::{Block, TxHash, H256, U64}; -use log::{debug, trace, warn}; +use log::{debug, error, trace, warn}; use moka::future::Cache; use serde::ser::SerializeStruct; use serde::Serialize; use serde_json::json; use std::hash::Hash; use std::time::Duration; -use std::{cmp::Ordering, fmt::Display, sync::Arc}; +use std::{fmt::Display, sync::Arc}; use tokio::sync::broadcast; +use tokio::time::timeout; // TODO: type for Hydrated Blocks with their full transactions? pub type ArcBlock = Arc>; @@ -269,6 +270,7 @@ impl Web3Rpcs { authorization: &Arc, hash: &H256, rpc: Option<&Arc>, + max_wait: Option, ) -> Web3ProxyResult { // first, try to get the hash from our cache // the cache is set last, so if its here, its everywhere @@ -299,13 +301,19 @@ impl Web3Rpcs { &get_block_params, authorization, None, + max_wait, ) .await? } else { // ask any rpc + // TODO: retry if "Requested data is not available" // TODO: request_with_metadata instead of internal_request - self.internal_request::<_, Option>("eth_getBlockByHash", &get_block_params) - .await? + self.internal_request::<_, Option>( + "eth_getBlockByHash", + &get_block_params, + max_wait, + ) + .await? }; match block { @@ -375,8 +383,8 @@ impl Web3Rpcs { // deref to not keep the lock open if let Some(block_hash) = self.blocks_by_number.get(num) { // TODO: sometimes this needs to fetch the block. why? i thought block_numbers would only be set if the block hash was set - // TODO: pass authorization through here? - let block = self.block(authorization, &block_hash, None).await?; + // TODO: configurable max wait and rpc + let block = self.block(authorization, &block_hash, None, None).await?; return Ok((block, block_depth)); } @@ -384,7 +392,7 @@ impl Web3Rpcs { // block number not in cache. 
we need to ask an rpc for it // TODO: this error is too broad let response = self - .internal_request::<_, Option>("eth_getBlockByNumber", &(*num, false)) + .internal_request::<_, Option>("eth_getBlockByNumber", &(*num, false), None) .await? .ok_or(Web3ProxyError::NoBlocksKnown)?; @@ -404,240 +412,68 @@ impl Web3Rpcs { // Geth's subscriptions have the same potential for skipping blocks. pending_tx_sender: Option>, ) -> Web3ProxyResult<()> { - let mut connection_heads = + let mut consensus_finder = ConsensusFinder::new(Some(self.max_head_block_age), Some(self.max_head_block_lag)); + // TODO: what timeout on block receiver? we want to keep consensus_finder fresh so that server tiers are correct + let double_block_time = average_block_interval(self.chain_id).mul_f32(2.0); + loop { - match block_receiver.recv_async().await { - Ok((new_block, rpc)) => { + match timeout(double_block_time, block_receiver.recv_async()).await { + Ok(Ok((new_block, rpc))) => { let rpc_name = rpc.name.clone(); - if let Err(err) = self - .process_block_from_rpc( + // TODO: what timeout on this? + match timeout( + Duration::from_secs(2), + consensus_finder.process_block_from_rpc( + self, authorization, - &mut connection_heads, new_block, rpc, &pending_tx_sender, - ) - .await + ), + ) + .await { - warn!( - "error while processing block from rpc {}: {:#?}", - rpc_name, err - ); + Ok(Ok(_)) => {} + Ok(Err(err)) => { + error!( + "error while processing block from rpc {}: {:#?}", + rpc_name, err + ); + } + Err(timeout) => { + error!( + "timeout while processing block from {}: {:#?}", + rpc_name, timeout + ); + } } } - Err(err) => { - warn!("block_receiver exited! {:#?}", err); + Ok(Err(err)) => { + // TODO: panic is probably too much, but getting here is definitely not good + error!("block_receiver on {} exited! {:#?}", self, err); return Err(err.into()); } - } - } - } - - /// `connection_heads` is a mapping of rpc_names to head block hashes. - /// self.blockchain_map is a mapping of hashes to the complete ArcBlock. - /// TODO: return something? - pub(crate) async fn process_block_from_rpc( - &self, - authorization: &Arc, - consensus_finder: &mut ConsensusFinder, - new_block: Option, - rpc: Arc, - _pending_tx_sender: &Option>, - ) -> Web3ProxyResult<()> { - // TODO: how should we handle an error here? - if !consensus_finder - .update_rpc(new_block.clone(), rpc.clone(), self) - .await - .web3_context("failed to update rpc")? - { - // nothing changed. 
no need to scan for a new consensus head - return Ok(()); - } - - let new_consensus_rpcs = match consensus_finder - .find_consensus_connections(authorization, self) - .await - { - Err(err) => { - return Err(err).web3_context("error while finding consensus head block!"); - } - Ok(None) => { - return Err(Web3ProxyError::NoConsensusHeadBlock); - } - Ok(Some(x)) => x, - }; - - trace!("new_synced_connections: {:#?}", new_consensus_rpcs); - - let watch_consensus_head_sender = self.watch_consensus_head_sender.as_ref().unwrap(); - let consensus_tier = new_consensus_rpcs.tier; - // TODO: think more about the default for total_tiers - let total_tiers = consensus_finder.worst_tier().unwrap_or_default(); - let backups_needed = new_consensus_rpcs.backups_needed; - let consensus_head_block = new_consensus_rpcs.head_block.clone(); - let num_consensus_rpcs = new_consensus_rpcs.num_consensus_rpcs(); - let num_active_rpcs = consensus_finder.len(); - let total_rpcs = self.len(); - - let new_consensus_rpcs = Arc::new(new_consensus_rpcs); - - let old_consensus_head_connections = self - .watch_consensus_rpcs_sender - .send_replace(Some(new_consensus_rpcs.clone())); - - let backups_voted_str = if backups_needed { "B " } else { "" }; - - match old_consensus_head_connections.as_ref() { - None => { - debug!( - "first {}/{} {}{}/{}/{} block={}, rpc={}", - consensus_tier, - total_tiers, - backups_voted_str, - num_consensus_rpcs, - num_active_rpcs, - total_rpcs, - consensus_head_block, - rpc, - ); - - if backups_needed { - // TODO: what else should be in this error? - warn!("Backup RPCs are in use!"); - } - - // this should already be cached - let consensus_head_block = self.try_cache_block(consensus_head_block, true).await?; - - watch_consensus_head_sender - .send(Some(consensus_head_block)) - .or(Err(Web3ProxyError::WatchSendError)) - .web3_context( - "watch_consensus_head_sender failed sending first consensus_head_block", - )?; - } - Some(old_consensus_connections) => { - let old_head_block = &old_consensus_connections.head_block; - - // TODO: do this log item better - let rpc_head_str = new_block - .map(|x| x.to_string()) - .unwrap_or_else(|| "None".to_string()); - - match consensus_head_block.number().cmp(old_head_block.number()) { - Ordering::Equal => { - // multiple blocks with the same fork! - if consensus_head_block.hash() == old_head_block.hash() { - // no change in hash. no need to use watch_consensus_head_sender - // TODO: trace level if rpc is backup - debug!( - "con {}/{} {}{}/{}/{} con={} rpc={}@{}", - consensus_tier, - total_tiers, - backups_voted_str, - num_consensus_rpcs, - num_active_rpcs, - total_rpcs, - consensus_head_block, - rpc, - rpc_head_str, - ) - } else { - // hash changed - - debug!( - "unc {}/{} {}{}/{}/{} con={} old={} rpc={}@{}", - consensus_tier, - total_tiers, - backups_voted_str, - num_consensus_rpcs, - num_active_rpcs, - total_rpcs, - consensus_head_block, - old_head_block, - rpc, - rpc_head_str, - ); - - let consensus_head_block = self - .try_cache_block(consensus_head_block, true) - .await - .web3_context("save consensus_head_block as heaviest chain")?; - - watch_consensus_head_sender - .send(Some(consensus_head_block)) - .or(Err(Web3ProxyError::WatchSendError)) - .web3_context("watch_consensus_head_sender failed sending uncled consensus_head_block")?; + Err(_) => { + // TODO: what timeout on this? 
+ match timeout( + Duration::from_secs(2), + consensus_finder.refresh(self, authorization, None, None), + ) + .await + { + Ok(Ok(_)) => {} + Ok(Err(err)) => { + error!("error while refreshing consensus finder: {:#?}", err); } - } - Ordering::Less => { - // this is unlikely but possible - // TODO: better log that includes all the votes - warn!( - "chain rolled back {}/{} {}{}/{}/{} con={} old={} rpc={}@{}", - consensus_tier, - total_tiers, - backups_voted_str, - num_consensus_rpcs, - num_active_rpcs, - total_rpcs, - consensus_head_block, - old_head_block, - rpc, - rpc_head_str, - ); - - if backups_needed { - // TODO: what else should be in this error? - warn!("Backup RPCs are in use!"); + Err(timeout) => { + error!("timeout while refreshing consensus finder: {:#?}", timeout); } - - // TODO: tell save_block to remove any higher block numbers from the cache. not needed because we have other checks on requested blocks being > head, but still seems like a good idea - let consensus_head_block = self - .try_cache_block(consensus_head_block, true) - .await - .web3_context( - "save_block sending consensus_head_block as heaviest chain", - )?; - - watch_consensus_head_sender - .send(Some(consensus_head_block)) - .or(Err(Web3ProxyError::WatchSendError)) - .web3_context("watch_consensus_head_sender failed sending rollback consensus_head_block")?; - } - Ordering::Greater => { - debug!( - "new {}/{} {}{}/{}/{} con={} rpc={}@{}", - consensus_tier, - total_tiers, - backups_voted_str, - num_consensus_rpcs, - num_active_rpcs, - total_rpcs, - consensus_head_block, - rpc, - rpc_head_str, - ); - - if backups_needed { - // TODO: what else should be in this error? - warn!("Backup RPCs are in use!"); - } - - let consensus_head_block = - self.try_cache_block(consensus_head_block, true).await?; - - watch_consensus_head_sender.send(Some(consensus_head_block)) - .or(Err(Web3ProxyError::WatchSendError)) - .web3_context("watch_consensus_head_sender failed sending new consensus_head_block")?; } } } } - - Ok(()) } } diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index 16e07617..35d423de 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -1,7 +1,8 @@ use super::blockchain::Web3ProxyBlock; use super::many::Web3Rpcs; use super::one::Web3Rpc; -use crate::errors::{Web3ProxyErrorContext, Web3ProxyResult}; +use super::transactions::TxStatus; +use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult}; use crate::frontend::authorization::Authorization; use base64::engine::general_purpose; use derive_more::Constructor; @@ -10,7 +11,7 @@ use hashbrown::{HashMap, HashSet}; use hdrhistogram::serialization::{Serializer, V2DeflateSerializer}; use hdrhistogram::Histogram; use itertools::{Itertools, MinMaxResult}; -use log::{log_enabled, trace, warn, Level}; +use log::{debug, log_enabled, trace, warn, Level}; use moka::future::Cache; use serde::Serialize; use std::cmp::{Ordering, Reverse}; @@ -18,6 +19,7 @@ use std::collections::BTreeMap; use std::fmt; use std::sync::{atomic, Arc}; use std::time::Duration; +use tokio::sync::broadcast; use tokio::time::Instant; #[derive(Clone, Serialize)] @@ -369,6 +371,229 @@ impl ConsensusFinder { self.rpc_heads.is_empty() } + /// `connection_heads` is a mapping of rpc_names to head block hashes. + /// self.blockchain_map is a mapping of hashes to the complete ArcBlock. + /// TODO: return something? 
+ /// TODO: move this onto ConsensusFinder + pub(super) async fn refresh( + &mut self, + web3_rpcs: &Web3Rpcs, + authorization: &Arc, + rpc: Option<&Arc>, + new_block: Option, + ) -> Web3ProxyResult<()> { + let new_consensus_rpcs = match self + .find_consensus_connections(authorization, web3_rpcs) + .await + { + Err(err) => { + return Err(err).web3_context("error while finding consensus head block!"); + } + Ok(None) => { + return Err(Web3ProxyError::NoConsensusHeadBlock); + } + Ok(Some(x)) => x, + }; + + trace!("new_synced_connections: {:#?}", new_consensus_rpcs); + + let watch_consensus_head_sender = web3_rpcs.watch_consensus_head_sender.as_ref().unwrap(); + let consensus_tier = new_consensus_rpcs.tier; + // TODO: think more about the default for total_tiers + let total_tiers = self.worst_tier().unwrap_or_default(); + let backups_needed = new_consensus_rpcs.backups_needed; + let consensus_head_block = new_consensus_rpcs.head_block.clone(); + let num_consensus_rpcs = new_consensus_rpcs.num_consensus_rpcs(); + let num_active_rpcs = self.len(); + let total_rpcs = web3_rpcs.len(); + + let new_consensus_rpcs = Arc::new(new_consensus_rpcs); + + let old_consensus_head_connections = web3_rpcs + .watch_consensus_rpcs_sender + .send_replace(Some(new_consensus_rpcs.clone())); + + let backups_voted_str = if backups_needed { "B " } else { "" }; + + let rpc_head_str = if let Some(rpc) = rpc.as_ref() { + format!( + "{}@{}", + rpc, + new_block + .map(|x| x.to_string()) + .unwrap_or_else(|| "None".to_string()), + ) + } else { + "None".to_string() + }; + + match old_consensus_head_connections.as_ref() { + None => { + debug!( + "first {}/{} {}{}/{}/{} block={}, rpc={}", + consensus_tier, + total_tiers, + backups_voted_str, + num_consensus_rpcs, + num_active_rpcs, + total_rpcs, + consensus_head_block, + rpc_head_str, + ); + + if backups_needed { + // TODO: what else should be in this error? + warn!("Backup RPCs are in use!"); + } + + // this should already be cached + let consensus_head_block = web3_rpcs + .try_cache_block(consensus_head_block, true) + .await?; + + watch_consensus_head_sender + .send(Some(consensus_head_block)) + .or(Err(Web3ProxyError::WatchSendError)) + .web3_context( + "watch_consensus_head_sender failed sending first consensus_head_block", + )?; + } + Some(old_consensus_connections) => { + let old_head_block = &old_consensus_connections.head_block; + + match consensus_head_block.number().cmp(old_head_block.number()) { + Ordering::Equal => { + // multiple blocks with the same fork! + if consensus_head_block.hash() == old_head_block.hash() { + // no change in hash. 
no need to use watch_consensus_head_sender + // TODO: trace level if rpc is backup + debug!( + "con {}/{} {}{}/{}/{} con={} rpc={}", + consensus_tier, + total_tiers, + backups_voted_str, + num_consensus_rpcs, + num_active_rpcs, + total_rpcs, + consensus_head_block, + rpc_head_str, + ) + } else { + // hash changed + + debug!( + "unc {}/{} {}{}/{}/{} con={} old={} rpc={}", + consensus_tier, + total_tiers, + backups_voted_str, + num_consensus_rpcs, + num_active_rpcs, + total_rpcs, + consensus_head_block, + old_head_block, + rpc_head_str, + ); + + let consensus_head_block = web3_rpcs + .try_cache_block(consensus_head_block, true) + .await + .web3_context("save consensus_head_block as heaviest chain")?; + + watch_consensus_head_sender + .send(Some(consensus_head_block)) + .or(Err(Web3ProxyError::WatchSendError)) + .web3_context("watch_consensus_head_sender failed sending uncled consensus_head_block")?; + } + } + Ordering::Less => { + // this is unlikely but possible + // TODO: better log that includes all the votes + warn!( + "chain rolled back {}/{} {}{}/{}/{} con={} old={} rpc={}", + consensus_tier, + total_tiers, + backups_voted_str, + num_consensus_rpcs, + num_active_rpcs, + total_rpcs, + consensus_head_block, + old_head_block, + rpc_head_str, + ); + + if backups_needed { + // TODO: what else should be in this error? + warn!("Backup RPCs are in use!"); + } + + // TODO: tell save_block to remove any higher block numbers from the cache. not needed because we have other checks on requested blocks being > head, but still seems like a good idea + let consensus_head_block = web3_rpcs + .try_cache_block(consensus_head_block, true) + .await + .web3_context( + "save_block sending consensus_head_block as heaviest chain", + )?; + + watch_consensus_head_sender + .send(Some(consensus_head_block)) + .or(Err(Web3ProxyError::WatchSendError)) + .web3_context("watch_consensus_head_sender failed sending rollback consensus_head_block")?; + } + Ordering::Greater => { + debug!( + "new {}/{} {}{}/{}/{} con={} rpc={}", + consensus_tier, + total_tiers, + backups_voted_str, + num_consensus_rpcs, + num_active_rpcs, + total_rpcs, + consensus_head_block, + rpc_head_str, + ); + + if backups_needed { + // TODO: what else should be in this error? + warn!("Backup RPCs are in use!"); + } + + let consensus_head_block = web3_rpcs + .try_cache_block(consensus_head_block, true) + .await?; + + watch_consensus_head_sender.send(Some(consensus_head_block)) + .or(Err(Web3ProxyError::WatchSendError)) + .web3_context("watch_consensus_head_sender failed sending new consensus_head_block")?; + } + } + } + } + + Ok(()) + } + + pub(super) async fn process_block_from_rpc( + &mut self, + web3_rpcs: &Web3Rpcs, + authorization: &Arc, + new_block: Option, + rpc: Arc, + _pending_tx_sender: &Option>, + ) -> Web3ProxyResult<()> { + // TODO: how should we handle an error here? + if !self + .update_rpc(new_block.clone(), rpc.clone(), web3_rpcs) + .await + .web3_context("failed to update rpc")? + { + // nothing changed. 
no need to scan for a new consensus head + return Ok(()); + } + + self.refresh(web3_rpcs, authorization, Some(&rpc), new_block) + .await + } + fn remove(&mut self, rpc: &Arc) -> Option { self.rpc_heads.remove(rpc) } @@ -590,14 +815,17 @@ impl ConsensusFinder { backup_entry.0.insert(rpc); backup_entry.1 += rpc.soft_limit; + // we used to specify rpc on this, but it shouldn't be necessary + let parent_hash = block_to_check.parent_hash(); match web3_rpcs - .block(authorization, block_to_check.parent_hash(), Some(rpc)) + .block(authorization, parent_hash, None, None) .await { Ok(parent_block) => block_to_check = parent_block, Err(err) => { - warn!( - "Problem fetching parent block of {:?} during consensus finding: {:#?}", + debug!( + "Problem fetching {:?} (parent of {:?}) during consensus finding: {:#?}", + parent_hash, block_to_check.hash(), err ); diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 58c4852e..7cf193c2 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -40,6 +40,7 @@ use tokio::time::{sleep, sleep_until, Duration, Instant}; pub struct Web3Rpcs { /// TODO: this should be a Cow pub(crate) name: String, + pub(crate) chain_id: u64, /// if watch_consensus_head_sender is some, Web3Rpc inside self will send blocks here when they get them pub(crate) block_sender: flume::Sender<(Option, Arc)>, /// any requests will be forwarded to one (or more) of these connections @@ -126,6 +127,7 @@ impl Web3Rpcs { blocks_by_hash, blocks_by_number, by_name, + chain_id, max_head_block_age, max_head_block_lag, min_synced_rpcs: min_head_rpcs, @@ -515,6 +517,8 @@ impl Web3Rpcs { max_wait: Option, error_handler: Option, ) -> Web3ProxyResult { + let start = Instant::now(); + let mut earliest_retry_at: Option = None; // TODO: pass db_conn to the "default" authorization for revert logging @@ -561,9 +565,6 @@ impl Web3Rpcs { } } } else { - let stop_trying_at = - Instant::now() + max_wait.unwrap_or_else(|| Duration::from_secs(10)); - let mut watch_consensus_rpcs = self.watch_consensus_rpcs_sender.subscribe(); let mut potential_rpcs = Vec::with_capacity(self.len()); @@ -703,23 +704,31 @@ impl Web3Rpcs { let waiting_for = min_block_needed.max(max_block_needed); - match consensus_rpcs.should_wait_for_block(waiting_for, skip_rpcs) { - ShouldWaitForBlock::NeverReady => break, - ShouldWaitForBlock::Ready => { - if Instant::now() > stop_trying_at { - break; + if let Some(max_wait) = max_wait { + match consensus_rpcs.should_wait_for_block(waiting_for, skip_rpcs) { + ShouldWaitForBlock::NeverReady => break, + ShouldWaitForBlock::Ready => { + if start.elapsed() > max_wait { + break; + } } + ShouldWaitForBlock::Wait { .. } => select! { + _ = watch_consensus_rpcs.changed() => { + // no need to borrow_and_update because we do that at the top of the loop + }, + _ = sleep_until(start + max_wait) => break, + }, } - ShouldWaitForBlock::Wait { .. } => select! { - _ = watch_consensus_rpcs.changed() => {}, - _ = sleep_until(stop_trying_at) => break, + } + } else if let Some(max_wait) = max_wait { + select! { + _ = watch_consensus_rpcs.changed() => { + // no need to borrow_and_update because we do that at the top of the loop }, + _ = sleep_until(start + max_wait) => break, } } else { - select! { - _ = watch_consensus_rpcs.changed() => {}, - _ = sleep_until(stop_trying_at) => break, - } + break; } } } @@ -769,6 +778,11 @@ impl Web3Rpcs { trace!("max_count: {}", max_count); + if max_count == 0 { + // TODO: return a future that resolves when we know a head block? 
+ return Err(None); + } + let mut selected_rpcs = Vec::with_capacity(max_count); let mut tried = HashSet::new(); @@ -779,6 +793,7 @@ impl Web3Rpcs { if let Some(synced_rpcs) = synced_rpcs.as_ref() { synced_rpcs.head_rpcs.clone() } else { + // TODO: make this an Option instead of making an empty vec? vec![] } }; @@ -800,10 +815,6 @@ impl Web3Rpcs { .unwrap_or_default(); for rpc in itertools::chain(synced_rpcs, all_rpcs) { - if max_count == 0 { - break; - } - if tried.contains(&rpc) { continue; } @@ -813,10 +824,11 @@ impl Web3Rpcs { tried.insert(rpc.clone()); if !allow_backups && rpc.backup { - warn!("{} is a backup. skipping", rpc); + trace!("{} is a backup. skipping", rpc); continue; } + // TODO: this has_block_data check is in a few places now. move it onto the rpc if let Some(block_needed) = min_block_needed { if !rpc.has_block_data(block_needed) { trace!("{} is missing min_block_needed. skipping", rpc); @@ -840,8 +852,12 @@ impl Web3Rpcs { } Ok(OpenRequestResult::Handle(handle)) => { trace!("{} is available", rpc); + selected_rpcs.push(handle); + max_count -= 1; - selected_rpcs.push(handle) + if max_count == 0 { + break; + } } Ok(OpenRequestResult::NotReady) => { warn!("no request handle for {}", rpc) @@ -864,9 +880,10 @@ impl Web3Rpcs { &self, method: &str, params: &P, + max_wait: Option, ) -> Web3ProxyResult { // TODO: no request_metadata means we won't have stats on this internal request. - self.request_with_metadata(method, params, None, None, None) + self.request_with_metadata(method, params, None, max_wait, None, None) .await } @@ -876,6 +893,7 @@ impl Web3Rpcs { method: &str, params: &P, request_metadata: Option<&Arc>, + max_wait: Option, min_block_needed: Option<&U64>, max_block_needed: Option<&U64>, ) -> Web3ProxyResult { @@ -886,20 +904,24 @@ impl Web3Rpcs { let start = Instant::now(); - // TODO: get from config or arguments - let max_wait = Duration::from_secs(10); - + // set error_handler to Save. this might be overridden depending on the request_metadata.authorization let error_handler = Some(RequestErrorHandler::Save); // TODO: the loop here feels somewhat redundant with the loop in best_available_rpc - while start.elapsed() < max_wait { + loop { + if let Some(max_wait) = max_wait { + if start.elapsed() > max_wait { + break; + } + } + match self .wait_for_best_rpc( request_metadata, &mut skip_rpcs, min_block_needed, max_block_needed, - None, + max_wait, error_handler, ) .await? @@ -1135,18 +1157,22 @@ impl Web3Rpcs { request_metadata: Option<&Arc>, min_block_needed: Option<&U64>, max_block_needed: Option<&U64>, + max_wait: Option, error_level: Option, max_sends: Option, include_backups: bool, ) -> Web3ProxyResult> { let mut watch_consensus_rpcs = self.watch_consensus_rpcs_sender.subscribe(); - // TODO: get from config or function arguments - let max_wait = Duration::from_secs(5); + let start = Instant::now(); - let wait_until = Instant::now() + max_wait; + loop { + if let Some(max_wait) = max_wait { + if start.elapsed() > max_wait { + break; + } + } - while Instant::now() < wait_until { match self .all_connections( request_metadata, @@ -1197,34 +1223,54 @@ impl Web3Rpcs { request_metadata.no_servers.fetch_add(1, Ordering::AcqRel); } + let max_sleep = if let Some(max_wait) = max_wait { + start + max_wait + } else { + break; + }; + tokio::select! { - _ = sleep_until(wait_until) => break, + _ = sleep_until(max_sleep) => { + // rpcs didn't change and we have waited too long. 
break to return an error + warn!("timeout waiting for try_send_all_synced_connections!"); + break; + }, _ = watch_consensus_rpcs.changed() => { + // consensus rpcs changed! watch_consensus_rpcs.borrow_and_update(); + // continue to try again + continue; } } - continue; } Err(Some(retry_at)) => { if let Some(request_metadata) = &request_metadata { request_metadata.no_servers.fetch_add(1, Ordering::AcqRel); } - if retry_at > wait_until { - warn!("All rate limits exceeded. And sleeping would take too long"); + if let Some(max_wait) = max_wait { + if start.elapsed() > max_wait { + warn!("All rate limits exceeded. And sleeping would take too long"); + break; + } + + warn!("All rate limits exceeded. Sleeping"); + + // TODO: only make one of these sleep_untils + + tokio::select! { + _ = sleep_until(start + max_wait) => {break} + _ = sleep_until(retry_at) => {} + _ = watch_consensus_rpcs.changed() => { + watch_consensus_rpcs.borrow_and_update(); + } + } + + continue; + } else { + warn!("All rate limits exceeded."); break; } - - warn!("All rate limits exceeded. Sleeping"); - - tokio::select! { - _ = sleep_until(retry_at) => {} - _ = watch_consensus_rpcs.changed() => { - watch_consensus_rpcs.borrow_and_update(); - } - } - - continue; } } } @@ -1237,6 +1283,7 @@ impl Web3Rpcs { method: &str, params: &P, request_metadata: Option<&Arc>, + max_wait: Option, min_block_needed: Option<&U64>, max_block_needed: Option<&U64>, ) -> Web3ProxyResult { @@ -1248,6 +1295,7 @@ impl Web3Rpcs { method, params, request_metadata, + max_wait, min_block_needed, max_block_needed, ) @@ -1518,10 +1566,13 @@ mod tests { let (watch_consensus_rpcs_sender, _watch_consensus_rpcs_receiver) = watch::channel(None); let (watch_consensus_head_sender, _watch_consensus_head_receiver) = watch::channel(None); + let chain_id = 1; + // TODO: make a Web3Rpcs::new let rpcs = Web3Rpcs { block_sender: block_sender.clone(), by_name: RwLock::new(rpcs_by_name), + chain_id, name: "test".to_string(), watch_consensus_head_sender: Some(watch_consensus_head_sender), watch_consensus_rpcs_sender, @@ -1548,25 +1599,16 @@ mod tests { let mut consensus_finder = ConsensusFinder::new(None, None); - // process None so that - rpcs.process_block_from_rpc( - &authorization, - &mut consensus_finder, - None, - lagged_rpc.clone(), - &None, - ) - .await - .expect("its lagged, but it should still be seen as consensus if its the first to report"); - rpcs.process_block_from_rpc( - &authorization, - &mut consensus_finder, - None, - head_rpc.clone(), - &None, - ) - .await - .unwrap(); + consensus_finder + .process_block_from_rpc(&rpcs, &authorization, None, lagged_rpc.clone(), &None) + .await + .expect( + "its lagged, but it should still be seen as consensus if its the first to report", + ); + consensus_finder + .process_block_from_rpc(&rpcs, &authorization, None, head_rpc.clone(), &None) + .await + .unwrap(); // no head block because the rpcs haven't communicated through their channels assert!(rpcs.head_block_hash().is_none()); @@ -1607,16 +1649,17 @@ mod tests { .await .unwrap(); - // TODO: this is fragile - rpcs.process_block_from_rpc( - &authorization, - &mut consensus_finder, - Some(lagged_block.clone().try_into().unwrap()), - lagged_rpc.clone(), - &None, - ) - .await - .unwrap(); + // TODO: calling process_block_from_rpc and send_head_block_result seperate seems very fragile + consensus_finder + .process_block_from_rpc( + &rpcs, + &authorization, + Some(lagged_block.clone().try_into().unwrap()), + lagged_rpc.clone(), + &None, + ) + .await + .unwrap(); head_rpc 
.send_head_block_result( @@ -1628,15 +1671,16 @@ mod tests { .unwrap(); // TODO: this is fragile - rpcs.process_block_from_rpc( - &authorization, - &mut consensus_finder, - Some(lagged_block.clone().try_into().unwrap()), - head_rpc.clone(), - &None, - ) - .await - .unwrap(); + consensus_finder + .process_block_from_rpc( + &rpcs, + &authorization, + Some(lagged_block.clone().try_into().unwrap()), + head_rpc.clone(), + &None, + ) + .await + .unwrap(); // TODO: how do we spawn this and wait for it to process things? subscribe and watch consensus connections? // rpcs.process_incoming_blocks(&authorization, block_receiver, pending_tx_sender) @@ -1660,15 +1704,16 @@ mod tests { .unwrap(); // TODO: this is fragile - rpcs.process_block_from_rpc( - &authorization, - &mut consensus_finder, - Some(head_block.clone().try_into().unwrap()), - head_rpc.clone(), - &None, - ) - .await - .unwrap(); + consensus_finder + .process_block_from_rpc( + &rpcs, + &authorization, + Some(head_block.clone().try_into().unwrap()), + head_rpc.clone(), + &None, + ) + .await + .unwrap(); assert_eq!(rpcs.num_synced_rpcs(), 1); @@ -1799,9 +1844,12 @@ mod tests { let (watch_consensus_rpcs_sender, _) = watch::channel(None); let (watch_consensus_head_sender, _watch_consensus_head_receiver) = watch::channel(None); + let chain_id = 1; + let rpcs = Web3Rpcs { block_sender, by_name: RwLock::new(rpcs_by_name), + chain_id, name: "test".to_string(), watch_consensus_head_sender: Some(watch_consensus_head_sender), watch_consensus_rpcs_sender, @@ -1827,25 +1875,27 @@ mod tests { let mut connection_heads = ConsensusFinder::new(None, None); // min sum soft limit will require tier 2 - rpcs.process_block_from_rpc( - &authorization, - &mut connection_heads, - Some(head_block.clone()), - pruned_rpc.clone(), - &None, - ) - .await - .unwrap_err(); + connection_heads + .process_block_from_rpc( + &rpcs, + &authorization, + Some(head_block.clone()), + pruned_rpc.clone(), + &None, + ) + .await + .unwrap_err(); - rpcs.process_block_from_rpc( - &authorization, - &mut connection_heads, - Some(head_block.clone()), - archive_rpc.clone(), - &None, - ) - .await - .unwrap(); + connection_heads + .process_block_from_rpc( + &rpcs, + &authorization, + Some(head_block.clone()), + archive_rpc.clone(), + &None, + ) + .await + .unwrap(); assert_eq!(rpcs.num_synced_rpcs(), 2); @@ -1984,10 +2034,13 @@ mod tests { let (watch_consensus_rpcs_sender, _) = watch::channel(None); let (watch_consensus_head_sender, _watch_consensus_head_receiver) = watch::channel(None); + let chain_id = 1; + // TODO: make a Web3Rpcs::new let rpcs = Web3Rpcs { block_sender, by_name: RwLock::new(rpcs_by_name), + chain_id, name: "test".to_string(), watch_consensus_head_sender: Some(watch_consensus_head_sender), watch_consensus_rpcs_sender, @@ -2004,27 +2057,29 @@ mod tests { let authorization = Arc::new(Authorization::internal(None).unwrap()); - let mut connection_heads = ConsensusFinder::new(None, None); + let mut consensus_finder = ConsensusFinder::new(None, None); - rpcs.process_block_from_rpc( - &authorization, - &mut connection_heads, - Some(block_1.clone()), - mock_geth.clone(), - &None, - ) - .await - .unwrap(); + consensus_finder + .process_block_from_rpc( + &rpcs, + &authorization, + Some(block_1.clone()), + mock_geth.clone(), + &None, + ) + .await + .unwrap(); - rpcs.process_block_from_rpc( - &authorization, - &mut connection_heads, - Some(block_2.clone()), - mock_erigon_archive.clone(), - &None, - ) - .await - .unwrap(); + consensus_finder + .process_block_from_rpc( + &rpcs, + 
&authorization, + Some(block_2.clone()), + mock_erigon_archive.clone(), + &None, + ) + .await + .unwrap(); assert_eq!(rpcs.num_synced_rpcs(), 1); diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 38c12c32..02641248 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -29,7 +29,7 @@ use std::hash::{Hash, Hasher}; use std::sync::atomic::{self, AtomicU32, AtomicU64, AtomicUsize}; use std::{cmp::Ordering, sync::Arc}; use tokio::sync::watch; -use tokio::time::{interval, sleep, sleep_until, timeout, Duration, Instant, MissedTickBehavior}; +use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior}; use url::Url; /// An active connection to a Web3 RPC server like geth or erigon. @@ -318,17 +318,16 @@ impl Web3Rpc { // TODO: binary search between 90k and max? // TODO: start at 0 or 1? for block_data_limit in [0, 32, 64, 128, 256, 512, 1024, 90_000, u64::MAX] { - let head_block_num_future = self.internal_request::<_, U256>( - "eth_blockNumber", - &(), - // error here are expected, so keep the level low - Some(Level::Debug.into()), - ); - - let head_block_num = timeout(Duration::from_secs(5), head_block_num_future) + let head_block_num = self + .internal_request::<_, U256>( + "eth_blockNumber", + &(), + // error here are expected, so keep the level low + Some(Level::Debug.into()), + Some(Duration::from_secs(5)), + ) .await - .context("timeout fetching eth_blockNumber")? - .context("provider error")?; + .context("head_block_num error during check_block_data_limit")?; let maybe_archive_block = head_block_num.saturating_sub((block_data_limit).into()); @@ -349,6 +348,7 @@ impl Web3Rpc { )), // error here are expected, so keep the level low Some(Level::Trace.into()), + Some(Duration::from_secs(5)), ) .await; @@ -434,7 +434,12 @@ impl Web3Rpc { // TODO: what should the timeout be? should there be a request timeout? // trace!("waiting on chain id for {}", self); let found_chain_id: U64 = self - .internal_request("eth_chainId", &(), Some(Level::Trace.into())) + .internal_request( + "eth_chainId", + &(), + Some(Level::Trace.into()), + Some(Duration::from_secs(5)), + ) .await?; trace!("found_chain_id: {:#?}", found_chain_id); @@ -556,6 +561,7 @@ impl Web3Rpc { "eth_getTransactionByHash", &(txid,), error_handler, + Some(Duration::from_secs(5)), ) .await? .context("no transaction")?; @@ -577,6 +583,7 @@ impl Web3Rpc { "eth_getCode", &(to, block_number), error_handler, + Some(Duration::from_secs(5)), ) .await?; } else { @@ -821,6 +828,7 @@ impl Web3Rpc { &("latest", false), &authorization, Some(Level::Warn.into()), + Some(Duration::from_secs(5)), ) .await; @@ -839,7 +847,7 @@ impl Web3Rpc { .await?; } } else if self.http_provider.is_some() { - // there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints + // there is a "watch_blocks" function, but a lot of public nodes (including llamanodes) do not support the necessary rpc endpoints // TODO: is 1/2 the block time okay? 
let mut i = interval(self.block_interval / 2); i.set_missed_tick_behavior(MissedTickBehavior::Delay); @@ -856,6 +864,7 @@ impl Web3Rpc { &("latest", false), &authorization, Some(Level::Warn.into()), + Some(Duration::from_secs(5)), ) .await; @@ -1057,10 +1066,11 @@ impl Web3Rpc { method: &str, params: &P, error_handler: Option, + max_wait: Option, ) -> Web3ProxyResult { let authorization = Default::default(); - self.authorized_request(method, params, &authorization, error_handler) + self.authorized_request(method, params, &authorization, error_handler, max_wait) .await } @@ -1070,10 +1080,11 @@ impl Web3Rpc { params: &P, authorization: &Arc, error_handler: Option, + max_wait: Option, ) -> Web3ProxyResult { // TODO: take max_wait as a function argument? let x = self - .wait_for_request_handle(authorization, None, error_handler) + .wait_for_request_handle(authorization, max_wait, error_handler) .await? .request::(method, params) .await?; diff --git a/web3_proxy/src/stats/mod.rs b/web3_proxy/src/stats/mod.rs index e2c2711c..5a6e2cc7 100644 --- a/web3_proxy/src/stats/mod.rs +++ b/web3_proxy/src/stats/mod.rs @@ -27,6 +27,7 @@ use migration::{Expr, LockType, OnConflict}; use num_traits::ToPrimitive; use parking_lot::Mutex; use std::borrow::Cow; +use std::mem; use std::num::NonZeroU64; use std::str::FromStr; use std::sync::atomic::{self, Ordering}; @@ -806,26 +807,25 @@ impl TryFrom for RpcQueryStats { x => x, }; - let method = metadata.method.clone(); - let chain_id = metadata.chain_id; - - let cu = ComputeUnit::new(&method, metadata.chain_id); + let cu = ComputeUnit::new(&metadata.method, metadata.chain_id); // TODO: get from config? a helper function? how should we pick this? - let usd_per_cu = match chain_id { - 137 => Decimal::from_str("0.000000692307692307"), - _ => Decimal::from_str("0.000000692307692307"), + let usd_per_cu = match metadata.chain_id { + 137 => Decimal::from_str("0.000000533333333333333"), + _ => Decimal::from_str("0.000000400000000000000"), }?; let cache_hit = !backend_rpcs_used.is_empty(); - let compute_unit_cost = cu.cost(cache_hit, usd_per_cu); + let compute_unit_cost = cu.cost(archive_request, cache_hit, usd_per_cu); + + let method = mem::take(&mut metadata.method); let x = Self { archive_request, authorization, backend_rpcs_used, - chain_id, + chain_id: metadata.chain_id, compute_unit_cost, error_response, method, diff --git a/web3_proxy/src/user_token.rs b/web3_proxy/src/user_token.rs index 6c017eb0..e5043249 100644 --- a/web3_proxy/src/user_token.rs +++ b/web3_proxy/src/user_token.rs @@ -1,8 +1,7 @@ -use std::str::FromStr; - use axum::headers::authorization::Bearer; use migration::sea_orm::prelude::Uuid; use serde::Serialize; +use std::str::FromStr; use ulid::Ulid; /// Key used for caching the user's login
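
Two of the changes above are easiest to see outside of diff context. First, the new pricing in compute_units.rs and stats/mod.rs: archive requests are charged a 2.5x multiplier and cache hits keep a discount, now 0.75x (the diff's inline comment still says "50% discount" while the multiplier is 0.75, i.e. 25%). The sketch below mirrors that arithmetic with rust_decimal; the free-standing `cost` function and the direct `rust_decimal` import are illustrative stand-ins for the real `ComputeUnit::cost` method and the `sea_orm` re-export, and the per-CU rates are the defaults from the stats/mod.rs hunk.

use rust_decimal::Decimal;
use std::str::FromStr;

// Illustrative sketch of the pricing rules introduced in this patch.
fn cost(compute_units: Decimal, archive_request: bool, cache_hit: bool, usd_per_cu: Decimal) -> Decimal {
    let mut cost = compute_units * usd_per_cu;

    // archive requests now cost 2.5x the base price
    if archive_request {
        cost *= Decimal::from_str("2.5").unwrap();
    }

    // cache hits get a 25% discount (down from the previous 50%)
    if cache_hit {
        cost *= Decimal::from_str("0.75").unwrap();
    }

    cost
}

fn main() {
    // default rate from the stats/mod.rs hunk (chain 137 uses a slightly higher rate)
    let usd_per_cu = Decimal::from_str("0.000000400000000000000").unwrap();
    let cu = Decimal::from(16);

    println!("cached, non-archive: {}", cost(cu, false, true, usd_per_cu));
    println!("uncached, archive:   {}", cost(cu, true, false, usd_per_cu));
}

Second, the app/mod.rs hunks wrap the proxied call in `timeout(max_wait + Duration::from_millis(10), ...)` while also passing `Some(max_wait)` down into `try_proxy_connection`. Padding the outer timeout by 10ms presumably lets the inner path hit its own `max_wait` first and surface a more specific error instead of a bare outer `Elapsed`. A minimal sketch of that pattern, assuming tokio with the `macros` and `time` features; `proxy_call` is a hypothetical stand-in for the real `try_proxy_connection`.

use std::time::Duration;
use tokio::time::{sleep, timeout};

// Stand-in for a proxied RPC call that honors its own max_wait internally.
async fn proxy_call(max_wait: Duration) -> Result<&'static str, &'static str> {
    sleep(max_wait).await;
    Err("no server ready before max_wait elapsed")
}

#[tokio::main]
async fn main() {
    let max_wait = Duration::from_secs(1);

    // The outer timeout fires 10ms after the inner deadline, so when both are
    // exceeded the caller sees the inner call's error, not a generic timeout.
    match timeout(max_wait + Duration::from_millis(10), proxy_call(max_wait)).await {
        Ok(inner) => println!("inner result: {:?}", inner),
        Err(_elapsed) => println!("outer timeout fired (should be rare)"),
    }
}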