track latest blocks

This commit is contained in:
Bryan Stitt 2022-04-27 04:36:11 +00:00
parent d54196f868
commit 0b5d2ca1cf
4 changed files with 100 additions and 29 deletions

7
Cargo.lock generated

@ -48,6 +48,12 @@ version = "1.0.57"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08f9b8508dccb7687a1d6c4ce66b2b0ecef467c94667de27d8d7fe1f8d2a9cdc"
[[package]]
name = "arc-swap"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c5d78ce20460b82d3fa150275ed9d55e21064fc7951177baacf86a145c4a4b1f"
[[package]]
name = "argh"
version = "0.1.7"
@ -3750,6 +3756,7 @@ name = "web3-proxy"
version = "0.1.0"
dependencies = [
"anyhow",
"arc-swap",
"argh",
"atomic-counter",
"dashmap",

@ -6,6 +6,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
arc-swap = "1.5.0"
argh = "0.1.7"
anyhow = "1.0.57"
atomic-counter = "1.0.1"

@ -1,10 +1,11 @@
///! Track the head block of all the web3 providers
use arc_swap::ArcSwapOption;
use dashmap::DashMap;
use ethers::prelude::{Block, TxHash};
use std::cmp::Ordering;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::{mpsc, RwLock};
use tokio::sync::{mpsc, Mutex};
use tracing::info;
// TODO: what type for the Item? String url works, but i don't love it
@ -15,10 +16,11 @@ pub type BlockWatcherReceiver = mpsc::UnboundedReceiver<NewHead>;
pub struct BlockWatcher {
sender: BlockWatcherSender,
receiver: RwLock<BlockWatcherReceiver>,
/// TODO: i don't think we want a hashmap. we want a left-right or some other concurrent map
blocks: RwLock<HashMap<String, Block<TxHash>>>,
latest_block: RwLock<Option<Block<TxHash>>>,
receiver: Mutex<BlockWatcherReceiver>,
// TODO: i don't think we want a RwLock. we want an ArcSwap or something
// TODO: should we just store the block number?
blocks: DashMap<String, Arc<Block<TxHash>>>,
head_block: ArcSwapOption<Block<TxHash>>,
}
impl BlockWatcher {
@ -28,9 +30,9 @@ impl BlockWatcher {
Self {
sender,
receiver: RwLock::new(receiver),
receiver: Mutex::new(receiver),
blocks: Default::default(),
latest_block: RwLock::new(None),
head_block: Default::default(),
}
}
@ -38,38 +40,81 @@ impl BlockWatcher {
self.sender.clone()
}
pub async fn run(self: Arc<Self>) -> anyhow::Result<()> {
let mut receiver = self.receiver.write().await;
pub async fn is_synced(&self, rpc: String, allowed_lag: u64) -> anyhow::Result<bool> {
match self.head_block.load().as_ref() {
None => Ok(false),
Some(latest_block) => {
match self.blocks.get(&rpc) {
None => Ok(false),
Some(rpc_block) => {
match latest_block.number.cmp(&rpc_block.number) {
Ordering::Equal => Ok(true),
Ordering::Greater => {
// this probably won't happen, but it might if the block arrives at the exact wrong time
Ok(true)
}
Ordering::Less => {
// allow being some behind
let lag = latest_block.number.expect("no block")
- rpc_block.number.expect("no block");
Ok(lag.as_u64() <= allowed_lag)
}
}
}
}
}
}
}
while let Some((rpc, block)) = receiver.recv().await {
pub async fn run(self: Arc<Self>) -> anyhow::Result<()> {
let mut receiver = self.receiver.lock().await;
while let Some((rpc, new_block)) = receiver.recv().await {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_secs() as i64;
{
let blocks = self.blocks.read().await;
if blocks.get(&rpc) == Some(&block) {
// we already have this block
continue;
if let Some(current_block) = self.blocks.get(&rpc) {
// if we already have this block height
// TODO: should we compare more than just height? hash too?
if current_block.number == new_block.number {
continue;
}
}
}
let rpc_block = Arc::new(new_block);
// save the block for this rpc
self.blocks.write().await.insert(rpc.clone(), block.clone());
// TODO: this could be more efficient. store the actual chain as a graph and then have self.blocks point to that
self.blocks.insert(rpc.clone(), rpc_block.clone());
// TODO: we don't always need this to have a write lock
let mut latest_block = self.latest_block.write().await;
let head_number = self.head_block.load();
let label_slow_blocks = if latest_block.is_none() {
*latest_block = Some(block.clone());
let label_slow_heads = if head_number.is_none() {
self.head_block.swap(Some(rpc_block.clone()));
"+"
} else {
// TODO: what if they have the same number but different hashes? or aren't on the same chain?
match block.number.cmp(&latest_block.as_ref().unwrap().number) {
Ordering::Equal => "",
// TODO: what if they have the same number but different hashes?
// TODO: alert if there is a large chain split?
let head_number = head_number
.as_ref()
.expect("should be impossible")
.number
.expect("should be impossible");
let rpc_number = rpc_block.number.expect("should be impossible");
match rpc_number.cmp(&head_number) {
Ordering::Equal => {
// this block is saved
""
}
Ordering::Greater => {
*latest_block = Some(block.clone());
// new_block is the new head_block
self.head_block.swap(Some(rpc_block.clone()));
"+"
}
Ordering::Less => {
@ -82,13 +127,13 @@ impl BlockWatcher {
// TODO: include time since last update?
info!(
"{:?} = {} Ts: {:?}, block number: {}, age: {}s {}",
block.hash.unwrap(),
rpc_block.hash.unwrap(),
rpc,
// TODO: human readable time?
block.timestamp,
block.number.unwrap(),
now - block.timestamp.as_u64() as i64,
label_slow_blocks
rpc_block.timestamp,
rpc_block.number.unwrap(),
now - rpc_block.timestamp.as_u64() as i64,
label_slow_heads
);
}

@ -89,6 +89,14 @@ impl Web3ProviderTier {
for selected_rpc in balanced_rpcs.iter() {
// TODO: check current block number. if too far behind, make our own NotUntil here
if !block_watcher
.is_synced(selected_rpc.clone(), 3)
.await
.expect("checking is_synced failed")
{
// skip this rpc because it is not synced
continue;
}
let ratelimits = self.ratelimits.write().await;
@ -145,6 +153,16 @@ impl Web3ProviderTier {
let mut selected_rpcs = vec![];
for selected_rpc in self.rpcs.read().await.iter() {
// check that the server is synced
if !block_watcher
.is_synced(selected_rpc.clone(), 3)
.await
.expect("checking is_synced failed")
{
// skip this rpc because it is not synced
continue;
}
// check rate limits
match self
.ratelimits