better block saving

This commit is contained in:
Bryan Stitt 2022-12-02 21:31:03 -08:00
parent 1b0cab9f54
commit eb4b487aae
8 changed files with 243 additions and 218 deletions

1
Cargo.lock generated

@ -5533,7 +5533,6 @@ dependencies = [
"num", "num",
"num-traits", "num-traits",
"parking_lot 0.12.1", "parking_lot 0.12.1",
"petgraph",
"proctitle", "proctitle",
"redis-rate-limiter", "redis-rate-limiter",
"regex", "regex",

@ -47,7 +47,6 @@ num = "0.4.0"
# TODO: import num_traits from sea-orm so we always have the same version # TODO: import num_traits from sea-orm so we always have the same version
num-traits = "0.2.15" num-traits = "0.2.15"
parking_lot = { version = "0.12.1", features = ["arc_lock"] } parking_lot = { version = "0.12.1", features = ["arc_lock"] }
petgraph = "0.6.2"
proctitle = "0.1.1" proctitle = "0.1.1"
# TODO: regex has several "perf" features that we might want to use # TODO: regex has several "perf" features that we might want to use
regex = "1.7.0" regex = "1.7.0"

@ -9,7 +9,7 @@ use crate::jsonrpc::JsonRpcForwardedResponse;
use crate::jsonrpc::JsonRpcForwardedResponseEnum; use crate::jsonrpc::JsonRpcForwardedResponseEnum;
use crate::jsonrpc::JsonRpcRequest; use crate::jsonrpc::JsonRpcRequest;
use crate::jsonrpc::JsonRpcRequestEnum; use crate::jsonrpc::JsonRpcRequestEnum;
use crate::rpcs::blockchain::{ArcBlock, BlockId}; use crate::rpcs::blockchain::{ArcBlock, SavedBlock};
use crate::rpcs::connections::Web3Connections; use crate::rpcs::connections::Web3Connections;
use crate::rpcs::request::OpenRequestHandleMetrics; use crate::rpcs::request::OpenRequestHandleMetrics;
use crate::rpcs::transactions::TxStatus; use crate::rpcs::transactions::TxStatus;
@ -859,18 +859,18 @@ impl Web3ProxyApp {
// emit stats // emit stats
// TODO: if no servers synced, wait for them to be synced? // TODO: if no servers synced, wait for them to be synced?
let head_block_id = self let head_block = self
.balanced_rpcs .balanced_rpcs
.head_block_id() .head_block_id()
.context("no servers synced")?; .context("no servers synced")?;
// we do this check before checking caches because it might modify the request params // we do this check before checking caches because it might modify the request params
// TODO: add a stat for archive vs full since they should probably cost different // TODO: add a stat for archive vs full since they should probably cost different
let request_block_id = if let Some(request_block_needed) = block_needed( let request_block = if let Some(request_block_needed) = block_needed(
authorization, authorization,
method, method,
request.params.as_mut(), request.params.as_mut(),
head_block_id.num, head_block.number(),
&self.balanced_rpcs, &self.balanced_rpcs,
) )
.await? .await?
@ -887,18 +887,20 @@ impl Web3ProxyApp {
.store(true, atomic::Ordering::Relaxed); .store(true, atomic::Ordering::Relaxed);
} }
BlockId { let request_block = self
num: request_block_needed, .balanced_rpcs
hash: request_block_hash, .block(authorization, &request_block_hash, None)
} .await?;
SavedBlock::new(request_block)
} else { } else {
head_block_id head_block
}; };
// TODO: struct for this? // TODO: struct for this?
// TODO: this can be rather large. is that okay? // TODO: this can be rather large. is that okay?
let cache_key = ( let cache_key = (
request_block_id.hash, request_block.hash(),
request.method.clone(), request.method.clone(),
request.params.clone().map(|x| x.to_string()), request.params.clone().map(|x| x.to_string()),
); );
@ -919,7 +921,7 @@ impl Web3ProxyApp {
&authorization, &authorization,
request, request,
Some(&request_metadata), Some(&request_metadata),
Some(&request_block_id.num), Some(&request_block.number()),
) )
.await?; .await?;

@ -10,7 +10,8 @@
use anyhow::Context; use anyhow::Context;
use futures::StreamExt; use futures::StreamExt;
use log::{debug, info, warn}; use log::{debug, error, info, warn};
use num::Zero;
use parking_lot::deadlock; use parking_lot::deadlock;
use std::fs; use std::fs;
use std::path::Path; use std::path::Path;
@ -78,6 +79,7 @@ fn run(
let app_frontend_port = cli_config.port; let app_frontend_port = cli_config.port;
let app_prometheus_port = cli_config.prometheus_port; let app_prometheus_port = cli_config.prometheus_port;
// start the main app
let mut spawned_app = let mut spawned_app =
Web3ProxyApp::spawn(top_config, num_workers, shutdown_sender.subscribe()).await?; Web3ProxyApp::spawn(top_config, num_workers, shutdown_sender.subscribe()).await?;
@ -90,7 +92,6 @@ fn run(
)); ));
// if everything is working, these should both run forever // if everything is working, these should both run forever
// TODO: join these instead and use shutdown handler properly. probably use tokio's ctrl+c helper
tokio::select! { tokio::select! {
x = flatten_handles(spawned_app.app_handles) => { x = flatten_handles(spawned_app.app_handles) => {
match x { match x {
@ -139,16 +140,29 @@ fn run(
warn!("shutdown sender err={:?}", err); warn!("shutdown sender err={:?}", err);
}; };
// wait on all the important background tasks (like saving stats to the database) to complete // wait for things like saving stats to the database to complete
info!("waiting on important background tasks");
let mut background_errors = 0;
while let Some(x) = spawned_app.background_handles.next().await { while let Some(x) = spawned_app.background_handles.next().await {
match x { match x {
Err(e) => return Err(e.into()), Err(e) => {
Ok(Err(e)) => return Err(e), error!("{:?}", e);
background_errors += 1;
}
Ok(Err(e)) => {
error!("{:?}", e);
background_errors += 1;
}
Ok(Ok(_)) => continue, Ok(Ok(_)) => continue,
} }
} }
info!("finished"); if background_errors.is_zero() {
info!("finished");
} else {
// TODO: collect instead?
error!("finished with errors!")
}
Ok(()) Ok(())
}) })

@ -7,7 +7,6 @@ use crate::{
config::BlockAndRpc, jsonrpc::JsonRpcRequest, rpcs::synced_connections::SyncedConnections, config::BlockAndRpc, jsonrpc::JsonRpcRequest, rpcs::synced_connections::SyncedConnections,
}; };
use anyhow::Context; use anyhow::Context;
use chrono::{DateTime, NaiveDateTime, Utc};
use derive_more::From; use derive_more::From;
use ethers::prelude::{Block, TxHash, H256, U64}; use ethers::prelude::{Block, TxHash, H256, U64};
use hashbrown::{HashMap, HashSet}; use hashbrown::{HashMap, HashSet};
@ -27,20 +26,75 @@ pub type BlockHashesCache = Cache<H256, ArcBlock, hashbrown::hash_map::DefaultHa
/// A block's hash and number. /// A block's hash and number.
#[derive(Clone, Debug, Default, From, Serialize)] #[derive(Clone, Debug, Default, From, Serialize)]
pub struct BlockId { pub struct SavedBlock {
pub hash: H256, pub block: ArcBlock,
pub num: U64, /// number of seconds this block was behind the current time when received
lag: u64,
} }
impl Display for BlockId { impl SavedBlock {
pub fn new(block: ArcBlock) -> Self {
// TODO: read this from a global config. different chains should probably have different gaps.
let allowed_lag: u64 = 60;
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("there should always be time");
// TODO: get this from config
// TODO: is this safe enough? what if something about the chain is actually lagged? what if its a chain like BTC with 10 minute blocks?
let oldest_allowed = now - Duration::from_secs(allowed_lag);
let block_timestamp = Duration::from_secs(block.timestamp.as_u64());
// TODO: recalculate this every time?
let lag = if block_timestamp < oldest_allowed {
// this server is still syncing from too far away to serve requests
// u64 is safe because ew checked equality above
(oldest_allowed - block_timestamp).as_secs() as u64
} else {
0
};
Self { block, lag }
}
pub fn hash(&self) -> H256 {
self.block.hash.unwrap()
}
// TODO: return as U64 or u64?
pub fn number(&self) -> U64 {
self.block.number.unwrap()
}
/// When the block was received, this node was still syncing
pub fn was_syncing(&self) -> bool {
// TODO: margin should come from a global config
self.lag > 60
}
}
impl From<ArcBlock> for SavedBlock {
fn from(x: ArcBlock) -> Self {
SavedBlock::new(x)
}
}
impl Display for SavedBlock {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{} ({})", self.num, self.hash) write!(f, "{} ({})", self.number(), self.hash())?;
if self.was_syncing() {
write!(f, " (behind by {} seconds)", self.lag)?;
}
Ok(())
} }
} }
impl Web3Connections { impl Web3Connections {
/// add a block to our map and it's hash to our graphmap of the blockchain /// add a block to our mappings and track the heaviest chain
pub async fn save_block(&self, block: &ArcBlock, heaviest_chain: bool) -> anyhow::Result<()> { pub async fn save_block(&self, block: &ArcBlock, heaviest_chain: bool) -> anyhow::Result<()> {
// TODO: i think we can rearrange this function to make it faster on the hot path // TODO: i think we can rearrange this function to make it faster on the hot path
let block_hash = block.hash.as_ref().context("no block hash")?; let block_hash = block.hash.as_ref().context("no block hash")?;
@ -51,38 +105,22 @@ impl Web3Connections {
return Ok(()); return Ok(());
} }
let mut blockchain = self.blockchain_graphmap.write().await;
let block_num = block.number.as_ref().context("no block num")?; let block_num = block.number.as_ref().context("no block num")?;
// TODO: think more about heaviest_chain. would be better to do the check inside this function // TODO: think more about heaviest_chain. would be better to do the check inside this function
if heaviest_chain { if heaviest_chain {
// this is the only place that writes to block_numbers // this is the only place that writes to block_numbers
// its inside a write lock on blockchain_graphmap, so i think there is no race
// multiple inserts should be okay though // multiple inserts should be okay though
// TODO: info that there was a fork?
self.block_numbers.insert(*block_num, *block_hash).await; self.block_numbers.insert(*block_num, *block_hash).await;
} }
if blockchain.contains_node(*block_hash) { // this block is very likely already in block_hashes
// trace!(%block_hash, %block_num, "block already saved"); // TODO: use their get_with
return Ok(());
}
// trace!(%block_hash, %block_num, "saving new block");
// TODO: this block is very likely already in block_hashes
self.block_hashes self.block_hashes
.insert(*block_hash, block.to_owned()) .get_with(*block_hash, async move { block.clone() })
.await; .await;
blockchain.add_node(*block_hash);
// what should edge weight be? and should the nodes be the blocks instead?
// we store parent_hash -> hash because the block already stores the parent_hash
blockchain.add_edge(block.parent_hash, *block_hash, 0);
// TODO: prune blockchain to only keep a configurable (256 on ETH?) number of blocks?
Ok(()) Ok(())
} }
@ -164,7 +202,7 @@ impl Web3Connections {
// be sure the requested block num exists // be sure the requested block num exists
let head_block_num = self.head_block_num().context("no servers in sync")?; let head_block_num = self.head_block_num().context("no servers in sync")?;
// TODO: not 64 on all chains? get from config? // TODO: geth does 64, erigon does 90k. sometimes we run a mix
let archive_needed = num < &(head_block_num - U64::from(64)); let archive_needed = num < &(head_block_num - U64::from(64));
if num > &head_block_num { if num > &head_block_num {
@ -206,7 +244,7 @@ impl Web3Connections {
// the block was fetched using eth_getBlockByNumber, so it should have all fields and be on the heaviest chain // the block was fetched using eth_getBlockByNumber, so it should have all fields and be on the heaviest chain
self.save_block(&block, true).await?; self.save_block(&block, true).await?;
Ok((block, true)) Ok((block, archive_needed))
} }
pub(super) async fn process_incoming_blocks( pub(super) async fn process_incoming_blocks(
@ -223,7 +261,10 @@ impl Web3Connections {
let mut connection_heads = HashMap::new(); let mut connection_heads = HashMap::new();
while let Ok((new_block, rpc)) = block_receiver.recv_async().await { while let Ok((new_block, rpc)) = block_receiver.recv_async().await {
let new_block = new_block.map(Into::into);
let rpc_name = rpc.name.clone(); let rpc_name = rpc.name.clone();
if let Err(err) = self if let Err(err) = self
.process_block_from_rpc( .process_block_from_rpc(
authorization, authorization,
@ -252,51 +293,49 @@ impl Web3Connections {
&self, &self,
authorization: &Arc<Authorization>, authorization: &Arc<Authorization>,
connection_heads: &mut HashMap<String, H256>, connection_heads: &mut HashMap<String, H256>,
rpc_head_block: Option<ArcBlock>, rpc_head_block: Option<SavedBlock>,
rpc: Arc<Web3Connection>, rpc: Arc<Web3Connection>,
head_block_sender: &watch::Sender<ArcBlock>, head_block_sender: &watch::Sender<ArcBlock>,
pending_tx_sender: &Option<broadcast::Sender<TxStatus>>, pending_tx_sender: &Option<broadcast::Sender<TxStatus>>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
// add the rpc's block to connection_heads, or remove the rpc from connection_heads // add the rpc's block to connection_heads, or remove the rpc from connection_heads
let rpc_head_id = match rpc_head_block { let rpc_head_block = match rpc_head_block {
Some(rpc_head_block) => { Some(rpc_head_block) => {
let rpc_head_num = rpc_head_block.number.unwrap(); let rpc_head_num = rpc_head_block.number();
let rpc_head_hash = rpc_head_block.hash.unwrap(); let rpc_head_hash = rpc_head_block.hash();
// we don't know if its on the heaviest chain yet // we don't know if its on the heaviest chain yet
self.save_block(&rpc_head_block, false).await?; self.save_block(&rpc_head_block.block, false).await?;
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.context("no time")?;
// TODO: get this from config
let oldest_allowed = now - Duration::from_secs(120);
let block_timestamp = Duration::from_secs(rpc_head_block.timestamp.as_u64());
if block_timestamp < oldest_allowed {
let behind_secs = (oldest_allowed - block_timestamp).as_secs();
if rpc_head_block.was_syncing() {
if connection_heads.remove(&rpc.name).is_some() { if connection_heads.remove(&rpc.name).is_some() {
warn!("{} is behind by {} seconds", &rpc.name, behind_secs); warn!("{} is behind by {} seconds", &rpc.name, rpc_head_block.lag);
}; };
None None
} else { } else {
connection_heads.insert(rpc.name.to_owned(), rpc_head_hash); if let Some(prev_hash) =
connection_heads.insert(rpc.name.to_owned(), rpc_head_hash)
{
if prev_hash == rpc_head_hash {
// this block was already sent by this node. return early
return Ok(());
}
}
Some(BlockId { // TODO: should we just keep the ArcBlock here?
hash: rpc_head_hash, Some(rpc_head_block)
num: rpc_head_num,
})
} }
} }
None => { None => {
// TODO: warn is too verbose. this is expected if a node disconnects and has to reconnect // TODO: warn is too verbose. this is expected if a node disconnects and has to reconnect
// // trace!(%rpc, "Block without number or hash!"); // // trace!(%rpc, "Block without number or hash!");
connection_heads.remove(&rpc.name); if connection_heads.remove(&rpc.name).is_none() {
// this connection was already removed.
// return early. no need to process synced connections
return Ok(());
}
None None
} }
@ -451,24 +490,19 @@ impl Web3Connections {
.filter_map(|conn_name| self.conns.get(conn_name).cloned()) .filter_map(|conn_name| self.conns.get(conn_name).cloned())
.collect(); .collect();
let consensus_head_block = maybe_head_block; let consensus_head_hash = maybe_head_block
let consensus_head_hash = consensus_head_block
.hash .hash
.expect("head blocks always have hashes"); .expect("head blocks always have hashes");
let consensus_head_num = consensus_head_block let consensus_head_num = maybe_head_block
.number .number
.expect("head blocks always have numbers"); .expect("head blocks always have numbers");
let num_consensus_rpcs = conns.len(); let num_consensus_rpcs = conns.len();
let consensus_head_block_id = BlockId { let consensus_head_block: SavedBlock = maybe_head_block.into();
hash: consensus_head_hash,
num: consensus_head_num,
};
let new_synced_connections = SyncedConnections { let new_synced_connections = SyncedConnections {
head_block_id: Some(consensus_head_block_id.clone()), head_block_id: Some(consensus_head_block.clone()),
conns, conns,
}; };
@ -484,35 +518,35 @@ impl Web3Connections {
num_consensus_rpcs, num_consensus_rpcs,
num_connection_heads, num_connection_heads,
total_conns, total_conns,
consensus_head_block_id, consensus_head_block,
rpc rpc
); );
self.save_block(&consensus_head_block, true).await?; self.save_block(&consensus_head_block.block, true).await?;
head_block_sender head_block_sender
.send(consensus_head_block) .send(consensus_head_block.block)
.context("head_block_sender sending consensus_head_block")?; .context("head_block_sender sending consensus_head_block")?;
} }
Some(old_block_id) => { Some(old_head_block) => {
// TODO: do this log item better // TODO: do this log item better
let rpc_head_str = rpc_head_id let rpc_head_str = rpc_head_block
.map(|x| x.to_string()) .map(|x| x.to_string())
.unwrap_or_else(|| "None".to_string()); .unwrap_or_else(|| "None".to_string());
match consensus_head_block_id.num.cmp(&old_block_id.num) { match consensus_head_block.number().cmp(&old_head_block.number()) {
Ordering::Equal => { Ordering::Equal => {
// TODO: if rpc_block_id != consensus_head_block_id, do a different log? // TODO: if rpc_block_id != consensus_head_block_id, do a different log?
// multiple blocks with the same fork! // multiple blocks with the same fork!
if consensus_head_block_id.hash == old_block_id.hash { if consensus_head_block.hash() == old_head_block.hash() {
// no change in hash. no need to use head_block_sender // no change in hash. no need to use head_block_sender
debug!( debug!(
"con {}/{}/{} con_head={} rpc={} rpc_head={}", "con {}/{}/{} con_head={} rpc={} rpc_head={}",
num_consensus_rpcs, num_consensus_rpcs,
num_connection_heads, num_connection_heads,
total_conns, total_conns,
consensus_head_block_id, consensus_head_block,
rpc, rpc,
rpc_head_str rpc_head_str
) )
@ -523,17 +557,17 @@ impl Web3Connections {
num_consensus_rpcs, num_consensus_rpcs,
num_connection_heads, num_connection_heads,
total_conns, total_conns,
consensus_head_block_id, consensus_head_block,
old_block_id, old_head_block,
rpc_head_str, rpc_head_str,
rpc rpc
); );
self.save_block(&consensus_head_block, true) self.save_block(&consensus_head_block.block, true)
.await .await
.context("save consensus_head_block as heaviest chain")?; .context("save consensus_head_block as heaviest chain")?;
head_block_sender.send(consensus_head_block).context( head_block_sender.send(consensus_head_block.block).context(
"head_block_sender sending consensus_head_block", "head_block_sender sending consensus_head_block",
)?; )?;
} }
@ -541,15 +575,17 @@ impl Web3Connections {
Ordering::Less => { Ordering::Less => {
// this is unlikely but possible // this is unlikely but possible
// TODO: better log // TODO: better log
warn!("chain rolled back {}/{}/{} con_head={} old_head={} rpc_head={} rpc={}", num_consensus_rpcs, num_connection_heads, total_conns, consensus_head_block_id, old_block_id, rpc_head_str, rpc); warn!("chain rolled back {}/{}/{} con_head={} old_head={} rpc_head={} rpc={}", num_consensus_rpcs, num_connection_heads, total_conns, consensus_head_block, old_head_block, rpc_head_str, rpc);
// TODO: tell save_block to remove any higher block numbers from the cache. not needed because we have other checks on requested blocks being > head, but still seems slike a good idea // TODO: tell save_block to remove any higher block numbers from the cache. not needed because we have other checks on requested blocks being > head, but still seems slike a good idea
self.save_block(&consensus_head_block, true).await.context( self.save_block(&consensus_head_block.block, true)
"save_block sending consensus_head_block as heaviest chain", .await
)?; .context(
"save_block sending consensus_head_block as heaviest chain",
)?;
head_block_sender head_block_sender
.send(consensus_head_block) .send(consensus_head_block.block)
.context("head_block_sender sending consensus_head_block")?; .context("head_block_sender sending consensus_head_block")?;
} }
Ordering::Greater => { Ordering::Greater => {
@ -558,14 +594,14 @@ impl Web3Connections {
num_consensus_rpcs, num_consensus_rpcs,
num_connection_heads, num_connection_heads,
total_conns, total_conns,
consensus_head_block_id, consensus_head_block,
rpc_head_str, rpc_head_str,
rpc rpc
); );
self.save_block(&consensus_head_block, true).await?; self.save_block(&consensus_head_block.block, true).await?;
head_block_sender.send(consensus_head_block)?; head_block_sender.send(consensus_head_block.block)?;
} }
} }
} }

@ -1,5 +1,5 @@
///! Rate-limited communication with a web3 provider. ///! Rate-limited communication with a web3 provider.
use super::blockchain::{ArcBlock, BlockHashesCache, BlockId}; use super::blockchain::{ArcBlock, BlockHashesCache, SavedBlock};
use super::provider::Web3Provider; use super::provider::Web3Provider;
use super::request::{OpenRequestHandle, OpenRequestHandleMetrics, OpenRequestResult}; use super::request::{OpenRequestHandle, OpenRequestHandleMetrics, OpenRequestResult};
use crate::app::{flatten_handle, AnyhowJoinHandle}; use crate::app::{flatten_handle, AnyhowJoinHandle};
@ -54,7 +54,7 @@ pub struct Web3Connection {
/// Lower weight are higher priority when sending requests. 0 to 99. /// Lower weight are higher priority when sending requests. 0 to 99.
pub(super) weight: f64, pub(super) weight: f64,
/// TODO: should this be an AsyncRwLock? /// TODO: should this be an AsyncRwLock?
pub(super) head_block_id: RwLock<Option<BlockId>>, pub(super) head_block: RwLock<Option<SavedBlock>>,
pub(super) open_request_handle_metrics: Arc<OpenRequestHandleMetrics>, pub(super) open_request_handle_metrics: Arc<OpenRequestHandleMetrics>,
} }
@ -111,7 +111,7 @@ impl Web3Connection {
hard_limit, hard_limit,
soft_limit, soft_limit,
block_data_limit, block_data_limit,
head_block_id: RwLock::new(Default::default()), head_block: RwLock::new(Default::default()),
weight, weight,
open_request_handle_metrics, open_request_handle_metrics,
}; };
@ -119,6 +119,7 @@ impl Web3Connection {
let new_connection = Arc::new(new_connection); let new_connection = Arc::new(new_connection);
// connect to the server (with retries) // connect to the server (with retries)
// TODO: PROBLEM! THIS RETRIES FOREVER AND BLOCKS THE APP STARTING
new_connection new_connection
.retrying_reconnect(block_sender.as_ref(), false) .retrying_reconnect(block_sender.as_ref(), false)
.await?; .await?;
@ -148,11 +149,11 @@ impl Web3Connection {
chain_id, chain_id,
found_chain_id found_chain_id
) )
.context(format!("failed @ {}", new_connection))); .context(format!("failed @ {}", new_connection.name)));
} }
} }
Err(e) => { Err(e) => {
let e = anyhow::Error::from(e).context(format!("failed @ {}", new_connection)); let e = anyhow::Error::from(e).context(format!("failed @ {}", new_connection.name));
return Err(e); return Err(e);
} }
} }
@ -212,7 +213,7 @@ impl Web3Connection {
// TODO: binary search between 90k and max? // TODO: binary search between 90k and max?
// TODO: start at 0 or 1 // TODO: start at 0 or 1
for block_data_limit in [0, 32, 64, 128, 256, 512, 1024, 90_000, u64::MAX] { for block_data_limit in [0, 32, 64, 128, 256, 512, 1024, 90_000, u64::MAX] {
let mut head_block_id = self.head_block_id.read().clone(); let mut head_block_id = self.head_block.read().clone();
// TODO: subscribe to a channel instead of polling. subscribe to http_interval_sender? // TODO: subscribe to a channel instead of polling. subscribe to http_interval_sender?
while head_block_id.is_none() { while head_block_id.is_none() {
@ -221,9 +222,9 @@ impl Web3Connection {
// TODO: sleep for the block time, or maybe subscribe to a channel instead of this simple pull // TODO: sleep for the block time, or maybe subscribe to a channel instead of this simple pull
sleep(Duration::from_secs(13)).await; sleep(Duration::from_secs(13)).await;
head_block_id = self.head_block_id.read().clone(); head_block_id = self.head_block.read().clone();
} }
let head_block_num = head_block_id.expect("is_none was checked above").num; let head_block_num = head_block_id.expect("is_none was checked above").number();
// TODO: subtract 1 from block_data_limit for safety? // TODO: subtract 1 from block_data_limit for safety?
let maybe_archive_block = head_block_num.saturating_sub((block_data_limit).into()); let maybe_archive_block = head_block_num.saturating_sub((block_data_limit).into());
@ -274,9 +275,9 @@ impl Web3Connection {
} }
pub fn has_block_data(&self, needed_block_num: &U64) -> bool { pub fn has_block_data(&self, needed_block_num: &U64) -> bool {
let head_block_num = match self.head_block_id.read().clone() { let head_block_num = match self.head_block.read().clone() {
None => return false, None => return false,
Some(x) => x.num, Some(x) => x.number(),
}; };
// this rpc doesn't have that block yet. still syncing // this rpc doesn't have that block yet. still syncing
@ -374,7 +375,7 @@ impl Web3Connection {
// reset sync status // reset sync status
{ {
let mut head_block_id = self.head_block_id.write(); let mut head_block_id = self.head_block.write();
*head_block_id = None; *head_block_id = None;
} }
@ -415,75 +416,60 @@ impl Web3Connection {
block_sender: &flume::Sender<BlockAndRpc>, block_sender: &flume::Sender<BlockAndRpc>,
block_map: BlockHashesCache, block_map: BlockHashesCache,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
match new_head_block { let new_head_block = match new_head_block {
Ok(None) => { Ok(None) => {
// TODO: i think this should clear the local block and then update over the block sender
warn!("unsynced server {}", self);
{ {
let mut head_block_id = self.head_block_id.write(); let mut head_block_id = self.head_block.write();
if head_block_id.is_none() {
// we previously sent a None. return early
return Ok(());
}
warn!("{} is not synced!", self);
*head_block_id = None; *head_block_id = None;
} }
block_sender None
.send_async((None, self.clone()))
.await
.context("clearing block_sender")?;
} }
Ok(Some(new_head_block)) => { Ok(Some(new_head_block)) => {
// TODO: is unwrap_or_default ok? we might have an empty block let new_hash = new_head_block
let new_hash = new_head_block.hash.unwrap_or_default(); .hash
.context("sending block to connections")?;
// if we already have this block saved, set new_head_block to that arc. otherwise store this copy // if we already have this block saved, set new_head_block to that arc. otherwise store this copy
let new_head_block = block_map let new_head_block = block_map
.get_with(new_hash, async move { new_head_block }) .get_with(new_hash, async move { new_head_block })
.await; .await;
let new_num = new_head_block.number.unwrap_or_default();
// save the block so we don't send the same one multiple times // save the block so we don't send the same one multiple times
// also save so that archive checks can know how far back to query // also save so that archive checks can know how far back to query
{ {
let mut head_block_id = self.head_block_id.write(); let mut head_block = self.head_block.write();
if head_block_id.is_none() { let _ = head_block.insert(new_head_block.clone().into());
*head_block_id = Some(BlockId {
hash: new_hash,
num: new_num,
});
} else {
head_block_id.as_mut().map(|x| {
x.hash = new_hash;
x.num = new_num;
x
});
}
} }
// send the block off to be saved Some(new_head_block)
block_sender
.send_async((Some(new_head_block), self.clone()))
.await
.context("block_sender")?;
} }
Err(err) => { Err(err) => {
warn!("unable to get block from {}. err={:?}", self, err); warn!("unable to get block from {}. err={:?}", self, err);
{ {
let mut head_block_id = self.head_block_id.write(); let mut head_block_id = self.head_block.write();
*head_block_id = None; *head_block_id = None;
} }
// send an empty block to take this server out of rotation None
// TODO: this is NOT working!!!!
block_sender
.send_async((None, self.clone()))
.await
.context("block_sender")?;
} }
} };
// send an empty block to take this server out of rotation
block_sender
.send_async((new_head_block, self.clone()))
.await
.context("block_sender")?;
Ok(()) Ok(())
} }
@ -965,7 +951,7 @@ impl Serialize for Web3Connection {
&self.frontend_requests.load(atomic::Ordering::Relaxed), &self.frontend_requests.load(atomic::Ordering::Relaxed),
)?; )?;
let head_block_id = &*self.head_block_id.read(); let head_block_id = &*self.head_block.read();
state.serialize_field("head_block_id", head_block_id)?; state.serialize_field("head_block_id", head_block_id)?;
state.end() state.end()
@ -999,13 +985,20 @@ impl fmt::Display for Web3Connection {
mod tests { mod tests {
#![allow(unused_imports)] #![allow(unused_imports)]
use super::*; use super::*;
use ethers::types::Block;
#[test] #[test]
fn test_archive_node_has_block_data() { fn test_archive_node_has_block_data() {
let head_block = BlockId { let random_block = Block {
hash: H256::random(), hash: Some(H256::random()),
num: 1_000_000.into(), number: Some(1_000_000.into()),
// TODO: timestamp?
..Default::default()
}; };
let random_block = Arc::new(random_block);
let head_block = SavedBlock::new(random_block);
let block_data_limit = u64::MAX; let block_data_limit = u64::MAX;
let metrics = OpenRequestHandleMetrics::default(); let metrics = OpenRequestHandleMetrics::default();
@ -1023,23 +1016,25 @@ mod tests {
soft_limit: 1_000, soft_limit: 1_000,
block_data_limit: block_data_limit.into(), block_data_limit: block_data_limit.into(),
weight: 100.0, weight: 100.0,
head_block_id: RwLock::new(Some(head_block.clone())), head_block: RwLock::new(Some(head_block.clone())),
open_request_handle_metrics: Arc::new(metrics), open_request_handle_metrics: Arc::new(metrics),
}; };
assert!(x.has_block_data(&0.into())); assert!(x.has_block_data(&0.into()));
assert!(x.has_block_data(&1.into())); assert!(x.has_block_data(&1.into()));
assert!(x.has_block_data(&head_block.num)); assert!(x.has_block_data(&head_block.number()));
assert!(!x.has_block_data(&(head_block.num + 1))); assert!(!x.has_block_data(&(head_block.number() + 1)));
assert!(!x.has_block_data(&(head_block.num + 1000))); assert!(!x.has_block_data(&(head_block.number() + 1000)));
} }
#[test] #[test]
fn test_pruned_node_has_block_data() { fn test_pruned_node_has_block_data() {
let head_block = BlockId { let head_block: SavedBlock = Arc::new(Block {
hash: H256::random(), hash: Some(H256::random()),
num: 1_000_000.into(), number: Some(1_000_000.into()),
}; ..Default::default()
})
.into();
let block_data_limit = 64; let block_data_limit = 64;
@ -1059,16 +1054,16 @@ mod tests {
soft_limit: 1_000, soft_limit: 1_000,
block_data_limit: block_data_limit.into(), block_data_limit: block_data_limit.into(),
weight: 100.0, weight: 100.0,
head_block_id: RwLock::new(Some(head_block.clone())), head_block: RwLock::new(Some(head_block.clone())),
open_request_handle_metrics: Arc::new(metrics), open_request_handle_metrics: Arc::new(metrics),
}; };
assert!(!x.has_block_data(&0.into())); assert!(!x.has_block_data(&0.into()));
assert!(!x.has_block_data(&1.into())); assert!(!x.has_block_data(&1.into()));
assert!(!x.has_block_data(&(head_block.num - block_data_limit - 1))); assert!(!x.has_block_data(&(head_block.number() - block_data_limit - 1)));
assert!(x.has_block_data(&(head_block.num - block_data_limit))); assert!(x.has_block_data(&(head_block.number() - block_data_limit)));
assert!(x.has_block_data(&head_block.num)); assert!(x.has_block_data(&head_block.number()));
assert!(!x.has_block_data(&(head_block.num + 1))); assert!(!x.has_block_data(&(head_block.number() + 1)));
assert!(!x.has_block_data(&(head_block.num + 1000))); assert!(!x.has_block_data(&(head_block.number() + 1000)));
} }
} }

@ -21,7 +21,6 @@ use hashbrown::HashMap;
use log::{error, info, trace, warn, Level}; use log::{error, info, trace, warn, Level};
use migration::sea_orm::DatabaseConnection; use migration::sea_orm::DatabaseConnection;
use moka::future::{Cache, ConcurrentCacheExt}; use moka::future::{Cache, ConcurrentCacheExt};
use petgraph::graphmap::DiGraphMap;
use serde::ser::{SerializeStruct, Serializer}; use serde::ser::{SerializeStruct, Serializer};
use serde::Serialize; use serde::Serialize;
use serde_json::json; use serde_json::json;
@ -30,7 +29,6 @@ use std::fmt;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::sync::Arc; use std::sync::Arc;
use thread_fast_rng::rand::seq::SliceRandom; use thread_fast_rng::rand::seq::SliceRandom;
use tokio::sync::RwLock as AsyncRwLock;
use tokio::sync::{broadcast, watch}; use tokio::sync::{broadcast, watch};
use tokio::task; use tokio::task;
use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior}; use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior};
@ -48,9 +46,6 @@ pub struct Web3Connections {
pub(super) block_hashes: BlockHashesCache, pub(super) block_hashes: BlockHashesCache,
/// blocks on the heaviest chain /// blocks on the heaviest chain
pub(super) block_numbers: Cache<U64, H256, hashbrown::hash_map::DefaultHashBuilder>, pub(super) block_numbers: Cache<U64, H256, hashbrown::hash_map::DefaultHashBuilder>,
/// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis?
/// TODO: what should we use for edges?
pub(super) blockchain_graphmap: AsyncRwLock<DiGraphMap<H256, u32>>,
pub(super) min_head_rpcs: usize, pub(super) min_head_rpcs: usize,
pub(super) min_sum_soft_limit: u32, pub(super) min_sum_soft_limit: u32,
} }
@ -209,7 +204,6 @@ impl Web3Connections {
pending_transactions, pending_transactions,
block_hashes, block_hashes,
block_numbers, block_numbers,
blockchain_graphmap: Default::default(),
min_sum_soft_limit, min_sum_soft_limit,
min_head_rpcs, min_head_rpcs,
}); });
@ -846,15 +840,15 @@ impl Serialize for Web3Connections {
} }
mod tests { mod tests {
#![allow(unused_imports)]
use std::time::{SystemTime, UNIX_EPOCH};
// TODO: why is this allow needed? does tokio::test get in the way somehow? // TODO: why is this allow needed? does tokio::test get in the way somehow?
#![allow(unused_imports)]
use super::*; use super::*;
use crate::rpcs::{blockchain::BlockId, provider::Web3Provider}; use crate::rpcs::{blockchain::SavedBlock, provider::Web3Provider};
use ethers::types::{Block, U256}; use ethers::types::{Block, U256};
use log::{trace, LevelFilter}; use log::{trace, LevelFilter};
use parking_lot::RwLock; use parking_lot::RwLock;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::RwLock as AsyncRwLock;
#[tokio::test] #[tokio::test]
async fn test_server_selection_by_height() { async fn test_server_selection_by_height() {
@ -886,19 +880,13 @@ mod tests {
..Default::default() ..Default::default()
}; };
// TODO: write a impl From for Block -> BlockId?
let lagged_block_id = BlockId {
hash: lagged_block.hash.unwrap(),
num: lagged_block.number.unwrap(),
};
let head_block_id = BlockId {
hash: head_block.hash.unwrap(),
num: head_block.number.unwrap(),
};
let lagged_block = Arc::new(lagged_block); let lagged_block = Arc::new(lagged_block);
let head_block = Arc::new(head_block); let head_block = Arc::new(head_block);
// TODO: write a impl From for Block -> BlockId?
let lagged_block: SavedBlock = lagged_block.into();
let head_block: SavedBlock = head_block.into();
let block_data_limit = u64::MAX; let block_data_limit = u64::MAX;
let head_rpc = Web3Connection { let head_rpc = Web3Connection {
@ -914,7 +902,7 @@ mod tests {
soft_limit: 1_000, soft_limit: 1_000,
block_data_limit: block_data_limit.into(), block_data_limit: block_data_limit.into(),
weight: 100.0, weight: 100.0,
head_block_id: RwLock::new(Some(head_block_id)), head_block: RwLock::new(Some(head_block.clone())),
open_request_handle_metrics: Arc::new(Default::default()), open_request_handle_metrics: Arc::new(Default::default()),
}; };
@ -931,15 +919,15 @@ mod tests {
soft_limit: 1_000, soft_limit: 1_000,
block_data_limit: block_data_limit.into(), block_data_limit: block_data_limit.into(),
weight: 100.0, weight: 100.0,
head_block_id: RwLock::new(Some(lagged_block_id)), head_block: RwLock::new(Some(lagged_block.clone())),
open_request_handle_metrics: Arc::new(Default::default()), open_request_handle_metrics: Arc::new(Default::default()),
}; };
assert!(head_rpc.has_block_data(&lagged_block.number.unwrap())); assert!(head_rpc.has_block_data(&lagged_block.number()));
assert!(head_rpc.has_block_data(&head_block.number.unwrap())); assert!(head_rpc.has_block_data(&head_block.number()));
assert!(lagged_rpc.has_block_data(&lagged_block.number.unwrap())); assert!(lagged_rpc.has_block_data(&lagged_block.number()));
assert!(!lagged_rpc.has_block_data(&head_block.number.unwrap())); assert!(!lagged_rpc.has_block_data(&head_block.number()));
let head_rpc = Arc::new(head_rpc); let head_rpc = Arc::new(head_rpc);
let lagged_rpc = Arc::new(lagged_rpc); let lagged_rpc = Arc::new(lagged_rpc);
@ -961,7 +949,6 @@ mod tests {
block_numbers: Cache::builder() block_numbers: Cache::builder()
.max_capacity(10_000) .max_capacity(10_000)
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()), .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()),
blockchain_graphmap: Default::default(),
min_head_rpcs: 1, min_head_rpcs: 1,
min_sum_soft_limit: 1, min_sum_soft_limit: 1,
}; };
@ -1020,7 +1007,7 @@ mod tests {
assert!(matches!(x, OpenRequestResult::NotSynced)); assert!(matches!(x, OpenRequestResult::NotSynced));
// add lagged blocks to the conns. both servers should be allowed // add lagged blocks to the conns. both servers should be allowed
conns.save_block(&lagged_block, true).await.unwrap(); conns.save_block(&lagged_block.block, true).await.unwrap();
conns conns
.process_block_from_rpc( .process_block_from_rpc(
@ -1048,7 +1035,7 @@ mod tests {
assert_eq!(conns.num_synced_rpcs(), 2); assert_eq!(conns.num_synced_rpcs(), 2);
// add head block to the conns. lagged_rpc should not be available // add head block to the conns. lagged_rpc should not be available
conns.save_block(&head_block, true).await.unwrap(); conns.save_block(&head_block.block, true).await.unwrap();
conns conns
.process_block_from_rpc( .process_block_from_rpc(
@ -1109,7 +1096,7 @@ mod tests {
.as_secs() .as_secs()
.into(); .into();
let head_block: Block<TxHash> = Block { let head_block = Block {
hash: Some(H256::random()), hash: Some(H256::random()),
number: Some(1_000_000.into()), number: Some(1_000_000.into()),
parent_hash: H256::random(), parent_hash: H256::random(),
@ -1117,13 +1104,7 @@ mod tests {
..Default::default() ..Default::default()
}; };
// TODO: write a impl From for Block -> BlockId? let head_block: SavedBlock = Arc::new(head_block).into();
let head_block_id = BlockId {
hash: head_block.hash.unwrap(),
num: head_block.number.unwrap(),
};
let head_block = Arc::new(head_block);
let pruned_rpc = Web3Connection { let pruned_rpc = Web3Connection {
name: "pruned".to_string(), name: "pruned".to_string(),
@ -1138,7 +1119,7 @@ mod tests {
soft_limit: 3_000, soft_limit: 3_000,
block_data_limit: 64.into(), block_data_limit: 64.into(),
weight: 1.0, weight: 1.0,
head_block_id: RwLock::new(Some(head_block_id.clone())), head_block: RwLock::new(Some(head_block.clone())),
open_request_handle_metrics: Arc::new(Default::default()), open_request_handle_metrics: Arc::new(Default::default()),
}; };
@ -1156,12 +1137,12 @@ mod tests {
block_data_limit: u64::MAX.into(), block_data_limit: u64::MAX.into(),
// TODO: does weight = 0 work? // TODO: does weight = 0 work?
weight: 0.01, weight: 0.01,
head_block_id: RwLock::new(Some(head_block_id)), head_block: RwLock::new(Some(head_block.clone())),
open_request_handle_metrics: Arc::new(Default::default()), open_request_handle_metrics: Arc::new(Default::default()),
}; };
assert!(pruned_rpc.has_block_data(&head_block.number.unwrap())); assert!(pruned_rpc.has_block_data(&head_block.number()));
assert!(archive_rpc.has_block_data(&head_block.number.unwrap())); assert!(archive_rpc.has_block_data(&head_block.number()));
assert!(!pruned_rpc.has_block_data(&1.into())); assert!(!pruned_rpc.has_block_data(&1.into()));
assert!(archive_rpc.has_block_data(&1.into())); assert!(archive_rpc.has_block_data(&1.into()));
@ -1185,7 +1166,6 @@ mod tests {
block_numbers: Cache::builder() block_numbers: Cache::builder()
.max_capacity(10) .max_capacity(10)
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()), .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()),
blockchain_graphmap: Default::default(),
min_head_rpcs: 1, min_head_rpcs: 1,
min_sum_soft_limit: 3_000, min_sum_soft_limit: 3_000,
}; };
@ -1223,7 +1203,7 @@ mod tests {
// best_synced_backend_connection requires servers to be synced with the head block // best_synced_backend_connection requires servers to be synced with the head block
let best_head_server = conns let best_head_server = conns
.best_synced_backend_connection(&authorization, None, &[], head_block.number.as_ref()) .best_synced_backend_connection(&authorization, None, &[], Some(&head_block.number()))
.await; .await;
assert!(matches!( assert!(matches!(

@ -1,4 +1,4 @@
use super::blockchain::BlockId; use super::blockchain::SavedBlock;
use super::connection::Web3Connection; use super::connection::Web3Connection;
use super::connections::Web3Connections; use super::connections::Web3Connections;
use ethers::prelude::{H256, U64}; use ethers::prelude::{H256, U64};
@ -11,7 +11,7 @@ use std::sync::Arc;
#[derive(Clone, Default, Serialize)] #[derive(Clone, Default, Serialize)]
pub struct SyncedConnections { pub struct SyncedConnections {
// TODO: store ArcBlock instead? // TODO: store ArcBlock instead?
pub(super) head_block_id: Option<BlockId>, pub(super) head_block_id: Option<SavedBlock>,
// TODO: this should be able to serialize, but it isn't // TODO: this should be able to serialize, but it isn't
#[serde(skip_serializing)] #[serde(skip_serializing)]
pub(super) conns: Vec<Arc<Web3Connection>>, pub(super) conns: Vec<Arc<Web3Connection>>,
@ -29,7 +29,7 @@ impl fmt::Debug for SyncedConnections {
} }
impl Web3Connections { impl Web3Connections {
pub fn head_block_id(&self) -> Option<BlockId> { pub fn head_block_id(&self) -> Option<SavedBlock> {
self.synced_connections.load().head_block_id.clone() self.synced_connections.load().head_block_id.clone()
} }
@ -38,7 +38,7 @@ impl Web3Connections {
.load() .load()
.head_block_id .head_block_id
.as_ref() .as_ref()
.map(|head_block_id| head_block_id.hash) .map(|head_block_id| head_block_id.hash())
} }
pub fn head_block_num(&self) -> Option<U64> { pub fn head_block_num(&self) -> Option<U64> {
@ -46,7 +46,7 @@ impl Web3Connections {
.load() .load()
.head_block_id .head_block_id
.as_ref() .as_ref()
.map(|head_block_id| head_block_id.num) .map(|head_block_id| head_block_id.number())
} }
pub fn synced(&self) -> bool { pub fn synced(&self) -> bool {