move backup indicator

This commit is contained in:
Bryan Stitt 2023-01-19 18:30:30 -08:00
parent c3b53eb5f2
commit e53030e053
2 changed files with 61 additions and 51 deletions

@ -331,20 +331,20 @@ impl Web3Connections {
.synced_connections .synced_connections
.swap(Arc::new(new_synced_connections)); .swap(Arc::new(new_synced_connections));
let includes_backups_str = if includes_backups { "B" } else { "" }; let includes_backups_str = if includes_backups { "B " } else { "" };
if let Some(consensus_saved_block) = consensus_head_block { if let Some(consensus_saved_block) = consensus_head_block {
match &old_synced_connections.head_block { match &old_synced_connections.head_block {
None => { None => {
debug!( debug!(
"first {}/{}/{}/{} block={}, rpc={} {}", "first {}{}/{}/{}/{} block={}, rpc={}",
includes_backups_str,
num_consensus_rpcs, num_consensus_rpcs,
num_checked_rpcs, num_checked_rpcs,
num_active_rpcs, num_active_rpcs,
total_rpcs, total_rpcs,
consensus_saved_block, consensus_saved_block,
rpc, rpc,
includes_backups_str,
); );
if includes_backups { if includes_backups {
@ -371,7 +371,8 @@ impl Web3Connections {
if consensus_saved_block.hash() == old_head_block.hash() { if consensus_saved_block.hash() == old_head_block.hash() {
// no change in hash. no need to use head_block_sender // no change in hash. no need to use head_block_sender
debug!( debug!(
"con {}/{}/{}/{} con={} rpc={}@{} {}", "con {}{}/{}/{}/{} con={} rpc={}@{}",
includes_backups_str,
num_consensus_rpcs, num_consensus_rpcs,
num_checked_rpcs, num_checked_rpcs,
num_active_rpcs, num_active_rpcs,
@ -379,7 +380,6 @@ impl Web3Connections {
consensus_saved_block, consensus_saved_block,
rpc, rpc,
rpc_head_str, rpc_head_str,
includes_backups_str,
) )
} else { } else {
// hash changed // hash changed
@ -390,7 +390,8 @@ impl Web3Connections {
} }
debug!( debug!(
"unc {}/{}/{}/{} con_head={} old={} rpc={}@{} {}", "unc {}{}/{}/{}/{} con_head={} old={} rpc={}@{}",
includes_backups_str,
num_consensus_rpcs, num_consensus_rpcs,
num_checked_rpcs, num_checked_rpcs,
num_active_rpcs, num_active_rpcs,
@ -399,7 +400,6 @@ impl Web3Connections {
old_head_block, old_head_block,
rpc, rpc,
rpc_head_str, rpc_head_str,
includes_backups_str,
); );
let consensus_head_block = self let consensus_head_block = self
@ -416,7 +416,8 @@ impl Web3Connections {
// this is unlikely but possible // this is unlikely but possible
// TODO: better log // TODO: better log
warn!( warn!(
"chain rolled back {}/{}/{}/{} con={} old={} rpc={}@{} {}", "chain rolled back {}{}/{}/{}/{} con={} old={} rpc={}@{}",
includes_backups_str,
num_consensus_rpcs, num_consensus_rpcs,
num_checked_rpcs, num_checked_rpcs,
num_active_rpcs, num_active_rpcs,
@ -425,7 +426,6 @@ impl Web3Connections {
old_head_block, old_head_block,
rpc, rpc,
rpc_head_str, rpc_head_str,
includes_backups_str,
); );
if includes_backups { if includes_backups {
@ -447,7 +447,8 @@ impl Web3Connections {
} }
Ordering::Greater => { Ordering::Greater => {
debug!( debug!(
"new {}/{}/{}/{} con={} rpc={}@{} {}", "new {}{}/{}/{}/{} con={} rpc={}@{}",
includes_backups_str,
num_consensus_rpcs, num_consensus_rpcs,
num_checked_rpcs, num_checked_rpcs,
num_active_rpcs, num_active_rpcs,
@ -455,7 +456,6 @@ impl Web3Connections {
consensus_saved_block, consensus_saved_block,
rpc, rpc,
rpc_head_str, rpc_head_str,
includes_backups_str,
); );
if includes_backups { if includes_backups {
@ -479,25 +479,25 @@ impl Web3Connections {
if num_checked_rpcs >= self.min_head_rpcs { if num_checked_rpcs >= self.min_head_rpcs {
error!( error!(
"non {}/{}/{}/{} rpc={}@{} {}", "non {}{}/{}/{}/{} rpc={}@{}",
includes_backups_str,
num_consensus_rpcs, num_consensus_rpcs,
num_checked_rpcs, num_checked_rpcs,
num_active_rpcs, num_active_rpcs,
total_rpcs, total_rpcs,
rpc, rpc,
rpc_head_str, rpc_head_str,
includes_backups_str,
); );
} else { } else {
debug!( debug!(
"non {}/{}/{}/{} rpc={}@{} {}", "non {}{}/{}/{}/{} rpc={}@{}",
includes_backups_str,
num_consensus_rpcs, num_consensus_rpcs,
num_checked_rpcs, num_checked_rpcs,
num_active_rpcs, num_active_rpcs,
total_rpcs, total_rpcs,
rpc, rpc,
rpc_head_str, rpc_head_str,
includes_backups_str,
); );
} }
} }

@ -27,9 +27,9 @@ use serde::Serialize;
use serde_json::json; use serde_json::json;
use serde_json::value::RawValue; use serde_json::value::RawValue;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::fmt;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::sync::Arc; use std::sync::Arc;
use std::{cmp, fmt};
use thread_fast_rng::rand::seq::SliceRandom; use thread_fast_rng::rand::seq::SliceRandom;
use tokio::sync::{broadcast, watch}; use tokio::sync::{broadcast, watch};
use tokio::task; use tokio::task;
@ -446,10 +446,22 @@ impl Web3Connections {
let usable_rpcs_by_head_num_and_weight: BTreeMap< let usable_rpcs_by_head_num_and_weight: BTreeMap<
(Option<U64>, u64), (Option<U64>, u64),
Vec<Arc<Web3Connection>>, Vec<Arc<Web3Connection>>,
> = if let Some(min_block_needed) = min_block_needed { > = {
// need a potentially old block. check all the rpcs. prefer the most synced let synced_connections = self.synced_connections.load();
let head_block_num = if let Some(head_block) = synced_connections.head_block.as_ref() {
head_block.number()
} else {
return Ok(OpenRequestResult::NotReady);
};
let min_block_needed = min_block_needed.unwrap_or(&head_block_num);
let mut m = BTreeMap::new(); let mut m = BTreeMap::new();
match min_block_needed.cmp(&head_block_num) {
cmp::Ordering::Less => {
// need an old block. check all the rpcs. prefer the most synced
for x in self for x in self
.conns .conns
.values() .values()
@ -469,16 +481,9 @@ impl Web3Connections {
} }
} }
} }
}
m cmp::Ordering::Equal => {
} else { // need the consensus head block. filter the synced rpcs
// need latest. filter the synced rpcs
let synced_connections = self.synced_connections.load();
// TODO: if head_block is super old. emit an error!
let mut m = BTreeMap::new();
for x in synced_connections for x in synced_connections
.conns .conns
.iter() .iter()
@ -488,6 +493,11 @@ impl Web3Connections {
m.entry(key).or_insert_with(Vec::new).push(x.clone()); m.entry(key).or_insert_with(Vec::new).push(x.clone());
} }
}
cmp::Ordering::Greater => {
return Ok(OpenRequestResult::NotReady);
}
}
m m
}; };