improve sort order during eth_sendRawTransaction

This commit is contained in:
Bryan Stitt 2023-02-02 14:48:23 -08:00
parent 9ac3ef1e3d
commit ca1e550370
3 changed files with 114 additions and 10 deletions

@ -324,6 +324,9 @@ These are not yet ordered. There might be duplicates. We might not actually need
- [x] improve waiting for sync when rate limited
- [x] improve pager duty errors for smarter deduping
- [x] add create_key cli command
- [x] short lived cache on /health
- [x] cache /status for longer
- [x] sort connections during eth_sendRawTransaction
- [-] proxy mode for benchmarking all backends
- [-] proxy mode for sending to multiple backends
- [-] let users choose a % of reverts to log (or maybe x/second). someone like curve logging all reverts will be a BIG database very quickly

@ -35,6 +35,12 @@ pub enum ProviderState {
Connected(Arc<Web3Provider>),
}
impl Default for ProviderState {
fn default() -> Self {
Self::None
}
}
impl ProviderState {
pub async fn provider(&self, allow_not_ready: bool) -> Option<&Arc<Web3Provider>> {
match self {
@ -59,6 +65,7 @@ impl ProviderState {
}
/// An active connection to a Web3 RPC server like geth or erigon.
#[derive(Default)]
pub struct Web3Connection {
pub name: String,
pub display_name: Option<String>,

@ -678,17 +678,21 @@ impl Web3Connections {
let mut tried = HashSet::new();
let conns_to_try = itertools::chain(
// TODO: sort by tier
self.watch_consensus_connections_sender
.borrow()
.conns
.clone(),
// TODO: sort by tier
self.conns.values().cloned(),
);
let mut synced_conns = self
.watch_consensus_connections_sender
.borrow()
.conns
.clone();
for connection in conns_to_try {
// synced connections are all on the same block. sort them by tier with higher soft limits first
synced_conns.sort_by_cached_key(|x| (x.tier, u32::MAX - x.soft_limit));
// if there aren't enough synced connections, include more connections
let mut all_conns: Vec<_> = self.conns.values().cloned().collect();
sort_connections_by_sync_status(&mut all_conns);
for connection in itertools::chain(synced_conns, all_conns) {
if max_count == 0 {
break;
}
@ -1147,6 +1151,22 @@ impl Serialize for Web3Connections {
}
}
/// sort by block number (descending) and tier (ascending)
fn sort_connections_by_sync_status(rpcs: &mut Vec<Arc<Web3Connection>>) {
rpcs.sort_by_cached_key(|x| {
let reversed_head_block = u64::MAX
- x.head_block
.read()
.as_ref()
.map(|x| x.number().as_u64())
.unwrap_or(0);
let tier = x.tier;
(reversed_head_block, tier)
});
}
mod tests {
// TODO: why is this allow needed? does tokio::test get in the way somehow?
#![allow(unused_imports)]
@ -1162,6 +1182,80 @@ mod tests {
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::RwLock as AsyncRwLock;
#[tokio::test]
async fn test_sort_connections_by_sync_status() {
let block_0 = Block {
number: Some(0.into()),
hash: Some(H256::random()),
..Default::default()
};
let block_1 = Block {
number: Some(1.into()),
hash: Some(H256::random()),
parent_hash: block_0.hash.unwrap(),
..Default::default()
};
let block_2 = Block {
number: Some(2.into()),
hash: Some(H256::random()),
parent_hash: block_1.hash.unwrap(),
..Default::default()
};
let blocks: Vec<_> = [block_0, block_1, block_2]
.into_iter()
.map(|x| SavedBlock::new(Arc::new(x)))
.collect();
let mut rpcs = [
Web3Connection {
name: "a".to_string(),
tier: 0,
head_block: RwLock::new(None),
..Default::default()
},
Web3Connection {
name: "b".to_string(),
tier: 0,
head_block: RwLock::new(blocks.get(1).cloned()),
..Default::default()
},
Web3Connection {
name: "c".to_string(),
tier: 0,
head_block: RwLock::new(blocks.get(2).cloned()),
..Default::default()
},
Web3Connection {
name: "d".to_string(),
tier: 1,
head_block: RwLock::new(None),
..Default::default()
},
Web3Connection {
name: "e".to_string(),
tier: 1,
head_block: RwLock::new(blocks.get(1).cloned()),
..Default::default()
},
Web3Connection {
name: "f".to_string(),
tier: 1,
head_block: RwLock::new(blocks.get(2).cloned()),
..Default::default()
},
]
.into_iter()
.map(Arc::new)
.collect();
sort_connections_by_sync_status(&mut rpcs);
let names_in_sort_order: Vec<_> = rpcs.iter().map(|x| x.name.as_str()).collect();
assert_eq!(names_in_sort_order, ["c", "f", "b", "e", "a", "d"]);
}
#[tokio::test]
async fn test_server_selection_by_height() {
// TODO: do this better. can test_env_logger and tokio test be stacked?