i think it works
This commit is contained in:
parent
81254a24be
commit
5719397466
20
TODO.md
20
TODO.md
@ -90,28 +90,28 @@
|
|||||||
- whenever blocks were slow, we started checking as fast as possible
|
- whenever blocks were slow, we started checking as fast as possible
|
||||||
- [x] create user script should allow setting requests per minute
|
- [x] create user script should allow setting requests per minute
|
||||||
- [x] cache api keys that are not in the database
|
- [x] cache api keys that are not in the database
|
||||||
- [ ] improve consensus block selection. Our goal is to find the highest work chain with a block over a minimum threshold of sum_soft_limit.
|
- [x] improve consensus block selection. Our goal is to find the highest work chain with a block over a minimum threshold of sum_soft_limit.
|
||||||
- [x] A new block arrives at a connection.
|
- [x] A new block arrives at a connection.
|
||||||
- [x] It checks that it isn't the same that it already has (which is a problem with polling nodes)
|
- [x] It checks that it isn't the same that it already has (which is a problem with polling nodes)
|
||||||
- [x] If its new to this node...
|
- [x] If its new to this node...
|
||||||
- [x] if the block does not have total work, check our cache. otherwise, query the node
|
- [x] if the block does not have total work, check our cache. otherwise, query the node
|
||||||
- [x] save the block num and hash so that http polling doesn't send duplicates
|
- [x] save the block num and hash so that http polling doesn't send duplicates
|
||||||
- [x] send the deduped block through a channel to be handled by the connections grouping.
|
- [x] send the deduped block through a channel to be handled by the connections grouping.
|
||||||
- [ ] The connections group...
|
- [x] The connections group...
|
||||||
- [x] input = rpc, new_block
|
- [x] input = rpc, new_block
|
||||||
- [ ] adds the block and rpc to it's internal BlockchainMap (this persists).
|
- [x] adds the block and rpc to it's internal maps
|
||||||
- [x] connection_heads: HashMap<rpc_name, blockhash>
|
- [x] connection_heads: HashMap<rpc_name, blockhash>
|
||||||
- [x] block_map: DashMap<blockhash, Arc<Block>>
|
- [x] block_map: DashMap<blockhash, Arc<Block>>
|
||||||
- [x] blockchain: DiGraphMap<blockhash, ?>
|
- [x] blockchain: DiGraphMap<blockhash, ?>
|
||||||
- [ ] iterate the rpc_map to find the highest_work_block
|
- [x] iterate the rpc_map to find the highest_work_block
|
||||||
- [ ] oldest_block_num = highest_work_block.number - 256
|
- [?] oldest_block_num = highest_work_block.number - 256
|
||||||
- think more about this. if we have to go back more than a couple blocks, we will serve very stale data
|
- think more about this. if we have to go back more than a couple blocks, we will serve very stale data
|
||||||
- [ ] while sum_soft_limit < min_sum_soft_limit:
|
- [x] while sum_soft_limit < min_sum_soft_limit:
|
||||||
- [ ] consensus_head_hash = block.parent_hash
|
- [x] consensus_head_hash = block.parent_hash
|
||||||
- [ ] sum_soft_limit = ??? (something with iterating rpc_map, caches, and petgraph's all_simple_paths)
|
- [x] sum_soft_limit = ??? (something with iterating rpc_map, caches, and petgraph's all_simple_paths)
|
||||||
- if all_simple_paths returns no paths, warn about a chain split?
|
- if all_simple_paths returns no paths, warn about a chain split?
|
||||||
- [ ] error if this is too old? sucks to have downtime, but its the chain thats having problems
|
- [x] now that we have a consensus head with enough soft limit (or an empty set), update SyncedConnections
|
||||||
- [ ] now that we have a consensus head with enough soft limit, update SyncedConnections
|
- [x] send the block through new head_block_sender
|
||||||
- [-] use siwe messages and signatures for sign up and login
|
- [-] use siwe messages and signatures for sign up and login
|
||||||
- [-] requests for "Get transactions receipts" are routed to the private_rpcs and not the balanced_rpcs. do this better.
|
- [-] requests for "Get transactions receipts" are routed to the private_rpcs and not the balanced_rpcs. do this better.
|
||||||
- [x] quick fix, send to balanced_rpcs for now. we will just live with errors on new transactions.
|
- [x] quick fix, send to balanced_rpcs for now. we will just live with errors on new transactions.
|
||||||
|
@ -2,15 +2,18 @@
|
|||||||
use super::connection::Web3Connection;
|
use super::connection::Web3Connection;
|
||||||
use super::connections::Web3Connections;
|
use super::connections::Web3Connections;
|
||||||
use super::transactions::TxStatus;
|
use super::transactions::TxStatus;
|
||||||
use crate::{config::BlockAndRpc, jsonrpc::JsonRpcRequest};
|
use crate::{
|
||||||
|
config::BlockAndRpc, jsonrpc::JsonRpcRequest, rpcs::synced_connections::SyncedConnections,
|
||||||
|
};
|
||||||
|
use dashmap::mapref::one::Ref;
|
||||||
use derive_more::From;
|
use derive_more::From;
|
||||||
use ethers::prelude::{Block, TxHash, H256, U256, U64};
|
use ethers::prelude::{Block, TxHash, H256, U256, U64};
|
||||||
use hashbrown::HashMap;
|
use hashbrown::{HashMap, HashSet};
|
||||||
use petgraph::prelude::DiGraphMap;
|
use petgraph::algo::all_simple_paths;
|
||||||
use serde_json::json;
|
use serde_json::json;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::sync::{broadcast, watch};
|
use tokio::sync::{broadcast, watch};
|
||||||
use tracing::{debug, warn};
|
use tracing::{debug, info, trace, warn};
|
||||||
|
|
||||||
#[derive(Default, From)]
|
#[derive(Default, From)]
|
||||||
pub struct BlockId {
|
pub struct BlockId {
|
||||||
@ -25,97 +28,6 @@ pub(super) struct BlockMetadata<'a> {
|
|||||||
pub(super) sum_soft_limit: u32,
|
pub(super) sum_soft_limit: u32,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// TODO: do we need this? probably big refactor still to do
|
|
||||||
/// The RPCs grouped by number and hash.
|
|
||||||
#[derive(Default)]
|
|
||||||
struct BlockchainAndRpcs<'a> {
|
|
||||||
// TODO: fifomap? or just manually remove once we add too much
|
|
||||||
rpcs_by_num: HashMap<U64, Vec<&'a str>>,
|
|
||||||
rpcs_by_hash: HashMap<H256, Vec<&'a str>>,
|
|
||||||
blocks_by_hash: HashMap<H256, Arc<Block<TxHash>>>,
|
|
||||||
/// Node is the blockhash.
|
|
||||||
/// You can get the blocks from block_map on the Web3Connections
|
|
||||||
/// TODO: what should the edge weight be? difficulty?
|
|
||||||
blockchain: DiGraphMap<H256, u8>,
|
|
||||||
total_soft_limit: u32,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<'a> BlockchainAndRpcs<'a> {
|
|
||||||
/// group the RPCs by their current head block
|
|
||||||
pub async fn new(
|
|
||||||
// TODO: think more about this key. maybe it should be an Arc?
|
|
||||||
connection_heads: &'a HashMap<String, Arc<Block<TxHash>>>,
|
|
||||||
web3_conns: &Web3Connections,
|
|
||||||
) -> Option<BlockchainAndRpcs<'a>> {
|
|
||||||
let mut new = Self::default();
|
|
||||||
|
|
||||||
let lowest_block_num = if let Some(lowest_block) = connection_heads
|
|
||||||
.values()
|
|
||||||
.min_by(|a, b| a.number.cmp(&b.number))
|
|
||||||
{
|
|
||||||
lowest_block
|
|
||||||
.number
|
|
||||||
.expect("all blocks here should have a number")
|
|
||||||
} else {
|
|
||||||
// if no lowest block number, then no servers are in sync
|
|
||||||
return None;
|
|
||||||
};
|
|
||||||
|
|
||||||
// TODO: what if lowest_block_num is far from the highest head block num?
|
|
||||||
|
|
||||||
for (rpc_name, head_block) in connection_heads.iter() {
|
|
||||||
if let Some(rpc) = web3_conns.get(rpc_name) {
|
|
||||||
// we need the total soft limit in order to know when its safe to update the backends
|
|
||||||
new.total_soft_limit += rpc.soft_limit;
|
|
||||||
|
|
||||||
let head_hash = head_block.hash.unwrap();
|
|
||||||
|
|
||||||
// save the block
|
|
||||||
new.blocks_by_hash
|
|
||||||
.entry(head_hash)
|
|
||||||
.or_insert_with(|| head_block.clone());
|
|
||||||
|
|
||||||
// add the rpc to all relevant block heights
|
|
||||||
// TODO: i feel like we should be able to do this with a graph
|
|
||||||
let mut block = head_block.clone();
|
|
||||||
while block.number.unwrap() >= lowest_block_num {
|
|
||||||
let block_hash = block.hash.unwrap();
|
|
||||||
let block_num = block.number.unwrap();
|
|
||||||
|
|
||||||
// save the rpc by the head hash
|
|
||||||
let rpc_urls_by_hash =
|
|
||||||
new.rpcs_by_hash.entry(block_hash).or_insert_with(Vec::new);
|
|
||||||
rpc_urls_by_hash.push(rpc_name);
|
|
||||||
|
|
||||||
// save the rpc by the head number
|
|
||||||
let rpc_names_by_num =
|
|
||||||
new.rpcs_by_num.entry(block_num).or_insert_with(Vec::new);
|
|
||||||
rpc_names_by_num.push(rpc_name);
|
|
||||||
|
|
||||||
if let Ok(parent) = web3_conns
|
|
||||||
.block(&block.parent_hash, Some(rpc.as_ref()))
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
// save the parent block
|
|
||||||
new.blocks_by_hash.insert(block.parent_hash, parent.clone());
|
|
||||||
|
|
||||||
block = parent
|
|
||||||
} else {
|
|
||||||
// log this? eventually we will hit a block we don't have, so it's not an error
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Some(new)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn consensus_head() {
|
|
||||||
todo!()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<'a> BlockMetadata<'a> {
|
impl<'a> BlockMetadata<'a> {
|
||||||
// TODO: there are sortable traits, but this seems simpler
|
// TODO: there are sortable traits, but this seems simpler
|
||||||
/// sort the blocks in descending height
|
/// sort the blocks in descending height
|
||||||
@ -145,32 +57,38 @@ impl<'a> BlockMetadata<'a> {
|
|||||||
|
|
||||||
impl Web3Connections {
|
impl Web3Connections {
|
||||||
/// adds a block to our map of the blockchain
|
/// adds a block to our map of the blockchain
|
||||||
pub fn add_block_to_chain(&self, block: Arc<Block<TxHash>>) -> anyhow::Result<()> {
|
pub fn add_block_to_chain(&self, block: &Arc<Block<TxHash>>) -> anyhow::Result<()> {
|
||||||
let hash = block.hash.ok_or_else(|| anyhow::anyhow!("no block hash"))?;
|
let hash = block.hash.ok_or_else(|| anyhow::anyhow!("no block hash"))?;
|
||||||
|
|
||||||
if self.blockchain_map.read().contains_node(hash) {
|
if self.block_map.contains_key(&hash) {
|
||||||
// this block is already included
|
// this block is already included. no need to continue
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
// theres a small race having the read and then the write
|
let mut blockchain = self.blockchain_graphmap.write();
|
||||||
let mut blockchain = self.blockchain_map.write();
|
|
||||||
|
// TODO: theres a small race between contains_key and insert
|
||||||
|
if let Some(overwritten) = self.block_map.insert(hash, block.clone()) {
|
||||||
|
// there was a race and another thread wrote this block
|
||||||
|
// no need to continue because that other thread would have written (or soon will) write the
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
if blockchain.contains_node(hash) {
|
if blockchain.contains_node(hash) {
|
||||||
// this hash is already included. we must have hit that race condition
|
// this hash is already included. we must have hit another race condition
|
||||||
// return now since this work was already done.
|
// return now since this work was already done.
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: prettier log? or probably move the log somewhere else
|
// TODO: prettier log? or probably move the log somewhere else
|
||||||
debug!(%hash, "new block");
|
trace!(%hash, "new block");
|
||||||
|
|
||||||
// TODO: prune block_map to only keep a configurable (256 on ETH?) number of blocks?
|
// TODO: prune block_map to only keep a configurable (256 on ETH?) number of blocks?
|
||||||
|
|
||||||
blockchain.add_node(hash);
|
blockchain.add_node(hash);
|
||||||
|
|
||||||
// what should edge weight be? and should the nodes be the blocks instead?
|
// what should edge weight be? and should the nodes be the blocks instead?
|
||||||
// maybe the weight should be the height
|
// TODO: maybe the weight should be the block?
|
||||||
// we store parent_hash -> hash because the block already stores the parent_hash
|
// we store parent_hash -> hash because the block already stores the parent_hash
|
||||||
blockchain.add_edge(block.parent_hash, hash, 0);
|
blockchain.add_edge(block.parent_hash, hash, 0);
|
||||||
|
|
||||||
@ -209,7 +127,7 @@ impl Web3Connections {
|
|||||||
|
|
||||||
let block = Arc::new(block);
|
let block = Arc::new(block);
|
||||||
|
|
||||||
self.add_block_to_chain(block.clone())?;
|
self.add_block_to_chain(&block)?;
|
||||||
|
|
||||||
Ok(block)
|
Ok(block)
|
||||||
}
|
}
|
||||||
@ -280,7 +198,7 @@ impl Web3Connections {
|
|||||||
let mut connection_heads = HashMap::new();
|
let mut connection_heads = HashMap::new();
|
||||||
|
|
||||||
while let Ok((new_block, rpc)) = block_receiver.recv_async().await {
|
while let Ok((new_block, rpc)) = block_receiver.recv_async().await {
|
||||||
self.recv_block_from_rpc(
|
self.process_block_from_rpc(
|
||||||
&mut connection_heads,
|
&mut connection_heads,
|
||||||
new_block,
|
new_block,
|
||||||
rpc,
|
rpc,
|
||||||
@ -298,8 +216,8 @@ impl Web3Connections {
|
|||||||
|
|
||||||
/// `connection_heads` is a mapping of rpc_names to head block hashes.
|
/// `connection_heads` is a mapping of rpc_names to head block hashes.
|
||||||
/// self.blockchain_map is a mapping of hashes to the complete Block<TxHash>.
|
/// self.blockchain_map is a mapping of hashes to the complete Block<TxHash>.
|
||||||
///
|
/// TODO: return something?
|
||||||
async fn recv_block_from_rpc(
|
async fn process_block_from_rpc(
|
||||||
&self,
|
&self,
|
||||||
connection_heads: &mut HashMap<String, H256>,
|
connection_heads: &mut HashMap<String, H256>,
|
||||||
new_block: Arc<Block<TxHash>>,
|
new_block: Arc<Block<TxHash>>,
|
||||||
@ -317,7 +235,7 @@ impl Web3Connections {
|
|||||||
} else {
|
} else {
|
||||||
connection_heads.insert(rpc.name.clone(), hash);
|
connection_heads.insert(rpc.name.clone(), hash);
|
||||||
|
|
||||||
self.add_block_to_chain(new_block.clone())?;
|
self.add_block_to_chain(&new_block)?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
_ => {
|
_ => {
|
||||||
@ -329,166 +247,172 @@ impl Web3Connections {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut chain_and_rpcs = BlockchainAndRpcs::default();
|
// iterate the rpc_map to find the highest_work_block
|
||||||
|
let mut checked_heads = HashSet::new();
|
||||||
|
let mut highest_work_block: Option<Ref<H256, Arc<Block<TxHash>>>> = None;
|
||||||
|
|
||||||
// TODO: default_min_soft_limit? without, we start serving traffic at the start too quickly
|
for (_rpc_name, rpc_head_hash) in connection_heads.iter() {
|
||||||
// let min_soft_limit = total_soft_limit / 2;
|
if checked_heads.contains(rpc_head_hash) {
|
||||||
let min_soft_limit = 1;
|
continue;
|
||||||
|
|
||||||
let num_possible_heads = chain_and_rpcs.rpcs_by_hash.len();
|
|
||||||
|
|
||||||
// trace!(?rpcs_by_hash);
|
|
||||||
|
|
||||||
let total_rpcs = self.conns.len();
|
|
||||||
|
|
||||||
/*
|
|
||||||
// TODO: this needs tests
|
|
||||||
if let Some(x) = rpcs_by_hash
|
|
||||||
.into_iter()
|
|
||||||
.filter_map(|(hash, conns)| {
|
|
||||||
// TODO: move this to `State::new` function on
|
|
||||||
let sum_soft_limit = conns
|
|
||||||
.iter()
|
|
||||||
.map(|rpc_url| {
|
|
||||||
if let Some(rpc) = self.conns.get(*rpc_url) {
|
|
||||||
rpc.soft_limit
|
|
||||||
} else {
|
|
||||||
0
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.sum();
|
|
||||||
|
|
||||||
if sum_soft_limit < min_soft_limit {
|
|
||||||
trace!(?sum_soft_limit, ?min_soft_limit, "sum_soft_limit too low");
|
|
||||||
None
|
|
||||||
} else {
|
|
||||||
let block = blocks_by_hash.get(&hash).unwrap();
|
|
||||||
|
|
||||||
Some(BlockMetadata {
|
|
||||||
block,
|
|
||||||
sum_soft_limit,
|
|
||||||
conns,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
})
|
|
||||||
// sort b to a for descending order. sort a to b for ascending order? maybe not "max_by" is smart
|
|
||||||
.max_by(|a, b| a.sortable_values().cmp(&b.sortable_values()))
|
|
||||||
{
|
|
||||||
let best_head_num = x.block.number.unwrap();
|
|
||||||
let best_head_hash = x.block.hash.unwrap();
|
|
||||||
let best_rpcs = x.conns;
|
|
||||||
|
|
||||||
let synced_rpcs = rpcs_by_num.remove(&best_head_num).unwrap();
|
|
||||||
|
|
||||||
if best_rpcs.len() == synced_rpcs.len() {
|
|
||||||
trace!(
|
|
||||||
"{}/{}/{}/{} rpcs have {}",
|
|
||||||
best_rpcs.len(),
|
|
||||||
synced_rpcs.len(),
|
|
||||||
connection_heads.len(),
|
|
||||||
total_rpcs,
|
|
||||||
best_head_hash
|
|
||||||
);
|
|
||||||
} else {
|
|
||||||
// TODO: this isn't necessarily a fork. this might just be an rpc being slow
|
|
||||||
// TODO: log all the heads?
|
|
||||||
warn!(
|
|
||||||
"chain is forked! {} possible heads. {}/{}/{}/{} rpcs have {}",
|
|
||||||
num_possible_heads,
|
|
||||||
best_rpcs.len(),
|
|
||||||
synced_rpcs.len(),
|
|
||||||
connection_heads.len(),
|
|
||||||
total_rpcs,
|
|
||||||
best_head_hash
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let num_best_rpcs = best_rpcs.len();
|
checked_heads.insert(rpc_head_hash);
|
||||||
|
|
||||||
// TODOL: do this without clone?
|
let rpc_head_block = self.block_map.get(rpc_head_hash).unwrap();
|
||||||
let conns = best_rpcs
|
|
||||||
.into_iter()
|
|
||||||
.map(|x| self.conns.get(x).unwrap().clone())
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
let pending_synced_connections = SyncedConnections {
|
if highest_work_block.is_none()
|
||||||
head_block_num: best_head_num,
|
|| rpc_head_block.total_difficulty
|
||||||
head_block_hash: best_head_hash,
|
> highest_work_block.as_ref().unwrap().total_difficulty
|
||||||
conns,
|
{
|
||||||
};
|
highest_work_block = Some(rpc_head_block);
|
||||||
|
|
||||||
let current_head_block = self.head_block_hash();
|
|
||||||
let new_head_block = pending_synced_connections.head_block_hash != current_head_block;
|
|
||||||
|
|
||||||
if new_head_block {
|
|
||||||
self.add_block(new_block.clone(), true);
|
|
||||||
|
|
||||||
debug!(
|
|
||||||
"{}/{} rpcs at {} ({}). head at {:?}",
|
|
||||||
pending_synced_connections.conns.len(),
|
|
||||||
self.conns.len(),
|
|
||||||
pending_synced_connections.head_block_num,
|
|
||||||
pending_synced_connections.head_block_hash,
|
|
||||||
pending_synced_connections
|
|
||||||
.conns
|
|
||||||
.iter()
|
|
||||||
.map(|x| format!("{}", x))
|
|
||||||
.collect::<Vec<_>>(),
|
|
||||||
);
|
|
||||||
// TODO: what if the hashes don't match?
|
|
||||||
if Some(pending_synced_connections.head_block_hash) == new_block.hash {
|
|
||||||
// mark all transactions in the block as confirmed
|
|
||||||
if pending_tx_sender.is_some() {
|
|
||||||
for tx_hash in &new_block.transactions {
|
|
||||||
// TODO: should we mark as confirmed via pending_tx_sender?
|
|
||||||
// TODO: possible deadlock here!
|
|
||||||
// trace!("removing {}...", tx_hash);
|
|
||||||
let _ = self.pending_transactions.remove(tx_hash);
|
|
||||||
// trace!("removed {}", tx_hash);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// TODO: mark any orphaned transactions as unconfirmed
|
|
||||||
}
|
|
||||||
} else if num_best_rpcs == self.conns.len() {
|
|
||||||
trace!(
|
|
||||||
"all {} rpcs at {} ({})",
|
|
||||||
num_best_rpcs,
|
|
||||||
pending_synced_connections.head_block_num,
|
|
||||||
pending_synced_connections.head_block_hash,
|
|
||||||
);
|
|
||||||
} else {
|
|
||||||
trace!(
|
|
||||||
?pending_synced_connections,
|
|
||||||
"{}/{} rpcs at {} ({})",
|
|
||||||
num_best_rpcs,
|
|
||||||
self.conns.len(),
|
|
||||||
pending_synced_connections.head_block_num,
|
|
||||||
pending_synced_connections.head_block_hash,
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: do this before or after processing all the transactions in this block?
|
|
||||||
// TODO: only swap if there is a change?
|
|
||||||
trace!(?pending_synced_connections, "swapping");
|
|
||||||
self.synced_connections
|
|
||||||
.swap(Arc::new(pending_synced_connections));
|
|
||||||
|
|
||||||
if new_head_block {
|
|
||||||
// TODO: is new_head_block accurate?
|
|
||||||
// TODO: move this onto self.chain?
|
|
||||||
head_block_sender
|
|
||||||
.send(new_block.clone())
|
|
||||||
.context("head_block_sender")?;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// TODO: is this expected when we first start?
|
|
||||||
// TODO: make sure self.synced_connections is empty
|
|
||||||
// TODO: return an error
|
|
||||||
warn!("not enough rpcs in sync");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
*/
|
// clone to release the read lock
|
||||||
|
let highest_work_block = highest_work_block.map(|x| x.clone());
|
||||||
|
|
||||||
|
// TODO: default min_soft_limit? without, we start serving traffic at the start too quickly
|
||||||
|
// let min_sum_soft_limit = total_soft_limit / 2;
|
||||||
|
// TODO: this should be configurable
|
||||||
|
let min_sum_soft_limit = 1;
|
||||||
|
|
||||||
|
let mut highest_work_block = match highest_work_block {
|
||||||
|
None => todo!("no servers are in sync"),
|
||||||
|
Some(highest_work_block) => highest_work_block,
|
||||||
|
};
|
||||||
|
|
||||||
|
// track names so we don't check the same node multiple times
|
||||||
|
let mut consensus_names: HashSet<&String> = HashSet::new();
|
||||||
|
// track rpcs so we can build a new SyncedConnections
|
||||||
|
let mut consensus_rpcs: Vec<&Arc<Web3Connection>> = vec![];
|
||||||
|
// a running total of the soft limits covered by the rpcs
|
||||||
|
let mut consensus_soft_limit = 0;
|
||||||
|
|
||||||
|
// check the highest work block and its parents for a set of rpcs that can serve our request load
|
||||||
|
// TODO: loop for how many parent blocks? we don't want to serve blocks that are too far behind
|
||||||
|
let blockchain_guard = self.blockchain_graphmap.read();
|
||||||
|
for _ in 0..3 {
|
||||||
|
let highest_work_hash = highest_work_block.hash.as_ref().unwrap();
|
||||||
|
|
||||||
|
for (rpc_name, rpc_head_hash) in connection_heads.iter() {
|
||||||
|
if consensus_names.contains(rpc_name) {
|
||||||
|
// this block is already included
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: does all_simple_paths make this check?
|
||||||
|
if rpc_head_hash == highest_work_hash {
|
||||||
|
if let Some(rpc) = self.conns.get(rpc_name) {
|
||||||
|
consensus_names.insert(rpc_name);
|
||||||
|
consensus_rpcs.push(rpc);
|
||||||
|
consensus_soft_limit += rpc.soft_limit;
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: cache all_simple_paths. there should be a high hit rate
|
||||||
|
// TODO: use an algo that saves scratch space?
|
||||||
|
// TODO: how slow is this?
|
||||||
|
let is_connected = all_simple_paths::<Vec<H256>, _>(
|
||||||
|
&*blockchain_guard,
|
||||||
|
*highest_work_hash,
|
||||||
|
*rpc_head_hash,
|
||||||
|
0,
|
||||||
|
// TODO: what should this max be? probably configurable per chain
|
||||||
|
Some(10),
|
||||||
|
)
|
||||||
|
.next()
|
||||||
|
.is_some();
|
||||||
|
|
||||||
|
if is_connected {
|
||||||
|
if let Some(rpc) = self.conns.get(rpc_name) {
|
||||||
|
consensus_rpcs.push(rpc);
|
||||||
|
consensus_soft_limit += rpc.soft_limit;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if consensus_soft_limit >= min_sum_soft_limit {
|
||||||
|
// success! this block has enough nodes on it
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
// else, we need to try the parent block
|
||||||
|
|
||||||
|
trace!(%consensus_soft_limit, ?highest_work_hash, "avoiding thundering herd");
|
||||||
|
|
||||||
|
// // TODO: this automatically queries for parents, but need to rearrange lifetimes to make an await work here
|
||||||
|
// highest_work_block = self
|
||||||
|
// .block(&highest_work_block.parent_hash, Some(&rpc))
|
||||||
|
// .await?;
|
||||||
|
// we give known stale data just because we don't have enough capacity to serve the latest.
|
||||||
|
// TODO: maybe we should delay serving requests if this happens.
|
||||||
|
// TODO: don't unwrap. break if none?
|
||||||
|
match self.block_map.get(&highest_work_block.parent_hash) {
|
||||||
|
None => {
|
||||||
|
warn!(
|
||||||
|
"ran out of parents to check. soft limit only {}/{}: {}%",
|
||||||
|
consensus_soft_limit,
|
||||||
|
min_sum_soft_limit,
|
||||||
|
consensus_soft_limit * 100 / min_sum_soft_limit
|
||||||
|
);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Some(parent_block) => {
|
||||||
|
highest_work_block = parent_block.clone();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// unlock self.blockchain_graphmap
|
||||||
|
drop(blockchain_guard);
|
||||||
|
|
||||||
|
let soft_limit_met = consensus_soft_limit >= min_sum_soft_limit;
|
||||||
|
let num_synced_rpcs = consensus_rpcs.len();
|
||||||
|
// TODO: put this in config
|
||||||
|
let min_synced_rpcs = 2;
|
||||||
|
|
||||||
|
let new_synced_connections = if soft_limit_met {
|
||||||
|
// we have a consensus large enough to serve traffic
|
||||||
|
let head_block_hash = highest_work_block.hash.unwrap();
|
||||||
|
let head_block_num = highest_work_block.number.unwrap();
|
||||||
|
|
||||||
|
if num_synced_rpcs < min_synced_rpcs {
|
||||||
|
trace!(hash=%head_block_hash, num=?head_block_num, "not enough rpcs are synced to advance");
|
||||||
|
|
||||||
|
return Ok(());
|
||||||
|
} else {
|
||||||
|
// TODO: wait until at least most of the rpcs have given their initial block?
|
||||||
|
// otherwise, if there is a syncing node that is fast, our first head block might not be good
|
||||||
|
// TODO: have a configurable "minimum rpcs" number that we can set
|
||||||
|
|
||||||
|
// TODO: this logs too much. only log when the hash is first updated?
|
||||||
|
debug!(hash=%head_block_hash, num=%head_block_num, rpcs=%num_synced_rpcs, limit=%consensus_soft_limit, "consensus head");
|
||||||
|
|
||||||
|
// TODO: sort by weight and soft limit? do we need an IndexSet, or is a Vec fine?
|
||||||
|
let conns = consensus_rpcs.into_iter().cloned().collect();
|
||||||
|
|
||||||
|
SyncedConnections {
|
||||||
|
head_block_num,
|
||||||
|
head_block_hash,
|
||||||
|
conns,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// failure even after checking parent heads!
|
||||||
|
// not enough servers are in sync to server traffic
|
||||||
|
// TODO: at startup this is fine, but later its a problem
|
||||||
|
warn!("empty SyncedConnections");
|
||||||
|
|
||||||
|
SyncedConnections::default()
|
||||||
|
};
|
||||||
|
|
||||||
|
let old_synced_connections = Arc::new(new_synced_connections);
|
||||||
|
|
||||||
|
if soft_limit_met && Some(old_synced_connections.head_block_hash) != highest_work_block.hash
|
||||||
|
{
|
||||||
|
// the head hash changed. forward to any subscribers
|
||||||
|
head_block_sender.send(highest_work_block)?;
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
|
///! Rate-limited communication with a web3 provider.
|
||||||
use super::blockchain::BlockId;
|
use super::blockchain::BlockId;
|
||||||
use super::connections::BlockMap;
|
use super::connections::BlockMap;
|
||||||
///! Rate-limited communication with a web3 provider.
|
|
||||||
use super::provider::Web3Provider;
|
use super::provider::Web3Provider;
|
||||||
use super::request::OpenRequestHandle;
|
use super::request::OpenRequestHandle;
|
||||||
use super::request::OpenRequestResult;
|
use super::request::OpenRequestResult;
|
||||||
@ -8,7 +8,6 @@ use crate::app::{flatten_handle, AnyhowJoinHandle};
|
|||||||
use crate::config::BlockAndRpc;
|
use crate::config::BlockAndRpc;
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
use dashmap::mapref::entry::Entry;
|
use dashmap::mapref::entry::Entry;
|
||||||
use dashmap::DashMap;
|
|
||||||
use ethers::prelude::{Block, Bytes, Middleware, ProviderError, TxHash, H256, U64};
|
use ethers::prelude::{Block, Bytes, Middleware, ProviderError, TxHash, H256, U64};
|
||||||
use futures::future::try_join_all;
|
use futures::future::try_join_all;
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
@ -23,7 +22,6 @@ use std::{cmp::Ordering, sync::Arc};
|
|||||||
use tokio::sync::broadcast;
|
use tokio::sync::broadcast;
|
||||||
use tokio::sync::RwLock as AsyncRwLock;
|
use tokio::sync::RwLock as AsyncRwLock;
|
||||||
use tokio::time::{interval, sleep, sleep_until, Duration, MissedTickBehavior};
|
use tokio::time::{interval, sleep, sleep_until, Duration, MissedTickBehavior};
|
||||||
use tracing::debug;
|
|
||||||
use tracing::{error, info, instrument, trace, warn};
|
use tracing::{error, info, instrument, trace, warn};
|
||||||
|
|
||||||
/// An active connection to a Web3Rpc
|
/// An active connection to a Web3Rpc
|
||||||
|
@ -32,15 +32,11 @@ use tracing::{error, info, instrument, trace, warn};
|
|||||||
|
|
||||||
pub type BlockMap = Arc<DashMap<H256, Arc<Block<TxHash>>>>;
|
pub type BlockMap = Arc<DashMap<H256, Arc<Block<TxHash>>>>;
|
||||||
|
|
||||||
pub struct BlockchainAndHeads {
|
|
||||||
pub(super) graph: DiGraphMap<H256, Arc<Block<TxHash>>>,
|
|
||||||
pub(super) heads: HashMap<String, H256>,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A collection of web3 connections. Sends requests either the current best server or all servers.
|
/// A collection of web3 connections. Sends requests either the current best server or all servers.
|
||||||
#[derive(From)]
|
#[derive(From)]
|
||||||
pub struct Web3Connections {
|
pub struct Web3Connections {
|
||||||
pub(super) conns: HashMap<String, Arc<Web3Connection>>,
|
pub(super) conns: HashMap<String, Arc<Web3Connection>>,
|
||||||
|
/// any requests will be forwarded to one (or more) of these connections
|
||||||
pub(super) synced_connections: ArcSwap<SyncedConnections>,
|
pub(super) synced_connections: ArcSwap<SyncedConnections>,
|
||||||
pub(super) pending_transactions: Arc<DashMap<TxHash, TxStatus>>,
|
pub(super) pending_transactions: Arc<DashMap<TxHash, TxStatus>>,
|
||||||
/// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis?
|
/// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis?
|
||||||
@ -48,7 +44,7 @@ pub struct Web3Connections {
|
|||||||
pub(super) block_map: BlockMap,
|
pub(super) block_map: BlockMap,
|
||||||
/// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis?
|
/// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis?
|
||||||
/// TODO: what should we use for edges?
|
/// TODO: what should we use for edges?
|
||||||
pub(super) blockchain_map: RwLock<DiGraphMap<H256, u32>>,
|
pub(super) blockchain_graphmap: RwLock<DiGraphMap<H256, u32>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Web3Connections {
|
impl Web3Connections {
|
||||||
@ -172,7 +168,7 @@ impl Web3Connections {
|
|||||||
synced_connections: ArcSwap::new(Arc::new(synced_connections)),
|
synced_connections: ArcSwap::new(Arc::new(synced_connections)),
|
||||||
pending_transactions,
|
pending_transactions,
|
||||||
block_map: Default::default(),
|
block_map: Default::default(),
|
||||||
blockchain_map: Default::default(),
|
blockchain_graphmap: Default::default(),
|
||||||
});
|
});
|
||||||
|
|
||||||
let handle = {
|
let handle = {
|
||||||
|
@ -10,6 +10,7 @@ use std::sync::Arc;
|
|||||||
/// Serialize is so we can print it on our debug endpoint
|
/// Serialize is so we can print it on our debug endpoint
|
||||||
#[derive(Clone, Default, Serialize)]
|
#[derive(Clone, Default, Serialize)]
|
||||||
pub struct SyncedConnections {
|
pub struct SyncedConnections {
|
||||||
|
// TODO: store Arc<Block<TxHash>> instead?
|
||||||
pub(super) head_block_num: U64,
|
pub(super) head_block_num: U64,
|
||||||
pub(super) head_block_hash: H256,
|
pub(super) head_block_hash: H256,
|
||||||
// TODO: this should be able to serialize, but it isn't
|
// TODO: this should be able to serialize, but it isn't
|
||||||
@ -48,12 +49,7 @@ impl Web3Connections {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn synced(&self) -> bool {
|
pub fn synced(&self) -> bool {
|
||||||
// TODO: require a minimum number of synced rpcs
|
!self.synced_connections.load().conns.is_empty()
|
||||||
// TODO: move this whole function to SyncedConnections
|
|
||||||
if self.synced_connections.load().conns.is_empty() {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
self.head_block_num() > U64::zero()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn num_synced_rpcs(&self) -> usize {
|
pub fn num_synced_rpcs(&self) -> usize {
|
||||||
|
Loading…
Reference in New Issue
Block a user