better sorting of connections

This commit is contained in:
Bryan Stitt 2023-02-10 20:24:20 -08:00 committed by yenicelik
parent 3232b5c7bb
commit 8af87845c9
4 changed files with 29 additions and 21 deletions

@ -339,6 +339,10 @@ These are not yet ordered. There might be duplicates. We might not actually need
- [-] add configurable size limits to all the Caches
- instead of configuring each cache with MB sizes, have one value for total memory footprint and then percentages for each cache
- https://github.com/moka-rs/moka/issues/201
- [ ] have multiple providers on each backend rpc. one websocket for newHeads. and then http providers for handling requests
- erigon only streams the JSON over HTTP. that code isn't enabled for websockets. so this should save memory on the erigon servers
- i think this also means we don't need to worry about changing the id that the user gives us.
- have the healthcheck get the block over http. if it errors, or doesn't match what the websocket says, something is wrong (likely a deadlock in the websocket code)
- [ ] have private transactions be enabled by a url setting rather than a setting on the key
- [ ] cli for adding rpc keys to an existing user
- [ ] rate limiting/throttling on query_user_stats
@ -349,6 +353,7 @@ These are not yet ordered. There might be duplicates. We might not actually need
- if total difficulty is not on the block and we aren't on ETH, fetch the full block instead of just the header
- if total difficulty is set and non-zero, use it for consensus instead of just the number
- [ ] query_user_stats cache hit rate
- [ ] need debounce on reconnect. websockets are closing on us and then we reconnect twice. locks on ProviderState need more thought
- [ ] having the whole block in status is very verbose. trim it down
- [ ] `cost estimate` script
- sum bytes and number of requests. prompt hosting costs. divide

@ -1222,13 +1222,15 @@ impl Web3ProxyApp {
(&self.balanced_rpcs, default_num)
};
let head_block_num = self.balanced_rpcs.head_block_num();
// try_send_all_upstream_servers puts the request id into the response. no need to do that ourselves here.
let mut response = private_rpcs
.try_send_all_synced_connections(
authorization,
&request,
Some(request_metadata.clone()),
None,
head_block_num.as_ref(),
Level::Trace,
num,
true,

@ -680,12 +680,11 @@ impl Web3Rpcs {
.clone();
// synced connections are all on the same block. sort them by tier with higher soft limits first
synced_conns.sort_by_cached_key(|x| (x.tier, u32::MAX - x.soft_limit));
synced_conns.sort_by_cached_key(sort_rpcs_by_sync_status);
// if there aren't enough synced connections, include more connections
let mut all_conns: Vec<_> = self.conns.values().cloned().collect();
sort_connections_by_sync_status(&mut all_conns);
all_conns.sort_by_cached_key(sort_rpcs_by_sync_status);
for connection in itertools::chain(synced_conns, all_conns) {
if max_count == 0 {
@ -1153,19 +1152,21 @@ impl Serialize for Web3Rpcs {
}
/// sort by block number (descending) and tier (ascending)
fn sort_connections_by_sync_status(rpcs: &mut Vec<Arc<Web3Rpc>>) {
rpcs.sort_by_cached_key(|x| {
let reversed_head_block = u64::MAX
- x.head_block
.read()
.as_ref()
.map(|x| x.number().as_u64())
.unwrap_or(0);
/// TODO: should this be moved into a `impl Web3Rpc`?
/// TODO: take AsRef or something like that? We don't need an Arc here
fn sort_rpcs_by_sync_status(x: &Arc<Web3Rpc>) -> (u64, u64, u32) {
let reversed_head_block = u64::MAX
- x.head_block
.read()
.as_ref()
.map(|x| x.number().as_u64())
.unwrap_or(0);
let tier = x.tier;
let tier = x.tier;
(reversed_head_block, tier)
});
let request_ewma = x.latency.request_ewma;
(reversed_head_block, tier, request_ewma)
}
mod tests {
@ -1208,7 +1209,7 @@ mod tests {
.map(|x| SavedBlock::new(Arc::new(x)))
.collect();
let mut rpcs = [
let mut rpcs: Vec<_> = [
Web3Rpc {
name: "a".to_string(),
tier: 0,
@ -1250,7 +1251,7 @@ mod tests {
.map(Arc::new)
.collect();
sort_connections_by_sync_status(&mut rpcs);
rpcs.sort_by_cached_key(sort_rpcs_by_sync_status);
let names_in_sort_order: Vec<_> = rpcs.iter().map(|x| x.name.as_str()).collect();

@ -67,13 +67,13 @@ impl ProviderState {
pub struct Web3RpcLatencies {
/// Traack how far behind the fastest node we are
new_head: Histogram<u64>,
pub new_head: Histogram<u64>,
/// exponentially weighted moving average of how far behind the fastest node we are
new_head_ewma: u32,
pub new_head_ewma: u32,
/// Track how long an rpc call takes on average
request: Histogram<u64>,
pub request: Histogram<u64>,
/// exponentially weighted moving average of how far behind the fastest node we are
request_ewma: u32,
pub request_ewma: u32,
}
impl Default for Web3RpcLatencies {