remove parking_lot

This commit is contained in:
Bryan Stitt 2023-07-10 11:05:07 -07:00
parent 2ea53cc396
commit 9ef08c80e1
4 changed files with 66 additions and 57 deletions

@ -34,7 +34,7 @@ use std::net::IpAddr;
use std::str::from_utf8_mut; use std::str::from_utf8_mut;
use std::sync::atomic::AtomicU64; use std::sync::atomic::AtomicU64;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::{broadcast, OwnedSemaphorePermit, RwLock}; use tokio::sync::{broadcast, OwnedSemaphorePermit, RwLock as AsyncRwLock};
use tracing::{info, trace}; use tracing::{info, trace};
/// How to select backend servers for a request /// How to select backend servers for a request
@ -319,7 +319,7 @@ async fn handle_socket_payload(
payload: &str, payload: &str,
response_sender: &flume::Sender<Message>, response_sender: &flume::Sender<Message>,
subscription_count: &AtomicU64, subscription_count: &AtomicU64,
subscriptions: Arc<RwLock<HashMap<U64, AbortHandle>>>, subscriptions: Arc<AsyncRwLock<HashMap<U64, AbortHandle>>>,
) -> Web3ProxyResult<(Message, Option<OwnedSemaphorePermit>)> { ) -> Web3ProxyResult<(Message, Option<OwnedSemaphorePermit>)> {
let (authorization, semaphore) = authorization.check_again(&app).await?; let (authorization, semaphore) = authorization.check_again(&app).await?;
@ -436,8 +436,7 @@ async fn read_web3_socket(
mut ws_rx: SplitStream<WebSocket>, mut ws_rx: SplitStream<WebSocket>,
response_sender: flume::Sender<Message>, response_sender: flume::Sender<Message>,
) { ) {
// RwLock should be fine here. a user isn't going to be opening tons of subscriptions let subscriptions = Arc::new(AsyncRwLock::new(HashMap::new()));
let subscriptions = Arc::new(RwLock::new(HashMap::new()));
let subscription_count = Arc::new(AtomicU64::new(1)); let subscription_count = Arc::new(AtomicU64::new(1));
let (close_sender, mut close_receiver) = broadcast::channel(1); let (close_sender, mut close_receiver) = broadcast::channel(1);

@ -658,7 +658,10 @@ impl ConsensusFinder {
let latency = first_seen.elapsed(); let latency = first_seen.elapsed();
// record the time behind the fastest node // record the time behind the fastest node
rpc.head_delay.write().record_secs(latency.as_secs_f32()); rpc.head_delay
.write()
.await
.record_secs(latency.as_secs_f32());
// update the local mapping of rpc -> block // update the local mapping of rpc -> block
self.rpc_heads.insert(rpc, block) self.rpc_heads.insert(rpc, block)

@ -20,8 +20,7 @@ use futures::StreamExt;
use hashbrown::HashMap; use hashbrown::HashMap;
use itertools::Itertools; use itertools::Itertools;
use migration::sea_orm::DatabaseConnection; use migration::sea_orm::DatabaseConnection;
use moka::future::{Cache, CacheBuilder}; use moka::future::{Cache, CacheBuilder, ConcurrentCacheExt};
use parking_lot::RwLock;
use serde::ser::{SerializeStruct, Serializer}; use serde::ser::{SerializeStruct, Serializer};
use serde::Serialize; use serde::Serialize;
use serde_json::json; use serde_json::json;
@ -45,7 +44,7 @@ pub struct Web3Rpcs {
pub(crate) block_sender: flume::Sender<(Option<Web3ProxyBlock>, Arc<Web3Rpc>)>, pub(crate) block_sender: flume::Sender<(Option<Web3ProxyBlock>, Arc<Web3Rpc>)>,
/// any requests will be forwarded to one (or more) of these connections /// any requests will be forwarded to one (or more) of these connections
/// TODO: hopefully this not being an async lock will be okay. if you need it across awaits, clone the arc /// TODO: hopefully this not being an async lock will be okay. if you need it across awaits, clone the arc
pub(crate) by_name: RwLock<HashMap<String, Arc<Web3Rpc>>>, pub(crate) by_name: Cache<String, Arc<Web3Rpc>>,
/// all providers with the same consensus head block. won't update if there is no `self.watch_consensus_head_sender` /// all providers with the same consensus head block. won't update if there is no `self.watch_consensus_head_sender`
/// TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed /// TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed
/// TODO: why is watch_consensus_head_sender in an Option, but this one isn't? /// TODO: why is watch_consensus_head_sender in an Option, but this one isn't?
@ -114,7 +113,7 @@ impl Web3Rpcs {
watch::channel(Default::default()); watch::channel(Default::default());
// by_name starts empty. self.apply_server_configs will add to it // by_name starts empty. self.apply_server_configs will add to it
let by_name = Default::default(); let by_name = Cache::builder().build();
let max_head_block_lag = max_head_block_lag.unwrap_or(5.into()); let max_head_block_lag = max_head_block_lag.unwrap_or(5.into());
@ -230,9 +229,7 @@ impl Web3Rpcs {
// web3 connection worked // web3 connection worked
// clean up the old rpc if it exists // clean up the old rpc if it exists
let old_rpc = self.by_name.read().get(&rpc.name).map(Arc::clone); if let Some(old_rpc) = self.by_name.get(&rpc.name) {
if let Some(old_rpc) = old_rpc {
trace!("old_rpc: {}", old_rpc); trace!("old_rpc: {}", old_rpc);
// if the old rpc was synced, wait for the new one to sync // if the old rpc was synced, wait for the new one to sync
@ -251,7 +248,8 @@ impl Web3Rpcs {
// new rpc is synced (or old one was not synced). update the local map // new rpc is synced (or old one was not synced). update the local map
// make sure that any new requests use the new connection // make sure that any new requests use the new connection
self.by_name.write().insert(rpc.name.clone(), rpc); self.by_name.insert(rpc.name.clone(), rpc).await;
self.by_name.sync();
// tell the old rpc to disconnect // tell the old rpc to disconnect
if let Some(ref disconnect_sender) = old_rpc.disconnect_watch { if let Some(ref disconnect_sender) = old_rpc.disconnect_watch {
@ -259,7 +257,8 @@ impl Web3Rpcs {
disconnect_sender.send_replace(true); disconnect_sender.send_replace(true);
} }
} else { } else {
self.by_name.write().insert(rpc.name.clone(), rpc); self.by_name.insert(rpc.name.clone(), rpc).await;
self.by_name.sync();
} }
} }
Ok(Err(err)) => { Ok(Err(err)) => {
@ -288,17 +287,20 @@ impl Web3Rpcs {
} }
pub fn get(&self, conn_name: &str) -> Option<Arc<Web3Rpc>> { pub fn get(&self, conn_name: &str) -> Option<Arc<Web3Rpc>> {
self.by_name.read().get(conn_name).map(Arc::clone) self.by_name.get(conn_name)
} }
pub fn len(&self) -> usize { pub fn len(&self) -> usize {
self.by_name.read().len() // // TODO: this seems not great. investigate better ways to do this
// self.by_name.sync();
self.by_name.entry_count() as usize
} }
pub fn is_empty(&self) -> bool { pub fn is_empty(&self) -> bool {
self.by_name.read().is_empty() self.len() > 0
} }
/// TODO: rename to be consistent between "head" and "synced"
pub fn min_head_rpcs(&self) -> usize { pub fn min_head_rpcs(&self) -> usize {
self.min_synced_rpcs self.min_synced_rpcs
} }
@ -527,7 +529,7 @@ impl Web3Rpcs {
let mut watch_ranked_rpcs = self.watch_ranked_rpcs.subscribe(); let mut watch_ranked_rpcs = self.watch_ranked_rpcs.subscribe();
let mut potential_rpcs = Vec::with_capacity(self.len()); let mut potential_rpcs = Vec::new();
loop { loop {
// TODO: need a change so that protected and 4337 rpcs set watch_consensus_rpcs on start // TODO: need a change so that protected and 4337 rpcs set watch_consensus_rpcs on start
@ -649,10 +651,13 @@ impl Web3Rpcs {
) -> Result<Vec<OpenRequestHandle>, Option<Instant>> { ) -> Result<Vec<OpenRequestHandle>, Option<Instant>> {
let mut earliest_retry_at = None; let mut earliest_retry_at = None;
// TODO: filter the rpcs with Ranked.will_work_now
let mut all_rpcs: Vec<_> = self.by_name.iter().map(|x| x.1).collect();
let mut max_count = if let Some(max_count) = max_count { let mut max_count = if let Some(max_count) = max_count {
max_count max_count
} else { } else {
self.len() all_rpcs.len()
}; };
trace!("max_count: {}", max_count); trace!("max_count: {}", max_count);
@ -664,9 +669,6 @@ impl Web3Rpcs {
let mut selected_rpcs = Vec::with_capacity(max_count); let mut selected_rpcs = Vec::with_capacity(max_count);
// TODO: filter the rpcs with Ranked.will_work_now
let mut all_rpcs: Vec<_> = self.by_name.read().values().cloned().collect();
// TODO: this sorts them all even though we probably won't need all of them. think about this more // TODO: this sorts them all even though we probably won't need all of them. think about this more
all_rpcs.sort_by_cached_key(|x| x.sort_for_load_balancing_on(max_block_needed.copied())); all_rpcs.sort_by_cached_key(|x| x.sort_for_load_balancing_on(max_block_needed.copied()));
@ -1298,8 +1300,7 @@ impl Serialize for Web3Rpcs {
let mut state = serializer.serialize_struct("Web3Rpcs", 6)?; let mut state = serializer.serialize_struct("Web3Rpcs", 6)?;
{ {
let by_name = self.by_name.read(); let rpcs: Vec<Arc<Web3Rpc>> = self.by_name.iter().map(|x| x.1).collect();
let rpcs: Vec<&Web3Rpc> = by_name.values().map(|x| x.as_ref()).collect();
// TODO: coordinate with frontend team to rename "conns" to "rpcs" // TODO: coordinate with frontend team to rename "conns" to "rpcs"
state.serialize_field("conns", &rpcs)?; state.serialize_field("conns", &rpcs)?;
} }
@ -1352,7 +1353,6 @@ mod tests {
use ethers::types::{Block, U256}; use ethers::types::{Block, U256};
use latency::PeakEwmaLatency; use latency::PeakEwmaLatency;
use moka::future::CacheBuilder; use moka::future::CacheBuilder;
use parking_lot::RwLock;
use std::time::{SystemTime, UNIX_EPOCH}; use std::time::{SystemTime, UNIX_EPOCH};
use tracing::trace; use tracing::trace;
@ -1508,11 +1508,6 @@ mod tests {
let head_rpc = Arc::new(head_rpc); let head_rpc = Arc::new(head_rpc);
let lagged_rpc = Arc::new(lagged_rpc); let lagged_rpc = Arc::new(lagged_rpc);
let rpcs_by_name = HashMap::from([
(head_rpc.name.clone(), head_rpc.clone()),
(lagged_rpc.name.clone(), lagged_rpc.clone()),
]);
let (block_sender, _block_receiver) = flume::unbounded(); let (block_sender, _block_receiver) = flume::unbounded();
let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded(); let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
let (watch_ranked_rpcs, _watch_consensus_rpcs_receiver) = watch::channel(None); let (watch_ranked_rpcs, _watch_consensus_rpcs_receiver) = watch::channel(None);
@ -1520,10 +1515,18 @@ mod tests {
let chain_id = 1; let chain_id = 1;
let by_name = Cache::builder().build();
by_name
.insert(head_rpc.name.clone(), head_rpc.clone())
.await;
by_name
.insert(lagged_rpc.name.clone(), lagged_rpc.clone())
.await;
// TODO: make a Web3Rpcs::new // TODO: make a Web3Rpcs::new
let rpcs = Web3Rpcs { let rpcs = Web3Rpcs {
block_sender: block_sender.clone(), block_sender: block_sender.clone(),
by_name: RwLock::new(rpcs_by_name), by_name,
chain_id, chain_id,
name: "test".to_string(), name: "test".to_string(),
watch_head_block: Some(watch_consensus_head_sender), watch_head_block: Some(watch_consensus_head_sender),
@ -1779,11 +1782,6 @@ mod tests {
let pruned_rpc = Arc::new(pruned_rpc); let pruned_rpc = Arc::new(pruned_rpc);
let archive_rpc = Arc::new(archive_rpc); let archive_rpc = Arc::new(archive_rpc);
let rpcs_by_name = HashMap::from([
(pruned_rpc.name.clone(), pruned_rpc.clone()),
(archive_rpc.name.clone(), archive_rpc.clone()),
]);
let (block_sender, _) = flume::unbounded(); let (block_sender, _) = flume::unbounded();
let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded(); let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
let (watch_ranked_rpcs, _) = watch::channel(None); let (watch_ranked_rpcs, _) = watch::channel(None);
@ -1791,9 +1789,17 @@ mod tests {
let chain_id = 1; let chain_id = 1;
let by_name = Cache::builder().build();
by_name
.insert(pruned_rpc.name.clone(), pruned_rpc.clone())
.await;
by_name
.insert(archive_rpc.name.clone(), archive_rpc.clone())
.await;
let rpcs = Web3Rpcs { let rpcs = Web3Rpcs {
block_sender, block_sender,
by_name: RwLock::new(rpcs_by_name), by_name,
chain_id, chain_id,
name: "test".to_string(), name: "test".to_string(),
watch_head_block: Some(watch_consensus_head_sender), watch_head_block: Some(watch_consensus_head_sender),
@ -1964,14 +1970,6 @@ mod tests {
let mock_geth = Arc::new(mock_geth); let mock_geth = Arc::new(mock_geth);
let mock_erigon_archive = Arc::new(mock_erigon_archive); let mock_erigon_archive = Arc::new(mock_erigon_archive);
let rpcs_by_name = HashMap::from([
(mock_geth.name.clone(), mock_geth.clone()),
(
mock_erigon_archive.name.clone(),
mock_erigon_archive.clone(),
),
]);
let (block_sender, _) = flume::unbounded(); let (block_sender, _) = flume::unbounded();
let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded(); let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
let (watch_ranked_rpcs, _) = watch::channel(None); let (watch_ranked_rpcs, _) = watch::channel(None);
@ -1979,10 +1977,21 @@ mod tests {
let chain_id = 1; let chain_id = 1;
let by_name = Cache::builder().build();
by_name
.insert(mock_geth.name.clone(), mock_geth.clone())
.await;
by_name
.insert(
mock_erigon_archive.name.clone(),
mock_erigon_archive.clone(),
)
.await;
// TODO: make a Web3Rpcs::new // TODO: make a Web3Rpcs::new
let rpcs = Web3Rpcs { let rpcs = Web3Rpcs {
block_sender, block_sender,
by_name: RwLock::new(rpcs_by_name), by_name,
chain_id, chain_id,
name: "test".to_string(), name: "test".to_string(),
watch_head_block: Some(watch_consensus_head_sender), watch_head_block: Some(watch_consensus_head_sender),

@ -17,7 +17,6 @@ use futures::StreamExt;
use latency::{EwmaLatency, PeakEwmaLatency, RollingQuantileLatency}; use latency::{EwmaLatency, PeakEwmaLatency, RollingQuantileLatency};
use migration::sea_orm::DatabaseConnection; use migration::sea_orm::DatabaseConnection;
use nanorand::Rng; use nanorand::Rng;
use parking_lot::RwLock;
use redis_rate_limiter::{RedisPool, RedisRateLimitResult, RedisRateLimiter}; use redis_rate_limiter::{RedisPool, RedisRateLimitResult, RedisRateLimiter};
use serde::ser::{SerializeStruct, Serializer}; use serde::ser::{SerializeStruct, Serializer};
use serde::Serialize; use serde::Serialize;
@ -27,7 +26,7 @@ use std::fmt;
use std::hash::{Hash, Hasher}; use std::hash::{Hash, Hasher};
use std::sync::atomic::{self, AtomicU32, AtomicU64, AtomicUsize}; use std::sync::atomic::{self, AtomicU32, AtomicU64, AtomicUsize};
use std::{cmp::Ordering, sync::Arc}; use std::{cmp::Ordering, sync::Arc};
use tokio::sync::watch; use tokio::sync::{watch, RwLock as AsyncRwLock};
use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior}; use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior};
use tracing::{debug, info, trace, warn, Level}; use tracing::{debug, info, trace, warn, Level};
use url::Url; use url::Url;
@ -62,8 +61,7 @@ pub struct Web3Rpc {
/// head_block is only inside an Option so that the "Default" derive works. it will always be set. /// head_block is only inside an Option so that the "Default" derive works. it will always be set.
pub(super) head_block: Option<watch::Sender<Option<Web3ProxyBlock>>>, pub(super) head_block: Option<watch::Sender<Option<Web3ProxyBlock>>>,
/// Track head block latency. /// Track head block latency.
/// RwLock is fine because this isn't updated often and is for monitoring. It is not used on the hot path. pub(super) head_delay: AsyncRwLock<EwmaLatency>,
pub(super) head_delay: RwLock<EwmaLatency>,
/// Track peak request latency /// Track peak request latency
/// peak_latency is only inside an Option so that the "Default" derive works. it will always be set. /// peak_latency is only inside an Option so that the "Default" derive works. it will always be set.
pub(super) peak_latency: Option<PeakEwmaLatency>, pub(super) peak_latency: Option<PeakEwmaLatency>,
@ -1178,8 +1176,8 @@ impl Serialize for Web3Rpc {
where where
S: Serializer, S: Serializer,
{ {
// 3 is the number of fields in the struct. // 14 if we bring head_delay back
let mut state = serializer.serialize_struct("Web3Rpc", 14)?; let mut state = serializer.serialize_struct("Web3Rpc", 13)?;
// the url is excluded because it likely includes private information. just show the name that we use in keys // the url is excluded because it likely includes private information. just show the name that we use in keys
state.serialize_field("name", &self.name)?; state.serialize_field("name", &self.name)?;
@ -1224,10 +1222,10 @@ impl Serialize for Web3Rpc {
&self.active_requests.load(atomic::Ordering::Relaxed), &self.active_requests.load(atomic::Ordering::Relaxed),
)?; )?;
{ // {
let head_delay_ms = self.head_delay.read().latency().as_secs_f32() * 1000.0; // let head_delay_ms = self.head_delay.read().await.latency().as_secs_f32() * 1000.0;
state.serialize_field("head_delay_ms", &(head_delay_ms))?; // state.serialize_field("head_delay_ms", &(head_delay_ms))?;
} // }
{ {
let median_latency_ms = self let median_latency_ms = self
@ -1390,7 +1388,7 @@ mod tests {
backup: false, backup: false,
block_data_limit: block_data_limit.into(), block_data_limit: block_data_limit.into(),
tier: 0, tier: 0,
head_block: RwLock::new(Some(head_block.clone())), head_block: AsyncRwLock::new(Some(head_block.clone())),
}; };
assert!(!x.has_block_data(&0.into())); assert!(!x.has_block_data(&0.into()));