diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 93672c53..b455666d 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -1410,6 +1410,7 @@ impl Web3ProxyApp { // TODO: eth_sendBundle (flashbots/eden command) // broadcast transactions to all private rpcs at once "eth_sendRawTransaction" => { + // TODO: eth_sendPrivateTransaction // TODO: decode the transaction // TODO: error if the chain_id is incorrect @@ -1423,7 +1424,7 @@ impl Web3ProxyApp { // sometimes we get an error that the transaction is already known by our nodes, // that's not really an error. Return the hash like a successful response would. - // TODO: move this to a helper function + // TODO: move this to a helper function. probably part of try_send_protected if let JsonRpcResponseEnum::RpcError{ error_data, ..} = &response { if error_data.code == -32000 && (error_data.message == "ALREADY_EXISTS: already known" diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index 53079c18..a772bc07 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -212,9 +212,6 @@ impl RankedRpcs { ranked_rpcs.push(x.clone()); } - ranked_rpcs - .sort_by_cached_key(|x| x.sort_for_load_balancing_on(Some(best_block.number()))); - // consensus found! trace!(?ranked_rpcs); @@ -260,10 +257,25 @@ impl RankedRpcs { let mut rng = nanorand::tls_rng(); - // we use shuffle instead of sort. we will compare weights when iterating RankedRpcsForRequest - inner.sort_by_cached_key(|x| x.shuffle_for_load_balancing_on(&mut rng, max_block_needed)); - outer.sort_by_cached_key(|x| x.shuffle_for_load_balancing_on(&mut rng, max_block_needed)); + // TODO: use web3_request.start_instant? I think we want it to be now + let now = Instant::now(); + match self.sort_mode { + SortMethod::Shuffle => { + // we use shuffle instead of sort. we will compare weights when iterating RankedRpcsForRequest + inner.sort_by_cached_key(|x| { + x.shuffle_for_load_balancing_on(max_block_needed, &mut rng, now) + }); + outer.sort_by_cached_key(|x| { + x.shuffle_for_load_balancing_on(max_block_needed, &mut rng, now) + }); + } + SortMethod::Sort => { + // we use shuffle instead of sort. we will compare weights when iterating RankedRpcsForRequest + inner.sort_by_cached_key(|x| x.sort_for_load_balancing_on(max_block_needed, now)); + outer.sort_by_cached_key(|x| x.sort_for_load_balancing_on(max_block_needed, now)); + } + } Some(RpcsForRequest { inner, outer, @@ -376,14 +388,7 @@ impl RankedRpcs { } } - // TODO: i think its better to do rate limits later anyways. think more about it though - // // TODO: this might be a big perf hit. benchmark - // if let Some(x) = rpc.hard_limit_until.as_ref() { - // if *x.borrow() > Instant::now() { - // trace!("{} is rate limited. will not work now", rpc,); - // return false; - // } - // } + // rate limit are handled by sort order true } @@ -976,11 +981,15 @@ impl RpcsForRequest { loop { let mut earliest_retry_at = None; + let now = Instant::now(); + + // TODO: need an iter of self.inner, then self.outer + for (rpc_a, rpc_b) in self.inner.iter().circular_tuple_windows() { trace!("{} vs {}", rpc_a, rpc_b); // TODO: ties within X% to the server with the smallest block_data_limit? // find rpc with the lowest weighted peak latency. backups always lose. rate limits always lose - let faster_rpc = min_by_key(rpc_a, rpc_b, |x| (Reverse(x.next_available()), x.backup, x.weighted_peak_latency())); + let faster_rpc = min_by_key(rpc_a, rpc_b, |x| (Reverse(x.next_available(now)), x.backup, x.weighted_peak_latency())); trace!("winner: {}", faster_rpc); match faster_rpc diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index d0cd833c..5fb267ff 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -916,7 +916,10 @@ mod tests { .map(Arc::new) .collect(); - rpcs.sort_by_cached_key(|x| x.sort_for_load_balancing_on(None)); + let now = Instant::now(); + + // TODO: also test with max_block = 0 and 1 + rpcs.sort_by_cached_key(|x| x.sort_for_load_balancing_on(Some(2.into()), now)); let names_in_sort_order: Vec<_> = rpcs.iter().map(|x| x.name.as_str()).collect(); diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 14fa56f8..2133ea8b 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -231,10 +231,14 @@ impl Web3Rpc { Ok((new_connection, handle)) } - pub fn next_available(&self) -> Instant { - let hard_limit_until = *self.hard_limit_until.as_ref().unwrap().borrow(); + pub fn next_available(&self, now: Instant) -> Instant { + if let Some(hard_limit_until) = self.hard_limit_until.as_ref() { + let hard_limit_until = *hard_limit_until.borrow(); - hard_limit_until.max(Instant::now()) + hard_limit_until.max(now) + } else { + now + } } /// sort by... @@ -246,7 +250,11 @@ impl Web3Rpc { /// TODO: should tier or block number take priority? /// TODO: should this return a struct that implements sorting traits? /// TODO: move this to consensus.rs - fn sort_on(&self, max_block: Option) -> (Reverse, bool, Reverse, u32) { + fn sort_on( + &self, + max_block: Option, + start_instant: Instant, + ) -> (Reverse, bool, Reverse, u32) { let mut head_block = self .head_block_sender .as_ref() @@ -261,26 +269,18 @@ impl Web3Rpc { let backup = self.backup; - let rate_limit_until = if let Some(hard_limit_until) = self.hard_limit_until.as_ref() { - (*hard_limit_until.borrow()).max(Instant::now()) - } else { - Instant::now() - }; + let next_available = self.next_available(start_instant); - ( - Reverse(rate_limit_until), - !backup, - Reverse(head_block), - tier, - ) + (Reverse(next_available), !backup, Reverse(head_block), tier) } /// TODO: move this to consensus.rs pub fn sort_for_load_balancing_on( &self, max_block: Option, + start_instant: Instant, ) -> ((Reverse, bool, Reverse, u32), Duration) { - let sort_on = self.sort_on(max_block); + let sort_on = self.sort_on(max_block, start_instant); let weighted_peak_latency = self.weighted_peak_latency(); @@ -296,10 +296,11 @@ impl Web3Rpc { /// TODO: this return type is too complex pub fn shuffle_for_load_balancing_on( &self, - rng: &mut TlsWyRand, max_block: Option, + rng: &mut TlsWyRand, + start_instant: Instant, ) -> ((Reverse, bool, Reverse, u32), u8) { - let sort_on = self.sort_on(max_block); + let sort_on = self.sort_on(max_block, start_instant); let r = rng.generate::(); @@ -1041,12 +1042,11 @@ impl Web3Rpc { // TODO: if websocket is reconnecting, return an error? // check cached rate limits - if let Some(hard_limit_until) = self.hard_limit_until.as_ref() { - let hard_limit_ready = *hard_limit_until.borrow(); - let now = Instant::now(); - if now < hard_limit_ready { - return Ok(OpenRequestResult::RetryAt(hard_limit_ready)); - } + let now = Instant::now(); + let hard_limit_until = self.next_available(now); + + if now >= hard_limit_until { + return Ok(OpenRequestResult::RetryAt(hard_limit_until)); } // check shared rate limits