From 15216b7d3363e1df594284c89e8d8e0e5f605493 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 11 Oct 2023 01:23:08 -0700 Subject: [PATCH] no waiting and simpler rpc looping --- Cargo.lock | 4 +- web3_proxy/Cargo.toml | 2 +- web3_proxy/src/rpcs/consensus.rs | 100 +++++++++++-------------------- web3_proxy/src/rpcs/one.rs | 19 +++--- web3_proxy_cli/Cargo.toml | 2 +- 5 files changed, 49 insertions(+), 78 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b9557491..e1fdb199 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6576,7 +6576,7 @@ dependencies = [ [[package]] name = "web3_proxy" -version = "1.43.35" +version = "1.43.36" dependencies = [ "anyhow", "arc-swap", @@ -6655,7 +6655,7 @@ dependencies = [ [[package]] name = "web3_proxy_cli" -version = "1.43.35" +version = "1.43.36" dependencies = [ "env_logger", "parking_lot", diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index 2f274415..3228eb97 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "web3_proxy" -version = "1.43.35" +version = "1.43.36" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index 744ce25f..c60a4c51 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -866,14 +866,8 @@ impl RpcsForRequest { // let error_handler = web3_request.authorization.error_handler; let error_handler = None; - let max_len = self.inner.len(); - - // TODO: do this without having 3 Vecs - let mut filtered = Vec::with_capacity(max_len); - let mut attempted = HashSet::with_capacity(max_len); - let mut completed = HashSet::with_capacity(max_len); - // todo!("be sure to set server_error if we exit without any rpcs!"); + #[allow(clippy::never_loop)] loop { if self.request.connect_timeout() { break; @@ -884,72 +878,46 @@ impl RpcsForRequest { let mut earliest_retry_at = None; let mut wait_for_sync = FuturesUnordered::new(); - // first check the inners, then the outers - attempted.clear(); - - while attempted.len() + completed.len() < self.inner.len() { - filtered.clear(); - - // TODO: i'd like to do this without the collect, but since we push into `attempted`, having `attempted.contains` causes issues - filtered.extend(self.inner.iter().filter(|x| !(attempted.contains(x) || completed.contains(x)))); - - // tuple_windows doesn't do anything for single item iters. make the code DRY by just having it compare itself - if filtered.len() == 1 { - filtered.push(filtered[0]); - } - - for (rpc_a, rpc_b) in filtered.iter().tuple_windows() { - // TODO: ties within X% to the server with the smallest block_data_limit? - // find rpc with the lowest weighted peak latency. backups always lose. rate limits always lose - // TODO: should next_available be reversed? - // TODO: this is similar to sort_for_load_balancing_on, but at this point we don't want to prefer tiers - // TODO: move ethis to a helper function just so we can test it - // TODO: should x.next_available should be Reverse<_>? - let best_rpc = best_rpc(rpc_a, rpc_b); - - match best_rpc - .try_request_handle(&self.request, error_handler, false) - .await - { - Ok(OpenRequestResult::Handle(handle)) => { - trace!("opened handle: {}", best_rpc); - completed.insert(best_rpc); - yield handle; - } - Ok(OpenRequestResult::RetryAt(retry_at)) => { - trace!( - "retry on {} @ {}", - best_rpc, - retry_at.duration_since(Instant::now()).as_secs_f32() - ); - attempted.insert(best_rpc); - earliest_retry_at = earliest_retry_at.min(Some(retry_at, )); - } - Ok(OpenRequestResult::Lagged(x)) => { - // this will probably always be the same block, right? - trace!("{} is lagged. will not work now", best_rpc); - attempted.insert(best_rpc); - wait_for_sync.push(x); - } - Ok(OpenRequestResult::Failed) => { - // TODO: log a warning? emit a stat? - trace!("best_rpc not ready: {}", best_rpc); - completed.insert(best_rpc); - } - Err(err) => { - trace!("No request handle for {}. err={:?}", best_rpc, err); - completed.insert(best_rpc); - } + // TODO: we used to do a neat power of 2 random choices here, but it had bugs. bring that back + for best_rpc in self.inner.iter() { + match best_rpc + .try_request_handle(&self.request, error_handler, false) + .await + { + Ok(OpenRequestResult::Handle(handle)) => { + trace!("opened handle: {}", best_rpc); + yield handle; + } + Ok(OpenRequestResult::RetryAt(retry_at)) => { + trace!( + "retry on {} @ {}", + best_rpc, + retry_at.duration_since(Instant::now()).as_secs_f32() + ); + earliest_retry_at = earliest_retry_at.min(Some(retry_at, )); + } + Ok(OpenRequestResult::Lagged(x)) => { + // this will probably always be the same block, right? + trace!("{} is lagged. will not work now", best_rpc); + wait_for_sync.push(x); + } + Ok(OpenRequestResult::Failed) => { + // TODO: log a warning? emit a stat? + trace!("best_rpc not ready: {}", best_rpc); + } + Err(err) => { + trace!("No request handle for {}. err={:?}", best_rpc, err); } } - debug_assert!(!(attempted.is_empty() && completed.is_empty())); - + yield_now().await; } // if we got this far, no inner or outer rpcs are ready. thats suprising since an inner should have been ready. maybe it got rate limited warn!(?earliest_retry_at, num_waits=%wait_for_sync.len(), "no rpcs ready"); + break; + let min_wait_until = Instant::now() + Duration::from_millis(10); // clear earliest_retry_at if it is too far in the future to help us @@ -958,6 +926,8 @@ impl RpcsForRequest { // set a minimum of 100ms. this is probably actually a bug we should figure out. earliest_retry_at = Some(corrected); + } else if wait_for_sync.is_empty() { + break; } else { earliest_retry_at = Some(self.request.connect_timeout_at()); } diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 92eac914..e783b3c5 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -18,7 +18,6 @@ use latency::{EwmaLatency, PeakEwmaLatency, RollingQuantileLatency}; use migration::sea_orm::DatabaseConnection; use nanorand::tls::TlsWyRand; use nanorand::Rng; -use ordered_float::OrderedFloat; use redis_rate_limiter::{RedisPool, RedisRateLimitResult, RedisRateLimiter}; use serde::ser::{SerializeStruct, Serializer}; use serde::Serialize; @@ -293,17 +292,19 @@ impl Web3Rpc { &self, max_block: Option, start_instant: Instant, - ) -> ((Instant, bool, Reverse, u32), OrderedFloat) { + ) -> ((Instant, bool, Reverse, u32), Duration) { let sort_on = self.sort_on(max_block, start_instant); - // TODO: i think median is better than weighted at this point. we save checking weighted for the very end - let median_latency = self - .median_latency - .as_ref() - .map(|x| x.seconds()) - .unwrap_or_default(); + // // TODO: once we do power-of-2 choices, put median_latency back + // let median_latency = self + // .median_latency + // .as_ref() + // .map(|x| x.seconds()) + // .unwrap_or_default(); - let x = (sort_on, OrderedFloat::from(median_latency)); + let weighted_latency = self.weighted_peak_latency(); + + let x = (sort_on, weighted_latency); trace!("sort_for_load_balancing {}: {:?}", self, x); diff --git a/web3_proxy_cli/Cargo.toml b/web3_proxy_cli/Cargo.toml index 65c6ae53..abc265d5 100644 --- a/web3_proxy_cli/Cargo.toml +++ b/web3_proxy_cli/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "web3_proxy_cli" -version = "1.43.35" +version = "1.43.36" edition = "2021" default-run = "web3_proxy_cli"