From 7a3a3271bb02a1b0e38b15b18559d17eeb15334f Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 18 May 2022 20:18:01 +0000 Subject: [PATCH] back to arcswap and usizes --- Cargo.lock | 42 +----- web3-proxy/Cargo.toml | 2 +- web3-proxy/src/app.rs | 13 +- web3-proxy/src/connection.rs | 21 +-- web3-proxy/src/connections.rs | 258 +++++++++++++--------------------- 5 files changed, 120 insertions(+), 216 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1988a8e6..b563d087 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -74,6 +74,12 @@ version = "1.0.57" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08f9b8508dccb7687a1d6c4ce66b2b0ecef467c94667de27d8d7fe1f8d2a9cdc" +[[package]] +name = "arc-swap" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c5d78ce20460b82d3fa150275ed9d55e21064fc7951177baacf86a145c4a4b1f" + [[package]] name = "argh" version = "0.1.7" @@ -1586,19 +1592,6 @@ dependencies = [ "byteorder", ] -[[package]] -name = "generator" -version = "0.6.25" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "061d3be1afec479d56fa3bd182bf966c7999ec175fcfdb87ac14d417241366c6" -dependencies = [ - "cc", - "libc", - "log", - "rustversion", - "winapi", -] - [[package]] name = "generic-array" version = "0.12.4" @@ -2103,16 +2096,6 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" -[[package]] -name = "left-right" -version = "0.11.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2033c37d2bc68719774fa095ccdd46a14b846e9c52ec54107eda92fbf966b203" -dependencies = [ - "loom", - "slab", -] - [[package]] name = "libc" version = "0.2.125" @@ -2155,17 +2138,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "loom" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "27a6650b2f722ae8c0e2ebc46d07f80c9923464fc206d962332f1eff83143530" -dependencies = [ - "cfg-if", - "generator", - "scoped-tls", -] - [[package]] name = "mach" version = "0.3.2" @@ -4298,6 +4270,7 @@ name = "web3-proxy" version = "0.1.0" dependencies = [ "anyhow", + "arc-swap", "argh", "console-subscriber", "dashmap", @@ -4308,7 +4281,6 @@ dependencies = [ "futures", "governor", "hashbrown 0.12.1", - "left-right", "linkedhashmap", "parking_lot 0.12.0", "proctitle", diff --git a/web3-proxy/Cargo.toml b/web3-proxy/Cargo.toml index a111441a..698fd267 100644 --- a/web3-proxy/Cargo.toml +++ b/web3-proxy/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" [dependencies] anyhow = "1.0.57" +arc-swap = "1.5.0" argh = "0.1.7" # axum = "*" # TODO: use this instead of warp? console-subscriber = { version = "0.1.5", features = ["parking_lot"] } @@ -19,7 +20,6 @@ futures = { version = "0.3.21", features = ["thread-pool"] } # TODO: governor has a "futures" and "futures-timer" feature. do we want those? 
 governor = { version = "0.4.2", features = ["dashmap", "std"] }
 hashbrown = "0.12.1"
-left-right = "0.11.4"
 linkedhashmap = { path = "../linkedhashmap", features = ["inline-more"] }
 # TODO: parking_lot has an "arc_lock" feature that we might want to use
 parking_lot = { version = "0.12.0", features = ["deadlock_detection"] }
diff --git a/web3-proxy/src/app.rs b/web3-proxy/src/app.rs
index 7149e874..6704efb0 100644
--- a/web3-proxy/src/app.rs
+++ b/web3-proxy/src/app.rs
@@ -231,21 +231,16 @@ impl Web3ProxyApp {
         // todo: move getting a cache_key or the result into a helper function. then we could have multiple caches
         // TODO: i think we are maybe getting stuck on this lock. maybe a new block arrives, it tries to write and gets hung up on something. then this can't proceed
-        trace!("{:?} waiting for best_block_hash", request);
+        trace!("{:?} waiting for head_block_hash", request);
 
-        let best_block_hash = self
-            .balanced_rpcs
-            .get_synced_rpcs()
-            .enter()
-            .map(|x| *x.get_head_block_hash())
-            .unwrap();
+        let head_block_hash = self.balanced_rpcs.get_head_block_hash();
 
-        trace!("{:?} best_block_hash {}", request, best_block_hash);
+        trace!("{:?} head_block_hash {}", request, head_block_hash);
 
         // TODO: building this cache key is slow and it's large, but i don't see a better way right now
         // TODO: inspect the params and see if a block is specified. if so, use that block number instead of current_block
         let cache_key = (
-            best_block_hash,
+            head_block_hash,
             request.method.clone(),
             request.params.clone().map(|x| x.to_string()),
         );
diff --git a/web3-proxy/src/connection.rs b/web3-proxy/src/connection.rs
index 54f2130b..7a947e3d 100644
--- a/web3-proxy/src/connection.rs
+++ b/web3-proxy/src/connection.rs
@@ -98,7 +98,8 @@ impl Web3Connection {
     #[instrument(skip_all)]
     pub async fn reconnect(
         self: &Arc<Self>,
-        block_sender: &flume::Sender<(u64, H256, Arc<Web3Connection>)>,
+        block_sender: &flume::Sender<(u64, H256, usize)>,
+        rpc_id: usize,
     ) -> anyhow::Result<()> {
         // websocket doesn't need the http client
         let http_client = None;
@@ -108,7 +109,7 @@
         // TODO: tell the block subscriber that we are at 0
         block_sender
-            .send_async((0, H256::default(), self.clone()))
+            .send_async((0, H256::default(), rpc_id))
             .await?;
 
         let new_provider = Web3Provider::from_str(&self.url, http_client).await?;
@@ -203,7 +204,8 @@
     async fn send_block(
         self: &Arc<Self>,
         block: Result<Block<TxHash>, ProviderError>,
-        block_sender: &flume::Sender<(u64, H256, Arc<Web3Connection>)>,
+        block_sender: &flume::Sender<(u64, H256, usize)>,
+        rpc_id: usize,
     ) {
         match block {
             Ok(block) => {
@@ -212,7 +214,7 @@
                 // TODO: i'm pretty sure we don't need send_async, but double check
                 block_sender
-                    .send_async((block_number, block_hash, self.clone()))
+                    .send_async((block_number, block_hash, rpc_id))
                     .await
                     .unwrap();
             }
@@ -227,7 +229,8 @@
     #[instrument(skip_all)]
     pub async fn subscribe_new_heads(
         self: Arc<Self>,
+        rpc_id: usize,
-        block_sender: flume::Sender<(u64, H256, Arc<Web3Connection>)>,
+        block_sender: flume::Sender<(u64, H256, usize)>,
         reconnect: bool,
     ) -> anyhow::Result<()> {
         loop {
@@ -272,7 +275,7 @@
                             last_hash = new_hash;
                         }
 
-                        self.send_block(block, &block_sender).await;
+                        self.send_block(block, &block_sender, rpc_id).await;
                     }
                     Err(e) => {
                         warn!("Failed getting latest block from {}: {:?}", self, e);
@@ -300,7 +303,7 @@
                         .request("eth_getBlockByNumber", ("latest", false))
                         .await;
 
-                    self.send_block(block, &block_sender).await;
+                    self.send_block(block, &block_sender, rpc_id).await;
 
                     // TODO: what should this timeout be? needs to be larger than worst case block time
                    // TODO: although reconnects will make this less of an issue
                         .await
                     {
                         Ok(Some(new_block)) => {
-                            self.send_block(Ok(new_block), &block_sender).await;
+                            self.send_block(Ok(new_block), &block_sender, rpc_id).await;
 
                             // TODO: really not sure about this
                             task::yield_now().await;
@@ -334,7 +337,7 @@
                 warn!("new heads subscription exited. reconnecting in 10 seconds...");
                 sleep(Duration::from_secs(10)).await;
 
-                self.reconnect(&block_sender).await?;
+                self.reconnect(&block_sender, rpc_id).await?;
             } else {
                 break;
             }
diff --git a/web3-proxy/src/connections.rs b/web3-proxy/src/connections.rs
index 023d713a..8725552b 100644
--- a/web3-proxy/src/connections.rs
+++ b/web3-proxy/src/connections.rs
@@ -1,4 +1,5 @@
 ///! Load balanced communication with a group of web3 providers
+use arc_swap::ArcSwap;
 use derive_more::From;
 use ethers::prelude::H256;
 use futures::future::join_all;
@@ -6,13 +7,10 @@ use futures::stream::FuturesUnordered;
 use futures::StreamExt;
 use governor::clock::{QuantaClock, QuantaInstant};
 use governor::NotUntil;
-use hashbrown::HashMap;
-use left_right::{Absorb, ReadHandleFactory, WriteHandle};
 use serde_json::value::RawValue;
 use std::cmp;
 use std::fmt;
 use std::sync::Arc;
-use tokio::sync::Mutex;
 use tokio::task;
 use tracing::Instrument;
 use tracing::{info, info_span, instrument, trace, warn};
@@ -20,11 +18,11 @@ use tracing::{info, info_span, instrument, trace, warn};
 use crate::config::Web3ConnectionConfig;
 use crate::connection::{ActiveRequestHandle, Web3Connection};
 
-#[derive(Clone, Default)]
-pub struct SyncedConnections {
-    head_block_number: u64,
+#[derive(Clone)]
+struct SyncedConnections {
+    head_block_num: u64,
     head_block_hash: H256,
-    inner: Vec<Arc<Web3Connection>>,
+    inner: Vec<usize>,
 }
 
 impl fmt::Debug for SyncedConnections {
@@ -35,111 +33,24 @@ impl fmt::Debug for SyncedConnections {
 }
 
 impl SyncedConnections {
+    fn new(max_connections: usize) -> Self {
+        Self {
+            head_block_num: 0,
+            head_block_hash: Default::default(),
+            inner: Vec::with_capacity(max_connections),
+        }
+    }
+
     pub fn get_head_block_hash(&self) -> &H256 {
         &self.head_block_hash
     }
-
-    fn update(
-        &mut self,
-        log: bool,
-        new_block_num: u64,
-        new_block_hash: H256,
-        rpc: Arc<Web3Connection>,
-    ) {
-        // TODO: double check this logic
-        match new_block_num.cmp(&self.head_block_number) {
-            cmp::Ordering::Greater => {
-                // the rpc's newest block is the new overall best block
-                if log {
-                    info!("new head {} from {}", new_block_num, rpc);
-                }
-
-                self.inner.clear();
-                self.inner.push(rpc);
-
-                self.head_block_number = new_block_num;
-                self.head_block_hash = new_block_hash;
-            }
-            cmp::Ordering::Equal => {
-                if new_block_hash != self.head_block_hash {
-                    // same height, but different chain
-                    // TODO: anything else we should do? set some "nextSafeBlockHeight" to delay sending transactions?
-                    // TODO: sometimes a node changes its block. if that happens, a new block is probably right behind this one
-                    if log {
-                        warn!(
-                            "chain is forked at #{}! {} has {}. {} rpcs have {}",
-                            new_block_num,
-                            rpc,
-                            new_block_hash,
-                            self.inner.len(),
-                            self.head_block_hash
-                        );
-                    }
-                    return;
-                }
-
-                // do not clear synced_connections.
-                // we just want to add this rpc to the end
-                self.inner.push(rpc);
-            }
-            cmp::Ordering::Less => {
-                // this isn't the best block in the tier. don't do anything
-                return;
-            }
-        }
-
-        // TODO: better log
-        if log {
-            trace!("Now synced: {:?}", self.inner);
-        } else {
-            trace!("Now synced #2: {:?}", self.inner);
-        }
-    }
-}
-
-enum SyncedConnectionsOp {
-    SyncedConnectionsUpdate(u64, H256, Arc<Web3Connection>),
-    SyncedConnectionsCapacity(usize),
-}
-
-impl Absorb<SyncedConnectionsOp> for SyncedConnections {
-    fn absorb_first(&mut self, operation: &mut SyncedConnectionsOp, _: &Self) {
-        match operation {
-            SyncedConnectionsOp::SyncedConnectionsUpdate(new_block_number, new_block_hash, rpc) => {
-                self.update(true, *new_block_number, *new_block_hash, rpc.clone());
-            }
-            SyncedConnectionsOp::SyncedConnectionsCapacity(capacity) => {
-                self.inner = Vec::with_capacity(*capacity);
-            }
-        }
-    }
-
-    fn absorb_second(&mut self, operation: SyncedConnectionsOp, _: &Self) {
-        match operation {
-            SyncedConnectionsOp::SyncedConnectionsUpdate(new_block_number, new_block_hash, rpc) => {
-                // TODO: disable logging on this one?
-                self.update(false, new_block_number, new_block_hash, rpc);
-            }
-            SyncedConnectionsOp::SyncedConnectionsCapacity(capacity) => {
-                self.inner = Vec::with_capacity(capacity);
-            }
-        }
-    }
-
-    fn drop_first(self: Box<Self>) {}
-
-    fn sync_with(&mut self, first: &Self) {
-        // TODO: not sure about this
-        *self = first.clone()
-    }
-}
 
 /// A collection of web3 connections. Sends requests either to the current best server or to all servers.
 #[derive(From)]
 pub struct Web3Connections {
     inner: Vec<Arc<Web3Connection>>,
-    synced_connections_reader: ReadHandleFactory<SyncedConnections>,
-    synced_connections_writer: Mutex<WriteHandle<SyncedConnections, SyncedConnectionsOp>>,
+    synced_connections: ArcSwap<SyncedConnections>,
 }
 
 impl fmt::Debug for Web3Connections {
@@ -180,20 +91,11 @@ impl Web3Connections {
             ));
         }
 
-        let (mut synced_connections_writer, synced_connections_reader) =
-            left_right::new::<SyncedConnections, SyncedConnectionsOp>();
-
-        synced_connections_writer.append(SyncedConnectionsOp::SyncedConnectionsCapacity(
-            num_connections,
-        ));
-        trace!("publishing synced connections");
-        synced_connections_writer.publish();
-        trace!("published synced connections");
+        let synced_connections = SyncedConnections::new(num_connections);
 
         let connections = Arc::new(Self {
             inner: connections,
-            synced_connections_reader: synced_connections_reader.factory(),
-            synced_connections_writer: Mutex::new(synced_connections_writer),
+            synced_connections: ArcSwap::new(Arc::new(synced_connections)),
         });
 
         Ok(connections)
@@ -204,7 +106,7 @@
         let mut handles = vec![];
 
-        for connection in self.inner.iter() {
+        for (rpc_id, connection) in self.inner.iter().enumerate() {
             // subscribe to new heads in a spawned future
             // TODO: channel instead. then we can have one future with write access to a left-right?
             let connection = Arc::clone(connection);
@@ -220,7 +122,7 @@
                 // TODO: instead of passing Some(connections), pass Some(channel_sender). Then listen on the receiver below to keep local heads up-to-date
                 // TODO: proper span
                 connection
-                    .subscribe_new_heads(block_sender.clone(), true)
+                    .subscribe_new_heads(rpc_id, block_sender.clone(), true)
                     .instrument(tracing::info_span!("url"))
                     .await
             });
@@ -238,8 +140,8 @@
         join_all(handles).await;
     }
 
-    pub fn get_synced_rpcs(&self) -> left_right::ReadHandle<SyncedConnections> {
-        self.synced_connections_reader.handle()
+    pub fn get_head_block_hash(&self) -> H256 {
+        *self.synced_connections.load().get_head_block_hash()
     }
 
     /// Send the same request to all the handles, returning the fastest successful result.
@@ -309,37 +211,77 @@
     }
 
     /// TODO: possible deadlock here. investigate more. probably refactor
+    /// TODO: move parts of this onto SyncedConnections?
     #[instrument(skip_all)]
     async fn update_synced_rpcs(
         &self,
-        block_receiver: flume::Receiver<(u64, H256, Arc<Web3Connection>)>,
+        block_receiver: flume::Receiver<(u64, H256, usize)>,
     ) -> anyhow::Result<()> {
-        let mut synced_connections_writer = self.synced_connections_writer.lock().await;
+        let max_connections = self.inner.len();
 
-        while let Ok((new_block_num, new_block_hash, rpc)) = block_receiver.recv_async().await {
+        // one slot per rpc, indexed by rpc_id (Vec::insert would shift entries and can panic)
+        let mut connection_states: Vec<(u64, H256)> = vec![(0, H256::zero()); max_connections];
+        let mut head_block_hash = H256::zero();
+        let mut head_block_num = 0u64;
+
+        let mut synced_connections = SyncedConnections::new(max_connections);
+
+        while let Ok((new_block_num, new_block_hash, rpc_id)) = block_receiver.recv_async().await {
             if new_block_num == 0 {
-                warn!("{} is still syncing", rpc);
-                continue;
+                // TODO: show the actual rpc url?
+                warn!("rpc #{} is still syncing", rpc_id);
             }
 
-            let span = info_span!("new_block_num", new_block_num,);
+            // TODO: span with rpc in it, too
+            let span = info_span!("new_block", new_block_num);
             let _enter = span.enter();
 
-            synced_connections_writer.append(SyncedConnectionsOp::SyncedConnectionsUpdate(
-                new_block_num,
-                new_block_hash,
-                rpc,
-            ));
+            connection_states[rpc_id] = (new_block_num, new_block_hash);
 
-            // TODO: only publish when the second block arrives?
-            // TODO: use spans properly
-            trace!("publishing synced connections for block {}", new_block_num,);
-            synced_connections_writer.publish();
-            trace!(
-                "published synced connections for block {} from {}",
-                new_block_num,
-                "some rpc"
-            );
+            // TODO: do something to update the synced blocks
+            match new_block_num.cmp(&head_block_num) {
+                cmp::Ordering::Greater => {
+                    // the rpc's newest block is the new overall best block
+                    info!("new head from #{}", rpc_id);
+
+                    synced_connections.inner.clear();
+                    synced_connections.inner.push(rpc_id);
+
+                    head_block_num = new_block_num;
+                    head_block_hash = new_block_hash;
+                }
+                cmp::Ordering::Equal => {
+                    if new_block_hash != head_block_hash {
+                        // same height, but different chain
+                        // TODO: anything else we should do? set some "nextSafeBlockHeight" to delay sending transactions?
+                        // TODO: sometimes a node changes its block. if that happens, a new block is probably right behind this one
+                        warn!(
+                            "chain is forked at #{}! {} has {}. {} rpcs have {}",
+                            new_block_num,
+                            rpc_id,
+                            new_block_hash,
+                            synced_connections.inner.len(),
+                            head_block_hash
+                        );
+                        // TODO: don't continue. check to see which head block is more popular!
+                        continue;
+                    }
+
+                    // do not clear synced_connections.
+                    // we just want to add this rpc to the end
+                    synced_connections.inner.push(rpc_id);
+                }
+                cmp::Ordering::Less => {
+                    // this isn't the best block in the tier. don't do anything
+                    continue;
+                }
+            }
+
+            // the synced connections have changed
+            let new_data = Arc::new(synced_connections.clone());
+
+            // TODO: only do this if there are 2 nodes synced to this block?
+            // do the arcswap
+            self.synced_connections.swap(new_data);
         }
 
         // TODO: if there was an error, we should return it
@@ -355,37 +297,27 @@
     ) -> Result<ActiveRequestHandle, Option<NotUntil<QuantaInstant>>> {
         let mut earliest_not_until = None;
 
-        let mut synced_rpc_arcs = self
-            .synced_connections_reader
-            .handle()
-            .enter()
-            .map(|x| x.inner.clone())
-            .unwrap();
+        let mut synced_rpc_indexes = self.synced_connections.load().inner.clone();
 
-        // TODO: better key!
-        let sort_cache: HashMap<String, (f32, u32)> = synced_rpc_arcs
+        // cache stats for every rpc, indexed by rpc_id, so the sort below can't index out of bounds
+        let sort_cache: Vec<(f32, u32)> = self
+            .inner
             .iter()
-            .map(|connection| {
-                // TODO: better key!
-                let key = format!("{}", connection);
-                let active_requests = connection.active_requests();
-                let soft_limit = connection.soft_limit();
+            .map(|rpc| {
+                let active_requests = rpc.active_requests();
+                let soft_limit = rpc.soft_limit();
 
                 // TODO: how should we include the soft limit? floats are slower than integer math
                 let utilization = active_requests as f32 / soft_limit as f32;
 
-                (key, (utilization, soft_limit))
+                (utilization, soft_limit)
             })
             .collect();
 
         // TODO: i think we might need to load active connections and then
-        synced_rpc_arcs.sort_unstable_by(|a, b| {
-            // TODO: better keys
-            let a_key = format!("{}", a);
-            let b_key = format!("{}", b);
-
-            let (a_utilization, a_soft_limit) = sort_cache.get(&a_key).unwrap();
-            let (b_utilization, b_soft_limit) = sort_cache.get(&b_key).unwrap();
+        synced_rpc_indexes.sort_unstable_by(|a, b| {
+            let (a_utilization, a_soft_limit) = sort_cache.get(*a).unwrap();
+            let (b_utilization, b_soft_limit) = sort_cache.get(*b).unwrap();
 
             // TODO: i'm comparing floats. crap
             match a_utilization
@@ -397,14 +329,16 @@ impl Web3Connections {
             }
         });
 
-        for selected_rpc in synced_rpc_arcs.into_iter() {
+        for rpc_id in synced_rpc_indexes.into_iter() {
+            let rpc = self.inner.get(rpc_id).unwrap();
+
             // increment our connection counter
-            match selected_rpc.try_request_handle() {
+            match rpc.try_request_handle() {
                 Err(not_until) => {
                     earliest_possible(&mut earliest_not_until, not_until);
                 }
                 Ok(handle) => {
-                    trace!("next server on {:?}: {:?}", self, selected_rpc);
+                    trace!("next server on {:?}: {:?}", self, rpc_id);
                     return Ok(handle);
                 }
             }
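
The heart of this change is visible in get_head_block_hash() and update_synced_rpcs(): left-right's writer/reader handles are replaced by a single ArcSwap<SyncedConnections>, where the block-subscription task builds a fresh snapshot and publishes it atomically while request paths read without locking. A minimal standalone sketch of that pattern, assuming arc-swap 1.5; SyncedState and the thread setup here are illustrative stand-ins, not code from this repo:

use std::sync::Arc;
use std::thread;

use arc_swap::ArcSwap;

// illustrative stand-in for SyncedConnections
#[derive(Default)]
struct SyncedState {
    head_block_num: u64,
    synced: Vec<usize>,
}

fn main() {
    let state = Arc::new(ArcSwap::new(Arc::new(SyncedState::default())));

    // writer: build a fresh snapshot and publish it atomically,
    // like the clone-and-swap at the end of update_synced_rpcs()
    let writer_state = Arc::clone(&state);
    let writer = thread::spawn(move || {
        for block in 1..=3u64 {
            let next = SyncedState {
                head_block_num: block,
                synced: vec![0],
            };
            writer_state.swap(Arc::new(next));
        }
    });

    writer.join().unwrap();

    // reader: lock-free load of the current snapshot,
    // like get_head_block_hash() above
    let snapshot = state.load();
    println!("head #{} from rpcs {:?}", snapshot.head_block_num, snapshot.synced);
}

swap() never blocks readers: anything that already called load() keeps a consistent Arc to the old snapshot until the guard is dropped, which is why update_synced_rpcs() can clone and swap on every new head block without holding a Mutex.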
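The comparator inside the next-upstream-server sort is split across the last two hunks, so its middle is not quoted here. A self-contained sketch of the least-utilization sort it performs; the tie-break (preferring the larger soft limit when utilizations are equal or incomparable) is an assumption, not taken from the patch:

use std::cmp::Ordering;

// sort_cache holds (utilization, soft_limit) indexed by rpc_id
fn sort_by_utilization(synced_rpc_indexes: &mut [usize], sort_cache: &[(f32, u32)]) {
    synced_rpc_indexes.sort_unstable_by(|a, b| {
        let (a_utilization, a_soft_limit) = sort_cache[*a];
        let (b_utilization, b_soft_limit) = sort_cache[*b];

        // floats have no total order; fall back to the soft limit when
        // partial_cmp ties or returns None (NaN)
        match a_utilization.partial_cmp(&b_utilization) {
            Some(Ordering::Equal) | None => b_soft_limit.cmp(&a_soft_limit),
            Some(ordering) => ordering,
        }
    });
}

fn main() {
    // rpc 0 is 50% utilized, rpc 1 is 10% utilized
    let sort_cache = vec![(0.5_f32, 100_u32), (0.1, 200)];
    let mut synced = vec![0_usize, 1];

    sort_by_utilization(&mut synced, &sort_cache);

    assert_eq!(synced, vec![1, 0]); // least-utilized rpc comes first
    println!("{:?}", synced);
}

Sorting ascending by utilization means the for loop that follows tries the least-loaded synced rpc first and only falls through to busier ones when try_request_handle() is rate limited.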