back to arcswap and usizes

This commit is contained in:
Bryan Stitt 2022-05-18 20:18:01 +00:00
parent 05563b46b1
commit 7a3a3271bb
5 changed files with 120 additions and 216 deletions

42
Cargo.lock generated
View File

@ -74,6 +74,12 @@ version = "1.0.57"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08f9b8508dccb7687a1d6c4ce66b2b0ecef467c94667de27d8d7fe1f8d2a9cdc"
[[package]]
name = "arc-swap"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c5d78ce20460b82d3fa150275ed9d55e21064fc7951177baacf86a145c4a4b1f"
[[package]]
name = "argh"
version = "0.1.7"
@ -1586,19 +1592,6 @@ dependencies = [
"byteorder",
]
[[package]]
name = "generator"
version = "0.6.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "061d3be1afec479d56fa3bd182bf966c7999ec175fcfdb87ac14d417241366c6"
dependencies = [
"cc",
"libc",
"log",
"rustversion",
"winapi",
]
[[package]]
name = "generic-array"
version = "0.12.4"
@ -2103,16 +2096,6 @@ version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "left-right"
version = "0.11.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2033c37d2bc68719774fa095ccdd46a14b846e9c52ec54107eda92fbf966b203"
dependencies = [
"loom",
"slab",
]
[[package]]
name = "libc"
version = "0.2.125"
@ -2155,17 +2138,6 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "loom"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "27a6650b2f722ae8c0e2ebc46d07f80c9923464fc206d962332f1eff83143530"
dependencies = [
"cfg-if",
"generator",
"scoped-tls",
]
[[package]]
name = "mach"
version = "0.3.2"
@ -4298,6 +4270,7 @@ name = "web3-proxy"
version = "0.1.0"
dependencies = [
"anyhow",
"arc-swap",
"argh",
"console-subscriber",
"dashmap",
@ -4308,7 +4281,6 @@ dependencies = [
"futures",
"governor",
"hashbrown 0.12.1",
"left-right",
"linkedhashmap",
"parking_lot 0.12.0",
"proctitle",

View File

@ -7,6 +7,7 @@ edition = "2021"
[dependencies]
anyhow = "1.0.57"
arc-swap = "1.5.0"
argh = "0.1.7"
# axum = "*" # TODO: use this instead of warp?
console-subscriber = { version = "0.1.5", features = ["parking_lot"] }
@ -19,7 +20,6 @@ futures = { version = "0.3.21", features = ["thread-pool"] }
# TODO: governor has a "futures" and "futures-timer" feature. do we want those?
governor = { version = "0.4.2", features = ["dashmap", "std"] }
hashbrown = "0.12.1"
left-right = "0.11.4"
linkedhashmap = { path = "../linkedhashmap", features = ["inline-more"] }
# TODO: parking_lot has an "arc_lock" feature that we might want to use
parking_lot = { version = "0.12.0", features = ["deadlock_detection"] }

View File

@ -231,21 +231,16 @@ impl Web3ProxyApp {
// todo: move getting a cache_key or the result into a helper function. then we could have multiple caches
// TODO: i think we are maybe getting stuck on this lock. maybe a new block arrives, it tries to write and gets hung up on something. then this can't proceed
trace!("{:?} waiting for best_block_hash", request);
trace!("{:?} waiting for head_block_hash", request);
let best_block_hash = self
.balanced_rpcs
.get_synced_rpcs()
.enter()
.map(|x| *x.get_head_block_hash())
.unwrap();
let head_block_hash = self.balanced_rpcs.get_head_block_hash();
trace!("{:?} best_block_hash {}", request, best_block_hash);
trace!("{:?} head_block_hash {}", request, head_block_hash);
// TODO: building this cache key is slow and its large, but i don't see a better way right now
// TODO: inspect the params and see if a block is specified. if so, use that block number instead of current_block
let cache_key = (
best_block_hash,
head_block_hash,
request.method.clone(),
request.params.clone().map(|x| x.to_string()),
);

View File

@ -98,7 +98,8 @@ impl Web3Connection {
#[instrument(skip_all)]
pub async fn reconnect(
self: &Arc<Self>,
block_sender: &flume::Sender<(u64, H256, Arc<Self>)>,
block_sender: &flume::Sender<(u64, H256, usize)>,
rpc_id: usize,
) -> anyhow::Result<()> {
// websocket doesn't need the http client
let http_client = None;
@ -108,7 +109,7 @@ impl Web3Connection {
// TODO: tell the block subscriber that we are at 0
block_sender
.send_async((0, H256::default(), self.clone()))
.send_async((0, H256::default(), rpc_id))
.await?;
let new_provider = Web3Provider::from_str(&self.url, http_client).await?;
@ -203,7 +204,8 @@ impl Web3Connection {
async fn send_block(
self: &Arc<Self>,
block: Result<Block<TxHash>, ProviderError>,
block_sender: &flume::Sender<(u64, H256, Arc<Self>)>,
block_sender: &flume::Sender<(u64, H256, usize)>,
rpc_id: usize,
) {
match block {
Ok(block) => {
@ -212,7 +214,7 @@ impl Web3Connection {
// TODO: i'm pretty sure we don't need send_async, but double check
block_sender
.send_async((block_number, block_hash, self.clone()))
.send_async((block_number, block_hash, rpc_id))
.await
.unwrap();
}
@ -227,7 +229,8 @@ impl Web3Connection {
#[instrument(skip_all)]
pub async fn subscribe_new_heads(
self: Arc<Self>,
block_sender: flume::Sender<(u64, H256, Arc<Self>)>,
rpc_id: usize,
block_sender: flume::Sender<(u64, H256, usize)>,
reconnect: bool,
) -> anyhow::Result<()> {
loop {
@ -272,7 +275,7 @@ impl Web3Connection {
last_hash = new_hash;
}
self.send_block(block, &block_sender).await;
self.send_block(block, &block_sender, rpc_id).await;
}
Err(e) => {
warn!("Failed getting latest block from {}: {:?}", self, e);
@ -300,7 +303,7 @@ impl Web3Connection {
.request("eth_getBlockByNumber", ("latest", false))
.await;
self.send_block(block, &block_sender).await;
self.send_block(block, &block_sender, rpc_id).await;
// TODO: what should this timeout be? needs to be larger than worst case block time
// TODO: although reconnects will make this less of an issue
@ -309,7 +312,7 @@ impl Web3Connection {
.await
{
Ok(Some(new_block)) => {
self.send_block(Ok(new_block), &block_sender).await;
self.send_block(Ok(new_block), &block_sender, rpc_id).await;
// TODO: really not sure about this
task::yield_now().await;
@ -334,7 +337,7 @@ impl Web3Connection {
warn!("new heads subscription exited. reconnecting in 10 seconds...");
sleep(Duration::from_secs(10)).await;
self.reconnect(&block_sender).await?;
self.reconnect(&block_sender, rpc_id).await?;
} else {
break;
}

View File

@ -1,4 +1,5 @@
///! Load balanced communication with a group of web3 providers
use arc_swap::ArcSwap;
use derive_more::From;
use ethers::prelude::H256;
use futures::future::join_all;
@ -6,13 +7,10 @@ use futures::stream::FuturesUnordered;
use futures::StreamExt;
use governor::clock::{QuantaClock, QuantaInstant};
use governor::NotUntil;
use hashbrown::HashMap;
use left_right::{Absorb, ReadHandleFactory, WriteHandle};
use serde_json::value::RawValue;
use std::cmp;
use std::fmt;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::task;
use tracing::Instrument;
use tracing::{info, info_span, instrument, trace, warn};
@ -20,11 +18,11 @@ use tracing::{info, info_span, instrument, trace, warn};
use crate::config::Web3ConnectionConfig;
use crate::connection::{ActiveRequestHandle, Web3Connection};
#[derive(Clone, Default)]
pub struct SyncedConnections {
head_block_number: u64,
#[derive(Clone)]
struct SyncedConnections {
head_block_num: u64,
head_block_hash: H256,
inner: Vec<Arc<Web3Connection>>,
inner: Vec<usize>,
}
impl fmt::Debug for SyncedConnections {
@ -35,111 +33,24 @@ impl fmt::Debug for SyncedConnections {
}
impl SyncedConnections {
fn new(max_connections: usize) -> Self {
Self {
head_block_num: 0,
head_block_hash: Default::default(),
inner: Vec::with_capacity(max_connections),
}
}
pub fn get_head_block_hash(&self) -> &H256 {
&self.head_block_hash
}
fn update(
&mut self,
log: bool,
new_block_num: u64,
new_block_hash: H256,
rpc: Arc<Web3Connection>,
) {
// TODO: double check this logic
match new_block_num.cmp(&self.head_block_number) {
cmp::Ordering::Greater => {
// the rpc's newest block is the new overall best block
if log {
info!("new head {} from {}", new_block_num, rpc);
}
self.inner.clear();
self.inner.push(rpc);
self.head_block_number = new_block_num;
self.head_block_hash = new_block_hash;
}
cmp::Ordering::Equal => {
if new_block_hash != self.head_block_hash {
// same height, but different chain
// TODO: anything else we should do? set some "nextSafeBlockHeight" to delay sending transactions?
// TODO: sometimes a node changes its block. if that happens, a new block is probably right behind this one
if log {
warn!(
"chain is forked at #{}! {} has {}. {} rpcs have {}",
new_block_num,
rpc,
new_block_hash,
self.inner.len(),
self.head_block_hash
);
}
return;
}
// do not clear synced_connections.
// we just want to add this rpc to the end
self.inner.push(rpc);
}
cmp::Ordering::Less => {
// this isn't the best block in the tier. don't do anything
return;
}
}
// TODO: better log
if log {
trace!("Now synced: {:?}", self.inner);
} else {
trace!("Now synced #2: {:?}", self.inner);
}
}
}
enum SyncedConnectionsOp {
SyncedConnectionsUpdate(u64, H256, Arc<Web3Connection>),
SyncedConnectionsCapacity(usize),
}
impl Absorb<SyncedConnectionsOp> for SyncedConnections {
fn absorb_first(&mut self, operation: &mut SyncedConnectionsOp, _: &Self) {
match operation {
SyncedConnectionsOp::SyncedConnectionsUpdate(new_block_number, new_block_hash, rpc) => {
self.update(true, *new_block_number, *new_block_hash, rpc.clone());
}
SyncedConnectionsOp::SyncedConnectionsCapacity(capacity) => {
self.inner = Vec::with_capacity(*capacity);
}
}
}
fn absorb_second(&mut self, operation: SyncedConnectionsOp, _: &Self) {
match operation {
SyncedConnectionsOp::SyncedConnectionsUpdate(new_block_number, new_block_hash, rpc) => {
// TODO: disable logging on this one?
self.update(false, new_block_number, new_block_hash, rpc);
}
SyncedConnectionsOp::SyncedConnectionsCapacity(capacity) => {
self.inner = Vec::with_capacity(capacity);
}
}
}
fn drop_first(self: Box<Self>) {}
fn sync_with(&mut self, first: &Self) {
// TODO: not sure about this
*self = first.clone()
}
}
/// A collection of web3 connections. Sends requests either the current best server or all servers.
#[derive(From)]
pub struct Web3Connections {
inner: Vec<Arc<Web3Connection>>,
synced_connections_reader: ReadHandleFactory<SyncedConnections>,
synced_connections_writer: Mutex<WriteHandle<SyncedConnections, SyncedConnectionsOp>>,
synced_connections: ArcSwap<SyncedConnections>,
}
impl fmt::Debug for Web3Connections {
@ -180,20 +91,11 @@ impl Web3Connections {
));
}
let (mut synced_connections_writer, synced_connections_reader) =
left_right::new::<SyncedConnections, SyncedConnectionsOp>();
synced_connections_writer.append(SyncedConnectionsOp::SyncedConnectionsCapacity(
num_connections,
));
trace!("publishing synced connections");
synced_connections_writer.publish();
trace!("published synced connections");
let synced_connections = SyncedConnections::new(num_connections);
let connections = Arc::new(Self {
inner: connections,
synced_connections_reader: synced_connections_reader.factory(),
synced_connections_writer: Mutex::new(synced_connections_writer),
synced_connections: ArcSwap::new(Arc::new(synced_connections)),
});
Ok(connections)
@ -204,7 +106,7 @@ impl Web3Connections {
let mut handles = vec![];
for connection in self.inner.iter() {
for (rpc_id, connection) in self.inner.iter().enumerate() {
// subscribe to new heads in a spawned future
// TODO: channel instead. then we can have one future with write access to a left-right?
let connection = Arc::clone(connection);
@ -220,7 +122,7 @@ impl Web3Connections {
// TODO: instead of passing Some(connections), pass Some(channel_sender). Then listen on the receiver below to keep local heads up-to-date
// TODO: proper spann
connection
.subscribe_new_heads(block_sender.clone(), true)
.subscribe_new_heads(rpc_id, block_sender.clone(), true)
.instrument(tracing::info_span!("url"))
.await
});
@ -238,8 +140,8 @@ impl Web3Connections {
join_all(handles).await;
}
pub fn get_synced_rpcs(&self) -> left_right::ReadHandle<SyncedConnections> {
self.synced_connections_reader.handle()
pub fn get_head_block_hash(&self) -> H256 {
*self.synced_connections.load().get_head_block_hash()
}
/// Send the same request to all the handles. Returning the fastest successful result.
@ -309,37 +211,77 @@ impl Web3Connections {
}
/// TODO: possible dead lock here. investigate more. probably refactor
/// TODO: move parts of this onto SyncedConnections?
#[instrument(skip_all)]
async fn update_synced_rpcs(
&self,
block_receiver: flume::Receiver<(u64, H256, Arc<Web3Connection>)>,
block_receiver: flume::Receiver<(u64, H256, usize)>,
) -> anyhow::Result<()> {
let mut synced_connections_writer = self.synced_connections_writer.lock().await;
let max_connections = self.inner.len();
while let Ok((new_block_num, new_block_hash, rpc)) = block_receiver.recv_async().await {
let mut connection_states: Vec<(u64, H256)> = Vec::with_capacity(max_connections);
let mut head_block_hash = H256::zero();
let mut head_block_num = 0u64;
let mut synced_connections = SyncedConnections::new(max_connections);
while let Ok((new_block_num, new_block_hash, rpc_id)) = block_receiver.recv_async().await {
if new_block_num == 0 {
warn!("{} is still syncing", rpc);
continue;
// TODO: show the actual rpc url?
warn!("rpc #{} is still syncing", rpc_id);
}
let span = info_span!("new_block_num", new_block_num,);
// TODO: span with rpc in it, too
let span = info_span!("new_block", new_block_num);
let _enter = span.enter();
synced_connections_writer.append(SyncedConnectionsOp::SyncedConnectionsUpdate(
new_block_num,
new_block_hash,
rpc,
));
connection_states.insert(rpc_id, (new_block_num, new_block_hash));
// TODO: only publish when the second block arrives?
// TODO: use spans properly
trace!("publishing synced connections for block {}", new_block_num,);
synced_connections_writer.publish();
trace!(
"published synced connections for block {} from {}",
new_block_num,
"some rpc"
);
// TODO: do something to update the synced blocks
match new_block_num.cmp(&head_block_num) {
cmp::Ordering::Greater => {
// the rpc's newest block is the new overall best block
info!("new head from #{}", rpc_id);
synced_connections.inner.clear();
synced_connections.inner.push(rpc_id);
head_block_num = new_block_num;
head_block_hash = new_block_hash;
}
cmp::Ordering::Equal => {
if new_block_hash != head_block_hash {
// same height, but different chain
// TODO: anything else we should do? set some "nextSafeBlockHeight" to delay sending transactions?
// TODO: sometimes a node changes its block. if that happens, a new block is probably right behind this one
warn!(
"chain is forked at #{}! {} has {}. {} rpcs have {}",
new_block_num,
rpc_id,
new_block_hash,
synced_connections.inner.len(),
head_block_hash
);
// TODO: don't continue. check to see which head block is more popular!
continue;
}
// do not clear synced_connections.
// we just want to add this rpc to the end
synced_connections.inner.push(rpc_id);
}
cmp::Ordering::Less => {
// this isn't the best block in the tier. don't do anything
continue;
}
}
// the synced connections have changed
let new_data = Arc::new(synced_connections.clone());
// TODO: only do this if there are 2 nodes synced to this block?
// do the arcswap
self.synced_connections.swap(new_data);
}
// TODO: if there was an error, we should return it
@ -355,37 +297,27 @@ impl Web3Connections {
) -> Result<ActiveRequestHandle, Option<NotUntil<QuantaInstant>>> {
let mut earliest_not_until = None;
let mut synced_rpc_arcs = self
.synced_connections_reader
.handle()
.enter()
.map(|x| x.inner.clone())
.unwrap();
let mut synced_rpc_indexes = self.synced_connections.load().inner.clone();
// TODO: better key!
let sort_cache: HashMap<String, (f32, u32)> = synced_rpc_arcs
let sort_cache: Vec<(f32, u32)> = synced_rpc_indexes
.iter()
.map(|connection| {
// TODO: better key!
let key = format!("{}", connection);
let active_requests = connection.active_requests();
let soft_limit = connection.soft_limit();
.map(|rpc_id| {
let rpc = self.inner.get(*rpc_id).unwrap();
let active_requests = rpc.active_requests();
let soft_limit = rpc.soft_limit();
// TODO: how should we include the soft limit? floats are slower than integer math
let utilization = active_requests as f32 / soft_limit as f32;
(key, (utilization, soft_limit))
(utilization, soft_limit)
})
.collect();
// TODO: i think we might need to load active connections and then
synced_rpc_arcs.sort_unstable_by(|a, b| {
// TODO: better keys
let a_key = format!("{}", a);
let b_key = format!("{}", b);
let (a_utilization, a_soft_limit) = sort_cache.get(&a_key).unwrap();
let (b_utilization, b_soft_limit) = sort_cache.get(&b_key).unwrap();
synced_rpc_indexes.sort_unstable_by(|a, b| {
let (a_utilization, a_soft_limit) = sort_cache.get(*a).unwrap();
let (b_utilization, b_soft_limit) = sort_cache.get(*b).unwrap();
// TODO: i'm comparing floats. crap
match a_utilization
@ -397,14 +329,16 @@ impl Web3Connections {
}
});
for selected_rpc in synced_rpc_arcs.into_iter() {
for rpc_id in synced_rpc_indexes.into_iter() {
let rpc = self.inner.get(rpc_id).unwrap();
// increment our connection counter
match selected_rpc.try_request_handle() {
match rpc.try_request_handle() {
Err(not_until) => {
earliest_possible(&mut earliest_not_until, not_until);
}
Ok(handle) => {
trace!("next server on {:?}: {:?}", self, selected_rpc);
trace!("next server on {:?}: {:?}", self, rpc_id);
return Ok(handle);
}
}