diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs
index 727d7f35..20cd93e7 100644
--- a/web3_proxy/src/frontend/rpc_proxy_ws.rs
+++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs
@@ -34,7 +34,7 @@ use std::net::IpAddr;
 use std::str::from_utf8_mut;
 use std::sync::atomic::AtomicU64;
 use std::sync::Arc;
-use tokio::sync::{broadcast, OwnedSemaphorePermit, RwLock};
+use tokio::sync::{broadcast, OwnedSemaphorePermit, RwLock as AsyncRwLock};
 use tracing::{info, trace};

 /// How to select backend servers for a request
@@ -319,7 +319,7 @@ async fn handle_socket_payload(
     payload: &str,
     response_sender: &flume::Sender<Message>,
     subscription_count: &AtomicU64,
-    subscriptions: Arc<RwLock<HashMap<U64, AbortHandle>>>,
+    subscriptions: Arc<AsyncRwLock<HashMap<U64, AbortHandle>>>,
 ) -> Web3ProxyResult<(Message, Option<OwnedSemaphorePermit>)> {
     let (authorization, semaphore) = authorization.check_again(&app).await?;

@@ -436,8 +436,7 @@ async fn read_web3_socket(
     mut ws_rx: SplitStream<WebSocket>,
     response_sender: flume::Sender<Message>,
 ) {
-    // RwLock should be fine here. a user isn't going to be opening tons of subscriptions
-    let subscriptions = Arc::new(RwLock::new(HashMap::new()));
+    let subscriptions = Arc::new(AsyncRwLock::new(HashMap::new()));
     let subscription_count = Arc::new(AtomicU64::new(1));

     let (close_sender, mut close_receiver) = broadcast::channel(1);
diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs
index 5a620e63..b4ee288a 100644
--- a/web3_proxy/src/rpcs/consensus.rs
+++ b/web3_proxy/src/rpcs/consensus.rs
@@ -658,7 +658,10 @@ impl ConsensusFinder {
         let latency = first_seen.elapsed();

         // record the time behind the fastest node
-        rpc.head_delay.write().record_secs(latency.as_secs_f32());
+        rpc.head_delay
+            .write()
+            .await
+            .record_secs(latency.as_secs_f32());

         // update the local mapping of rpc -> block
         self.rpc_heads.insert(rpc, block)
diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs
index 6bfa5e5f..3746e331 100644
--- a/web3_proxy/src/rpcs/many.rs
+++ b/web3_proxy/src/rpcs/many.rs
@@ -20,8 +20,7 @@ use futures::StreamExt;
 use hashbrown::HashMap;
 use itertools::Itertools;
 use migration::sea_orm::DatabaseConnection;
-use moka::future::{Cache, CacheBuilder};
-use parking_lot::RwLock;
+use moka::future::{Cache, CacheBuilder, ConcurrentCacheExt};
 use serde::ser::{SerializeStruct, Serializer};
 use serde::Serialize;
 use serde_json::json;
@@ -45,7 +44,7 @@ pub struct Web3Rpcs {
     pub(crate) block_sender: flume::Sender<(Option<Web3ProxyBlock>, Arc<Web3Rpc>)>,
     /// any requests will be forwarded to one (or more) of these connections
     /// TODO: hopefully this not being an async lock will be okay. if you need it across awaits, clone the arc
-    pub(crate) by_name: RwLock<HashMap<String, Arc<Web3Rpc>>>,
+    pub(crate) by_name: Cache<String, Arc<Web3Rpc>>,
     /// all providers with the same consensus head block. won't update if there is no `self.watch_consensus_head_sender`
     /// TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed
     /// TODO: why is watch_consensus_head_sender in an Option, but this one isn't?
@@ -114,7 +113,7 @@ impl Web3Rpcs {
             watch::channel(Default::default());

         // by_name starts empty. self.apply_server_configs will add to it
-        let by_name = Default::default();
+        let by_name = Cache::builder().build();

         let max_head_block_lag = max_head_block_lag.unwrap_or(5.into());

@@ -230,9 +229,7 @@ impl Web3Rpcs {
                 // web3 connection worked

                 // clean up the old rpc if it exists
-                let old_rpc = self.by_name.read().get(&rpc.name).map(Arc::clone);
-
-                if let Some(old_rpc) = old_rpc {
+                if let Some(old_rpc) = self.by_name.get(&rpc.name) {
                     trace!("old_rpc: {}", old_rpc);

                     // if the old rpc was synced, wait for the new one to sync
@@ -251,7 +248,8 @@

                     // new rpc is synced (or old one was not synced). update the local map
                     // make sure that any new requests use the new connection
-                    self.by_name.write().insert(rpc.name.clone(), rpc);
+                    self.by_name.insert(rpc.name.clone(), rpc).await;
+                    self.by_name.sync();

                     // tell the old rpc to disconnect
                     if let Some(ref disconnect_sender) = old_rpc.disconnect_watch {
@@ -259,7 +257,8 @@
                         disconnect_sender.send_replace(true);
                     }
                 } else {
-                    self.by_name.write().insert(rpc.name.clone(), rpc);
+                    self.by_name.insert(rpc.name.clone(), rpc).await;
+                    self.by_name.sync();
                 }
             }
             Ok(Err(err)) => {
@@ -288,17 +287,20 @@
     }

     pub fn get(&self, conn_name: &str) -> Option<Arc<Web3Rpc>> {
-        self.by_name.read().get(conn_name).map(Arc::clone)
+        self.by_name.get(conn_name)
     }

     pub fn len(&self) -> usize {
-        self.by_name.read().len()
+        // // TODO: this seems not great. investigate better ways to do this
+        // self.by_name.sync();
+        self.by_name.entry_count() as usize
     }

     pub fn is_empty(&self) -> bool {
-        self.by_name.read().is_empty()
+        self.len() == 0
     }

+    /// TODO: rename to be consistent between "head" and "synced"
     pub fn min_head_rpcs(&self) -> usize {
         self.min_synced_rpcs
     }
@@ -527,7 +529,7 @@ impl Web3Rpcs {

         let mut watch_ranked_rpcs = self.watch_ranked_rpcs.subscribe();

-        let mut potential_rpcs = Vec::with_capacity(self.len());
+        let mut potential_rpcs = Vec::new();

         loop {
             // TODO: need a change so that protected and 4337 rpcs set watch_consensus_rpcs on start
@@ -649,10 +651,13 @@
     ) -> Result<Vec<OpenRequestHandle>, Option<Instant>> {
         let mut earliest_retry_at = None;

+        // TODO: filter the rpcs with Ranked.will_work_now
+        let mut all_rpcs: Vec<_> = self.by_name.iter().map(|x| x.1).collect();
+
         let mut max_count = if let Some(max_count) = max_count {
             max_count
         } else {
-            self.len()
+            all_rpcs.len()
         };

         trace!("max_count: {}", max_count);
@@ -664,9 +669,6 @@

         let mut selected_rpcs = Vec::with_capacity(max_count);

-        // TODO: filter the rpcs with Ranked.will_work_now
-        let mut all_rpcs: Vec<_> = self.by_name.read().values().cloned().collect();
-
         // TODO: this sorts them all even though we probably won't need all of them. think about this more
         all_rpcs.sort_by_cached_key(|x| x.sort_for_load_balancing_on(max_block_needed.copied()));

@@ -1298,8 +1300,7 @@ impl Serialize for Web3Rpcs {
         let mut state = serializer.serialize_struct("Web3Rpcs", 6)?;

         {
-            let by_name = self.by_name.read();
-            let rpcs: Vec<&Web3Rpc> = by_name.values().map(|x| x.as_ref()).collect();
+            let rpcs: Vec<Arc<Web3Rpc>> = self.by_name.iter().map(|x| x.1).collect();
             // TODO: coordinate with frontend team to rename "conns" to "rpcs"
             state.serialize_field("conns", &rpcs)?;
         }
@@ -1352,7 +1353,6 @@ mod tests {
     use ethers::types::{Block, U256};
     use latency::PeakEwmaLatency;
     use moka::future::CacheBuilder;
-    use parking_lot::RwLock;
     use std::time::{SystemTime, UNIX_EPOCH};
     use tracing::trace;

@@ -1508,11 +1508,6 @@ mod tests {
         let head_rpc = Arc::new(head_rpc);
         let lagged_rpc = Arc::new(lagged_rpc);

-        let rpcs_by_name = HashMap::from([
-            (head_rpc.name.clone(), head_rpc.clone()),
-            (lagged_rpc.name.clone(), lagged_rpc.clone()),
-        ]);
-
         let (block_sender, _block_receiver) = flume::unbounded();
         let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
         let (watch_ranked_rpcs, _watch_consensus_rpcs_receiver) = watch::channel(None);
@@ -1520,10 +1515,18 @@ mod tests {

         let chain_id = 1;

+        let by_name = Cache::builder().build();
+        by_name
+            .insert(head_rpc.name.clone(), head_rpc.clone())
+            .await;
+        by_name
+            .insert(lagged_rpc.name.clone(), lagged_rpc.clone())
+            .await;
+
         // TODO: make a Web3Rpcs::new
         let rpcs = Web3Rpcs {
             block_sender: block_sender.clone(),
-            by_name: RwLock::new(rpcs_by_name),
+            by_name,
             chain_id,
             name: "test".to_string(),
             watch_head_block: Some(watch_consensus_head_sender),
@@ -1779,11 +1782,6 @@ mod tests {
         let pruned_rpc = Arc::new(pruned_rpc);
         let archive_rpc = Arc::new(archive_rpc);

-        let rpcs_by_name = HashMap::from([
-            (pruned_rpc.name.clone(), pruned_rpc.clone()),
-            (archive_rpc.name.clone(), archive_rpc.clone()),
-        ]);
-
         let (block_sender, _) = flume::unbounded();
         let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
         let (watch_ranked_rpcs, _) = watch::channel(None);
@@ -1791,9 +1789,17 @@ mod tests {

         let chain_id = 1;

+        let by_name = Cache::builder().build();
+        by_name
+            .insert(pruned_rpc.name.clone(), pruned_rpc.clone())
+            .await;
+        by_name
+            .insert(archive_rpc.name.clone(), archive_rpc.clone())
+            .await;
+
         let rpcs = Web3Rpcs {
             block_sender,
-            by_name: RwLock::new(rpcs_by_name),
+            by_name,
             chain_id,
             name: "test".to_string(),
             watch_head_block: Some(watch_consensus_head_sender),
@@ -1964,14 +1970,6 @@ mod tests {
         let mock_geth = Arc::new(mock_geth);
         let mock_erigon_archive = Arc::new(mock_erigon_archive);

-        let rpcs_by_name = HashMap::from([
-            (mock_geth.name.clone(), mock_geth.clone()),
-            (
-                mock_erigon_archive.name.clone(),
-                mock_erigon_archive.clone(),
-            ),
-        ]);
-
         let (block_sender, _) = flume::unbounded();
         let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
         let (watch_ranked_rpcs, _) = watch::channel(None);
@@ -1979,10 +1977,21 @@ mod tests {

         let chain_id = 1;

+        let by_name = Cache::builder().build();
+        by_name
+            .insert(mock_geth.name.clone(), mock_geth.clone())
+            .await;
+        by_name
+            .insert(
+                mock_erigon_archive.name.clone(),
+                mock_erigon_archive.clone(),
+            )
+            .await;
+
         // TODO: make a Web3Rpcs::new
         let rpcs = Web3Rpcs {
             block_sender,
-            by_name: RwLock::new(rpcs_by_name),
+            by_name,
             chain_id,
             name: "test".to_string(),
             watch_head_block: Some(watch_consensus_head_sender),
diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs
index 1e58e97f..ce8c9cf8 100644
--- a/web3_proxy/src/rpcs/one.rs
+++ b/web3_proxy/src/rpcs/one.rs
@@ -17,7 +17,6 @@ use futures::StreamExt;
 use latency::{EwmaLatency, PeakEwmaLatency, RollingQuantileLatency};
 use migration::sea_orm::DatabaseConnection;
 use nanorand::Rng;
-use parking_lot::RwLock;
 use redis_rate_limiter::{RedisPool, RedisRateLimitResult, RedisRateLimiter};
 use serde::ser::{SerializeStruct, Serializer};
 use serde::Serialize;
@@ -27,7 +26,7 @@ use std::fmt;
 use std::hash::{Hash, Hasher};
 use std::sync::atomic::{self, AtomicU32, AtomicU64, AtomicUsize};
 use std::{cmp::Ordering, sync::Arc};
-use tokio::sync::watch;
+use tokio::sync::{watch, RwLock as AsyncRwLock};
 use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior};
 use tracing::{debug, info, trace, warn, Level};
 use url::Url;
@@ -62,8 +61,7 @@ pub struct Web3Rpc {
     /// head_block is only inside an Option so that the "Default" derive works. it will always be set.
     pub(super) head_block: Option<watch::Sender<Option<Web3ProxyBlock>>>,
     /// Track head block latency.
-    /// RwLock is fine because this isn't updated often and is for monitoring. It is not used on the hot path.
-    pub(super) head_delay: RwLock<EwmaLatency>,
+    pub(super) head_delay: AsyncRwLock<EwmaLatency>,
     /// Track peak request latency
     /// peak_latency is only inside an Option so that the "Default" derive works. it will always be set.
     pub(super) peak_latency: Option<PeakEwmaLatency>,
@@ -1178,8 +1176,8 @@ impl Serialize for Web3Rpc {
     where
         S: Serializer,
     {
-        // 3 is the number of fields in the struct.
-        let mut state = serializer.serialize_struct("Web3Rpc", 14)?;
+        // 14 if we bring head_delay back
+        let mut state = serializer.serialize_struct("Web3Rpc", 13)?;

         // the url is excluded because it likely includes private information. just show the name that we use in keys
         state.serialize_field("name", &self.name)?;
@@ -1224,10 +1222,10 @@
             &self.active_requests.load(atomic::Ordering::Relaxed),
         )?;

-        {
-            let head_delay_ms = self.head_delay.read().latency().as_secs_f32() * 1000.0;
-            state.serialize_field("head_delay_ms", &(head_delay_ms))?;
-        }
+        // {
+        //     let head_delay_ms = self.head_delay.read().await.latency().as_secs_f32() * 1000.0;
+        //     state.serialize_field("head_delay_ms", &(head_delay_ms))?;
+        // }

         {
             let median_latency_ms = self
@@ -1390,7 +1388,7 @@ mod tests {
             backup: false,
             block_data_limit: block_data_limit.into(),
             tier: 0,
-            head_block: RwLock::new(Some(head_block.clone())),
+            head_block: AsyncRwLock::new(Some(head_block.clone())),
         };

         assert!(!x.has_block_data(&0.into()));
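Beyond the patch itself, here is a minimal standalone sketch of the pattern `by_name` moves to: `moka::future::Cache` used as a concurrent name-to-`Arc<T>` map in place of `RwLock<HashMap<..>>`. It assumes a moka 0.10/0.11-style API (synchronous `get`, and `ConcurrentCacheExt::sync` to flush pending writes so `entry_count` and `iter` observe them); the `Rpc` struct and names below are placeholders, not the real `Web3Rpc`.

```rust
use std::sync::Arc;

use moka::future::{Cache, ConcurrentCacheExt};

#[derive(Debug)]
struct Rpc {
    name: String,
}

#[tokio::main]
async fn main() {
    // an unbounded cache used purely as a concurrent map, like `Web3Rpcs::by_name`
    let by_name: Cache<String, Arc<Rpc>> = Cache::builder().build();

    let rpc = Arc::new(Rpc {
        name: "example".to_string(),
    });

    // writes are async; `sync` flushes them so `entry_count`/`iter` see the insert
    by_name.insert(rpc.name.clone(), rpc.clone()).await;
    by_name.sync();

    // reads clone the stored Arc without taking an exclusive lock
    assert!(by_name.get("example").is_some());
    assert_eq!(by_name.entry_count(), 1);

    println!("tracking {} rpc(s)", by_name.entry_count());
}
```

The `sync()` after `insert` mirrors what the diff adds in many.rs: moka applies writes through internal pending queues, so counts and iteration can lag until they are flushed.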
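And a small sketch, also not from the patch, of the other change: `parking_lot::RwLock` replaced by `tokio::sync::RwLock` for `head_delay` and the websocket `subscriptions` map. The async guard is acquired with `.await` and may be held across later await points, which a blocking guard should not be; the `f32` here stands in for the real `EwmaLatency`.

```rust
use std::sync::Arc;

use tokio::sync::RwLock as AsyncRwLock;

#[tokio::main]
async fn main() {
    // stand-in for `head_delay: AsyncRwLock<EwmaLatency>`
    let head_delay = Arc::new(AsyncRwLock::new(0.0_f32));

    {
        // acquiring the write guard is itself an await point
        let mut delay = head_delay.write().await;
        *delay = 0.25;
        // unlike a parking_lot guard, this one may be held across further awaits
    }

    println!("head delay: {}s", *head_delay.read().await);
}
```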