From 9e457154a3f36ce8df110b8a68550572643d906b Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Wed, 27 Apr 2022 20:02:51 +0000
Subject: [PATCH] sort on sync status

---
 TODO.md               | 21 ++++++++++
 src/block_watcher.rs  | 57 ++++++++++++++++++++++----
 src/main.rs           |  4 +-
 src/provider.rs       |  3 +-
 src/provider_tiers.rs | 93 +++++++++++++++++++++++++++++++++++--------
 5 files changed, 150 insertions(+), 28 deletions(-)

diff --git a/TODO.md b/TODO.md
index b33d2942..126bb15e 100644
--- a/TODO.md
+++ b/TODO.md
@@ -1,3 +1,24 @@
 # Todo
 
 - [ ] tarpit ratelimiting at the start, but reject if incoming requests is super high
+- [ ] thundering herd problem if we only allow a lag of 1 block. soft rate limits should help
+
+# notes
+It's almost working. When I curl it, it doesn't work exactly right though.
+
+## first time:
+
+    ```
+    thread 'tokio-runtime-worker' panicked at 'not implemented', src/provider_tiers.rs:142:13
+    note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
+    ```
+
+I think this is not seeing any servers as in sync. Not sure why else it would not have any not_until set.
+I believe this is because we don't know the first block yet. We should force an update or something at the start.
+
+## second time:
+"false"
+
+It loses all the "jsonrpc" parts and just has the bare result. We need to return a proper jsonrpc response.
+
+# TODO: add the backend server to the header
diff --git a/src/block_watcher.rs b/src/block_watcher.rs
index 2da85614..b46bf6ab 100644
--- a/src/block_watcher.rs
+++ b/src/block_watcher.rs
@@ -14,8 +14,41 @@ pub type NewHead = (String, Block);
 pub type BlockWatcherSender = mpsc::UnboundedSender<NewHead>;
 pub type BlockWatcherReceiver = mpsc::UnboundedReceiver<NewHead>;
 
+#[derive(Eq)]
+// TODO: ethers has a similar SyncingStatus
+pub enum SyncStatus {
+    Synced(u64),
+    Behind(u64),
+    Unknown,
+}
+
+// impl Ord for SyncStatus {
+//     fn cmp(&self, other: &Self) -> cmp::Ordering {
+//         self.height.cmp(&other.height)
+//     }
+// }
+
+// impl PartialOrd for SyncStatus {
+//     fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
+//         Some(self.cmp(other))
+//     }
+// }
+
+impl PartialEq for SyncStatus {
+    fn eq(&self, other: &Self) -> bool {
+        match (self, other) {
+            (Self::Synced(a), Self::Synced(b)) => a == b,
+            (Self::Unknown, Self::Unknown) => true,
+            (Self::Behind(a), Self::Behind(b)) => a == b,
+            _ => false,
+        }
+    }
+}
+
+#[derive(Debug)]
 pub struct BlockWatcher {
     sender: BlockWatcherSender,
+    /// parking_lot::Mutex is supposed to be faster, but we only lock this once, so it's fine
     receiver: Mutex,
     block_numbers: DashMap,
     head_block_number: AtomicU64,
@@ -37,24 +70,30 @@ impl BlockWatcher {
         self.sender.clone()
     }
 
-    pub async fn is_synced(&self, rpc: String, allowed_lag: u64) -> anyhow::Result<bool> {
+    pub fn sync_status(&self, rpc: &str, allowed_lag: u64) -> SyncStatus {
         match (
             self.head_block_number.load(atomic::Ordering::SeqCst),
-            self.block_numbers.get(&rpc),
+            self.block_numbers.get(rpc),
         ) {
-            (0, _) => Ok(false),
-            (_, None) => Ok(false),
+            (0, _) => SyncStatus::Unknown,
+            (_, None) => SyncStatus::Unknown,
            (head_block_number, Some(rpc_block_number)) => {
                 match head_block_number.cmp(&rpc_block_number) {
-                    cmp::Ordering::Equal => Ok(true),
+                    cmp::Ordering::Equal => SyncStatus::Synced(0),
                     cmp::Ordering::Greater => {
                         // this probably won't happen, but it might if the block arrives at the exact wrong time
-                        Ok(true)
+                        // TODO: should this be negative?
+                        SyncStatus::Synced(0)
                     }
                     cmp::Ordering::Less => {
                         // allow being some behind
                         let lag = head_block_number - *rpc_block_number;
-                        Ok(lag <= allowed_lag)
+
+                        if lag <= allowed_lag {
+                            SyncStatus::Synced(lag)
+                        } else {
+                            SyncStatus::Behind(lag)
+                        }
                     }
                 }
             }
@@ -90,6 +129,7 @@ impl BlockWatcher {
         let head_number = self.head_block_number.load(atomic::Ordering::SeqCst);
 
         let label_slow_heads = if head_number == 0 {
+            // first block seen
            self.head_block_number
                .swap(new_block_number, atomic::Ordering::SeqCst);
            "+".to_string()
@@ -98,7 +138,7 @@ impl BlockWatcher {
             // TODO: alert if there is a large chain split?
             match (new_block_number).cmp(&head_number) {
                 cmp::Ordering::Equal => {
-                    // this block is saved
+                    // this block is already saved as the head
                     "".to_string()
                 }
                 cmp::Ordering::Greater => {
@@ -108,6 +148,7 @@ impl BlockWatcher {
                     "+".to_string()
                 }
                 cmp::Ordering::Less => {
+                    // this rpc is behind
                     let lag = new_block_number as i64 - head_number as i64;
                     lag.to_string()
                 }
diff --git a/src/main.rs b/src/main.rs
index 8e9eb81c..ac58f767 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -116,7 +116,7 @@ impl Web3ProxyApp {
         let read_lock = self.private_rpcs_ratelimiter_lock.read().await;
 
         match private_rpcs
-            .get_upstream_servers(self.block_watcher.clone())
+            .get_upstream_servers(1, self.block_watcher.clone())
             .await
         {
             Ok(upstream_servers) => {
@@ -167,7 +167,7 @@ impl Web3ProxyApp {
 
         for balanced_rpcs in self.balanced_rpc_tiers.iter() {
             match balanced_rpcs
-                .next_upstream_server(self.block_watcher.clone())
+                .next_upstream_server(1, self.block_watcher.clone())
                 .await
             {
                 Ok(upstream_server) => {
diff --git a/src/provider.rs b/src/provider.rs
index 16ab0afa..98e39676 100644
--- a/src/provider.rs
+++ b/src/provider.rs
@@ -10,7 +10,7 @@ use tracing::{info, warn};
 use crate::block_watcher::BlockWatcherSender;
 
 // TODO: instead of an enum, I tried to use Box, but hit https://github.com/gakonst/ethers-rs/issues/592
-#[derive(From)]
+#[derive(From, Debug)]
 pub enum Web3Provider {
     Http(ethers::providers::Provider<ethers::providers::Http>),
     Ws(ethers::providers::Provider<ethers::providers::Ws>),
@@ -72,6 +72,7 @@ impl Web3Provider {
 }
 
 /// An active connection to a Web3Rpc
+#[derive(Debug)]
 pub struct Web3Connection {
     /// keep track of currently open requests. We sort on this
     active_requests: u32,
diff --git a/src/provider_tiers.rs b/src/provider_tiers.rs
index 3b45ac2c..0c428d10 100644
--- a/src/provider_tiers.rs
+++ b/src/provider_tiers.rs
@@ -5,11 +5,13 @@ use governor::middleware::NoOpMiddleware;
 use governor::state::{InMemoryState, NotKeyed};
 use governor::NotUntil;
 use governor::RateLimiter;
+use std::cmp;
 use std::num::NonZeroU32;
 use std::sync::Arc;
 use tokio::sync::RwLock;
+use tracing::{info, instrument};
 
-use crate::block_watcher::BlockWatcher;
+use crate::block_watcher::{BlockWatcher, SyncStatus};
 use crate::provider::Web3Connection;
 
 type Web3RateLimiter =
@@ -20,9 +22,10 @@ type Web3RateLimiterMap = DashMap;
 pub type Web3ConnectionMap = DashMap;
 
 /// Load balance to the rpc
+#[derive(Debug)]
 pub struct Web3ProviderTier {
     /// RPC urls sorted by active requests
-    /// TODO: what type for the rpc?
+    /// TODO: what type for the rpc? i think we want this to be the key for the provider and not the provider itself
     rpcs: RwLock>,
     connections: Arc,
     ratelimiters: Web3RateLimiterMap,
@@ -72,14 +75,66 @@ impl Web3ProviderTier {
     }
 
     /// get the best available rpc server
+    #[instrument]
     pub async fn next_upstream_server(
         &self,
+        allowed_lag: u64,
         block_watcher: Arc,
     ) -> Result> {
-        let mut balanced_rpcs = self.rpcs.write().await;
+        let mut available_rpcs = self.rpcs.write().await;
 
         // sort rpcs by their active connections
-        balanced_rpcs.sort_unstable_by(|a, b| {
+        available_rpcs.sort_unstable_by(|a, b| {
+            self.connections
+                .get(a)
+                .unwrap()
+                .cmp(&self.connections.get(b).unwrap())
+        });
+
+        // sort rpcs by their block height
+        available_rpcs.sort_unstable_by(|a, b| {
+            let a_synced = block_watcher.sync_status(a, allowed_lag);
+            let b_synced = block_watcher.sync_status(b, allowed_lag);
+
+            match (a_synced, b_synced) {
+                (SyncStatus::Synced(a), SyncStatus::Synced(b)) => {
+                    if a != b {
+                        return a.cmp(&b);
+                    }
+                    // else they are equal and we want to compare on active connections
+                }
+                (SyncStatus::Synced(_), SyncStatus::Unknown) => {
+                    return cmp::Ordering::Greater;
+                }
+                (SyncStatus::Unknown, SyncStatus::Synced(_)) => {
+                    return cmp::Ordering::Less;
+                }
+                (SyncStatus::Unknown, SyncStatus::Unknown) => {
+                    // neither rpc is synced
+                    // this means neither will have connections
+                    return cmp::Ordering::Equal;
+                }
+                (SyncStatus::Synced(_), SyncStatus::Behind(_)) => {
+                    return cmp::Ordering::Greater;
+                }
+                (SyncStatus::Behind(_), SyncStatus::Synced(_)) => {
+                    return cmp::Ordering::Less;
+                }
+                (SyncStatus::Behind(_), SyncStatus::Unknown) => {
+                    return cmp::Ordering::Greater;
+                }
+                (SyncStatus::Behind(a), SyncStatus::Behind(b)) => {
+                    if a != b {
+                        return a.cmp(&b);
+                    }
+                    // else they are equal and we want to compare on active connections
+                }
+                (SyncStatus::Unknown, SyncStatus::Behind(_)) => {
+                    return cmp::Ordering::Less;
+                }
+            }
+
+            // sort on active connections
             self.connections
                 .get(a)
                 .unwrap()
@@ -88,16 +143,19 @@ impl Web3ProviderTier {
 
         let mut earliest_not_until = None;
 
-        for selected_rpc in balanced_rpcs.iter() {
+        for selected_rpc in available_rpcs.iter() {
             // check current block number
-            if !block_watcher
-                .is_synced(selected_rpc.clone(), 3)
-                .await
-                .expect("checking is_synced failed")
-            {
+            // TODO: I don't like that we fetch sync_status above and then do it again here. cache?
+            if let SyncStatus::Synced(_) = block_watcher.sync_status(selected_rpc, allowed_lag) {
+                // rpc is synced
+            } else {
                 // skip this rpc because it is not synced
                 // TODO: make a NotUntil here?
-                continue;
+                // TODO: include how many blocks behind
+                // TODO: better log
+                info!("{} is not synced", selected_rpc);
+                // we sorted on block height. so if this one isn't synced, none of the later ones will be either
+                break;
             }
 
             // check rate limits
@@ -109,6 +167,9 @@ impl Web3ProviderTier {
                 Err(not_until) => {
                     // rate limit failed
                     // save the smallest not_until. if nothing succeeds, return an Err with not_until in it
+                    // TODO: use tracing better
+                    info!("Exhausted rate limit on {}: {}", selected_rpc, not_until);
+
                     if earliest_not_until.is_none() {
                         earliest_not_until = Some(not_until);
                     } else {
@@ -146,6 +207,7 @@ impl Web3ProviderTier {
     /// get all available rpc servers
     pub async fn get_upstream_servers(
         &self,
+        allowed_lag: u64,
         block_watcher: Arc,
     ) -> Result, NotUntil> {
         let mut earliest_not_until = None;
 
         let mut selected_rpcs = vec![];
 
         for selected_rpc in self.rpcs.read().await.iter() {
-            // check that the server is synced
-            if !block_watcher
-                .is_synced(selected_rpc.clone(), 1)
-                .await
-                .expect("checking is_synced failed")
-            {
+            if let SyncStatus::Synced(_) = block_watcher.sync_status(selected_rpc, allowed_lag) {
+                // rpc is synced
+            } else {
                 // skip this rpc because it is not synced
                 continue;
             }
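
# review notes

The commented-out `Ord` impl in block_watcher.rs hints at a simpler shape for the sorting in `next_upstream_server`: if `SyncStatus` carried a total order, the long `match (a_synced, b_synced)` comparator could collapse into a key extraction. A minimal sketch follows; it is not part of the patch, and the rank order chosen here (synced servers with the smallest lag first) is an assumption about how the sorted list should be consumed, so it would need to match whatever the selection loop expects.

```
use std::cmp::Ordering;

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SyncStatus {
    Synced(u64),
    Behind(u64),
    Unknown,
}

impl SyncStatus {
    // lower rank sorts first: synced (by lag), then behind (by lag), then unknown
    fn rank(&self) -> (u8, u64) {
        match self {
            SyncStatus::Synced(lag) => (0, *lag),
            SyncStatus::Behind(lag) => (1, *lag),
            SyncStatus::Unknown => (2, u64::MAX),
        }
    }
}

impl Ord for SyncStatus {
    fn cmp(&self, other: &Self) -> Ordering {
        self.rank().cmp(&other.rank())
    }
}

impl PartialOrd for SyncStatus {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

fn main() {
    let mut statuses = vec![
        SyncStatus::Unknown,
        SyncStatus::Behind(7),
        SyncStatus::Synced(2),
        SyncStatus::Synced(0),
    ];
    statuses.sort_unstable();
    assert_eq!(statuses[0], SyncStatus::Synced(0));
}
```

With something like this in place, the two `sort_unstable_by` passes could probably become a single `sort_unstable_by_key` over a `(sync_status, active_request_count)` tuple, where the second element stands in for the `self.connections` lookup.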
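On the "second time" note in TODO.md (the proxy returning just "false"): whatever the backend sends back has to be re-wrapped in a JSON-RPC 2.0 envelope before it goes to the client. A rough sketch of the shape, assuming serde_json is available; the helper name and the hard-coded id are illustrative only, and the real code would echo the id from the incoming request and pass through error objects as well.

```
use serde_json::{json, Value};

// Wrap a bare result in a JSON-RPC 2.0 envelope, echoing the request id.
fn jsonrpc_result(id: Value, result: Value) -> Value {
    json!({
        "jsonrpc": "2.0",
        "id": id,
        "result": result,
    })
}

fn main() {
    // a bare `false` from the backend should reach the client as a full envelope
    let response = jsonrpc_result(json!(1), json!(false));
    println!("{}", response);
}
```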
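On the "we don't know the first block" note: one way to avoid every server reporting SyncStatus::Unknown at startup would be to prime head_block_number once before serving traffic. A sketch under the assumption that an ethers provider handle is available at that point; the prime_head_block name is illustrative and not part of the patch.

```
use std::sync::atomic::{AtomicU64, Ordering};

use ethers::prelude::*;

// Fetch the current block number once and store it as the head, so that
// sync_status() has something to compare against before the first
// subscription update arrives.
async fn prime_head_block(
    provider: &Provider<Http>,
    head_block_number: &AtomicU64,
) -> anyhow::Result<()> {
    let block_number = provider.get_block_number().await?.as_u64();
    head_block_number.store(block_number, Ordering::SeqCst);
    Ok(())
}
```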