From 0b5d2ca1cf91d6410bf95f12a2ad333b00d37bd5 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 27 Apr 2022 04:36:11 +0000 Subject: [PATCH] track latest blocks --- Cargo.lock | 7 +++ Cargo.toml | 1 + src/block_watcher.rs | 103 ++++++++++++++++++++++++++++++------------ src/provider_tiers.rs | 18 ++++++++ 4 files changed, 100 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 313c58a6..a54b243c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -48,6 +48,12 @@ version = "1.0.57" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08f9b8508dccb7687a1d6c4ce66b2b0ecef467c94667de27d8d7fe1f8d2a9cdc" +[[package]] +name = "arc-swap" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c5d78ce20460b82d3fa150275ed9d55e21064fc7951177baacf86a145c4a4b1f" + [[package]] name = "argh" version = "0.1.7" @@ -3750,6 +3756,7 @@ name = "web3-proxy" version = "0.1.0" dependencies = [ "anyhow", + "arc-swap", "argh", "atomic-counter", "dashmap", diff --git a/Cargo.toml b/Cargo.toml index 2411adc1..8be50d8a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +arc-swap = "1.5.0" argh = "0.1.7" anyhow = "1.0.57" atomic-counter = "1.0.1" diff --git a/src/block_watcher.rs b/src/block_watcher.rs index ab7855eb..7701de78 100644 --- a/src/block_watcher.rs +++ b/src/block_watcher.rs @@ -1,10 +1,11 @@ ///! Track the head block of all the web3 providers +use arc_swap::ArcSwapOption; +use dashmap::DashMap; use ethers::prelude::{Block, TxHash}; use std::cmp::Ordering; -use std::collections::HashMap; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; -use tokio::sync::{mpsc, RwLock}; +use tokio::sync::{mpsc, Mutex}; use tracing::info; // TODO: what type for the Item? String url works, but i don't love it @@ -15,10 +16,11 @@ pub type BlockWatcherReceiver = mpsc::UnboundedReceiver; pub struct BlockWatcher { sender: BlockWatcherSender, - receiver: RwLock, - /// TODO: i don't think we want a hashmap. we want a left-right or some other concurrent map - blocks: RwLock>>, - latest_block: RwLock>>, + receiver: Mutex, + // TODO: i don't think we want a RwLock. we want an ArcSwap or something + // TODO: should we just store the block number? + blocks: DashMap>>, + head_block: ArcSwapOption>, } impl BlockWatcher { @@ -28,9 +30,9 @@ impl BlockWatcher { Self { sender, - receiver: RwLock::new(receiver), + receiver: Mutex::new(receiver), blocks: Default::default(), - latest_block: RwLock::new(None), + head_block: Default::default(), } } @@ -38,38 +40,81 @@ impl BlockWatcher { self.sender.clone() } - pub async fn run(self: Arc) -> anyhow::Result<()> { - let mut receiver = self.receiver.write().await; + pub async fn is_synced(&self, rpc: String, allowed_lag: u64) -> anyhow::Result { + match self.head_block.load().as_ref() { + None => Ok(false), + Some(latest_block) => { + match self.blocks.get(&rpc) { + None => Ok(false), + Some(rpc_block) => { + match latest_block.number.cmp(&rpc_block.number) { + Ordering::Equal => Ok(true), + Ordering::Greater => { + // this probably won't happen, but it might if the block arrives at the exact wrong time + Ok(true) + } + Ordering::Less => { + // allow being some behind + let lag = latest_block.number.expect("no block") + - rpc_block.number.expect("no block"); + Ok(lag.as_u64() <= allowed_lag) + } + } + } + } + } + } + } - while let Some((rpc, block)) = receiver.recv().await { + pub async fn run(self: Arc) -> anyhow::Result<()> { + let mut receiver = self.receiver.lock().await; + + while let Some((rpc, new_block)) = receiver.recv().await { let now = SystemTime::now() .duration_since(UNIX_EPOCH) .expect("Time went backwards") .as_secs() as i64; { - let blocks = self.blocks.read().await; - if blocks.get(&rpc) == Some(&block) { - // we already have this block - continue; + if let Some(current_block) = self.blocks.get(&rpc) { + // if we already have this block height + // TODO: should we compare more than just height? hash too? + if current_block.number == new_block.number { + continue; + } } } + let rpc_block = Arc::new(new_block); + // save the block for this rpc - self.blocks.write().await.insert(rpc.clone(), block.clone()); + // TODO: this could be more efficient. store the actual chain as a graph and then have self.blocks point to that + self.blocks.insert(rpc.clone(), rpc_block.clone()); - // TODO: we don't always need this to have a write lock - let mut latest_block = self.latest_block.write().await; + let head_number = self.head_block.load(); - let label_slow_blocks = if latest_block.is_none() { - *latest_block = Some(block.clone()); + let label_slow_heads = if head_number.is_none() { + self.head_block.swap(Some(rpc_block.clone())); "+" } else { - // TODO: what if they have the same number but different hashes? or aren't on the same chain? - match block.number.cmp(&latest_block.as_ref().unwrap().number) { - Ordering::Equal => "", + // TODO: what if they have the same number but different hashes? + // TODO: alert if there is a large chain split? + let head_number = head_number + .as_ref() + .expect("should be impossible") + .number + .expect("should be impossible"); + + let rpc_number = rpc_block.number.expect("should be impossible"); + + match rpc_number.cmp(&head_number) { + Ordering::Equal => { + // this block is saved + "" + } Ordering::Greater => { - *latest_block = Some(block.clone()); + // new_block is the new head_block + self.head_block.swap(Some(rpc_block.clone())); "+" } Ordering::Less => { @@ -82,13 +127,13 @@ impl BlockWatcher { // TODO: include time since last update? info!( "{:?} = {} Ts: {:?}, block number: {}, age: {}s {}", - block.hash.unwrap(), + rpc_block.hash.unwrap(), rpc, // TODO: human readable time? - block.timestamp, - block.number.unwrap(), - now - block.timestamp.as_u64() as i64, - label_slow_blocks + rpc_block.timestamp, + rpc_block.number.unwrap(), + now - rpc_block.timestamp.as_u64() as i64, + label_slow_heads ); } diff --git a/src/provider_tiers.rs b/src/provider_tiers.rs index 8edeed69..eaaa1bec 100644 --- a/src/provider_tiers.rs +++ b/src/provider_tiers.rs @@ -89,6 +89,14 @@ impl Web3ProviderTier { for selected_rpc in balanced_rpcs.iter() { // TODO: check current block number. if too far behind, make our own NotUntil here + if !block_watcher + .is_synced(selected_rpc.clone(), 3) + .await + .expect("checking is_synced failed") + { + // skip this rpc because it is not synced + continue; + } let ratelimits = self.ratelimits.write().await; @@ -145,6 +153,16 @@ impl Web3ProviderTier { let mut selected_rpcs = vec![]; for selected_rpc in self.rpcs.read().await.iter() { + // check that the server is synced + if !block_watcher + .is_synced(selected_rpc.clone(), 3) + .await + .expect("checking is_synced failed") + { + // skip this rpc because it is not synced + continue; + } + // check rate limits match self .ratelimits