diff --git a/src/block_watcher.rs b/src/block_watcher.rs index 7701de78..6509ac95 100644 --- a/src/block_watcher.rs +++ b/src/block_watcher.rs @@ -1,8 +1,8 @@ ///! Track the head block of all the web3 providers -use arc_swap::ArcSwapOption; use dashmap::DashMap; use ethers::prelude::{Block, TxHash}; -use std::cmp::Ordering; +use std::cmp; +use std::sync::atomic::{self, AtomicU64}; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; use tokio::sync::{mpsc, Mutex}; @@ -19,8 +19,8 @@ pub struct BlockWatcher { receiver: Mutex, // TODO: i don't think we want a RwLock. we want an ArcSwap or something // TODO: should we just store the block number? - blocks: DashMap>>, - head_block: ArcSwapOption>, + block_numbers: DashMap, + head_block_number: AtomicU64, } impl BlockWatcher { @@ -31,8 +31,8 @@ impl BlockWatcher { Self { sender, receiver: Mutex::new(receiver), - blocks: Default::default(), - head_block: Default::default(), + block_numbers: Default::default(), + head_block_number: Default::default(), } } @@ -41,25 +41,24 @@ impl BlockWatcher { } pub async fn is_synced(&self, rpc: String, allowed_lag: u64) -> anyhow::Result { - match self.head_block.load().as_ref() { - None => Ok(false), - Some(latest_block) => { - match self.blocks.get(&rpc) { - None => Ok(false), - Some(rpc_block) => { - match latest_block.number.cmp(&rpc_block.number) { - Ordering::Equal => Ok(true), - Ordering::Greater => { - // this probably won't happen, but it might if the block arrives at the exact wrong time - Ok(true) - } - Ordering::Less => { - // allow being some behind - let lag = latest_block.number.expect("no block") - - rpc_block.number.expect("no block"); - Ok(lag.as_u64() <= allowed_lag) - } - } + match ( + self.head_block_number.load(atomic::Ordering::SeqCst), + self.block_numbers.get(&rpc), + ) { + (0, _) => Ok(false), + (_, None) => Ok(false), + (head_block_number, Some(rpc_block_number)) => { + match head_block_number.cmp(&rpc_block_number) { + cmp::Ordering::Equal => Ok(true), + cmp::Ordering::Greater => { + // this probably won't happen, but it might if the block arrives at the exact wrong time + Ok(true) + } + cmp::Ordering::Less => { + // allow being some behind + // TODO: why do we need a clone here? + let lag = head_block_number - *rpc_block_number; + Ok(lag <= allowed_lag) } } } @@ -70,56 +69,52 @@ impl BlockWatcher { let mut receiver = self.receiver.lock().await; while let Some((rpc, new_block)) = receiver.recv().await { - let now = SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("Time went backwards") - .as_secs() as i64; + let new_block_number = new_block.number.unwrap().as_u64(); { - if let Some(current_block) = self.blocks.get(&rpc) { + if let Some(rpc_block_number) = self.block_numbers.get(&rpc) { // if we already have this block height + // this probably own't happen with websockets, but is likely with polling against http rpcs // TODO: should we compare more than just height? hash too? - if current_block.number == new_block.number { + if *rpc_block_number == new_block_number { continue; } } } - let rpc_block = Arc::new(new_block); + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Time went backwards") + .as_secs() as i64; // save the block for this rpc - // TODO: this could be more efficient. store the actual chain as a graph and then have self.blocks point to that - self.blocks.insert(rpc.clone(), rpc_block.clone()); + // TODO:store the actual chain as a graph and then have self.blocks point to that? + self.block_numbers.insert(rpc.clone(), new_block_number); - let head_number = self.head_block.load(); + let head_number = self.head_block_number.load(atomic::Ordering::SeqCst); - let label_slow_heads = if head_number.is_none() { - self.head_block.swap(Some(rpc_block.clone())); - "+" + let label_slow_heads = if head_number == 0 { + self.head_block_number + .swap(new_block_number, atomic::Ordering::SeqCst); + "+".to_string() } else { // TODO: what if they have the same number but different hashes? // TODO: alert if there is a large chain split? - let head_number = head_number - .as_ref() - .expect("should be impossible") - .number - .expect("should be impossible"); - - let rpc_number = rpc_block.number.expect("should be impossible"); - - match rpc_number.cmp(&head_number) { - Ordering::Equal => { + match (new_block_number).cmp(&head_number) { + cmp::Ordering::Equal => { // this block is saved - "" + "".to_string() } - Ordering::Greater => { + cmp::Ordering::Greater => { // new_block is the new head_block - self.head_block.swap(Some(rpc_block.clone())); - "+" + self.head_block_number + .swap(new_block_number, atomic::Ordering::SeqCst); + "+".to_string() } - Ordering::Less => { + cmp::Ordering::Less => { // TODO: include how many blocks behind? - "-" + let lag = new_block_number as i64 - head_number as i64; + lag.to_string() } } }; @@ -127,12 +122,12 @@ impl BlockWatcher { // TODO: include time since last update? info!( "{:?} = {} Ts: {:?}, block number: {}, age: {}s {}", - rpc_block.hash.unwrap(), + new_block.hash.unwrap(), rpc, // TODO: human readable time? - rpc_block.timestamp, - rpc_block.number.unwrap(), - now - rpc_block.timestamp.as_u64() as i64, + new_block.timestamp, + new_block.number.unwrap(), + now - new_block.timestamp.as_u64() as i64, label_slow_heads ); } diff --git a/src/provider.rs b/src/provider.rs index 8003d1af..c4fe493b 100644 --- a/src/provider.rs +++ b/src/provider.rs @@ -4,6 +4,7 @@ use ethers::prelude::Middleware; use futures::StreamExt; use std::time::Duration; use std::{cmp::Ordering, sync::Arc}; +use tokio::time::interval; use tracing::{info, warn}; use crate::block_watcher::BlockWatcherSender; @@ -40,17 +41,16 @@ impl Web3Provider { // TODO: automatically reconnect match &self { Web3Provider::Http(_provider) => { - /* - // TODO: not all providers have this. we need to write our interval checking - let mut stream = provider.watch_blocks().await?; - while let Some(block_number) = stream.next().await { - let block = provider.get_block(block_number).await?.expect("no block"); - block_watcher_sender - .send(Some((url.clone(), block))) - .unwrap(); + // TODO: there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints + // TODO: how often? + let mut interval = interval(Duration::from_secs(2)); + + loop { + // wait for 2 seconds + interval.tick().await; + + unimplemented!("todo: send block number") } - */ - info!("work in progress"); } Web3Provider::Ws(provider) => { let mut stream = provider.subscribe_blocks().await?;