From af7759e49187b5c1bea92d53a66e5074ae3f739d Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Fri, 16 Jun 2023 10:40:02 -0700
Subject: [PATCH] by_name needs a lock to prevent races during disconnects

---
 web3_proxy/src/app/mod.rs         |  7 -----
 web3_proxy/src/rpcs/blockchain.rs |  2 +-
 web3_proxy/src/rpcs/many.rs       | 51 ++++++++++++++++---------------
 3 files changed, 28 insertions(+), 32 deletions(-)

diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs
index 8c68e80d..13244fd9 100644
--- a/web3_proxy/src/app/mod.rs
+++ b/web3_proxy/src/app/mod.rs
@@ -904,13 +904,6 @@ impl Web3ProxyApp {
             }
         };
 
-        // app.pending_transactions.sync();
-        // app.rpc_secret_key_cache.sync();
-        // "pending_transactions_count": app.pending_transactions.entry_count(),
-        // "pending_transactions_size": app.pending_transactions.weighted_size(),
-        // "user_cache_count": app.rpc_secret_key_cache.entry_count(),
-        // "user_cache_size": app.rpc_secret_key_cache.weighted_size(),
-
         #[derive(Serialize)]
         struct CombinedMetrics {
             recent_ip_counts: RecentCounts,
diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs
index e8fb77b8..cea3b749 100644
--- a/web3_proxy/src/rpcs/blockchain.rs
+++ b/web3_proxy/src/rpcs/blockchain.rs
@@ -476,7 +476,7 @@ impl Web3Rpcs {
         let consensus_head_block = new_consensus_rpcs.head_block.clone();
         let num_consensus_rpcs = new_consensus_rpcs.num_consensus_rpcs();
         let num_active_rpcs = consensus_finder.len();
-        let total_rpcs = self.by_name.load().len();
+        let total_rpcs = self.len();
 
         let new_consensus_rpcs = Arc::new(new_consensus_rpcs);
 
diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs
index ecc62412..48cbdeac 100644
--- a/web3_proxy/src/rpcs/many.rs
+++ b/web3_proxy/src/rpcs/many.rs
@@ -11,7 +11,6 @@ use crate::frontend::rpc_proxy_ws::ProxyMode;
 use crate::frontend::status::MokaCacheSerializer;
 use crate::jsonrpc::{JsonRpcErrorData, JsonRpcParams, JsonRpcResultData};
 use crate::rpcs::transactions::TxStatus;
-use arc_swap::ArcSwap;
 use counter::Counter;
 use derive_more::From;
 use ethers::prelude::{ProviderError, TxHash, U64};
@@ -23,6 +22,7 @@ use itertools::Itertools;
 use log::{debug, error, info, trace, warn};
 use migration::sea_orm::DatabaseConnection;
 use moka::future::{Cache, CacheBuilder};
+use parking_lot::RwLock;
 use serde::ser::{SerializeStruct, Serializer};
 use serde::Serialize;
 use serde_json::json;
@@ -38,11 +38,13 @@ use tokio::time::{sleep, sleep_until, Duration, Instant};
 /// A collection of web3 connections. Sends requests either the current best server or all servers.
 #[derive(From)]
 pub struct Web3Rpcs {
+    /// TODO: this should be a Cow
     pub(crate) name: String,
     /// if watch_consensus_head_sender is some, Web3Rpc inside self will send blocks here when they get them
     pub(crate) block_sender: flume::Sender<(Option<Web3ProxyBlock>, Arc<Web3Rpc>)>,
     /// any requests will be forwarded to one (or more) of these connections
-    pub(crate) by_name: ArcSwap<HashMap<String, Arc<Web3Rpc>>>,
+    /// TODO: hopefully this not being an async lock will be okay. if you need it across awaits, clone the arc
+    pub(crate) by_name: RwLock<HashMap<String, Arc<Web3Rpc>>>,
     /// all providers with the same consensus head block. won't update if there is no `self.watch_consensus_head_sender`
     /// TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed
    /// TODO: why is watch_consensus_head_sender in an Option, but this one isn't?
@@ -215,15 +217,10 @@ impl Web3Rpcs {
             match x {
                 Ok(Ok((rpc, _handle))) => {
                     // web3 connection worked
-                    let mut new_by_name = (*self.by_name.load_full()).clone();
 
-                    // make sure that any new requests use the new connection
-                    let old_rpc = new_by_name.insert(rpc.name.clone(), rpc.clone());
+                    // clean up the old rpc if it exists
+                    let old_rpc = self.by_name.read().get(&rpc.name).map(Arc::clone);
 
-                    // update the arc swap
-                    self.by_name.store(Arc::new(new_by_name));
-
-                    // clean up the old rpc
                     if let Some(old_rpc) = old_rpc {
                         trace!("old_rpc: {}", old_rpc);
 
@@ -241,11 +238,17 @@ impl Web3Rpcs {
                             }
                         }
 
+                        // new rpc is synced (or old one was not synced). update the local map
+                        // make sure that any new requests use the new connection
+                        self.by_name.write().insert(rpc.name.clone(), rpc);
+
                         // tell the old rpc to disconnect
                         if let Some(ref disconnect_sender) = old_rpc.disconnect_watch {
                             trace!("telling {} to disconnect", old_rpc);
                             disconnect_sender.send_replace(true);
                         }
+                    } else {
+                        self.by_name.write().insert(rpc.name.clone(), rpc);
                     }
                 }
                 Ok(Err(err)) => {
@@ -261,7 +264,7 @@ impl Web3Rpcs {
             }
         }
 
-        let num_rpcs = self.by_name.load().len();
+        let num_rpcs = self.len();
 
         if num_rpcs < self.min_synced_rpcs {
             return Err(Web3ProxyError::NotEnoughRpcs {
@@ -274,15 +277,15 @@ impl Web3Rpcs {
     }
 
     pub fn get(&self, conn_name: &str) -> Option<Arc<Web3Rpc>> {
-        self.by_name.load().get(conn_name).cloned()
+        self.by_name.read().get(conn_name).map(Arc::clone)
     }
 
     pub fn len(&self) -> usize {
-        self.by_name.load().len()
+        self.by_name.read().len()
     }
 
     pub fn is_empty(&self) -> bool {
-        self.by_name.load().is_empty()
+        self.by_name.read().is_empty()
     }
 
     pub fn min_head_rpcs(&self) -> usize {
@@ -512,9 +515,9 @@ impl Web3Rpcs {
 
         if self.watch_consensus_head_sender.is_none() {
             // this Web3Rpcs is not tracking head blocks. pick any server
-            let by_name = self.by_name.load();
-
-            let mut potential_rpcs: Vec<_> = by_name
+            let mut potential_rpcs: Vec<_> = self
+                .by_name
+                .read()
                 .values()
                 .filter(|rpc| !skip_rpcs.contains(rpc))
                 .filter(|rpc| {
@@ -553,7 +556,7 @@ impl Web3Rpcs {
 
         let mut watch_consensus_rpcs = self.watch_consensus_rpcs_sender.subscribe();
 
-        let mut potential_rpcs = Vec::with_capacity(self.by_name.load().len());
+        let mut potential_rpcs = Vec::with_capacity(self.len());
 
         loop {
             let consensus_rpcs = watch_consensus_rpcs.borrow_and_update().clone();
@@ -751,7 +754,7 @@ impl Web3Rpcs {
         let mut max_count = if let Some(max_count) = max_count {
             max_count
         } else {
-            self.by_name.load().len()
+            self.len()
         };
 
         trace!("max_count: {}", max_count);
@@ -777,7 +780,7 @@ impl Web3Rpcs {
 
             // if there aren't enough synced connections, include more connections
             // TODO: only do this sorting if the synced_rpcs isn't enough
-            let mut all_rpcs: Vec<_> = self.by_name.load().values().cloned().collect();
+            let mut all_rpcs: Vec<_> = self.by_name.read().values().cloned().collect();
 
             all_rpcs.sort_by_cached_key(|x| x.sort_for_load_balancing_on(max_block_needed.copied()));
             trace!("all_rpcs: {:#?}", all_rpcs);
@@ -1071,7 +1074,7 @@ impl Web3Rpcs {
             return Err(err.into());
         }
 
-        let num_conns = self.by_name.load().len();
+        let num_conns = self.len();
         let num_skipped = skip_rpcs.len();
 
         let needed = min_block_needed.max(max_block_needed);
@@ -1269,7 +1272,7 @@ impl Serialize for Web3Rpcs {
         let mut state = serializer.serialize_struct("Web3Rpcs", 8)?;
 
         {
-            let by_name = self.by_name.load();
+            let by_name = self.by_name.read();
             let rpcs: Vec<&Web3Rpc> = by_name.values().map(|x| x.as_ref()).collect();
             // TODO: coordinate with frontend team to rename "conns" to "rpcs"
             state.serialize_field("conns", &rpcs)?;
@@ -1509,7 +1512,7 @@ mod tests {
         // TODO: make a Web3Rpcs::new
         let rpcs = Web3Rpcs {
             block_sender: block_sender.clone(),
-            by_name: ArcSwap::from_pointee(rpcs_by_name),
+            by_name: RwLock::new(rpcs_by_name),
             name: "test".to_string(),
             watch_consensus_head_sender: Some(watch_consensus_head_sender),
             watch_consensus_rpcs_sender,
@@ -1789,7 +1792,7 @@ mod tests {
 
         let rpcs = Web3Rpcs {
             block_sender,
-            by_name: ArcSwap::from_pointee(rpcs_by_name),
+            by_name: RwLock::new(rpcs_by_name),
             name: "test".to_string(),
             watch_consensus_head_sender: Some(watch_consensus_head_sender),
             watch_consensus_rpcs_sender,
@@ -1975,7 +1978,7 @@ mod tests {
         // TODO: make a Web3Rpcs::new
         let rpcs = Web3Rpcs {
             block_sender,
-            by_name: ArcSwap::from_pointee(rpcs_by_name),
+            by_name: RwLock::new(rpcs_by_name),
             name: "test".to_string(),
             watch_consensus_head_sender: Some(watch_consensus_head_sender),
             watch_consensus_rpcs_sender,
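
The race this patch closes, as a standalone sketch (simplified stand-in types rather than the real Web3Rpcs/Web3Rpc, assuming the arc_swap and parking_lot crates as dependencies): with the old ArcSwap, every writer cloned the current map, inserted into its private copy, and stored the copy back. Two tasks reconnecting or disconnecting servers at the same time could both start from the same snapshot, so whichever store landed last silently dropped the other task's insert. With the map behind a RwLock, each insert happens under the write lock and no update can be lost:

// standalone sketch of the lost-update race; the types here are simplified
// stand-ins, not the real Web3Rpcs/Web3Rpc structs
use std::collections::HashMap;
use std::sync::Arc;
use std::thread;

use arc_swap::ArcSwap;
use parking_lot::RwLock;

fn main() {
    // old pattern: clone the whole map, insert into the private copy, store it back.
    // two writers that interleave between load_full() and store() each start from
    // the same snapshot, so one insert can overwrite the other.
    let by_name = Arc::new(ArcSwap::from_pointee(HashMap::<String, u32>::new()));

    let writers: Vec<_> = (0..2u32)
        .map(|i| {
            let by_name = Arc::clone(&by_name);
            thread::spawn(move || {
                let mut new_by_name = (*by_name.load_full()).clone();
                new_by_name.insert(format!("rpc-{}", i), i);
                // if the other writer stored between our load_full and this store,
                // its insert is silently lost
                by_name.store(Arc::new(new_by_name));
            })
        })
        .collect();
    for w in writers {
        w.join().unwrap();
    }
    // may print 1 instead of 2 when the race hits
    println!("arc_swap map len: {}", by_name.load().len());

    // new pattern: the map lives behind a RwLock, so each insert is atomic under
    // the write lock and no update can be lost
    let by_name = Arc::new(RwLock::new(HashMap::<String, u32>::new()));

    let writers: Vec<_> = (0..2u32)
        .map(|i| {
            let by_name = Arc::clone(&by_name);
            thread::spawn(move || {
                by_name.write().insert(format!("rpc-{}", i), i);
            })
        })
        .collect();
    for w in writers {
        w.join().unwrap();
    }
    // always prints 2
    println!("rwlock map len: {}", by_name.read().len());
}

Since parking_lot::RwLock is a synchronous lock, the guard must not be held across an .await; the read paths in the patch clone the Arc<Web3Rpc> they need and drop the guard immediately, which is what the TODO on the new field points at.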