diff --git a/Cargo.lock b/Cargo.lock index 8f1a8ee9..b131edf9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -105,12 +105,6 @@ dependencies = [ "backtrace", ] -[[package]] -name = "arc-swap" -version = "1.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6" - [[package]] name = "argh" version = "0.1.10" @@ -5574,7 +5568,6 @@ name = "web3_proxy" version = "0.13.0" dependencies = [ "anyhow", - "arc-swap", "argh", "axum", "axum-client-ip", diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index 32cfe92c..ecd7015d 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -20,7 +20,6 @@ redis-rate-limiter = { path = "../redis-rate-limiter" } thread-fast-rng = { path = "../thread-fast-rng" } anyhow = { version = "1.0.68", features = ["backtrace"] } -arc-swap = "1.6.0" argh = "0.1.10" axum = { version = "0.6.3", features = ["headers", "ws"] } axum-client-ip = "0.3.1" diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 2e05ace5..0c60ce81 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -188,7 +188,7 @@ pub struct Web3ProxyApp { response_cache: ResponseCache, // don't drop this or the sender will stop working // TODO: broadcast channel instead? - head_block_receiver: watch::Receiver, + watch_consensus_head_receiver: watch::Receiver, pending_tx_sender: broadcast::Sender, pub config: AppConfig, pub db_conn: Option, @@ -533,7 +533,8 @@ impl Web3ProxyApp { }; // TODO: i don't like doing Block::default here! Change this to "None"? - let (head_block_sender, head_block_receiver) = watch::channel(Arc::new(Block::default())); + let (watch_consensus_head_sender, watch_consensus_head_receiver) = + watch::channel(Arc::new(Block::default())); // TODO: will one receiver lagging be okay? how big should this be? let (pending_tx_sender, pending_tx_receiver) = broadcast::channel(256); @@ -570,7 +571,7 @@ impl Web3ProxyApp { http_client.clone(), vredis_pool.clone(), block_map.clone(), - Some(head_block_sender), + Some(watch_consensus_head_sender), top_config.app.min_sum_soft_limit, top_config.app.min_synced_rpcs, Some(pending_tx_sender.clone()), @@ -598,6 +599,8 @@ impl Web3ProxyApp { vredis_pool.clone(), block_map, // subscribing to new heads here won't work well. if they are fast, they might be ahead of balanced_rpcs + // they also often have low rate limits + // however, they are well connected to miners/validators. so maybe using them as a safety check would be good None, 0, 0, @@ -706,7 +709,7 @@ impl Web3ProxyApp { balanced_rpcs, private_rpcs, response_cache, - head_block_receiver, + watch_consensus_head_receiver, pending_tx_sender, pending_transactions, frontend_ip_rate_limiter, @@ -730,7 +733,7 @@ impl Web3ProxyApp { } pub fn head_block_receiver(&self) -> watch::Receiver { - self.head_block_receiver.clone() + self.watch_consensus_head_receiver.clone() } pub async fn prometheus_metrics(&self) -> String { @@ -1362,10 +1365,10 @@ impl Web3ProxyApp { method => { // emit stats - // TODO: if no servers synced, wait for them to be synced? - let head_block = self + // TODO: if no servers synced, wait for them to be synced? 
probably better to error and let haproxy retry another server + let head_block_num = self .balanced_rpcs - .head_block() + .head_block_num() .context("no servers synced")?; // we do this check before checking caches because it might modify the request params @@ -1375,7 +1378,7 @@ impl Web3ProxyApp { authorization, method, request.params.as_mut(), - head_block.number(), + head_block_num, &self.balanced_rpcs, ) .await? diff --git a/web3_proxy/src/app/ws.rs b/web3_proxy/src/app/ws.rs index e6ac30c0..582ea814 100644 --- a/web3_proxy/src/app/ws.rs +++ b/web3_proxy/src/app/ws.rs @@ -50,7 +50,7 @@ impl Web3ProxyApp { match request_json.params.as_ref() { Some(x) if x == &json!(["newHeads"]) => { let authorization = authorization.clone(); - let head_block_receiver = self.head_block_receiver.clone(); + let head_block_receiver = self.watch_consensus_head_receiver.clone(); let stat_sender = self.stat_sender.clone(); trace!("newHeads subscription {:?}", subscription_id); diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index 8b8cbce7..199fb65b 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -165,9 +165,15 @@ impl Web3Connections { let request: JsonRpcRequest = serde_json::from_value(request)?; // TODO: request_metadata? maybe we should put it in the authorization? - // TODO: don't hard code allowed lag + // TODO: think more about this wait_for_sync let response = self - .try_send_best_consensus_head_connection(authorization, request, None, None) + .try_send_best_consensus_head_connection( + authorization, + request, + None, + None, + true, + ) .await?; let block = response.result.context("failed fetching block")?; @@ -199,6 +205,7 @@ impl Web3Connections { } /// Get the heaviest chain's block from cache or backend rpc + /// Caution! If a future block is requested, this might wait forever. Be sure to have a timeout outside of this! pub async fn cannonical_block( &self, authorization: &Arc, @@ -208,23 +215,33 @@ impl Web3Connections { // maybe save them during save_block in a blocks_by_number Cache> // if theres multiple, use petgraph to find the one on the main chain (and remove the others if they have enough confirmations) + let mut consensus_head_receiver = self + .watch_consensus_head_receiver + .as_ref() + .context("need new head subscriptions to fetch cannonical_block")? + .clone(); + // be sure the requested block num exists - let head_block_num = self.head_block_num().context("no servers in sync")?; + let mut head_block_num = consensus_head_receiver.borrow_and_update().number; + + loop { + if let Some(head_block_num) = head_block_num { + if num <= &head_block_num { + break; + } + } + + consensus_head_receiver.changed().await?; + + head_block_num = consensus_head_receiver.borrow_and_update().number; + } + + let head_block_num = + head_block_num.expect("we should only get here if we have a head block"); // TODO: geth does 64, erigon does 90k. sometimes we run a mix let archive_needed = num < &(head_block_num - U64::from(64)); - if num > &head_block_num { - // TODO: i'm seeing this a lot when using ethspam. i dont know why though. i thought we delayed publishing - // TODO: instead of error, maybe just sleep and try again? 
- // TODO: this should be a 401, not a 500 - return Err(anyhow::anyhow!( - "Head block is #{}, but #{} was requested", - head_block_num, - num - )); - } - // try to get the hash from our cache // deref to not keep the lock open if let Some(block_hash) = self.block_numbers.get(num) { @@ -243,7 +260,7 @@ impl Web3Connections { // TODO: if error, retry? // TODO: request_metadata or authorization? let response = self - .try_send_best_consensus_head_connection(authorization, request, None, Some(num)) + .try_send_best_consensus_head_connection(authorization, request, None, Some(num), true) .await?; let raw_block = response.result.context("no block result")?; @@ -320,6 +337,8 @@ impl Web3Connections { .best_consensus_connections(authorization, self) .await; + // TODO: what should we do if the block number of new_synced_connections is < old_synced_connections? wait? + let includes_backups = new_synced_connections.includes_backups; let consensus_head_block = new_synced_connections.head_block.clone(); let num_consensus_rpcs = new_synced_connections.num_conns(); @@ -327,14 +346,14 @@ impl Web3Connections { let num_active_rpcs = consensus_finder.all.rpc_name_to_hash.len(); let total_rpcs = self.conns.len(); - let old_synced_connections = self - .synced_connections - .swap(Arc::new(new_synced_connections)); + let old_consensus_head_connections = self + .watch_consensus_connections_sender + .send_replace(Arc::new(new_synced_connections)); let includes_backups_str = if includes_backups { "B " } else { "" }; if let Some(consensus_saved_block) = consensus_head_block { - match &old_synced_connections.head_block { + match &old_consensus_head_connections.head_block { None => { debug!( "first {}{}/{}/{}/{} block={}, rpc={}", @@ -843,7 +862,13 @@ impl ConsensusFinder { Some(x) => x.number.expect("blocks here should always have a number"), }; - let min_block_num = highest_block_num.saturating_sub(U64::from(5)); + // TODO: also needs to be not less than our current head + let mut min_block_num = highest_block_num.saturating_sub(U64::from(5)); + + // we also want to be sure we don't ever go backwards! + if let Some(current_consensus_head_num) = web3_connections.head_block_num() { + min_block_num = min_block_num.max(current_consensus_head_num); + } // TODO: pass `min_block_num` to consensus_head_connections? let consensus_head_for_main = self diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs index 4db3c308..d0c28e85 100644 --- a/web3_proxy/src/rpcs/connections.rs +++ b/web3_proxy/src/rpcs/connections.rs @@ -11,7 +11,6 @@ use crate::frontend::authorization::{Authorization, RequestMetadata}; use crate::frontend::rpc_proxy_ws::ProxyMode; use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest}; use crate::rpcs::transactions::TxStatus; -use arc_swap::ArcSwap; use counter::Counter; use derive_more::From; use ethers::prelude::{ProviderError, TxHash, H256, U64}; @@ -38,9 +37,12 @@ use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBeh /// A collection of web3 connections. Sends requests either the current best server or all servers. #[derive(From)] pub struct Web3Connections { - pub(crate) conns: HashMap>, /// any requests will be forwarded to one (or more) of these connections - pub(super) synced_connections: ArcSwap, + pub(crate) conns: HashMap>, + /// all providers with the same consensus head block. 
won't update if there is no `self.watch_consensus_head_sender` + pub(super) watch_consensus_connections_sender: watch::Sender>, + /// this head receiver makes it easy to wait until there is a new block + pub(super) watch_consensus_head_receiver: Option>, pub(super) pending_transactions: Cache, /// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis? @@ -62,7 +64,7 @@ impl Web3Connections { http_client: Option, redis_pool: Option, block_map: BlockHashesCache, - head_block_sender: Option>, + watch_consensus_head_sender: Option>, min_sum_soft_limit: u32, min_head_rpcs: usize, pending_tx_sender: Option>, @@ -138,7 +140,7 @@ impl Web3Connections { let redis_pool = redis_pool.clone(); let http_interval_sender = http_interval_sender.clone(); - let block_sender = if head_block_sender.is_some() { + let block_sender = if watch_consensus_head_sender.is_some() { Some(block_sender.clone()) } else { None @@ -192,8 +194,6 @@ impl Web3Connections { } } - let synced_connections = ConsensusConnections::default(); - // TODO: max_capacity and time_to_idle from config // all block hashes are the same size, so no need for weigher let block_hashes = Cache::builder() @@ -206,9 +206,15 @@ impl Web3Connections { .max_capacity(10_000) .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); + let (watch_consensus_connections_sender, _) = watch::channel(Default::default()); + + let watch_consensus_head_receiver = + watch_consensus_head_sender.as_ref().map(|x| x.subscribe()); + let connections = Arc::new(Self { conns: connections, - synced_connections: ArcSwap::new(Arc::new(synced_connections)), + watch_consensus_connections_sender, + watch_consensus_head_receiver, pending_transactions, block_hashes, block_numbers, @@ -228,7 +234,7 @@ impl Web3Connections { authorization, pending_tx_id_receiver, block_receiver, - head_block_sender, + watch_consensus_head_sender, pending_tx_sender, ) .await @@ -447,11 +453,12 @@ impl Web3Connections { (Option, u64), Vec>, > = { - let synced_connections = self.synced_connections.load(); + let synced_connections = self.watch_consensus_connections_sender.borrow().clone(); let head_block_num = if let Some(head_block) = synced_connections.head_block.as_ref() { head_block.number() } else { + // TODO: optionally wait for a head block >= min_block_needed return Ok(OpenRequestResult::NotReady); }; @@ -495,6 +502,7 @@ impl Web3Connections { } } cmp::Ordering::Greater => { + // TODO? if the blocks is close and wait_for_sync and allow_backups, wait for change on a watch_consensus_connections_receiver().subscribe() return Ok(OpenRequestResult::NotReady); } } @@ -712,18 +720,27 @@ impl Web3Connections { } /// be sure there is a timeout on this or it might loop forever + /// TODO: think more about wait_for_sync pub async fn try_send_best_consensus_head_connection( &self, authorization: &Arc, request: JsonRpcRequest, request_metadata: Option<&Arc>, min_block_needed: Option<&U64>, + wait_for_sync: bool, ) -> anyhow::Result { let mut skip_rpcs = vec![]; + let mut watch_consensus_connections = if wait_for_sync { + Some(self.watch_consensus_connections_sender.subscribe()) + } else { + None + }; + // TODO: maximum retries? right now its the total number of servers loop { // TODO: is self.conns still right now that we split main and backup servers? 
+ // TODO: if a new block arrives, we probably want to reset the skip list if skip_rpcs.len() == self.conns.len() { // no servers to try break; @@ -833,9 +850,6 @@ impl Web3Connections { rpc, err ); - // TODO: sleep how long? until synced_connections changes or rate limits are available - // sleep(Duration::from_millis(100)).await; - continue; } } @@ -851,16 +865,38 @@ impl Web3Connections { request_metadata.no_servers.fetch_add(1, Ordering::Release); } - sleep_until(retry_at).await; - - continue; + if let Some(watch_consensus_connections) = watch_consensus_connections.as_mut() + { + // TODO: if there are other servers in synced_connections, we should continue now + // wait until retry_at OR synced_connections changes + tokio::select! { + _ = sleep_until(retry_at) => { + skip_rpcs.pop(); + } + _ = watch_consensus_connections.changed() => { + // TODO: would be nice to save this retry_at so we don't keep hitting limits + let _ = watch_consensus_connections.borrow_and_update(); + } + } + continue; + } else { + break; + } } OpenRequestResult::NotReady => { if let Some(request_metadata) = request_metadata { request_metadata.no_servers.fetch_add(1, Ordering::Release); } - break; + if wait_for_sync { + // TODO: race here. there might have been a change while we were waiting on the previous server + self.watch_consensus_connections_sender + .subscribe() + .changed() + .await?; + } else { + break; + } } } } @@ -979,6 +1015,7 @@ impl Web3Connections { request, request_metadata, min_block_needed, + true, ) .await } @@ -1007,8 +1044,11 @@ impl Serialize for Web3Connections { let conns: Vec<&Web3Connection> = self.conns.values().map(|x| x.as_ref()).collect(); state.serialize_field("conns", &conns)?; - let synced_connections = &**self.synced_connections.load(); - state.serialize_field("synced_connections", synced_connections)?; + { + let consensus_connections = self.watch_consensus_connections_sender.borrow().clone(); + // TODO: rename synced_connections to consensus_connections? 
+ state.serialize_field("synced_connections", &consensus_connections)?; + } self.block_hashes.sync(); self.block_numbers.sync(); @@ -1128,9 +1168,13 @@ mod tests { (lagged_rpc.name.clone(), lagged_rpc.clone()), ]); + let (watch_consensus_connections_sender, _) = watch::channel(Default::default()); + + // TODO: make a Web3Connections::new let conns = Web3Connections { conns, - synced_connections: Default::default(), + watch_consensus_head_receiver: None, + watch_consensus_connections_sender, pending_transactions: Cache::builder() .max_capacity(10_000) .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()), @@ -1350,9 +1394,13 @@ mod tests { (archive_rpc.name.clone(), archive_rpc.clone()), ]); + let (watch_consensus_connections_sender, _) = watch::channel(Default::default()); + + // TODO: make a Web3Connections::new let conns = Web3Connections { conns, - synced_connections: Default::default(), + watch_consensus_head_receiver: None, + watch_consensus_connections_sender, pending_transactions: Cache::builder() .max_capacity(10) .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()), diff --git a/web3_proxy/src/rpcs/synced_connections.rs b/web3_proxy/src/rpcs/synced_connections.rs index 824857ce..224381df 100644 --- a/web3_proxy/src/rpcs/synced_connections.rs +++ b/web3_proxy/src/rpcs/synced_connections.rs @@ -1,4 +1,4 @@ -use super::blockchain::SavedBlock; +use super::blockchain::{ArcBlock, SavedBlock}; use super::connection::Web3Connection; use super::connections::Web3Connections; use ethers::prelude::{H256, U64}; @@ -43,31 +43,29 @@ impl fmt::Debug for ConsensusConnections { } impl Web3Connections { - pub fn head_block(&self) -> Option { - self.synced_connections.load().head_block.clone() + pub fn head_block(&self) -> Option { + self.watch_consensus_head_receiver + .as_ref() + .map(|x| x.borrow().clone()) } pub fn head_block_hash(&self) -> Option { - self.synced_connections - .load() - .head_block - .as_ref() - .map(|head_block| head_block.hash()) + self.head_block().and_then(|x| x.hash) } pub fn head_block_num(&self) -> Option { - self.synced_connections - .load() - .head_block - .as_ref() - .map(|head_block| head_block.number()) + self.head_block().and_then(|x| x.number) } pub fn synced(&self) -> bool { - !self.synced_connections.load().conns.is_empty() + !self + .watch_consensus_connections_sender + .borrow() + .conns + .is_empty() } pub fn num_synced_rpcs(&self) -> usize { - self.synced_connections.load().conns.len() + self.watch_consensus_connections_sender.borrow().conns.len() } }
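
Note: the core of this change is swapping `arc_swap::ArcSwap` for `tokio::sync::watch`. `ArcSwap` only supports atomic load/swap of the latest value, while a `watch` channel additionally lets readers `await` the next update, which is what `cannonical_block` and `try_send_best_consensus_head_connection` now lean on instead of immediately erroring when a requested block is ahead of the current consensus head. Below is a minimal, self-contained sketch of that pattern, not code from this repo: the names `HeadBlock`, `publish_head`, and `wait_for_block` are illustrative only, and it assumes `tokio` (with the `sync`, `macros`, and `rt-multi-thread` features) plus `anyhow` as dependencies.

```rust
use std::sync::Arc;
use tokio::sync::watch;

#[derive(Default, Debug)]
struct HeadBlock {
    number: u64,
}

/// The writer half lives with the consensus finder. `send_replace` returns the
/// previous value, which is how the diff keeps logging "first block" vs "new head"
/// comparisons without a separate load.
fn publish_head(sender: &watch::Sender<Arc<HeadBlock>>, new_head: HeadBlock) -> Arc<HeadBlock> {
    sender.send_replace(Arc::new(new_head))
}

/// Wait until the consensus head reaches `needed_num`. Like `cannonical_block`,
/// this can wait forever if the block never arrives, so callers should wrap it
/// in a timeout.
async fn wait_for_block(
    mut receiver: watch::Receiver<Arc<HeadBlock>>,
    needed_num: u64,
) -> anyhow::Result<Arc<HeadBlock>> {
    loop {
        // take a cheap snapshot; the Ref is dropped before the await below
        let head = receiver.borrow_and_update().clone();
        if head.number >= needed_num {
            return Ok(head);
        }
        // wakes when the sender calls `send_replace`
        receiver.changed().await?;
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let (sender, receiver) = watch::channel(Arc::new(HeadBlock::default()));

    // a task that needs a block the consensus head has not reached yet
    let waiter = tokio::spawn(wait_for_block(receiver, 2));

    publish_head(&sender, HeadBlock { number: 1 });
    publish_head(&sender, HeadBlock { number: 2 });

    let head = waiter.await??;
    assert_eq!(head.number, 2);

    Ok(())
}
```

The same wake-up mechanism is what lets `try_send_best_consensus_head_connection` `select!` between a rate-limit `retry_at` and a consensus change, and lets the `NotReady` branch block on `subscribe().changed()` when `wait_for_sync` is set, rather than sleeping for an arbitrary duration as the removed `sleep(Duration::from_millis(100))` comment suggested.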