From 2606844c61e692f92a71750e4273438a5abfc602 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Fri, 2 Sep 2022 05:40:56 +0000 Subject: [PATCH] almost done with heaviest chain tracking --- Cargo.lock | 4 +- web3_proxy/Cargo.toml | 2 +- web3_proxy/src/app.rs | 26 +-- web3_proxy/src/bin/web3_proxy.rs | 2 - web3_proxy/src/config.rs | 4 +- web3_proxy/src/frontend/users.rs | 4 +- web3_proxy/src/rpcs/blockchain.rs | 352 +++++++++++++++++------------ web3_proxy/src/rpcs/connections.rs | 20 +- 8 files changed, 235 insertions(+), 179 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cee5c7be..05ef9d8a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4403,9 +4403,9 @@ checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de" [[package]] name = "siwe" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a35b706108fb328661325c3b6882f6b1d62da47d533da0b87fca36ac769877db" +checksum = "f24fe2b646c33a670e7d79a232bffb41821fed28b1870a8bd1a47e6ae686ace6" dependencies = [ "hex", "http", diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index 444fe0d8..8d6b33e7 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -48,7 +48,7 @@ regex = "1.6.0" reqwest = { version = "0.11.11", default-features = false, features = ["json", "tokio-rustls"] } handlebars = "4.3.3" rustc-hash = "1.1.0" -siwe = "0.4.1" +siwe = "0.4.2" sea-orm = { version = "0.9.2", features = ["macros"] } serde = { version = "1.0.144", features = [] } serde_json = { version = "1.0.85", default-features = false, features = ["alloc", "raw_value"] } diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs index e4f493ff..67cb1742 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app.rs @@ -146,18 +146,6 @@ pub async fn get_migrated_db( } impl Web3ProxyApp { - pub async fn redis_conn(&self) -> anyhow::Result> { - match self.redis_pool.as_ref() { - None => Err(anyhow::anyhow!("no redis server configured")), - Some(redis_pool) => { - let redis_conn = redis_pool.get().await?; - - Ok(redis_conn) - } - } - } - - // TODO: should we just take the rpc config as the only arg instead? pub async fn spawn( app_stats: AppStats, top_config: TopConfig, @@ -553,7 +541,6 @@ impl Web3ProxyApp { Ok(response) } - // #[instrument(skip_all)] async fn proxy_web3_rpc_requests( &self, requests: Vec, @@ -580,6 +567,17 @@ impl Web3ProxyApp { Ok(collected) } + pub async fn redis_conn(&self) -> anyhow::Result> { + match self.redis_pool.as_ref() { + None => Err(anyhow::anyhow!("no redis server configured")), + Some(redis_pool) => { + let redis_conn = redis_pool.get().await?; + + Ok(redis_conn) + } + } + } + async fn cached_response( &self, // TODO: accept a block hash here also? @@ -620,13 +618,11 @@ impl Web3ProxyApp { trace!(?request.method, "cache miss!"); } - // TODO: multiple caches. if head_block_hash is None, have a persistent cache (disk backed?) let cache = &self.response_cache; Ok((key, Err(cache))) } - // #[instrument(skip_all)] async fn proxy_web3_rpc_request( &self, mut request: JsonRpcRequest, diff --git a/web3_proxy/src/bin/web3_proxy.rs b/web3_proxy/src/bin/web3_proxy.rs index 03c4d2c2..2d2b17e3 100644 --- a/web3_proxy/src/bin/web3_proxy.rs +++ b/web3_proxy/src/bin/web3_proxy.rs @@ -65,8 +65,6 @@ fn run( // start tokio's async runtime let rt = rt_builder.build()?; - // we use this worker count to also set our redis connection pool size - // TODO: think about this more let num_workers = rt.metrics().num_workers(); debug!(?num_workers); diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index afad3c26..f8c5f260 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -51,7 +51,7 @@ pub struct AppConfig { #[serde(default = "default_min_sum_soft_limit")] pub min_sum_soft_limit: u32, #[serde(default = "default_min_synced_rpcs")] - pub min_synced_rpcs: u32, + pub min_synced_rpcs: usize, pub redis_url: Option, #[serde(default = "default_public_rate_limit_per_minute")] pub public_rate_limit_per_minute: u64, @@ -72,7 +72,7 @@ fn default_min_sum_soft_limit() -> u32 { 1 } -fn default_min_synced_rpcs() -> u32 { +fn default_min_synced_rpcs() -> usize { 1 } diff --git a/web3_proxy/src/frontend/users.rs b/web3_proxy/src/frontend/users.rs index 2e1f9ebf..4caa06a2 100644 --- a/web3_proxy/src/frontend/users.rs +++ b/web3_proxy/src/frontend/users.rs @@ -85,7 +85,7 @@ pub async fn get_login( // TODO: if no redis server, store in local cache? // the address isn't enough. we need to save the actual message so we can read the nonce - // TODO: what message format is the most efficient to store in redis? probably eip191_string + // TODO: what message format is the most efficient to store in redis? probably eip191_bytes // we add 1 to expire_seconds just to be sure redis has the key for the full expiration_time app.redis_conn() .await? @@ -100,7 +100,7 @@ pub async fn get_login( let message: String = match message_eip.as_str() { "eip4361" => message.to_string(), // https://github.com/spruceid/siwe/issues/98 - "eip191_string" => Bytes::from(message.eip191_string().unwrap()).to_string(), + "eip191_bytes" => Bytes::from(message.eip191_bytes().unwrap()).to_string(), "eip191_hash" => Bytes::from(&message.eip191_hash().unwrap()).to_string(), _ => return Err(anyhow::anyhow!("invalid message eip given").into()), }; diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index 7e0a3883..e1652c72 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -13,7 +13,6 @@ use dashmap::{ use derive_more::From; use ethers::prelude::{Block, TxHash, H256, U64}; use hashbrown::{HashMap, HashSet}; -use petgraph::algo::all_simple_paths; use serde::Serialize; use serde_json::json; use std::{cmp::Ordering, fmt::Display, sync::Arc}; @@ -39,7 +38,7 @@ impl Display for BlockId { impl Web3Connections { /// add a block to our map and it's hash to our graphmap of the blockchain - pub fn save_block(&self, block: &ArcBlock) -> anyhow::Result<()> { + pub fn save_block(&self, block: &ArcBlock, heaviest_chain: bool) -> anyhow::Result<()> { let block_hash = block.hash.as_ref().context("no block hash")?; let block_num = block.number.as_ref().context("no block num")?; let _block_td = block @@ -47,6 +46,22 @@ impl Web3Connections { .as_ref() .context("no block total difficulty")?; + if heaviest_chain { + match self.block_numbers.entry(*block_num) { + Entry::Occupied(mut x) => { + let old = x.insert(*block_hash); + + // TODO: what should we do? + warn!( + "do something with the old hash. we may need to update a bunch more block numbers" + ) + } + Entry::Vacant(x) => { + x.insert(*block_hash); + } + } + } + if self.block_hashes.contains_key(block_hash) { // this block is already included. no need to continue return Ok(()); @@ -68,19 +83,6 @@ impl Web3Connections { return Ok(()); } - match self.block_numbers.entry(*block_num) { - Entry::Occupied(mut x) => { - let old = x.insert(*block_hash); - - todo!( - "do something with the old hash. we need to update a bunch more block numbers" - ) - } - Entry::Vacant(x) => { - x.insert(*block_hash); - } - } - // TODO: prettier log? or probably move the log somewhere else trace!(%block_hash, "new block"); @@ -110,13 +112,7 @@ impl Web3Connections { } // block not in cache. we need to ask an rpc for it - - // TODO: helper for method+params => JsonRpcRequest - // TODO: get block with the transactions? - // TODO: does this id matter? - let request_params = (hash, false); - // TODO: if error, retry? let block: Block = match rpc { Some(rpc) => { @@ -126,6 +122,8 @@ impl Web3Connections { .await? } None => { + // TODO: helper for method+params => JsonRpcRequest + // TODO: does this id matter? let request = json!({ "id": "1", "method": "eth_getBlockByHash", "params": request_params }); let request: JsonRpcRequest = serde_json::from_value(request)?; @@ -141,7 +139,10 @@ impl Web3Connections { let block = Arc::new(block); // the block was fetched using eth_getBlockByHash, so it should have all fields - self.save_block(&block)?; + // TODO: how should we set this? all_simple_paths on the map? + let heaviest_chain = false; + + self.save_block(&block, heaviest_chain)?; Ok(block) } @@ -202,8 +203,10 @@ impl Web3Connections { let block = Arc::new(block); - // the block was fetched using eth_getBlockByNumber, so it should have all fields - self.save_block(&block)?; + // the block was fetched using eth_getBlockByNumber, so it should have all fields and be on the heaviest chain + let heaviest_chain = true; + + self.save_block(&block, heaviest_chain)?; Ok(block) } @@ -259,7 +262,8 @@ impl Web3Connections { } else { connection_heads.insert(rpc.name.to_owned(), rpc_head_hash); - self.save_block(&rpc_head_block)?; + // we don't know if its on the heaviest chain yet + self.save_block(&rpc_head_block, false)?; Some(BlockId { hash: rpc_head_hash, @@ -277,27 +281,27 @@ impl Web3Connections { } }; - // iterate the rpc_map to find the highest_work_block + // iterate the known heads to find the highest_work_block let mut checked_heads = HashSet::new(); let mut highest_work_block: Option> = None; - for rpc_head_hash in connection_heads.values() { if checked_heads.contains(rpc_head_hash) { + // we already checked this head from another rpc continue; } - + // don't check the same hash multiple times checked_heads.insert(rpc_head_hash); let rpc_head_block = self.block_hashes.get(rpc_head_hash).unwrap(); match &rpc_head_block.total_difficulty { None => { - // no total difficulty - // TODO: should we fetch the block here? I think this shouldn't happen - warn!(?rpc, %rpc_head_hash, "block is missing total difficulty"); - continue; + // no total difficulty. this is a bug + unimplemented!("block is missing total difficulty"); } Some(td) => { + // if this is the first block we've tried + // or if this rpc's newest block has a higher total difficulty if highest_work_block.is_none() || td > highest_work_block @@ -313,100 +317,149 @@ impl Web3Connections { } } - // clone to release the read lock - let highest_work_block = highest_work_block.map(|x| x.clone()); + // clone to release the read lock on self.block_hashes + if let Some(mut maybe_head_block) = highest_work_block.map(|x| x.clone()) { + // track rpcs on this heaviest chain so we can build a new SyncedConnections + let mut heavy_rpcs: Vec<&Arc> = vec![]; + // a running total of the soft limits covered by the heavy rpcs + let mut heavy_sum_soft_limit: u32 = 0; + // TODO: also track heavy_sum_hard_limit? - let mut highest_work_block = match highest_work_block { - None => todo!("no servers are in sync"), - Some(highest_work_block) => highest_work_block, - }; + // check the highest work block for a set of rpcs that can serve our request load + // if it doesn't have enough rpcs for our request load, check the parent block + // TODO: loop for how many parent blocks? we don't want to serve blocks that are too far behind. probably different per chain + // TODO: this loop is pretty long. any way to clean up this code? + for _ in 0..3 { + let maybe_head_hash = maybe_head_block + .hash + .as_ref() + .expect("blocks here always need hashes"); - // track names so we don't check the same node multiple times - let mut heavy_names: HashSet<&String> = HashSet::new(); - // track rpcs so we can build a new SyncedConnections - let mut heavy_rpcs: Vec<&Arc> = vec![]; - // a running total of the soft limits covered by the rpcs - let mut heavy_sum_soft_limit: u32 = 0; + // find all rpcs with maybe_head_block as their current head + for (conn_name, conn_head_hash) in connection_heads.iter() { + if conn_head_hash != maybe_head_hash { + continue; + } - // check the highest work block and its parents for a set of rpcs that can serve our request load - // TODO: loop for how many parent blocks? we don't want to serve blocks that are too far behind - let blockchain_guard = self.blockchain_graphmap.read(); - for _ in 0..3 { - let highest_work_hash = highest_work_block.hash.as_ref().unwrap(); - - for (rpc_name, rpc_head_hash) in connection_heads.iter() { - if heavy_names.contains(rpc_name) { - // this block is already included - continue; - } - - // TODO: does all_simple_paths make this check? - if rpc_head_hash == highest_work_hash { - if let Some(rpc) = self.conns.get(rpc_name) { - heavy_names.insert(rpc_name); - heavy_rpcs.push(rpc); - heavy_sum_soft_limit += rpc.soft_limit; - } - continue; - } - - // TODO: cache all_simple_paths. there should be a high hit rate - // TODO: use an algo that saves scratch space? - // TODO: how slow is this? - let is_connected = all_simple_paths::, _>( - &*blockchain_guard, - *highest_work_hash, - *rpc_head_hash, - 0, - // TODO: what should this max be? probably configurable per chain - Some(10), - ) - .next() - .is_some(); - - if is_connected { - if let Some(rpc) = self.conns.get(rpc_name) { + if let Some(rpc) = self.conns.get(conn_name) { heavy_rpcs.push(rpc); heavy_sum_soft_limit += rpc.soft_limit; + } else { + warn!("connection missing") } } - } - // TODO: min_sum_soft_limit as a percentage of total_soft_limit? - // let min_sum_soft_limit = total_soft_limit / self.min_sum_soft_limit; - if heavy_sum_soft_limit >= self.min_sum_soft_limit { - // success! this block has enough nodes on it - break; - } - // else, we need to try the parent block + if heavy_sum_soft_limit < self.min_sum_soft_limit + || heavy_rpcs.len() < self.min_synced_rpcs + { + // not enough rpcs yet. check the parent + if let Some(parent_block) = self.block_hashes.get(&maybe_head_block.parent_hash) + { + trace!( + child=%maybe_head_hash, parent=%parent_block.hash.unwrap(), "avoiding thundering herd", + ); - trace!(%heavy_sum_soft_limit, ?highest_work_hash, "avoiding thundering herd"); - - // // TODO: this automatically queries for parents, but need to rearrange lifetimes to make an await work here - // highest_work_block = self - // .block(&highest_work_block.parent_hash, Some(&rpc)) - // .await?; - // we give known stale data just because we don't have enough capacity to serve the latest. - // TODO: maybe we should delay serving requests if this happens. - // TODO: don't unwrap. break if none? - match self.block_hashes.get(&highest_work_block.parent_hash) { - None => { - warn!( - "ran out of parents to check. soft limit only {}/{}: {}%", - heavy_sum_soft_limit, - self.min_sum_soft_limit, - heavy_sum_soft_limit * 100 / self.min_sum_soft_limit - ); - break; + maybe_head_block = parent_block.clone(); + continue; + } else { + warn!( + "no parent to check. soft limit only {}/{} from {}/{} rpcs: {}%", + heavy_sum_soft_limit, + self.min_sum_soft_limit, + heavy_rpcs.len(), + self.min_synced_rpcs, + heavy_sum_soft_limit * 100 / self.min_sum_soft_limit + ); + break; + } } - Some(parent_block) => { - highest_work_block = parent_block.clone(); + + // success! this block has enough soft limit and nodes on it (or on later blocks) + let conns = heavy_rpcs.into_iter().cloned().collect(); + + let heavy_block = maybe_head_block; + + let heavy_hash = heavy_block.hash.expect("head blocks always have hashes"); + let heavy_num = heavy_block.number.expect("head blocks always have numbers"); + + debug_assert_ne!(heavy_num, U64::zero()); + + let heavy_block_id = BlockId { + hash: heavy_hash, + num: heavy_num, + }; + + let new_synced_connections = SyncedConnections { + head_block_id: Some(heavy_block_id.clone()), + conns, + }; + + let old_synced_connections = self + .synced_connections + .swap(Arc::new(new_synced_connections)); + + let num_connection_heads = connection_heads.len(); + let total_conns = self.conns.len(); + + // TODO: if the rpc_head_block != heavy, log something somewhere in here + match &old_synced_connections.head_block_id { + None => { + debug!(block=%heavy_block_id, %rpc, "first consensus head"); + head_block_sender.send(heavy_block)?; + } + Some(old_block_id) => { + match heavy_block_id.num.cmp(&old_block_id.num) { + Ordering::Equal => { + // multiple blocks with the same fork! + if heavy_block_id.hash == old_block_id.hash { + // no change in hash. no need to use head_block_sender + debug!(heavy=%heavy_block_id, %rpc, "consensus head block") + } else { + // hash changed + // TODO: better log + warn!(heavy=%heavy_block_id, %rpc, "fork detected"); + + // todo!("handle equal by updating the cannonical chain"); + + head_block_sender.send(heavy_block)?; + } + } + Ordering::Less => { + // this is unlikely but possible + // TODO: better log + debug!("chain rolled back"); + // todo!("handle less by removing higher blocks from the cannonical chain"); + head_block_sender.send(heavy_block)?; + } + Ordering::Greater => { + debug!(heavy=%heavy_block_id, %rpc, "new head block"); + + // todo!("handle greater by adding this block to and any missing parents to the cannonical chain"); + + head_block_sender.send(heavy_block)?; + } + } + } } + + return Ok(()); } + + // if we get here, something is wrong. clear synced connections + let empty_synced_connections = SyncedConnections::default(); + + let old_synced_connections = self + .synced_connections + .swap(Arc::new(empty_synced_connections)); + + // TODO: log different things depending on old_synced_connections } - // unlock self.blockchain_graphmap - drop(blockchain_guard); + return Ok(()); + + todo!("double check everything under this"); + + /* let soft_limit_met = heavy_sum_soft_limit >= self.min_sum_soft_limit; let num_synced_rpcs = heavy_rpcs.len() as u32; @@ -419,11 +472,10 @@ impl Web3Connections { // TODO: warn is too loud. if we are first starting, this is expected to happen warn!(hash=%head_block_hash, num=?head_block_num, "not enough rpcs are synced to advance"); - SyncedConnections::default() + None } else { // TODO: wait until at least most of the rpcs have given their initial block? // otherwise, if there is a syncing node that is fast, our first head block might not be good - // TODO: have a configurable "minimum rpcs" number that we can set // TODO: sort by weight and soft limit? do we need an IndexSet, or is a Vec fine? let conns = heavy_rpcs.into_iter().cloned().collect(); @@ -433,47 +485,62 @@ impl Web3Connections { num: head_block_num, }; - SyncedConnections { + let new_synced_connections = SyncedConnections { head_block_id: Some(head_block_id), conns, - } + }; + + Some(new_synced_connections) } } else { // failure even after checking parent heads! // not enough servers are in sync to server traffic // TODO: at startup this is fine, but later its a problem - warn!("empty SyncedConnections"); - - SyncedConnections::default() + None }; - let heavy_block_id = new_synced_connections.head_block_id.clone(); - let new_synced_connections = Arc::new(new_synced_connections); - let num_connection_heads = connection_heads.len(); - let total_conns = self.conns.len(); + if let Some(new_synced_connections) = new_synced_connections { + let heavy_block_id = new_synced_connections.head_block_id.clone(); - let old_synced_connections = self.synced_connections.swap(new_synced_connections); + let new_synced_connections = Arc::new(new_synced_connections); - match (&old_synced_connections.head_block_id, &heavy_block_id) { - (None, None) => warn!("no servers synced"), - (None, Some(heavy_block_id)) => { - debug!(block=%heavy_block_id, %rpc, "first consensus head"); - } - (Some(_), None) => warn!("no longer synced!"), - (Some(old_block_id), Some(heavy_block_id)) => { - match heavy_block_id.num.cmp(&old_block_id.num) { - Ordering::Equal => { - todo!("handle equal") - } - Ordering::Less => { - todo!("handle less") - } - Ordering::Greater => { - todo!("handle greater") + let old_synced_connections = self.synced_connections.swap(new_synced_connections); + + let num_connection_heads = connection_heads.len(); + let total_conns = self.conns.len(); + + match (&old_synced_connections.head_block_id, &heavy_block_id) { + (None, None) => warn!("no servers synced"), + (None, Some(heavy_block_id)) => { + debug!(block=%heavy_block_id, %rpc, "first consensus head"); + } + (Some(_), None) => warn!("no longer synced!"), + (Some(old_block_id), Some(heavy_block_id)) => { + debug_assert_ne!(heavy_block_id.num, U64::zero()); + + match heavy_block_id.num.cmp(&old_block_id.num) { + Ordering::Equal => { + // multiple blocks with the same fork! + debug!("fork detected"); + todo!("handle equal"); + } + Ordering::Less => { + // this seems unlikely + warn!("chain rolled back"); + todo!("handle less"); + } + Ordering::Greater => { + info!(heavy=%heavy_block_id, %rpc, "new head block"); + + todo!("handle greater"); + } } } } + } else { + todo!() } + */ /* if old_synced_connections.head_block_id.is_none() && rpc_head_block.hash.is_some() { // this is fine. we have our first hash @@ -509,10 +576,5 @@ impl Web3Connections { warn!(?soft_limit_met, %heavy_block_id, %old_head_hash, %rpc, "NO heavy head {}/{}/{}", num_synced_rpcs, num_connection_heads, total_rpcs) } */ - - // TODO: the head hash changed. forward to any subscribers - head_block_sender.send(highest_work_block)?; - - Ok(()) } } diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs index 2d4ef3ec..f063fa8b 100644 --- a/web3_proxy/src/rpcs/connections.rs +++ b/web3_proxy/src/rpcs/connections.rs @@ -34,7 +34,7 @@ use tracing::{error, info, instrument, trace, warn}; /// A collection of web3 connections. Sends requests either the current best server or all servers. #[derive(From)] pub struct Web3Connections { - pub(super) inner: HashMap>, + pub(super) conns: HashMap>, /// any requests will be forwarded to one (or more) of these connections pub(super) synced_connections: ArcSwap, pub(super) pending_transactions: Arc>, @@ -46,7 +46,7 @@ pub struct Web3Connections { /// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis? /// TODO: what should we use for edges? pub(super) blockchain_graphmap: RwLock>, - pub(super) min_synced_rpcs: u32, + pub(super) min_synced_rpcs: usize, pub(super) min_sum_soft_limit: u32, } @@ -61,7 +61,7 @@ impl Web3Connections { block_map: BlockHashesMap, head_block_sender: Option>, min_sum_soft_limit: u32, - min_synced_rpcs: u32, + min_synced_rpcs: usize, pending_tx_sender: Option>, pending_transactions: Arc>, ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { @@ -169,7 +169,7 @@ impl Web3Connections { let synced_connections = SyncedConnections::default(); let connections = Arc::new(Self { - inner: connections, + conns: connections, synced_connections: ArcSwap::new(Arc::new(synced_connections)), pending_transactions, block_hashes: Default::default(), @@ -199,7 +199,7 @@ impl Web3Connections { } pub fn get(&self, conn_name: &str) -> Option<&Arc> { - self.inner.get(conn_name) + self.conns.get(conn_name) } /// subscribe to blocks and transactions from all the backend rpcs. @@ -350,7 +350,7 @@ impl Web3Connections { // TODO: we are going to be checking "has_block_data" a lot now. i think we pretty much always have min_block_needed now that we override "latest" let mut synced_rpcs: Vec> = if let Some(min_block_needed) = min_block_needed { - self.inner + self.conns .values() .filter(|x| !skip.contains(x)) .filter(|x| x.has_block_data(min_block_needed)) @@ -436,7 +436,7 @@ impl Web3Connections { // TODO: with capacity? let mut selected_rpcs = vec![]; - for connection in self.inner.values() { + for connection in self.conns.values() { if let Some(min_block_needed) = min_block_needed { if !connection.has_block_data(min_block_needed) { continue; @@ -477,7 +477,7 @@ impl Web3Connections { // TODO: maximum retries? loop { - if skip_rpcs.len() == self.inner.len() { + if skip_rpcs.len() == self.conns.len() { break; } match self @@ -624,7 +624,7 @@ impl fmt::Debug for Web3Connections { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { // TODO: the default formatter takes forever to write. this is too quiet though f.debug_struct("Web3Connections") - .field("conns", &self.inner) + .field("conns", &self.conns) .finish_non_exhaustive() } } @@ -634,7 +634,7 @@ impl Serialize for Web3Connections { where S: Serializer, { - let conns: Vec<&Web3Connection> = self.inner.iter().map(|x| x.1.as_ref()).collect(); + let conns: Vec<&Web3Connection> = self.conns.iter().map(|x| x.1.as_ref()).collect(); let mut state = serializer.serialize_struct("Web3Connections", 2)?; state.serialize_field("conns", &conns)?;