broadcast transactions to more servers

This commit is contained in:
Bryan Stitt 2023-01-24 01:58:31 -08:00
parent 7ec4c69fd7
commit a242244a35
3 changed files with 39 additions and 14 deletions

@ -312,6 +312,12 @@ These are not yet ordered. There might be duplicates. We might not actually need
- [x] public is 3900, but free is 360. free should be at least 3900 but probably more
- [x] add --max-wait to wait_for_sync
- [x] add automatic compare urls to wait_for_sync
- [x] send panics to pagerduty
- [x] enable lto on release builds
- [x] less logs for backup servers
- [x] use channels instead of arcswap
- this will let us easily wait for a new head or a new synced connection
- [x] broadcast transactions to more servers
- [-] proxy mode for benchmarking all backends
- [-] proxy mode for sending to multiple backends
- [-] let users choose a % of reverts to log (or maybe x/second). someone like curve logging all reverts will be a BIG database very quickly

@ -1194,12 +1194,12 @@ impl Web3ProxyApp {
// TODO: how should we handle private_mode here?
let default_num = match proxy_mode {
// TODO: how many balanced rpcs should we send to? configurable? percentage of total?
ProxyMode::Best => Some(2),
ProxyMode::Best => Some(4),
ProxyMode::Fastest(0) => None,
// TODO: how many balanced rpcs should we send to? configurable? percentage of total?
// TODO: what if we do 2 per tier? we want to blast the third party rpcs
// TODO: maybe having the third party rpcs in their own Web3Connections would be good for this
ProxyMode::Fastest(x) => Some(x * 2),
ProxyMode::Fastest(x) => Some(x * 4),
ProxyMode::Versus => None,
};

@ -17,7 +17,7 @@ use ethers::prelude::{ProviderError, TxHash, H256, U64};
use futures::future::{join_all, try_join_all};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use hashbrown::HashMap;
use hashbrown::{HashMap, HashSet};
use log::{debug, error, info, trace, warn, Level};
use migration::sea_orm::DatabaseConnection;
use moka::future::{Cache, ConcurrentCacheExt};
@ -635,26 +635,27 @@ impl Web3Connections {
}
/// get all rpc servers that are not rate limited
/// returns servers even if they aren't fully in sync. This is useful for broadcasting signed transactions
/// this prefers synced servers, but it will return servers even if they aren't fully in sync.
/// This is useful for broadcasting signed transactions.
// TODO: better type on this that can return an anyhow::Result
pub async fn all_synced_connections(
pub async fn all_connections(
&self,
authorization: &Arc<Authorization>,
block_needed: Option<&U64>,
max_count: Option<usize>,
) -> Result<Vec<OpenRequestHandle>, Option<Instant>> {
if let Ok(without_backups) = self
._all_synced_connections(false, authorization, block_needed, max_count)
._all_connections(false, authorization, block_needed, max_count)
.await
{
return Ok(without_backups);
}
self._all_synced_connections(true, authorization, block_needed, max_count)
self._all_connections(true, authorization, block_needed, max_count)
.await
}
async fn _all_synced_connections(
async fn _all_connections(
&self,
allow_backups: bool,
authorization: &Arc<Authorization>,
@ -665,17 +666,35 @@ impl Web3Connections {
// TODO: with capacity?
let mut selected_rpcs = vec![];
let mut max_count = if max_count.is_none() {
self.conns.len()
let mut max_count = if let Some(max_count) = max_count {
max_count
} else {
self.conns.len().min(max_count.unwrap())
self.conns.len()
};
for connection in self.conns.values() {
let mut tried = HashSet::new();
let conns_to_try = itertools::chain(
// TODO: sort by tier
self.watch_consensus_connections_sender
.borrow()
.conns
.clone(),
// TODO: sort by tier
self.conns.values().cloned(),
);
for connection in conns_to_try {
if max_count == 0 {
break;
}
if tried.contains(&connection.name) {
continue;
}
tried.insert(connection.name.clone());
if !allow_backups && connection.backup {
continue;
}
@ -927,7 +946,7 @@ impl Web3Connections {
) -> anyhow::Result<JsonRpcForwardedResponse> {
loop {
match self
.all_synced_connections(authorization, block_needed, max_count)
.all_connections(authorization, block_needed, max_count)
.await
{
Ok(active_request_handles) => {
@ -1224,7 +1243,7 @@ mod tests {
// all_backend_connections gives everything regardless of sync status
assert_eq!(
conns
.all_synced_connections(&authorization, None, None)
.all_connections(&authorization, None, None)
.await
.unwrap()
.len(),