better output

This commit is contained in:
Bryan Stitt 2022-04-27 05:17:52 +00:00
parent 0b5d2ca1cf
commit 570d11a987
2 changed files with 63 additions and 68 deletions

@ -1,8 +1,8 @@
///! Track the head block of all the web3 providers
use arc_swap::ArcSwapOption;
use dashmap::DashMap;
use ethers::prelude::{Block, TxHash};
use std::cmp::Ordering;
use std::cmp;
use std::sync::atomic::{self, AtomicU64};
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::{mpsc, Mutex};
@ -19,8 +19,8 @@ pub struct BlockWatcher {
receiver: Mutex<BlockWatcherReceiver>,
// TODO: i don't think we want a RwLock. we want an ArcSwap or something
// TODO: should we just store the block number?
blocks: DashMap<String, Arc<Block<TxHash>>>,
head_block: ArcSwapOption<Block<TxHash>>,
block_numbers: DashMap<String, u64>,
head_block_number: AtomicU64,
}
impl BlockWatcher {
@ -31,8 +31,8 @@ impl BlockWatcher {
Self {
sender,
receiver: Mutex::new(receiver),
blocks: Default::default(),
head_block: Default::default(),
block_numbers: Default::default(),
head_block_number: Default::default(),
}
}
@ -41,25 +41,24 @@ impl BlockWatcher {
}
pub async fn is_synced(&self, rpc: String, allowed_lag: u64) -> anyhow::Result<bool> {
match self.head_block.load().as_ref() {
None => Ok(false),
Some(latest_block) => {
match self.blocks.get(&rpc) {
None => Ok(false),
Some(rpc_block) => {
match latest_block.number.cmp(&rpc_block.number) {
Ordering::Equal => Ok(true),
Ordering::Greater => {
// this probably won't happen, but it might if the block arrives at the exact wrong time
Ok(true)
}
Ordering::Less => {
// allow being some behind
let lag = latest_block.number.expect("no block")
- rpc_block.number.expect("no block");
Ok(lag.as_u64() <= allowed_lag)
}
}
match (
self.head_block_number.load(atomic::Ordering::SeqCst),
self.block_numbers.get(&rpc),
) {
(0, _) => Ok(false),
(_, None) => Ok(false),
(head_block_number, Some(rpc_block_number)) => {
match head_block_number.cmp(&rpc_block_number) {
cmp::Ordering::Equal => Ok(true),
cmp::Ordering::Greater => {
// this probably won't happen, but it might if the block arrives at the exact wrong time
Ok(true)
}
cmp::Ordering::Less => {
// allow being some behind
// TODO: why do we need a clone here?
let lag = head_block_number - *rpc_block_number;
Ok(lag <= allowed_lag)
}
}
}
@ -70,56 +69,52 @@ impl BlockWatcher {
let mut receiver = self.receiver.lock().await;
while let Some((rpc, new_block)) = receiver.recv().await {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_secs() as i64;
let new_block_number = new_block.number.unwrap().as_u64();
{
if let Some(current_block) = self.blocks.get(&rpc) {
if let Some(rpc_block_number) = self.block_numbers.get(&rpc) {
// if we already have this block height
// this probably own't happen with websockets, but is likely with polling against http rpcs
// TODO: should we compare more than just height? hash too?
if current_block.number == new_block.number {
if *rpc_block_number == new_block_number {
continue;
}
}
}
let rpc_block = Arc::new(new_block);
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_secs() as i64;
// save the block for this rpc
// TODO: this could be more efficient. store the actual chain as a graph and then have self.blocks point to that
self.blocks.insert(rpc.clone(), rpc_block.clone());
// TODO:store the actual chain as a graph and then have self.blocks point to that?
self.block_numbers.insert(rpc.clone(), new_block_number);
let head_number = self.head_block.load();
let head_number = self.head_block_number.load(atomic::Ordering::SeqCst);
let label_slow_heads = if head_number.is_none() {
self.head_block.swap(Some(rpc_block.clone()));
"+"
let label_slow_heads = if head_number == 0 {
self.head_block_number
.swap(new_block_number, atomic::Ordering::SeqCst);
"+".to_string()
} else {
// TODO: what if they have the same number but different hashes?
// TODO: alert if there is a large chain split?
let head_number = head_number
.as_ref()
.expect("should be impossible")
.number
.expect("should be impossible");
let rpc_number = rpc_block.number.expect("should be impossible");
match rpc_number.cmp(&head_number) {
Ordering::Equal => {
match (new_block_number).cmp(&head_number) {
cmp::Ordering::Equal => {
// this block is saved
""
"".to_string()
}
Ordering::Greater => {
cmp::Ordering::Greater => {
// new_block is the new head_block
self.head_block.swap(Some(rpc_block.clone()));
"+"
self.head_block_number
.swap(new_block_number, atomic::Ordering::SeqCst);
"+".to_string()
}
Ordering::Less => {
cmp::Ordering::Less => {
// TODO: include how many blocks behind?
"-"
let lag = new_block_number as i64 - head_number as i64;
lag.to_string()
}
}
};
@ -127,12 +122,12 @@ impl BlockWatcher {
// TODO: include time since last update?
info!(
"{:?} = {} Ts: {:?}, block number: {}, age: {}s {}",
rpc_block.hash.unwrap(),
new_block.hash.unwrap(),
rpc,
// TODO: human readable time?
rpc_block.timestamp,
rpc_block.number.unwrap(),
now - rpc_block.timestamp.as_u64() as i64,
new_block.timestamp,
new_block.number.unwrap(),
now - new_block.timestamp.as_u64() as i64,
label_slow_heads
);
}

@ -4,6 +4,7 @@ use ethers::prelude::Middleware;
use futures::StreamExt;
use std::time::Duration;
use std::{cmp::Ordering, sync::Arc};
use tokio::time::interval;
use tracing::{info, warn};
use crate::block_watcher::BlockWatcherSender;
@ -40,17 +41,16 @@ impl Web3Provider {
// TODO: automatically reconnect
match &self {
Web3Provider::Http(_provider) => {
/*
// TODO: not all providers have this. we need to write our interval checking
let mut stream = provider.watch_blocks().await?;
while let Some(block_number) = stream.next().await {
let block = provider.get_block(block_number).await?.expect("no block");
block_watcher_sender
.send(Some((url.clone(), block)))
.unwrap();
// TODO: there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints
// TODO: how often?
let mut interval = interval(Duration::from_secs(2));
loop {
// wait for 2 seconds
interval.tick().await;
unimplemented!("todo: send block number")
}
*/
info!("work in progress");
}
Web3Provider::Ws(provider) => {
let mut stream = provider.subscribe_blocks().await?;