From bd87fcb13c0d8cea09e950298a65e199b16c5f52 Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Fri, 9 Jun 2023 13:09:58 -0700
Subject: [PATCH] move sort and shuffle for load balancing into proper functions

---
 web3_proxy/src/rpcs/consensus.rs | 12 ++---
 web3_proxy/src/rpcs/many.rs      | 84 +++++++++++++-------------------
 web3_proxy/src/rpcs/one.rs       | 56 +++++++++++++++++++++
 3 files changed, 96 insertions(+), 56 deletions(-)

diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs
index 81ba2d9a..483a76df 100644
--- a/web3_proxy/src/rpcs/consensus.rs
+++ b/web3_proxy/src/rpcs/consensus.rs
@@ -11,7 +11,7 @@ use hashbrown::{HashMap, HashSet};
 use hdrhistogram::serialization::{Serializer, V2DeflateSerializer};
 use hdrhistogram::Histogram;
 use itertools::{Itertools, MinMaxResult};
-use log::{debug, log_enabled, trace, warn, Level};
+use log::{log_enabled, trace, warn, Level};
 use moka::future::Cache;
 use serde::Serialize;
 use std::cmp::{Ordering, Reverse};
@@ -73,6 +73,8 @@ impl RpcRanking {
     fn sort_key(&self) -> (bool, u8, Reverse<Option<U64>>) {
         // TODO: add soft_limit here? add peak_ewma here?
         // TODO: should backup or tier be checked first? now that tiers are automated, backups
+        // TODO: should we include a random number in here?
+        // TODO: should we include peak_ewma_latency or weighted_peak_ewma_latency?
         (!self.backup, self.tier, Reverse(self.head_num))
     }
 }
@@ -433,11 +435,7 @@ impl ConsensusFinder {
         Ok(changed)
     }
 
-    pub async fn update_tiers(
-        &mut self,
-        authorization: &Arc<Authorization>,
-        web3_rpcs: &Web3Rpcs,
-    ) -> Web3ProxyResult<()> {
+    pub async fn update_tiers(&mut self) -> Web3ProxyResult<()> {
         match self.rpc_heads.len() {
             0 => {}
             1 => {
@@ -525,7 +523,7 @@
         authorization: &Arc<Authorization>,
         web3_rpcs: &Web3Rpcs,
     ) -> Web3ProxyResult<Option<ConsensusWeb3Rpcs>> {
-        self.update_tiers(authorization, web3_rpcs).await?;
+        self.update_tiers().await?;
 
         let minmax_block = self.rpc_heads.values().minmax_by_key(|&x| x.number());
 
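A note on the `sort_key` tuple above: tuples compare element by element, `false` sorts before `true`, and `Reverse` flips the `Option` ordering, so an rpc with no head block lands after any rpc with a known block. Below is a minimal standalone sketch of that behavior; the names are illustrative and plain `u64` stands in for ethers' `U64`, so treat it as a sketch rather than code from this repo.

    use std::cmp::Reverse;

    fn main() {
        let mut keys = vec![
            ("no_head", Reverse(None::<u64>)),
            ("block_2", Reverse(Some(2u64))),
            ("block_1", Reverse(Some(1u64))),
        ];
        // ascending sort: higher block numbers first, missing head blocks last
        keys.sort_by_key(|&(_, k)| k);
        let names: Vec<_> = keys.iter().map(|&(n, _)| n).collect();
        assert_eq!(names, ["block_2", "block_1", "no_head"]);
    }
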
diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs
index 92552b48..4644bac4 100644
--- a/web3_proxy/src/rpcs/many.rs
+++ b/web3_proxy/src/rpcs/many.rs
@@ -23,17 +23,15 @@ use itertools::Itertools;
 use log::{debug, error, info, trace, warn};
 use migration::sea_orm::DatabaseConnection;
 use moka::future::{Cache, CacheBuilder};
-use ordered_float::OrderedFloat;
 use serde::ser::{SerializeStruct, Serializer};
 use serde::Serialize;
 use serde_json::json;
 use serde_json::value::RawValue;
 use std::borrow::Cow;
-use std::cmp::{min_by_key, Reverse};
+use std::cmp::min_by_key;
 use std::fmt::{self, Display};
-use std::sync::atomic::{self, Ordering};
+use std::sync::atomic::Ordering;
 use std::sync::Arc;
-use thread_fast_rng::rand::seq::SliceRandom;
 use tokio::select;
 use tokio::sync::{broadcast, watch};
 use tokio::time::{sleep, sleep_until, Duration, Instant};
@@ -523,8 +521,8 @@ impl Web3Rpcs {
             .cloned()
             .collect();
 
-        // TODO: include tiers in this?
-        potential_rpcs.shuffle(&mut thread_fast_rng::thread_fast_rng());
+        potential_rpcs
+            .sort_by_cached_key(|x| x.shuffle_for_load_balancing_on(max_block_needed.copied()));
 
         match self
             ._best_available_rpc(&authorization, error_handler, &potential_rpcs, skip_rpcs)
@@ -572,10 +570,11 @@
                         .cloned(),
                 );
 
-                potential_rpcs.shuffle(&mut thread_fast_rng::thread_fast_rng());
-
                 if potential_rpcs.len() >= self.min_head_rpcs {
                     // we have enough potential rpcs. try to load balance
+                    potential_rpcs.sort_by_cached_key(|x| {
+                        x.shuffle_for_load_balancing_on(max_block_needed.copied())
+                    });
 
                     match self
                         ._best_available_rpc(
@@ -604,8 +603,7 @@
            }

            for next_rpcs in consensus_rpcs.other_rpcs.values() {
-                // we have to collect in order to shuffle
-                let mut more_rpcs: Vec<_> = next_rpcs
+                let more_rpcs = next_rpcs
                     .iter()
                     .filter(|rpc| {
                         consensus_rpcs.rpc_will_work_now(
                             skip_rpcs,
                             min_block_needed,
                             max_block_needed,
                             rpc,
                         )
                     })
-                    .cloned()
-                    .collect();
+                    .cloned();
 
-                // shuffle only the new entries. that way the highest tier still gets preference
-                more_rpcs.shuffle(&mut thread_fast_rng::thread_fast_rng());
-
-                potential_rpcs.extend(more_rpcs.into_iter());
+                potential_rpcs.extend(more_rpcs);
 
                 if potential_rpcs.len() >= self.min_head_rpcs {
                     // we have enough potential rpcs. try to load balance
+                    potential_rpcs.sort_by_cached_key(|x| {
+                        x.shuffle_for_load_balancing_on(max_block_needed.copied())
+                    });
+
                     match self
                         ._best_available_rpc(
                             &authorization,
@@ -654,6 +652,10 @@
             }
 
             if !potential_rpcs.is_empty() {
                 // even after scanning all the tiers, there are not enough rpcs that can serve this request. try anyways
+                potential_rpcs.sort_by_cached_key(|x| {
+                    x.shuffle_for_load_balancing_on(max_block_needed.copied())
+                });
+
                 match self
                     ._best_available_rpc(
                         &authorization,
@@ -760,14 +762,14 @@
         };
 
         // synced connections are all on the same block. sort them by tier with higher soft limits first
-        synced_rpcs.sort_by_cached_key(rpc_sync_status_sort_key);
+        synced_rpcs.sort_by_cached_key(|x| x.sort_for_load_balancing_on(max_block_needed.copied()));
 
         trace!("synced_rpcs: {:#?}", synced_rpcs);
 
         // if there aren't enough synced connections, include more connections
         // TODO: only do this sorting if the synced_rpcs isn't enough
         let mut all_rpcs: Vec<_> = self.by_name.load().values().cloned().collect();
-        all_rpcs.sort_by_cached_key(rpc_sync_status_sort_key);
+        all_rpcs.sort_by_cached_key(|x| x.sort_for_load_balancing_on(max_block_needed.copied()));
 
         trace!("all_rpcs: {:#?}", all_rpcs);
 
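A note on the pattern above: instead of `shuffle()` followed by a linear scan, the candidate list is now sorted by a key whose last component is a random number. That shuffles rpcs within each (backup, tier, block) group while still keeping better groups ahead of worse ones. `sort_by_cached_key` is the right tool here because it computes the key exactly once per element: each rpc gets a single random draw, so the ordering is a consistent total order, and the atomic loads inside the key function are not repeated on every comparison. A standalone sketch of the idea, with illustrative names and the plain `rand` crate standing in for `thread_fast_rng`:

    use rand::Rng;
    use std::cmp::Reverse;

    fn main() {
        let mut rng = rand::thread_rng();
        // (name, tier, head_block)
        let mut rpcs = [("a", 0u8, 2u64), ("b", 0, 2), ("c", 1, 2)];
        // the random tiebreaker shuffles within a tier; tiers stay in order
        rpcs.sort_by_cached_key(|&(_, tier, block)| (tier, Reverse(block), rng.gen::<u32>()));
        assert_eq!(rpcs[2].0, "c"); // tier 1 always sorts after tier 0
    }
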
@@ -1284,27 +1286,6 @@ impl Serialize for Web3Rpcs {
     }
 }
 
-/// sort by block number (descending) and tier (ascending)
-/// TODO: should this be moved into a `impl Web3Rpc`?
-/// TODO: i think we still have sorts scattered around the code that should use this
-/// TODO: take AsRef or something like that? We don't need an Arc here
-/// TODO: tests on this!
-fn rpc_sync_status_sort_key(x: &Arc<Web3Rpc>) -> (bool, Reverse<U64>, u8, OrderedFloat<f64>) {
-    let head_block = x
-        .head_block
-        .as_ref()
-        .and_then(|x| x.borrow().as_ref().map(|x| *x.number()))
-        .unwrap_or_default();
-
-    let tier = x.tier.load(atomic::Ordering::Relaxed);
-
-    let peak_ewma = x.weighted_peak_ewma_seconds();
-
-    let backup = x.backup;
-
-    (!backup, Reverse(head_block), tier, peak_ewma)
-}
-
 mod tests {
     #![allow(unused_imports)]
@@ -1327,8 +1308,14 @@ mod tests {
         PeakEwmaLatency::spawn(Duration::from_secs(1), 4, Duration::from_secs(1))
     }
 
-    #[tokio::test]
+    #[tokio::test(start_paused = true)]
     async fn test_sort_connections_by_sync_status() {
+        let _ = env_logger::builder()
+            .filter_level(LevelFilter::Error)
+            .filter_module("web3_proxy", LevelFilter::Trace)
+            .is_test(true)
+            .try_init();
+
         let block_0 = Block {
             number: Some(0.into()),
             hash: Some(H256::random()),
@@ -1362,42 +1349,42 @@ mod tests {
         let mut rpcs: Vec<_> = [
             Web3Rpc {
                 name: "a".to_string(),
-                // tier: 0,
+                tier: 0.into(),
                 head_block: Some(tx_a),
                 peak_latency: Some(new_peak_latency()),
                 ..Default::default()
             },
             Web3Rpc {
                 name: "b".to_string(),
-                // tier: 0,
+                tier: 0.into(),
                 head_block: Some(tx_b),
                 peak_latency: Some(new_peak_latency()),
                 ..Default::default()
             },
             Web3Rpc {
                 name: "c".to_string(),
-                // tier: 0,
+                tier: 0.into(),
                 head_block: Some(tx_c),
                 peak_latency: Some(new_peak_latency()),
                 ..Default::default()
             },
             Web3Rpc {
                 name: "d".to_string(),
-                // tier: 1,
+                tier: 1.into(),
                 head_block: Some(tx_d),
                 peak_latency: Some(new_peak_latency()),
                 ..Default::default()
             },
             Web3Rpc {
                 name: "e".to_string(),
-                // tier: 1,
+                tier: 1.into(),
                 head_block: Some(tx_e),
                 peak_latency: Some(new_peak_latency()),
                 ..Default::default()
             },
             Web3Rpc {
                 name: "f".to_string(),
-                // tier: 1,
+                tier: 1.into(),
                 head_block: Some(tx_f),
                 peak_latency: Some(new_peak_latency()),
                 ..Default::default()
             },
@@ -1407,12 +1394,11 @@ mod tests {
         ]
         .map(Arc::new)
         .collect();
 
-        rpcs.sort_by_cached_key(rpc_sync_status_sort_key);
+        rpcs.sort_by_cached_key(|x| x.sort_for_load_balancing_on(None));
 
         let names_in_sort_order: Vec<_> = rpcs.iter().map(|x| x.name.as_str()).collect();
 
-        // TODO: the tier refactor likely broke this
-        assert_eq!(names_in_sort_order, ["c", "f", "b", "e", "a", "d"]);
+        assert_eq!(names_in_sort_order, ["c", "b", "a", "f", "e", "d"]);
     }
 
     #[tokio::test]
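A note on the updated test expectation above: the removed `rpc_sync_status_sort_key` compared block number before tier, while the new `sort_on` key compares tier before block number, which is why the expected order changes from tiers interleaved by height to whole tier groups. A standalone sketch of exactly that difference, using illustrative `u8`/`u64` stand-ins for the real tier and block types:

    use std::cmp::Reverse;

    fn main() {
        // (name, tier, head_block): a..c are tier 0, d..f are tier 1
        let rpcs = [
            ("a", 0u8, 0u64),
            ("b", 0, 1),
            ("c", 0, 2),
            ("d", 1, 0),
            ("e", 1, 1),
            ("f", 1, 2),
        ];

        // old key shape: block number first, then tier
        let mut old = rpcs;
        old.sort_by_key(|&(_, tier, block)| (Reverse(block), tier));
        assert_eq!(old.map(|x| x.0), ["c", "f", "b", "e", "a", "d"]);

        // new key shape: tier first, then block number
        let mut new = rpcs;
        new.sort_by_key(|&(_, tier, block)| (tier, Reverse(block)));
        assert_eq!(new.map(|x| x.0), ["c", "b", "a", "f", "e", "d"]);
    }
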
diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs
index 4a30b983..dc07a732 100644
--- a/web3_proxy/src/rpcs/one.rs
+++ b/web3_proxy/src/rpcs/one.rs
@@ -22,10 +22,12 @@ use redis_rate_limiter::{RedisPool, RedisRateLimitResult, RedisRateLimiter};
 use serde::ser::{SerializeStruct, Serializer};
 use serde::Serialize;
 use serde_json::json;
+use std::cmp::Reverse;
 use std::fmt;
 use std::hash::{Hash, Hasher};
 use std::sync::atomic::{self, AtomicU64, AtomicU8, AtomicUsize};
 use std::{cmp::Ordering, sync::Arc};
+use thread_fast_rng::rand::Rng;
 use tokio::sync::watch;
 use tokio::time::{sleep, sleep_until, timeout, Duration, Instant};
 use url::Url;
@@ -218,6 +220,60 @@ impl Web3Rpc {
         Ok((new_connection, handle))
     }
 
+    /// sort by...
+    /// - backups last (`false` sorts before `true`)
+    /// - tier (ascending)
+    /// - block number (descending)
+    /// TODO: tests on this!
+    /// TODO: should tier or block number take priority?
+    /// TODO: should this return a struct that implements sorting traits?
+    fn sort_on(&self, max_block: Option<U64>) -> (bool, u8, Reverse<U64>) {
+        let mut head_block = self
+            .head_block
+            .as_ref()
+            .and_then(|x| x.borrow().as_ref().map(|x| *x.number()))
+            .unwrap_or_default();
+
+        if let Some(max_block) = max_block {
+            head_block = head_block.min(max_block);
+        }
+
+        let tier = self.tier.load(atomic::Ordering::Relaxed);
+
+        let backup = self.backup;
+
+        (backup, tier, Reverse(head_block))
+    }
+
+    pub fn sort_for_load_balancing_on(
+        &self,
+        max_block: Option<U64>,
+    ) -> ((bool, u8, Reverse<U64>), OrderedFloat<f64>) {
+        let sort_on = self.sort_on(max_block);
+
+        let weighted_peak_ewma_seconds = self.weighted_peak_ewma_seconds();
+
+        let x = (sort_on, weighted_peak_ewma_seconds);
+
+        trace!("sort_for_load_balancing {}: {:?}", self, x);
+
+        x
+    }
+
+    /// like sort_for_load_balancing, but shuffles tiers randomly instead of sorting by weighted_peak_ewma_seconds
+    pub fn shuffle_for_load_balancing_on(
+        &self,
+        max_block: Option<U64>,
+    ) -> ((bool, u8, Reverse<U64>), u32) {
+        let sort_on = self.sort_on(max_block);
+
+        let mut rng = thread_fast_rng::thread_fast_rng();
+
+        let r = rng.gen::<u32>();
+
+        (sort_on, r)
+    }
+
     pub fn weighted_peak_ewma_seconds(&self) -> OrderedFloat<f64> {
         let peak_latency = if let Some(peak_latency) = self.peak_latency.as_ref() {
             peak_latency.latency().as_secs_f64()