diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index 956aec15..5cf85efe 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -34,7 +34,7 @@ impl Serialize for Web3ProxyBlock { // TODO: i'm not sure about this name let mut state = serializer.serialize_struct("saved_block", 2)?; - state.serialize_field("age", &self.age())?; + state.serialize_field("age", &self.age().as_secs_f32())?; let block = json!({ "hash": self.0.hash, diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index 878c77c2..3b666228 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -124,6 +124,7 @@ pub struct RankedRpcs { // TODO: could these be refs? The owning RankedRpcs lifetime might work. `stream!` might make it complicated pub struct RpcsForRequest { inner: Vec>, + outer: Vec>, request: Arc, } @@ -276,10 +277,9 @@ impl RankedRpcs { } } - inner.extend(outer); - Some(RpcsForRequest { inner, + outer, request: web3_request.clone(), }) } @@ -972,6 +972,18 @@ impl ConsensusFinder { } } +fn best_rpc<'a>(rpc_a: &'a Arc, rpc_b: &'a Arc) -> &'a Arc { + let now = Instant::now(); + + let faster = min_by_key(rpc_a, rpc_b, |x| { + (x.next_available(now), x.backup, x.weighted_peak_latency()) + }); + + trace!("{} vs {} = {}", rpc_a, rpc_b, faster); + + faster +} + impl RpcsForRequest { pub fn to_stream(self) -> impl Stream { // TODO: get error_handler out of the web3_request, probably the authorization @@ -982,30 +994,29 @@ impl RpcsForRequest { loop { let mut earliest_retry_at = None; - let now = Instant::now(); - - // TODO: need an iter of self.inner, then self.outer - - // we have to collect cuz apparently a Chain isn't sized + // first check the inners + // TODO: DRY for (rpc_a, rpc_b) in self.inner.iter().circular_tuple_windows() { - trace!("{} vs {}", rpc_a, rpc_b); // TODO: ties within X% to the server with the smallest block_data_limit? // find rpc with the lowest weighted peak latency. backups always lose. rate limits always lose - let faster_rpc = min_by_key(rpc_a, rpc_b, |x| (Reverse(x.next_available(now)), x.backup, x.weighted_peak_latency())); - trace!("winner: {}", faster_rpc); + // TODO: should next_available be reversed? + // TODO: this is similar to sort_for_load_balancing_on, but at this point we don't want to prefer tiers + // TODO: move ethis to a helper function just so we can test it + // TODO: should x.next_available should be Reverse<_>? + let best_rpc = best_rpc(rpc_a, rpc_b); - match faster_rpc + match best_rpc .try_request_handle(&self.request, error_handler) .await { Ok(OpenRequestResult::Handle(handle)) => { - trace!("opened handle: {}", faster_rpc); + trace!("opened handle: {}", best_rpc); yield handle; } Ok(OpenRequestResult::RetryAt(retry_at)) => { trace!( "retry on {} @ {}", - faster_rpc, + best_rpc, retry_at.duration_since(Instant::now()).as_secs_f32() ); @@ -1014,17 +1025,40 @@ impl RpcsForRequest { } Ok(OpenRequestResult::NotReady) => { // TODO: log a warning? emit a stat? - trace!("best_rpc not ready: {}", faster_rpc); + trace!("best_rpc not ready: {}", best_rpc); continue; } Err(err) => { - trace!("No request handle for {}. err={:?}", faster_rpc, err); + trace!("No request handle for {}. err={:?}", best_rpc, err); continue; } } } - // TODO: check self.outer + // check self.outer only after self.inner. thats because the outer rpcs weren't ready to serve the request + for (rpc_a, rpc_b) in self.outer.iter().circular_tuple_windows() { + // TODO: ties within X% to the server with the smallest block_data_limit? + // find rpc with the lowest weighted peak latency. backups always lose. rate limits always lose + // TODO: should next_available be reversed? + // TODO: this is similar to sort_for_load_balancing_on, but at this point we don't want to prefer tiers + // TODO: move ethis to a helper function just so we can test it + // TODO: should x.next_available should be Reverse<_>? + let best_rpc = best_rpc(rpc_a, rpc_b); + + match best_rpc + .wait_for_request_handle(&self.request, error_handler) + .await + { + Ok(handle) => { + trace!("opened handle: {}", best_rpc); + yield handle; + } + Err(err) => { + trace!("No request handle for {}. err={:?}", best_rpc, err); + continue; + } + } + } if let Some(retry_at) = earliest_retry_at { if self.request.expire_instant <= retry_at {