fix sort order (hopefully)

Bryan Stitt 2023-10-06 15:57:18 -07:00
parent 0dcc324b61
commit c1df05f8b6
4 changed files with 54 additions and 41 deletions

View File

@@ -1410,6 +1410,7 @@ impl Web3ProxyApp {
            // TODO: eth_sendBundle (flashbots/eden command)
            // broadcast transactions to all private rpcs at once
            "eth_sendRawTransaction" => {
                // TODO: eth_sendPrivateTransaction
                // TODO: decode the transaction
                // TODO: error if the chain_id is incorrect
@@ -1423,7 +1424,7 @@ impl Web3ProxyApp {
                // sometimes we get an error that the transaction is already known by our nodes,
                // that's not really an error. Return the hash like a successful response would.
-               // TODO: move this to a helper function
+               // TODO: move this to a helper function. probably part of try_send_protected
                if let JsonRpcResponseEnum::RpcError{ error_data, ..} = &response {
                    if error_data.code == -32000
                        && (error_data.message == "ALREADY_EXISTS: already known"

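Aside (not part of this commit): the updated TODO suggests folding the "already known" check into a helper, probably as part of try_send_protected. Below is a minimal standalone sketch of that check with a stubbed error type; JsonRpcError and is_tx_already_known are illustrative names, not the proxy's real ones.

// standalone sketch only; the types and names here are invented
#[derive(Debug)]
struct JsonRpcError {
    code: i64,
    message: String,
}

/// true if the node's "error" just means the transaction is already in its mempool
fn is_tx_already_known(error: &JsonRpcError) -> bool {
    // the real check matches a small list of node-specific messages;
    // only the one visible in the diff is reproduced here
    error.code == -32000 && error.message == "ALREADY_EXISTS: already known"
}

fn main() {
    let err = JsonRpcError {
        code: -32000,
        message: "ALREADY_EXISTS: already known".to_string(),
    };
    // the caller would return the tx hash as if the send had succeeded
    assert!(is_tx_already_known(&err));
}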
View File

@@ -212,9 +212,6 @@ impl RankedRpcs {
            ranked_rpcs.push(x.clone());
        }
-       ranked_rpcs
-           .sort_by_cached_key(|x| x.sort_for_load_balancing_on(Some(best_block.number())));
        // consensus found!
        trace!(?ranked_rpcs);
@@ -260,10 +257,25 @@ impl RankedRpcs {
        let mut rng = nanorand::tls_rng();
-       // we use shuffle instead of sort. we will compare weights when iterating RankedRpcsForRequest
-       inner.sort_by_cached_key(|x| x.shuffle_for_load_balancing_on(&mut rng, max_block_needed));
-       outer.sort_by_cached_key(|x| x.shuffle_for_load_balancing_on(&mut rng, max_block_needed));
+       // TODO: use web3_request.start_instant? I think we want it to be now
+       let now = Instant::now();
+       match self.sort_mode {
+           SortMethod::Shuffle => {
+               // we use shuffle instead of sort. we will compare weights when iterating RankedRpcsForRequest
+               inner.sort_by_cached_key(|x| {
+                   x.shuffle_for_load_balancing_on(max_block_needed, &mut rng, now)
+               });
+               outer.sort_by_cached_key(|x| {
+                   x.shuffle_for_load_balancing_on(max_block_needed, &mut rng, now)
+               });
+           }
+           SortMethod::Sort => {
+               // sort (rather than shuffle) by the load balancing key
+               inner.sort_by_cached_key(|x| x.sort_for_load_balancing_on(max_block_needed, now));
+               outer.sort_by_cached_key(|x| x.sort_for_load_balancing_on(max_block_needed, now));
+           }
+       }
        Some(RpcsForRequest {
            inner,
            outer,
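Aside (not part of this commit): the new code computes now once and passes it into every cached sort key, so all candidates are compared against the same timestamp. Below is a self-contained sketch of that pattern with made-up types; the real sort_on key additionally wraps next_available in Reverse and negates backup, as shown further down.

use std::cmp::Reverse;
use std::time::{Duration, Instant};

// illustrative stand-in for Web3Rpc; all names and fields here are invented
struct FakeRpc {
    name: &'static str,
    backup: bool,
    head_block: u64,
    tier: u32,
    hard_limit_until: Option<Instant>,
}

impl FakeRpc {
    // when this server can next be used; `now` is passed in so every candidate
    // is measured against the same instant and the key can be cached
    fn next_available(&self, now: Instant) -> Instant {
        self.hard_limit_until.map_or(now, |until| until.max(now))
    }

    fn sort_key(&self, now: Instant) -> (Instant, bool, Reverse<u64>, u32) {
        // available sooner, then non-backup, then higher head block, then lower tier
        (self.next_available(now), self.backup, Reverse(self.head_block), self.tier)
    }
}

fn main() {
    let now = Instant::now();
    let mut rpcs = vec![
        FakeRpc { name: "limited", backup: false, head_block: 100, tier: 1,
                  hard_limit_until: Some(now + Duration::from_secs(5)) },
        FakeRpc { name: "backup", backup: true, head_block: 100, tier: 1, hard_limit_until: None },
        FakeRpc { name: "primary", backup: false, head_block: 100, tier: 1, hard_limit_until: None },
    ];

    // the key is computed once per element, not once per comparison
    rpcs.sort_by_cached_key(|x| x.sort_key(now));

    let order: Vec<_> = rpcs.iter().map(|x| x.name).collect();
    assert_eq!(order, ["primary", "backup", "limited"]);
}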
@@ -376,14 +388,7 @@ impl RankedRpcs {
            }
        }
        // TODO: i think its better to do rate limits later anyways. think more about it though
-       // // TODO: this might be a big perf hit. benchmark
-       // if let Some(x) = rpc.hard_limit_until.as_ref() {
-       //     if *x.borrow() > Instant::now() {
-       //         trace!("{} is rate limited. will not work now", rpc,);
-       //         return false;
-       //     }
-       // }
        // rate limits are handled by sort order
        true
    }
@@ -976,11 +981,15 @@ impl RpcsForRequest {
        loop {
            let mut earliest_retry_at = None;
+           let now = Instant::now();
            // TODO: need an iter of self.inner, then self.outer
            for (rpc_a, rpc_b) in self.inner.iter().circular_tuple_windows() {
                trace!("{} vs {}", rpc_a, rpc_b);
                // TODO: ties within X% to the server with the smallest block_data_limit?
                // find rpc with the lowest weighted peak latency. backups always lose. rate limits always lose
-               let faster_rpc = min_by_key(rpc_a, rpc_b, |x| (Reverse(x.next_available()), x.backup, x.weighted_peak_latency()));
+               let faster_rpc = min_by_key(rpc_a, rpc_b, |x| (Reverse(x.next_available(now)), x.backup, x.weighted_peak_latency()));
                trace!("winner: {}", faster_rpc);
                match faster_rpc

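Aside (not part of this commit): the selection loop walks the ranked list pairwise with itertools' circular_tuple_windows and keeps the better server of each pair with std::cmp::min_by_key. Below is a rough, self-contained illustration with a simplified key (the real key also includes Reverse(next_available(now)) and the backup flag); Candidate is an invented type and the example assumes itertools as a dependency.

use itertools::Itertools;
use std::cmp::min_by_key;
use std::time::Duration;

// invented type for illustration only
struct Candidate {
    name: &'static str,
    weighted_peak_latency: Duration,
}

fn main() {
    let ranked = vec![
        Candidate { name: "a", weighted_peak_latency: Duration::from_millis(80) },
        Candidate { name: "b", weighted_peak_latency: Duration::from_millis(20) },
        Candidate { name: "c", weighted_peak_latency: Duration::from_millis(50) },
    ];

    // overlapping pairs, wrapping around at the end: (a, b), (b, c), (c, a)
    for (rpc_a, rpc_b) in ranked.iter().circular_tuple_windows() {
        // lower latency wins; ties go to the first argument
        let faster_rpc = min_by_key(rpc_a, rpc_b, |x| x.weighted_peak_latency);
        println!("{} vs {} -> {}", rpc_a.name, rpc_b.name, faster_rpc.name);
    }
}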
View File

@@ -916,7 +916,10 @@ mod tests {
            .map(Arc::new)
            .collect();
-       rpcs.sort_by_cached_key(|x| x.sort_for_load_balancing_on(None));
+       let now = Instant::now();
+       // TODO: also test with max_block = 0 and 1
+       rpcs.sort_by_cached_key(|x| x.sort_for_load_balancing_on(Some(2.into()), now));
        let names_in_sort_order: Vec<_> = rpcs.iter().map(|x| x.name.as_str()).collect();

View File

@@ -231,10 +231,14 @@ impl Web3Rpc {
        Ok((new_connection, handle))
    }
-   pub fn next_available(&self) -> Instant {
-       let hard_limit_until = *self.hard_limit_until.as_ref().unwrap().borrow();
+   pub fn next_available(&self, now: Instant) -> Instant {
+       if let Some(hard_limit_until) = self.hard_limit_until.as_ref() {
+           let hard_limit_until = *hard_limit_until.borrow();
-       hard_limit_until.max(Instant::now())
+           hard_limit_until.max(now)
+       } else {
+           now
+       }
    }

    /// sort by...
@@ -246,7 +250,11 @@ impl Web3Rpc {
    /// TODO: should tier or block number take priority?
    /// TODO: should this return a struct that implements sorting traits?
    /// TODO: move this to consensus.rs
-   fn sort_on(&self, max_block: Option<U64>) -> (Reverse<Instant>, bool, Reverse<U64>, u32) {
+   fn sort_on(
+       &self,
+       max_block: Option<U64>,
+       start_instant: Instant,
+   ) -> (Reverse<Instant>, bool, Reverse<U64>, u32) {
        let mut head_block = self
            .head_block_sender
            .as_ref()
@@ -261,26 +269,18 @@ impl Web3Rpc {
        let backup = self.backup;
-       let rate_limit_until = if let Some(hard_limit_until) = self.hard_limit_until.as_ref() {
-           (*hard_limit_until.borrow()).max(Instant::now())
-       } else {
-           Instant::now()
-       };
+       let next_available = self.next_available(start_instant);
-       (
-           Reverse(rate_limit_until),
-           !backup,
-           Reverse(head_block),
-           tier,
-       )
+       (Reverse(next_available), !backup, Reverse(head_block), tier)
    }

    /// TODO: move this to consensus.rs
    pub fn sort_for_load_balancing_on(
        &self,
        max_block: Option<U64>,
+       start_instant: Instant,
    ) -> ((Reverse<Instant>, bool, Reverse<U64>, u32), Duration) {
-       let sort_on = self.sort_on(max_block);
+       let sort_on = self.sort_on(max_block, start_instant);
        let weighted_peak_latency = self.weighted_peak_latency();
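Aside (not part of this commit): a tiny standalone example of how Reverse behaves inside a tuple key like the one sort_on returns; the values are illustrative only.

use std::cmp::Reverse;
use std::time::{Duration, Instant};

fn main() {
    let now = Instant::now();
    let soon = now + Duration::from_secs(1);
    let later = now + Duration::from_secs(10);

    // Reverse flips the ordering: the later Instant compares as the smaller key
    assert!(Reverse(later) < Reverse(soon));

    // inside a tuple, earlier fields decide first; only ties fall through to
    // the later fields (the backup flag, head block, and tier in the real key)
    let a = (Reverse(soon), false, Reverse(100u64), 1u32);
    let b = (Reverse(later), false, Reverse(100u64), 1u32);
    assert!(b < a);
}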
@@ -296,10 +296,11 @@ impl Web3Rpc {
    /// TODO: this return type is too complex
    pub fn shuffle_for_load_balancing_on(
        &self,
-       rng: &mut TlsWyRand,
        max_block: Option<U64>,
+       rng: &mut TlsWyRand,
+       start_instant: Instant,
    ) -> ((Reverse<Instant>, bool, Reverse<U64>, u32), u8) {
-       let sort_on = self.sort_on(max_block);
+       let sort_on = self.sort_on(max_block, start_instant);
        let r = rng.generate::<u8>();
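Aside (not part of this commit): the shuffle path pairs the cached sort key with one random byte drawn from nanorand's thread-local WyRand as a tiebreaker. A minimal sketch of drawing that byte, assuming the nanorand crate with its tls feature:

use nanorand::Rng;

fn main() {
    // tls_rng() returns a thread-local WyRand generator
    let mut rng = nanorand::tls_rng();
    // one random byte, used by the shuffle path as part of the cached key
    let r = rng.generate::<u8>();
    println!("random tiebreaker byte: {r}");
}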
@@ -1041,12 +1042,11 @@ impl Web3Rpc {
        // TODO: if websocket is reconnecting, return an error?
        // check cached rate limits
        if let Some(hard_limit_until) = self.hard_limit_until.as_ref() {
-           let hard_limit_ready = *hard_limit_until.borrow();
-           let now = Instant::now();
-           if now < hard_limit_ready {
-               return Ok(OpenRequestResult::RetryAt(hard_limit_ready));
-           }
+           let now = Instant::now();
+           let hard_limit_until = self.next_available(now);
+           if hard_limit_until > now {
+               return Ok(OpenRequestResult::RetryAt(hard_limit_until));
+           }
        // check shared rate limits
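Aside (not part of this commit): the intended shape of the cached rate limit check above, reduced to a standalone sketch. OpenRequestResult is stubbed with invented variants and next_available is a free function here; only the comparison direction and the RetryAt return mirror the diff.

use std::time::{Duration, Instant};

// stub with invented variants; the proxy's real enum is richer
enum OpenRequestResult {
    Ready,
    RetryAt(Instant),
}

// mirrors Web3Rpc::next_available: never earlier than `now`
fn next_available(hard_limit_until: Option<Instant>, now: Instant) -> Instant {
    hard_limit_until.map_or(now, |until| until.max(now))
}

fn check_cached_rate_limit(hard_limit_until: Option<Instant>, now: Instant) -> OpenRequestResult {
    let available_at = next_available(hard_limit_until, now);
    if available_at > now {
        // still rate limited: tell the caller when to try again
        OpenRequestResult::RetryAt(available_at)
    } else {
        OpenRequestResult::Ready
    }
}

fn main() {
    let now = Instant::now();
    assert!(matches!(check_cached_rate_limit(None, now), OpenRequestResult::Ready));
    assert!(matches!(
        check_cached_rate_limit(Some(now + Duration::from_secs(3)), now),
        OpenRequestResult::RetryAt(_)
    ));
}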