From a242244a353060a6ad5c47bdd9d80b0fdcab15a8 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Tue, 24 Jan 2023 01:58:31 -0800 Subject: [PATCH] broadcast transactions to more servers --- TODO.md | 6 +++++ web3_proxy/src/app/mod.rs | 4 +-- web3_proxy/src/rpcs/connections.rs | 43 +++++++++++++++++++++--------- 3 files changed, 39 insertions(+), 14 deletions(-) diff --git a/TODO.md b/TODO.md index b28791d7..10393a3e 100644 --- a/TODO.md +++ b/TODO.md @@ -312,6 +312,12 @@ These are not yet ordered. There might be duplicates. We might not actually need - [x] public is 3900, but free is 360. free should be at least 3900 but probably more - [x] add --max-wait to wait_for_sync - [x] add automatic compare urls to wait_for_sync +- [x] send panics to pagerduty +- [x] enable lto on release builds +- [x] less logs for backup servers +- [x] use channels instead of arcswap + - this will let us easily wait for a new head or a new synced connection +- [x] broadcast transactions to more servers - [-] proxy mode for benchmarking all backends - [-] proxy mode for sending to multiple backends - [-] let users choose a % of reverts to log (or maybe x/second). someone like curve logging all reverts will be a BIG database very quickly diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index accd2d60..2234382f 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -1194,12 +1194,12 @@ impl Web3ProxyApp { // TODO: how should we handle private_mode here? let default_num = match proxy_mode { // TODO: how many balanced rpcs should we send to? configurable? percentage of total? - ProxyMode::Best => Some(2), + ProxyMode::Best => Some(4), ProxyMode::Fastest(0) => None, // TODO: how many balanced rpcs should we send to? configurable? percentage of total? // TODO: what if we do 2 per tier? we want to blast the third party rpcs // TODO: maybe having the third party rpcs in their own Web3Connections would be good for this - ProxyMode::Fastest(x) => Some(x * 2), + ProxyMode::Fastest(x) => Some(x * 4), ProxyMode::Versus => None, }; diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs index d0c28e85..edd94dc3 100644 --- a/web3_proxy/src/rpcs/connections.rs +++ b/web3_proxy/src/rpcs/connections.rs @@ -17,7 +17,7 @@ use ethers::prelude::{ProviderError, TxHash, H256, U64}; use futures::future::{join_all, try_join_all}; use futures::stream::FuturesUnordered; use futures::StreamExt; -use hashbrown::HashMap; +use hashbrown::{HashMap, HashSet}; use log::{debug, error, info, trace, warn, Level}; use migration::sea_orm::DatabaseConnection; use moka::future::{Cache, ConcurrentCacheExt}; @@ -635,26 +635,27 @@ impl Web3Connections { } /// get all rpc servers that are not rate limited - /// returns servers even if they aren't fully in sync. This is useful for broadcasting signed transactions + /// this prefers synced servers, but it will return servers even if they aren't fully in sync. + /// This is useful for broadcasting signed transactions. // TODO: better type on this that can return an anyhow::Result - pub async fn all_synced_connections( + pub async fn all_connections( &self, authorization: &Arc, block_needed: Option<&U64>, max_count: Option, ) -> Result, Option> { if let Ok(without_backups) = self - ._all_synced_connections(false, authorization, block_needed, max_count) + ._all_connections(false, authorization, block_needed, max_count) .await { return Ok(without_backups); } - self._all_synced_connections(true, authorization, block_needed, max_count) + self._all_connections(true, authorization, block_needed, max_count) .await } - async fn _all_synced_connections( + async fn _all_connections( &self, allow_backups: bool, authorization: &Arc, @@ -665,17 +666,35 @@ impl Web3Connections { // TODO: with capacity? let mut selected_rpcs = vec![]; - let mut max_count = if max_count.is_none() { - self.conns.len() + let mut max_count = if let Some(max_count) = max_count { + max_count } else { - self.conns.len().min(max_count.unwrap()) + self.conns.len() }; - for connection in self.conns.values() { + let mut tried = HashSet::new(); + + let conns_to_try = itertools::chain( + // TODO: sort by tier + self.watch_consensus_connections_sender + .borrow() + .conns + .clone(), + // TODO: sort by tier + self.conns.values().cloned(), + ); + + for connection in conns_to_try { if max_count == 0 { break; } + if tried.contains(&connection.name) { + continue; + } + + tried.insert(connection.name.clone()); + if !allow_backups && connection.backup { continue; } @@ -927,7 +946,7 @@ impl Web3Connections { ) -> anyhow::Result { loop { match self - .all_synced_connections(authorization, block_needed, max_count) + .all_connections(authorization, block_needed, max_count) .await { Ok(active_request_handles) => { @@ -1224,7 +1243,7 @@ mod tests { // all_backend_connections gives everything regardless of sync status assert_eq!( conns - .all_synced_connections(&authorization, None, None) + .all_connections(&authorization, None, None) .await .unwrap() .len(),