From ca1e55037076e066f1efc111127202d63522b51d Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Thu, 2 Feb 2023 14:48:23 -0800 Subject: [PATCH] improve sort order during eth_sendRawTransaction --- TODO.md | 3 + web3_proxy/src/rpcs/connection.rs | 7 ++ web3_proxy/src/rpcs/connections.rs | 114 ++++++++++++++++++++++++++--- 3 files changed, 114 insertions(+), 10 deletions(-) diff --git a/TODO.md b/TODO.md index f115d336..6a1d8a58 100644 --- a/TODO.md +++ b/TODO.md @@ -324,6 +324,9 @@ These are not yet ordered. There might be duplicates. We might not actually need - [x] improve waiting for sync when rate limited - [x] improve pager duty errors for smarter deduping - [x] add create_key cli command +- [x] short lived cache on /health +- [x] cache /status for longer +- [x] sort connections during eth_sendRawTransaction - [-] proxy mode for benchmarking all backends - [-] proxy mode for sending to multiple backends - [-] let users choose a % of reverts to log (or maybe x/second). someone like curve logging all reverts will be a BIG database very quickly diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs index cddb8feb..838023fa 100644 --- a/web3_proxy/src/rpcs/connection.rs +++ b/web3_proxy/src/rpcs/connection.rs @@ -35,6 +35,12 @@ pub enum ProviderState { Connected(Arc), } +impl Default for ProviderState { + fn default() -> Self { + Self::None + } +} + impl ProviderState { pub async fn provider(&self, allow_not_ready: bool) -> Option<&Arc> { match self { @@ -59,6 +65,7 @@ impl ProviderState { } /// An active connection to a Web3 RPC server like geth or erigon. +#[derive(Default)] pub struct Web3Connection { pub name: String, pub display_name: Option, diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs index 5db5a432..5c536c27 100644 --- a/web3_proxy/src/rpcs/connections.rs +++ b/web3_proxy/src/rpcs/connections.rs @@ -678,17 +678,21 @@ impl Web3Connections { let mut tried = HashSet::new(); - let conns_to_try = itertools::chain( - // TODO: sort by tier - self.watch_consensus_connections_sender - .borrow() - .conns - .clone(), - // TODO: sort by tier - self.conns.values().cloned(), - ); + let mut synced_conns = self + .watch_consensus_connections_sender + .borrow() + .conns + .clone(); - for connection in conns_to_try { + // synced connections are all on the same block. sort them by tier with higher soft limits first + synced_conns.sort_by_cached_key(|x| (x.tier, u32::MAX - x.soft_limit)); + + // if there aren't enough synced connections, include more connections + let mut all_conns: Vec<_> = self.conns.values().cloned().collect(); + + sort_connections_by_sync_status(&mut all_conns); + + for connection in itertools::chain(synced_conns, all_conns) { if max_count == 0 { break; } @@ -1147,6 +1151,22 @@ impl Serialize for Web3Connections { } } +/// sort by block number (descending) and tier (ascending) +fn sort_connections_by_sync_status(rpcs: &mut Vec>) { + rpcs.sort_by_cached_key(|x| { + let reversed_head_block = u64::MAX + - x.head_block + .read() + .as_ref() + .map(|x| x.number().as_u64()) + .unwrap_or(0); + + let tier = x.tier; + + (reversed_head_block, tier) + }); +} + mod tests { // TODO: why is this allow needed? does tokio::test get in the way somehow? #![allow(unused_imports)] @@ -1162,6 +1182,80 @@ mod tests { use std::time::{SystemTime, UNIX_EPOCH}; use tokio::sync::RwLock as AsyncRwLock; + #[tokio::test] + async fn test_sort_connections_by_sync_status() { + let block_0 = Block { + number: Some(0.into()), + hash: Some(H256::random()), + ..Default::default() + }; + let block_1 = Block { + number: Some(1.into()), + hash: Some(H256::random()), + parent_hash: block_0.hash.unwrap(), + ..Default::default() + }; + let block_2 = Block { + number: Some(2.into()), + hash: Some(H256::random()), + parent_hash: block_1.hash.unwrap(), + ..Default::default() + }; + + let blocks: Vec<_> = [block_0, block_1, block_2] + .into_iter() + .map(|x| SavedBlock::new(Arc::new(x))) + .collect(); + + let mut rpcs = [ + Web3Connection { + name: "a".to_string(), + tier: 0, + head_block: RwLock::new(None), + ..Default::default() + }, + Web3Connection { + name: "b".to_string(), + tier: 0, + head_block: RwLock::new(blocks.get(1).cloned()), + ..Default::default() + }, + Web3Connection { + name: "c".to_string(), + tier: 0, + head_block: RwLock::new(blocks.get(2).cloned()), + ..Default::default() + }, + Web3Connection { + name: "d".to_string(), + tier: 1, + head_block: RwLock::new(None), + ..Default::default() + }, + Web3Connection { + name: "e".to_string(), + tier: 1, + head_block: RwLock::new(blocks.get(1).cloned()), + ..Default::default() + }, + Web3Connection { + name: "f".to_string(), + tier: 1, + head_block: RwLock::new(blocks.get(2).cloned()), + ..Default::default() + }, + ] + .into_iter() + .map(Arc::new) + .collect(); + + sort_connections_by_sync_status(&mut rpcs); + + let names_in_sort_order: Vec<_> = rpcs.iter().map(|x| x.name.as_str()).collect(); + + assert_eq!(names_in_sort_order, ["c", "f", "b", "e", "a", "d"]); + } + #[tokio::test] async fn test_server_selection_by_height() { // TODO: do this better. can test_env_logger and tokio test be stacked?