major refactor to only use backup servers when absolutely necessary

Bryan Stitt 2023-01-19 02:13:00 -08:00
parent e4b0d4b76d
commit 0c05b5bdee
8 changed files with 656 additions and 329 deletions

@ -305,6 +305,8 @@ These are not yet ordered. There might be duplicates. We might not actually need
- [x] status page should show version
- [x] combine the proxy and cli into one bin
- [x] improve rate limiting on websockets
- [x] retry another server if we get a jsonrpc response error about rate limits
- [x] major refactor to only use backup servers when absolutely necessary
- [-] proxy mode for benchmarking all backends
- [-] proxy mode for sending to multiple backends
- [-] let users choose a % of reverts to log (or maybe x/second). someone like curve logging all reverts will be a BIG database very quickly

@ -727,6 +727,10 @@ impl Web3ProxyApp {
Ok((app, cancellable_handles, important_background_handles).into())
}
pub fn head_block_receiver(&self) -> watch::Receiver<ArcBlock> {
self.head_block_receiver.clone()
}
pub async fn prometheus_metrics(&self) -> String {
let globals = HashMap::new();
// TODO: what globals? should this be the hostname or what?

@ -51,22 +51,25 @@ async fn run(
let app_frontend_port = frontend_port;
let app_prometheus_port = prometheus_port;
let mut shutdown_receiver = shutdown_sender.subscribe();
// start the main app
let mut spawned_app =
Web3ProxyApp::spawn(top_config, num_workers, shutdown_sender.subscribe()).await?;
let frontend_handle = tokio::spawn(frontend::serve(app_frontend_port, spawned_app.app.clone()));
// TODO: should we put this in a dedicated thread?
// start the prometheus metrics port
let prometheus_handle = tokio::spawn(metrics_frontend::serve(
spawned_app.app.clone(),
app_prometheus_port,
));
let mut shutdown_receiver = shutdown_sender.subscribe();
// wait until the app has seen its first consensus head block
let _ = spawned_app.app.head_block_receiver().changed().await;
// if everything is working, these should both run forever
// start the frontend port
let frontend_handle = tokio::spawn(frontend::serve(app_frontend_port, spawned_app.app.clone()));
// if everything is working, these should all run forever
tokio::select! {
x = flatten_handles(spawned_app.app_handles) => {
match x {
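The new head_block_receiver() accessor exists so that run() (above) can wait for the first consensus head block before starting the frontend. A minimal hypothetical helper using it, with Web3ProxyApp and ArcBlock as defined in this repo (the helper itself is not part of the diff):

// hypothetical helper; mirrors what run() does above before spawning the frontend
async fn wait_for_first_head(app: &Web3ProxyApp) -> ArcBlock {
    let mut head_rx = app.head_block_receiver();
    // returns once a consensus head block has been sent at least once
    let _ = head_rx.changed().await;
    head_rx.borrow().clone()
}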
@ -204,6 +207,7 @@ mod tests {
disabled: false,
display_name: None,
url: anvil.endpoint(),
backup: Some(false),
block_data_limit: None,
soft_limit: 100,
hard_limit: None,
@ -218,6 +222,7 @@ mod tests {
disabled: false,
display_name: None,
url: anvil.ws_endpoint(),
backup: Some(false),
block_data_limit: None,
soft_limit: 100,
hard_limit: None,

@ -198,6 +198,8 @@ pub struct Web3ConnectionConfig {
pub soft_limit: u32,
/// the requests per second at which the server throws errors (rate limit or otherwise)
pub hard_limit: Option<u64>,
/// only use this rpc if everything else is lagging too far. this allows us to ignore fast but very low limit rpcs
pub backup: Option<bool>,
/// All else equal, a server with a lower tier receives all requests
#[serde(default = "default_tier")]
pub tier: u64,
@ -256,6 +258,8 @@ impl Web3ConnectionConfig {
None
};
let backup = self.backup.unwrap_or(false);
Web3Connection::spawn(
name,
allowed_lag,
@ -267,6 +271,7 @@ impl Web3ConnectionConfig {
http_interval_sender,
hard_limit,
self.soft_limit,
backup,
self.block_data_limit,
block_map,
block_sender,
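The optional backup flag is the only new knob an operator has to set; when it is omitted, the server is treated as a normal (non-backup) rpc, per the unwrap_or(false) above. A hedged sketch of how such a config might deserialize, assuming a TOML config file and a trimmed-down struct (the real Web3ConnectionConfig has more fields; ExampleRpcConfig is hypothetical):

// hypothetical, trimmed-down config struct for illustration only
#[derive(serde::Deserialize)]
struct ExampleRpcConfig {
    url: String,
    soft_limit: u32,
    hard_limit: Option<u64>,
    /// only use this rpc if everything else is lagging too far
    backup: Option<bool>,
}

fn main() -> anyhow::Result<()> {
    // `backup` is optional; omitting it behaves like `backup = false`
    let cfg: ExampleRpcConfig = toml::from_str(
        r#"
            url = "wss://some-backup-node.example"
            soft_limit = 100
            backup = true
        "#,
    )?;
    // same default as `self.backup.unwrap_or(false)` above
    assert!(cfg.backup.unwrap_or(false));
    Ok(())
}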

@ -4,13 +4,13 @@ use super::connections::Web3Connections;
use super::transactions::TxStatus;
use crate::frontend::authorization::Authorization;
use crate::{
config::BlockAndRpc, jsonrpc::JsonRpcRequest, rpcs::synced_connections::SyncedConnections,
config::BlockAndRpc, jsonrpc::JsonRpcRequest, rpcs::synced_connections::ConsensusConnections,
};
use anyhow::Context;
use derive_more::From;
use ethers::prelude::{Block, TxHash, H256, U64};
use hashbrown::{HashMap, HashSet};
use log::{debug, warn, Level};
use log::{debug, error, warn, Level};
use moka::future::Cache;
use serde::Serialize;
use serde_json::json;
@ -24,7 +24,7 @@ pub type ArcBlock = Arc<Block<TxHash>>;
pub type BlockHashesCache = Cache<H256, ArcBlock, hashbrown::hash_map::DefaultHashBuilder>;
/// A block's hash and number.
/// A block and its age.
#[derive(Clone, Debug, Default, From, Serialize)]
pub struct SavedBlock {
pub block: ArcBlock,
@ -99,14 +99,18 @@ impl Display for SavedBlock {
impl Web3Connections {
/// add a block to our mappings and track the heaviest chain
pub async fn save_block(&self, block: &ArcBlock, heaviest_chain: bool) -> anyhow::Result<()> {
pub async fn save_block(
&self,
block: ArcBlock,
heaviest_chain: bool,
) -> anyhow::Result<ArcBlock> {
// TODO: i think we can rearrange this function to make it faster on the hot path
let block_hash = block.hash.as_ref().context("no block hash")?;
// skip Block::default()
if block_hash.is_zero() {
debug!("Skipping block without hash!");
return Ok(());
return Ok(block);
}
let block_num = block.number.as_ref().context("no block num")?;
@ -121,15 +125,17 @@ impl Web3Connections {
// this block is very likely already in block_hashes
// TODO: use their get_with
self.block_hashes
let block = self
.block_hashes
.get_with(*block_hash, async move { block.clone() })
.await;
Ok(())
Ok(block)
}
/// Get a block from caches with fallback.
/// Will query a specific node or the best available.
/// TODO: return anyhow::Result<Option<ArcBlock>>?
pub async fn block(
&self,
authorization: &Arc<Authorization>,
@ -138,6 +144,7 @@ impl Web3Connections {
) -> anyhow::Result<ArcBlock> {
// first, try to get the hash from our cache
// the cache is set last, so if it's here, it's everywhere
// TODO: use try_get_with
if let Some(block) = self.block_hashes.get(hash) {
return Ok(block);
}
@ -178,7 +185,7 @@ impl Web3Connections {
// the block was fetched using eth_getBlockByHash, so it should have all fields
// TODO: fill in heaviest_chain! if the block is old enough, is this definitely true?
self.save_block(&block, false).await?;
let block = self.save_block(block, false).await?;
Ok(block)
}
@ -249,7 +256,7 @@ impl Web3Connections {
let block: ArcBlock = serde_json::from_str(raw_block.get())?;
// the block was fetched using eth_getBlockByNumber, so it should have all fields and be on the heaviest chain
self.save_block(&block, true).await?;
let block = self.save_block(block, true).await?;
Ok((block, archive_needed))
}
@ -265,7 +272,7 @@ impl Web3Connections {
) -> anyhow::Result<()> {
// TODO: indexmap or hashmap? what hasher? with_capacity?
// TODO: this will grow unbounded. prune old heads on this at the same time we prune the graph?
let mut connection_heads = HashMap::new();
let mut connection_heads = ConsensusFinder::default();
while let Ok((new_block, rpc)) = block_receiver.recv_async().await {
let new_block = new_block.map(Into::into);
@ -287,7 +294,7 @@ impl Web3Connections {
}
}
// TODO: if there was an error, we should return it
// TODO: if there was an error, should we return it instead of an Ok?
warn!("block_receiver exited!");
Ok(())
@ -299,327 +306,590 @@ impl Web3Connections {
pub(crate) async fn process_block_from_rpc(
&self,
authorization: &Arc<Authorization>,
connection_heads: &mut HashMap<String, H256>,
consensus_finder: &mut ConsensusFinder,
rpc_head_block: Option<SavedBlock>,
rpc: Arc<Web3Connection>,
head_block_sender: &watch::Sender<ArcBlock>,
pending_tx_sender: &Option<broadcast::Sender<TxStatus>>,
) -> anyhow::Result<()> {
// add the rpc's block to connection_heads, or remove the rpc from connection_heads
let rpc_head_block = match rpc_head_block {
Some(rpc_head_block) => {
// we don't know if it's on the heaviest chain yet
self.save_block(&rpc_head_block.block, false).await?;
// TODO: don't default to 60. different chains are different
if rpc_head_block.syncing(60) {
if connection_heads.remove(&rpc.name).is_some() {
warn!("{} is behind by {} seconds", &rpc.name, rpc_head_block.age);
} else {
// we didn't remove anything and this block is old. exit early
return Ok(());
};
None
} else {
let rpc_head_hash = rpc_head_block.hash();
if let Some(prev_hash) =
connection_heads.insert(rpc.name.to_owned(), rpc_head_hash)
{
if prev_hash == rpc_head_hash {
// this block was already sent by this node. return early
return Ok(());
}
}
// TODO: should we just keep the ArcBlock here?
Some(rpc_head_block)
}
}
None => {
// // trace!(%rpc, "Block without number or hash!");
if connection_heads.remove(&rpc.name).is_none() {
// this connection was already removed.
// return early. no need to process synced connections
return Ok(());
}
None
}
};
// iterate the known heads to find the highest_work_block
let mut checked_heads = HashSet::new();
let mut highest_num_block: Option<ArcBlock> = None;
for (conn_name, connection_head_hash) in connection_heads.iter() {
if checked_heads.contains(connection_head_hash) {
// we already checked this head from another rpc
continue;
}
// don't check the same hash multiple times
checked_heads.insert(connection_head_hash);
let conn_head_block = if let Some(x) = self.block_hashes.get(connection_head_hash) {
x
} else {
// TODO: why does this happen?!?! seems to only happen with uncled blocks
// TODO: maybe we should do get_with?
// TODO: maybe we should just continue. this only seems to happen when an older block is received
warn!("Missing connection_head_block in block_hashes. Fetching now. hash={}. other={}. rpc={}", connection_head_hash, conn_name, rpc);
// this option should always be populated
let conn_rpc = self.conns.get(conn_name);
match self
.block(authorization, connection_head_hash, conn_rpc)
.await
{
Ok(block) => block,
Err(err) => {
warn!("Processing {}. Failed fetching connection_head_block for block_hashes. {} head hash={}. err={:?}", rpc, conn_name, connection_head_hash, err);
continue;
}
}
};
match &conn_head_block.number {
None => {
panic!("block is missing number. this is a bug");
}
Some(conn_head_num) => {
// if this is the first block we've tried
// or if this rpc's newest block has a higher number
// we used to check total difficulty, but that isn't a thing anymore
if highest_num_block.is_none()
|| conn_head_num
> highest_num_block
.as_ref()
.expect("there should always be a block here")
.number
.as_ref()
.expect("there should always be number here")
{
highest_num_block = Some(conn_head_block);
}
}
}
// TODO: how should we handle an error here?
if !consensus_finder
.update_rpc(rpc_head_block.clone(), rpc.clone(), self)
.await?
{
// nothing changed. no need
return Ok(());
}
if let Some(mut maybe_head_block) = highest_num_block {
// track rpcs on this heaviest chain so we can build a new SyncedConnections
let mut highest_rpcs = HashSet::<&String>::new();
// a running total of the soft limits covered by the rpcs that agree on the head block
let mut highest_rpcs_sum_soft_limit: u32 = 0;
// TODO: also track highest_rpcs_sum_hard_limit? llama doesn't need this, so it can wait
let new_synced_connections = consensus_finder
.best_consensus_connections(authorization, self)
.await;
// check the highest work block for a set of rpcs that can serve our request load
// if it doesn't have enough rpcs for our request load, check the parent block
// TODO: loop for how many parent blocks? we don't want to serve blocks that are too far behind. probably different per chain
// TODO: this loop is pretty long. any way to clean up this code?
for _ in 0..3 {
let maybe_head_hash = maybe_head_block
.hash
.as_ref()
.expect("blocks here always need hashes");
let includes_backups = new_synced_connections.includes_backups;
let consensus_head_block = new_synced_connections.head_block.clone();
let num_consensus_rpcs = new_synced_connections.num_conns();
let num_checked_rpcs = new_synced_connections.num_checked_conns;
let num_active_rpcs = consensus_finder.all.rpc_name_to_hash.len();
let total_rpcs = self.conns.len();
// find all rpcs with maybe_head_block as their current head
for (conn_name, conn_head_hash) in connection_heads.iter() {
if conn_head_hash != maybe_head_hash {
// connection is not on the desired block
continue;
}
if highest_rpcs.contains(conn_name) {
// connection is on a child block
continue;
let old_synced_connections = self
.synced_connections
.swap(Arc::new(new_synced_connections));
if let Some(consensus_saved_block) = consensus_head_block {
match &old_synced_connections.head_block {
None => {
debug!(
"first {}/{}/{}/{} block={}, rpc={}",
num_consensus_rpcs,
num_checked_rpcs,
num_active_rpcs,
total_rpcs,
consensus_saved_block,
rpc
);
if includes_backups {
// TODO: what else should be in this error?
warn!("Backup RPCs are in use!");
}
if let Some(rpc) = self.conns.get(conn_name) {
highest_rpcs.insert(conn_name);
highest_rpcs_sum_soft_limit += rpc.soft_limit;
} else {
warn!("connection missing")
}
let consensus_head_block =
self.save_block(consensus_saved_block.block, true).await?;
head_block_sender
.send(consensus_head_block)
.context("head_block_sender sending consensus_head_block")?;
}
Some(old_head_block) => {
// TODO: do this log item better
let rpc_head_str = rpc_head_block
.map(|x| x.to_string())
.unwrap_or_else(|| "None".to_string());
if highest_rpcs_sum_soft_limit < self.min_sum_soft_limit
|| highest_rpcs.len() < self.min_head_rpcs
{
// not enough rpcs yet. check the parent
if let Some(parent_block) = self.block_hashes.get(&maybe_head_block.parent_hash)
{
// // trace!(
// child=%maybe_head_hash, parent=%parent_block.hash.unwrap(), "avoiding thundering herd",
// );
maybe_head_block = parent_block;
continue;
} else {
// TODO: this message
warn!(
"soft limit {}/{} from {}/{} rpcs: {}%",
highest_rpcs_sum_soft_limit,
self.min_sum_soft_limit,
highest_rpcs.len(),
self.min_head_rpcs,
highest_rpcs_sum_soft_limit * 100 / self.min_sum_soft_limit
);
break;
}
}
}
// TODO: if consensus_head_rpcs.is_empty, try another method of finding the head block
let num_connection_heads = connection_heads.len();
let total_conns = self.conns.len();
// we've done all the searching for the heaviest block that we can
if highest_rpcs.is_empty() {
// if we get here, something is wrong. clear synced connections
let empty_synced_connections = SyncedConnections::default();
let _ = self
.synced_connections
.swap(Arc::new(empty_synced_connections));
// TODO: log different things depending on old_synced_connections
warn!(
"Processing {}. no consensus head! {}/{}/{}",
rpc, 0, num_connection_heads, total_conns
);
} else {
// // trace!(?highest_rpcs);
// TODO: if maybe_head_block.time() is old, ignore it
// success! this block has enough soft limit and nodes on it (or on later blocks)
let conns: Vec<Arc<Web3Connection>> = highest_rpcs
.into_iter()
.filter_map(|conn_name| self.conns.get(conn_name).cloned())
.collect();
// TODO: DEBUG only check
let _ = maybe_head_block
.hash
.expect("head blocks always have hashes");
let _ = maybe_head_block
.number
.expect("head blocks always have numbers");
let num_consensus_rpcs = conns.len();
let consensus_head_block: SavedBlock = maybe_head_block.into();
let new_synced_connections = SyncedConnections {
head_block: Some(consensus_head_block.clone()),
conns,
};
let old_synced_connections = self
.synced_connections
.swap(Arc::new(new_synced_connections));
// TODO: if the rpc_head_block != consensus_head_block, log something?
match &old_synced_connections.head_block {
None => {
debug!(
"first {}/{}/{} block={}, rpc={}",
num_consensus_rpcs,
num_connection_heads,
total_conns,
consensus_head_block,
rpc
);
self.save_block(&consensus_head_block.block, true).await?;
head_block_sender
.send(consensus_head_block.block)
.context("head_block_sender sending consensus_head_block")?;
}
Some(old_head_block) => {
// TODO: do this log item better
let rpc_head_str = rpc_head_block
.map(|x| x.to_string())
.unwrap_or_else(|| "None".to_string());
match consensus_head_block.number().cmp(&old_head_block.number()) {
Ordering::Equal => {
// TODO: if rpc_block_id != consensus_head_block, do a different log?
// multiple blocks with the same fork!
if consensus_head_block.hash() == old_head_block.hash() {
// no change in hash. no need to use head_block_sender
debug!(
"con {}/{}/{} con_head={} rpc_head={} rpc={}",
num_consensus_rpcs,
num_connection_heads,
total_conns,
consensus_head_block,
rpc_head_str,
rpc,
)
} else {
// hash changed
debug!(
"unc {}/{}/{} con_head={} old={} rpc_head={} rpc={}",
num_consensus_rpcs,
num_connection_heads,
total_conns,
consensus_head_block,
old_head_block,
rpc_head_str,
rpc,
);
self.save_block(&consensus_head_block.block, true)
.await
.context("save consensus_head_block as heaviest chain")?;
head_block_sender.send(consensus_head_block.block).context(
"head_block_sender sending consensus_head_block",
)?;
}
}
Ordering::Less => {
// this is unlikely but possible
// TODO: better log
warn!("chain rolled back {}/{}/{} con_head={} old_head={} rpc_head={} rpc={}", num_consensus_rpcs, num_connection_heads, total_conns, consensus_head_block, old_head_block, rpc_head_str, rpc);
// TODO: tell save_block to remove any higher block numbers from the cache. not needed because we have other checks on requested blocks being > head, but still seems like a good idea
self.save_block(&consensus_head_block.block, true)
.await
.context(
"save_block sending consensus_head_block as heaviest chain",
)?;
head_block_sender
.send(consensus_head_block.block)
.context("head_block_sender sending consensus_head_block")?;
}
Ordering::Greater => {
match consensus_saved_block.number().cmp(&old_head_block.number()) {
Ordering::Equal => {
// multiple blocks with the same fork!
if consensus_saved_block.hash() == old_head_block.hash() {
// no change in hash. no need to use head_block_sender
debug!(
"new {}/{}/{} con_head={} rpc_head={} rpc={}",
"con {}/{}/{}/{} con={} rpc={}@{}",
num_consensus_rpcs,
num_connection_heads,
total_conns,
consensus_head_block,
num_checked_rpcs,
num_active_rpcs,
total_rpcs,
consensus_saved_block,
rpc,
rpc_head_str,
)
} else {
// hash changed
if includes_backups {
// TODO: what else should be in this error?
warn!("Backup RPCs are in use!");
}
debug!(
"unc {}/{}/{}/{} con_head={} old={} rpc={}@{}",
num_consensus_rpcs,
num_checked_rpcs,
num_active_rpcs,
total_rpcs,
consensus_saved_block,
old_head_block,
rpc,
rpc_head_str,
rpc
);
self.save_block(&consensus_head_block.block, true).await?;
let consensus_head_block = self
.save_block(consensus_saved_block.block, true)
.await
.context("save consensus_head_block as heaviest chain")?;
head_block_sender.send(consensus_head_block.block)?;
head_block_sender
.send(consensus_head_block)
.context("head_block_sender sending consensus_head_block")?;
}
}
Ordering::Less => {
// this is unlikely but possible
// TODO: better log
warn!(
"chain rolled back {}/{}/{}/{} con={} old={} rpc={}@{}",
num_consensus_rpcs,
num_checked_rpcs,
num_active_rpcs,
total_rpcs,
consensus_saved_block,
old_head_block,
rpc,
rpc_head_str,
);
if includes_backups {
// TODO: what else should be in this error?
warn!("Backup RPCs are in use!");
}
// TODO: tell save_block to remove any higher block numbers from the cache. not needed because we have other checks on requested blocks being > head, but still seems like a good idea
let consensus_head_block = self
.save_block(consensus_saved_block.block, true)
.await
.context(
"save_block sending consensus_head_block as heaviest chain",
)?;
head_block_sender
.send(consensus_head_block)
.context("head_block_sender sending consensus_head_block")?;
}
Ordering::Greater => {
debug!(
"new {}/{}/{}/{} con={} rpc={}@{}",
num_consensus_rpcs,
num_checked_rpcs,
num_active_rpcs,
total_rpcs,
consensus_saved_block,
rpc,
rpc_head_str,
);
if includes_backups {
// TODO: what else should be in this error?
warn!("Backup RPCs are in use!");
}
let consensus_head_block =
self.save_block(consensus_saved_block.block, true).await?;
head_block_sender.send(consensus_head_block)?;
}
}
}
}
} else {
// TODO: do this log item better
let rpc_head_str = rpc_head_block
.map(|x| x.to_string())
.unwrap_or_else(|| "None".to_string());
if num_checked_rpcs >= self.min_head_rpcs {
error!(
"non {}/{}/{}/{} rpc={}@{}",
num_consensus_rpcs,
num_checked_rpcs,
num_active_rpcs,
total_rpcs,
rpc,
rpc_head_str,
);
} else {
debug!(
"non {}/{}/{}/{} rpc={}@{}",
num_consensus_rpcs,
num_checked_rpcs,
num_active_rpcs,
total_rpcs,
rpc,
rpc_head_str,
);
}
}
Ok(())
}
}
struct ConnectionsGroup {
includes_backups: bool,
rpc_name_to_hash: HashMap<String, H256>,
}
impl ConnectionsGroup {
fn new(with_backups: bool) -> Self {
Self {
includes_backups: with_backups,
rpc_name_to_hash: Default::default(),
}
}
fn without_backups() -> Self {
Self::new(false)
}
fn with_backups() -> Self {
Self::new(true)
}
fn remove(&mut self, rpc: &Web3Connection) -> Option<H256> {
self.rpc_name_to_hash.remove(rpc.name.as_str())
}
fn insert(&mut self, rpc: &Web3Connection, block_hash: H256) -> Option<H256> {
self.rpc_name_to_hash.insert(rpc.name.clone(), block_hash)
}
// TODO: i don't love having this here. move to web3_connections?
async fn get_block_from_rpc(
&self,
rpc_name: &str,
hash: &H256,
authorization: &Arc<Authorization>,
web3_connections: &Web3Connections,
) -> anyhow::Result<ArcBlock> {
// // TODO: why does this happen?!?! seems to only happen with uncled blocks
// // TODO: maybe we should do try_get_with?
// // TODO: maybe we should just continue. this only seems to happen when an older block is received
// warn!(
// "Missing connection_head_block in block_hashes. Fetching now. hash={}. other={}",
// connection_head_hash, conn_name
// );
// this option should almost always be populated. if the connection reconnects at a bad time it might not be available though
let rpc = web3_connections.conns.get(rpc_name);
web3_connections.block(authorization, hash, rpc).await
}
// TODO: do this during insert/remove?
pub(self) async fn highest_block(
&self,
authorization: &Arc<Authorization>,
web3_connections: &Web3Connections,
) -> Option<ArcBlock> {
let mut checked_heads = HashSet::with_capacity(self.rpc_name_to_hash.len());
let mut highest_block = None::<ArcBlock>;
for (rpc_name, rpc_head_hash) in self.rpc_name_to_hash.iter() {
// don't waste time checking the same hash multiple times
if checked_heads.contains(rpc_head_hash) {
continue;
}
let rpc_block = match self
.get_block_from_rpc(rpc_name, rpc_head_hash, authorization, web3_connections)
.await
{
Ok(x) => x,
Err(err) => {
warn!(
"failed getting block {} from {} while finding highest block number: {:?}",
rpc_head_hash, rpc_name, err,
);
continue;
}
};
checked_heads.insert(rpc_head_hash);
// if this is the first block we've tried
// or if this rpc's newest block has a higher number
// we used to check total difficulty, but that isn't a thing anymore on ETH
// TODO: we still need total difficulty on some other PoW chains. what's annoying is it isn't considered part of the "block header", just the block, so websockets don't return it
let highest_num = highest_block
.as_ref()
.map(|x| x.number.expect("blocks here should always have a number"));
let rpc_num = rpc_block.as_ref().number;
if rpc_num > highest_num {
highest_block = Some(rpc_block);
}
}
highest_block
}
pub(self) async fn consensus_head_connections(
&self,
authorization: &Arc<Authorization>,
web3_connections: &Web3Connections,
) -> anyhow::Result<ConsensusConnections> {
let mut maybe_head_block = match self.highest_block(authorization, web3_connections).await {
None => return Err(anyhow::anyhow!("No blocks known")),
Some(x) => x,
};
let num_known = self.rpc_name_to_hash.len();
// track rpcs on this heaviest chain so we can build a new ConsensusConnections
let mut highest_rpcs = HashSet::<&str>::new();
// a running total of the soft limits covered by the rpcs that agree on the head block
let mut highest_rpcs_sum_soft_limit: u32 = 0;
// TODO: also track highest_rpcs_sum_hard_limit? llama doesn't need this, so it can wait
// check the highest work block for a set of rpcs that can serve our request load
// if it doesn't have enough rpcs for our request load, check the parent block
// TODO: loop for how many parent blocks? we don't want to serve blocks that are too far behind. probably different per chain
// TODO: this loop is pretty long. any way to clean up this code?
for _ in 0..6 {
let maybe_head_hash = maybe_head_block
.hash
.as_ref()
.expect("blocks here always need hashes");
// find all rpcs with maybe_head_block as their current head
for (rpc_name, rpc_head_hash) in self.rpc_name_to_hash.iter() {
if rpc_head_hash != maybe_head_hash {
// connection is not on the desired block
continue;
}
if highest_rpcs.contains(rpc_name.as_str()) {
// connection is on a child block
continue;
}
if let Some(rpc) = web3_connections.conns.get(rpc_name.as_str()) {
highest_rpcs.insert(rpc_name);
highest_rpcs_sum_soft_limit += rpc.soft_limit;
} else {
// i don't think this is an error. i think it's just if a reconnect is currently happening
warn!("connection missing: {}", rpc_name);
}
}
if highest_rpcs_sum_soft_limit >= web3_connections.min_sum_soft_limit
&& highest_rpcs.len() >= web3_connections.min_head_rpcs
{
// we have enough servers with enough requests
break;
}
// not enough rpcs yet. check the parent block
if let Some(parent_block) = web3_connections
.block_hashes
.get(&maybe_head_block.parent_hash)
{
// trace!(
// child=%maybe_head_hash, parent=%parent_block.hash.unwrap(), "avoiding thundering herd",
// );
maybe_head_block = parent_block;
continue;
} else {
if num_known < web3_connections.min_head_rpcs {
return Err(anyhow::anyhow!(
"not enough rpcs connected: {}/{}/{}",
highest_rpcs.len(),
num_known,
web3_connections.min_head_rpcs,
));
} else {
let soft_limit_percent = (highest_rpcs_sum_soft_limit as f32
/ web3_connections.min_sum_soft_limit as f32)
* 100.0;
return Err(anyhow::anyhow!(
"ran out of parents to check. rpcs {}/{}/{}. soft limit: {:.2}% ({}/{})",
highest_rpcs.len(),
num_known,
web3_connections.min_head_rpcs,
highest_rpcs_sum_soft_limit,
web3_connections.min_sum_soft_limit,
soft_limit_percent,
));
}
}
}
// TODO: if consensus_head_rpcs.is_empty, try another method of finding the head block. will need to change the return Err above into breaks.
// we've done all the searching for the heaviest block that we can
if highest_rpcs.len() < web3_connections.min_head_rpcs
|| highest_rpcs_sum_soft_limit < web3_connections.min_sum_soft_limit
{
// if we get here, not enough servers are synced. return an error
let soft_limit_percent = (highest_rpcs_sum_soft_limit as f32
/ web3_connections.min_sum_soft_limit as f32)
* 100.0;
return Err(anyhow::anyhow!(
"Not enough resources. rpcs {}/{}/{}. soft limit: {:.2}% ({}/{})",
highest_rpcs.len(),
num_known,
web3_connections.min_head_rpcs,
highest_rpcs_sum_soft_limit,
web3_connections.min_sum_soft_limit,
soft_limit_percent,
));
}
// success! this block has enough soft limit and nodes on it (or on later blocks)
let conns: Vec<Arc<Web3Connection>> = highest_rpcs
.into_iter()
.filter_map(|conn_name| web3_connections.conns.get(conn_name).cloned())
.collect();
// TODO: DEBUG only check
let _ = maybe_head_block
.hash
.expect("head blocks always have hashes");
let _ = maybe_head_block
.number
.expect("head blocks always have numbers");
let consensus_head_block: SavedBlock = maybe_head_block.into();
Ok(ConsensusConnections {
head_block: Some(consensus_head_block),
conns,
num_checked_conns: self.rpc_name_to_hash.len(),
includes_backups: self.includes_backups,
})
}
}
/// A ConsensusConnections builder that tracks all connection heads across multiple groups of servers
pub struct ConsensusFinder {
/// only main servers
main: ConnectionsGroup,
/// main and backup servers
all: ConnectionsGroup,
}
impl Default for ConsensusFinder {
fn default() -> Self {
Self {
main: ConnectionsGroup::without_backups(),
all: ConnectionsGroup::with_backups(),
}
}
}
impl ConsensusFinder {
fn remove(&mut self, rpc: &Web3Connection) -> Option<H256> {
// TODO: should we have multiple backup tiers? (remote datacenters vs third party)
if !rpc.backup {
self.main.remove(rpc);
}
self.all.remove(rpc)
}
fn insert(&mut self, rpc: &Web3Connection, new_hash: H256) -> Option<H256> {
// TODO: should we have multiple backup tiers? (remote datacenters vs third party)
if !rpc.backup {
self.main.insert(rpc, new_hash);
}
self.all.insert(rpc, new_hash)
}
/// Update our tracking of the rpc and return true if something changed
async fn update_rpc(
&mut self,
rpc_head_block: Option<SavedBlock>,
rpc: Arc<Web3Connection>,
// we need this so we can save the block to caches. i don't like it though. maybe we should use a lazy_static Cache wrapper that has a "save_block" method? i generally dislike globals but i also dislike all the types having to pass each other around
web3_connections: &Web3Connections,
) -> anyhow::Result<bool> {
// add the rpc's block to connection_heads, or remove the rpc from connection_heads
let changed = match rpc_head_block {
Some(mut rpc_head_block) => {
// we don't know if it's on the heaviest chain yet
rpc_head_block.block = web3_connections
.save_block(rpc_head_block.block, false)
.await?;
// we used to remove here if the block was too far behind. but it just made things more complicated
let rpc_head_hash = rpc_head_block.hash();
if let Some(prev_hash) = self.insert(&rpc, rpc_head_hash) {
if prev_hash == rpc_head_hash {
// this block was already sent by this rpc. return early
false
} else {
// new block for this rpc
true
}
} else {
// first block for this rpc
true
}
}
None => {
if self.remove(&rpc).is_none() {
// this rpc was already removed
false
} else {
// rpc head changed from being synced to not
true
}
}
};
Ok(changed)
}
// TODO: this could definitely be cleaner. i don't like the error handling/unwrapping
async fn best_consensus_connections(
&mut self,
authorization: &Arc<Authorization>,
web3_connections: &Web3Connections,
) -> ConsensusConnections {
let highest_block_num = match self
.all
.highest_block(authorization, web3_connections)
.await
{
None => {
return ConsensusConnections::default();
}
Some(x) => x.number.expect("blocks here should always have a number"),
};
let min_block_num = highest_block_num.saturating_sub(U64::from(5));
// TODO: pass `min_block_num` to consensus_head_connections?
let consensus_head_for_main = self
.main
.consensus_head_connections(authorization, web3_connections)
.await
.map_err(|err| err.context("cannot use main group"));
let consensus_num_for_main = consensus_head_for_main
.as_ref()
.ok()
.map(|x| x.head_block.as_ref().unwrap().number());
if let Some(consensus_num_for_main) = consensus_num_for_main {
if consensus_num_for_main >= min_block_num {
return consensus_head_for_main.unwrap();
}
}
// TODO: pass `min_block_num` to consensus_head_connections?
let consensus_connections_for_all = match self
.all
.consensus_head_connections(authorization, web3_connections)
.await
{
Err(err) => {
warn!("Unable to find any consensus head: {}", err);
return ConsensusConnections::default();
}
Ok(x) => x,
};
let consensus_num_for_all = consensus_connections_for_all
.head_block
.as_ref()
.map(|x| x.number());
if consensus_num_for_all > consensus_num_for_main {
if consensus_num_for_all < Some(min_block_num) {
// TODO: this should have an alarm in sentry
error!("CONSENSUS HEAD w/ BACKUP NODES IS VERY OLD!");
}
consensus_connections_for_all
} else {
if let Ok(x) = consensus_head_for_main {
error!("CONSENSUS HEAD IS VERY OLD! Backup RPCs did not improve this situation");
x
} else {
error!("NO CONSENSUS HEAD!");
ConsensusConnections::default()
}
}
}
}
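To summarize the fallback logic in best_consensus_connections above: the main (backup-free) group wins unless its consensus head is more than five blocks behind the highest block any server has reported, and only then is the group that includes backups consulted. A simplified, hypothetical sketch of that decision rule (the real method also caches blocks, logs loudly, and returns ConsensusConnections; pick_consensus and its string results are illustration only):

// hypothetical sketch of the fallback rule; MAX_LAG_BLOCKS matches the
// `saturating_sub(U64::from(5))` in best_consensus_connections above
fn pick_consensus(main: Option<u64>, all: Option<u64>, highest_known: u64) -> &'static str {
    const MAX_LAG_BLOCKS: u64 = 5;
    let min_block_num = highest_known.saturating_sub(MAX_LAG_BLOCKS);
    if matches!(main, Some(m) if m >= min_block_num) {
        // the main servers are close enough to the tip: never touch backups
        "use the main (non-backup) servers only"
    } else if all > main {
        // backups actually improve the consensus head, so use them
        "fall back to the group that includes backup servers"
    } else if main.is_some() {
        "keep the main servers; backups did not improve the head"
    } else {
        "no consensus head at all"
    }
}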

@ -84,6 +84,8 @@ pub struct Web3Connection {
pub(super) soft_limit: u32,
/// use web3 queries to find the block data limit for archive/pruned nodes
pub(super) automatic_block_limit: bool,
/// only use this rpc if everything else is lagging too far. this allows us to ignore fast but very low limit rpcs
pub(super) backup: bool,
/// TODO: have an enum for this so that "no limit" prints pretty?
pub(super) block_data_limit: AtomicU64,
/// Lower tiers are higher priority when sending requests
@ -111,6 +113,7 @@ impl Web3Connection {
hard_limit: Option<(u64, RedisPool)>,
// TODO: think more about this type
soft_limit: u32,
backup: bool,
block_data_limit: Option<u64>,
block_map: BlockHashesCache,
block_sender: Option<flume::Sender<BlockAndRpc>>,
@ -149,6 +152,7 @@ impl Web3Connection {
hard_limit,
soft_limit,
automatic_block_limit,
backup,
block_data_limit,
head_block: RwLock::new(Default::default()),
tier,
@ -304,6 +308,7 @@ impl Web3Connection {
None => return false,
Some(x) => {
// TODO: this 60 second limit is causing our polygons to fall behind. change this to number of blocks?
// TODO: sometimes blocks might actually just take longer than 60 seconds
if x.syncing(60) {
// skip syncing nodes. even though they might be able to serve a query,
// latency will be poor and it will get in the way of them syncing further
@ -648,7 +653,7 @@ impl Web3Connection {
// if this block is too old, return an error so we reconnect
let current_lag = x.lag();
if current_lag > allowed_lag {
let level = if warned == 0 {
let level = if warned == 0 && !conn.backup {
log::Level::Warn
} else if warned % 100 == 0 {
log::Level::Debug
@ -1225,6 +1230,7 @@ mod tests {
hard_limit: None,
soft_limit: 1_000,
automatic_block_limit: false,
backup: false,
block_data_limit: block_data_limit.into(),
tier: 0,
head_block: RwLock::new(Some(head_block.clone())),
@ -1273,6 +1279,7 @@ mod tests {
hard_limit: None,
soft_limit: 1_000,
automatic_block_limit: false,
backup: false,
block_data_limit: block_data_limit.into(),
tier: 0,
head_block: RwLock::new(Some(head_block.clone())),
@ -1325,6 +1332,7 @@ mod tests {
hard_limit: None,
soft_limit: 1_000,
automatic_block_limit: false,
backup: false,
block_data_limit: block_data_limit.into(),
tier: 0,
head_block: RwLock::new(Some(head_block.clone())),
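The new backup flag on Web3Connection also softens the lag logging in the hunk above: a lagging backup no longer warns on its first offense. A small sketch of that rule; the final Trace fallback is an assumption, since the hunk is cut off before the last branch:

// sketch of the lag-logging rule; the first two branches mirror the diff,
// the Trace fallback is assumed (not shown in the hunk)
fn lag_log_level(warned: u64, is_backup: bool) -> log::Level {
    if warned == 0 && !is_backup {
        // a main server falling behind is worth a warning right away
        log::Level::Warn
    } else if warned % 100 == 0 {
        log::Level::Debug
    } else {
        log::Level::Trace // assumed fallback
    }
}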

@ -4,7 +4,7 @@ use super::connection::Web3Connection;
use super::request::{
OpenRequestHandle, OpenRequestHandleMetrics, OpenRequestResult, RequestRevertHandler,
};
use super::synced_connections::SyncedConnections;
use super::synced_connections::ConsensusConnections;
use crate::app::{flatten_handle, AnyhowJoinHandle};
use crate::config::{BlockAndRpc, TxHashAndRpc, Web3ConnectionConfig};
use crate::frontend::authorization::{Authorization, RequestMetadata};
@ -40,7 +40,7 @@ use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBeh
pub struct Web3Connections {
pub(crate) conns: HashMap<String, Arc<Web3Connection>>,
/// any requests will be forwarded to one (or more) of these connections
pub(super) synced_connections: ArcSwap<SyncedConnections>,
pub(super) synced_connections: ArcSwap<ConsensusConnections>,
pub(super) pending_transactions:
Cache<TxHash, TxStatus, hashbrown::hash_map::DefaultHashBuilder>,
/// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis?
@ -196,7 +196,7 @@ impl Web3Connections {
}
}
let synced_connections = SyncedConnections::default();
let synced_connections = ConsensusConnections::default();
// TODO: max_capacity and time_to_idle from config
// all block hashes are the same size, so no need for weigher
@ -329,6 +329,7 @@ impl Web3Connections {
}
/// Send the same request to all the handles. Returning the most common success or most common error.
/// TODO: option to return the fastest response and handles for all the others instead?
pub async fn try_send_parallel_requests(
&self,
active_request_handles: Vec<OpenRequestHandle>,
@ -501,7 +502,7 @@ impl Web3Connections {
.collect();
trace!("minimum available requests: {}", minimum);
trace!("maximum available requests: {}", minimum);
trace!("maximum available requests: {}", maximum);
if maximum < 0.0 {
// TODO: if maximum < 0 and there are other tiers on the same block, we should include them now
@ -725,10 +726,20 @@ impl Web3Connections {
}
// some errors should be retried on other nodes
let error_msg = error.message.as_str();
// different providers do different codes. check all of them
// TODO: there's probably more strings to add here
let rate_limit_substrings = ["limit", "exceeded"];
for rate_limit_substr in rate_limit_substrings {
if error_msg.contains(rate_limit_substr) {
warn!("rate limited by {:?}", skip_rpcs.last());
continue;
}
}
match error.code {
-32000 => {
let error_msg = error.message.as_str();
// TODO: regex?
let retry_prefixes = [
"header not found",
@ -866,7 +877,7 @@ impl Web3Connections {
// TODO: return a 502? if it does?
// return Err(anyhow::anyhow!("no available rpcs!"));
// TODO: sleep how long?
// TODO: subscribe to something in SyncedConnections instead
// TODO: subscribe to something in ConsensusConnections instead
sleep(Duration::from_millis(200)).await;
continue;
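The substring check in the retry hunk above is what implements the checklist item about retrying another server on rate-limit errors. A hedged, free-function version of the same check (the real code loops inline over skip_rpcs and also matches specific error codes such as -32000; looks_rate_limited is a hypothetical name):

// hypothetical free-function version of the substring check above
fn looks_rate_limited(error_msg: &str) -> bool {
    // different providers return different codes/messages, so match on substrings
    let rate_limit_substrings = ["limit", "exceeded"];
    rate_limit_substrings
        .iter()
        .any(|substr| error_msg.contains(substr))
}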
@ -951,7 +962,11 @@ mod tests {
// TODO: why is this allow needed? does tokio::test get in the way somehow?
#![allow(unused_imports)]
use super::*;
use crate::rpcs::{blockchain::SavedBlock, connection::ProviderState, provider::Web3Provider};
use crate::rpcs::{
blockchain::{ConsensusFinder, SavedBlock},
connection::ProviderState,
provider::Web3Provider,
};
use ethers::types::{Block, U256};
use log::{trace, LevelFilter};
use parking_lot::RwLock;
@ -992,8 +1007,8 @@ mod tests {
let head_block = Arc::new(head_block);
// TODO: write a impl From for Block -> BlockId?
let lagged_block: SavedBlock = lagged_block.into();
let head_block: SavedBlock = head_block.into();
let mut lagged_block: SavedBlock = lagged_block.into();
let mut head_block: SavedBlock = head_block.into();
let block_data_limit = u64::MAX;
@ -1012,6 +1027,7 @@ mod tests {
hard_limit: None,
soft_limit: 1_000,
automatic_block_limit: true,
backup: false,
block_data_limit: block_data_limit.into(),
tier: 0,
head_block: RwLock::new(Some(head_block.clone())),
@ -1032,6 +1048,7 @@ mod tests {
hard_limit: None,
soft_limit: 1_000,
automatic_block_limit: false,
backup: false,
block_data_limit: block_data_limit.into(),
tier: 0,
head_block: RwLock::new(Some(lagged_block.clone())),
@ -1072,7 +1089,7 @@ mod tests {
let (head_block_sender, _head_block_receiver) =
watch::channel::<ArcBlock>(Default::default());
let mut connection_heads = HashMap::new();
let mut connection_heads = ConsensusFinder::default();
// process None so that
conns
@ -1123,7 +1140,7 @@ mod tests {
assert!(matches!(x, OpenRequestResult::NotReady));
// add lagged blocks to the conns. both servers should be allowed
conns.save_block(&lagged_block.block, true).await.unwrap();
lagged_block.block = conns.save_block(lagged_block.block, true).await.unwrap();
conns
.process_block_from_rpc(
@ -1151,7 +1168,7 @@ mod tests {
assert_eq!(conns.num_synced_rpcs(), 2);
// add head block to the conns. lagged_rpc should not be available
conns.save_block(&head_block.block, true).await.unwrap();
head_block.block = conns.save_block(head_block.block, true).await.unwrap();
conns
.process_block_from_rpc(
@ -1236,6 +1253,7 @@ mod tests {
hard_limit: None,
soft_limit: 3_000,
automatic_block_limit: false,
backup: false,
block_data_limit: 64.into(),
tier: 1,
head_block: RwLock::new(Some(head_block.clone())),
@ -1256,6 +1274,7 @@ mod tests {
hard_limit: None,
soft_limit: 1_000,
automatic_block_limit: false,
backup: false,
block_data_limit: u64::MAX.into(),
tier: 2,
head_block: RwLock::new(Some(head_block.clone())),
@ -1295,7 +1314,7 @@ mod tests {
let (head_block_sender, _head_block_receiver) =
watch::channel::<ArcBlock>(Default::default());
let mut connection_heads = HashMap::new();
let mut connection_heads = ConsensusFinder::default();
conns
.process_block_from_rpc(

@ -9,19 +9,33 @@ use std::sync::Arc;
/// A collection of Web3Connections that are on the same block.
/// Serialize is so we can print it on our debug endpoint
#[derive(Clone, Default, Serialize)]
pub struct SyncedConnections {
pub struct ConsensusConnections {
// TODO: store ArcBlock instead?
pub(super) head_block: Option<SavedBlock>,
// TODO: this should be able to serialize, but it isn't
#[serde(skip_serializing)]
pub(super) conns: Vec<Arc<Web3Connection>>,
pub(super) num_checked_conns: usize,
pub(super) includes_backups: bool,
}
impl fmt::Debug for SyncedConnections {
impl ConsensusConnections {
pub fn num_conns(&self) -> usize {
self.conns.len()
}
pub fn sum_soft_limit(&self) -> u32 {
self.conns.iter().fold(0, |sum, rpc| sum + rpc.soft_limit)
}
// TODO: sum_hard_limit?
}
impl fmt::Debug for ConsensusConnections {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// TODO: the default formatter takes forever to write. this is too quiet though
// TODO: print the actual conns?
f.debug_struct("SyncedConnections")
f.debug_struct("ConsensusConnections")
.field("head_block", &self.head_block)
.field("num_conns", &self.conns.len())
.finish_non_exhaustive()