interval instead of watch_blocks

This commit is contained in:
Bryan Stitt 2023-06-12 22:26:10 -07:00
parent 6324d9c7d9
commit 55e8471b19
2 changed files with 20 additions and 16 deletions

View File

@ -82,10 +82,7 @@ pub enum JsonRpcRequestEnum {
impl JsonRpcRequestEnum {
pub fn first_id(&self) -> Option<Box<RawValue>> {
match self {
Self::Batch(x) => match x.first() {
Some(x) => Some(x.id.clone()),
None => None,
},
Self::Batch(x) => x.first().map(|x| x.id.clone()),
Self::Single(x) => Some(x.id.clone()),
}
}

View File

@ -31,13 +31,14 @@ use std::sync::atomic::{self, AtomicU32, AtomicU64, AtomicUsize};
use std::{cmp::Ordering, sync::Arc};
use tokio::select;
use tokio::sync::watch;
use tokio::time::{sleep, sleep_until, timeout, Duration, Instant};
use tokio::time::{interval, sleep, sleep_until, timeout, Duration, Instant, MissedTickBehavior};
use url::Url;
/// An active connection to a Web3 RPC server like geth or erigon.
#[derive(Default)]
pub struct Web3Rpc {
pub name: String,
pub block_interval: Duration,
pub display_name: Option<String>,
pub db_conn: Option<DatabaseConnection>,
/// most all requests prefer use the http_provider
@ -190,6 +191,7 @@ impl Web3Rpc {
automatic_block_limit,
backup,
block_data_limit,
block_interval,
created_at: Some(created_at),
db_conn,
display_name: config.display_name,
@ -804,25 +806,30 @@ impl Web3Rpc {
self.send_head_block_result(Ok(Some(block)), &block_sender, &block_map)
.await?;
}
} else if let Some(http_provider) = self.http_provider.as_ref() {
} else if self.http_provider.is_some() {
// there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints
let mut blocks = http_provider.watch_blocks().await?;
// TODO: is 1/2 the block time okay?
let mut i = interval(self.block_interval / 2);
i.set_missed_tick_behavior(MissedTickBehavior::Delay);
while let Some(block_hash) = blocks.next().await {
loop {
if *subscribe_stop_rx.borrow() {
break;
}
let block = if let Some(block) = block_map.get(&block_hash) {
block.block
} else if let Some(block) = http_provider.get_block(block_hash).await? {
Arc::new(block)
} else {
continue;
};
let block_result = self
.authorized_request::<_, Option<ArcBlock>>(
"eth_getBlockByNumber",
&("latest", false),
&authorization,
Some(Level::Warn.into()),
)
.await;
self.send_head_block_result(Ok(Some(block)), &block_sender, &block_map)
self.send_head_block_result(block_result, &block_sender, &block_map)
.await?;
i.tick().await;
}
} else {
unimplemented!("no ws or http provider!")