diff --git a/Cargo.lock b/Cargo.lock index 714c1b33..3cf79aeb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3759,7 +3759,6 @@ dependencies = [ "arc-swap", "argh", "atomic-counter", - "dashmap", "derive_more", "ethers", "futures", diff --git a/Cargo.toml b/Cargo.toml index 2ad455f7..79ad55bc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,6 @@ arc-swap = "1.5.0" argh = "0.1.7" anyhow = "1.0.57" atomic-counter = "1.0.1" -dashmap = "5.2.0" derive_more = "0.99" ethers = { git = "https://github.com/gakonst/ethers-rs", features = ["rustls", "ws"] } futures = { version = "0.3.21", features = ["thread-pool"] } diff --git a/src/block_watcher.rs b/src/block_watcher.rs index 11a5ebe2..b88d3894 100644 --- a/src/block_watcher.rs +++ b/src/block_watcher.rs @@ -1,7 +1,7 @@ ///! Track the head block of all the web3 providers -use dashmap::DashMap; use ethers::prelude::{Block, TxHash}; use std::cmp; +use std::collections::HashMap; use std::fmt; use std::sync::atomic::{self, AtomicU64}; use std::sync::Arc; @@ -26,7 +26,8 @@ pub struct BlockWatcher { sender: BlockWatcherSender, /// this Mutex is locked over awaits, so we want an async lock receiver: Mutex, - block_numbers: DashMap, + // TODO: better key + block_numbers: HashMap, head_block_number: AtomicU64, } @@ -38,13 +39,15 @@ impl fmt::Debug for BlockWatcher { } impl BlockWatcher { - pub fn new() -> Self { + pub fn new(rpcs: Vec) -> Self { let (sender, receiver) = mpsc::unbounded_channel(); + let block_numbers = rpcs.into_iter().map(|rpc| (rpc, 0.into())).collect(); + Self { sender, receiver: Mutex::new(receiver), - block_numbers: Default::default(), + block_numbers, head_block_number: Default::default(), } } @@ -55,12 +58,14 @@ impl BlockWatcher { pub fn sync_status(&self, rpc: &str, allowed_lag: u64) -> SyncStatus { match ( - self.head_block_number.load(atomic::Ordering::SeqCst), + self.head_block_number.load(atomic::Ordering::Acquire), self.block_numbers.get(rpc), ) { (0, _) => SyncStatus::Unknown, (_, None) => SyncStatus::Unknown, (head_block_number, Some(rpc_block_number)) => { + let rpc_block_number = rpc_block_number.load(atomic::Ordering::Acquire); + match head_block_number.cmp(&rpc_block_number) { cmp::Ordering::Equal => SyncStatus::Synced(0), cmp::Ordering::Greater => { @@ -70,7 +75,7 @@ impl BlockWatcher { } cmp::Ordering::Less => { // allow being some behind - let lag = head_block_number - *rpc_block_number; + let lag = head_block_number - rpc_block_number; if lag <= allowed_lag { SyncStatus::Synced(lag) @@ -94,10 +99,12 @@ impl BlockWatcher { { if let Some(rpc_block_number) = self.block_numbers.get(&rpc) { + let rpc_block_number = rpc_block_number.load(atomic::Ordering::Acquire); + // if we already have this block height // this probably own't happen with websockets, but is likely with polling against http rpcs // TODO: should we compare more than just height? hash too? - if *rpc_block_number == new_block_number { + if rpc_block_number == new_block_number { continue; } } @@ -110,14 +117,17 @@ impl BlockWatcher { // save the block for this rpc // TODO: store the actual chain as a graph and then have self.blocks point to that? - self.block_numbers.insert(rpc.clone(), new_block_number); + self.block_numbers + .get(&rpc) + .unwrap() + .swap(new_block_number, atomic::Ordering::Release); - let head_number = self.head_block_number.load(atomic::Ordering::SeqCst); + let head_number = self.head_block_number.load(atomic::Ordering::Acquire); let label_slow_heads = if head_number == 0 { // first block seen self.head_block_number - .swap(new_block_number, atomic::Ordering::SeqCst); + .swap(new_block_number, atomic::Ordering::AcqRel); ", +".to_string() } else { // TODO: what if they have the same number but different hashes? @@ -130,7 +140,7 @@ impl BlockWatcher { cmp::Ordering::Greater => { // new_block is the new head_block self.head_block_number - .swap(new_block_number, atomic::Ordering::SeqCst); + .swap(new_block_number, atomic::Ordering::AcqRel); ", +".to_string() } cmp::Ordering::Less => { diff --git a/src/main.rs b/src/main.rs index ce849a00..028f6ad4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -57,6 +57,22 @@ impl Web3ProxyApp { ) -> anyhow::Result { let clock = QuantaClock::default(); + let mut rpcs = vec![]; + for balanced_rpc_tier in balanced_rpc_tiers.iter() { + for rpc_data in balanced_rpc_tier { + let rpc = rpc_data.0.to_string(); + + rpcs.push(rpc); + } + } + for rpc_data in private_rpcs.iter() { + let rpc = rpc_data.0.to_string(); + + rpcs.push(rpc); + } + + let block_watcher = Arc::new(BlockWatcher::new(rpcs)); + // make a http shared client // TODO: how should we configure the connection pool? // TODO: 5 minutes is probably long enough. unlimited is a bad idea if something is wrong with the remote server @@ -65,17 +81,6 @@ impl Web3ProxyApp { .user_agent(APP_USER_AGENT) .build()?; - let block_watcher = Arc::new(BlockWatcher::new()); - - let (new_block_sender, mut new_block_receiver) = watch::channel::("".to_string()); - - { - // TODO: spawn this later? - // spawn a future for the block_watcher - let block_watcher = block_watcher.clone(); - tokio::spawn(async move { block_watcher.run(new_block_sender).await }); - } - let balanced_rpc_tiers = Arc::new( future::join_all(balanced_rpc_tiers.into_iter().map(|balanced_rpc_tier| { Web3ProviderTier::try_new( @@ -105,6 +110,15 @@ impl Web3ProxyApp { )) }; + let (new_block_sender, mut new_block_receiver) = watch::channel::("".to_string()); + + { + // TODO: spawn this later? + // spawn a future for the block_watcher + let block_watcher = block_watcher.clone(); + tokio::spawn(async move { block_watcher.run(new_block_sender).await }); + } + { // spawn a future for sorting our synced rpcs // TODO: spawn this later? @@ -368,7 +382,7 @@ impl Web3ProxyApp { let response = provider.request(&method, params).await; - connections.get_mut(&rpc).unwrap().dec_active_requests(); + connections.get(&rpc).unwrap().dec_active_requests(); let response = response?; diff --git a/src/provider.rs b/src/provider.rs index f816e9af..e0a9953e 100644 --- a/src/provider.rs +++ b/src/provider.rs @@ -8,6 +8,7 @@ use governor::state::{InMemoryState, NotKeyed}; use governor::NotUntil; use governor::RateLimiter; use std::fmt; +use std::sync::atomic::{self, AtomicUsize}; use std::time::Duration; use std::{cmp::Ordering, sync::Arc}; use tokio::time::interval; @@ -91,7 +92,7 @@ impl Web3Provider { #[derive(Debug)] pub struct Web3Connection { /// keep track of currently open requests. We sort on this - active_requests: u32, + active_requests: AtomicUsize, provider: Arc, ratelimiter: Option, } @@ -146,13 +147,13 @@ impl Web3Connection { }); Ok(Web3Connection { - active_requests: 0, + active_requests: Default::default(), provider, ratelimiter, }) } - pub fn try_inc_active_requests(&mut self) -> Result<(), NotUntil> { + pub fn try_inc_active_requests(&self) -> Result<(), NotUntil> { // check rate limits if let Some(ratelimiter) = self.ratelimiter.as_ref() { match ratelimiter.check() { @@ -170,13 +171,15 @@ impl Web3Connection { } }; - self.active_requests += 1; + // TODO: what ordering?! + self.active_requests.fetch_add(1, atomic::Ordering::AcqRel); Ok(()) } - pub fn dec_active_requests(&mut self) { - self.active_requests -= 1; + pub fn dec_active_requests(&self) { + // TODO: what ordering?! + self.active_requests.fetch_sub(1, atomic::Ordering::AcqRel); } } @@ -184,7 +187,10 @@ impl Eq for Web3Connection {} impl Ord for Web3Connection { fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.active_requests.cmp(&other.active_requests) + // TODO: what atomic ordering?! + self.active_requests + .load(atomic::Ordering::Acquire) + .cmp(&other.active_requests.load(atomic::Ordering::Acquire)) } } @@ -197,6 +203,8 @@ impl PartialOrd for Web3Connection { /// note that this is just comparing the active requests. two providers with different rpc urls are equal! impl PartialEq for Web3Connection { fn eq(&self, other: &Self) -> bool { - self.active_requests == other.active_requests + // TODO: what ordering?! + self.active_requests.load(atomic::Ordering::Acquire) + == other.active_requests.load(atomic::Ordering::Acquire) } } diff --git a/src/provider_tiers.rs b/src/provider_tiers.rs index 0254afc5..2026721d 100644 --- a/src/provider_tiers.rs +++ b/src/provider_tiers.rs @@ -1,6 +1,5 @@ ///! Communicate with groups of web3 providers use arc_swap::ArcSwap; -use dashmap::DashMap; use governor::clock::{QuantaClock, QuantaInstant}; use governor::NotUntil; use std::cmp; @@ -13,7 +12,7 @@ use crate::block_watcher::{BlockWatcher, SyncStatus}; use crate::provider::Web3Connection; // TODO: move the rate limiter into the connection -pub type Web3ConnectionMap = DashMap; +pub type Web3ConnectionMap = HashMap; /// Load balance to the rpc pub struct Web3ProviderTier { @@ -39,7 +38,7 @@ impl Web3ProviderTier { clock: &QuantaClock, ) -> anyhow::Result { let mut rpcs: Vec = vec![]; - let connections = DashMap::new(); + let mut connections = HashMap::new(); for (s, limit) in servers.into_iter() { rpcs.push(s.to_string()); @@ -144,7 +143,7 @@ impl Web3ProviderTier { self.connections .get(a) .unwrap() - .cmp(&self.connections.get(b).unwrap()) + .cmp(self.connections.get(b).unwrap()) }); // filter out @@ -166,7 +165,7 @@ impl Web3ProviderTier { // increment our connection counter if let Err(not_until) = self .connections - .get_mut(selected_rpc) + .get(selected_rpc) .unwrap() .try_inc_active_requests() { @@ -204,7 +203,7 @@ impl Web3ProviderTier { // TODO: share code with next_upstream_server if let Err(not_until) = self .connections - .get_mut(selected_rpc) + .get(selected_rpc) .unwrap() .try_inc_active_requests() {