web3-proxy/web3_proxy/src/rpcs/blockchain.rs

//! Keep track of the blockchain as seen by a Web3Rpcs.

use super::consensus::ConsensusFinder;
use super::many::Web3Rpcs;
use super::one::Web3Rpc;
use super::transactions::TxStatus;
use crate::frontend::authorization::Authorization;
use crate::{config::BlockAndRpc, jsonrpc::JsonRpcRequest};
use anyhow::{anyhow, Context};
use derive_more::From;
use ethers::prelude::{Block, TxHash, H256, U64};
use log::{debug, trace, warn, Level};
use moka::future::Cache;
use serde::Serialize;
use serde_json::json;
use std::time::{SystemTime, UNIX_EPOCH};
use std::{cmp::Ordering, fmt::Display, sync::Arc};
use tokio::sync::broadcast;
use tokio::time::Duration;
// TODO: type for Hydrated Blocks with their full transactions?
pub type ArcBlock = Arc<Block<TxHash>>;
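/// Cache of complete `Web3ProxyBlock`s keyed by block hash. Written by `try_cache_block` and read by `block`.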
pub type BlocksByHashCache = Cache<H256, Web3ProxyBlock, hashbrown::hash_map::DefaultHashBuilder>;
/// A block and its age.
#[derive(Clone, Debug, Default, From, Serialize)]
pub struct Web3ProxyBlock {
pub block: ArcBlock,
/// number of seconds this block was behind the current time when received
/// this is only set if the block is from a subscription
pub received_age: Option<u64>,
}
impl PartialEq for Web3ProxyBlock {
fn eq(&self, other: &Self) -> bool {
match (self.block.hash, other.block.hash) {
(None, None) => true,
(Some(_), None) => false,
(None, Some(_)) => false,
(Some(s), Some(o)) => s == o,
}
}
}
impl Web3ProxyBlock {
/// A new block has arrived over a subscription
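/// Usage sketch (hypothetical caller; `raw_block` is a fully populated `Block<TxHash>`):
///
/// ```ignore
/// if let Some(block) = Web3ProxyBlock::try_new(Arc::new(raw_block)) {
///     trace!("saw {} at height {}", block.hash(), block.number());
/// }
/// ```
///
/// Returns `None` for pending blocks that are missing a number or a hash.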
pub fn try_new(block: ArcBlock) -> Option<Self> {
if block.number.is_none() || block.hash.is_none() {
return None;
}
let mut x = Self {
block,
received_age: None,
};
// no need to recalculate lag every time
// if the head block gets too old, a health check restarts this connection
// TODO: emit a stat for received_age
x.received_age = Some(x.age());
Some(x)
}
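/// Seconds elapsed since the block's timestamp, clamped to 0 when the timestamp is in the future.
/// For example (hypothetical values): a block stamped 60 seconds before now returns 60.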
pub fn age(&self) -> u64 {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("there should always be time");
let block_timestamp = Duration::from_secs(self.block.timestamp.as_u64());
if block_timestamp < now {
// a large age means this server may still be syncing and too far behind to serve requests
// the subtraction cannot underflow because we checked the ordering above
(now - block_timestamp).as_secs()
} else {
0
}
}
#[inline(always)]
pub fn parent_hash(&self) -> &H256 {
&self.block.parent_hash
}
#[inline(always)]
pub fn hash(&self) -> &H256 {
self.block
.hash
.as_ref()
.expect("saved blocks must have a hash")
}
#[inline(always)]
pub fn number(&self) -> &U64 {
self.block
.number
.as_ref()
.expect("saved blocks must have a number")
}
}
impl TryFrom<ArcBlock> for Web3ProxyBlock {
type Error = anyhow::Error;
fn try_from(x: ArcBlock) -> Result<Self, Self::Error> {
if x.number.is_none() || x.hash.is_none() {
return Err(anyhow!("Blocks here must have a number of hash"));
}
let b = Web3ProxyBlock {
block: x,
received_age: None,
};
Ok(b)
}
}
impl Display for Web3ProxyBlock {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
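// example rendering (hypothetical values): "16999990 (0x1234…cdef, 2s old)"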
write!(
f,
"{} ({}, {}s old)",
self.number(),
self.hash(),
self.age()
)
}
}
impl Web3Rpcs {
/// add a block to our mappings and track the heaviest chain
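/// Callers pass `heaviest_chain = true` only for blocks known to be on the consensus chain
/// (e.g. a new consensus head), since that is what populates `blocks_by_number`;
/// blocks fetched by hash alone are cached with `heaviest_chain = false`.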
pub async fn try_cache_block(
&self,
block: Web3ProxyBlock,
heaviest_chain: bool,
) -> anyhow::Result<Web3ProxyBlock> {
// TODO: i think we can rearrange this function to make it faster on the hot path
let block_hash = block.hash();
// skip Block::default()
if block_hash.is_zero() {
debug!("Skipping block without hash!");
return Ok(block);
}
let block_num = block.number();
// TODO: think more about heaviest_chain. would be better to do the check inside this function
if heaviest_chain {
// this is the only place that writes to block_numbers
// multiple inserts should be okay though
// TODO: info that there was a fork?
self.blocks_by_number.insert(*block_num, *block_hash).await;
}
// this block is very likely already in block_hashes
// TODO: use their get_with
let block = self
.blocks_by_hash
.get_with(*block_hash, async move { block.clone() })
.await;
Ok(block)
}
/// Get a block from caches with fallback.
/// Will query a specific node or the best available.
/// TODO: return anyhow::Result<Option<ArcBlock>>?
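/// Lookup order: the `blocks_by_hash` cache, then the given `rpc` (if any), then the best consensus head connection.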
pub async fn block(
&self,
authorization: &Arc<Authorization>,
hash: &H256,
rpc: Option<&Arc<Web3Rpc>>,
) -> anyhow::Result<Web3ProxyBlock> {
// first, try to get the hash from our cache
// the cache is set last, so if it's here, it's everywhere
// TODO: use try_get_with
if let Some(block) = self.blocks_by_hash.get(hash) {
return Ok(block);
}
// block not in cache. we need to ask an rpc for it
let get_block_params = (*hash, false);
// TODO: if error, retry?
let block: Web3ProxyBlock = match rpc {
Some(rpc) => rpc
.wait_for_request_handle(authorization, Some(Duration::from_secs(30)), None)
.await?
.request::<_, Option<ArcBlock>>(
"eth_getBlockByHash",
&json!(get_block_params),
Level::Error.into(),
None,
)
.await?
.and_then(|x| {
if x.number.is_none() {
None
} else {
x.try_into().ok()
}
})
.context("no block!")?,
None => {
// TODO: helper for method+params => JsonRpcRequest
// TODO: does this id matter?
let request = json!({ "id": "1", "method": "eth_getBlockByHash", "params": get_block_params });
let request: JsonRpcRequest = serde_json::from_value(request)?;
// TODO: request_metadata? maybe we should put it in the authorization?
// TODO: think more about this wait_for_sync
let response = self
.try_send_best_consensus_head_connection(
authorization,
request,
None,
None,
None,
)
.await?;
if let Some(err) = response.error {
let err = anyhow::anyhow!("{:#?}", err);
return Err(err.context("failed fetching block"));
}
let block = response.result.context("no error, but also no block")?;
let block: Option<ArcBlock> = serde_json::from_str(block.get())?;
let block: ArcBlock = block.context("no block in the response")?;
// TODO: received time is going to be weird
Web3ProxyBlock::try_from(block)?
}
};
// the block was fetched using eth_getBlockByHash, so it should have all fields
// TODO: fill in heaviest_chain! if the block is old enough, is this definitely true?
let block = self.try_cache_block(block, false).await?;
Ok(block)
}
/// Convenience method to get the canonical block at a given block height.
pub async fn block_hash(
&self,
authorization: &Arc<Authorization>,
num: &U64,
) -> anyhow::Result<(H256, u64)> {
let (block, block_depth) = self.cannonical_block(authorization, num).await?;
let hash = *block.hash();
Ok((hash, block_depth))
}
/// Get the heaviest chain's block from cache or backend rpc
/// Caution! If a future block is requested, this might wait forever. Be sure to have a timeout outside of this!
pub async fn cannonical_block(
&self,
authorization: &Arc<Authorization>,
num: &U64,
) -> anyhow::Result<(Web3ProxyBlock, u64)> {
// we only have blocks by hash now
// maybe save them during save_block in a blocks_by_number Cache<U64, Vec<ArcBlock>>
// if there are multiple, use petgraph to find the one on the main chain (and remove the others if they have enough confirmations)
let mut consensus_head_receiver = self
.watch_consensus_head_sender
.as_ref()
.context("need new head subscriptions to fetch cannonical_block")?
.subscribe();
// be sure the requested block num exists
// TODO: is this okay? what if we aren't synced?!
let mut head_block_num = *consensus_head_receiver
.borrow_and_update()
.as_ref()
.context("no consensus head block")?
.number();
loop {
if num <= &head_block_num {
break;
}
trace!("waiting for future block {} > {}", num, head_block_num);
consensus_head_receiver.changed().await?;
if let Some(head) = consensus_head_receiver.borrow_and_update().as_ref() {
head_block_num = *head.number();
}
}
let block_depth = (head_block_num - num).as_u64();
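// e.g. (hypothetical numbers) head_block_num = 17_000_000 and num = 16_999_990 gives block_depth = 10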
// try to get the hash from our cache
// deref to not keep the lock open
if let Some(block_hash) = self.blocks_by_number.get(num) {
// TODO: sometimes this needs to fetch the block. why? i thought block_numbers would only be set if the block hash was set
// TODO: pass authorization through here?
let block = self.block(authorization, &block_hash, None).await?;
return Ok((block, block_depth));
}
// block number not in cache. we need to ask an rpc for it
// TODO: helper for method+params => JsonRpcRequest
let request = json!({ "jsonrpc": "2.0", "id": "1", "method": "eth_getBlockByNumber", "params": (num, false) });
let request: JsonRpcRequest = serde_json::from_value(request)?;
// TODO: if error, retry?
// TODO: request_metadata or authorization?
// we don't actually set min_block_needed here because all nodes have all blocks
let response = self
.try_send_best_consensus_head_connection(authorization, request, None, None, None)
.await?;
if let Some(err) = response.error {
debug!("could not find canonical block {}: {:?}", num, err);
}
let raw_block = response.result.context("no canonical block result")?;
let block: ArcBlock = serde_json::from_str(raw_block.get())?;
let block = Web3ProxyBlock::try_from(block)?;
// the block was fetched using eth_getBlockByNumber, so it should have all fields and be on the heaviest chain
let block = self.try_cache_block(block, true).await?;
Ok((block, block_depth))
}
pub(super) async fn process_incoming_blocks(
&self,
authorization: &Arc<Authorization>,
block_receiver: flume::Receiver<BlockAndRpc>,
// TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed
// Geth's subscriptions have the same potential for skipping blocks.
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
) -> anyhow::Result<()> {
let mut connection_heads = ConsensusFinder::new(self.max_block_age, self.max_block_lag);
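// each `BlockAndRpc` received below pairs an rpc's latest head block (`Option<Web3ProxyBlock>`) with the rpc that reported it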
loop {
match block_receiver.recv_async().await {
Ok((new_block, rpc)) => {
let rpc_name = rpc.name.clone();
if let Err(err) = self
.process_block_from_rpc(
authorization,
&mut connection_heads,
new_block,
rpc,
&pending_tx_sender,
)
.await
{
warn!("unable to process block from rpc {}: {:#?}", rpc_name, err);
}
}
Err(err) => {
warn!("block_receiver exited! {:#?}", err);
return Err(err.into());
}
}
}
}
/// `consensus_finder` tracks the most recent head block reported by each rpc.
/// `self.blocks_by_hash` maps block hashes to their complete `Web3ProxyBlock`.
/// TODO: return something?
pub(crate) async fn process_block_from_rpc(
&self,
authorization: &Arc<Authorization>,
consensus_finder: &mut ConsensusFinder,
new_block: Option<Web3ProxyBlock>,
rpc: Arc<Web3Rpc>,
pending_tx_sender: &Option<broadcast::Sender<TxStatus>>,
) -> anyhow::Result<()> {
// TODO: how should we handle an error here?
if !consensus_finder
.update_rpc(new_block.clone(), rpc.clone(), self)
.await
.context("failed to update rpc")?
{
// nothing changed. no need to scan for a new consensus head
return Ok(());
}
let new_synced_connections = consensus_finder
.best_consensus_connections(authorization, self)
.await
.context("no consensus head block!")
.map_err(|err| {
self.watch_consensus_rpcs_sender.send_replace(None);
err
})?;
// TODO: what should we do if the block number of new_synced_connections is < old_synced_connections? wait?
let watch_consensus_head_sender = self.watch_consensus_head_sender.as_ref().unwrap();
let consensus_tier = new_synced_connections.tier;
let total_tiers = consensus_finder.len();
let backups_needed = new_synced_connections.backups_needed;
let consensus_head_block = new_synced_connections.head_block.clone();
let num_consensus_rpcs = new_synced_connections.num_conns();
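// count how many tracked rpcs already report the new consensus head (num_synced_rpcs) and how many rpcs are being tracked at all (num_active_rpcs)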
let mut num_synced_rpcs = 0;
let num_active_rpcs = consensus_finder
.all_rpcs_group()
.map(|x| {
for v in x.rpc_to_block.values() {
if *v == consensus_head_block {
num_synced_rpcs += 1;
}
}
x.len()
})
.unwrap_or_default();
let total_rpcs = self.by_name.read().len();
let old_consensus_head_connections = self
.watch_consensus_rpcs_sender
.send_replace(Some(Arc::new(new_synced_connections)));
let backups_voted_str = if backups_needed { "B " } else { "" };
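// the log lines below read as (hypothetical example): "new 0/2 B 3/4/5/6 con=... rpc=...@..."
// meaning tier 0 of 2 tiers, backups voting, 3 consensus / 4 synced / 5 active / 6 known rpcs
// (the "first" line logged below omits the synced count)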
match old_consensus_head_connections.as_ref() {
None => {
debug!(
"first {}/{} {}{}/{}/{} block={}, rpc={}",
consensus_tier,
total_tiers,
backups_voted_str,
num_consensus_rpcs,
num_active_rpcs,
total_rpcs,
consensus_head_block,
rpc,
);
if backups_needed {
// TODO: what else should be in this error?
warn!("Backup RPCs are in use!");
}
// this should already be cached
let consensus_head_block = self.try_cache_block(consensus_head_block, true).await?;
watch_consensus_head_sender
.send(Some(consensus_head_block))
.context(
"watch_consensus_head_sender failed sending first consensus_head_block",
)?;
}
Some(old_consensus_connections) => {
let old_head_block = &old_consensus_connections.head_block;
// TODO: do this log item better
let rpc_head_str = new_block
.map(|x| x.to_string())
.unwrap_or_else(|| "None".to_string());
match consensus_head_block.number().cmp(old_head_block.number()) {
Ordering::Equal => {
// multiple blocks with the same fork!
if consensus_head_block.hash() == old_head_block.hash() {
// no change in hash. no need to use watch_consensus_head_sender
// TODO: trace level if rpc is backup
debug!(
"con {}/{} {}{}/{}/{}/{} con={} rpc={}@{}",
consensus_tier,
total_tiers,
backups_voted_str,
num_consensus_rpcs,
num_synced_rpcs,
num_active_rpcs,
total_rpcs,
consensus_head_block,
rpc,
rpc_head_str,
)
} else {
// hash changed
debug!(
"unc {}/{} {}{}/{}/{}/{} con_head={} old={} rpc={}@{}",
consensus_tier,
total_tiers,
backups_voted_str,
num_consensus_rpcs,
num_synced_rpcs,
num_active_rpcs,
total_rpcs,
consensus_head_block,
old_head_block,
rpc,
rpc_head_str,
);
let consensus_head_block = self
.try_cache_block(consensus_head_block, true)
.await
.context("save consensus_head_block as heaviest chain")?;
watch_consensus_head_sender
.send(Some(consensus_head_block))
.context("watch_consensus_head_sender failed sending uncled consensus_head_block")?;
}
}
Ordering::Less => {
// this is unlikely but possible
// TODO: better log
warn!(
"chain rolled back {}/{} {}{}/{}/{}/{} con={} old={} rpc={}@{}",
consensus_tier,
total_tiers,
backups_voted_str,
num_consensus_rpcs,
num_synced_rpcs,
num_active_rpcs,
total_rpcs,
consensus_head_block,
old_head_block,
rpc,
rpc_head_str,
);
if backups_needed {
// TODO: what else should be in this error?
warn!("Backup RPCs are in use!");
}
// TODO: tell save_block to remove any higher block numbers from the cache. not needed because we have other checks on requested blocks being > head, but still seems like a good idea
let consensus_head_block = self
.try_cache_block(consensus_head_block, true)
.await
.context("save_block sending consensus_head_block as heaviest chain")?;
watch_consensus_head_sender
.send(Some(consensus_head_block))
.context("watch_consensus_head_sender failed sending rollback consensus_head_block")?;
}
Ordering::Greater => {
debug!(
"new {}/{} {}{}/{}/{}/{} con={} rpc={}@{}",
consensus_tier,
total_tiers,
backups_voted_str,
num_consensus_rpcs,
num_synced_rpcs,
num_active_rpcs,
total_rpcs,
consensus_head_block,
rpc,
rpc_head_str,
);
if backups_needed {
// TODO: what else should be in this error?
warn!("Backup RPCs are in use!");
}
let consensus_head_block =
self.try_cache_block(consensus_head_block, true).await?;
watch_consensus_head_sender.send(Some(consensus_head_block)).context("watch_consensus_head_sender failed sending new consensus_head_block")?;
}
}
}
}
Ok(())
}
}