sync sort should include backup
parent 2e9803de72
commit baf07f7de5
@@ -405,9 +405,17 @@ impl Web3Rpcs {
         let backups_needed = new_synced_connections.backups_needed;
         let consensus_head_block = new_synced_connections.head_block.clone();
         let num_consensus_rpcs = new_synced_connections.num_conns();
+        let mut num_synced_rpcs = 0;
         let num_active_rpcs = consensus_finder
             .all_rpcs_group()
-            .map(|x| x.len())
+            .map(|x| {
+                for v in x.rpc_to_block.values() {
+                    if *v == consensus_head_block {
+                        num_synced_rpcs += 1;
+                    }
+                }
+                x.len()
+            })
             .unwrap_or_default();
         let total_rpcs = self.by_name.read().len();
 
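The new closure above counts how many RPCs in the group report the consensus head block while still evaluating to the group's size for num_active_rpcs. A minimal standalone sketch of that pattern, with plain types standing in for the crate's real Web3Rpc and Web3ProxyBlock:

    use std::collections::HashMap;

    /// Count entries whose reported block matches the consensus head,
    /// mirroring the `for v in x.rpc_to_block.values()` loop in the hunk above.
    fn count_synced(rpc_to_block: &HashMap<String, u64>, consensus_head: u64) -> (usize, usize) {
        let synced = rpc_to_block
            .values()
            .filter(|&&block| block == consensus_head)
            .count();

        // the real closure still returns the group size (`x.len()`)
        (synced, rpc_to_block.len())
    }

    fn main() {
        let mut rpcs = HashMap::new();
        rpcs.insert("rpc-a".to_string(), 100);
        rpcs.insert("rpc-b".to_string(), 100);
        rpcs.insert("rpc-c".to_string(), 99); // lagging one block behind

        assert_eq!(count_synced(&rpcs, 100), (2, 3));
    }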
@@ -460,11 +468,12 @@ impl Web3Rpcs {
                     // no change in hash. no need to use watch_consensus_head_sender
                     // TODO: trace level if rpc is backup
                     debug!(
-                        "con {}/{} {}{}/{}/{} con={} rpc={}@{}",
+                        "con {}/{} {}{}/{}/{}/{} con={} rpc={}@{}",
                         consensus_tier,
                         total_tiers,
                         backups_voted_str,
                         num_consensus_rpcs,
+                        num_synced_rpcs,
                         num_active_rpcs,
                         total_rpcs,
                         consensus_head_block,
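The same one-slot change repeats in the three log sites that follow ("unc", "chain rolled back", "new"): one extra {} in the counters section, with num_synced_rpcs passed between num_consensus_rpcs and num_active_rpcs. A hedged illustration with made-up values; the trailing head-block and rpc fields are truncated in the hunks, so they are omitted here:

    fn main() {
        let (consensus_tier, total_tiers) = (0, 1);
        // assumption: in the real code this is a short marker set only when
        // backup servers were needed for consensus, and empty otherwise
        let backups_voted_str = "";
        let (num_consensus_rpcs, num_synced_rpcs, num_active_rpcs, total_rpcs) = (3, 3, 5, 10);

        println!(
            "con {}/{} {}{}/{}/{}/{}",
            consensus_tier,
            total_tiers,
            backups_voted_str,
            num_consensus_rpcs,
            num_synced_rpcs,
            num_active_rpcs,
            total_rpcs,
        );
        // prints: con 0/1 3/3/5/10
    }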
@@ -479,11 +488,12 @@ impl Web3Rpcs {
                     }
 
                     debug!(
-                        "unc {}/{} {}{}/{}/{} con_head={} old={} rpc={}@{}",
+                        "unc {}/{} {}{}/{}/{}/{} con_head={} old={} rpc={}@{}",
                         consensus_tier,
                         total_tiers,
                         backups_voted_str,
                         num_consensus_rpcs,
+                        num_synced_rpcs,
                         num_active_rpcs,
                         total_rpcs,
                         consensus_head_block,
@@ -506,11 +516,12 @@ impl Web3Rpcs {
                     // this is unlikely but possible
                     // TODO: better log
                     warn!(
-                        "chain rolled back {}/{} {}{}/{}/{} con={} old={} rpc={}@{}",
+                        "chain rolled back {}/{} {}{}/{}/{}/{} con={} old={} rpc={}@{}",
                         consensus_tier,
                         total_tiers,
                         backups_voted_str,
                         num_consensus_rpcs,
+                        num_synced_rpcs,
                         num_active_rpcs,
                         total_rpcs,
                         consensus_head_block,
@@ -536,11 +547,12 @@ impl Web3Rpcs {
                 }
                 Ordering::Greater => {
                     debug!(
-                        "new {}/{} {}{}/{}/{} con={} rpc={}@{}",
+                        "new {}/{} {}{}/{}/{}/{} con={} rpc={}@{}",
                         consensus_tier,
                         total_tiers,
                         backups_voted_str,
                         num_consensus_rpcs,
+                        num_synced_rpcs,
                         num_active_rpcs,
                         total_rpcs,
                         consensus_head_block,
@@ -93,9 +93,9 @@ impl Web3Rpcs {
 type FirstSeenCache = Cache<H256, Instant, hashbrown::hash_map::DefaultHashBuilder>;
 
 pub struct ConnectionsGroup {
-    rpc_to_block: HashMap<Arc<Web3Rpc>, Web3ProxyBlock>,
+    pub rpc_to_block: HashMap<Arc<Web3Rpc>, Web3ProxyBlock>,
     // TODO: what if there are two blocks with the same number?
-    highest_block: Option<Web3ProxyBlock>,
+    pub highest_block: Option<Web3ProxyBlock>,
     /// used to track rpc.head_latency. The same cache should be shared between all ConnectionsGroups
     first_seen: FirstSeenCache,
 }
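These two fields become pub, presumably so the counting loop added in the first hunk can read rpc_to_block on a ConnectionsGroup from another module, while first_seen stays private. A toy sketch of the visibility rule at play, with simplified types:

    mod consensus {
        use std::collections::HashMap;

        pub struct ConnectionsGroup {
            // `pub` lets code outside this module read the map directly
            pub rpc_to_block: HashMap<String, u64>,
            // non-pub fields remain private to this module
            first_seen: u64,
        }

        impl ConnectionsGroup {
            pub fn new() -> Self {
                Self {
                    rpc_to_block: HashMap::new(),
                    first_seen: 0,
                }
            }
        }
    }

    fn main() {
        let mut group = consensus::ConnectionsGroup::new();
        group.rpc_to_block.insert("rpc-a".to_string(), 100);
        // group.first_seen = 1; // error[E0616]: field `first_seen` is private
        println!("{} rpcs reporting", group.rpc_to_block.len());
    }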
@@ -702,6 +702,7 @@ impl Web3Rpcs {
     /// this prefers synced servers, but it will return servers even if they aren't fully in sync.
     /// This is useful for broadcasting signed transactions.
     // TODO: better type on this that can return an anyhow::Result
+    // TODO: this is broken
     pub async fn all_connections(
         &self,
         authorization: &Arc<Authorization>,
@@ -1199,7 +1200,7 @@ impl Serialize for Web3Rpcs {
 /// TODO: should this be moved into a `impl Web3Rpc`?
 /// TODO: i think we still have sorts scattered around the code that should use this
 /// TODO: take AsRef or something like that? We don't need an Arc here
-fn rpc_sync_status_sort_key(x: &Arc<Web3Rpc>) -> (U64, u64, OrderedFloat<f64>) {
+fn rpc_sync_status_sort_key(x: &Arc<Web3Rpc>) -> (U64, u64, bool, OrderedFloat<f64>) {
     let reversed_head_block = U64::MAX
         - x.head_block
             .read()
@@ -1209,7 +1210,8 @@ fn rpc_sync_status_sort_key(x: &Arc<Web3Rpc>) -> (U64, u64, OrderedFloat<f64>) {
 
     let tier = x.tier;
 
-    // TODO: use request instead of head latency
+    // TODO: use request latency instead of head latency
+    // TODO: have the latency decay automatically
     let head_ewma = x.head_latency.read().value();
 
     let active_requests = x.active_requests.load(atomic::Ordering::Relaxed) as f64;
@@ -1218,7 +1220,9 @@ fn rpc_sync_status_sort_key(x: &Arc<Web3Rpc>) -> (U64, u64, OrderedFloat<f64>) {
     // TODO: i don't think this actually counts as peak. investigate with atomics.rs and peak_ewma.rs
     let peak_ewma = OrderedFloat(head_ewma * active_requests);
 
-    (reversed_head_block, tier, peak_ewma)
+    let backup = x.backup;
+
+    (reversed_head_block, tier, backup, peak_ewma)
 }
 
 mod tests {
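This is the change the commit title names: the sort key gains backup so that when head block and tier tie, primary servers order ahead of backups. Rust tuples compare lexicographically and false < true, so an ascending sort places backup == false entries first. A self-contained sketch of the same key shape, with u64 standing in for U64 and a plain struct for Web3Rpc:

    use ordered_float::OrderedFloat;

    struct Rpc {
        name: &'static str,
        head_block: u64,
        tier: u64,
        backup: bool,
        peak_ewma: f64,
    }

    // mirrors rpc_sync_status_sort_key: highest block first (via the MAX
    // reversal), then lowest tier, then primaries before backups, then
    // lowest load-adjusted latency
    fn sort_key(x: &Rpc) -> (u64, u64, bool, OrderedFloat<f64>) {
        (u64::MAX - x.head_block, x.tier, x.backup, OrderedFloat(x.peak_ewma))
    }

    fn main() {
        let mut rpcs = vec![
            Rpc { name: "backup", head_block: 100, tier: 0, backup: true, peak_ewma: 0.1 },
            Rpc { name: "primary", head_block: 100, tier: 0, backup: false, peak_ewma: 0.5 },
            Rpc { name: "behind", head_block: 99, tier: 0, backup: false, peak_ewma: 0.1 },
        ];

        rpcs.sort_by_key(sort_key);

        let order: Vec<_> = rpcs.iter().map(|x| x.name).collect();
        // the primary wins despite worse latency; the lagging server sorts last
        assert_eq!(order, ["primary", "backup", "behind"]);
    }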
@@ -1671,4 +1675,8 @@ mod tests {
             }
         }
     }
+
+    fn test_all_connections() {
+        todo!()
+    }
 }