diff --git a/Cargo.lock b/Cargo.lock index 649a1221..4d133e01 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2972,6 +2972,15 @@ dependencies = [ "windows-sys 0.42.0", ] +[[package]] +name = "notify-debouncer-mini" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e23e9fa24f094b143c1eb61f90ac6457de87be6987bc70746e0179f7dbc9007b" +dependencies = [ + "notify", +] + [[package]] name = "num" version = "0.4.0" @@ -5826,7 +5835,7 @@ dependencies = [ "log", "migration", "moka", - "notify", + "notify-debouncer-mini", "num", "num-traits", "once_cell", diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index bfc3337f..2c6c6801 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -49,7 +49,7 @@ ipnet = "2.7.1" itertools = "0.10.5" log = "0.4.17" moka = { version = "0.10.0", default-features = false, features = ["future"] } -notify = "5.1.0" +notify-debouncer-mini = { version = "0.2.0", default-features = false } num = "0.4.0" num-traits = "0.2.15" once_cell = { version = "1.17.1" } diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 2fd31965..48ae6d72 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -10,7 +10,7 @@ use crate::frontend::rpc_proxy_ws::ProxyMode; use crate::jsonrpc::{ JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest, JsonRpcRequestEnum, }; -use crate::rpcs::blockchain::{Web3ProxyBlock}; +use crate::rpcs::blockchain::Web3ProxyBlock; use crate::rpcs::many::Web3Rpcs; use crate::rpcs::one::Web3Rpc; use crate::rpcs::transactions::TxStatus; @@ -26,7 +26,7 @@ use ethers::core::utils::keccak256; use ethers::prelude::{Address, Bytes, Transaction, TxHash, H256, U64}; use ethers::types::U256; use ethers::utils::rlp::{Decodable, Rlp}; -use futures::future::join_all; +use futures::future::{join_all, pending}; use futures::stream::{FuturesUnordered, StreamExt}; use hashbrown::{HashMap, HashSet}; use ipnet::IpNet; @@ -37,18 +37,20 @@ use migration::sea_orm::{ use migration::sea_query::table::ColumnDef; use migration::{Alias, DbErr, Migrator, MigratorTrait, Table}; use moka::future::Cache; +use notify_debouncer_mini::{new_debouncer, notify, DebounceEventResult}; use redis_rate_limiter::redis::AsyncCommands; use redis_rate_limiter::{redis, DeadpoolRuntime, RedisConfig, RedisPool, RedisRateLimiter}; use serde::Serialize; use serde_json::json; use serde_json::value::to_raw_value; -use std::fmt; use std::hash::{Hash, Hasher}; use std::net::IpAddr; use std::num::NonZeroU64; +use std::path::PathBuf; use std::str::FromStr; use std::sync::{atomic, Arc}; use std::time::Duration; +use std::{fmt, fs}; use tokio::sync::{broadcast, watch, Semaphore}; use tokio::task::JoinHandle; use tokio::time::{sleep, timeout}; @@ -365,11 +367,13 @@ impl Web3ProxyApp { /// The main entrypoint. pub async fn spawn( top_config: TopConfig, + top_config_path: Option, num_workers: usize, shutdown_receiver: broadcast::Receiver<()>, ) -> anyhow::Result { // safety checks on the config // while i would prefer this to be in a "apply_top_config" function, that is a larger refactor + // TODO: maybe don't spawn with a config at all. have all config updates come through an apply_top_config call if let Some(redirect) = &top_config.app.redirect_rpc_key_url { assert!( redirect.contains("{{rpc_key_id}}"), @@ -391,13 +395,15 @@ impl Web3ProxyApp { ); } + // these futures are key parts of the app. if they stop running, the app has encountered an irrecoverable error + let app_handles = FuturesUnordered::new(); + // we must wait for these to end on their own (and they need to subscribe to shutdown_sender) let important_background_handles = FuturesUnordered::new(); + // connect to the database and make sure the latest migrations have run let mut db_conn = None::; let mut db_replica = None::; - - // connect to mysql and make sure the latest migrations have run if let Some(db_url) = top_config.app.db_url.clone() { let db_min_connections = top_config .app @@ -620,8 +626,6 @@ impl Web3ProxyApp { .time_to_idle(Duration::from_secs(120)) .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); - let app_handles = FuturesUnordered::new(); - // prepare a Web3Rpcs to hold all our balanced connections let (balanced_rpcs, balanced_rpcs_handle) = Web3Rpcs::spawn( top_config.app.chain_id, @@ -697,9 +701,64 @@ impl Web3ProxyApp { let app = Arc::new(app); - app.apply_top_config(top_config).await?; - // TODO: use channel for receiving new top_configs - // TODO: return a channel for sending new top_configs + // watch for config changes + // TODO: initial config reload should be from this channel. not from the call to spawn + if let Some(top_config_path) = top_config_path { + let (top_config_sender, mut top_config_receiver) = watch::channel(top_config); + + // TODO: i think the debouncer is exiting + let mut debouncer = new_debouncer( + Duration::from_secs(2), + None, + move |res: DebounceEventResult| match res { + Ok(events) => events.iter().for_each(|e| { + debug!("Event {:?} for {:?}", e.kind, e.path); + + // TODO: use tokio::fs here? + let new_top_config: String = fs::read_to_string(&e.path).unwrap(); + + let new_top_config: TopConfig = toml::from_str(&new_top_config).unwrap(); + + top_config_sender.send_replace(new_top_config); + }), + Err(errors) => errors + .iter() + .for_each(|e| error!("config watcher error {:#?}", e)), + }, + ) + .context("failed starting debouncer config watcher")?; + + // Add a path to be watched. All files and directories at that path and below will be monitored for changes. + info!("watching config @ {}", top_config_path.display()); + debouncer + .watcher() + .watch(top_config_path.as_path(), notify::RecursiveMode::Recursive) + .context("failed starting config watcher")?; + + let app = app.clone(); + let config_handle = tokio::spawn(async move { + loop { + let new_top_config = top_config_receiver.borrow_and_update().to_owned(); + + app.apply_top_config(new_top_config) + .await + .context("failed applying new top_config")?; + + top_config_receiver + .changed() + .await + .context("failed awaiting top_config change")?; + + info!("config changed"); + } + }); + + app_handles.push(config_handle); + } else { + // no path to config, so we don't know what to watch + // this isn't an error. the config might just be in memory + app.apply_top_config(top_config).await?; + } Ok((app, app_handles, important_background_handles).into()) } diff --git a/web3_proxy/src/bin/web3_proxy_cli/main.rs b/web3_proxy/src/bin/web3_proxy_cli/main.rs index fba010b2..5dc564ac 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/main.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/main.rs @@ -6,11 +6,11 @@ mod check_config; mod count_users; mod create_key; mod create_user; -mod daemon; mod drop_migration_lock; mod list_user_tier; mod pagerduty; mod popularity_contest; +mod proxyd; mod rpc_accounting; mod sentryd; mod transfer_key; @@ -80,7 +80,7 @@ enum SubCommand { DropMigrationLock(drop_migration_lock::DropMigrationLockSubCommand), Pagerduty(pagerduty::PagerdutySubCommand), PopularityContest(popularity_contest::PopularityContestSubCommand), - Proxyd(daemon::ProxydSubCommand), + Proxyd(proxyd::ProxydSubCommand), RpcAccounting(rpc_accounting::RpcAccountingSubCommand), Sentryd(sentryd::SentrydSubCommand), TransferKey(transfer_key::TransferKeySubCommand), @@ -92,6 +92,9 @@ enum SubCommand { } fn main() -> anyhow::Result<()> { + // this probably won't matter for us in docker, but better safe than sorry + fdlimit::raise_fd_limit(); + #[cfg(feature = "deadlock")] { // spawn a thread for deadlock detection @@ -142,9 +145,6 @@ fn main() -> anyhow::Result<()> { .join(","), }; - // this probably won't matter for us in docker, but better safe than sorry - fdlimit::raise_fd_limit(); - let mut cli_config: Web3ProxyCli = argh::from_env(); if cli_config.config.is_none() && cli_config.db_url.is_none() && cli_config.sentry_url.is_none() @@ -154,12 +154,13 @@ fn main() -> anyhow::Result<()> { cli_config.config = Some("./config/development.toml".to_string()); } - let top_config = if let Some(top_config_path) = cli_config.config.clone() { + let (top_config, top_config_path) = if let Some(top_config_path) = cli_config.config.clone() { let top_config_path = Path::new(&top_config_path) .canonicalize() .context(format!("checking for config at {}", top_config_path))?; - let top_config: String = fs::read_to_string(top_config_path)?; + let top_config: String = fs::read_to_string(top_config_path.clone())?; + let mut top_config: TopConfig = toml::from_str(&top_config)?; // TODO: this doesn't seem to do anything @@ -184,9 +185,9 @@ fn main() -> anyhow::Result<()> { } } - Some(top_config) + (Some(top_config), Some(top_config_path)) } else { - None + (None, None) }; let logger = env_logger::builder().parse_filters(&rust_log).build(); @@ -343,8 +344,10 @@ fn main() -> anyhow::Result<()> { } SubCommand::Proxyd(x) => { let top_config = top_config.expect("--config is required to run proxyd"); + let top_config_path = + top_config_path.expect("path must be set if top_config exists"); - x.main(top_config, num_workers).await + x.main(top_config, top_config_path, num_workers).await } SubCommand::DropMigrationLock(x) => { let db_url = cli_config diff --git a/web3_proxy/src/bin/web3_proxy_cli/daemon.rs b/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs similarity index 94% rename from web3_proxy/src/bin/web3_proxy_cli/daemon.rs rename to web3_proxy/src/bin/web3_proxy_cli/proxyd.rs index 465e545e..e8453cfa 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/daemon.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/proxyd.rs @@ -1,5 +1,7 @@ #![forbid(unsafe_code)] +use std::path::PathBuf; + use argh::FromArgs; use futures::StreamExt; use log::{error, info, warn}; @@ -24,11 +26,19 @@ pub struct ProxydSubCommand { } impl ProxydSubCommand { - pub async fn main(self, top_config: TopConfig, num_workers: usize) -> anyhow::Result<()> { + pub async fn main( + self, + top_config: TopConfig, + top_config_path: PathBuf, + num_workers: usize, + ) -> anyhow::Result<()> { let (shutdown_sender, _) = broadcast::channel(1); + // TODO: i think there is a small race. if config_path changes + run( top_config, + Some(top_config_path), self.port, self.prometheus_port, num_workers, @@ -40,6 +50,7 @@ impl ProxydSubCommand { async fn run( top_config: TopConfig, + top_config_path: Option, frontend_port: u16, prometheus_port: u16, num_workers: usize, @@ -54,8 +65,13 @@ async fn run( let mut shutdown_receiver = shutdown_sender.subscribe(); // start the main app - let mut spawned_app = - Web3ProxyApp::spawn(top_config, num_workers, shutdown_sender.subscribe()).await?; + let mut spawned_app = Web3ProxyApp::spawn( + top_config, + top_config_path, + num_workers, + shutdown_sender.subscribe(), + ) + .await?; // start the prometheus metrics port let prometheus_handle = tokio::spawn(metrics_frontend::serve( @@ -246,6 +262,7 @@ mod tests { tokio::spawn(async move { run( top_config, + None, frontend_port, prometheus_port, 2, diff --git a/web3_proxy/src/bin/web3_proxy_cli/sentryd/mod.rs b/web3_proxy/src/bin/web3_proxy_cli/sentryd/mod.rs index 90398f20..ed8274b3 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/sentryd/mod.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/sentryd/mod.rs @@ -97,18 +97,18 @@ impl SentrydSubCommand { .or_else(|| top_config.map(|x| x.app.chain_id)) .context("--config or --chain-id required")?; - let primary_proxy = self.web3_proxy.trim_end_matches("/").to_string(); + let primary_proxy = self.web3_proxy.trim_end_matches('/').to_string(); let other_proxy: Vec<_> = self .other_proxy .into_iter() - .map(|x| x.trim_end_matches("/").to_string()) + .map(|x| x.trim_end_matches('/').to_string()) .collect(); let other_rpc: Vec<_> = self .other_rpc .into_iter() - .map(|x| x.trim_end_matches("/").to_string()) + .map(|x| x.trim_end_matches('/').to_string()) .collect(); let seconds = self.seconds.unwrap_or(60); diff --git a/web3_proxy/src/jsonrpc.rs b/web3_proxy/src/jsonrpc.rs index 0356362a..7a601c20 100644 --- a/web3_proxy/src/jsonrpc.rs +++ b/web3_proxy/src/jsonrpc.rs @@ -211,6 +211,7 @@ impl JsonRpcForwardedResponse { error: Some(JsonRpcErrorData { code: code.unwrap_or(-32099), message, + // TODO: accept data as an argument data: None, }), } diff --git a/web3_proxy/src/pagerduty.rs b/web3_proxy/src/pagerduty.rs index 9eacbff9..8716df90 100644 --- a/web3_proxy/src/pagerduty.rs +++ b/web3_proxy/src/pagerduty.rs @@ -124,7 +124,7 @@ pub fn pagerduty_alert_for_config( ) -> AlertTrigger { let chain_id = top_config.app.chain_id; - let client_url = top_config.app.redirect_public_url.clone(); + let client_url = top_config.app.redirect_public_url; pagerduty_alert( Some(chain_id), @@ -140,6 +140,7 @@ pub fn pagerduty_alert_for_config( ) } +#[allow(clippy::too_many_arguments)] pub fn pagerduty_alert( chain_id: Option, class: Option, @@ -186,6 +187,6 @@ pub fn pagerduty_alert( images: None, links: None, client: Some(client), - client_url: client_url, + client_url, } } diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index ef75f72b..3494bbeb 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -8,7 +8,7 @@ use crate::{config::BlockAndRpc, jsonrpc::JsonRpcRequest}; use anyhow::{anyhow, Context}; use derive_more::From; use ethers::prelude::{Block, TxHash, H256, U64}; -use log::{debug, error, trace, warn, Level}; +use log::{debug, trace, warn, Level}; use moka::future::Cache; use serde::Serialize; use serde_json::json; @@ -72,7 +72,7 @@ impl Web3ProxyBlock { if block_timestamp < now { // this server is still syncing from too far away to serve requests // u64 is safe because ew checked equality above - (now - block_timestamp).as_secs() as u64 + (now - block_timestamp).as_secs() } else { 0 } @@ -392,8 +392,7 @@ impl Web3Rpcs { .await .context("no consensus head block!") .map_err(|err| { - self.watch_consensus_rpcs_sender - .send_replace(Arc::new(Default::default())); + self.watch_consensus_rpcs_sender.send_replace(None); err })?; @@ -414,100 +413,73 @@ impl Web3Rpcs { let old_consensus_head_connections = self .watch_consensus_rpcs_sender - .send_replace(Arc::new(new_synced_connections)); + .send_replace(Some(Arc::new(new_synced_connections))); let backups_voted_str = if backups_needed { "B " } else { "" }; - if let Some(consensus_head_block) = consensus_head_block { - match &old_consensus_head_connections.head_block { - None => { - debug!( - "first {}/{} {}{}/{}/{} block={}, rpc={}", - consensus_tier, - total_tiers, - backups_voted_str, - num_consensus_rpcs, - num_active_rpcs, - total_rpcs, - consensus_head_block, - rpc, - ); + match old_consensus_head_connections.as_ref() { + None => { + debug!( + "first {}/{} {}{}/{}/{} block={}, rpc={}", + consensus_tier, + total_tiers, + backups_voted_str, + num_consensus_rpcs, + num_active_rpcs, + total_rpcs, + consensus_head_block, + rpc, + ); - if backups_needed { - // TODO: what else should be in this error? - warn!("Backup RPCs are in use!"); - } - - // this should already be cached - let consensus_head_block = - self.try_cache_block(consensus_head_block, true).await?; - - watch_consensus_head_sender - .send(Some(consensus_head_block)) - .context( - "watch_consensus_head_sender failed sending first consensus_head_block", - )?; + if backups_needed { + // TODO: what else should be in this error? + warn!("Backup RPCs are in use!"); } - Some(old_head_block) => { - // TODO: do this log item better - let rpc_head_str = new_block - .map(|x| x.to_string()) - .unwrap_or_else(|| "None".to_string()); - match consensus_head_block.number().cmp(&old_head_block.number()) { - Ordering::Equal => { - // multiple blocks with the same fork! - if consensus_head_block.hash() == old_head_block.hash() { - // no change in hash. no need to use watch_consensus_head_sender - // TODO: trace level if rpc is backup - debug!( - "con {}/{} {}{}/{}/{} con={} rpc={}@{}", - consensus_tier, - total_tiers, - backups_voted_str, - num_consensus_rpcs, - num_active_rpcs, - total_rpcs, - consensus_head_block, - rpc, - rpc_head_str, - ) - } else { - // hash changed - if backups_needed { - // TODO: what else should be in this error? - warn!("Backup RPCs are in use!"); - } + // this should already be cached + let consensus_head_block = self.try_cache_block(consensus_head_block, true).await?; - debug!( - "unc {}/{} {}{}/{}/{} con_head={} old={} rpc={}@{}", - consensus_tier, - total_tiers, - backups_voted_str, - num_consensus_rpcs, - num_active_rpcs, - total_rpcs, - consensus_head_block, - old_head_block, - rpc, - rpc_head_str, - ); + watch_consensus_head_sender + .send(Some(consensus_head_block)) + .context( + "watch_consensus_head_sender failed sending first consensus_head_block", + )?; + } + Some(old_consensus_connections) => { + let old_head_block = &old_consensus_connections.head_block; - let consensus_head_block = self - .try_cache_block(consensus_head_block, true) - .await - .context("save consensus_head_block as heaviest chain")?; + // TODO: do this log item better + let rpc_head_str = new_block + .map(|x| x.to_string()) + .unwrap_or_else(|| "None".to_string()); - watch_consensus_head_sender - .send(Some(consensus_head_block)) - .context("watch_consensus_head_sender failed sending uncled consensus_head_block")?; + match consensus_head_block.number().cmp(old_head_block.number()) { + Ordering::Equal => { + // multiple blocks with the same fork! + if consensus_head_block.hash() == old_head_block.hash() { + // no change in hash. no need to use watch_consensus_head_sender + // TODO: trace level if rpc is backup + debug!( + "con {}/{} {}{}/{}/{} con={} rpc={}@{}", + consensus_tier, + total_tiers, + backups_voted_str, + num_consensus_rpcs, + num_active_rpcs, + total_rpcs, + consensus_head_block, + rpc, + rpc_head_str, + ) + } else { + // hash changed + if backups_needed { + // TODO: what else should be in this error? + warn!("Backup RPCs are in use!"); } - } - Ordering::Less => { - // this is unlikely but possible - // TODO: better log - warn!( - "chain rolled back {}/{} {}{}/{}/{} con={} old={} rpc={}@{}", + + debug!( + "unc {}/{} {}{}/{}/{} con_head={} old={} rpc={}@{}", consensus_tier, total_tiers, backups_voted_str, @@ -520,82 +492,73 @@ impl Web3Rpcs { rpc_head_str, ); - if backups_needed { - // TODO: what else should be in this error? - warn!("Backup RPCs are in use!"); - } - - // TODO: tell save_block to remove any higher block numbers from the cache. not needed because we have other checks on requested blocks being > head, but still seems like a good idea let consensus_head_block = self .try_cache_block(consensus_head_block, true) .await - .context( - "save_block sending consensus_head_block as heaviest chain", - )?; + .context("save consensus_head_block as heaviest chain")?; watch_consensus_head_sender .send(Some(consensus_head_block)) - .context("watch_consensus_head_sender failed sending rollback consensus_head_block")?; - } - Ordering::Greater => { - debug!( - "new {}/{} {}{}/{}/{} con={} rpc={}@{}", - consensus_tier, - total_tiers, - backups_voted_str, - num_consensus_rpcs, - num_active_rpcs, - total_rpcs, - consensus_head_block, - rpc, - rpc_head_str, - ); - - if backups_needed { - // TODO: what else should be in this error? - warn!("Backup RPCs are in use!"); - } - - let consensus_head_block = - self.try_cache_block(consensus_head_block, true).await?; - - watch_consensus_head_sender.send(Some(consensus_head_block)).context("watch_consensus_head_sender failed sending new consensus_head_block")?; + .context("watch_consensus_head_sender failed sending uncled consensus_head_block")?; } } - } - } - } else { - // TODO: do this log item better - let rpc_head_str = new_block - .map(|x| x.to_string()) - .unwrap_or_else(|| "None".to_string()); + Ordering::Less => { + // this is unlikely but possible + // TODO: better log + warn!( + "chain rolled back {}/{} {}{}/{}/{} con={} old={} rpc={}@{}", + consensus_tier, + total_tiers, + backups_voted_str, + num_consensus_rpcs, + num_active_rpcs, + total_rpcs, + consensus_head_block, + old_head_block, + rpc, + rpc_head_str, + ); - if num_active_rpcs >= self.min_head_rpcs { - // no consensus!!! - error!( - "non {}/{} {}{}/{}/{} rpc={}@{}", - consensus_tier, - total_tiers, - backups_voted_str, - num_consensus_rpcs, - num_active_rpcs, - total_rpcs, - rpc, - rpc_head_str, - ); - } else { - // no consensus, but we do not have enough rpcs connected yet to panic - debug!( - "non {}/{} {}{}/{}/{} rpc={}@{}", - consensus_tier, - total_tiers, - backups_voted_str, - num_consensus_rpcs, - num_active_rpcs, - total_rpcs, - rpc, - rpc_head_str, - ); + if backups_needed { + // TODO: what else should be in this error? + warn!("Backup RPCs are in use!"); + } + + // TODO: tell save_block to remove any higher block numbers from the cache. not needed because we have other checks on requested blocks being > head, but still seems like a good idea + let consensus_head_block = self + .try_cache_block(consensus_head_block, true) + .await + .context("save_block sending consensus_head_block as heaviest chain")?; + + watch_consensus_head_sender + .send(Some(consensus_head_block)) + .context("watch_consensus_head_sender failed sending rollback consensus_head_block")?; + } + Ordering::Greater => { + debug!( + "new {}/{} {}{}/{}/{} con={} rpc={}@{}", + consensus_tier, + total_tiers, + backups_voted_str, + num_consensus_rpcs, + num_active_rpcs, + total_rpcs, + consensus_head_block, + rpc, + rpc_head_str, + ); + + if backups_needed { + // TODO: what else should be in this error? + warn!("Backup RPCs are in use!"); + } + + let consensus_head_block = + self.try_cache_block(consensus_head_block, true).await?; + + watch_consensus_head_sender.send(Some(consensus_head_block)).context("watch_consensus_head_sender failed sending new consensus_head_block")?; + } + } } } diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index fd4036db..d9564df3 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -16,10 +16,11 @@ use tokio::time::Instant; /// A collection of Web3Rpcs that are on the same block. /// Serialize is so we can print it on our debug endpoint -#[derive(Clone, Default, Serialize)] +#[derive(Clone, Serialize)] pub struct ConsensusWeb3Rpcs { + // TODO: tier should be an option, or we should have consensus be stored as an Option pub(super) tier: u64, - pub(super) head_block: Option, + pub(super) head_block: Web3ProxyBlock, // TODO: this should be able to serialize, but it isn't #[serde(skip_serializing)] pub(super) rpcs: Vec>, @@ -69,11 +70,23 @@ impl Web3Rpcs { } pub fn synced(&self) -> bool { - !self.watch_consensus_rpcs_sender.borrow().rpcs.is_empty() + let consensus = self.watch_consensus_rpcs_sender.borrow(); + + if let Some(consensus) = consensus.as_ref() { + !consensus.rpcs.is_empty() + } else { + false + } } pub fn num_synced_rpcs(&self) -> usize { - self.watch_consensus_rpcs_sender.borrow().rpcs.len() + let consensus = self.watch_consensus_rpcs_sender.borrow(); + + if let Some(consensus) = consensus.as_ref() { + consensus.rpcs.len() + } else { + 0 + } } } @@ -100,6 +113,10 @@ impl ConnectionsGroup { self.rpc_name_to_block.len() } + pub fn is_empty(&self) -> bool { + self.rpc_name_to_block.is_empty() + } + fn remove(&mut self, rpc_name: &str) -> Option { if let Some(removed_block) = self.rpc_name_to_block.remove(rpc_name) { match self.highest_block.as_mut() { @@ -255,14 +272,14 @@ impl ConnectionsGroup { // not enough rpcs on this block. check the parent block match web3_rpcs - .block(authorization, &maybe_head_block.parent_hash(), None) + .block(authorization, maybe_head_block.parent_hash(), None) .await { Ok(parent_block) => { // trace!( // child=%maybe_head_hash, parent=%parent_block.hash.unwrap(), "avoiding thundering herd. checking consensus on parent block", // ); - maybe_head_block = parent_block.into(); + maybe_head_block = parent_block; continue; } Err(err) => { @@ -325,7 +342,7 @@ impl ConnectionsGroup { Ok(ConsensusWeb3Rpcs { tier: *tier, - head_block: Some(maybe_head_block), + head_block: maybe_head_block, rpcs, backups_voted: backup_rpcs_voted, backups_needed: primary_rpcs_voted.is_none(), @@ -371,6 +388,10 @@ impl ConsensusFinder { self.tiers.len() } + pub fn is_empty(&self) -> bool { + self.tiers.is_empty() + } + /// get the ConnectionsGroup that contains all rpcs /// panics if there are no tiers pub fn all_rpcs_group(&self) -> Option<&ConnectionsGroup> { @@ -457,26 +478,18 @@ impl ConsensusFinder { } if let Some(prev_block) = self.insert(&rpc, rpc_head_block.clone()).await { - if prev_block.hash() == rpc_head_block.hash() { - // this block was already sent by this rpc. return early - false - } else { - // new block for this rpc - true - } + // false if this block was already sent by this rpc. return early + // true if new block for this rpc + prev_block.hash() != rpc_head_block.hash() } else { // first block for this rpc true } } None => { - if self.remove(&rpc).is_none() { - // this rpc was already removed - false - } else { - // rpc head changed from being synced to not - true - } + // false if this rpc was already removed + // true if rpc head changed from being synced to not + self.remove(&rpc).is_some() } }; diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index d323b554..0de9b666 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -50,7 +50,7 @@ pub struct Web3Rpcs { /// TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed /// TODO: why is watch_consensus_head_sender in an Option, but this one isn't? /// Geth's subscriptions have the same potential for skipping blocks. - pub(super) watch_consensus_rpcs_sender: watch::Sender>, + pub(super) watch_consensus_rpcs_sender: watch::Sender>>, /// this head receiver makes it easy to wait until there is a new block pub(super) watch_consensus_head_sender: Option>>, pub(super) pending_transaction_cache: @@ -125,7 +125,7 @@ impl Web3Rpcs { // trace!("http interval ready"); - if let Err(_) = sender.send(()) { + if sender.send(()).is_err() { // errors are okay. they mean that all receivers have been dropped, or the rpcs just haven't started yet trace!("no http receivers"); }; @@ -181,7 +181,7 @@ impl Web3Rpcs { max_block_lag, }); - let authorization = Arc::new(Authorization::internal(db_conn.clone())?); + let authorization = Arc::new(Authorization::internal(db_conn)?); let handle = { let connections = connections.clone(); @@ -478,12 +478,14 @@ impl Web3Rpcs { let usable_rpcs_by_tier_and_head_number: BTreeMap<(u64, Option), Vec>> = { let synced_connections = self.watch_consensus_rpcs_sender.borrow().clone(); - let (head_block_num, head_block_age) = - if let Some(head_block) = synced_connections.head_block.as_ref() { - (head_block.number(), head_block.age()) - } else { - return Ok(OpenRequestResult::NotReady); - }; + if synced_connections.is_none() { + return Ok(OpenRequestResult::NotReady); + } + let synced_connections = + synced_connections.expect("synced_connections can't be None here"); + + let head_block_num = synced_connections.head_block.number(); + let head_block_age = synced_connections.head_block.age(); let needed_blocks_comparison = match (min_block_needed, max_block_needed) { (None, None) => { @@ -530,28 +532,20 @@ impl Web3Rpcs { .values() .filter(|x| { // TODO: move a bunch of this onto a rpc.is_synced function + #[allow(clippy::if_same_then_else)] if skip.contains(x) { // we've already tried this server or have some other reason to skip it false } else if max_block_needed - .and_then(|max_block_needed| { - Some(!x.has_block_data(max_block_needed)) - }) + .map(|max_block_needed| !x.has_block_data(max_block_needed)) .unwrap_or(false) { // server does not have the max block false - } else if min_block_needed - .and_then(|min_block_needed| { - Some(!x.has_block_data(min_block_needed)) - }) - .unwrap_or(false) - { - // server does not have the min block - false } else { - // server has the block we need! - true + !min_block_needed + .map(|min_block_needed| !x.has_block_data(min_block_needed)) + .unwrap_or(false) } }) .cloned() @@ -599,9 +593,7 @@ impl Web3Rpcs { } trace!("not skipped!"); - m.entry(key.clone()) - .or_insert_with(Vec::new) - .push(x.clone()); + m.entry(key).or_insert_with(Vec::new).push(x.clone()); } } cmp::Ordering::Greater => { @@ -703,44 +695,9 @@ impl Web3Rpcs { min_block_needed: Option<&U64>, max_block_needed: Option<&U64>, max_count: Option, - always_include_backups: bool, - ) -> Result, Option> { - if !always_include_backups { - if let Ok(without_backups) = self - ._all_connections( - false, - authorization, - min_block_needed, - max_block_needed, - max_count, - ) - .await - { - return Ok(without_backups); - } - } - - self._all_connections( - true, - authorization, - min_block_needed, - max_block_needed, - max_count, - ) - .await - } - - async fn _all_connections( - &self, allow_backups: bool, - authorization: &Arc, - min_block_needed: Option<&U64>, - max_block_needed: Option<&U64>, - max_count: Option, ) -> Result, Option> { let mut earliest_retry_at = None; - // TODO: with capacity? - let mut selected_rpcs = vec![]; let mut max_count = if let Some(max_count) = max_count { max_count @@ -748,63 +705,83 @@ impl Web3Rpcs { self.by_name.read().len() }; + trace!("max_count: {}", max_count); + + let mut selected_rpcs = Vec::with_capacity(max_count); + let mut tried = HashSet::new(); - let mut synced_conns = self.watch_consensus_rpcs_sender.borrow().rpcs.clone(); + let mut synced_rpcs = { + let synced_rpcs = self.watch_consensus_rpcs_sender.borrow(); + + if let Some(synced_rpcs) = synced_rpcs.as_ref() { + synced_rpcs.rpcs.clone() + } else { + vec![] + } + }; // synced connections are all on the same block. sort them by tier with higher soft limits first - synced_conns.sort_by_cached_key(rpc_sync_status_sort_key); + synced_rpcs.sort_by_cached_key(rpc_sync_status_sort_key); + + trace!("synced_rpcs: {:#?}", synced_rpcs); // if there aren't enough synced connections, include more connections - // TODO: only do this sorting if the synced_conns isn't enough - let mut all_conns: Vec<_> = self.by_name.read().values().cloned().collect(); - all_conns.sort_by_cached_key(rpc_sync_status_sort_key); + // TODO: only do this sorting if the synced_rpcs isn't enough + let mut all_rpcs: Vec<_> = self.by_name.read().values().cloned().collect(); + all_rpcs.sort_by_cached_key(rpc_sync_status_sort_key); - for connection in itertools::chain(synced_conns, all_conns) { + trace!("all_rpcs: {:#?}", all_rpcs); + + for rpc in itertools::chain(synced_rpcs, all_rpcs) { if max_count == 0 { break; } - if tried.contains(&connection.name) { + if tried.contains(&rpc.name) { continue; } - tried.insert(connection.name.clone()); + trace!("trying {}", rpc); - if !allow_backups && connection.backup { + tried.insert(rpc.name.clone()); + + if !allow_backups && rpc.backup { + trace!("{} is a backup. skipping", rpc); continue; } if let Some(block_needed) = min_block_needed { - if !connection.has_block_data(block_needed) { + if !rpc.has_block_data(block_needed) { + trace!("{} is missing min_block_needed. skipping", rpc); continue; } } if let Some(block_needed) = max_block_needed { - if !connection.has_block_data(block_needed) { + if !rpc.has_block_data(block_needed) { + trace!("{} is missing max_block_needed. skipping", rpc); continue; } } // check rate limits and increment our connection counter - match connection.try_request_handle(authorization, None).await { + match rpc.try_request_handle(authorization, None).await { Ok(OpenRequestResult::RetryAt(retry_at)) => { // this rpc is not available. skip it + trace!("{} is rate limited. skipping", rpc); earliest_retry_at = earliest_retry_at.min(Some(retry_at)); } Ok(OpenRequestResult::Handle(handle)) => { + trace!("{} is available", rpc); max_count -= 1; selected_rpcs.push(handle) } Ok(OpenRequestResult::NotReady) => { - warn!("no request handle for {}", connection) + warn!("no request handle for {}", rpc) } Err(err) => { - warn!( - "error getting request handle for {}. err={:?}", - connection, err - ) + warn!("error getting request handle for {}. err={:?}", rpc, err) } } } @@ -1015,6 +992,7 @@ impl Web3Rpcs { if num_skipped == 0 { error!("No servers synced ({} known). None skipped", num_conns); + // TODO: what error code? Ok(JsonRpcForwardedResponse::from_str( "No servers synced", Some(-32000), @@ -1038,6 +1016,7 @@ impl Web3Rpcs { } /// be sure there is a timeout on this or it might loop forever + #[allow(clippy::too_many_arguments)] pub async fn try_send_all_synced_connections( &self, authorization: &Arc, @@ -1180,9 +1159,14 @@ impl Serialize for Web3Rpcs { } { - let consensus_connections = self.watch_consensus_rpcs_sender.borrow().clone(); - // TODO: rename synced_connections to consensus_connections? - state.serialize_field("synced_connections", &consensus_connections)?; + let consensus_rpcs = self.watch_consensus_rpcs_sender.borrow(); + // TODO: rename synced_connections to consensus_rpcs + + if let Some(consensus_rpcs) = consensus_rpcs.as_ref() { + state.serialize_field("synced_connections", consensus_rpcs)?; + } else { + state.serialize_field("synced_connections", &None::<()>)?; + } } self.blocks_by_hash.sync(); @@ -1369,11 +1353,11 @@ mod tests { ..Default::default() }; - assert!(head_rpc.has_block_data(&lagged_block.number())); - assert!(head_rpc.has_block_data(&head_block.number())); + assert!(head_rpc.has_block_data(lagged_block.number())); + assert!(head_rpc.has_block_data(head_block.number())); - assert!(lagged_rpc.has_block_data(&lagged_block.number())); - assert!(!lagged_rpc.has_block_data(&head_block.number())); + assert!(lagged_rpc.has_block_data(lagged_block.number())); + assert!(!lagged_rpc.has_block_data(head_block.number())); let head_rpc = Arc::new(head_rpc); let lagged_rpc = Arc::new(lagged_rpc); @@ -1383,10 +1367,12 @@ mod tests { (lagged_rpc.name.clone(), lagged_rpc.clone()), ]); - let (block_sender, _) = flume::unbounded(); + let (block_sender, _block_receiver) = flume::unbounded(); let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded(); - let (watch_consensus_rpcs_sender, _) = watch::channel(Default::default()); - let (watch_consensus_head_sender, _) = watch::channel(Default::default()); + let (watch_consensus_rpcs_sender, _watch_consensus_rpcs_receiver) = + watch::channel(Default::default()); + let (watch_consensus_head_sender, _watch_consensus_head_receiver) = + watch::channel(Default::default()); // TODO: make a Web3Rpcs::new let rpcs = Web3Rpcs { @@ -1570,8 +1556,8 @@ mod tests { ..Default::default() }; - assert!(pruned_rpc.has_block_data(&head_block.number())); - assert!(archive_rpc.has_block_data(&head_block.number())); + assert!(pruned_rpc.has_block_data(head_block.number())); + assert!(archive_rpc.has_block_data(head_block.number())); assert!(!pruned_rpc.has_block_data(&1.into())); assert!(archive_rpc.has_block_data(&1.into())); @@ -1640,7 +1626,7 @@ mod tests { // best_synced_backend_connection requires servers to be synced with the head block // TODO: test with and without passing the head_block.number? let best_available_server = rpcs - .best_available_rpc(&authorization, None, &[], Some(&head_block.number()), None) + .best_available_rpc(&authorization, None, &[], Some(head_block.number()), None) .await; debug!("best_available_server: {:#?}", best_available_server); diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index b6657eac..b4cf4af7 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -558,10 +558,8 @@ impl Web3Rpc { drop(unlocked_provider); info!("successfully connected to {}", self); - } else { - if self.provider.read().await.is_none() { - return Err(anyhow!("failed waiting for client")); - } + } else if self.provider.read().await.is_none() { + return Err(anyhow!("failed waiting for client")); }; Ok(()) @@ -604,7 +602,7 @@ impl Web3Rpc { { let mut head_block = self.head_block.write(); - let _ = head_block.insert(new_head_block.clone().into()); + let _ = head_block.insert(new_head_block.clone()); } if self.block_data_limit() == U64::zero() { @@ -712,7 +710,7 @@ impl Web3Rpc { let head_block = conn.head_block.read().clone(); if let Some((block_number, txid)) = head_block.and_then(|x| { - let block = x.block.clone(); + let block = x.block; let block_number = block.number?; let txid = block.transactions.last().cloned()?; @@ -1146,7 +1144,7 @@ impl Web3Rpc { } if let Some(hard_limit_until) = self.hard_limit_until.as_ref() { - let hard_limit_ready = hard_limit_until.borrow().clone(); + let hard_limit_ready = *hard_limit_until.borrow(); let now = Instant::now(); @@ -1178,7 +1176,7 @@ impl Web3Rpc { } if let Some(hard_limit_until) = self.hard_limit_until.as_ref() { - hard_limit_until.send_replace(retry_at.clone()); + hard_limit_until.send_replace(retry_at); } return Ok(OpenRequestResult::RetryAt(retry_at)); @@ -1355,7 +1353,7 @@ mod tests { assert!(x.has_block_data(&0.into())); assert!(x.has_block_data(&1.into())); - assert!(x.has_block_data(&head_block.number())); + assert!(x.has_block_data(head_block.number())); assert!(!x.has_block_data(&(head_block.number() + 1))); assert!(!x.has_block_data(&(head_block.number() + 1000))); } @@ -1394,7 +1392,7 @@ mod tests { assert!(!x.has_block_data(&1.into())); assert!(!x.has_block_data(&(head_block.number() - block_data_limit - 1))); assert!(x.has_block_data(&(head_block.number() - block_data_limit))); - assert!(x.has_block_data(&head_block.number())); + assert!(x.has_block_data(head_block.number())); assert!(!x.has_block_data(&(head_block.number() + 1))); assert!(!x.has_block_data(&(head_block.number() + 1000))); }