no waiting and simpler rpc looping

Bryan Stitt 2023-10-11 01:23:08 -07:00
parent 20413b883f
commit 15216b7d33
5 changed files with 49 additions and 78 deletions
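For orientation before the diff: the core change drops the attempted/completed bookkeeping and the pairwise best_rpc comparison, and instead tries every server once per pass while remembering the soonest retry time. A minimal sketch of that loop shape, using hypothetical stand-in names (TryOutcome, try_all) rather than the proxy's real types:

use std::time::Instant;

// Hypothetical stand-ins for the proxy's request-handle results.
enum TryOutcome {
    Ready,            // got a handle; use this server
    RetryAt(Instant), // rate limited until the given time
    Skip,             // lagged or failed; move on to the next server
}

// Try every server once, in order, remembering the soonest retry time.
// Note that bare Option::min treats None as smaller than any Some, so
// the merge below uses map_or to keep the earliest Some instead.
fn try_all<'a>(
    rpcs: &[&'a str],
    mut try_one: impl FnMut(&str) -> TryOutcome,
) -> (Option<&'a str>, Option<Instant>) {
    let mut earliest_retry_at: Option<Instant> = None;
    for &rpc in rpcs {
        match try_one(rpc) {
            TryOutcome::Ready => return (Some(rpc), earliest_retry_at),
            TryOutcome::RetryAt(at) => {
                earliest_retry_at = Some(earliest_retry_at.map_or(at, |e| e.min(at)));
            }
            TryOutcome::Skip => {}
        }
    }
    (None, earliest_retry_at)
}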

Cargo.lock (generated): 4 changes

@@ -6576,7 +6576,7 @@ dependencies = [
[[package]]
name = "web3_proxy"
version = "1.43.35"
version = "1.43.36"
dependencies = [
"anyhow",
"arc-swap",
@@ -6655,7 +6655,7 @@ dependencies = [
[[package]]
name = "web3_proxy_cli"
version = "1.43.35"
version = "1.43.36"
dependencies = [
"env_logger",
"parking_lot",

@@ -1,6 +1,6 @@
[package]
name = "web3_proxy"
version = "1.43.35"
version = "1.43.36"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

@@ -866,14 +866,8 @@ impl RpcsForRequest {
// let error_handler = web3_request.authorization.error_handler;
let error_handler = None;
let max_len = self.inner.len();
// TODO: do this without having 3 Vecs
let mut filtered = Vec::with_capacity(max_len);
let mut attempted = HashSet::with_capacity(max_len);
let mut completed = HashSet::with_capacity(max_len);
// todo!("be sure to set server_error if we exit without any rpcs!");
#[allow(clippy::never_loop)]
loop {
if self.request.connect_timeout() {
break;
@@ -884,72 +878,46 @@ impl RpcsForRequest {
let mut earliest_retry_at = None;
let mut wait_for_sync = FuturesUnordered::new();
// first check the inners, then the outers
attempted.clear();
while attempted.len() + completed.len() < self.inner.len() {
filtered.clear();
// TODO: i'd like to do this without the collect, but since we push into `attempted`, having `attempted.contains` causes issues
filtered.extend(self.inner.iter().filter(|x| !(attempted.contains(x) || completed.contains(x))));
// tuple_windows doesn't do anything for single item iters. make the code DRY by just having it compare itself
if filtered.len() == 1 {
filtered.push(filtered[0]);
}
for (rpc_a, rpc_b) in filtered.iter().tuple_windows() {
// TODO: ties within X% to the server with the smallest block_data_limit?
// find rpc with the lowest weighted peak latency. backups always lose. rate limits always lose
// TODO: should next_available be reversed?
// TODO: this is similar to sort_for_load_balancing_on, but at this point we don't want to prefer tiers
// TODO: move this to a helper function just so we can test it
// TODO: should x.next_available be Reverse<_>?
let best_rpc = best_rpc(rpc_a, rpc_b);
match best_rpc
.try_request_handle(&self.request, error_handler, false)
.await
{
Ok(OpenRequestResult::Handle(handle)) => {
trace!("opened handle: {}", best_rpc);
completed.insert(best_rpc);
yield handle;
}
Ok(OpenRequestResult::RetryAt(retry_at)) => {
trace!(
"retry on {} @ {}",
best_rpc,
retry_at.duration_since(Instant::now()).as_secs_f32()
);
attempted.insert(best_rpc);
earliest_retry_at = earliest_retry_at.min(Some(retry_at));
}
Ok(OpenRequestResult::Lagged(x)) => {
// this will probably always be the same block, right?
trace!("{} is lagged. will not work now", best_rpc);
attempted.insert(best_rpc);
wait_for_sync.push(x);
}
Ok(OpenRequestResult::Failed) => {
// TODO: log a warning? emit a stat?
trace!("best_rpc not ready: {}", best_rpc);
completed.insert(best_rpc);
}
Err(err) => {
trace!("No request handle for {}. err={:?}", best_rpc, err);
completed.insert(best_rpc);
}
// TODO: we used to do a neat power of 2 random choices here, but it had bugs. bring that back
for best_rpc in self.inner.iter() {
match best_rpc
.try_request_handle(&self.request, error_handler, false)
.await
{
Ok(OpenRequestResult::Handle(handle)) => {
trace!("opened handle: {}", best_rpc);
yield handle;
}
Ok(OpenRequestResult::RetryAt(retry_at)) => {
trace!(
"retry on {} @ {}",
best_rpc,
retry_at.duration_since(Instant::now()).as_secs_f32()
);
earliest_retry_at = earliest_retry_at.min(Some(retry_at));
}
Ok(OpenRequestResult::Lagged(x)) => {
// this will probably always be the same block, right?
trace!("{} is lagged. will not work now", best_rpc);
wait_for_sync.push(x);
}
Ok(OpenRequestResult::Failed) => {
// TODO: log a warning? emit a stat?
trace!("best_rpc not ready: {}", best_rpc);
}
Err(err) => {
trace!("No request handle for {}. err={:?}", best_rpc, err);
}
}
debug_assert!(!(attempted.is_empty() && completed.is_empty()));
yield_now().await;
}
// if we got this far, no inner or outer rpcs are ready. that's surprising since an inner should have been ready. maybe it got rate limited
warn!(?earliest_retry_at, num_waits=%wait_for_sync.len(), "no rpcs ready");
break;
let min_wait_until = Instant::now() + Duration::from_millis(10);
// clear earliest_retry_at if it is too far in the future to help us
@@ -958,6 +926,8 @@ impl RpcsForRequest {
// set a minimum of 100ms. this is probably actually a bug we should figure out.
earliest_retry_at = Some(corrected);
} else if wait_for_sync.is_empty() {
break;
} else {
earliest_retry_at = Some(self.request.connect_timeout_at());
}
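Two notes on this hunk, each with a short hedged sketch. First, the wake-up clamping shown above: the next retry is floored at a small delay so a near-past retry_at cannot busy-spin, and capped at the request's connect timeout when only lagged servers remain. A sketch with a hypothetical helper:

use std::time::{Duration, Instant};

// Hypothetical helper mirroring the clamping idea: never retry sooner
// than a small floor, and never sleep past the connect timeout.
fn next_wake(retry_at: Option<Instant>, connect_timeout_at: Instant) -> Instant {
    let floor = Instant::now() + Duration::from_millis(10);
    let ceiling = connect_timeout_at.max(floor); // keep clamp's bounds ordered
    match retry_at {
        Some(at) => at.clamp(floor, ceiling),
        None => ceiling,
    }
}

Second, the TODO near the top of this hunk refers to the "power of two random choices" load-balancing technique: sample two servers at random and take the better one, which avoids both a full sort and the herding caused by always picking the single global best. A minimal sketch, with the RNG left as a caller-supplied closure since the real code's RNG setup is not shown here:

// sample(n) is any RNG returning an index in 0..n (hypothetical; left
// abstract so the sketch has no dependency). Lower keys win, matching
// the latency-based sort keys built elsewhere in this codebase.
fn pick_power_of_two<T: Ord>(
    servers: &[T],
    mut sample: impl FnMut(usize) -> usize,
) -> Option<&T> {
    if servers.is_empty() {
        return None;
    }
    let a = &servers[sample(servers.len())];
    let b = &servers[sample(servers.len())];
    Some(a.min(b))
}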

@@ -18,7 +18,6 @@ use latency::{EwmaLatency, PeakEwmaLatency, RollingQuantileLatency};
use migration::sea_orm::DatabaseConnection;
use nanorand::tls::TlsWyRand;
use nanorand::Rng;
use ordered_float::OrderedFloat;
use redis_rate_limiter::{RedisPool, RedisRateLimitResult, RedisRateLimiter};
use serde::ser::{SerializeStruct, Serializer};
use serde::Serialize;
@@ -293,17 +292,19 @@ impl Web3Rpc {
&self,
max_block: Option<U64>,
start_instant: Instant,
) -> ((Instant, bool, Reverse<U64>, u32), OrderedFloat<f32>) {
) -> ((Instant, bool, Reverse<U64>, u32), Duration) {
let sort_on = self.sort_on(max_block, start_instant);
// TODO: i think median is better than weighted at this point. we can save the weighted check for the very end
let median_latency = self
.median_latency
.as_ref()
.map(|x| x.seconds())
.unwrap_or_default();
// // TODO: once we do power-of-2 choices, put median_latency back
// let median_latency = self
// .median_latency
// .as_ref()
// .map(|x| x.seconds())
// .unwrap_or_default();
let x = (sort_on, OrderedFloat::from(median_latency));
let weighted_latency = self.weighted_peak_latency();
let x = (sort_on, weighted_latency);
trace!("sort_for_load_balancing {}: {:?}", self, x);

@@ -1,6 +1,6 @@
[package]
name = "web3_proxy_cli"
version = "1.43.35"
version = "1.43.36"
edition = "2021"
default-run = "web3_proxy_cli"