From 79507c92cbd6f60765ff673f09ce6544919973b5 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Mon, 2 May 2022 21:35:49 +0000 Subject: [PATCH] arcswap again --- Cargo.lock | 23 ++++++++---- Cargo.toml | 11 +++--- src/connection.rs | 16 +++++--- src/connections.rs | 91 ++++++++++++++++++++++++++++++---------------- src/main.rs | 1 + 5 files changed, 92 insertions(+), 50 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0bdfe6a2..6278329d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -48,6 +48,12 @@ version = "1.0.57" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08f9b8508dccb7687a1d6c4ce66b2b0ecef467c94667de27d8d7fe1f8d2a9cdc" +[[package]] +name = "arc-swap" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c5d78ce20460b82d3fa150275ed9d55e21064fc7951177baacf86a145c4a4b1f" + [[package]] name = "argh" version = "0.1.7" @@ -2820,9 +2826,9 @@ checksum = "930c0acf610d3fdb5e2ab6213019aaa04e227ebe9547b0649ba599b16d788bd7" [[package]] name = "serde" -version = "1.0.136" +version = "1.0.137" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce31e24b01e1e524df96f1c2fdd054405f8d7376249a5110886fb4b658484789" +checksum = "61ea8d54c77f8315140a05f4c7237403bf38b72704d031543aa1d16abbf517d1" dependencies = [ "serde_derive", ] @@ -2839,9 +2845,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.136" +version = "1.0.137" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08597e7152fcd306f41838ed3e37be9eaeed2b61c42e2117266a554fab4662f9" +checksum = "1f26faba0c3959972377d3b2d306ee9f71faee9714294e41bb777f83f88578be" dependencies = [ "proc-macro2", "quote", @@ -2850,9 +2856,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.79" +version = "1.0.80" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e8d9fa5c3b304765ce1fd9c4c8a3de2c8db365a5b91be52f186efc675681d95" +checksum = 
"f972498cf015f7c0746cac89ebe1d6ef10c293b94175a243a2d9442c163d9944" dependencies = [ "itoa", "ryu", @@ -3222,9 +3228,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.18.0" +version = "1.18.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f48b6d60512a392e34dbf7fd456249fd2de3c83669ab642e021903f4015185b" +checksum = "dce653fb475565de9f6fb0614b28bca8df2c430c0cf84bcd9c843f15de5414cc" dependencies = [ "bytes", "libc", @@ -3750,6 +3756,7 @@ name = "web3-proxy" version = "0.1.0" dependencies = [ "anyhow", + "arc-swap", "argh", "derive_more", "ethers", diff --git a/Cargo.toml b/Cargo.toml index 7868db99..a02e4919 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +arc-swap = "1.5.0" argh = "0.1.7" anyhow = "1.0.57" derive_more = "0.99.17" @@ -13,13 +14,13 @@ ethers = { git = "https://github.com/gakonst/ethers-rs", features = ["rustls", " futures = { version = "0.3.21", features = ["thread-pool"] } fxhash = "0.2.1" governor = { version = "0.4.2", features = ["dashmap", "std"] } -tokio = { version = "1.18.0", features = ["full"] } -parking_lot = { version = "0.12.0" } +tokio = { version = "1.18.1", features = ["full"] } +parking_lot = "0.12.0" regex = "1.5.5" reqwest = { version = "0.11.10", features = ["json", "rustls"] } -rustc-hash = { version = "1.0" } -serde = { version = "1.0.136", features = [] } -serde_json = { version = "1.0.79", default-features = false, features = ["alloc"] } +rustc-hash = "1.1.0" +serde = { version = "1.0.137", features = [] } +serde_json = { version = "1.0.80", default-features = false, features = ["alloc"] } tracing = "0.1.34" tracing-subscriber = "0.3.11" url = "2.2.2" diff --git a/src/connection.rs b/src/connection.rs index 44c1af23..2ab661e9 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -160,6 
+160,10 @@ impl Web3Connection { self.active_requests.load(atomic::Ordering::Acquire) } + pub fn url(&self) -> &str { + &self.url + } + /// Subscribe to new blocks // #[instrument] pub async fn new_heads( @@ -185,7 +189,7 @@ impl Web3Connection { // TODO: also send something to the provider_tier so it can sort? let old_block_number = self .head_block_number - .swap(block_number, atomic::Ordering::SeqCst); + .swap(block_number, atomic::Ordering::AcqRel); if old_block_number != block_number { info!("new block on {}: {}", self, block_number); @@ -208,7 +212,7 @@ impl Web3Connection { // TODO: also send something to the provider_tier so it can sort? let old_block_number = self .head_block_number - .swap(block_number, atomic::Ordering::SeqCst); + .swap(block_number, atomic::Ordering::AcqRel); if old_block_number != block_number { info!("new block on {}: {}", self, block_number); @@ -257,14 +261,14 @@ impl Web3Connection { }; // TODO: what ordering?! - self.active_requests.fetch_add(1, atomic::Ordering::SeqCst); + self.active_requests.fetch_add(1, atomic::Ordering::AcqRel); Ok(()) } pub fn dec_active_requests(&self) { // TODO: what ordering?! - self.active_requests.fetch_sub(1, atomic::Ordering::SeqCst); + self.active_requests.fetch_sub(1, atomic::Ordering::AcqRel); } } @@ -273,8 +277,8 @@ impl Eq for Web3Connection {} impl Ord for Web3Connection { fn cmp(&self, other: &Self) -> std::cmp::Ordering { // TODO: what atomic ordering?! - let a = self.active_requests.load(atomic::Ordering::SeqCst); - let b = other.active_requests.load(atomic::Ordering::SeqCst); + let a = self.active_requests.load(atomic::Ordering::Acquire); + let b = other.active_requests.load(atomic::Ordering::Acquire); // TODO: how should we include the soft limit? floats are slower than integer math let a = a as f32 / self.soft_limit as f32; diff --git a/src/connections.rs b/src/connections.rs index 91ca53ca..91058c4d 100644 --- a/src/connections.rs +++ b/src/connections.rs @@ -1,11 +1,11 @@ ///! 
Communicate with a group of web3 providers +use arc_swap::ArcSwap; use derive_more::From; use futures::stream::FuturesUnordered; use futures::StreamExt; use fxhash::FxHashMap; use governor::clock::{QuantaClock, QuantaInstant}; use governor::NotUntil; -use parking_lot::RwLock; use serde_json::value::RawValue; use std::cmp; use std::fmt; @@ -18,7 +18,14 @@ use crate::connection::{JsonRpcForwardedResponse, Web3Connection}; #[derive(Clone, Default)] struct SyncedConnections { head_block_number: u64, - inner: Vec<Arc<Web3Connection>>, + inner: Vec<usize>, +} + +impl fmt::Debug for SyncedConnections { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + // TODO: the default formatter takes forever to write. this is too quiet though + f.debug_struct("SyncedConnections").finish_non_exhaustive() + } } impl SyncedConnections { @@ -37,7 +44,8 @@ impl SyncedConnections { pub struct Web3Connections { inner: Vec<Arc<Web3Connection>>, /// TODO: what is the best type for this? Heavy reads with writes every few seconds. When writes happen, there is a burst of them - synced_connections: RwLock<SyncedConnections>, + /// TODO: arcswap was a lot faster, but i think we need a lock for proper logic + synced_connections: ArcSwap<SyncedConnections>, } impl fmt::Debug for Web3Connections { @@ -77,7 +85,7 @@ impl Web3Connections { let connections = Arc::new(Self { inner: connections, - synced_connections: RwLock::new(SyncedConnections::new(num_connections)), + synced_connections: ArcSwap::new(Arc::new(SyncedConnections::new(num_connections))), }); for connection in connections.inner.iter() { @@ -85,12 +93,15 @@ impl Web3Connections { let connection = Arc::clone(connection); let connections = connections.clone(); tokio::spawn(async move { + // TODO: instead of passing Some(connections), pass Some(channel_sender). Then listen on the receiver below to keep local heads up-to-date if let Err(e) = connection.new_heads(Some(connections)).await { warn!("new_heads error: {:?}", e); } }); } + // TODO: listen on a receiver mpsc channel?
+ Ok(connections) } @@ -177,34 +188,47 @@ impl Web3Connections { rpc: &Arc<Web3Connection>, new_block: u64, ) -> anyhow::Result<()> { - // TODO: is RwLock the best type for this? + // TODO: is RwLock the best type for this? i don't think so anymore. we probably want to use channels and have a single writer using a left-right or arcswap or something // TODO: start with a read lock? - let mut synced_connections = self.synced_connections.write(); + let synced_connections = self.synced_connections.load(); // should we load new_block here? - match synced_connections.head_block_number.cmp(&new_block) { - cmp::Ordering::Equal => { - // this rpc is synced, but it isn't the first to this block - } - cmp::Ordering::Less => { - // this is a new head block. clear the current synced connections - // TODO: this is too verbose with a bunch of tiers. include the tier - // info!("new head block from {:?}: {}", rpc, new_block); + let mut new_synced_connections: SyncedConnections = + match synced_connections.head_block_number.cmp(&new_block) { + cmp::Ordering::Equal => { + // this rpc is synced, but it isn't the first to this block + (**synced_connections).to_owned() + } + cmp::Ordering::Less => { + // this is a new head block. clear the current synced connections + // TODO: this is too verbose with a bunch of tiers. include the tier + // info!("new head block from {:?}: {}", rpc, new_block); - synced_connections.inner.clear(); + let mut new_synced_connections = SyncedConnections::new(self.inner.len()); - synced_connections.head_block_number = new_block; - } - cmp::Ordering::Greater => { - // not the latest block. return now - return Ok(()); - } - } + // synced_connections.inner.clear(); - let rpc = Arc::clone(rpc); + new_synced_connections.head_block_number = new_block; - synced_connections.inner.push(rpc); + new_synced_connections + } + cmp::Ordering::Greater => { + // not the latest block.
return now + return Ok(()); + } + }; + + let rpc_index = self + .inner + .iter() + .position(|x| x.url() == rpc.url()) + .unwrap(); + + new_synced_connections.inner.push(rpc_index); + + self.synced_connections + .swap(Arc::new(new_synced_connections)); Ok(()) } @@ -216,24 +240,29 @@ impl Web3Connections { let mut earliest_not_until = None; // TODO: this clone is probably not the best way to do this - let mut synced_rpcs = self.synced_connections.read().inner.clone(); + let mut synced_rpc_indexes = self.synced_connections.load().inner.clone(); - // i'm pretty sure i did this safely. Hash on Web3Connection just uses the url and not any of the atomics - #[allow(clippy::mutable_key_type)] - let cache: FxHashMap<Arc<Web3Connection>, u32> = synced_rpcs + let cache: FxHashMap<usize, u32> = synced_rpc_indexes .iter() - .map(|synced_rpc| (synced_rpc.clone(), synced_rpc.active_requests())) + .map(|synced_index| { + ( + *synced_index, + self.inner.get(*synced_index).unwrap().active_requests(), + ) + }) .collect(); // TODO: i think we might need to load active connections and then - synced_rpcs.sort_unstable_by(|a, b| { + synced_rpc_indexes.sort_unstable_by(|a, b| { let a = cache.get(a).unwrap(); let b = cache.get(b).unwrap(); a.cmp(b) }); - for selected_rpc in synced_rpcs.iter() { + for selected_rpc in synced_rpc_indexes.into_iter() { + let selected_rpc = self.inner.get(selected_rpc).unwrap(); + // increment our connection counter if let Err(not_until) = selected_rpc.try_inc_active_requests() { earliest_possible(&mut earliest_not_until, not_until); diff --git a/src/main.rs b/src/main.rs index 3ca67a93..81b095a2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -191,6 +191,7 @@ impl Web3ProxyApp { return Ok(warp::reply::json(&response)); } Err(None) => { + // TODO: this is too verbose. if there are other servers in other tiers, use those! warn!("No servers in sync!"); } Err(Some(not_until)) => {