diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index 3494bbeb..3f1c79a9 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -405,9 +405,17 @@ impl Web3Rpcs { let backups_needed = new_synced_connections.backups_needed; let consensus_head_block = new_synced_connections.head_block.clone(); let num_consensus_rpcs = new_synced_connections.num_conns(); + let mut num_synced_rpcs = 0; let num_active_rpcs = consensus_finder .all_rpcs_group() - .map(|x| x.len()) + .map(|x| { + for v in x.rpc_to_block.values() { + if *v == consensus_head_block { + num_synced_rpcs += 1; + } + } + x.len() + }) .unwrap_or_default(); let total_rpcs = self.by_name.read().len(); @@ -460,11 +468,12 @@ impl Web3Rpcs { // no change in hash. no need to use watch_consensus_head_sender // TODO: trace level if rpc is backup debug!( - "con {}/{} {}{}/{}/{} con={} rpc={}@{}", + "con {}/{} {}{}/{}/{}/{} con={} rpc={}@{}", consensus_tier, total_tiers, backups_voted_str, num_consensus_rpcs, + num_synced_rpcs, num_active_rpcs, total_rpcs, consensus_head_block, @@ -479,11 +488,12 @@ impl Web3Rpcs { } debug!( - "unc {}/{} {}{}/{}/{} con_head={} old={} rpc={}@{}", + "unc {}/{} {}{}/{}/{}/{} con_head={} old={} rpc={}@{}", consensus_tier, total_tiers, backups_voted_str, num_consensus_rpcs, + num_synced_rpcs, num_active_rpcs, total_rpcs, consensus_head_block, @@ -506,11 +516,12 @@ impl Web3Rpcs { // this is unlikely but possible // TODO: better log warn!( - "chain rolled back {}/{} {}{}/{}/{} con={} old={} rpc={}@{}", + "chain rolled back {}/{} {}{}/{}/{}/{} con={} old={} rpc={}@{}", consensus_tier, total_tiers, backups_voted_str, num_consensus_rpcs, + num_synced_rpcs, num_active_rpcs, total_rpcs, consensus_head_block, @@ -536,11 +547,12 @@ impl Web3Rpcs { } Ordering::Greater => { debug!( - "new {}/{} {}{}/{}/{} con={} rpc={}@{}", + "new {}/{} {}{}/{}/{}/{} con={} rpc={}@{}", consensus_tier, total_tiers, backups_voted_str, num_consensus_rpcs, + num_synced_rpcs, num_active_rpcs, total_rpcs, consensus_head_block, diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index f4e63259..8625f53b 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -93,9 +93,9 @@ impl Web3Rpcs { type FirstSeenCache = Cache; pub struct ConnectionsGroup { - rpc_to_block: HashMap, Web3ProxyBlock>, + pub rpc_to_block: HashMap, Web3ProxyBlock>, // TODO: what if there are two blocks with the same number? - highest_block: Option, + pub highest_block: Option, /// used to track rpc.head_latency. The same cache should be shared between all ConnectionsGroups first_seen: FirstSeenCache, } diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 8d41bcc6..ab2cfbcd 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -702,6 +702,7 @@ impl Web3Rpcs { /// this prefers synced servers, but it will return servers even if they aren't fully in sync. /// This is useful for broadcasting signed transactions. // TODO: better type on this that can return an anyhow::Result + // TODO: this is broken pub async fn all_connections( &self, authorization: &Arc, @@ -1199,7 +1200,7 @@ impl Serialize for Web3Rpcs { /// TODO: should this be moved into a `impl Web3Rpc`? /// TODO: i think we still have sorts scattered around the code that should use this /// TODO: take AsRef or something like that? We don't need an Arc here -fn rpc_sync_status_sort_key(x: &Arc) -> (U64, u64, OrderedFloat) { +fn rpc_sync_status_sort_key(x: &Arc) -> (U64, u64, bool, OrderedFloat) { let reversed_head_block = U64::MAX - x.head_block .read() @@ -1209,7 +1210,8 @@ fn rpc_sync_status_sort_key(x: &Arc) -> (U64, u64, OrderedFloat) { let tier = x.tier; - // TODO: use request instead of head latency + // TODO: use request latency instead of head latency + // TODO: have the latency decay automatically let head_ewma = x.head_latency.read().value(); let active_requests = x.active_requests.load(atomic::Ordering::Relaxed) as f64; @@ -1218,7 +1220,9 @@ fn rpc_sync_status_sort_key(x: &Arc) -> (U64, u64, OrderedFloat) { // TODO: i don't think this actually counts as peak. investigate with atomics.rs and peak_ewma.rs let peak_ewma = OrderedFloat(head_ewma * active_requests); - (reversed_head_block, tier, peak_ewma) + let backup = x.backup; + + (reversed_head_block, tier, backup, peak_ewma) } mod tests { @@ -1671,4 +1675,8 @@ mod tests { } } } + + fn test_all_connections() { + todo!() + } }