This commit is contained in:
Bryan Stitt 2023-10-07 16:52:54 -07:00
parent e02a1e3b7c
commit d04f99a0e4

View File

@ -1004,6 +1004,7 @@ fn best_rpc<'a>(rpc_a: &'a Arc<Web3Rpc>, rpc_b: &'a Arc<Web3Rpc>) -> &'a Arc<Web
} }
impl RpcsForRequest { impl RpcsForRequest {
/// TODO: uncomment the code that makes this wait for rpcs if it thinks they will be ready soon
pub fn to_stream(self) -> impl Stream<Item = OpenRequestHandle> { pub fn to_stream(self) -> impl Stream<Item = OpenRequestHandle> {
// TODO: get error_handler out of the web3_request, probably the authorization // TODO: get error_handler out of the web3_request, probably the authorization
// let error_handler = web3_request.authorization.error_handler; // let error_handler = web3_request.authorization.error_handler;
@ -1014,6 +1015,7 @@ impl RpcsForRequest {
if self.request.ttl_expired() { if self.request.ttl_expired() {
break; break;
} else { } else {
// TODO: think about this more
yield_now().await; yield_now().await;
} }
@ -1022,79 +1024,57 @@ impl RpcsForRequest {
// first check the inners // first check the inners
// TODO: DRY // TODO: DRY
for (rpc_a, rpc_b) in self.inner.iter().circular_tuple_windows() { for rpcs_iter in [self.inner.iter(), self.outer.iter()] {
// TODO: ties within X% to the server with the smallest block_data_limit? for (rpc_a, rpc_b) in rpcs_iter.circular_tuple_windows() {
// find rpc with the lowest weighted peak latency. backups always lose. rate limits always lose // TODO: ties within X% to the server with the smallest block_data_limit?
// TODO: should next_available be reversed? // find rpc with the lowest weighted peak latency. backups always lose. rate limits always lose
// TODO: this is similar to sort_for_load_balancing_on, but at this point we don't want to prefer tiers // TODO: should next_available be reversed?
// TODO: move ethis to a helper function just so we can test it // TODO: this is similar to sort_for_load_balancing_on, but at this point we don't want to prefer tiers
// TODO: should x.next_available should be Reverse<_>? // TODO: move ethis to a helper function just so we can test it
let best_rpc = best_rpc(rpc_a, rpc_b); // TODO: should x.next_available should be Reverse<_>?
let best_rpc = best_rpc(rpc_a, rpc_b);
match best_rpc match best_rpc
.try_request_handle(&self.request, error_handler) .try_request_handle(&self.request, error_handler)
.await .await
{ {
Ok(OpenRequestResult::Handle(handle)) => { Ok(OpenRequestResult::Handle(handle)) => {
trace!("opened handle: {}", best_rpc); trace!("opened handle: {}", best_rpc);
yield handle; yield handle;
} }
Ok(OpenRequestResult::RetryAt(retry_at)) => { Ok(OpenRequestResult::RetryAt(retry_at)) => {
trace!( trace!(
"retry on {} @ {}", "retry on {} @ {}",
best_rpc, best_rpc,
retry_at.duration_since(Instant::now()).as_secs_f32() retry_at.duration_since(Instant::now()).as_secs_f32()
); );
earliest_retry_at = earliest_retry_at.min(Some(retry_at)); earliest_retry_at = earliest_retry_at.min(Some(retry_at));
continue; continue;
} }
Ok(OpenRequestResult::Lagged(x)) => { Ok(OpenRequestResult::Lagged(x)) => {
trace!("{} is lagged. will not work now", best_rpc); trace!("{} is lagged. will not work now", best_rpc);
// this will probably always be the same block, right? // this will probably always be the same block, right?
if wait_for_sync.is_none() { if wait_for_sync.is_none() {
wait_for_sync = Some(x); wait_for_sync = Some(x);
}
continue;
}
Ok(OpenRequestResult::NotReady) => {
// TODO: log a warning? emit a stat?
trace!("best_rpc not ready: {}", best_rpc);
continue;
}
Err(err) => {
trace!("No request handle for {}. err={:?}", best_rpc, err);
continue;
} }
continue;
}
Ok(OpenRequestResult::NotReady) => {
// TODO: log a warning? emit a stat?
trace!("best_rpc not ready: {}", best_rpc);
continue;
}
Err(err) => {
trace!("No request handle for {}. err={:?}", best_rpc, err);
continue;
} }
} }
} }
// check self.outer only after self.inner. thats because the outer rpcs weren't ready to serve the request // if we got this far, no inner or outer rpcs are ready. thats suprising since an inner should have been
for (rpc_a, rpc_b) in self.outer.iter().circular_tuple_windows() { // maybe someone requested something silly like a far future block?
// TODO: ties within X% to the server with the smallest block_data_limit?
// find rpc with the lowest weighted peak latency. backups always lose. rate limits always lose
// TODO: should next_available be reversed?
// TODO: this is similar to sort_for_load_balancing_on, but at this point we don't want to prefer tiers
// TODO: move ethis to a helper function just so we can test it
// TODO: should x.next_available should be Reverse<_>?
let best_rpc = best_rpc(rpc_a, rpc_b);
match best_rpc
.wait_for_request_handle(&self.request, error_handler)
.await
{
Ok(handle) => {
trace!("opened handle: {}", best_rpc);
yield handle;
}
Err(err) => {
trace!("No request handle for {}. err={:?}", best_rpc, err);
continue;
}
}
}
// if we got this far, no rpcs are ready
// clear earliest_retry_at if it is too far in the future to help us // clear earliest_retry_at if it is too far in the future to help us
if let Some(retry_at) = earliest_retry_at { if let Some(retry_at) = earliest_retry_at {
@ -1104,16 +1084,21 @@ impl RpcsForRequest {
} }
} }
// TODO: i think theres a bug here so i
match (wait_for_sync, earliest_retry_at) { match (wait_for_sync, earliest_retry_at) {
(None, None) => { (None, None) => {
// we have nothing to wait for. uh oh! // we have nothing to wait for. uh oh!
break; break;
} }
(None, Some(retry_at)) => { (_, Some(retry_at)) => {
// try again after rate limits are done // try again after rate limits are done
sleep_until(retry_at).await; sleep_until(retry_at).await;
} }
(Some(wait_for_sync), None) => { (Some(wait_for_sync), None) => {
break;
// TODO: think about this more
/*
select! { select! {
x = wait_for_sync => { x = wait_for_sync => {
match x { match x {
@ -1132,7 +1117,9 @@ impl RpcsForRequest {
break; break;
} }
} }
*/
} }
/*
(Some(wait_for_sync), Some(retry_at)) => { (Some(wait_for_sync), Some(retry_at)) => {
select! { select! {
x = wait_for_sync => { x = wait_for_sync => {
@ -1154,6 +1141,7 @@ impl RpcsForRequest {
} }
} }
} }
*/
} }
} }
} }