more block watcher

This commit is contained in:
Bryan Stitt 2022-04-26 21:42:10 +00:00
parent 1af30a46d3
commit bbc2e8d3d7
2 changed files with 38 additions and 8 deletions

@ -2,13 +2,14 @@ use ethers::prelude::{Block, TxHash};
use governor::clock::{Clock, QuantaClock, QuantaInstant};
use std::collections::HashMap;
use tokio::sync::mpsc;
use tokio::time::{interval, Duration};
use tracing::{info, warn};
// TODO: what type for the Item? String url works, but i don't love it
// TODO: think about boxing this
#[derive(Debug)]
pub enum BlockWatcherItem {
NewHead((String, Block<TxHash>)),
NewHead(Box<(String, Block<TxHash>)>),
SubscribeHttp(String),
Interval,
}
@ -18,8 +19,9 @@ pub type BlockWatcherReceiver = mpsc::UnboundedReceiver<BlockWatcherItem>;
pub struct BlockWatcher {
clock: QuantaClock,
sender: BlockWatcherSender,
receiver: BlockWatcherReceiver,
last_update: QuantaInstant,
last_poll: QuantaInstant,
/// TODO: i don't think we want a hashmap. we want a left-right or some other concurrent map
blocks: HashMap<String, Block<TxHash>>,
}
@ -29,11 +31,12 @@ impl BlockWatcher {
// TODO: this also needs to return a reader for blocks
let (sender, receiver) = mpsc::unbounded_channel();
let last_update = clock.now();
let last_poll = clock.now();
let watcher = Self {
clock,
last_update,
last_poll,
sender: sender.clone(),
receiver,
blocks: Default::default(),
};
@ -42,14 +45,39 @@ impl BlockWatcher {
}
pub async fn run(&mut self) -> anyhow::Result<()> {
// TODO:
// TODO: we should probably set this to something like blocktime / 3
let mut poll_interval = interval(Duration::from_secs(2));
// TODO: think more about what missed tick behavior we want
// interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
// TODO: only do this if we have http providers to watch
let interval_sender = self.sender.clone();
tokio::spawn(async move {
loop {
poll_interval.tick().await;
interval_sender
.send(BlockWatcherItem::Interval)
.expect("sending BlockWatcherItem::Interval failed");
}
});
// don't poll faster than every second
let min_wait_nanos = 1_000_000_000.into();
while let Some(x) = self.receiver.recv().await {
match x {
BlockWatcherItem::Interval => {
if self.clock.now() < self.last_poll + min_wait_nanos {
// we already updated recently
continue;
}
// TODO: we got an interval. if we haven't updated the blocks recently,
info!("TODO: query all the subscribed http providers")
}
BlockWatcherItem::NewHead((rpc, block)) => {
BlockWatcherItem::NewHead(new_head) => {
let (rpc, block) = *new_head;
info!(
"{:?} = {} Ts: {:?}, block number: {}",
block.hash.unwrap(),
@ -59,7 +87,9 @@ impl BlockWatcher {
);
self.blocks.insert(rpc, block);
self.last_update = self.clock.now();
self.sender
.send(BlockWatcherItem::Interval)
.expect("sending BlockWatcherItem::Interval failed");
}
BlockWatcherItem::SubscribeHttp(rpc) => {
warn!("subscribing to {} is not yet supported", rpc);

@ -57,7 +57,7 @@ impl Web3Provider {
let mut stream = provider.subscribe_blocks().await?;
while let Some(block) = stream.next().await {
block_watcher_sender
.send(BlockWatcherItem::NewHead((url.clone(), block)))
.send(BlockWatcherItem::NewHead(Box::new((url.clone(), block))))
.unwrap();
}
}