arcswap again

This commit is contained in:
Bryan Stitt 2022-05-02 21:35:49 +00:00
parent 14381c55c6
commit 79507c92cb
5 changed files with 92 additions and 50 deletions

23
Cargo.lock generated

@ -48,6 +48,12 @@ version = "1.0.57"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08f9b8508dccb7687a1d6c4ce66b2b0ecef467c94667de27d8d7fe1f8d2a9cdc"
[[package]]
name = "arc-swap"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c5d78ce20460b82d3fa150275ed9d55e21064fc7951177baacf86a145c4a4b1f"
[[package]]
name = "argh"
version = "0.1.7"
@ -2820,9 +2826,9 @@ checksum = "930c0acf610d3fdb5e2ab6213019aaa04e227ebe9547b0649ba599b16d788bd7"
[[package]]
name = "serde"
version = "1.0.136"
version = "1.0.137"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ce31e24b01e1e524df96f1c2fdd054405f8d7376249a5110886fb4b658484789"
checksum = "61ea8d54c77f8315140a05f4c7237403bf38b72704d031543aa1d16abbf517d1"
dependencies = [
"serde_derive",
]
@ -2839,9 +2845,9 @@ dependencies = [
[[package]]
name = "serde_derive"
version = "1.0.136"
version = "1.0.137"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08597e7152fcd306f41838ed3e37be9eaeed2b61c42e2117266a554fab4662f9"
checksum = "1f26faba0c3959972377d3b2d306ee9f71faee9714294e41bb777f83f88578be"
dependencies = [
"proc-macro2",
"quote",
@ -2850,9 +2856,9 @@ dependencies = [
[[package]]
name = "serde_json"
version = "1.0.79"
version = "1.0.80"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e8d9fa5c3b304765ce1fd9c4c8a3de2c8db365a5b91be52f186efc675681d95"
checksum = "f972498cf015f7c0746cac89ebe1d6ef10c293b94175a243a2d9442c163d9944"
dependencies = [
"itoa",
"ryu",
@ -3222,9 +3228,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
[[package]]
name = "tokio"
version = "1.18.0"
version = "1.18.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f48b6d60512a392e34dbf7fd456249fd2de3c83669ab642e021903f4015185b"
checksum = "dce653fb475565de9f6fb0614b28bca8df2c430c0cf84bcd9c843f15de5414cc"
dependencies = [
"bytes",
"libc",
@ -3750,6 +3756,7 @@ name = "web3-proxy"
version = "0.1.0"
dependencies = [
"anyhow",
"arc-swap",
"argh",
"derive_more",
"ethers",

@ -6,6 +6,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
arc-swap = "1.5.0"
argh = "0.1.7"
anyhow = "1.0.57"
derive_more = "0.99.17"
@ -13,13 +14,13 @@ ethers = { git = "https://github.com/gakonst/ethers-rs", features = ["rustls", "
futures = { version = "0.3.21", features = ["thread-pool"] }
fxhash = "0.2.1"
governor = { version = "0.4.2", features = ["dashmap", "std"] }
tokio = { version = "1.18.0", features = ["full"] }
parking_lot = { version = "0.12.0" }
tokio = { version = "1.18.1", features = ["full"] }
parking_lot = "0.12.0"
regex = "1.5.5"
reqwest = { version = "0.11.10", features = ["json", "rustls"] }
rustc-hash = { version = "1.0" }
serde = { version = "1.0.136", features = [] }
serde_json = { version = "1.0.79", default-features = false, features = ["alloc"] }
rustc-hash = "1.1.0"
serde = { version = "1.0.137", features = [] }
serde_json = { version = "1.0.80", default-features = false, features = ["alloc"] }
tracing = "0.1.34"
tracing-subscriber = "0.3.11"
url = "2.2.2"

@ -160,6 +160,10 @@ impl Web3Connection {
self.active_requests.load(atomic::Ordering::Acquire)
}
pub fn url(&self) -> &str {
&self.url
}
/// Subscribe to new blocks
// #[instrument]
pub async fn new_heads(
@ -185,7 +189,7 @@ impl Web3Connection {
// TODO: also send something to the provider_tier so it can sort?
let old_block_number = self
.head_block_number
.swap(block_number, atomic::Ordering::SeqCst);
.swap(block_number, atomic::Ordering::AcqRel);
if old_block_number != block_number {
info!("new block on {}: {}", self, block_number);
@ -208,7 +212,7 @@ impl Web3Connection {
// TODO: also send something to the provider_tier so it can sort?
let old_block_number = self
.head_block_number
.swap(block_number, atomic::Ordering::SeqCst);
.swap(block_number, atomic::Ordering::AcqRel);
if old_block_number != block_number {
info!("new block on {}: {}", self, block_number);
@ -257,14 +261,14 @@ impl Web3Connection {
};
// TODO: what ordering?!
self.active_requests.fetch_add(1, atomic::Ordering::SeqCst);
self.active_requests.fetch_add(1, atomic::Ordering::AcqRel);
Ok(())
}
pub fn dec_active_requests(&self) {
// TODO: what ordering?!
self.active_requests.fetch_sub(1, atomic::Ordering::SeqCst);
self.active_requests.fetch_sub(1, atomic::Ordering::AcqRel);
}
}
@ -273,8 +277,8 @@ impl Eq for Web3Connection {}
impl Ord for Web3Connection {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
// TODO: what atomic ordering?!
let a = self.active_requests.load(atomic::Ordering::SeqCst);
let b = other.active_requests.load(atomic::Ordering::SeqCst);
let a = self.active_requests.load(atomic::Ordering::Acquire);
let b = other.active_requests.load(atomic::Ordering::Acquire);
// TODO: how should we include the soft limit? floats are slower than integer math
let a = a as f32 / self.soft_limit as f32;

@ -1,11 +1,11 @@
///! Communicate with a group of web3 providers
use arc_swap::ArcSwap;
use derive_more::From;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use fxhash::FxHashMap;
use governor::clock::{QuantaClock, QuantaInstant};
use governor::NotUntil;
use parking_lot::RwLock;
use serde_json::value::RawValue;
use std::cmp;
use std::fmt;
@ -18,7 +18,14 @@ use crate::connection::{JsonRpcForwardedResponse, Web3Connection};
#[derive(Clone, Default)]
struct SyncedConnections {
head_block_number: u64,
inner: Vec<Arc<Web3Connection>>,
inner: Vec<usize>,
}
impl fmt::Debug for SyncedConnections {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// TODO: the default formatter takes forever to write. this is too quiet though
f.debug_struct("SyncedConnections").finish_non_exhaustive()
}
}
impl SyncedConnections {
@ -37,7 +44,8 @@ impl SyncedConnections {
pub struct Web3Connections {
inner: Vec<Arc<Web3Connection>>,
/// TODO: what is the best type for this? Heavy reads with writes every few seconds. When writes happen, there is a burst of them
synced_connections: RwLock<SyncedConnections>,
/// TODO: arcswap was a lot faster, but i think we need a lock for proper logic
synced_connections: ArcSwap<SyncedConnections>,
}
impl fmt::Debug for Web3Connections {
@ -77,7 +85,7 @@ impl Web3Connections {
let connections = Arc::new(Self {
inner: connections,
synced_connections: RwLock::new(SyncedConnections::new(num_connections)),
synced_connections: ArcSwap::new(Arc::new(SyncedConnections::new(num_connections))),
});
for connection in connections.inner.iter() {
@ -85,12 +93,15 @@ impl Web3Connections {
let connection = Arc::clone(connection);
let connections = connections.clone();
tokio::spawn(async move {
// TODO: instead of passing Some(connections), pass Some(channel_sender). Then listen on the receiver below to keep local heads up-to-date
if let Err(e) = connection.new_heads(Some(connections)).await {
warn!("new_heads error: {:?}", e);
}
});
}
// TODO: listen on a receiver mpsc channel?
Ok(connections)
}
@ -177,34 +188,47 @@ impl Web3Connections {
rpc: &Arc<Web3Connection>,
new_block: u64,
) -> anyhow::Result<()> {
// TODO: is RwLock the best type for this?
// TODO: is RwLock the best type for this? i don't think so anymore. we probably want to use channels and have a single writer using a left-right or arcswap or something
// TODO: start with a read lock?
let mut synced_connections = self.synced_connections.write();
let synced_connections = self.synced_connections.load();
// should we load new_block here?
match synced_connections.head_block_number.cmp(&new_block) {
cmp::Ordering::Equal => {
// this rpc is synced, but it isn't the first to this block
}
cmp::Ordering::Less => {
// this is a new head block. clear the current synced connections
// TODO: this is too verbose with a bunch of tiers. include the tier
// info!("new head block from {:?}: {}", rpc, new_block);
let mut new_synced_connections: SyncedConnections =
match synced_connections.head_block_number.cmp(&new_block) {
cmp::Ordering::Equal => {
// this rpc is synced, but it isn't the first to this block
(**synced_connections).to_owned()
}
cmp::Ordering::Less => {
// this is a new head block. clear the current synced connections
// TODO: this is too verbose with a bunch of tiers. include the tier
// info!("new head block from {:?}: {}", rpc, new_block);
synced_connections.inner.clear();
let mut new_synced_connections = SyncedConnections::new(self.inner.len());
synced_connections.head_block_number = new_block;
}
cmp::Ordering::Greater => {
// not the latest block. return now
return Ok(());
}
}
// synced_connections.inner.clear();
let rpc = Arc::clone(rpc);
new_synced_connections.head_block_number = new_block;
synced_connections.inner.push(rpc);
new_synced_connections
}
cmp::Ordering::Greater => {
// not the latest block. return now
return Ok(());
}
};
let rpc_index = self
.inner
.iter()
.position(|x| x.url() == rpc.url())
.unwrap();
new_synced_connections.inner.push(rpc_index);
self.synced_connections
.swap(Arc::new(new_synced_connections));
Ok(())
}
@ -216,24 +240,29 @@ impl Web3Connections {
let mut earliest_not_until = None;
// TODO: this clone is probably not the best way to do this
let mut synced_rpcs = self.synced_connections.read().inner.clone();
let mut synced_rpc_indexes = self.synced_connections.load().inner.clone();
// i'm pretty sure i did this safely. Hash on Web3Connection just uses the url and not any of the atomics
#[allow(clippy::mutable_key_type)]
let cache: FxHashMap<Arc<Web3Connection>, u32> = synced_rpcs
let cache: FxHashMap<usize, u32> = synced_rpc_indexes
.iter()
.map(|synced_rpc| (synced_rpc.clone(), synced_rpc.active_requests()))
.map(|synced_index| {
(
*synced_index,
self.inner.get(*synced_index).unwrap().active_requests(),
)
})
.collect();
// TODO: i think we might need to load active connections and then
synced_rpcs.sort_unstable_by(|a, b| {
synced_rpc_indexes.sort_unstable_by(|a, b| {
let a = cache.get(a).unwrap();
let b = cache.get(b).unwrap();
a.cmp(b)
});
for selected_rpc in synced_rpcs.iter() {
for selected_rpc in synced_rpc_indexes.into_iter() {
let selected_rpc = self.inner.get(selected_rpc).unwrap();
// increment our connection counter
if let Err(not_until) = selected_rpc.try_inc_active_requests() {
earliest_possible(&mut earliest_not_until, not_until);

@ -191,6 +191,7 @@ impl Web3ProxyApp {
return Ok(warp::reply::json(&response));
}
Err(None) => {
// TODO: this is too verbose. if there are other servers in other tiers, use those!
warn!("No servers in sync!");
}
Err(Some(not_until)) => {