web3-proxy/web3_proxy/src/rpcs/blockchain.rs

//! Keep track of the blockchain as seen by a Web3Connections.
use super::connection::Web3Connection;
use super::connections::Web3Connections;
use super::transactions::TxStatus;
use crate::frontend::authorization::AuthorizedRequest;
use crate::{
    config::BlockAndRpc, jsonrpc::JsonRpcRequest, rpcs::synced_connections::SyncedConnections,
};
use anyhow::Context;
use derive_more::From;
use ethers::prelude::{Block, TxHash, H256, U64};
use hashbrown::{HashMap, HashSet};
use moka::future::Cache;
use serde::Serialize;
use serde_json::json;
use std::{cmp::Ordering, fmt::Display, sync::Arc};
use tokio::sync::{broadcast, watch};
use tokio::time::Duration;
use tracing::{debug, instrument, trace, warn, Level};

// TODO: type for Hydrated Blocks with their full transactions?
pub type ArcBlock = Arc<Block<TxHash>>;
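
// BlockHashesCache backs `block_hashes`: a moka cache from a block's hash to the shared
// `ArcBlock`, keyed with hashbrown's default hasher.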
pub type BlockHashesCache = Cache<H256, ArcBlock, hashbrown::hash_map::DefaultHashBuilder>;

/// A block's hash and number.
#[derive(Clone, Debug, Default, From, Serialize)]
pub struct BlockId {
    pub hash: H256,
    pub num: U64,
}

impl Display for BlockId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{} ({})", self.num, self.hash)
    }
}

impl Web3Connections {
    /// add a block to our map and its hash to our graphmap of the blockchain
    #[instrument]
    pub async fn save_block(&self, block: &ArcBlock, heaviest_chain: bool) -> anyhow::Result<()> {
        // TODO: i think we can rearrange this function to make it faster on the hot path
        let block_hash = block.hash.as_ref().context("no block hash")?;

        // skip Block::default()
        if block_hash.is_zero() {
            debug!("Skipping block without hash!");
            return Ok(());
        }

        let mut blockchain = self.blockchain_graphmap.write().await;

        let block_num = block.number.as_ref().context("no block num")?;

        // TODO: think more about heaviest_chain. would be better to do the check inside this function
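        // only heaviest-chain blocks update the num -> hash index below; blocks from other
        // forks are still cached by hash, so they never overwrite the canonical mapping for
        // their block number.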
        if heaviest_chain {
            // this is the only place that writes to block_numbers
            // it's inside a write lock on blockchain_graphmap, so i think there is no race
            // multiple inserts should be okay though
            self.block_numbers.insert(*block_num, *block_hash).await;
        }

        if blockchain.contains_node(*block_hash) {
            trace!(%block_hash, %block_num, "block already saved");
            return Ok(());
        }

        trace!(%block_hash, %block_num, "saving new block");

        self.block_hashes
            .insert(*block_hash, block.to_owned())
            .await;

        blockchain.add_node(*block_hash);

        // what should edge weight be? and should the nodes be the blocks instead?
        // we store parent_hash -> hash because the block already stores the parent_hash
        blockchain.add_edge(block.parent_hash, *block_hash, 0);

        // TODO: prune blockchain to only keep a configurable (256 on ETH?) number of blocks?

        Ok(())
    }
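
    // `block` is cache-first: it checks `block_hashes` and only falls back to an rpc call
    // (a specific connection if one is given, otherwise the best synced server) on a miss.
    // Whatever it fetches is written back through `save_block`.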
    /// Get a block from caches with fallback.
    /// Will query a specific node or the best available.
    #[instrument(level = "trace")]
    pub async fn block(
        &self,
        authorized_request: Option<&Arc<AuthorizedRequest>>,
        hash: &H256,
        rpc: Option<&Arc<Web3Connection>>,
    ) -> anyhow::Result<ArcBlock> {
        // first, try to get the hash from our cache
        // the cache is set last, so if it's here, it's everywhere
        if let Some(block) = self.block_hashes.get(hash) {
            return Ok(block);
        }

        // block not in cache. we need to ask an rpc for it
        let get_block_params = (*hash, false);

        // TODO: if error, retry?
        let block: Block<TxHash> = match rpc {
            Some(rpc) => {
                rpc.wait_for_request_handle(authorized_request, Duration::from_secs(30))
                    .await?
                    .request(
                        "eth_getBlockByHash",
                        &json!(get_block_params),
                        Level::ERROR.into(),
                    )
                    .await?
            }
            None => {
                // TODO: helper for method+params => JsonRpcRequest
                // TODO: does this id matter?
                let request = json!({ "id": "1", "method": "eth_getBlockByHash", "params": get_block_params });
                let request: JsonRpcRequest = serde_json::from_value(request)?;

                // TODO: request_metadata? maybe we should put it in the authorized_request?
                let response = self
                    .try_send_best_upstream_server(authorized_request, request, None, None)
                    .await?;

                let block = response.result.unwrap();

                serde_json::from_str(block.get())?
            }
        };

        let block = Arc::new(block);

        // the block was fetched using eth_getBlockByHash, so it should have all fields
        // TODO: fill in heaviest_chain! if the block is old enough, is this definitely true?
        self.save_block(&block, false).await?;

        Ok(block)
    }
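
    // The bool returned alongside the block by `block_hash` and `cannonical_block` flags
    // whether the request reaches far enough back (more than 64 blocks behind the head,
    // per the TODO below) that an archive node is needed.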
    /// Convenience method to get the canonical block at a given block height.
    pub async fn block_hash(&self, num: &U64) -> anyhow::Result<(H256, bool)> {
        let (block, is_archive_block) = self.cannonical_block(num).await?;

        let hash = block.hash.expect("Saved blocks should always have hashes");

        Ok((hash, is_archive_block))
    }

    /// Get the heaviest chain's block from cache or backend rpc
    pub async fn cannonical_block(&self, num: &U64) -> anyhow::Result<(ArcBlock, bool)> {
        // we only have blocks by hash now
        // maybe save them during save_block in a blocks_by_number Cache<U64, Vec<ArcBlock>>
        // if there's multiple, use petgraph to find the one on the main chain (and remove the others if they have enough confirmations)

        // be sure the requested block num exists
        let head_block_num = self.head_block_num().context("no servers in sync")?;

        // TODO: not 64 on all chains? get from config?
        let archive_needed = num < &(head_block_num - U64::from(64));

        if num > &head_block_num {
            // TODO: i'm seeing this a lot when using ethspam. i don't know why though. i thought we delayed publishing
            // TODO: instead of error, maybe just sleep and try again?
            // TODO: this should be a 401, not a 500
            return Err(anyhow::anyhow!(
                "Head block is #{}, but #{} was requested",
                head_block_num,
                num
            ));
        }

        // try to get the hash from our cache
        // deref to not keep the lock open
        if let Some(block_hash) = self.block_numbers.get(num) {
            // TODO: sometimes this needs to fetch the block. why? i thought block_numbers would only be set if the block hash was set
            // TODO: pass authorized_request through here?
            let block = self.block(None, &block_hash, None).await?;

            return Ok((block, archive_needed));
        }

        // block number not in cache. we need to ask an rpc for it
        // TODO: helper for method+params => JsonRpcRequest
        let request = json!({ "jsonrpc": "2.0", "id": "1", "method": "eth_getBlockByNumber", "params": (num, false) });
        let request: JsonRpcRequest = serde_json::from_value(request)?;

        // TODO: if error, retry?
        // TODO: request_metadata or authorized_request?
        let response = self
            .try_send_best_upstream_server(None, request, None, Some(num))
            .await?;

        let raw_block = response.result.context("no block result")?;

        let block: Block<TxHash> = serde_json::from_str(raw_block.get())?;

        let block = Arc::new(block);

        // the block was fetched using eth_getBlockByNumber, so it should have all fields and be on the heaviest chain
        self.save_block(&block, true).await?;

        Ok((block, true))
    }
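
    // `process_incoming_blocks` is the long-lived loop behind head-block tracking: it drains
    // `block_receiver` and hands every (block, rpc) pair to `process_block_from_rpc`, which
    // maintains `connection_heads` and decides whether the consensus head changed.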
    pub(super) async fn process_incoming_blocks(
        &self,
        block_receiver: flume::Receiver<BlockAndRpc>,
        // TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed
        // Geth's subscriptions have the same potential for skipping blocks.
        head_block_sender: watch::Sender<ArcBlock>,
        pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
    ) -> anyhow::Result<()> {
        // TODO: indexmap or hashmap? what hasher? with_capacity?
        // TODO: this will grow unbounded. prune old heads on this at the same time we prune the graph?
        let mut connection_heads = HashMap::new();

        while let Ok((new_block, rpc)) = block_receiver.recv_async().await {
            let rpc_name = rpc.name.clone();

            if let Err(err) = self
                .process_block_from_rpc(
                    &mut connection_heads,
                    new_block,
                    rpc,
                    &head_block_sender,
                    &pending_tx_sender,
                )
                .await
            {
                warn!(rpc=%rpc_name, ?err, "unable to process block from rpc");
            }
        }

        // TODO: if there was an error, we should return it
        warn!("block_receiver exited!");

        Ok(())
    }
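
    // Rough flow of `process_block_from_rpc`:
    //   1. record this rpc's head in `connection_heads` (or remove the rpc if it has no head)
    //   2. find the highest-numbered block among all connection heads
    //   3. walk back through parent blocks until enough rpcs (by count and soft limit) agree
    //   4. swap `synced_connections` and, when the head changed, notify `head_block_sender`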
    /// `connection_heads` is a mapping of rpc_names to head block hashes.
    /// self.blockchain_map is a mapping of hashes to the complete Block<TxHash>.
    /// TODO: return something?
    async fn process_block_from_rpc(
        &self,
        connection_heads: &mut HashMap<String, H256>,
        rpc_head_block: Option<ArcBlock>,
        rpc: Arc<Web3Connection>,
        head_block_sender: &watch::Sender<ArcBlock>,
        pending_tx_sender: &Option<broadcast::Sender<TxStatus>>,
    ) -> anyhow::Result<()> {
        // add the rpc's block to connection_heads, or remove the rpc from connection_heads
        let rpc_head_id = match rpc_head_block {
            Some(rpc_head_block) => {
                let rpc_head_num = rpc_head_block.number.unwrap();
                let rpc_head_hash = rpc_head_block.hash.unwrap();

                if rpc_head_num.is_zero() {
                    // TODO: i don't think we can get to this anymore now that we use Options
                    debug!(%rpc, "still syncing");

                    connection_heads.remove(&rpc.name);

                    None
                } else {
                    // we don't know if it's on the heaviest chain yet
                    self.save_block(&rpc_head_block, false).await?;

                    connection_heads.insert(rpc.name.to_owned(), rpc_head_hash);

                    Some(BlockId {
                        hash: rpc_head_hash,
                        num: rpc_head_num,
                    })
                }
            }
            None => {
                // TODO: warn is too verbose. this is expected if a node disconnects and has to reconnect
                trace!(%rpc, "Block without number or hash!");

                connection_heads.remove(&rpc.name);

                None
            }
        };

        // iterate the known heads to find the highest_num_block
        let mut checked_heads = HashSet::new();
        let mut highest_num_block: Option<ArcBlock> = None;

        for (conn_name, connection_head_hash) in connection_heads.iter() {
            if checked_heads.contains(connection_head_hash) {
                // we already checked this head from another rpc
                continue;
            }

            // don't check the same hash multiple times
            checked_heads.insert(connection_head_hash);

            let conn_head_block = if let Some(x) = self.block_hashes.get(connection_head_hash) {
                x
            } else {
                // TODO: why does this happen?!?! seems to only happen with uncled blocks
                // TODO: maybe we should do get_with?
                // TODO: maybe we should just continue. this only seems to happen when an older block is received
                warn!(%connection_head_hash, %conn_name, %rpc, "Missing connection_head_block in block_hashes. Fetching now");

                // this option should always be populated
                let conn_rpc = self.conns.get(conn_name);

                match self.block(None, connection_head_hash, conn_rpc).await {
                    Ok(block) => block,
                    Err(err) => {
                        warn!(%connection_head_hash, %conn_name, %rpc, ?err, "Failed fetching connection_head_block for block_hashes");
                        continue;
                    }
                }
            };

            match &conn_head_block.number {
                None => {
                    panic!("block is missing number. this is a bug");
                }
                Some(conn_head_num) => {
                    // if this is the first block we've tried
                    // or if this rpc's newest block has a higher number
                    // we used to check total difficulty, but that isn't a thing anymore
                    if highest_num_block.is_none()
                        || conn_head_num
                            > highest_num_block
                                .as_ref()
                                .expect("there should always be a block here")
                                .number
                                .as_ref()
                                .expect("there should always be a number here")
                    {
                        highest_num_block = Some(conn_head_block);
                    }
                }
            }
        }
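
        // Starting from the highest-numbered head, walk back toward parent blocks (the loop
        // below tries at most 3 candidate blocks) until the rpcs serving that block satisfy
        // both `min_synced_rpcs` and `min_sum_soft_limit`; that block becomes the consensus head.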
        if let Some(mut maybe_head_block) = highest_num_block {
            // track rpcs on this heaviest chain so we can build a new SyncedConnections
            let mut highest_rpcs = HashSet::<&String>::new();

            // a running total of the soft limits covered by the rpcs that agree on the head block
            let mut highest_rpcs_sum_soft_limit: u32 = 0;

            // TODO: also track highest_rpcs_sum_hard_limit? llama doesn't need this, so it can wait

            // check the highest work block for a set of rpcs that can serve our request load
            // if it doesn't have enough rpcs for our request load, check the parent block
            // TODO: loop for how many parent blocks? we don't want to serve blocks that are too far behind. probably different per chain
            // TODO: this loop is pretty long. any way to clean up this code?
            for _ in 0..3 {
                let maybe_head_hash = maybe_head_block
                    .hash
                    .as_ref()
                    .expect("blocks here always need hashes");

                // find all rpcs with maybe_head_block as their current head
                for (conn_name, conn_head_hash) in connection_heads.iter() {
                    if conn_head_hash != maybe_head_hash {
                        // connection is not on the desired block
                        continue;
                    }
                    if highest_rpcs.contains(conn_name) {
                        // connection is on a child block
                        continue;
                    }

                    if let Some(rpc) = self.conns.get(conn_name) {
                        highest_rpcs.insert(conn_name);
                        highest_rpcs_sum_soft_limit += rpc.soft_limit;
                    } else {
                        warn!("connection missing")
                    }
                }

                if highest_rpcs_sum_soft_limit < self.min_sum_soft_limit
                    || highest_rpcs.len() < self.min_synced_rpcs
                {
                    // not enough rpcs yet. check the parent
                    if let Some(parent_block) = self.block_hashes.get(&maybe_head_block.parent_hash)
                    {
                        trace!(
                            child=%maybe_head_hash, parent=%parent_block.hash.unwrap(), "avoiding thundering herd",
                        );

                        maybe_head_block = parent_block;

                        continue;
                    } else {
                        warn!(
                            "no parent to check. soft limit only {}/{} from {}/{} rpcs: {}%",
                            highest_rpcs_sum_soft_limit,
                            self.min_sum_soft_limit,
                            highest_rpcs.len(),
                            self.min_synced_rpcs,
                            highest_rpcs_sum_soft_limit * 100 / self.min_sum_soft_limit
                        );

                        break;
                    }
                }
            }

            // TODO: if consensus_head_rpcs.is_empty, try another method of finding the head block
            let num_connection_heads = connection_heads.len();
            let total_conns = self.conns.len();

            // we've done all the searching for the heaviest block that we can
            if highest_rpcs.is_empty() {
                // if we get here, something is wrong. clear synced connections
                let empty_synced_connections = SyncedConnections::default();

                let _ = self
                    .synced_connections
                    .swap(Arc::new(empty_synced_connections));

                // TODO: log different things depending on old_synced_connections
                warn!(%rpc, "no consensus head! {}/{}/{}", 0, num_connection_heads, total_conns);
            } else {
                trace!(?highest_rpcs);

                // TODO: if maybe_head_block.time() is old, ignore it
                // success! this block has enough soft limit and nodes on it (or on later blocks)
                let conns: Vec<Arc<Web3Connection>> = highest_rpcs
                    .into_iter()
                    .filter_map(|conn_name| self.conns.get(conn_name).cloned())
                    .collect();

                let consensus_head_block = maybe_head_block;

                let consensus_head_hash = consensus_head_block
                    .hash
                    .expect("head blocks always have hashes");
                let consensus_head_num = consensus_head_block
                    .number
                    .expect("head blocks always have numbers");

                debug_assert_ne!(consensus_head_num, U64::zero());

                let num_consensus_rpcs = conns.len();

                let consensus_head_block_id = BlockId {
                    hash: consensus_head_hash,
                    num: consensus_head_num,
                };

                let new_synced_connections = SyncedConnections {
                    head_block_id: Some(consensus_head_block_id.clone()),
                    conns,
                };

                let old_synced_connections = self
                    .synced_connections
                    .swap(Arc::new(new_synced_connections));

                // TODO: if the rpc_head_block != consensus_head_block, log something?
                match &old_synced_connections.head_block_id {
                    None => {
                        debug!(block=%consensus_head_block_id, %rpc, "first {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns);

                        self.save_block(&consensus_head_block, true).await?;

                        head_block_sender
                            .send(consensus_head_block)
                            .context("head_block_sender sending consensus_head_block")?;
                    }
                    Some(old_block_id) => {
                        // TODO: do this log item better
                        let rpc_head_str = rpc_head_id
                            .map(|x| x.to_string())
                            .unwrap_or_else(|| "None".to_string());

                        match consensus_head_block_id.num.cmp(&old_block_id.num) {
                            Ordering::Equal => {
                                // TODO: if rpc_block_id != consensus_head_block_id, do a different log?

                                // multiple blocks with the same fork!
                                if consensus_head_block_id.hash == old_block_id.hash {
                                    // no change in hash. no need to use head_block_sender
                                    debug!(con_head=%consensus_head_block_id, rpc_head=%rpc_head_str, %rpc, "con {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns)
                                } else {
                                    // hash changed
                                    debug!(con_head=%consensus_head_block_id, old=%old_block_id, rpc_head=%rpc_head_str, %rpc, "unc {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns);

                                    self.save_block(&consensus_head_block, true)
                                        .await
                                        .context("save consensus_head_block as heaviest chain")?;

                                    head_block_sender.send(consensus_head_block).context(
                                        "head_block_sender sending consensus_head_block",
                                    )?;
                                }
                            }
                            Ordering::Less => {
                                // this is unlikely but possible
                                // TODO: better log
                                warn!(con_head=%consensus_head_block_id, old_head=%old_block_id, rpc_head=%rpc_head_str, %rpc, "chain rolled back {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns);

                                // TODO: tell save_block to remove any higher block numbers from the cache. not needed because we have other checks on requested blocks being > head, but still seems like a good idea
                                self.save_block(&consensus_head_block, true).await.context(
                                    "save_block sending consensus_head_block as heaviest chain",
                                )?;

                                head_block_sender
                                    .send(consensus_head_block)
                                    .context("head_block_sender sending consensus_head_block")?;
                            }
                            Ordering::Greater => {
                                debug!(con_head=%consensus_head_block_id, rpc_head=%rpc_head_str, %rpc, "new {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns);

                                self.save_block(&consensus_head_block, true).await?;

                                head_block_sender.send(consensus_head_block)?;
                            }
                        }
                    }
                }
            }
        }

        Ok(())
    }
}