From 55e8471b19094744725145dfc9656d9e3bbecbc1 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Mon, 12 Jun 2023 22:26:10 -0700 Subject: [PATCH] interval instead of watch_blocks --- web3_proxy/src/jsonrpc.rs | 5 +---- web3_proxy/src/rpcs/one.rs | 31 +++++++++++++++++++------------ 2 files changed, 20 insertions(+), 16 deletions(-) diff --git a/web3_proxy/src/jsonrpc.rs b/web3_proxy/src/jsonrpc.rs index 9cccfa23..fd6ef7f1 100644 --- a/web3_proxy/src/jsonrpc.rs +++ b/web3_proxy/src/jsonrpc.rs @@ -82,10 +82,7 @@ pub enum JsonRpcRequestEnum { impl JsonRpcRequestEnum { pub fn first_id(&self) -> Option> { match self { - Self::Batch(x) => match x.first() { - Some(x) => Some(x.id.clone()), - None => None, - }, + Self::Batch(x) => x.first().map(|x| x.id.clone()), Self::Single(x) => Some(x.id.clone()), } } diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 12d80f44..0026f31e 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -31,13 +31,14 @@ use std::sync::atomic::{self, AtomicU32, AtomicU64, AtomicUsize}; use std::{cmp::Ordering, sync::Arc}; use tokio::select; use tokio::sync::watch; -use tokio::time::{sleep, sleep_until, timeout, Duration, Instant}; +use tokio::time::{interval, sleep, sleep_until, timeout, Duration, Instant, MissedTickBehavior}; use url::Url; /// An active connection to a Web3 RPC server like geth or erigon. #[derive(Default)] pub struct Web3Rpc { pub name: String, + pub block_interval: Duration, pub display_name: Option, pub db_conn: Option, /// most all requests prefer use the http_provider @@ -190,6 +191,7 @@ impl Web3Rpc { automatic_block_limit, backup, block_data_limit, + block_interval, created_at: Some(created_at), db_conn, display_name: config.display_name, @@ -804,25 +806,30 @@ impl Web3Rpc { self.send_head_block_result(Ok(Some(block)), &block_sender, &block_map) .await?; } - } else if let Some(http_provider) = self.http_provider.as_ref() { + } else if self.http_provider.is_some() { // there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints - let mut blocks = http_provider.watch_blocks().await?; + // TODO: is 1/2 the block time okay? + let mut i = interval(self.block_interval / 2); + i.set_missed_tick_behavior(MissedTickBehavior::Delay); - while let Some(block_hash) = blocks.next().await { + loop { if *subscribe_stop_rx.borrow() { break; } - let block = if let Some(block) = block_map.get(&block_hash) { - block.block - } else if let Some(block) = http_provider.get_block(block_hash).await? { - Arc::new(block) - } else { - continue; - }; + let block_result = self + .authorized_request::<_, Option>( + "eth_getBlockByNumber", + &("latest", false), + &authorization, + Some(Level::Warn.into()), + ) + .await; - self.send_head_block_result(Ok(Some(block)), &block_sender, &block_map) + self.send_head_block_result(block_result, &block_sender, &block_map) .await?; + + i.tick().await; } } else { unimplemented!("no ws or http provider!")