split inner and outer

Bryan Stitt 2023-10-06 18:34:36 -07:00
parent 949c3eeb5d
commit f7997a4878
2 changed files with 51 additions and 17 deletions

@@ -34,7 +34,7 @@ impl Serialize for Web3ProxyBlock {
// TODO: i'm not sure about this name
let mut state = serializer.serialize_struct("saved_block", 2)?;
state.serialize_field("age", &self.age())?;
state.serialize_field("age", &self.age().as_secs_f32())?;
let block = json!({
"hash": self.0.hash,

@@ -124,6 +124,7 @@ pub struct RankedRpcs {
// TODO: could these be refs? The owning RankedRpcs lifetime might work. `stream!` might make it complicated
pub struct RpcsForRequest {
inner: Vec<Arc<Web3Rpc>>,
outer: Vec<Arc<Web3Rpc>>,
request: Arc<Web3Request>,
}
@@ -276,10 +277,9 @@ impl RankedRpcs {
}
}
inner.extend(outer);
Some(RpcsForRequest {
inner,
outer,
request: web3_request.clone(),
})
}
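This construction change is the core of the commit: the old code folded `outer` into `inner` with `extend`, losing the distinction, while the new struct carries both lists so the stream below can treat them differently. A toy sketch of the ordering this preserves (hypothetical names; `outer` here stands in for rpcs that weren't ready to serve the request):

fn main() {
    let inner = vec!["ready-1", "ready-2"];
    let outer = vec!["not-ready-1"];

    // inner rpcs get non-blocking attempts first...
    for rpc in &inner {
        println!("try (non-blocking): {rpc}");
    }
    // ...and only then are outer rpcs awaited as a fallback
    for rpc in &outer {
        println!("fall back (may wait): {rpc}");
    }
}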
@@ -972,6 +972,18 @@ impl ConsensusFinder {
}
}
fn best_rpc<'a>(rpc_a: &'a Arc<Web3Rpc>, rpc_b: &'a Arc<Web3Rpc>) -> &'a Arc<Web3Rpc> {
let now = Instant::now();
let faster = min_by_key(rpc_a, rpc_b, |x| {
(x.next_available(now), x.backup, x.weighted_peak_latency())
});
trace!("{} vs {} = {}", rpc_a, rpc_b, faster);
faster
}
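The new `best_rpc` helper leans on lexicographic tuple ordering: `std::cmp::min_by_key` compares `next_available` first, then the backup flag (`false < true`, so backups lose ties), then weighted peak latency. A standalone sketch of that comparison with made-up stand-in values, not the real `Web3Rpc` type:

use std::cmp::min_by_key;

fn main() {
    // (next_available_secs, is_backup, latency_ms)
    let rpc_a = ("rpc_a", (0u64, false, 80u64));
    let rpc_b = ("rpc_b", (0u64, false, 40u64));

    // equal availability and backup status, so lower latency breaks the tie
    let faster = min_by_key(rpc_a, rpc_b, |x| x.1);
    assert_eq!(faster.0, "rpc_b");
}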
impl RpcsForRequest {
pub fn to_stream(self) -> impl Stream<Item = OpenRequestHandle> {
// TODO: get error_handler out of the web3_request, probably the authorization
@@ -982,30 +994,29 @@ impl RpcsForRequest {
loop {
let mut earliest_retry_at = None;
let now = Instant::now();
// TODO: need an iter of self.inner, then self.outer
// we have to collect because apparently a Chain isn't Sized
// first check the inners
// TODO: DRY
for (rpc_a, rpc_b) in self.inner.iter().circular_tuple_windows() {
trace!("{} vs {}", rpc_a, rpc_b);
// TODO: ties within X% to the server with the smallest block_data_limit?
// find rpc with the lowest weighted peak latency. backups always lose. rate limits always lose
let faster_rpc = min_by_key(rpc_a, rpc_b, |x| (Reverse(x.next_available(now)), x.backup, x.weighted_peak_latency()));
trace!("winner: {}", faster_rpc);
// TODO: should next_available be reversed?
// TODO: this is similar to sort_for_load_balancing_on, but at this point we don't want to prefer tiers
// TODO: move this to a helper function just so we can test it
// TODO: should x.next_available be Reverse<_>?
let best_rpc = best_rpc(rpc_a, rpc_b);
match faster_rpc
match best_rpc
.try_request_handle(&self.request, error_handler)
.await
{
Ok(OpenRequestResult::Handle(handle)) => {
trace!("opened handle: {}", faster_rpc);
trace!("opened handle: {}", best_rpc);
yield handle;
}
Ok(OpenRequestResult::RetryAt(retry_at)) => {
trace!(
"retry on {} @ {}",
faster_rpc,
best_rpc,
retry_at.duration_since(Instant::now()).as_secs_f32()
);
@@ -1014,17 +1025,40 @@ impl RpcsForRequest {
}
Ok(OpenRequestResult::NotReady) => {
// TODO: log a warning? emit a stat?
trace!("best_rpc not ready: {}", faster_rpc);
trace!("best_rpc not ready: {}", best_rpc);
continue;
}
Err(err) => {
trace!("No request handle for {}. err={:?}", faster_rpc, err);
trace!("No request handle for {}. err={:?}", best_rpc, err);
continue;
}
}
}
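Worth noting for the loop above: itertools' `circular_tuple_windows` yields adjacent pairs with wraparound, so every rpc is compared against a neighbor (including last-vs-first) and the winner of each pair gets an attempt. A minimal sketch, assuming the itertools crate:

use itertools::Itertools;

fn main() {
    let rpcs = ["a", "b", "c"];
    let pairs: Vec<_> = rpcs.iter().circular_tuple_windows::<(_, _)>().collect();
    // wraps around: the last element is also paired with the first
    assert_eq!(pairs, [(&"a", &"b"), (&"b", &"c"), (&"c", &"a")]);
}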
// TODO: check self.outer
// check self.outer only after self.inner; that's because the outer rpcs weren't ready to serve the request
for (rpc_a, rpc_b) in self.outer.iter().circular_tuple_windows() {
// TODO: ties within X% to the server with the smallest block_data_limit?
// find rpc with the lowest weighted peak latency. backups always lose. rate limits always lose
// TODO: should next_available be reversed?
// TODO: this is similar to sort_for_load_balancing_on, but at this point we don't want to prefer tiers
// TODO: move this to a helper function just so we can test it
// TODO: should x.next_available be Reverse<_>?
let best_rpc = best_rpc(rpc_a, rpc_b);
match best_rpc
.wait_for_request_handle(&self.request, error_handler)
.await
{
Ok(handle) => {
trace!("opened handle: {}", best_rpc);
yield handle;
}
Err(err) => {
trace!("No request handle for {}. err={:?}", best_rpc, err);
continue;
}
}
}
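The asymmetry between the two loops is the point of the split: inner rpcs get `try_request_handle`, which returns immediately with `Handle`, `RetryAt`, or `NotReady`, while outer rpcs get `wait_for_request_handle`, which awaits until a handle is available or errors. A toy model of that contrast, using a tokio semaphore as a stand-in for an rpc's capacity (an assumption for illustration, not web3-proxy's real code):

use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let capacity = Arc::new(Semaphore::new(1));

    // "inner" style: try without waiting, like try_request_handle
    if let Ok(_permit) = capacity.clone().try_acquire_owned() {
        println!("inner: got a handle immediately");
    } // permit dropped here, freeing the slot

    // "outer" style: wait until a slot frees up, like wait_for_request_handle
    let _permit = capacity.clone().acquire_owned().await.unwrap();
    println!("outer: waited for a handle");
}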
if let Some(retry_at) = earliest_retry_at {
if self.request.expire_instant <= retry_at {