From 4eeacbb30c9a66505e574abe5f762478c6fa1a43 Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Fri, 29 Apr 2022 19:51:32 +0000
Subject: [PATCH] fix where we sort and lock less

---
 src/block_watcher.rs  | 29 +++++++++++++++++++
 src/main.rs           | 28 +------------------
 src/provider_tiers.rs | 65 ++++++++++++-------------------------------
 3 files changed, 48 insertions(+), 74 deletions(-)

diff --git a/src/block_watcher.rs b/src/block_watcher.rs
index b88d3894..66c1f627 100644
--- a/src/block_watcher.rs
+++ b/src/block_watcher.rs
@@ -16,12 +16,41 @@ pub type BlockWatcherSender = mpsc::UnboundedSender;
 pub type BlockWatcherReceiver = mpsc::UnboundedReceiver;
 
 // TODO: ethers has a similar SyncingStatus
+#[derive(Eq)]
 pub enum SyncStatus {
     Synced(u64),
     Behind(u64),
     Unknown,
 }
 
+impl Ord for SyncStatus {
+    fn cmp(&self, other: &Self) -> cmp::Ordering {
+        match (self, other) {
+            (SyncStatus::Synced(a), SyncStatus::Synced(b)) => a.cmp(b),
+            (SyncStatus::Synced(_), SyncStatus::Unknown) => cmp::Ordering::Greater,
+            (SyncStatus::Unknown, SyncStatus::Synced(_)) => cmp::Ordering::Less,
+            (SyncStatus::Unknown, SyncStatus::Unknown) => cmp::Ordering::Equal,
+            (SyncStatus::Synced(_), SyncStatus::Behind(_)) => cmp::Ordering::Greater,
+            (SyncStatus::Behind(_), SyncStatus::Synced(_)) => cmp::Ordering::Less,
+            (SyncStatus::Behind(_), SyncStatus::Unknown) => cmp::Ordering::Greater,
+            (SyncStatus::Behind(a), SyncStatus::Behind(b)) => a.cmp(b),
+            (SyncStatus::Unknown, SyncStatus::Behind(_)) => cmp::Ordering::Less,
+        }
+    }
+}
+
+impl PartialOrd for SyncStatus {
+    fn partial_cmp(&self, other: &Self) -> Option {
+        Some(self.cmp(other))
+    }
+}
+
+impl PartialEq for SyncStatus {
+    fn eq(&self, other: &Self) -> bool {
+        self.cmp(other) == cmp::Ordering::Equal
+    }
+}
+
 pub struct BlockWatcher {
     sender: BlockWatcherSender,
     /// this Mutex is locked over awaits, so we want an async lock
diff --git a/src/main.rs b/src/main.rs
index 2f730d6c..c11f9952 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -9,12 +9,11 @@ use std::collections::HashMap;
 use std::fmt;
 use std::sync::Arc;
 use std::time::Duration;
-use tokio::sync::{mpsc, watch, RwLock};
+use tokio::sync::{mpsc, watch};
 use tokio::time::sleep;
 use tracing::warn;
 use warp::Filter;
 
-// use crate::types::{BlockMap, ConnectionsMap, RpcRateLimiterMap};
 use crate::block_watcher::BlockWatcher;
 use crate::provider::JsonRpcRequest;
 use crate::provider_tiers::Web3ProviderTier;
@@ -36,11 +35,6 @@ struct Web3ProxyApp {
     balanced_rpc_tiers: Arc>,
     /// Send private requests (like eth_sendRawTransaction) to all these servers
     private_rpcs: Option>,
-    /// write lock on these when all rate limits are hit
-    /// this lock will be held open over an await, so use async locking
-    balanced_rpc_ratelimiter_lock: RwLock<()>,
-    /// this lock will be held open over an await, so use async locking
-    private_rpcs_ratelimiter_lock: RwLock<()>,
 }
 
 impl fmt::Debug for Web3ProxyApp {
@@ -165,8 +159,6 @@ impl Web3ProxyApp {
             clock,
             balanced_rpc_tiers,
             private_rpcs,
-            balanced_rpc_ratelimiter_lock: Default::default(),
-            private_rpcs_ratelimiter_lock: Default::default(),
         })
     }
 
@@ -182,8 +174,6 @@
             // there are private rpcs configured and the request is eth_sendSignedTransaction. send to all private rpcs
             loop {
                 // TODO: think more about this lock. i think it won't actually help the herd. it probably makes it worse if we have a tight lag_limit
-                let read_lock = self.private_rpcs_ratelimiter_lock.read().await;
-
                 match private_rpcs.get_upstream_servers().await {
                     Ok(upstream_servers) => {
                         let (tx, mut rx) = mpsc::unbounded_channel();
@@ -216,16 +206,10 @@
                     Err(not_until) => {
                         // TODO: move this to a helper function
                        // sleep (with a lock) until our rate limits should be available
-                        drop(read_lock);
-
                         if let Some(not_until) = not_until {
-                            let write_lock = self.balanced_rpc_ratelimiter_lock.write().await;
-
                             let deadline = not_until.wait_time_from(self.clock.now());
                             sleep(deadline).await;
-
-                            drop(write_lock);
                         }
                     }
                 };
@@ -234,8 +218,6 @@
         // this is not a private transaction (or no private relays are configured)
         // try to send to each tier, stopping at the first success
         loop {
-            let read_lock = self.balanced_rpc_ratelimiter_lock.read().await;
-
             // there are multiple tiers. save the earliest not_until (if any). if we don't return, we will sleep until then and then try again
             let mut earliest_not_until = None;
 
@@ -300,18 +282,10 @@
             }
 
             // we haven't returned an Ok, sleep and try again
-            // TODO: move this to a helper function
-            drop(read_lock);
-
-            // unwrap should be safe since we would have returned if it wasn't set
             if let Some(earliest_not_until) = earliest_not_until {
-                let write_lock = self.balanced_rpc_ratelimiter_lock.write().await;
-
                 let deadline = earliest_not_until.wait_time_from(self.clock.now());
                 sleep(deadline).await;
-
-                drop(write_lock);
             } else {
                 // TODO: how long should we wait?
                 // TODO: max wait time?
diff --git a/src/provider_tiers.rs b/src/provider_tiers.rs
index e94a0486..fc3c5767 100644
--- a/src/provider_tiers.rs
+++ b/src/provider_tiers.rs
@@ -6,7 +6,6 @@ use futures::StreamExt;
 use governor::clock::{QuantaClock, QuantaInstant};
 use governor::NotUntil;
 use serde_json::value::RawValue;
-use std::cmp;
 use std::collections::HashMap;
 use std::fmt;
 use std::num::NonZeroU32;
@@ -192,62 +191,22 @@ impl Web3ProviderTier {
             })
             .collect();
 
-        // sort rpcs by their sync status and active connections
+        // sort rpcs by their sync status
+        // TODO: if we only changed one row, we don't need to sort the whole thing. i think we can do this better
         available_rpcs.sort_unstable_by(|a, b| {
             let a_synced = sync_status.get(a).unwrap();
             let b_synced = sync_status.get(b).unwrap();
-            match (a_synced, b_synced) {
-                (SyncStatus::Synced(a), SyncStatus::Synced(b)) => {
-                    if a != b {
-                        return a.cmp(b);
-                    }
-                    // else they are equal and we want to compare on active connections
-                }
-                (SyncStatus::Synced(_), SyncStatus::Unknown) => {
-                    return cmp::Ordering::Greater;
-                }
-                (SyncStatus::Unknown, SyncStatus::Synced(_)) => {
-                    return cmp::Ordering::Less;
-                }
-                (SyncStatus::Unknown, SyncStatus::Unknown) => {
-                    // neither rpc is synced
-                    // this means neither will have connections
-                    return cmp::Ordering::Equal;
-                }
-                (SyncStatus::Synced(_), SyncStatus::Behind(_)) => {
-                    return cmp::Ordering::Greater;
-                }
-                (SyncStatus::Behind(_), SyncStatus::Synced(_)) => {
-                    return cmp::Ordering::Less;
-                }
-                (SyncStatus::Behind(_), SyncStatus::Unknown) => {
-                    return cmp::Ordering::Greater;
-                }
-                (SyncStatus::Behind(a), SyncStatus::Behind(b)) => {
-                    if a != b {
-                        return a.cmp(b);
-                    }
-                    // else they are equal and we want to compare on active connections
-                }
-                (SyncStatus::Unknown, SyncStatus::Behind(_)) => {
-                    return cmp::Ordering::Less;
-                }
-            }
-
-            // sort on active connections
-            self.connections
-                .get(a)
-                .unwrap()
-                .cmp(self.connections.get(b).unwrap())
+            a_synced.cmp(b_synced)
         });
 
-        // filter out
+        // filter out unsynced rpcs
         let synced_rpcs: Vec = available_rpcs
             .into_iter()
             .take_while(|rpc| matches!(sync_status.get(rpc).unwrap(), SyncStatus::Synced(_)))
             .collect();
 
+        // TODO: is arcswap the best type for this?
         self.synced_rpcs.swap(Arc::new(synced_rpcs));
 
         Ok(())
     }
@@ -257,7 +216,19 @@
     pub async fn next_upstream_server(&self) -> Result>> {
         let mut earliest_not_until = None;
 
-        for selected_rpc in self.synced_rpcs.load().iter() {
+        // TODO: this clone is probably not the best way to do this
+        let mut synced_rpcs = Vec::clone(&*self.synced_rpcs.load());
+
+        // TODO: we don't want to sort on active connections. we want to sort on remaining capacity for connections. for example, geth can handle more than erigon
+        synced_rpcs.sort_unstable_by(|a, b| {
+            // sort on active connections
+            self.connections
+                .get(a)
+                .unwrap()
+                .cmp(self.connections.get(b).unwrap())
+        });
+
+        for selected_rpc in synced_rpcs.iter() {
             // increment our connection counter
             if let Err(not_until) = self
                 .connections