sort on sync status

Bryan Stitt 2022-04-27 20:02:51 +00:00
parent 2edf0cf4b3
commit 9e457154a3
5 changed files with 150 additions and 28 deletions

TODO.md

@@ -1,3 +1,24 @@
# Todo
- [ ] tarpit rate limiting at the start, but reject outright if the incoming request rate is extremely high
- [ ] thundering-herd problem if we only allow a lag of 1 block; soft rate limits should help
# notes
It's almost working. When I curl it, the response isn't exactly right though.
## first time:
```
thread 'tokio-runtime-worker' panicked at 'not implemented', src/provider_tiers.rs:142:13
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
```
I think this means it isn't seeing any servers as in sync. Not sure why else it wouldn't have any not_until set.
I believe this is because we don't know the first block yet. We should force an update or something at the start, as sketched below.
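A minimal sketch of that "force an update" idea, assuming a hypothetical `prime_head_block` helper and a placeholder rpc url (`BlockWatcherSender` and the `NewHead` tuple come from `src/block_watcher.rs` below):

```rust
use ethers::providers::{Http, Middleware, Provider};
use ethers::types::BlockNumber;

use crate::block_watcher::BlockWatcherSender;

// hypothetical startup helper: fetch the latest block once so that
// head_block_number is seeded with a real value instead of 0
async fn prime_head_block(rpc: &str, sender: &BlockWatcherSender) -> anyhow::Result<()> {
    let provider = Provider::<Http>::try_from(rpc)?;
    let block = provider
        .get_block(BlockNumber::Latest)
        .await?
        .ok_or_else(|| anyhow::anyhow!("no latest block"))?;
    sender
        .send((rpc.to_string(), block))
        .map_err(|_| anyhow::anyhow!("block watcher channel closed"))?;
    Ok(())
}
```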
## second time:
"false"
It loses all the "jsonrpc" parts and returns just the bare result. We need to return a proper JSON-RPC response, as sketched below.
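For reference, a JSON-RPC 2.0 response wraps the result in an envelope that echoes the request's `id`, so instead of a bare `"false"` the proxy should return something like this (assuming the request used `id` 1):

```json
{"jsonrpc": "2.0", "id": 1, "result": false}
```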
# TODO: add the backend server to the header

src/block_watcher.rs

@@ -14,8 +14,41 @@ pub type NewHead = (String, Block<TxHash>);
pub type BlockWatcherSender = mpsc::UnboundedSender<NewHead>;
pub type BlockWatcherReceiver = mpsc::UnboundedReceiver<NewHead>;
#[derive(Eq)]
// TODO: ethers has a similar SyncingStatus
pub enum SyncStatus {
Synced(u64),
Behind(u64),
Unknown,
}
// impl Ord for SyncStatus {
// fn cmp(&self, other: &Self) -> cmp::Ordering {
// self.height.cmp(&other.height)
// }
// }
// impl PartialOrd for SyncStatus {
// fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
// Some(self.cmp(other))
// }
// }
impl PartialEq for SyncStatus {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(Self::Synced(a), Self::Synced(b)) => a == b,
(Self::Unknown, Self::Unknown) => true,
(Self::Behind(a), Self::Behind(b)) => a == b,
_ => false,
}
}
}
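The commented-out impls above still reference a `height` field from an earlier design. For illustration (not part of this commit), here is one way they could be completed so `SyncStatus` orders the way the sort in `src/provider_tiers.rs` wants: synced first, then behind, then unknown, with lower lag winning ties:

```rust
use std::cmp;

impl Ord for SyncStatus {
    fn cmp(&self, other: &Self) -> cmp::Ordering {
        use SyncStatus::*;
        match (self, other) {
            // within a variant, lower lag sorts first
            (Synced(a), Synced(b)) | (Behind(a), Behind(b)) => a.cmp(b),
            (Unknown, Unknown) => cmp::Ordering::Equal,
            // synced sorts before behind and unknown
            (Synced(_), _) => cmp::Ordering::Less,
            (_, Synced(_)) => cmp::Ordering::Greater,
            // behind sorts before unknown
            (Behind(_), _) => cmp::Ordering::Less,
            (_, Behind(_)) => cmp::Ordering::Greater,
        }
    }
}

impl PartialOrd for SyncStatus {
    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
        Some(self.cmp(other))
    }
}
```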
#[derive(Debug)]
pub struct BlockWatcher {
sender: BlockWatcherSender,
/// parking_lot::Mutex is supposed to be faster, but we only lock this once, so it's fine
receiver: Mutex<BlockWatcherReceiver>,
block_numbers: DashMap<String, u64>,
head_block_number: AtomicU64,
@@ -37,24 +70,30 @@ impl BlockWatcher {
self.sender.clone()
}
- pub async fn is_synced(&self, rpc: String, allowed_lag: u64) -> anyhow::Result<bool> {
+ pub fn sync_status(&self, rpc: &str, allowed_lag: u64) -> SyncStatus {
match (
self.head_block_number.load(atomic::Ordering::SeqCst),
- self.block_numbers.get(&rpc),
+ self.block_numbers.get(rpc),
) {
- (0, _) => Ok(false),
- (_, None) => Ok(false),
+ (0, _) => SyncStatus::Unknown,
+ (_, None) => SyncStatus::Unknown,
(head_block_number, Some(rpc_block_number)) => {
match head_block_number.cmp(&rpc_block_number) {
- cmp::Ordering::Equal => Ok(true),
+ cmp::Ordering::Equal => SyncStatus::Synced(0),
cmp::Ordering::Greater => {
// this probably won't happen, but it might if the block arrives at the exact wrong time
- Ok(true)
+ // TODO: should this be negative?
+ SyncStatus::Synced(0)
}
cmp::Ordering::Less => {
// allow being a few blocks behind
let lag = head_block_number - *rpc_block_number;
- Ok(lag <= allowed_lag)
+ if lag <= allowed_lag {
+ SyncStatus::Synced(lag)
+ } else {
+ SyncStatus::Behind(lag)
+ }
}
}
}
@@ -90,6 +129,7 @@ impl BlockWatcher {
let head_number = self.head_block_number.load(atomic::Ordering::SeqCst);
let label_slow_heads = if head_number == 0 {
// first block seen
self.head_block_number
.swap(new_block_number, atomic::Ordering::SeqCst);
"+".to_string()
@@ -98,7 +138,7 @@ impl BlockWatcher {
// TODO: alert if there is a large chain split?
match (new_block_number).cmp(&head_number) {
cmp::Ordering::Equal => {
- // this block is saved
+ // this block is already saved as the head
"".to_string()
}
cmp::Ordering::Greater => {
@@ -108,6 +148,7 @@ impl BlockWatcher {
"+".to_string()
}
cmp::Ordering::Less => {
// this rpc is behind
let lag = new_block_number as i64 - head_number as i64;
lag.to_string()
}
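With the enum in place, callers can distinguish all three states instead of collapsing them to a bool; a small usage sketch (the url is hypothetical):

```rust
// assumes `watcher: &BlockWatcher` has already seen blocks from this rpc
match watcher.sync_status("http://10.0.0.1:8545", 1) {
    SyncStatus::Synced(lag) => println!("synced, {} block(s) behind the head", lag),
    SyncStatus::Behind(lag) => println!("behind by {} blocks", lag),
    SyncStatus::Unknown => println!("no blocks seen from this rpc yet"),
}
```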

src/main.rs

@@ -116,7 +116,7 @@ impl Web3ProxyApp {
let read_lock = self.private_rpcs_ratelimiter_lock.read().await;
match private_rpcs
- .get_upstream_servers(self.block_watcher.clone())
+ .get_upstream_servers(1, self.block_watcher.clone())
.await
{
Ok(upstream_servers) => {
@@ -167,7 +167,7 @@ impl Web3ProxyApp {
for balanced_rpcs in self.balanced_rpc_tiers.iter() {
match balanced_rpcs
- .next_upstream_server(self.block_watcher.clone())
+ .next_upstream_server(1, self.block_watcher.clone())
.await
{
Ok(upstream_server) => {

src/provider.rs

@@ -10,7 +10,7 @@ use tracing::{info, warn};
use crate::block_watcher::BlockWatcherSender;
// TODO: instead of an enum, I tried to use Box<dyn Provider>, but hit https://github.com/gakonst/ethers-rs/issues/592
- #[derive(From)]
+ #[derive(From, Debug)]
pub enum Web3Provider {
Http(ethers::providers::Provider<ethers::providers::Http>),
Ws(ethers::providers::Provider<ethers::providers::Ws>),
@@ -72,6 +72,7 @@ impl Web3Provider {
}
/// An active connection to a Web3Rpc
#[derive(Debug)]
pub struct Web3Connection {
/// keep track of currently open requests. We sort on this
active_requests: u32,
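The "We sort on this" comment implies `Web3Connection` is ordered by `active_requests`. Those impls are outside this diff, but they would presumably look something like this sketch (an assumption, not shown in the commit):

```rust
use std::cmp::Ordering;

impl PartialEq for Web3Connection {
    fn eq(&self, other: &Self) -> bool {
        self.active_requests == other.active_requests
    }
}

impl Eq for Web3Connection {}

impl Ord for Web3Connection {
    fn cmp(&self, other: &Self) -> Ordering {
        // the least-busy connection sorts first
        self.active_requests.cmp(&other.active_requests)
    }
}

impl PartialOrd for Web3Connection {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
```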

src/provider_tiers.rs

@@ -5,11 +5,13 @@ use governor::middleware::NoOpMiddleware;
use governor::state::{InMemoryState, NotKeyed};
use governor::NotUntil;
use governor::RateLimiter;
use std::cmp;
use std::num::NonZeroU32;
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{info, instrument};
- use crate::block_watcher::BlockWatcher;
+ use crate::block_watcher::{BlockWatcher, SyncStatus};
use crate::provider::Web3Connection;
type Web3RateLimiter =
@@ -20,9 +22,10 @@ type Web3RateLimiterMap = DashMap<String, Web3RateLimiter>;
pub type Web3ConnectionMap = DashMap<String, Web3Connection>;
/// Load balances requests across the rpcs in this tier
#[derive(Debug)]
pub struct Web3ProviderTier {
/// RPC urls sorted by active requests
- /// TODO: what type for the rpc?
+ /// TODO: what type for the rpc? i think we want this to be the key for the provider and not the provider itself
rpcs: RwLock<Vec<String>>,
connections: Arc<Web3ConnectionMap>,
ratelimiters: Web3RateLimiterMap,
@@ -72,14 +75,66 @@ impl Web3ProviderTier {
}
/// get the best available rpc server
#[instrument]
pub async fn next_upstream_server(
&self,
allowed_lag: u64,
block_watcher: Arc<BlockWatcher>,
) -> Result<String, NotUntil<QuantaInstant>> {
- let mut balanced_rpcs = self.rpcs.write().await;
+ let mut available_rpcs = self.rpcs.write().await;
// sort rpcs by their active connections
- balanced_rpcs.sort_unstable_by(|a, b| {
+ available_rpcs.sort_unstable_by(|a, b| {
self.connections
.get(a)
.unwrap()
.cmp(&self.connections.get(b).unwrap())
});
// sort rpcs by their sync status so the best-synced rpcs come first
available_rpcs.sort_unstable_by(|a, b| {
let a_synced = block_watcher.sync_status(a, allowed_lag);
let b_synced = block_watcher.sync_status(b, allowed_lag);
match (a_synced, b_synced) {
(SyncStatus::Synced(a), SyncStatus::Synced(b)) => {
if a != b {
// lower lag sorts first
return a.cmp(&b);
}
// else they are equal and we want to compare on active connections
}
(SyncStatus::Synced(_), SyncStatus::Unknown) => {
// synced sorts before unknown
return cmp::Ordering::Less;
}
(SyncStatus::Unknown, SyncStatus::Synced(_)) => {
return cmp::Ordering::Greater;
}
(SyncStatus::Unknown, SyncStatus::Unknown) => {
// neither rpc is synced
// this means neither will have connections
return cmp::Ordering::Equal;
}
(SyncStatus::Synced(_), SyncStatus::Behind(_)) => {
// synced sorts before behind
return cmp::Ordering::Less;
}
(SyncStatus::Behind(_), SyncStatus::Synced(_)) => {
return cmp::Ordering::Greater;
}
(SyncStatus::Behind(_), SyncStatus::Unknown) => {
// a known-but-lagging rpc sorts before an unknown one
return cmp::Ordering::Less;
}
(SyncStatus::Behind(a), SyncStatus::Behind(b)) => {
if a != b {
return a.cmp(&b);
}
// else they are equal and we want to compare on active connections
}
(SyncStatus::Unknown, SyncStatus::Behind(_)) => {
return cmp::Ordering::Greater;
}
}
// sort on active connections
self.connections
.get(a)
.unwrap()
@@ -88,16 +143,19 @@ impl Web3ProviderTier {
let mut earliest_not_until = None;
- for selected_rpc in balanced_rpcs.iter() {
+ for selected_rpc in available_rpcs.iter() {
// check current block number
- if !block_watcher
- .is_synced(selected_rpc.clone(), 3)
- .await
- .expect("checking is_synced failed")
- {
+ // TODO: i don't like that we fetched sync_status above and then do it again here. cache?
+ if let SyncStatus::Synced(_) = block_watcher.sync_status(selected_rpc, allowed_lag) {
+ // rpc is synced
+ } else {
// skip this rpc because it is not synced
- // TODO: make a NotUntil here?
- continue;
+ // TODO: include how many blocks behind
+ // TODO: better log
+ info!("{} is not synced", selected_rpc);
+ // we sorted on sync status, so if this one isn't synced, none of the later ones will be either
+ break;
}
// check rate limits
@@ -109,6 +167,9 @@ impl Web3ProviderTier {
Err(not_until) => {
// rate limit failed
// save the smallest not_until. if nothing succeeds, return an Err with not_until in it
// TODO: use tracing better
info!("Exhausted rate limit on {}: {}", selected_rpc, not_until);
if earliest_not_until.is_none() {
earliest_not_until = Some(not_until);
} else {
@@ -146,6 +207,7 @@ impl Web3ProviderTier {
/// get all available rpc servers
pub async fn get_upstream_servers(
&self,
allowed_lag: u64,
block_watcher: Arc<BlockWatcher>,
) -> Result<Vec<String>, NotUntil<QuantaInstant>> {
let mut earliest_not_until = None;
@@ -153,12 +215,9 @@ impl Web3ProviderTier {
let mut selected_rpcs = vec![];
for selected_rpc in self.rpcs.read().await.iter() {
// check that the server is synced
- if !block_watcher
- .is_synced(selected_rpc.clone(), 1)
- .await
- .expect("checking is_synced failed")
- {
+ if let SyncStatus::Synced(_) = block_watcher.sync_status(selected_rpc, allowed_lag) {
+ // rpc is synced
+ } else {
// skip this rpc because it is not synced
continue;
}
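A note on the two chained sorts in `next_upstream_server`: `sort_unstable_by` is not a stable sort, so the first pass (by active connections) is not preserved by the second; the second comparator compensates by falling through to the connection comparison itself. The same ordering could be expressed as a single key-based sort, sketched here with a hypothetical `active_requests` lookup (not part of this commit):

```rust
// one sort pass instead of two (a sketch, under the assumptions above):
// rank 0 = synced, 1 = behind, 2 = unknown; ties break on lag, then load
fn sort_rpcs_for_balancing(
    rpcs: &mut [String],
    allowed_lag: u64,
    block_watcher: &BlockWatcher,
    active_requests: impl Fn(&str) -> u32,
) {
    rpcs.sort_unstable_by_key(|rpc| {
        let (rank, lag) = match block_watcher.sync_status(rpc, allowed_lag) {
            SyncStatus::Synced(lag) => (0u8, lag),
            SyncStatus::Behind(lag) => (1, lag),
            SyncStatus::Unknown => (2, 0),
        };
        (rank, lag, active_requests(rpc))
    });
}
```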