This commit is contained in:
Bryan Stitt 2022-08-26 17:26:17 +00:00
parent 101104ac5d
commit 81254a24be
8 changed files with 371 additions and 229 deletions

22
TODO.md

@ -90,6 +90,28 @@
- whenever blocks were slow, we started checking as fast as possible
- [x] create user script should allow setting requests per minute
- [x] cache api keys that are not in the database
- [ ] improve consensus block selection. Our goal is to find the highest-work chain with a block over a minimum threshold of sum_soft_limit (a rough sketch follows this list).
- [x] A new block arrives at a connection.
- [x] It checks that it isn't the same block it already has (duplicates are a problem with polling nodes)
- [x] If it's new to this node...
- [x] if the block does not have total work, check our cache; if it's missing there too, query the node
- [x] save the block num and hash so that http polling doesn't send duplicates
- [x] send the deduped block through a channel to be handled by the connections group.
- [ ] The connections group...
- [x] input = rpc, new_block
- [ ] adds the block and rpc to its internal BlockchainMap (this persists).
- [x] connection_heads: HashMap<rpc_name, blockhash>
- [x] block_map: DashMap<blockhash, Arc<Block>>
- [x] blockchain: DiGraphMap<blockhash, ?>
- [ ] iterate the rpc_map to find the highest_work_block
- [ ] oldest_block_num = highest_work_block.number - 256
- think more about this. if we have to go back more than a couple blocks, we will serve very stale data
- [ ] while sum_soft_limit < min_sum_soft_limit:
- [ ] consensus_head_hash = block.parent_hash
- [ ] sum_soft_limit = ??? (something with iterating rpc_map, caches, and petgraph's all_simple_paths)
- if all_simple_paths returns no paths, warn about a chain split?
- [ ] error if this is too old? sucks to have downtime, but it's the chain that's having problems
- [ ] now that we have a consensus head with enough soft limit, update SyncedConnections
- [-] use siwe messages and signatures for sign up and login
- [-] requests for "Get transactions receipts" are routed to the private_rpcs and not the balanced_rpcs. do this better.
- [x] quick fix, send to balanced_rpcs for now. we will just live with errors on new transactions.
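A rough sketch of the selection loop described above (assumptions: simplified stand-in types and plain `u64` hashes instead of the real `Web3Connections`, `Block<TxHash>`, and petgraph structures). It starts from the highest head any RPC reports and walks back through parents until the RPCs whose chains include the candidate block have enough combined soft limit:

```rust
use std::collections::HashMap;

/// simplified stand-in for Block<TxHash>; hashes are plain u64s for brevity
struct MiniBlock {
    parent_hash: u64,
    number: u64,
}

/// `heads` maps rpc name -> head block hash, `blocks` maps hash -> block,
/// `soft_limits` maps rpc name -> that rpc's soft limit
fn consensus_head(
    heads: &HashMap<String, u64>,
    blocks: &HashMap<u64, MiniBlock>,
    soft_limits: &HashMap<String, u32>,
    min_sum_soft_limit: u32,
) -> Option<u64> {
    // start from the highest head any rpc reports (a stand-in for highest total work)
    let mut candidate = heads
        .values()
        .copied()
        .max_by_key(|hash| blocks.get(hash).map(|b| b.number).unwrap_or(0))?;

    loop {
        // sum the soft limits of every rpc whose chain includes the candidate block
        let sum_soft_limit: u32 = heads
            .iter()
            .filter(|(_, head)| chain_includes(blocks, **head, candidate))
            .filter_map(|(name, _)| soft_limits.get(name))
            .sum();

        if sum_soft_limit >= min_sum_soft_limit {
            // enough capacity agrees on (or is ahead of) this block
            return Some(candidate);
        }

        // not enough capacity behind this block yet; walk back one parent and retry
        let block = blocks.get(&candidate)?;
        if block.number == 0 {
            // ran out of history without finding consensus
            return None;
        }
        candidate = block.parent_hash;
    }
}

/// walk parent pointers from `head` looking for `target`
fn chain_includes(blocks: &HashMap<u64, MiniBlock>, mut head: u64, target: u64) -> bool {
    loop {
        if head == target {
            return true;
        }
        match blocks.get(&head) {
            // keep walking back until we run out of cached blocks
            Some(b) if b.number > 0 => head = b.parent_hash,
            _ => return false,
        }
    }
}
```

The real implementation keeps blocks in a petgraph `DiGraphMap` keyed by blockhash, so the naive `chain_includes` walk above would instead be a graph query (e.g. `all_simple_paths`) plus the caches described in the list.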

@ -6,7 +6,7 @@ use crate::jsonrpc::JsonRpcForwardedResponse;
use crate::jsonrpc::JsonRpcForwardedResponseEnum;
use crate::jsonrpc::JsonRpcRequest;
use crate::jsonrpc::JsonRpcRequestEnum;
use crate::rpcs::connections::Web3Connections;
use crate::rpcs::connections::{BlockMap, Web3Connections};
use crate::rpcs::transactions::TxStatus;
use crate::stats::AppStats;
use anyhow::Context;
@ -245,11 +245,15 @@ impl Web3ProxyApp {
// TODO: once a transaction is "Confirmed" we remove it from the map. this should prevent major memory leaks.
// TODO: we should still have some sort of expiration or maximum size limit for the map
// this block map is shared between balanced_rpcs and private_rpcs.
let block_map = BlockMap::default();
let (balanced_rpcs, balanced_handle) = Web3Connections::spawn(
top_config.app.chain_id,
balanced_rpcs,
http_client.clone(),
redis_pool.clone(),
block_map.clone(),
Some(head_block_sender),
Some(pending_tx_sender.clone()),
pending_transactions.clone(),
@ -269,6 +273,7 @@ impl Web3ProxyApp {
private_rpcs,
http_client.clone(),
redis_pool.clone(),
block_map,
// subscribing to new heads here won't work well. if they are fast, they might be ahead of balanced_rpcs
None,
// TODO: subscribe to pending transactions on the private rpcs? they seem to have low rate limits

@ -1,3 +1,6 @@
use crate::app::AnyhowJoinHandle;
use crate::rpcs::connection::Web3Connection;
use crate::rpcs::connections::BlockMap;
use argh::FromArgs;
use derive_more::Constructor;
use ethers::prelude::{Block, TxHash};
@ -6,9 +9,6 @@ use serde::Deserialize;
use std::sync::Arc;
use tokio::sync::broadcast;
use crate::app::AnyhowJoinHandle;
use crate::rpcs::connection::Web3Connection;
pub type BlockAndRpc = (Arc<Block<TxHash>>, Arc<Web3Connection>);
#[derive(Debug, FromArgs)]
@ -85,6 +85,7 @@ impl Web3ConnectionConfig {
chain_id: u64,
http_client: Option<reqwest::Client>,
http_interval_sender: Option<Arc<broadcast::Sender<()>>>,
block_map: BlockMap,
block_sender: Option<flume::Sender<BlockAndRpc>>,
tx_id_sender: Option<flume::Sender<(TxHash, Arc<Web3Connection>)>>,
) -> anyhow::Result<(Arc<Web3Connection>, AnyhowJoinHandle<()>)> {
@ -107,6 +108,7 @@ impl Web3ConnectionConfig {
http_interval_sender,
hard_limit,
self.soft_limit,
block_map,
block_sender,
tx_id_sender,
true,

@ -70,16 +70,14 @@ pub async fn serve(port: u16, proxy_app: Arc<Web3ProxyApp>) -> anyhow::Result<()
// TODO: allow only listening on localhost? top_config.app.host.parse()?
let addr = SocketAddr::from(([0, 0, 0, 0], port));
info!("listening on port {}", port);
// TODO: into_make_service is enough if we always run behind a proxy. make into_make_service_with_connect_info optional?
/*
It sequentially looks for an IP in:
- x-forwarded-for header (de-facto standard)
- x-real-ip header
- forwarded header (new standard)
- axum::extract::ConnectInfo (if not behind proxy)
So we probably won't need into_make_service_with_connect_info, but it shouldn't hurt
*/
let service = app.into_make_service_with_connect_info::<SocketAddr>();
// let service = app.into_make_service();
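For reference, a minimal sketch of that lookup order (illustrative only; a hypothetical helper, not the extractor the app actually uses, and the standard `forwarded` header is skipped for brevity):

```rust
use axum::extract::ConnectInfo;
use axum::http::HeaderMap;
use std::net::{IpAddr, SocketAddr};

// check the proxy headers first, then fall back to the socket address
fn client_ip(headers: &HeaderMap, ConnectInfo(addr): ConnectInfo<SocketAddr>) -> IpAddr {
    headers
        .get("x-forwarded-for")
        .and_then(|v| v.to_str().ok())
        // x-forwarded-for may hold a comma-separated chain; the first entry is the client
        .and_then(|v| v.split(',').next())
        .and_then(|v| v.trim().parse::<IpAddr>().ok())
        .or_else(|| {
            headers
                .get("x-real-ip")
                .and_then(|v| v.to_str().ok())
                .and_then(|v| v.trim().parse::<IpAddr>().ok())
        })
        .unwrap_or_else(|| addr.ip())
}
```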

@ -1,21 +1,119 @@
///! Keep track of the blockchain as seen by a Web3Connections.
use super::connection::Web3Connection;
use super::connections::Web3Connections;
use super::synced_connections::SyncedConnections;
use super::transactions::TxStatus;
use crate::jsonrpc::JsonRpcRequest;
use anyhow::Context;
use crate::{config::BlockAndRpc, jsonrpc::JsonRpcRequest};
use derive_more::From;
use ethers::prelude::{Block, TxHash, H256, U256, U64};
use indexmap::IndexMap;
use hashbrown::HashMap;
use petgraph::prelude::DiGraphMap;
use serde_json::json;
use std::sync::Arc;
use tokio::sync::{broadcast, watch};
use tracing::{debug, trace, warn};
use tracing::{debug, warn};
#[derive(Default, From)]
pub struct BlockId {
pub(super) hash: H256,
pub(super) num: U64,
}
/// TODO: do we need this? probably big refactor still to do
pub(super) struct BlockMetadata<'a> {
pub(super) block: &'a Arc<Block<TxHash>>,
pub(super) rpc_names: Vec<&'a str>,
pub(super) sum_soft_limit: u32,
pub(super) conns: Vec<&'a str>,
}
/// TODO: do we need this? probably big refactor still to do
/// The RPCs grouped by number and hash.
#[derive(Default)]
struct BlockchainAndRpcs<'a> {
// TODO: fifomap? or just manually remove once we add too much
rpcs_by_num: HashMap<U64, Vec<&'a str>>,
rpcs_by_hash: HashMap<H256, Vec<&'a str>>,
blocks_by_hash: HashMap<H256, Arc<Block<TxHash>>>,
/// Node is the blockhash.
/// You can get the blocks from block_map on the Web3Connections
/// TODO: what should the edge weight be? difficulty?
blockchain: DiGraphMap<H256, u8>,
total_soft_limit: u32,
}
impl<'a> BlockchainAndRpcs<'a> {
/// group the RPCs by their current head block
pub async fn new(
// TODO: think more about this key. maybe it should be an Arc?
connection_heads: &'a HashMap<String, Arc<Block<TxHash>>>,
web3_conns: &Web3Connections,
) -> Option<BlockchainAndRpcs<'a>> {
let mut new = Self::default();
let lowest_block_num = if let Some(lowest_block) = connection_heads
.values()
.min_by(|a, b| a.number.cmp(&b.number))
{
lowest_block
.number
.expect("all blocks here should have a number")
} else {
// if no lowest block number, then no servers are in sync
return None;
};
// TODO: what if lowest_block_num is far from the highest head block num?
for (rpc_name, head_block) in connection_heads.iter() {
if let Some(rpc) = web3_conns.get(rpc_name) {
// we need the total soft limit in order to know when it's safe to update the backends
new.total_soft_limit += rpc.soft_limit;
let head_hash = head_block.hash.unwrap();
// save the block
new.blocks_by_hash
.entry(head_hash)
.or_insert_with(|| head_block.clone());
// add the rpc to all relevant block heights
// TODO: i feel like we should be able to do this with a graph
let mut block = head_block.clone();
while block.number.unwrap() >= lowest_block_num {
let block_hash = block.hash.unwrap();
let block_num = block.number.unwrap();
// save the rpc by the head hash
let rpc_urls_by_hash =
new.rpcs_by_hash.entry(block_hash).or_insert_with(Vec::new);
rpc_urls_by_hash.push(rpc_name);
// save the rpc by the head number
let rpc_names_by_num =
new.rpcs_by_num.entry(block_num).or_insert_with(Vec::new);
rpc_names_by_num.push(rpc_name);
if let Ok(parent) = web3_conns
.block(&block.parent_hash, Some(rpc.as_ref()))
.await
{
// save the parent block
new.blocks_by_hash.insert(block.parent_hash, parent.clone());
block = parent
} else {
// log this? eventually we will hit a block we don't have, so it's not an error
break;
}
}
}
}
Some(new)
}
fn consensus_head() {
todo!()
}
}
impl<'a> BlockMetadata<'a> {
@ -46,35 +144,44 @@ impl<'a> BlockMetadata<'a> {
}
impl Web3Connections {
pub fn add_block(&self, block: Arc<Block<TxHash>>, cannonical: bool) {
let hash = block.hash.unwrap();
/// adds a block to our map of the blockchain
pub fn add_block_to_chain(&self, block: Arc<Block<TxHash>>) -> anyhow::Result<()> {
let hash = block.hash.ok_or_else(|| anyhow::anyhow!("no block hash"))?;
if cannonical {
let num = block.number.unwrap();
let entry = self.chain_map.entry(num);
let mut is_new = false;
// TODO: this might be wrong. we might need to update parents, too
entry.or_insert_with(|| {
is_new = true;
block.clone()
});
// TODO: prune chain_map?
if !is_new {
return;
}
if self.blockchain_map.read().contains_node(hash) {
// this block is already included
return Ok(());
}
// TODO: prune block_map?
// there's a small race between the read check above and taking this write lock
let mut blockchain = self.blockchain_map.write();
self.block_map.entry(hash).or_insert(block);
if blockchain.contains_node(hash) {
// this hash is already included. we must have hit that race condition
// return now since this work was already done.
return Ok(());
}
// TODO: prettier log? or probably move the log somewhere else
debug!(%hash, "new block");
// TODO: prune block_map to only keep a configurable (256 on ETH?) number of blocks?
blockchain.add_node(hash);
// what should edge weight be? and should the nodes be the blocks instead?
// maybe the weight should be the height
// we store parent_hash -> hash because the block already stores the parent_hash
blockchain.add_edge(block.parent_hash, hash, 0);
Ok(())
}
pub async fn block(&self, hash: &H256) -> anyhow::Result<Arc<Block<TxHash>>> {
pub async fn block(
&self,
hash: &H256,
rpc: Option<&Web3Connection>,
) -> anyhow::Result<Arc<Block<TxHash>>> {
// first, try to get the hash from our cache
if let Some(block) = self.block_map.get(hash) {
return Ok(block.clone());
@ -84,11 +191,17 @@ impl Web3Connections {
// TODO: helper for method+params => JsonRpcRequest
// TODO: get block with the transactions?
// TODO: does this id matter?
let request = json!({ "id": "1", "method": "eth_getBlockByHash", "params": (hash, false) });
let request: JsonRpcRequest = serde_json::from_value(request)?;
// TODO: if error, retry?
let response = self.try_send_best_upstream_server(request, None).await?;
let response = match rpc {
Some(rpc) => {
todo!("send request to this rpc")
}
None => self.try_send_best_upstream_server(request, None).await?,
};
let block = response.result.unwrap();
@ -96,7 +209,7 @@ impl Web3Connections {
let block = Arc::new(block);
self.add_block(block.clone(), false);
self.add_block_to_chain(block.clone())?;
Ok(block)
}
@ -112,6 +225,9 @@ impl Web3Connections {
/// Get the heaviest chain's block from cache or backend rpc
pub async fn cannonical_block(&self, num: &U64) -> anyhow::Result<Arc<Block<TxHash>>> {
todo!();
/*
// first, try to get the hash from our cache
if let Some(block) = self.chain_map.get(num) {
return Ok(block.clone());
@ -149,19 +265,19 @@ impl Web3Connections {
self.add_block(block.clone(), true);
Ok(block)
*/
}
// TODO: rename this?
pub(super) async fn update_synced_rpcs(
pub(super) async fn process_incoming_blocks(
&self,
block_receiver: flume::Receiver<(Arc<Block<TxHash>>, Arc<Web3Connection>)>,
block_receiver: flume::Receiver<BlockAndRpc>,
// TODO: head_block_sender should be a broadcast_sender like pending_tx_sender
head_block_sender: watch::Sender<Arc<Block<TxHash>>>,
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
) -> anyhow::Result<()> {
// TODO: indexmap or hashmap? what hasher? with_capacity?
// TODO: this will grow unbounded. prune old heads automatically
let mut connection_heads = IndexMap::<String, Arc<Block<TxHash>>>::new();
// TODO: this will grow unbounded. prune old heads on this at the same time we prune the graph?
let mut connection_heads = HashMap::new();
while let Ok((new_block, rpc)) = block_receiver.recv_async().await {
self.recv_block_from_rpc(
@ -180,127 +296,52 @@ impl Web3Connections {
Ok(())
}
pub async fn recv_block_from_rpc(
/// `connection_heads` is a mapping of rpc_names to head block hashes.
/// self.blockchain_map is a mapping of hashes to the complete Block<TxHash>.
///
async fn recv_block_from_rpc(
&self,
connection_heads: &mut IndexMap<String, Arc<Block<TxHash>>>,
connection_heads: &mut HashMap<String, H256>,
new_block: Arc<Block<TxHash>>,
rpc: Arc<Web3Connection>,
head_block_sender: &watch::Sender<Arc<Block<TxHash>>>,
pending_tx_sender: &Option<broadcast::Sender<TxStatus>>,
) -> anyhow::Result<()> {
let new_block_hash = if let Some(hash) = new_block.hash {
hash
} else {
// TODO: rpc name instead of url (will do this with config reload revamp)
connection_heads.remove(&rpc.name);
// add the block to connection_heads
match (new_block.hash, new_block.number) {
(Some(hash), Some(num)) => {
if num == U64::zero() {
debug!(%rpc, "still syncing");
// TODO: return here is wrong. synced rpcs needs an update
return Ok(());
};
connection_heads.remove(&rpc.name);
} else {
connection_heads.insert(rpc.name.clone(), hash);
// TODO: dry this with the code above
let new_block_num = if let Some(num) = new_block.number {
num
} else {
// this seems unlikely, but i'm pretty sure we have seen it
// maybe when a node is syncing or reconnecting?
warn!(%rpc, ?new_block, "Block without number!");
// TODO: rpc name instead of url (will do this with config reload revamp)
connection_heads.remove(&rpc.name);
// TODO: return here is wrong. synced rpcs needs an update
return Ok(());
};
// TODO: span with more in it?
// TODO: make sure i'm doing this span right
// TODO: show the actual rpc url?
// TODO: clippy lint to make sure we don't hold this across an awaited future
// TODO: what level?
// let _span = info_span!("block_receiver", %rpc, %new_block_num).entered();
if new_block_num == U64::zero() {
warn!(%rpc, %new_block_num, "still syncing");
connection_heads.remove(&rpc.name);
} else {
connection_heads.insert(rpc.name.clone(), new_block.clone());
self.add_block(new_block.clone(), false);
}
// iterate connection_heads to find the oldest block
let lowest_block_num = if let Some(lowest_block) = connection_heads
.values()
.min_by(|a, b| a.number.cmp(&b.number))
{
lowest_block
.number
.expect("all blocks here should have a number")
} else {
// TODO: return here is wrong. synced rpcs needs an update
return Ok(());
};
// iterate connection_heads to find the consensus block
let mut rpcs_by_num = IndexMap::<U64, Vec<&str>>::new();
let mut blocks_by_hash = IndexMap::<H256, Arc<Block<TxHash>>>::new();
// block_hash => soft_limit, rpcs
// TODO: proper type for this?
let mut rpcs_by_hash = IndexMap::<H256, Vec<&str>>::new();
let mut total_soft_limit = 0;
for (rpc_url, head_block) in connection_heads.iter() {
if let Some(rpc) = self.conns.get(rpc_url) {
// we need the total soft limit in order to know when its safe to update the backends
total_soft_limit += rpc.soft_limit;
let head_hash = head_block.hash.unwrap();
// save the block
blocks_by_hash
.entry(head_hash)
.or_insert_with(|| head_block.clone());
// add the rpc to all relevant block heights
let mut block = head_block.clone();
while block.number.unwrap() >= lowest_block_num {
let block_hash = block.hash.unwrap();
let block_num = block.number.unwrap();
// save the rpcs and the sum of their soft limit by their head hash
let rpc_urls_by_hash = rpcs_by_hash.entry(block_hash).or_insert_with(Vec::new);
rpc_urls_by_hash.push(rpc_url);
// save the rpcs by their number
let rpc_urls_by_num = rpcs_by_num.entry(block_num).or_insert_with(Vec::new);
rpc_urls_by_num.push(rpc_url);
if let Ok(parent) = self.block(&block.parent_hash).await {
// save the parent block
blocks_by_hash.insert(block.parent_hash, parent.clone());
block = parent
} else {
// log this? eventually we will hit a block we don't have, so it's not an error
break;
}
self.add_block_to_chain(new_block.clone())?;
}
}
_ => {
warn!(%rpc, ?new_block, "Block without number or hash!");
connection_heads.remove(&rpc.name);
// don't return yet! self.synced_connections likely needs an update
}
}
let mut chain_and_rpcs = BlockchainAndRpcs::default();
// TODO: default_min_soft_limit? without, we start serving traffic at the start too quickly
// let min_soft_limit = total_soft_limit / 2;
let min_soft_limit = 1;
let num_possible_heads = rpcs_by_hash.len();
trace!(?rpcs_by_hash);
let num_possible_heads = chain_and_rpcs.rpcs_by_hash.len();
// trace!(?rpcs_by_hash);
let total_rpcs = self.conns.len();
/*
// TODO: this needs tests
if let Some(x) = rpcs_by_hash
.into_iter()
@ -395,7 +436,7 @@ impl Web3Connections {
.collect::<Vec<_>>(),
);
// TODO: what if the hashes don't match?
if pending_synced_connections.head_block_hash == new_block_hash {
if Some(pending_synced_connections.head_block_hash) == new_block.hash {
// mark all transactions in the block as confirmed
if pending_tx_sender.is_some() {
for tx_hash in &new_block.transactions {
@ -447,6 +488,7 @@ impl Web3Connections {
warn!("not enough rpcs in sync");
}
*/
Ok(())
}
}

@ -1,3 +1,5 @@
use super::blockchain::BlockId;
use super::connections::BlockMap;
///! Rate-limited communication with a web3 provider.
use super::provider::Web3Provider;
use super::request::OpenRequestHandle;
@ -5,6 +7,8 @@ use super::request::OpenRequestResult;
use crate::app::{flatten_handle, AnyhowJoinHandle};
use crate::config::BlockAndRpc;
use anyhow::Context;
use dashmap::mapref::entry::Entry;
use dashmap::DashMap;
use ethers::prelude::{Block, Bytes, Middleware, ProviderError, TxHash, H256, U64};
use futures::future::try_join_all;
use futures::StreamExt;
@ -19,13 +23,14 @@ use std::{cmp::Ordering, sync::Arc};
use tokio::sync::broadcast;
use tokio::sync::RwLock as AsyncRwLock;
use tokio::time::{interval, sleep, sleep_until, Duration, MissedTickBehavior};
use tracing::debug;
use tracing::{error, info, instrument, trace, warn};
/// An active connection to a Web3Rpc
pub struct Web3Connection {
pub name: String,
/// TODO: can we get this from the provider? do we even need it?
pub url: String,
url: String,
/// keep track of currently open requests. We sort on this
pub(super) active_requests: AtomicU32,
/// provider is in a RwLock so that we can replace it if re-connecting
@ -34,10 +39,12 @@ pub struct Web3Connection {
/// rate limits are stored in a central redis so that multiple proxies can share their rate limits
hard_limit: Option<RedisRateLimit>,
/// used for load balancing to the least loaded server
pub soft_limit: u32,
pub(super) soft_limit: u32,
block_data_limit: AtomicU64,
pub weight: u32,
head_block: RwLock<(H256, U64)>,
/// Lower weight means higher priority when sending requests
pub(super) weight: u32,
// TODO: async lock?
pub(super) head_block: RwLock<BlockId>,
}
impl Web3Connection {
@ -56,6 +63,7 @@ impl Web3Connection {
hard_limit: Option<(u64, RedisPool)>,
// TODO: think more about this type
soft_limit: u32,
block_map: BlockMap,
block_sender: Option<flume::Sender<BlockAndRpc>>,
tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
reconnect: bool,
@ -83,7 +91,7 @@ impl Web3Connection {
hard_limit,
soft_limit,
block_data_limit: Default::default(),
head_block: RwLock::new((H256::zero(), 0isize.into())),
head_block: RwLock::new(Default::default()),
weight,
};
@ -125,7 +133,13 @@ impl Web3Connection {
let new_connection = new_connection.clone();
tokio::spawn(async move {
new_connection
.subscribe(http_interval_sender, block_sender, tx_id_sender, reconnect)
.subscribe(
http_interval_sender,
block_map,
block_sender,
tx_id_sender,
reconnect,
)
.await
})
};
@ -143,7 +157,7 @@ impl Web3Connection {
sleep(Duration::from_millis(250)).await;
for block_data_limit in [u64::MAX, 90_000, 128, 64, 32] {
let mut head_block_num = new_connection.head_block.read().1;
let mut head_block_num = new_connection.head_block.read().num;
// TODO: wait until head block is set outside the loop? if we disconnect while starting we could actually get 0 though
while head_block_num == U64::zero() {
@ -152,7 +166,7 @@ impl Web3Connection {
// TODO: subscribe to a channel instead of polling? subscribe to http_interval_sender?
sleep(Duration::from_secs(1)).await;
head_block_num = new_connection.head_block.read().1;
head_block_num = new_connection.head_block.read().num;
}
// TODO: subtract 1 from block_data_limit for safety?
@ -197,7 +211,7 @@ impl Web3Connection {
pub fn has_block_data(&self, needed_block_num: &U64) -> bool {
let block_data_limit: U64 = self.block_data_limit();
let newest_block_num = self.head_block.read().1;
let newest_block_num = self.head_block.read().num;
let oldest_block_num = newest_block_num
.saturating_sub(block_data_limit)
@ -249,36 +263,59 @@ impl Web3Connection {
}
#[instrument(skip_all)]
async fn send_block_result(
self: Arc<Self>,
block: Result<Block<TxHash>, ProviderError>,
async fn send_head_block_result(
self: &Arc<Self>,
new_head_block: Result<Arc<Block<TxHash>>, ProviderError>,
block_sender: &flume::Sender<BlockAndRpc>,
block_map: BlockMap,
) -> anyhow::Result<()> {
match block {
Ok(block) => {
{
// TODO: is this true? Block::default probably doesn't
let hash = block.hash.expect("blocks here should always have hashes");
let num = block
.number
.expect("blocks here should always have numbers");
match new_head_block {
Ok(new_head_block) => {
// TODO: is unwrap_or_default ok? we might have an empty block
let new_hash = new_head_block.hash.unwrap_or_default();
// if we already have this block saved, we don't need to store this copy
let new_head_block = match block_map.entry(new_hash) {
Entry::Occupied(x) => x.get().clone(),
Entry::Vacant(x) => {
// TODO: remove this once https://github.com/ledgerwatch/erigon/issues/5190 is closed
// TODO: include transactions?
let new_head_block = if new_head_block.total_difficulty.is_none() {
self.wait_for_request_handle()
.await?
.request("eth_getBlockByHash", (new_hash, false))
.await?
} else {
new_head_block
};
x.insert(new_head_block).clone()
}
};
let new_num = new_head_block.number.unwrap_or_default();
// save the block so we don't send the same one multiple times
// also save so that archive checks can know how far back to query
{
let mut head_block = self.head_block.write();
*head_block = (hash, num);
head_block.hash = new_hash;
head_block.num = new_num;
}
block_sender
.send_async((Arc::new(block), self))
.send_async((new_head_block, self.clone()))
.await
.context("block_sender")?;
}
Err(e) => {
warn!("unable to get block from {}: {}", self, e);
// TODO: do something to rpc_chain?
// send an empty block to take this server out of rotation
block_sender
.send_async((Arc::new(Block::default()), self))
.send_async((Arc::new(Block::default()), self.clone()))
.await
.context("block_sender")?;
}
@ -290,6 +327,7 @@ impl Web3Connection {
async fn subscribe(
self: Arc<Self>,
http_interval_sender: Option<Arc<broadcast::Sender<()>>>,
block_map: BlockMap,
block_sender: Option<flume::Sender<BlockAndRpc>>,
tx_id_sender: Option<flume::Sender<(TxHash, Arc<Self>)>>,
reconnect: bool,
@ -300,9 +338,11 @@ impl Web3Connection {
let mut futures = vec![];
if let Some(block_sender) = &block_sender {
let f = self
.clone()
.subscribe_new_heads(http_interval_receiver, block_sender.clone());
let f = self.clone().subscribe_new_heads(
http_interval_receiver,
block_sender.clone(),
block_map.clone(),
);
futures.push(flatten_handle(tokio::spawn(f)));
}
@ -356,8 +396,9 @@ impl Web3Connection {
self: Arc<Self>,
http_interval_receiver: Option<broadcast::Receiver<()>>,
block_sender: flume::Sender<BlockAndRpc>,
block_map: BlockMap,
) -> anyhow::Result<()> {
info!("watching {}", self);
info!(?self, "watching new_heads");
// TODO: is a RwLock of an Option<Arc> the right thing here?
if let Some(provider) = self.provider.read().await.clone() {
@ -371,53 +412,60 @@ impl Web3Connection {
let mut last_hash = H256::zero();
loop {
match self.try_open_request().await {
Ok(OpenRequestResult::Handle(active_request_handle)) => {
// TODO: try, or wait_for?
match self.wait_for_request_handle().await {
Ok(active_request_handle) => {
let block: Result<Block<TxHash>, _> = active_request_handle
.request("eth_getBlockByNumber", ("latest", false))
.await;
if let Ok(block) = block {
// don't send repeat blocks
let new_hash =
block.hash.expect("blocks here should always have hashes");
match block {
Ok(block) => {
// don't send repeat blocks
let new_hash = block
.hash
.expect("blocks here should always have hashes");
if new_hash != last_hash {
last_hash = new_hash;
if new_hash != last_hash {
// new hash!
last_hash = new_hash;
self.clone()
.send_block_result(Ok(block), &block_sender)
self.send_head_block_result(
Ok(Arc::new(block)),
&block_sender,
block_map.clone(),
)
.await?;
}
}
Err(err) => {
// we did not get a block back. something is up with the server. take it out of rotation
self.send_head_block_result(
Err(err),
&block_sender,
block_map.clone(),
)
.await?;
}
} else {
// we did not get a block back. something is up with the server. take it out of rotation
self.clone().send_block_result(block, &block_sender).await?;
}
}
Ok(OpenRequestResult::RetryAt(retry_at)) => {
warn!(?retry_at, "Rate limited on latest block from {}", self);
sleep_until(retry_at).await;
continue;
}
Ok(OpenRequestResult::None) => {
warn!("No handle for latest block from {}", self);
// TODO: what should we do?
}
Err(err) => {
warn!(?err, "Internal error on latest block from {}", self);
// TODO: what should we do? sleep? extra time?
}
}
// wait for the interval
// wait for the next interval
// TODO: if error or rate limit, increase interval?
while let Err(err) = http_interval_receiver.recv().await {
match err {
broadcast::error::RecvError::Closed => {
// channel is closed! that's not good. bubble the error up
return Err(err.into());
}
broadcast::error::RecvError::Lagged(lagged) => {
// querying the block was delayed. this can happen if tokio is very busy.
// querying the block was delayed
// this can happen if tokio is very busy or if waiting on request rate limits took too long
warn!(?err, ?self, "http interval lagging by {}!", lagged);
}
}
@ -434,18 +482,23 @@ impl Web3Connection {
// query the block once since the subscription doesn't send the current block
// there is a very small race condition here where the stream could send us a new block right now
// all it does is print "new block" for the same block as the current block
let block: Result<Block<TxHash>, _> = self
let block: Result<Arc<Block<TxHash>>, _> = self
.wait_for_request_handle()
.await?
.request("eth_getBlockByNumber", ("latest", false))
.await;
.await
.map(Arc::new);
self.clone().send_block_result(block, &block_sender).await?;
self.send_head_block_result(block, &block_sender, block_map.clone())
.await?;
while let Some(new_block) = stream.next().await {
self.clone()
.send_block_result(Ok(new_block), &block_sender)
.await?;
self.send_head_block_result(
Ok(Arc::new(new_block)),
&block_sender,
block_map.clone(),
)
.await?;
}
warn!(?self, "subscription ended");
@ -526,7 +579,7 @@ impl Web3Connection {
// TODO: maximum wait time? i think timeouts in other parts of the code are probably best
loop {
match self.try_open_request().await {
match self.try_request_handle().await {
Ok(OpenRequestResult::Handle(handle)) => return Ok(handle),
Ok(OpenRequestResult::RetryAt(retry_at)) => {
// TODO: emit a stat?
@ -543,7 +596,7 @@ impl Web3Connection {
}
}
pub async fn try_open_request(self: &Arc<Self>) -> anyhow::Result<OpenRequestResult> {
pub async fn try_request_handle(self: &Arc<Self>) -> anyhow::Result<OpenRequestResult> {
// check that we are connected
if !self.has_provider().await {
// TODO: emit a stat?

@ -3,7 +3,7 @@ use super::connection::Web3Connection;
use super::request::{OpenRequestHandle, OpenRequestResult};
use super::synced_connections::SyncedConnections;
use crate::app::{flatten_handle, AnyhowJoinHandle};
use crate::config::Web3ConnectionConfig;
use crate::config::{BlockAndRpc, Web3ConnectionConfig};
use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest};
use crate::rpcs::transactions::TxStatus;
use arc_swap::ArcSwap;
@ -15,7 +15,8 @@ use futures::future::{join_all, try_join_all};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use hashbrown::HashMap;
use indexmap::IndexMap;
use parking_lot::RwLock;
use petgraph::graphmap::DiGraphMap;
use serde::ser::{SerializeStruct, Serializer};
use serde::Serialize;
use serde_json::value::RawValue;
@ -29,28 +30,36 @@ use tokio::time::{interval, sleep, sleep_until, MissedTickBehavior};
use tokio::time::{Duration, Instant};
use tracing::{error, info, instrument, trace, warn};
pub type BlockMap = Arc<DashMap<H256, Arc<Block<TxHash>>>>;
pub struct BlockchainAndHeads {
pub(super) graph: DiGraphMap<H256, Arc<Block<TxHash>>>,
pub(super) heads: HashMap<String, H256>,
}
/// A collection of web3 connections. Sends requests either the current best server or all servers.
#[derive(From)]
pub struct Web3Connections {
pub(super) conns: IndexMap<String, Arc<Web3Connection>>,
pub(super) conns: HashMap<String, Arc<Web3Connection>>,
pub(super) synced_connections: ArcSwap<SyncedConnections>,
pub(super) pending_transactions: Arc<DashMap<TxHash, TxStatus>>,
/// only includes blocks on the main chain.
/// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis?
pub(super) chain_map: DashMap<U64, Arc<Block<TxHash>>>,
/// all blocks, including orphans
pub(super) block_map: BlockMap,
/// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis?
pub(super) block_map: DashMap<H256, Arc<Block<TxHash>>>,
// TODO: petgraph? might help with pruning the maps
/// TODO: what should we use for edges?
pub(super) blockchain_map: RwLock<DiGraphMap<H256, u32>>,
}
impl Web3Connections {
/// Spawn durable connections to multiple Web3 providers.
#[allow(clippy::too_many_arguments)]
pub async fn spawn(
chain_id: u64,
server_configs: HashMap<String, Web3ConnectionConfig>,
http_client: Option<reqwest::Client>,
redis_client_pool: Option<redis_rate_limit::RedisPool>,
block_map: BlockMap,
head_block_sender: Option<watch::Sender<Arc<Block<TxHash>>>>,
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
pending_transactions: Arc<DashMap<TxHash, TxStatus>>,
@ -110,6 +119,7 @@ impl Web3Connections {
};
let pending_tx_id_sender = Some(pending_tx_id_sender.clone());
let block_map = block_map.clone();
tokio::spawn(async move {
server_config
@ -119,6 +129,7 @@ impl Web3Connections {
chain_id,
http_client,
http_interval_sender,
block_map,
block_sender,
pending_tx_id_sender,
)
@ -128,7 +139,7 @@ impl Web3Connections {
.collect();
// map of connection names to their connection
let mut connections = IndexMap::new();
let mut connections = HashMap::new();
let mut handles = vec![];
// TODO: futures unordered?
@ -160,8 +171,8 @@ impl Web3Connections {
conns: connections,
synced_connections: ArcSwap::new(Arc::new(synced_connections)),
pending_transactions,
chain_map: Default::default(),
block_map: Default::default(),
blockchain_map: Default::default(),
});
let handle = {
@ -183,13 +194,17 @@ impl Web3Connections {
Ok((connections, handle))
}
pub fn get(&self, conn_name: &str) -> Option<&Arc<Web3Connection>> {
self.conns.get(conn_name)
}
/// subscribe to blocks and transactions from all the backend rpcs.
/// blocks are processed by all the `Web3Connection`s and then sent to the `block_receiver`
/// transaction ids from all the `Web3Connection`s are deduplicated and forwarded to `pending_tx_sender`
async fn subscribe(
self: Arc<Self>,
pending_tx_id_receiver: flume::Receiver<(TxHash, Arc<Web3Connection>)>,
block_receiver: flume::Receiver<(Arc<Block<TxHash>>, Arc<Web3Connection>)>,
block_receiver: flume::Receiver<BlockAndRpc>,
head_block_sender: Option<watch::Sender<Arc<Block<TxHash>>>>,
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
) -> anyhow::Result<()> {
@ -200,11 +215,11 @@ impl Web3Connections {
// fetches new transactions from the notifying rpc
// forwards new transactions to pending_tx_receipt_sender
if let Some(pending_tx_sender) = pending_tx_sender.clone() {
// TODO: do something with the handle so we can catch any errors
let clone = self.clone();
let handle = task::spawn(async move {
// TODO: set up this future the same as the block funnel
while let Ok((pending_tx_id, rpc)) = pending_tx_id_receiver.recv_async().await {
let f = clone.clone().funnel_transaction(
let f = clone.clone().process_incoming_tx_id(
rpc,
pending_tx_id,
pending_tx_sender.clone(),
@ -223,10 +238,14 @@ impl Web3Connections {
let connections = Arc::clone(&self);
let pending_tx_sender = pending_tx_sender.clone();
let handle = task::Builder::default()
.name("update_synced_rpcs")
.name("process_incoming_blocks")
.spawn(async move {
connections
.update_synced_rpcs(block_receiver, head_block_sender, pending_tx_sender)
.process_incoming_blocks(
block_receiver,
head_block_sender,
pending_tx_sender,
)
.await
});
@ -235,11 +254,11 @@ impl Web3Connections {
if futures.is_empty() {
// no transaction or block subscriptions.
// todo!("every second, check that the provider is still connected");?
let handle = task::Builder::default().name("noop").spawn(async move {
loop {
sleep(Duration::from_secs(600)).await;
// TODO: "every interval, check that the provider is still connected"
}
});
@ -265,7 +284,7 @@ impl Web3Connections {
// TODO: remove this box once i figure out how to do the options
params: Option<&serde_json::Value>,
) -> Result<Box<RawValue>, ProviderError> {
// TODO: if only 1 active_request_handles, do self.try_send_request
// TODO: if only 1 active_request_handles, do self.try_send_request?
let responses = active_request_handles
.into_iter()
@ -283,6 +302,8 @@ impl Web3Connections {
let mut counts: Counter<String> = Counter::new();
let mut any_ok = false;
for response in responses {
// TODO: i think we need to do something smarter with provider error. we at least need to wrap it up as JSON
// TODO: emit stats errors?
let s = format!("{:?}", response);
if count_map.get(&s).is_none() {
@ -325,12 +346,10 @@ impl Web3Connections {
// TODO: we are going to be checking "has_block_data" a lot now. i think we pretty much always have min_block_needed now that we override "latest"
let mut synced_rpcs: Vec<Arc<Web3Connection>> =
if let Some(min_block_needed) = min_block_needed {
// TODO: this includes ALL archive servers. but we only want them if they are on a somewhat recent block
// TODO: maybe instead of "archive_needed" bool it should be the minimum height. then even delayed servers might be fine. will need to track all heights then
self.conns
.values()
.filter(|x| x.has_block_data(min_block_needed))
.filter(|x| !skip.contains(x))
.filter(|x| x.has_block_data(min_block_needed))
.cloned()
.collect()
} else {
@ -344,7 +363,8 @@ impl Web3Connections {
};
if synced_rpcs.is_empty() {
// TODO: what should happen here? might be nicer to retry in a second
// TODO: what should happen here? automatic retry?
// TODO: more detailed error
return Err(anyhow::anyhow!("not synced"));
}
@ -375,7 +395,7 @@ impl Web3Connections {
// now that the rpcs are sorted, try to get an active request handle for one of them
for rpc in synced_rpcs.into_iter() {
// increment our connection counter
match rpc.try_open_request().await {
match rpc.try_request_handle().await {
Ok(OpenRequestResult::Handle(handle)) => {
trace!("next server on {:?}: {:?}", self, rpc);
return Ok(OpenRequestResult::Handle(handle));
@ -420,7 +440,7 @@ impl Web3Connections {
}
// check rate limits and increment our connection counter
match connection.try_open_request().await {
match connection.try_request_handle().await {
Ok(OpenRequestResult::RetryAt(retry_at)) => {
// this rpc is not available. skip it
earliest_retry_at = earliest_retry_at.min(Some(retry_at));

@ -24,7 +24,7 @@ impl Web3Connections {
// TODO: there is a race here on geth. sometimes the rpc isn't yet ready to serve the transaction (even though they told us about it!)
// TODO: yearn devs have had better luck with batching these, but i think that's likely just adding a delay itself
// TODO: if one rpc fails, try another?
let tx: Transaction = match rpc.try_open_request().await {
let tx: Transaction = match rpc.try_request_handle().await {
Ok(OpenRequestResult::Handle(handle)) => {
handle
.request("eth_getTransactionByHash", (pending_tx_id,))
@ -55,7 +55,7 @@ impl Web3Connections {
}
/// dedupe transactions and send them to any listening clients
pub(super) async fn funnel_transaction(
pub(super) async fn process_incoming_tx_id(
self: Arc<Self>,
rpc: Arc<Web3Connection>,
pending_tx_id: TxHash,