diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index 85860e23..c45be599 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -1004,6 +1004,7 @@ fn best_rpc<'a>(rpc_a: &'a Arc, rpc_b: &'a Arc) -> &'a Arc impl Stream { // TODO: get error_handler out of the web3_request, probably the authorization // let error_handler = web3_request.authorization.error_handler; @@ -1014,6 +1015,7 @@ impl RpcsForRequest { if self.request.ttl_expired() { break; } else { + // TODO: think about this more yield_now().await; } @@ -1022,79 +1024,57 @@ impl RpcsForRequest { // first check the inners // TODO: DRY - for (rpc_a, rpc_b) in self.inner.iter().circular_tuple_windows() { - // TODO: ties within X% to the server with the smallest block_data_limit? - // find rpc with the lowest weighted peak latency. backups always lose. rate limits always lose - // TODO: should next_available be reversed? - // TODO: this is similar to sort_for_load_balancing_on, but at this point we don't want to prefer tiers - // TODO: move ethis to a helper function just so we can test it - // TODO: should x.next_available should be Reverse<_>? - let best_rpc = best_rpc(rpc_a, rpc_b); + for rpcs_iter in [self.inner.iter(), self.outer.iter()] { + for (rpc_a, rpc_b) in rpcs_iter.circular_tuple_windows() { + // TODO: ties within X% to the server with the smallest block_data_limit? + // find rpc with the lowest weighted peak latency. backups always lose. rate limits always lose + // TODO: should next_available be reversed? + // TODO: this is similar to sort_for_load_balancing_on, but at this point we don't want to prefer tiers + // TODO: move ethis to a helper function just so we can test it + // TODO: should x.next_available should be Reverse<_>? + let best_rpc = best_rpc(rpc_a, rpc_b); - match best_rpc - .try_request_handle(&self.request, error_handler) - .await - { - Ok(OpenRequestResult::Handle(handle)) => { - trace!("opened handle: {}", best_rpc); - yield handle; - } - Ok(OpenRequestResult::RetryAt(retry_at)) => { - trace!( - "retry on {} @ {}", - best_rpc, - retry_at.duration_since(Instant::now()).as_secs_f32() - ); - - earliest_retry_at = earliest_retry_at.min(Some(retry_at)); - continue; - } - Ok(OpenRequestResult::Lagged(x)) => { - trace!("{} is lagged. will not work now", best_rpc); - // this will probably always be the same block, right? - if wait_for_sync.is_none() { - wait_for_sync = Some(x); + match best_rpc + .try_request_handle(&self.request, error_handler) + .await + { + Ok(OpenRequestResult::Handle(handle)) => { + trace!("opened handle: {}", best_rpc); + yield handle; + } + Ok(OpenRequestResult::RetryAt(retry_at)) => { + trace!( + "retry on {} @ {}", + best_rpc, + retry_at.duration_since(Instant::now()).as_secs_f32() + ); + + earliest_retry_at = earliest_retry_at.min(Some(retry_at)); + continue; + } + Ok(OpenRequestResult::Lagged(x)) => { + trace!("{} is lagged. will not work now", best_rpc); + // this will probably always be the same block, right? + if wait_for_sync.is_none() { + wait_for_sync = Some(x); + } + continue; + } + Ok(OpenRequestResult::NotReady) => { + // TODO: log a warning? emit a stat? + trace!("best_rpc not ready: {}", best_rpc); + continue; + } + Err(err) => { + trace!("No request handle for {}. err={:?}", best_rpc, err); + continue; } - continue; - } - Ok(OpenRequestResult::NotReady) => { - // TODO: log a warning? emit a stat? - trace!("best_rpc not ready: {}", best_rpc); - continue; - } - Err(err) => { - trace!("No request handle for {}. err={:?}", best_rpc, err); - continue; } } } - // check self.outer only after self.inner. thats because the outer rpcs weren't ready to serve the request - for (rpc_a, rpc_b) in self.outer.iter().circular_tuple_windows() { - // TODO: ties within X% to the server with the smallest block_data_limit? - // find rpc with the lowest weighted peak latency. backups always lose. rate limits always lose - // TODO: should next_available be reversed? - // TODO: this is similar to sort_for_load_balancing_on, but at this point we don't want to prefer tiers - // TODO: move ethis to a helper function just so we can test it - // TODO: should x.next_available should be Reverse<_>? - let best_rpc = best_rpc(rpc_a, rpc_b); - - match best_rpc - .wait_for_request_handle(&self.request, error_handler) - .await - { - Ok(handle) => { - trace!("opened handle: {}", best_rpc); - yield handle; - } - Err(err) => { - trace!("No request handle for {}. err={:?}", best_rpc, err); - continue; - } - } - } - - // if we got this far, no rpcs are ready + // if we got this far, no inner or outer rpcs are ready. thats suprising since an inner should have been + // maybe someone requested something silly like a far future block? // clear earliest_retry_at if it is too far in the future to help us if let Some(retry_at) = earliest_retry_at { @@ -1104,16 +1084,21 @@ impl RpcsForRequest { } } + // TODO: i think theres a bug here so i match (wait_for_sync, earliest_retry_at) { (None, None) => { // we have nothing to wait for. uh oh! break; } - (None, Some(retry_at)) => { + (_, Some(retry_at)) => { // try again after rate limits are done sleep_until(retry_at).await; } (Some(wait_for_sync), None) => { + break; + + // TODO: think about this more + /* select! { x = wait_for_sync => { match x { @@ -1132,7 +1117,9 @@ impl RpcsForRequest { break; } } + */ } + /* (Some(wait_for_sync), Some(retry_at)) => { select! { x = wait_for_sync => { @@ -1154,6 +1141,7 @@ impl RpcsForRequest { } } } + */ } } }