//! Keep track of the blockchain as seen by a Web3Connections.
use super::connection::Web3Connection;
use super::connections::Web3Connections;
use super::transactions::TxStatus;
use crate::{
    config::BlockAndRpc, jsonrpc::JsonRpcRequest, rpcs::synced_connections::SyncedConnections,
};
use anyhow::Context;
use derive_more::From;
use ethers::prelude::{Block, TxHash, H256, U64};
use hashbrown::{HashMap, HashSet};
use moka::future::Cache;
use serde::Serialize;
use serde_json::json;
use std::{cmp::Ordering, fmt::Display, sync::Arc};
use tokio::sync::{broadcast, watch};
use tracing::{debug, trace, warn};

// TODO: type for Hydrated Blocks with their full transactions?
pub type ArcBlock = Arc<Block<TxHash>>;
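
/// Cache that maps a block's hash to the block itself.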
pub type BlockHashesMap = Cache<H256, ArcBlock>;

/// A block's hash and number.
#[derive(Clone, Debug, Default, From, Serialize)]
pub struct BlockId {
    pub hash: H256,
    pub num: U64,
}

impl Display for BlockId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{} ({})", self.num, self.hash)
    }
}
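
// a BlockId displays as "<number> (<hash>)", e.g. "15000000 (0x2b8f…bb70)". note that H256's Display abbreviates the hash.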

impl Web3Connections {
    /// add a block to our map and its hash to our graphmap of the blockchain
    pub async fn save_block(&self, block: &ArcBlock, heaviest_chain: bool) -> anyhow::Result<()> {
        // TODO: i think we can rearrange this function to make it faster on the hot path
        let block_hash = block.hash.as_ref().context("no block hash")?;

        // skip if the block's hash is all zeroes
        if block_hash.is_zero() {
            return Ok(());
        }

        let block_num = block.number.as_ref().context("no block num")?;
        let _block_td = block
            .total_difficulty
            .as_ref()
            .expect("no block total difficulty");

        // if self.block_hashes.contains_key(block_hash) {
        //     // this block is already included. no need to continue
        //     return Ok(());
        // }

        let mut blockchain = self.blockchain_graphmap.write().await;

        // TODO: think more about heaviest_chain
        if heaviest_chain {
            // this is the only place that writes to block_numbers
            // it's inside a write lock on blockchain_graphmap, so i think there is no race
            if let Some(old_hash) = self.block_numbers.get(block_num) {
                if block_hash == &old_hash {
                    // this block has already been saved
                    return Ok(());
                }
            }

            // i think a race here isn't that big of a problem. just 2 inserts
            self.block_numbers.insert(*block_num, *block_hash).await;
        }
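
        // note: even when heaviest_chain is false, the block is still added to the
        // graphmap and the hash cache below so that consensus checks can find it by hash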

        if blockchain.contains_node(*block_hash) {
            // this hash is already included
            // return now since this work was already done.
            return Ok(());
        }

        // TODO: prettier log? or probably move the log somewhere else
        trace!(%block_hash, "new block");

        // TODO: there's a small race between contains_key and insert
        self.block_hashes
            .insert(*block_hash, block.to_owned())
            .await;

        blockchain.add_node(*block_hash);

        // what should edge weight be? and should the nodes be the blocks instead?
        // TODO: maybe the weight should be the block?
        // we store parent_hash -> hash because the block already stores the parent_hash
        blockchain.add_edge(block.parent_hash, *block_hash, 0);

        // TODO: prune block_numbers and block_map to only keep a configurable (256 on ETH?) number of blocks?

        Ok(())
    }

    /// Get a block from caches with fallback.
    /// Will query a specific node or the best available.
    /// WARNING! If rpc is specified, this may wait forever. be sure this runs with your own timeout
    pub async fn block(
        &self,
        hash: &H256,
        rpc: Option<&Arc<Web3Connection>>,
    ) -> anyhow::Result<ArcBlock> {
        // first, try to get the hash from our cache
        // the cache is set last, so if it's here, it's everywhere
        if let Some(block) = self.block_hashes.get(hash) {
            return Ok(block);
        }

        // block not in cache. we need to ask an rpc for it
        let get_block_params = (hash, false);
        // TODO: if error, retry?
        let block: Block<TxHash> = match rpc {
            Some(rpc) => {
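                // wait for a request handle on the specific rpc (this is the wait that
                // can block forever; see the WARNING above), then ask it directly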
                rpc.wait_for_request_handle()
                    .await?
                    .request("eth_getBlockByHash", get_block_params)
                    .await?
            }
            None => {
                // TODO: helper for method+params => JsonRpcRequest
                // TODO: does this id matter?
                let request = json!({ "id": "1", "method": "eth_getBlockByHash", "params": get_block_params });
                let request: JsonRpcRequest = serde_json::from_value(request)?;

                let response = self.try_send_best_upstream_server(request, None).await?;

                let block = response.result.context("no block result")?;

                serde_json::from_str(block.get())?
            }
        };

        let block = Arc::new(block);

        // the block was fetched using eth_getBlockByHash, so it should have all fields
        self.save_block(&block, true).await?;

        Ok(block)
    }

    /// Convenience method to get the hash of the canonical block at a given block height.
    pub async fn block_hash(&self, num: &U64) -> anyhow::Result<H256> {
        let block = self.cannonical_block(num).await?;

        let hash = block.hash.expect("canonical blocks always have hashes");

        Ok(hash)
    }

    /// Get the heaviest chain's block from cache or backend rpc
    pub async fn cannonical_block(&self, num: &U64) -> anyhow::Result<ArcBlock> {
        // we only have blocks by hash now
        // maybe save them during save_block in a blocks_by_number Cache<U64, Vec<ArcBlock>>
        // if there are multiple, use petgraph to find the one on the main chain (and remove the others if they have enough confirmations)

        // be sure the requested block num exists
        let head_block_num = self
            .head_block_num()
            .ok_or_else(|| anyhow::anyhow!("no servers in sync"))?;
        if num > &head_block_num {
            // TODO: i'm seeing this a lot when using ethspam. i don't know why though. i thought we delayed publishing
            // TODO: instead of error, maybe just sleep and try again?
            return Err(anyhow::anyhow!(
                "Head block is #{}, but #{} was requested",
                head_block_num,
                num
            ));
        }

        // try to get the hash from our cache
        // deref to not keep the lock open
        if let Some(block_hash) = self.block_numbers.get(num) {
            // TODO: sometimes this needs to fetch the block. why? i thought block_numbers would only be set if the block hash was set
            return self.block(&block_hash, None).await;
        }

        // block number not in cache. we need to ask an rpc for it
        // TODO: helper for method+params => JsonRpcRequest
        let request = json!({ "jsonrpc": "2.0", "id": "1", "method": "eth_getBlockByNumber", "params": (num, false) });
        let request: JsonRpcRequest = serde_json::from_value(request)?;

        // TODO: if error, retry?
        let response = self
            .try_send_best_upstream_server(request, Some(num))
            .await?;

        let raw_block = response.result.context("no block result")?;

        let block: Block<TxHash> = serde_json::from_str(raw_block.get())?;

        let block = Arc::new(block);

        // the block was fetched using eth_getBlockByNumber, so it should have all fields and be on the heaviest chain
        self.save_block(&block, true).await?;

        Ok(block)
    }
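
    /// Loop over (new head block, rpc) pairs from `block_receiver` and update our consensus view of the chain.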
    pub(super) async fn process_incoming_blocks(
        &self,
        block_receiver: flume::Receiver<BlockAndRpc>,
        // TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed
        // Geth's subscriptions have the same potential for skipping blocks.
        head_block_sender: watch::Sender<ArcBlock>,
        pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
    ) -> anyhow::Result<()> {
        // TODO: indexmap or hashmap? what hasher? with_capacity?
        // TODO: this will grow unbounded. prune old heads on this at the same time we prune the graph?
        let mut connection_heads = HashMap::new();

        while let Ok((new_block, rpc)) = block_receiver.recv_async().await {
            self.process_block_from_rpc(
                &mut connection_heads,
                new_block,
                rpc,
                &head_block_sender,
                &pending_tx_sender,
            )
            .await?;
        }

        // TODO: if there was an error, we should return it
        warn!("block_receiver exited!");

        Ok(())
    }

    /// `connection_heads` is a mapping of rpc_names to head block hashes.
    /// `self.block_hashes` is a mapping of hashes to the complete Block<TxHash>.
    /// TODO: return something?
    async fn process_block_from_rpc(
        &self,
        connection_heads: &mut HashMap<String, H256>,
        rpc_head_block: Option<ArcBlock>,
        rpc: Arc<Web3Connection>,
        head_block_sender: &watch::Sender<ArcBlock>,
        pending_tx_sender: &Option<broadcast::Sender<TxStatus>>,
    ) -> anyhow::Result<()> {
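        // the overall plan:
        //   1. add this rpc's head block to connection_heads (or remove the rpc if it is syncing or has no head)
        //   2. find the connection head with the highest total difficulty
        //   3. walk toward parent blocks until enough rpcs and soft limit agree on one block
        //   4. swap in a new SyncedConnections and notify head_block_sender
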
        // add the rpc's block to connection_heads, or remove the rpc from connection_heads
        let rpc_head_id = match rpc_head_block {
            Some(rpc_head_block) => {
                let rpc_head_num = rpc_head_block.number.unwrap();
                let rpc_head_hash = rpc_head_block.hash.unwrap();

                if rpc_head_num.is_zero() {
                    // TODO: i don't think we can get to this anymore now that we use Options
                    debug!(%rpc, "still syncing");

                    connection_heads.remove(&rpc.name);

                    None
                } else {
                    // we don't know if it's on the heaviest chain yet
                    self.save_block(&rpc_head_block, false).await?;

                    connection_heads.insert(rpc.name.to_owned(), rpc_head_hash);

                    Some(BlockId {
                        hash: rpc_head_hash,
                        num: rpc_head_num,
                    })
                }
            }
            None => {
                // TODO: warn is too verbose. this is expected if a node disconnects and has to reconnect
                trace!(%rpc, "Block without number or hash!");

                connection_heads.remove(&rpc.name);

                None
            }
        };
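
        // note: rpc_head_id is only used for log messages below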

        // iterate the known heads to find the highest_work_block
        let mut checked_heads = HashSet::new();
        let mut highest_work_block: Option<ArcBlock> = None;
        for (conn_name, conn_head_hash) in connection_heads.iter() {
            if checked_heads.contains(conn_head_hash) {
                // we already checked this head from another rpc
                continue;
            }
            // don't check the same hash multiple times
            checked_heads.insert(conn_head_hash);

            let conn_head_block = if let Some(x) = self.block_hashes.get(conn_head_hash) {
                x
            } else {
                // TODO: why does this happen?
                warn!(%conn_head_hash, %conn_name, %rpc, "Missing block in connection_heads");
                continue;
            };

            match &conn_head_block.total_difficulty {
                None => {
                    panic!("block is missing total difficulty. this is a bug");
                }
                Some(td) => {
                    // if this is the first block we've tried
                    // or if this rpc's newest block has a higher total difficulty
                    if highest_work_block.is_none()
                        || td
                            > highest_work_block
                                .as_ref()
                                .expect("there should always be a block here")
                                .total_difficulty
                                .as_ref()
                                .expect("there should always be total difficulty here")
                    {
                        highest_work_block = Some(conn_head_block);
                    }
                }
            }
        }

        // clone to release the read lock on self.block_hashes
        if let Some(mut maybe_head_block) = highest_work_block {
            // track rpcs on this heaviest chain so we can build a new SyncedConnections
            let mut heavy_rpcs = HashSet::<&String>::new();
            // a running total of the soft limits covered by the heavy rpcs
            let mut heavy_sum_soft_limit: u32 = 0;
            // TODO: also track heavy_sum_hard_limit?

            // check the highest work block for a set of rpcs that can serve our request load
            // if it doesn't have enough rpcs for our request load, check the parent block
            // TODO: loop for how many parent blocks? we don't want to serve blocks that are too far behind. probably different per chain
            // TODO: this loop is pretty long. any way to clean up this code?
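            // check the highest-work block itself, then up to two of its ancestors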
            for _ in 0..3 {
                let maybe_head_hash = maybe_head_block
                    .hash
                    .as_ref()
                    .expect("blocks here always need hashes");

                // find all rpcs with maybe_head_block as their current head
                for (conn_name, conn_head_hash) in connection_heads.iter() {
                    if conn_head_hash != maybe_head_hash {
                        // connection is not on the desired block
                        continue;
                    }
                    if heavy_rpcs.contains(conn_name) {
                        // connection is on a child block
                        continue;
                    }

                    if let Some(rpc) = self.conns.get(conn_name) {
                        heavy_rpcs.insert(conn_name);
                        heavy_sum_soft_limit += rpc.soft_limit;
                    } else {
                        warn!("connection missing")
                    }
                }

                if heavy_sum_soft_limit < self.min_sum_soft_limit
                    || heavy_rpcs.len() < self.min_synced_rpcs
                {
                    // not enough rpcs yet. check the parent
                    if let Some(parent_block) = self.block_hashes.get(&maybe_head_block.parent_hash)
                    {
                        trace!(
                            child=%maybe_head_hash, parent=%parent_block.hash.unwrap(), "avoiding thundering herd",
                        );

                        maybe_head_block = parent_block.clone();
                        continue;
                    } else {
                        warn!(
                            "no parent to check. soft limit only {}/{} from {}/{} rpcs: {}%",
                            heavy_sum_soft_limit,
                            self.min_sum_soft_limit,
                            heavy_rpcs.len(),
                            self.min_synced_rpcs,
                            heavy_sum_soft_limit * 100 / self.min_sum_soft_limit
                        );
                        break;
                    }
                }
            }
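
            // heavy_rpcs now holds every rpc whose head is maybe_head_block or one of its descendants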

            // TODO: if heavy_rpcs.is_empty, try another method of finding the head block

            let num_connection_heads = connection_heads.len();
            let total_conns = self.conns.len();

            // we've done all the searching for the heaviest block that we can
            if heavy_rpcs.is_empty() {
                // if we get here, something is wrong. clear synced connections
                let empty_synced_connections = SyncedConnections::default();

                let old_synced_connections = self
                    .synced_connections
                    .swap(Arc::new(empty_synced_connections));

                // TODO: log different things depending on old_synced_connections
                warn!(%rpc, "no consensus head! {}/{}/{}", 0, num_connection_heads, total_conns);
            } else {
                // TODO: this is too verbose. move to trace
                // i think "conns" is somehow getting dupes
                trace!(?heavy_rpcs);

                // success! this block has enough soft limit and nodes on it (or on later blocks)
                let conns: Vec<Arc<Web3Connection>> = heavy_rpcs
                    .into_iter()
                    .filter_map(|conn_name| self.conns.get(conn_name).cloned())
                    .collect();

                let heavy_block = maybe_head_block;

                let heavy_hash = heavy_block.hash.expect("head blocks always have hashes");
                let heavy_num = heavy_block.number.expect("head blocks always have numbers");

                debug_assert_ne!(heavy_num, U64::zero());

                // TODO: add these to the log messages
                let num_consensus_rpcs = conns.len();

                let heavy_block_id = BlockId {
                    hash: heavy_hash,
                    num: heavy_num,
                };

                let new_synced_connections = SyncedConnections {
                    head_block_id: Some(heavy_block_id.clone()),
                    conns,
                };

                let old_synced_connections = self
                    .synced_connections
                    .swap(Arc::new(new_synced_connections));

                // TODO: if the rpc_head_block != heavy, log something somewhere in here
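                // log tags below: "first" = first consensus head, "con" = head unchanged,
                // "unc" = same number but different hash (a fork), "new" = the chain advanced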
                match &old_synced_connections.head_block_id {
                    None => {
                        debug!(block=%heavy_block_id, %rpc, "first {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns);

                        self.save_block(&heavy_block, true).await?;

                        head_block_sender.send(heavy_block)?;
                    }
                    Some(old_block_id) => {
                        // TODO: do this log item better
                        let rpc_head_str = rpc_head_id
                            .map(|x| x.to_string())
                            .unwrap_or_else(|| "None".to_string());

                        match heavy_block_id.num.cmp(&old_block_id.num) {
                            Ordering::Equal => {
                                // TODO: if rpc_block_id != heavy_block_id, do a different log

                                // multiple blocks with the same number means a fork!
                                if heavy_block_id.hash == old_block_id.hash {
                                    // no change in hash. no need to use head_block_sender
                                    debug!(con_head=%heavy_block_id, rpc_head=%rpc_head_str, %rpc, "con {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns)
                                } else {
                                    // hash changed
                                    debug!(con_head=%heavy_block_id, old=%old_block_id, rpc_head=%rpc_head_str, %rpc, "unc {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns);

                                    // todo!("handle equal by updating the canonical chain");
                                    self.save_block(&heavy_block, true).await?;

                                    head_block_sender.send(heavy_block)?;
                                }
                            }
                            Ordering::Less => {
                                // this is unlikely but possible
                                // TODO: better log
                                warn!(con_head=%heavy_block_id, rpc_head=%rpc_head_str, %rpc, "chain rolled back {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns);

                                self.save_block(&heavy_block, true).await?;

                                // todo!("handle less by removing higher blocks from the canonical chain");
                                head_block_sender.send(heavy_block)?;
                            }
                            Ordering::Greater => {
                                debug!(con_head=%heavy_block_id, rpc_head=%rpc_head_str, %rpc, "new {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns);

                                // todo!("handle greater by adding this block and any missing parents to the canonical chain");
                                self.save_block(&heavy_block, true).await?;

                                head_block_sender.send(heavy_block)?;
                            }
                        }
                    }
                }
            }
        }

        Ok(())
    }
}