From e53030e0532b3215b75447905a91b2064e20466b Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Thu, 19 Jan 2023 18:30:30 -0800 Subject: [PATCH] move backup indicator --- web3_proxy/src/rpcs/blockchain.rs | 30 +++++------ web3_proxy/src/rpcs/connections.rs | 82 +++++++++++++++++------------- 2 files changed, 61 insertions(+), 51 deletions(-) diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index 2be784a6..8b8cbce7 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -331,20 +331,20 @@ impl Web3Connections { .synced_connections .swap(Arc::new(new_synced_connections)); - let includes_backups_str = if includes_backups { "B" } else { "" }; + let includes_backups_str = if includes_backups { "B " } else { "" }; if let Some(consensus_saved_block) = consensus_head_block { match &old_synced_connections.head_block { None => { debug!( - "first {}/{}/{}/{} block={}, rpc={} {}", + "first {}{}/{}/{}/{} block={}, rpc={}", + includes_backups_str, num_consensus_rpcs, num_checked_rpcs, num_active_rpcs, total_rpcs, consensus_saved_block, rpc, - includes_backups_str, ); if includes_backups { @@ -371,7 +371,8 @@ impl Web3Connections { if consensus_saved_block.hash() == old_head_block.hash() { // no change in hash. no need to use head_block_sender debug!( - "con {}/{}/{}/{} con={} rpc={}@{} {}", + "con {}{}/{}/{}/{} con={} rpc={}@{}", + includes_backups_str, num_consensus_rpcs, num_checked_rpcs, num_active_rpcs, @@ -379,7 +380,6 @@ impl Web3Connections { consensus_saved_block, rpc, rpc_head_str, - includes_backups_str, ) } else { // hash changed @@ -390,7 +390,8 @@ impl Web3Connections { } debug!( - "unc {}/{}/{}/{} con_head={} old={} rpc={}@{} {}", + "unc {}{}/{}/{}/{} con_head={} old={} rpc={}@{}", + includes_backups_str, num_consensus_rpcs, num_checked_rpcs, num_active_rpcs, @@ -399,7 +400,6 @@ impl Web3Connections { old_head_block, rpc, rpc_head_str, - includes_backups_str, ); let consensus_head_block = self @@ -416,7 +416,8 @@ impl Web3Connections { // this is unlikely but possible // TODO: better log warn!( - "chain rolled back {}/{}/{}/{} con={} old={} rpc={}@{} {}", + "chain rolled back {}{}/{}/{}/{} con={} old={} rpc={}@{}", + includes_backups_str, num_consensus_rpcs, num_checked_rpcs, num_active_rpcs, @@ -425,7 +426,6 @@ impl Web3Connections { old_head_block, rpc, rpc_head_str, - includes_backups_str, ); if includes_backups { @@ -447,7 +447,8 @@ impl Web3Connections { } Ordering::Greater => { debug!( - "new {}/{}/{}/{} con={} rpc={}@{} {}", + "new {}{}/{}/{}/{} con={} rpc={}@{}", + includes_backups_str, num_consensus_rpcs, num_checked_rpcs, num_active_rpcs, @@ -455,7 +456,6 @@ impl Web3Connections { consensus_saved_block, rpc, rpc_head_str, - includes_backups_str, ); if includes_backups { @@ -479,25 +479,25 @@ impl Web3Connections { if num_checked_rpcs >= self.min_head_rpcs { error!( - "non {}/{}/{}/{} rpc={}@{} {}", + "non {}{}/{}/{}/{} rpc={}@{}", + includes_backups_str, num_consensus_rpcs, num_checked_rpcs, num_active_rpcs, total_rpcs, rpc, rpc_head_str, - includes_backups_str, ); } else { debug!( - "non {}/{}/{}/{} rpc={}@{} {}", + "non {}{}/{}/{}/{} rpc={}@{}", + includes_backups_str, num_consensus_rpcs, num_checked_rpcs, num_active_rpcs, total_rpcs, rpc, rpc_head_str, - includes_backups_str, ); } } diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs index 0953cde6..9618d92d 100644 --- a/web3_proxy/src/rpcs/connections.rs +++ b/web3_proxy/src/rpcs/connections.rs @@ -27,9 +27,9 @@ use serde::Serialize; use serde_json::json; use serde_json::value::RawValue; use std::collections::BTreeMap; -use std::fmt; use std::sync::atomic::Ordering; use std::sync::Arc; +use std::{cmp, fmt}; use thread_fast_rng::rand::seq::SliceRandom; use tokio::sync::{broadcast, watch}; use tokio::task; @@ -446,47 +446,57 @@ impl Web3Connections { let usable_rpcs_by_head_num_and_weight: BTreeMap< (Option, u64), Vec>, - > = if let Some(min_block_needed) = min_block_needed { - // need a potentially old block. check all the rpcs. prefer the most synced - let mut m = BTreeMap::new(); - - for x in self - .conns - .values() - .filter(|x| if allow_backups { true } else { !x.backup }) - .filter(|x| !skip.contains(x)) - .filter(|x| x.has_block_data(min_block_needed)) - .cloned() - { - let x_head_block = x.head_block.read().clone(); - - match x_head_block { - None => continue, - Some(x_head) => { - let key = (Some(x_head.number()), u64::MAX - x.tier); - - m.entry(key).or_insert_with(Vec::new).push(x); - } - } - } - - m - } else { - // need latest. filter the synced rpcs + > = { let synced_connections = self.synced_connections.load(); - // TODO: if head_block is super old. emit an error! + let head_block_num = if let Some(head_block) = synced_connections.head_block.as_ref() { + head_block.number() + } else { + return Ok(OpenRequestResult::NotReady); + }; + + let min_block_needed = min_block_needed.unwrap_or(&head_block_num); let mut m = BTreeMap::new(); - for x in synced_connections - .conns - .iter() - .filter(|x| !skip.contains(x)) - { - let key = (None, u64::MAX - x.tier); + match min_block_needed.cmp(&head_block_num) { + cmp::Ordering::Less => { + // need an old block. check all the rpcs. prefer the most synced + for x in self + .conns + .values() + .filter(|x| if allow_backups { true } else { !x.backup }) + .filter(|x| !skip.contains(x)) + .filter(|x| x.has_block_data(min_block_needed)) + .cloned() + { + let x_head_block = x.head_block.read().clone(); - m.entry(key).or_insert_with(Vec::new).push(x.clone()); + match x_head_block { + None => continue, + Some(x_head) => { + let key = (Some(x_head.number()), u64::MAX - x.tier); + + m.entry(key).or_insert_with(Vec::new).push(x); + } + } + } + } + cmp::Ordering::Equal => { + // need the consensus head block. filter the synced rpcs + for x in synced_connections + .conns + .iter() + .filter(|x| !skip.contains(x)) + { + let key = (None, u64::MAX - x.tier); + + m.entry(key).or_insert_with(Vec::new).push(x.clone()); + } + } + cmp::Ordering::Greater => { + return Ok(OpenRequestResult::NotReady); + } } m