From 6450a4cd420c35c7477c28011a9d2bb3e46817f4 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Thu, 1 Sep 2022 05:58:55 +0000 Subject: [PATCH] more fork detection work --- Cargo.lock | 17 +- TODO.md | 15 +- redis-rate-limit/Cargo.toml | 2 +- redis-rate-limit/src/lib.rs | 4 +- web3_proxy/Cargo.toml | 4 +- web3_proxy/src/app.rs | 26 ++-- web3_proxy/src/rpcs/blockchain.rs | 182 ++++++++++++++-------- web3_proxy/src/rpcs/connections.rs | 19 +-- web3_proxy/src/rpcs/synced_connections.rs | 32 ++-- 9 files changed, 177 insertions(+), 124 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dcfd12b0..cee5c7be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -85,9 +85,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.62" +version = "1.0.63" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1485d4d2cc45e7b201ee3767015c96faa5904387c9d87c6efdd0fb511f12d305" +checksum = "a26fa4d7e3f2eebadf743988fc8aec9fa9a9e82611acafd77c1462ed6262440a" dependencies = [ "backtrace", ] @@ -1245,13 +1245,14 @@ dependencies = [ [[package]] name = "dashmap" -version = "5.3.4" +version = "5.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3495912c9c1ccf2e18976439f4443f3fee0fd61f424ff99fde6a66b15ecb448f" +checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc" dependencies = [ "cfg-if", "hashbrown", "lock_api", + "once_cell", "parking_lot_core 0.9.3", ] @@ -2671,9 +2672,9 @@ dependencies = [ [[package]] name = "lock_api" -version = "0.4.7" +version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "327fa5b6a6940e4699ec49a9beae1ea4845c6bab9314e4f84ac68742139d8c53" +checksum = "9f80bf5aacaf25cbfc8210d1cfb718f2bf3b11c4c54e5afe36c236853a8ec390" dependencies = [ "autocfg 1.1.0", "scopeguard", @@ -2954,9 +2955,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.13.0" +version = "1.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18a6dbe30758c9f83eb00cbea4ac95966305f5a7772f3f42ebfc7fc7eddbd8e1" +checksum = "074864da206b4973b84eb91683020dbefd6a8c3f0f38e054d93954e891935e4e" [[package]] name = "oorandom" diff --git a/TODO.md b/TODO.md index 626c823c..509e8fe0 100644 --- a/TODO.md +++ b/TODO.md @@ -90,7 +90,8 @@ - whenever blocks were slow, we started checking as fast as possible - [x] create user script should allow setting requests per minute - [x] cache api keys that are not in the database -- [x] improve consensus block selection. Our goal is to find the highest work chain with a block over a minimum threshold of sum_soft_limit. +- [ ] improve consensus block selection. Our goal is to find the highest work chain with a block over a minimum threshold of sum_soft_limit. + - [x] i saw a fork of like 300 blocks. probably just because a node was restarted and had fallen behind. need some checks to ignore things that are far behind. this improvement should fix this problem - [x] A new block arrives at a connection. - [x] It checks that it isn't the same that it already has (which is a problem with polling nodes) - [x] If its new to this node... @@ -113,7 +114,12 @@ - if all_simple_paths returns no paths, warn about a chain split? - [x] now that we have a consensus head with enough soft limit (or an empty set), update SyncedConnections - [x] send the block through new head_block_sender - - [x] rewrite cannonical_block + - [x] rewrite cannonical_block to work as long as there are no forks + - [ ] rewrite cannonical_block (again) and related functions to handle forks + - [ ] todo!("do something with the old hash. we need to update a bunch more block numbers") + - [ ] todo!("handle equal") and also less and greater + - [x] "chain is forked" message is wrong. it includes nodes just being on different heights of the same chain. need a smarter check + - i think there is also a bug because i've seen "server not synced" a couple times - [x] bug around eth_getBlockByHash sometimes causes tokio to lock up - i keep a mapping of blocks so that i can go from hash -> block. it has some consistent hashing it does to split them up across multiple maps each with their own lock. so a lot of the time reads dont block writes because they are in different internal maps. this was fine. - but after changing my fork detection logic to use the same rules as erigon, i discovered that when you get blocks from a websocket subscription in erigon and geth, theres a missing field (https://github.com/ledgerwatch/erigon/issues/5190). so i added a query to get the block that includes the missing field. @@ -125,12 +131,9 @@ - but under heavy load, we hit their rate limits. need a "retry_until_success" function that goes to balanced_rpcs. or maybe store in redis the txids that we broadcast privately and use that to route. - [ ] write a function for receipts that tries balanced_rpcs and only on error of all balanced tries privates - [-] basic request method stats (using the user_id and other fields that are in the tracing frame) -- [ ] "chain is forked" message is wrong. it includes nodes just being on different heights of the same chain. need a smarter check - - i think there is also a bug because i've seen "server not synced" a couple times - - [x] i saw a fork of like 300 blocks. probably just because a node was restarted and had fallen behind. need some checks to ignore things that are far behind -- [ ] todo!("pick the block on the current consensus chain") - [ ] web3connection3.block(...) might wait forever. be sure to do it safely - [ ] search for all "todo!" +- [ ] replace all `.context("no servers in sync")` with proper error type ## V1 diff --git a/redis-rate-limit/Cargo.toml b/redis-rate-limit/Cargo.toml index a07f481b..399ead19 100644 --- a/redis-rate-limit/Cargo.toml +++ b/redis-rate-limit/Cargo.toml @@ -5,6 +5,6 @@ authors = ["Bryan Stitt "] edition = "2021" [dependencies] -anyhow = "1.0.62" +anyhow = "1.0.63" bb8-redis = "0.11.0" tracing = "0.1.36" diff --git a/redis-rate-limit/src/lib.rs b/redis-rate-limit/src/lib.rs index efdc754f..45dd3d43 100644 --- a/redis-rate-limit/src/lib.rs +++ b/redis-rate-limit/src/lib.rs @@ -84,9 +84,7 @@ impl RedisRateLimit { .await .context("increment rate limit")?; - let new_count = x - .first() - .ok_or_else(|| anyhow::anyhow!("check rate limit result"))?; + let new_count = x.first().context("check rate limit result")?; if new_count > &max_per_period { let seconds_left_in_period = self.period - (now % self.period); diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index d8153d67..444fe0d8 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -16,7 +16,7 @@ entities = { path = "../entities" } migration = { path = "../migration" } redis-rate-limit = { path = "../redis-rate-limit" } -anyhow = { version = "1.0.62", features = ["backtrace"] } +anyhow = { version = "1.0.63", features = ["backtrace"] } arc-swap = "1.5.1" argh = "0.1.8" axum = { version = "0.5.15", features = ["headers", "serde_json", "tokio-tungstenite", "ws"] } @@ -25,7 +25,7 @@ axum-client-ip = "0.2.0" axum-macros = "0.2.3" # TODO: turn tokio-console on with a feature. console-subscriber = { version = "0.1.7" } counter = "0.5.6" -dashmap = "5.3.4" +dashmap = "5.4.0" derive_more = "0.99.17" dotenv = "0.15.0" ethers = { version = "0.17.0", features = ["rustls", "ws"] } diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs index 90e345b2..e4f493ff 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app.rs @@ -597,7 +597,9 @@ impl Web3ProxyApp { self.balanced_rpcs.block_hash(min_block_needed).await? } else { // TODO: maybe this should be on the app and not on balanced_rpcs - self.balanced_rpcs.head_block_hash() + self.balanced_rpcs + .head_block_hash() + .context("no servers in sync")? }; // TODO: better key? benchmark this @@ -724,16 +726,15 @@ impl Web3ProxyApp { serde_json::Value::Array(vec![]) } "eth_blockNumber" => { - // TODO: emit stats - - let head_block_number = self.balanced_rpcs.head_block_num(); - - // TODO: technically, block 0 is okay. i guess we should be using an option - if head_block_number.as_u64() == 0 { - return Err(anyhow::anyhow!("no servers synced")); + match self.balanced_rpcs.head_block_num() { + Some(head_block_num) => { + json!(head_block_num) + } + None => { + // TODO: what does geth do if this happens? + return Err(anyhow::anyhow!("no servers synced")); + } } - - json!(head_block_number) } // TODO: eth_callBundle (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_callbundle) // TODO: eth_cancelPrivateTransaction (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_cancelprivatetransaction, but maybe just reject) @@ -807,7 +808,10 @@ impl Web3ProxyApp { method => { // emit stats - let head_block_number = self.balanced_rpcs.head_block_num(); + let head_block_number = self + .balanced_rpcs + .head_block_num() + .context("no servers synced")?; // we do this check before checking caches because it might modify the request params // TODO: add a stat for archive vs full since they should probably cost different diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index 8850bb54..7e0a3883 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -5,6 +5,7 @@ use super::transactions::TxStatus; use crate::{ config::BlockAndRpc, jsonrpc::JsonRpcRequest, rpcs::synced_connections::SyncedConnections, }; +use anyhow::Context; use dashmap::{ mapref::{entry::Entry, one::Ref}, DashMap, @@ -13,8 +14,9 @@ use derive_more::From; use ethers::prelude::{Block, TxHash, H256, U64}; use hashbrown::{HashMap, HashSet}; use petgraph::algo::all_simple_paths; +use serde::Serialize; use serde_json::json; -use std::sync::Arc; +use std::{cmp::Ordering, fmt::Display, sync::Arc}; use tokio::sync::{broadcast, watch}; use tracing::{debug, info, trace, warn}; @@ -23,27 +25,27 @@ pub type ArcBlock = Arc>; pub type BlockHashesMap = Arc>; /// A block's hash and number. -#[derive(Default, From)] +#[derive(Clone, Debug, Default, From, Serialize)] pub struct BlockId { pub(super) hash: H256, pub(super) num: U64, } +impl Display for BlockId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{} ({})", self.num, self.hash) + } +} + impl Web3Connections { /// add a block to our map and it's hash to our graphmap of the blockchain pub fn save_block(&self, block: &ArcBlock) -> anyhow::Result<()> { - let block_hash = block - .hash - .as_ref() - .ok_or_else(|| anyhow::anyhow!("no block hash"))?; - let block_num = block - .number - .as_ref() - .ok_or_else(|| anyhow::anyhow!("no block num"))?; + let block_hash = block.hash.as_ref().context("no block hash")?; + let block_num = block.number.as_ref().context("no block num")?; let _block_td = block .total_difficulty .as_ref() - .ok_or_else(|| anyhow::anyhow!("no block total difficulty"))?; + .context("no block total difficulty")?; if self.block_hashes.contains_key(block_hash) { // this block is already included. no need to continue @@ -68,10 +70,14 @@ impl Web3Connections { match self.block_numbers.entry(*block_num) { Entry::Occupied(mut x) => { - x.get_mut().push(*block_hash); + let old = x.insert(*block_hash); + + todo!( + "do something with the old hash. we need to update a bunch more block numbers" + ) } Entry::Vacant(x) => { - x.insert(vec![*block_hash]); + x.insert(*block_hash); } } @@ -157,30 +163,20 @@ impl Web3Connections { // first, try to get the hash from our cache if let Some(block_hash) = self.block_numbers.get(num) { - match block_hash.len() { - 0 => { - unimplemented!("block_numbers is broken") - } - 1 => { - let block_hash = block_hash.get(0).expect("length was checked"); + let block = self + .block_hashes + .get(&block_hash) + .expect("block_numbers gave us this hash"); - let block = self - .block_hashes - .get(block_hash) - .expect("block_numbers gave us this hash"); - - return Ok(block.clone()); - } - _ => { - // TODO: maybe the vec should be sorted by total difficulty. - todo!("pick the block on the current consensus chain") - } - } + return Ok(block.clone()); } // block not in cache. we need to ask an rpc for it // but before we do any queries, be sure the requested block num exists - let head_block_num = self.head_block_num(); + let head_block_num = self + .head_block_num() + .ok_or_else(|| anyhow::anyhow!("no servers in sync"))?; + if num > &head_block_num { // TODO: i'm seeing this a lot when using ethspam. i dont know why though. i thought we delayed publishing // TODO: instead of error, maybe just sleep and try again? @@ -252,16 +248,23 @@ impl Web3Connections { pending_tx_sender: &Option>, ) -> anyhow::Result<()> { // add the block to connection_heads - match (rpc_head_block.hash, rpc_head_block.number) { + let rpc_block_id = match (rpc_head_block.hash, rpc_head_block.number) { (Some(rpc_head_hash), Some(rpc_head_num)) => { if rpc_head_num == U64::zero() { debug!(%rpc, "still syncing"); connection_heads.remove(&rpc.name); + + None } else { connection_heads.insert(rpc.name.to_owned(), rpc_head_hash); self.save_block(&rpc_head_block)?; + + Some(BlockId { + hash: rpc_head_hash, + num: rpc_head_num, + }) } } _ => { @@ -270,14 +273,15 @@ impl Web3Connections { connection_heads.remove(&rpc.name); // don't return yet! self.synced_connections likely needs an update + None } - } + }; // iterate the rpc_map to find the highest_work_block let mut checked_heads = HashSet::new(); let mut highest_work_block: Option> = None; - for (_rpc_name, rpc_head_hash) in connection_heads.iter() { + for rpc_head_hash in connection_heads.values() { if checked_heads.contains(rpc_head_hash) { continue; } @@ -318,11 +322,11 @@ impl Web3Connections { }; // track names so we don't check the same node multiple times - let mut consensus_names: HashSet<&String> = HashSet::new(); + let mut heavy_names: HashSet<&String> = HashSet::new(); // track rpcs so we can build a new SyncedConnections - let mut consensus_rpcs: Vec<&Arc> = vec![]; + let mut heavy_rpcs: Vec<&Arc> = vec![]; // a running total of the soft limits covered by the rpcs - let mut consensus_sum_soft_limit: u32 = 0; + let mut heavy_sum_soft_limit: u32 = 0; // check the highest work block and its parents for a set of rpcs that can serve our request load // TODO: loop for how many parent blocks? we don't want to serve blocks that are too far behind @@ -331,7 +335,7 @@ impl Web3Connections { let highest_work_hash = highest_work_block.hash.as_ref().unwrap(); for (rpc_name, rpc_head_hash) in connection_heads.iter() { - if consensus_names.contains(rpc_name) { + if heavy_names.contains(rpc_name) { // this block is already included continue; } @@ -339,9 +343,9 @@ impl Web3Connections { // TODO: does all_simple_paths make this check? if rpc_head_hash == highest_work_hash { if let Some(rpc) = self.conns.get(rpc_name) { - consensus_names.insert(rpc_name); - consensus_rpcs.push(rpc); - consensus_sum_soft_limit += rpc.soft_limit; + heavy_names.insert(rpc_name); + heavy_rpcs.push(rpc); + heavy_sum_soft_limit += rpc.soft_limit; } continue; } @@ -362,21 +366,21 @@ impl Web3Connections { if is_connected { if let Some(rpc) = self.conns.get(rpc_name) { - consensus_rpcs.push(rpc); - consensus_sum_soft_limit += rpc.soft_limit; + heavy_rpcs.push(rpc); + heavy_sum_soft_limit += rpc.soft_limit; } } } // TODO: min_sum_soft_limit as a percentage of total_soft_limit? // let min_sum_soft_limit = total_soft_limit / self.min_sum_soft_limit; - if consensus_sum_soft_limit >= self.min_sum_soft_limit { + if heavy_sum_soft_limit >= self.min_sum_soft_limit { // success! this block has enough nodes on it break; } // else, we need to try the parent block - trace!(%consensus_sum_soft_limit, ?highest_work_hash, "avoiding thundering herd"); + trace!(%heavy_sum_soft_limit, ?highest_work_hash, "avoiding thundering herd"); // // TODO: this automatically queries for parents, but need to rearrange lifetimes to make an await work here // highest_work_block = self @@ -389,9 +393,9 @@ impl Web3Connections { None => { warn!( "ran out of parents to check. soft limit only {}/{}: {}%", - consensus_sum_soft_limit, + heavy_sum_soft_limit, self.min_sum_soft_limit, - consensus_sum_soft_limit * 100 / self.min_sum_soft_limit + heavy_sum_soft_limit * 100 / self.min_sum_soft_limit ); break; } @@ -403,29 +407,34 @@ impl Web3Connections { // unlock self.blockchain_graphmap drop(blockchain_guard); - let soft_limit_met = consensus_sum_soft_limit >= self.min_sum_soft_limit; - let num_synced_rpcs = consensus_rpcs.len() as u32; + let soft_limit_met = heavy_sum_soft_limit >= self.min_sum_soft_limit; + let num_synced_rpcs = heavy_rpcs.len() as u32; let new_synced_connections = if soft_limit_met { - // we have a consensus large enough to serve traffic + // we have a heavy large enough to serve traffic let head_block_hash = highest_work_block.hash.unwrap(); let head_block_num = highest_work_block.number.unwrap(); if num_synced_rpcs < self.min_synced_rpcs { - trace!(hash=%head_block_hash, num=?head_block_num, "not enough rpcs are synced to advance"); + // TODO: warn is too loud. if we are first starting, this is expected to happen + warn!(hash=%head_block_hash, num=?head_block_num, "not enough rpcs are synced to advance"); - return Ok(()); + SyncedConnections::default() } else { // TODO: wait until at least most of the rpcs have given their initial block? // otherwise, if there is a syncing node that is fast, our first head block might not be good // TODO: have a configurable "minimum rpcs" number that we can set // TODO: sort by weight and soft limit? do we need an IndexSet, or is a Vec fine? - let conns = consensus_rpcs.into_iter().cloned().collect(); + let conns = heavy_rpcs.into_iter().cloned().collect(); + + let head_block_id = BlockId { + hash: head_block_hash, + num: head_block_num, + }; SyncedConnections { - head_block_num, - head_block_hash, + head_block_id: Some(head_block_id), conns, } } @@ -438,36 +447,71 @@ impl Web3Connections { SyncedConnections::default() }; - let consensus_block_hash = new_synced_connections.head_block_hash; - let consensus_block_num = new_synced_connections.head_block_num; + let heavy_block_id = new_synced_connections.head_block_id.clone(); let new_synced_connections = Arc::new(new_synced_connections); let num_connection_heads = connection_heads.len(); - let total_rpcs = self.conns.len(); + let total_conns = self.conns.len(); let old_synced_connections = self.synced_connections.swap(new_synced_connections); - let old_head_hash = old_synced_connections.head_block_hash; - - if rpc_head_block.hash.is_some() && Some(consensus_block_hash) != rpc_head_block.hash { - info!(new=%rpc_head_block.hash.unwrap(), new_num=?rpc_head_block.number.unwrap(), consensus=%consensus_block_hash, num=%consensus_block_num, %rpc, "non consensus head"); - // TODO: anything else to do? maybe warn if these blocks are very far apart or forked for an extended period of time - // TODO: if there is any non-consensus head log how many nodes are on it + match (&old_synced_connections.head_block_id, &heavy_block_id) { + (None, None) => warn!("no servers synced"), + (None, Some(heavy_block_id)) => { + debug!(block=%heavy_block_id, %rpc, "first consensus head"); + } + (Some(_), None) => warn!("no longer synced!"), + (Some(old_block_id), Some(heavy_block_id)) => { + match heavy_block_id.num.cmp(&old_block_id.num) { + Ordering::Equal => { + todo!("handle equal") + } + Ordering::Less => { + todo!("handle less") + } + Ordering::Greater => { + todo!("handle greater") + } + } + } } + /* + if old_synced_connections.head_block_id.is_none() && rpc_head_block.hash.is_some() { + // this is fine. we have our first hash + } else if rpc_head_block.hash.is_some() + && old_synced_connections.head_block_id.is_some() + && old_synced_connections + .head_block_id + .as_ref() + .map_ok(|x| x.num) + != rpc_head_block.hash + { + info!(new=%rpc_head_block.hash.unwrap(), new_num=?rpc_head_block.number.unwrap(), heavy=?heavy_block_id, %rpc, "non heavy head"); + // TODO: anything else to do? maybe warn if these blocks are very far apart or forked for an extended period of time + // TODO: if there is any non-heavy head log how many nodes are on it + } */ - if consensus_block_hash == old_head_hash { - debug!(hash=%consensus_block_hash, num=%consensus_block_num, limit=%consensus_sum_soft_limit, %rpc, "cur consensus head {}/{}/{}", num_synced_rpcs, num_connection_heads, total_rpcs); + /* + if heavy_block_num == U64::zero { + warn!(?soft_limit_met, %heavy_block_hash, %old_head_hash, %rpc, "NO heavy head {}/{}/{}", num_synced_rpcs, num_connection_heads, total_rpcs) + } else if heavy_block_hash == old_head_hash { + debug!(hash=%heavy_block_hash, num=%heavy_block_num, limit=%heavy_sum_soft_limit, %rpc, "cur heavy head {}/{}/{}", num_synced_rpcs, num_connection_heads, total_rpcs); } else if soft_limit_met { // TODO: if new's parent is not old, warn? - debug!(hash=%consensus_block_hash, num=%consensus_block_num, limit=%consensus_sum_soft_limit, %rpc, "NEW consensus head {}/{}/{}", num_synced_rpcs, num_connection_heads, total_rpcs); + debug!(hash=%heavy_block_hash, num=%heavy_block_num, limit=%heavy_sum_soft_limit, %rpc, "NEW heavy head {}/{}/{}", num_synced_rpcs, num_connection_heads, total_rpcs); // the head hash changed. forward to any subscribers head_block_sender.send(highest_work_block)?; // TODO: do something with pending_tx_sender } else { - warn!(?soft_limit_met, %consensus_block_hash, %old_head_hash, %rpc, "NO consensus head {}/{}/{}", num_synced_rpcs, num_connection_heads, total_rpcs) + // TODO: i don't think we can get here + warn!(?soft_limit_met, %heavy_block_id, %old_head_hash, %rpc, "NO heavy head {}/{}/{}", num_synced_rpcs, num_connection_heads, total_rpcs) } + */ + + // TODO: the head hash changed. forward to any subscribers + head_block_sender.send(highest_work_block)?; Ok(()) } diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs index 221991f0..2d4ef3ec 100644 --- a/web3_proxy/src/rpcs/connections.rs +++ b/web3_proxy/src/rpcs/connections.rs @@ -34,14 +34,15 @@ use tracing::{error, info, instrument, trace, warn}; /// A collection of web3 connections. Sends requests either the current best server or all servers. #[derive(From)] pub struct Web3Connections { - pub(super) conns: HashMap>, + pub(super) inner: HashMap>, /// any requests will be forwarded to one (or more) of these connections pub(super) synced_connections: ArcSwap, pub(super) pending_transactions: Arc>, /// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis? /// all blocks, including orphans pub(super) block_hashes: BlockHashesMap, - pub(super) block_numbers: DashMap>, + /// blocks on the heaviest chain + pub(super) block_numbers: DashMap, /// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis? /// TODO: what should we use for edges? pub(super) blockchain_graphmap: RwLock>, @@ -168,7 +169,7 @@ impl Web3Connections { let synced_connections = SyncedConnections::default(); let connections = Arc::new(Self { - conns: connections, + inner: connections, synced_connections: ArcSwap::new(Arc::new(synced_connections)), pending_transactions, block_hashes: Default::default(), @@ -198,7 +199,7 @@ impl Web3Connections { } pub fn get(&self, conn_name: &str) -> Option<&Arc> { - self.conns.get(conn_name) + self.inner.get(conn_name) } /// subscribe to blocks and transactions from all the backend rpcs. @@ -349,7 +350,7 @@ impl Web3Connections { // TODO: we are going to be checking "has_block_data" a lot now. i think we pretty much always have min_block_needed now that we override "latest" let mut synced_rpcs: Vec> = if let Some(min_block_needed) = min_block_needed { - self.conns + self.inner .values() .filter(|x| !skip.contains(x)) .filter(|x| x.has_block_data(min_block_needed)) @@ -435,7 +436,7 @@ impl Web3Connections { // TODO: with capacity? let mut selected_rpcs = vec![]; - for connection in self.conns.values() { + for connection in self.inner.values() { if let Some(min_block_needed) = min_block_needed { if !connection.has_block_data(min_block_needed) { continue; @@ -476,7 +477,7 @@ impl Web3Connections { // TODO: maximum retries? loop { - if skip_rpcs.len() == self.conns.len() { + if skip_rpcs.len() == self.inner.len() { break; } match self @@ -623,7 +624,7 @@ impl fmt::Debug for Web3Connections { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { // TODO: the default formatter takes forever to write. this is too quiet though f.debug_struct("Web3Connections") - .field("conns", &self.conns) + .field("conns", &self.inner) .finish_non_exhaustive() } } @@ -633,7 +634,7 @@ impl Serialize for Web3Connections { where S: Serializer, { - let conns: Vec<&Web3Connection> = self.conns.iter().map(|x| x.1.as_ref()).collect(); + let conns: Vec<&Web3Connection> = self.inner.iter().map(|x| x.1.as_ref()).collect(); let mut state = serializer.serialize_struct("Web3Connections", 2)?; state.serialize_field("conns", &conns)?; diff --git a/web3_proxy/src/rpcs/synced_connections.rs b/web3_proxy/src/rpcs/synced_connections.rs index 7dc61be7..a4d5d165 100644 --- a/web3_proxy/src/rpcs/synced_connections.rs +++ b/web3_proxy/src/rpcs/synced_connections.rs @@ -1,3 +1,4 @@ +use super::blockchain::BlockId; use super::connection::Web3Connection; use super::connections::Web3Connections; use ethers::prelude::{H256, U64}; @@ -11,8 +12,7 @@ use std::sync::Arc; #[derive(Clone, Default, Serialize)] pub struct SyncedConnections { // TODO: store ArcBlock instead? - pub(super) head_block_num: U64, - pub(super) head_block_hash: H256, + pub(super) head_block_id: Option, // TODO: this should be able to serialize, but it isn't #[serde(skip_serializing)] pub(super) conns: IndexSet>, @@ -23,29 +23,31 @@ impl fmt::Debug for SyncedConnections { // TODO: the default formatter takes forever to write. this is too quiet though // TODO: print the actual conns? f.debug_struct("SyncedConnections") - .field("head_num", &self.head_block_num) - .field("head_hash", &self.head_block_hash) + .field("head_block_id", &self.head_block_id) .field("num_conns", &self.conns.len()) .finish_non_exhaustive() } } impl Web3Connections { - pub fn head_block(&self) -> (U64, H256) { - let synced_connections = self.synced_connections.load(); - - ( - synced_connections.head_block_num, - synced_connections.head_block_hash, - ) + pub fn head_block_id(&self) -> Option { + self.synced_connections.load().head_block_id.clone() } - pub fn head_block_hash(&self) -> H256 { - self.synced_connections.load().head_block_hash + pub fn head_block_hash(&self) -> Option { + self.synced_connections + .load() + .head_block_id + .as_ref() + .map(|head_block_id| head_block_id.hash) } - pub fn head_block_num(&self) -> U64 { - self.synced_connections.load().head_block_num + pub fn head_block_num(&self) -> Option { + self.synced_connections + .load() + .head_block_id + .as_ref() + .map(|head_block_id| head_block_id.num) } pub fn synced(&self) -> bool {