From c95911098652b8ddcfae14d71f55db06e054eeea Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Fri, 10 Feb 2023 20:24:20 -0800
Subject: [PATCH] better sorting of connections

---
 TODO.md                     |  5 +++++
 web3_proxy/src/app/mod.rs   |  4 +++-
 web3_proxy/src/rpcs/many.rs | 33 +++++++++++++++++----------------
 web3_proxy/src/rpcs/one.rs  |  8 ++++----
 4 files changed, 29 insertions(+), 21 deletions(-)

diff --git a/TODO.md b/TODO.md
index 207567c4..7b46c0e8 100644
--- a/TODO.md
+++ b/TODO.md
@@ -339,6 +339,10 @@ These are not yet ordered. There might be duplicates. We might not actually need
 - [-] add configurable size limits to all the Caches
   - instead of configuring each cache with MB sizes, have one value for total memory footprint and then percentages for each cache
   - https://github.com/moka-rs/moka/issues/201
+- [ ] have multiple providers on each backend rpc. one websocket for newHeads. and then http providers for handling requests
+  - erigon only streams the JSON over HTTP. that code isn't enabled for websockets. so this should save memory on the erigon servers
+  - i think this also means we don't need to worry about changing the id that the user gives us.
+  - have the healthcheck get the block over http. if it errors, or doesn't match what the websocket says, something is wrong (likely a deadlock in the websocket code)
 - [ ] have private transactions be enabled by a url setting rather than a setting on the key
 - [ ] cli for adding rpc keys to an existing user
 - [ ] rate limiting/throttling on query_user_stats
@@ -349,6 +353,7 @@ These are not yet ordered. There might be duplicates. We might not actually need
   - if total difficulty is not on the block and we aren't on ETH, fetch the full block instead of just the header
   - if total difficulty is set and non-zero, use it for consensus instead of just the number
 - [ ] query_user_stats cache hit rate
+- [ ] need debounce on reconnect. websockets are closing on us and then we reconnect twice. locks on ProviderState need more thought
 - [ ] having the whole block in status is very verbose. trim it down
 - [ ] `cost estimate` script
   - sum bytes and number of requests. prompt hosting costs. divide
diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs
index 75610ca2..a0805fbe 100644
--- a/web3_proxy/src/app/mod.rs
+++ b/web3_proxy/src/app/mod.rs
@@ -1222,13 +1222,15 @@ impl Web3ProxyApp {
             (&self.balanced_rpcs, default_num)
         };
 
+        let head_block_num = self.balanced_rpcs.head_block_num();
+
         // try_send_all_upstream_servers puts the request id into the response. no need to do that ourselves here.
         let mut response = private_rpcs
             .try_send_all_synced_connections(
                 authorization,
                 &request,
                 Some(request_metadata.clone()),
-                None,
+                head_block_num.as_ref(),
                 Level::Trace,
                 num,
                 true,
diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs
index a46e66f6..a2b555b5 100644
--- a/web3_proxy/src/rpcs/many.rs
+++ b/web3_proxy/src/rpcs/many.rs
@@ -680,12 +680,11 @@ impl Web3Rpcs {
             .clone();
 
         // synced connections are all on the same block. sort them by tier with higher soft limits first
-        synced_conns.sort_by_cached_key(|x| (x.tier, u32::MAX - x.soft_limit));
+        synced_conns.sort_by_cached_key(sort_rpcs_by_sync_status);
 
         // if there aren't enough synced connections, include more connections
         let mut all_conns: Vec<_> = self.conns.values().cloned().collect();
-
-        sort_connections_by_sync_status(&mut all_conns);
+        all_conns.sort_by_cached_key(sort_rpcs_by_sync_status);
 
         for connection in itertools::chain(synced_conns, all_conns) {
             if max_count == 0 {
@@ -1153,19 +1152,21 @@ impl Serialize for Web3Rpcs {
 }
 
 /// sort by block number (descending) and tier (ascending)
-fn sort_connections_by_sync_status(rpcs: &mut Vec<Arc<Web3Rpc>>) {
-    rpcs.sort_by_cached_key(|x| {
-        let reversed_head_block = u64::MAX
-            - x.head_block
-                .read()
-                .as_ref()
-                .map(|x| x.number().as_u64())
-                .unwrap_or(0);
+/// TODO: should this be moved into a `impl Web3Rpc`?
+/// TODO: take AsRef or something like that? We don't need an Arc here
+fn sort_rpcs_by_sync_status(x: &Arc<Web3Rpc>) -> (u64, u64, u32) {
+    let reversed_head_block = u64::MAX
+        - x.head_block
+            .read()
+            .as_ref()
+            .map(|x| x.number().as_u64())
+            .unwrap_or(0);
 
-        let tier = x.tier;
+    let tier = x.tier;
 
-        (reversed_head_block, tier)
-    });
+    let request_ewma = x.latency.request_ewma;
+
+    (reversed_head_block, tier, request_ewma)
 }
 
 mod tests {
@@ -1208,7 +1209,7 @@ mod tests {
             .map(|x| SavedBlock::new(Arc::new(x)))
            .collect();
 
-        let mut rpcs = [
+        let mut rpcs: Vec<_> = [
             Web3Rpc {
                 name: "a".to_string(),
                 tier: 0,
@@ -1250,7 +1251,7 @@ mod tests {
         .map(Arc::new)
         .collect();
 
-        sort_connections_by_sync_status(&mut rpcs);
+        rpcs.sort_by_cached_key(sort_rpcs_by_sync_status);
 
         let names_in_sort_order: Vec<_> = rpcs.iter().map(|x| x.name.as_str()).collect();
 
diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs
index 05bc0e54..9a01cd80 100644
--- a/web3_proxy/src/rpcs/one.rs
+++ b/web3_proxy/src/rpcs/one.rs
@@ -67,13 +67,13 @@ impl ProviderState {
 
 pub struct Web3RpcLatencies {
     /// Traack how far behind the fastest node we are
-    new_head: Histogram,
+    pub new_head: Histogram,
     /// exponentially weighted moving average of how far behind the fastest node we are
-    new_head_ewma: u32,
+    pub new_head_ewma: u32,
     /// Track how long an rpc call takes on average
-    request: Histogram,
+    pub request: Histogram,
     /// exponentially weighted moving average of how far behind the fastest node we are
-    request_ewma: u32,
+    pub request_ewma: u32,
 }
 
 impl Default for Web3RpcLatencies {
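
The new key sorts ascending, so it inverts the head block number (`u64::MAX - block`) to put the highest block first, then breaks ties by tier and by the request-latency EWMA. Below is a minimal standalone sketch of that composite-key idea; the `Rpc` struct, its field types, and the sample values are simplified stand-ins for `Web3Rpc`, not code from this patch:

```rust
use std::sync::Arc;

/// Simplified stand-in for `Web3Rpc`: only the fields the sort key reads.
struct Rpc {
    name: &'static str,
    head_block: Option<u64>,
    tier: u64,
    request_ewma: u32,
}

/// Ascending sort key: highest head block first (hence `u64::MAX - block`),
/// then lowest tier, then lowest request-latency EWMA.
fn sort_key(x: &Arc<Rpc>) -> (u64, u64, u32) {
    let reversed_head_block = u64::MAX - x.head_block.unwrap_or(0);
    (reversed_head_block, x.tier, x.request_ewma)
}

fn main() {
    let mut rpcs: Vec<Arc<Rpc>> = vec![
        Arc::new(Rpc { name: "a", head_block: Some(100), tier: 1, request_ewma: 40 }),
        Arc::new(Rpc { name: "b", head_block: Some(101), tier: 1, request_ewma: 90 }),
        Arc::new(Rpc { name: "c", head_block: Some(101), tier: 0, request_ewma: 90 }),
        Arc::new(Rpc { name: "d", head_block: None, tier: 0, request_ewma: 10 }),
    ];

    // A plain `fn` implements `FnMut(&T) -> K`, so it can be passed directly,
    // just as the patch passes `sort_rpcs_by_sync_status`.
    rpcs.sort_by_cached_key(sort_key);

    let names: Vec<_> = rpcs.iter().map(|x| x.name).collect();
    // Highest block wins, then lower tier, then lower latency; no block sorts last.
    assert_eq!(names, ["c", "b", "a", "d"]);
    println!("{names:?}");
}
```

`sort_by_cached_key` computes the key once per element, which matters in the real function because building the key takes a read lock on `head_block`.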