diff --git a/src/block_watcher.rs b/src/block_watcher.rs index f50fc2c8..40342b6a 100644 --- a/src/block_watcher.rs +++ b/src/block_watcher.rs @@ -2,13 +2,14 @@ use ethers::prelude::{Block, TxHash}; use governor::clock::{Clock, QuantaClock, QuantaInstant}; use std::collections::HashMap; use tokio::sync::mpsc; +use tokio::time::{interval, Duration}; use tracing::{info, warn}; // TODO: what type for the Item? String url works, but i don't love it // TODO: think about boxing this #[derive(Debug)] pub enum BlockWatcherItem { - NewHead((String, Block)), + NewHead(Box<(String, Block)>), SubscribeHttp(String), Interval, } @@ -18,8 +19,9 @@ pub type BlockWatcherReceiver = mpsc::UnboundedReceiver; pub struct BlockWatcher { clock: QuantaClock, + sender: BlockWatcherSender, receiver: BlockWatcherReceiver, - last_update: QuantaInstant, + last_poll: QuantaInstant, /// TODO: i don't think we want a hashmap. we want a left-right or some other concurrent map blocks: HashMap>, } @@ -29,11 +31,12 @@ impl BlockWatcher { // TODO: this also needs to return a reader for blocks let (sender, receiver) = mpsc::unbounded_channel(); - let last_update = clock.now(); + let last_poll = clock.now(); let watcher = Self { clock, - last_update, + last_poll, + sender: sender.clone(), receiver, blocks: Default::default(), }; @@ -42,14 +45,39 @@ impl BlockWatcher { } pub async fn run(&mut self) -> anyhow::Result<()> { - // TODO: + // TODO: we should probably set this to something like blocktime / 3 + let mut poll_interval = interval(Duration::from_secs(2)); + // TODO: think more about what missed tick behavior we want + // interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + + // TODO: only do this if we have http providers to watch + let interval_sender = self.sender.clone(); + tokio::spawn(async move { + loop { + poll_interval.tick().await; + + interval_sender + .send(BlockWatcherItem::Interval) + .expect("sending BlockWatcherItem::Interval failed"); + } + }); + + // don't poll faster than every second + let min_wait_nanos = 1_000_000_000.into(); while let Some(x) = self.receiver.recv().await { match x { BlockWatcherItem::Interval => { + if self.clock.now() < self.last_poll + min_wait_nanos { + // we already updated recently + continue; + } + // TODO: we got an interval. if we haven't updated the blocks recently, + info!("TODO: query all the subscribed http providers") } - BlockWatcherItem::NewHead((rpc, block)) => { + BlockWatcherItem::NewHead(new_head) => { + let (rpc, block) = *new_head; info!( "{:?} = {} Ts: {:?}, block number: {}", block.hash.unwrap(), @@ -59,7 +87,9 @@ impl BlockWatcher { ); self.blocks.insert(rpc, block); - self.last_update = self.clock.now(); + self.sender + .send(BlockWatcherItem::Interval) + .expect("sending BlockWatcherItem::Interval failed"); } BlockWatcherItem::SubscribeHttp(rpc) => { warn!("subscribing to {} is not yet supported", rpc); diff --git a/src/provider.rs b/src/provider.rs index e00d0904..3668bf1b 100644 --- a/src/provider.rs +++ b/src/provider.rs @@ -57,7 +57,7 @@ impl Web3Provider { let mut stream = provider.subscribe_blocks().await?; while let Some(block) = stream.next().await { block_watcher_sender - .send(BlockWatcherItem::NewHead((url.clone(), block))) + .send(BlockWatcherItem::NewHead(Box::new((url.clone(), block)))) .unwrap(); } }