//! web3-proxy/web3_proxy/src/rpcs/blockchain.rs
//!
//! Keep track of the blockchain as seen by a Web3Connections.

use super::connection::Web3Connection;
use super::connections::Web3Connections;
use super::transactions::TxStatus;
use crate::{
    config::BlockAndRpc, jsonrpc::JsonRpcRequest, rpcs::synced_connections::SyncedConnections,
};
use anyhow::Context;
use dashmap::{
    mapref::{entry::Entry, one::Ref},
    DashMap,
};
use derive_more::From;
use ethers::prelude::{Block, TxHash, H256, U64};
use hashbrown::{HashMap, HashSet};
use petgraph::algo::all_simple_paths;
use serde::Serialize;
use serde_json::json;
use std::{cmp::Ordering, fmt::Display, sync::Arc};
use tokio::sync::{broadcast, watch};
use tracing::{debug, info, trace, warn};

pub type ArcBlock = Arc<Block<TxHash>>;
pub type BlockHashesMap = Arc<DashMap<H256, ArcBlock>>;
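
// A sketch of how the caches used below fit together (these fields live on
// Web3Connections, defined in the sibling connections module):
//   - self.block_hashes:        hash -> full ArcBlock (the canonical block cache)
//   - self.block_numbers:       number -> the hash we currently consider canonical at that height
//   - self.blockchain_graphmap: parent_hash -> hash edges, so forks can be walked with petgraph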

/// A block's hash and number.
#[derive(Clone, Debug, Default, From, Serialize)]
pub struct BlockId {
    pub(super) hash: H256,
    pub(super) num: U64,
}

impl Display for BlockId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{} ({})", self.num, self.hash)
    }
}
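
// A hedged sketch of the Display output, assuming ethers' abbreviated H256 formatting:
//
//     let id = BlockId { hash: H256::zero(), num: U64::from(100) };
//     println!("{}", id); // prints something like: 100 (0x0000…0000)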

impl Web3Connections {
    /// add a block to our map and its hash to our graphmap of the blockchain
    pub fn save_block(&self, block: &ArcBlock) -> anyhow::Result<()> {
        let block_hash = block.hash.as_ref().context("no block hash")?;
        let block_num = block.number.as_ref().context("no block num")?;
        let _block_td = block
            .total_difficulty
            .as_ref()
            .context("no block total difficulty")?;

        if self.block_hashes.contains_key(block_hash) {
            // this block is already included. no need to continue
            return Ok(());
        }

        let mut blockchain = self.blockchain_graphmap.write();

        if blockchain.contains_node(*block_hash) {
            // this hash is already included. we must have hit that race condition
            // return now since this work was already done.
            return Ok(());
        }

        // TODO: there's a small race between contains_key and insert
        if let Some(_overwritten) = self.block_hashes.insert(*block_hash, block.clone()) {
            // there was a race and another thread wrote this block
            // i don't think this will happen. the blockchain.contains_node above should be enough
            // no need to continue because that other thread already wrote (or soon will write) the rest
            return Ok(());
        }

        match self.block_numbers.entry(*block_num) {
            Entry::Occupied(mut x) => {
                let _old = x.insert(*block_hash);
                todo!(
                    "do something with the old hash. we need to update a bunch more block numbers"
                )
            }
            Entry::Vacant(x) => {
                x.insert(*block_hash);
            }
        }

        // TODO: prettier log? or probably move the log somewhere else
        trace!(%block_hash, "new block");

        blockchain.add_node(*block_hash);

        // what should edge weight be? and should the nodes be the blocks instead?
        // TODO: maybe the weight should be the block?
        // we store parent_hash -> hash because the block already stores the parent_hash
        blockchain.add_edge(block.parent_hash, *block_hash, 0);

        // TODO: prune block_numbers and block_map to only keep a configurable (256 on ETH?) number of blocks?

        Ok(())
    }
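
    // A minimal sketch of the graph shape save_block builds (hypothetical hashes a, b, c):
    // saving block b (parent a) and then block c (parent b) adds edges a -> b -> c, which
    // is what lets the all_simple_paths call further below confirm that c descends from a.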

    /// Get a block from caches with fallback.
    /// Will query a specific node or the best available.
    /// WARNING! This may wait forever. be sure this runs with your own timeout
    pub async fn block(
        &self,
        hash: &H256,
        rpc: Option<&Arc<Web3Connection>>,
    ) -> anyhow::Result<ArcBlock> {
        // first, try to get the hash from our cache
        if let Some(block) = self.block_hashes.get(hash) {
            return Ok(block.clone());
        }

        // block not in cache. we need to ask an rpc for it
        // TODO: helper for method+params => JsonRpcRequest
        // TODO: get block with the transactions?
        // TODO: does this id matter?
        let request_params = (hash, false);

        // TODO: if error, retry?
        let block: Block<TxHash> = match rpc {
            Some(rpc) => {
                rpc.wait_for_request_handle()
                    .await?
                    .request("eth_getBlockByHash", request_params)
                    .await?
            }
            None => {
                let request =
                    json!({ "id": "1", "method": "eth_getBlockByHash", "params": request_params });
                let request: JsonRpcRequest = serde_json::from_value(request)?;

                let response = self.try_send_best_upstream_server(request, None).await?;

                let block = response.result.unwrap();

                serde_json::from_str(block.get())?
            }
        };

        let block = Arc::new(block);

        // the block was fetched using eth_getBlockByHash, so it should have all fields
        self.save_block(&block)?;

        Ok(block)
    }
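
    // A hedged usage sketch for the warning above (tokio is already a dependency here):
    //
    //     use tokio::time::{timeout, Duration};
    //     // the outer ? handles the timeout's Elapsed error, the inner one the anyhow::Result
    //     let block = timeout(Duration::from_secs(10), connections.block(&hash, None)).await??;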

    /// Convenience method to get the canonical block at a given block height.
    pub async fn block_hash(&self, num: &U64) -> anyhow::Result<H256> {
        let block = self.cannonical_block(num).await?;

        let hash = block.hash.unwrap();

        Ok(hash)
    }

    /// Get the heaviest chain's block from cache or backend rpc
    pub async fn cannonical_block(&self, num: &U64) -> anyhow::Result<ArcBlock> {
        // we only have blocks by hash now
        // maybe save them during save_block in a blocks_by_number DashMap<U64, Vec<ArcBlock>>
        // if there are multiple, use petgraph to find the one on the main chain (and remove the others if they have enough confirmations)

        // first, try to get the hash from our cache
        if let Some(block_hash) = self.block_numbers.get(num) {
            let block = self
                .block_hashes
                .get(&block_hash)
                .expect("block_numbers gave us this hash");

            return Ok(block.clone());
        }

        // block not in cache. we need to ask an rpc for it
        // but before we do any queries, be sure the requested block num exists
        let head_block_num = self
            .head_block_num()
            .ok_or_else(|| anyhow::anyhow!("no servers in sync"))?;

        if num > &head_block_num {
            // TODO: i'm seeing this a lot when using ethspam. i dont know why though. i thought we delayed publishing
            // TODO: instead of error, maybe just sleep and try again?
            return Err(anyhow::anyhow!(
                "Head block is #{}, but #{} was requested",
                head_block_num,
                num
            ));
        }

        // TODO: helper for method+params => JsonRpcRequest
        let request = json!({ "jsonrpc": "2.0", "id": "1", "method": "eth_getBlockByNumber", "params": (num, false) });
        let request: JsonRpcRequest = serde_json::from_value(request)?;

        // TODO: if error, retry?
        let response = self
            .try_send_best_upstream_server(request, Some(num))
            .await?;

        let block = response.result.unwrap();

        let block: Block<TxHash> = serde_json::from_str(block.get())?;

        let block = Arc::new(block);

        // the block was fetched using eth_getBlockByNumber, so it should have all fields
        self.save_block(&block)?;

        Ok(block)
    }
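
    // Behavior sketch: with the consensus head at #100, cannonical_block(&U64::from(105))
    // returns the "Head block is #100, but #105 was requested" error, while
    // cannonical_block(&U64::from(90)) either hits the block_numbers cache or falls
    // through to eth_getBlockByNumber on the best upstream server.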

    pub(super) async fn process_incoming_blocks(
        &self,
        block_receiver: flume::Receiver<BlockAndRpc>,
        // TODO: head_block_sender should be a broadcast_sender like pending_tx_sender
        head_block_sender: watch::Sender<ArcBlock>,
        pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
    ) -> anyhow::Result<()> {
        // TODO: indexmap or hashmap? what hasher? with_capacity?
        // TODO: this will grow unbounded. prune old heads on this at the same time we prune the graph?
        let mut connection_heads = HashMap::new();

        while let Ok((new_block, rpc)) = block_receiver.recv_async().await {
            self.process_block_from_rpc(
                &mut connection_heads,
                new_block,
                rpc,
                &head_block_sender,
                &pending_tx_sender,
            )
            .await?;
        }

        // TODO: if there was an error, we should return it
        warn!("block_receiver exited!");

        Ok(())
    }
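
    // A hedged wiring sketch (the channels are built elsewhere; the names here are assumptions):
    //
    //     let (block_sender, block_receiver) = flume::unbounded::<BlockAndRpc>();
    //     let (head_block_sender, head_block_receiver) = watch::channel(Arc::new(Block::default()));
    //     // each Web3Connection clones block_sender and sends (block, rpc) pairs into it,
    //     // while a single task drives this loop:
    //     connections.process_incoming_blocks(block_receiver, head_block_sender, None).await?;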

    /// `connection_heads` is a mapping of rpc_names to head block hashes.
    /// `self.block_hashes` is a mapping of hashes to the complete `Block<TxHash>`.
    /// TODO: return something?
    async fn process_block_from_rpc(
        &self,
        connection_heads: &mut HashMap<String, H256>,
        rpc_head_block: ArcBlock,
        rpc: Arc<Web3Connection>,
        head_block_sender: &watch::Sender<ArcBlock>,
        pending_tx_sender: &Option<broadcast::Sender<TxStatus>>,
    ) -> anyhow::Result<()> {
        // add the block to connection_heads
        let rpc_block_id = match (rpc_head_block.hash, rpc_head_block.number) {
            (Some(rpc_head_hash), Some(rpc_head_num)) => {
                if rpc_head_num == U64::zero() {
                    debug!(%rpc, "still syncing");

                    connection_heads.remove(&rpc.name);

                    None
                } else {
                    connection_heads.insert(rpc.name.to_owned(), rpc_head_hash);

                    self.save_block(&rpc_head_block)?;

                    Some(BlockId {
                        hash: rpc_head_hash,
                        num: rpc_head_num,
                    })
                }
            }
            _ => {
                warn!(%rpc, ?rpc_head_block, "Block without number or hash!");

                connection_heads.remove(&rpc.name);

                // don't return yet! self.synced_connections likely needs an update
                None
            }
        };

        // iterate the rpc_map to find the highest_work_block
        let mut checked_heads = HashSet::new();
        let mut highest_work_block: Option<Ref<H256, ArcBlock>> = None;

        for rpc_head_hash in connection_heads.values() {
            if checked_heads.contains(rpc_head_hash) {
                continue;
            }

            checked_heads.insert(rpc_head_hash);

            let rpc_head_block = self.block_hashes.get(rpc_head_hash).unwrap();

            match &rpc_head_block.total_difficulty {
                None => {
                    // no total difficulty
                    // TODO: should we fetch the block here? I think this shouldn't happen
                    warn!(?rpc, %rpc_head_hash, "block is missing total difficulty");
                    continue;
                }
                Some(td) => {
                    if highest_work_block.is_none()
                        || td
                            > highest_work_block
                                .as_ref()
                                .expect("there should always be a block here")
                                .total_difficulty
                                .as_ref()
                                .expect("there should always be total difficulty here")
                    {
                        highest_work_block = Some(rpc_head_block);
                    }
                }
            }
        }

        // clone to release the read lock
        let highest_work_block = highest_work_block.map(|x| x.clone());

        let mut highest_work_block = match highest_work_block {
            None => todo!("no servers are in sync"),
            Some(highest_work_block) => highest_work_block,
        };

        // track names so we don't check the same node multiple times
        let mut heavy_names: HashSet<&String> = HashSet::new();
        // track rpcs so we can build a new SyncedConnections
        let mut heavy_rpcs: Vec<&Arc<Web3Connection>> = vec![];
        // a running total of the soft limits covered by the rpcs
        let mut heavy_sum_soft_limit: u32 = 0;

        // check the highest work block and its parents for a set of rpcs that can serve our request load
        // TODO: loop for how many parent blocks? we don't want to serve blocks that are too far behind
        let blockchain_guard = self.blockchain_graphmap.read();
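
        // walk at most 3 generations from the highest-work block toward its parents. at each
        // step, count every rpc whose head either is this block, or descends from it along the
        // parent_hash -> hash edges. stop as soon as the counted soft limits reach min_sum_soft_limit.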
        for _ in 0..3 {
            let highest_work_hash = highest_work_block.hash.as_ref().unwrap();

            for (rpc_name, rpc_head_hash) in connection_heads.iter() {
                if heavy_names.contains(rpc_name) {
                    // this block is already included
                    continue;
                }

                // TODO: does all_simple_paths make this check?
                if rpc_head_hash == highest_work_hash {
                    if let Some(rpc) = self.conns.get(rpc_name) {
                        heavy_names.insert(rpc_name);
                        heavy_rpcs.push(rpc);
                        heavy_sum_soft_limit += rpc.soft_limit;
                    }
                    continue;
                }

                // TODO: cache all_simple_paths. there should be a high hit rate
                // TODO: use an algo that saves scratch space?
                // TODO: how slow is this?
                let is_connected = all_simple_paths::<Vec<H256>, _>(
                    &*blockchain_guard,
                    *highest_work_hash,
                    *rpc_head_hash,
                    0,
                    // TODO: what should this max be? probably configurable per chain
                    Some(10),
                )
                .next()
                .is_some();
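
                // all_simple_paths returns a lazy iterator of paths, so taking only the first
                // result turns this into a cheap existence check instead of a full enumeration.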
                if is_connected {
                    if let Some(rpc) = self.conns.get(rpc_name) {
                        // also record the name so this rpc isn't counted again on the next pass
                        heavy_names.insert(rpc_name);
                        heavy_rpcs.push(rpc);
                        heavy_sum_soft_limit += rpc.soft_limit;
                    }
                }
            }

            // TODO: min_sum_soft_limit as a percentage of total_soft_limit?
            // let min_sum_soft_limit = total_soft_limit / self.min_sum_soft_limit;
            if heavy_sum_soft_limit >= self.min_sum_soft_limit {
                // success! this block has enough nodes on it
                break;
            }

            // else, we need to try the parent block
            trace!(%heavy_sum_soft_limit, ?highest_work_hash, "avoiding thundering herd");

            // // TODO: this automatically queries for parents, but need to rearrange lifetimes to make an await work here
            // highest_work_block = self
            //     .block(&highest_work_block.parent_hash, Some(&rpc))
            //     .await?;

            // we give known stale data just because we don't have enough capacity to serve the latest.
            // TODO: maybe we should delay serving requests if this happens.
            // TODO: don't unwrap. break if none?
            match self.block_hashes.get(&highest_work_block.parent_hash) {
                None => {
                    warn!(
                        "ran out of parents to check. soft limit only {}/{}: {}%",
                        heavy_sum_soft_limit,
                        self.min_sum_soft_limit,
                        heavy_sum_soft_limit * 100 / self.min_sum_soft_limit
                    );
                    break;
                }
                Some(parent_block) => {
                    highest_work_block = parent_block.clone();
                }
            }
        }

        // unlock self.blockchain_graphmap
        drop(blockchain_guard);

        let soft_limit_met = heavy_sum_soft_limit >= self.min_sum_soft_limit;
        let num_synced_rpcs = heavy_rpcs.len() as u32;

        let new_synced_connections = if soft_limit_met {
            // we have a heavy set large enough to serve traffic
            let head_block_hash = highest_work_block.hash.unwrap();
            let head_block_num = highest_work_block.number.unwrap();

            if num_synced_rpcs < self.min_synced_rpcs {
                // TODO: warn is too loud. if we are first starting, this is expected to happen
                warn!(hash=%head_block_hash, num=?head_block_num, "not enough rpcs are synced to advance");

                SyncedConnections::default()
            } else {
                // TODO: wait until at least most of the rpcs have given their initial block?
                // otherwise, if there is a syncing node that is fast, our first head block might not be good
                // TODO: have a configurable "minimum rpcs" number that we can set
                // TODO: sort by weight and soft limit? do we need an IndexSet, or is a Vec fine?
                let conns = heavy_rpcs.into_iter().cloned().collect();

                let head_block_id = BlockId {
                    hash: head_block_hash,
                    num: head_block_num,
                };

                SyncedConnections {
                    head_block_id: Some(head_block_id),
                    conns,
                }
            }
        } else {
            // failure even after checking parent heads!
            // not enough servers are in sync to serve traffic
            // TODO: at startup this is fine, but later it's a problem
            warn!("empty SyncedConnections");

            SyncedConnections::default()
        };

        let heavy_block_id = new_synced_connections.head_block_id.clone();

        let new_synced_connections = Arc::new(new_synced_connections);

        let num_connection_heads = connection_heads.len();
        let total_conns = self.conns.len();

        let old_synced_connections = self.synced_connections.swap(new_synced_connections);

        match (&old_synced_connections.head_block_id, &heavy_block_id) {
            (None, None) => warn!("no servers synced"),
            (None, Some(heavy_block_id)) => {
                debug!(block=%heavy_block_id, %rpc, "first consensus head");
            }
            (Some(_), None) => warn!("no longer synced!"),
            (Some(old_block_id), Some(heavy_block_id)) => {
                match heavy_block_id.num.cmp(&old_block_id.num) {
                    Ordering::Equal => {
                        todo!("handle equal")
                    }
                    Ordering::Less => {
                        todo!("handle less")
                    }
                    Ordering::Greater => {
                        todo!("handle greater")
                    }
                }
            }
        }

        /*
        if old_synced_connections.head_block_id.is_none() && rpc_head_block.hash.is_some() {
            // this is fine. we have our first hash
        } else if rpc_head_block.hash.is_some()
            && old_synced_connections.head_block_id.is_some()
            && old_synced_connections
                .head_block_id
                .as_ref()
                .map_ok(|x| x.num)
                != rpc_head_block.hash
        {
            info!(new=%rpc_head_block.hash.unwrap(), new_num=?rpc_head_block.number.unwrap(), heavy=?heavy_block_id, %rpc, "non heavy head");
            // TODO: anything else to do? maybe warn if these blocks are very far apart or forked for an extended period of time
            // TODO: if there is any non-heavy head log how many nodes are on it
        } */

        /*
        if heavy_block_num == U64::zero {
            warn!(?soft_limit_met, %heavy_block_hash, %old_head_hash, %rpc, "NO heavy head {}/{}/{}", num_synced_rpcs, num_connection_heads, total_rpcs)
        } else if heavy_block_hash == old_head_hash {
            debug!(hash=%heavy_block_hash, num=%heavy_block_num, limit=%heavy_sum_soft_limit, %rpc, "cur heavy head {}/{}/{}", num_synced_rpcs, num_connection_heads, total_rpcs);
        } else if soft_limit_met {
            // TODO: if new's parent is not old, warn?
            debug!(hash=%heavy_block_hash, num=%heavy_block_num, limit=%heavy_sum_soft_limit, %rpc, "NEW heavy head {}/{}/{}", num_synced_rpcs, num_connection_heads, total_rpcs);

            // the head hash changed. forward to any subscribers
            head_block_sender.send(highest_work_block)?;

            // TODO: do something with pending_tx_sender
        } else {
            // TODO: i don't think we can get here
            warn!(?soft_limit_met, %heavy_block_id, %old_head_hash, %rpc, "NO heavy head {}/{}/{}", num_synced_rpcs, num_connection_heads, total_rpcs)
        }
        */

        // TODO: the head hash changed. forward to any subscribers
        head_block_sender.send(highest_work_block)?;

        Ok(())
    }
}