diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 5687df69..15959a5c 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -1288,7 +1288,7 @@ impl Web3ProxyApp { authorization, &request, Some(request_metadata.clone()), - Some(&head_block_num), + None, None, Level::Trace, num, diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index ab2cfbcd..3425d64a 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -1676,7 +1676,156 @@ mod tests { } } - fn test_all_connections() { - todo!() + #[tokio::test] + async fn test_all_connections() { + let _ = env_logger::builder() + .filter_level(LevelFilter::Error) + .filter_module("web3_proxy", LevelFilter::Trace) + .is_test(true) + .try_init(); + + // TODO: use chrono, not SystemTime + let now: U256 = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs() + .into(); + + let block_1 = Block { + hash: Some(H256::random()), + number: Some(1_000_000.into()), + parent_hash: H256::random(), + timestamp: now, + ..Default::default() + }; + let block_2 = Block { + hash: Some(H256::random()), + number: Some(1_000_001.into()), + parent_hash: block_1.hash.unwrap(), + timestamp: now + 1, + ..Default::default() + }; + + let block_1: Web3ProxyBlock = Arc::new(block_1).try_into().unwrap(); + let block_2: Web3ProxyBlock = Arc::new(block_2).try_into().unwrap(); + + let mock_geth = Web3Rpc { + name: "mock_geth".to_string(), + soft_limit: 1_000, + automatic_block_limit: false, + backup: false, + block_data_limit: 64.into(), + tier: 0, + head_block: RwLock::new(Some(block_1.clone())), + provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))), + ..Default::default() + }; + + let mock_erigon_archive = Web3Rpc { + name: "mock_erigon_archive".to_string(), + soft_limit: 1_000, + automatic_block_limit: false, + backup: false, + block_data_limit: u64::MAX.into(), + tier: 1, + head_block: RwLock::new(Some(block_2.clone())), + provider: AsyncRwLock::new(Some(Arc::new(Web3Provider::Mock))), + ..Default::default() + }; + + assert!(mock_geth.has_block_data(block_1.number())); + assert!(mock_erigon_archive.has_block_data(block_1.number())); + assert!(!mock_geth.has_block_data(block_2.number())); + assert!(mock_erigon_archive.has_block_data(block_2.number())); + + let mock_geth = Arc::new(mock_geth); + let mock_erigon_archive = Arc::new(mock_erigon_archive); + + let rpcs_by_name = HashMap::from([ + (mock_geth.name.clone(), mock_geth.clone()), + ( + mock_erigon_archive.name.clone(), + mock_erigon_archive.clone(), + ), + ]); + + let (block_sender, _) = flume::unbounded(); + let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded(); + let (watch_consensus_rpcs_sender, _) = watch::channel(Default::default()); + let (watch_consensus_head_sender, _watch_consensus_head_receiver) = + watch::channel(Default::default()); + + // TODO: make a Web3Rpcs::new + let rpcs = Web3Rpcs { + block_sender, + by_name: RwLock::new(rpcs_by_name), + http_interval_sender: None, + watch_consensus_head_sender: Some(watch_consensus_head_sender), + watch_consensus_rpcs_sender, + pending_transaction_cache: Cache::builder() + .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()), + pending_tx_id_receiver, + pending_tx_id_sender, + blocks_by_hash: Cache::builder() + .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()), + blocks_by_number: Cache::builder() + .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()), + min_head_rpcs: 1, + min_sum_soft_limit: 1_000, + max_block_age: None, + max_block_lag: None, + }; + + let authorization = Arc::new(Authorization::internal(None).unwrap()); + + let mut connection_heads = ConsensusFinder::new(None, None); + + rpcs.process_block_from_rpc( + &authorization, + &mut connection_heads, + Some(block_1.clone()), + mock_geth.clone(), + &None, + ) + .await + .unwrap(); + + rpcs.process_block_from_rpc( + &authorization, + &mut connection_heads, + Some(block_2.clone()), + mock_erigon_archive.clone(), + &None, + ) + .await + .unwrap(); + + assert_eq!(rpcs.num_synced_rpcs(), 1); + + // best_synced_backend_connection requires servers to be synced with the head block + // TODO: test with and without passing the head_block.number? + let head_connections = rpcs + .all_connections(&authorization, Some(block_2.number()), None, None, false) + .await; + + debug!("head_connections: {:#?}", head_connections); + + assert_eq!( + head_connections.unwrap().len(), + 1, + "wrong number of connections" + ); + + let all_connections = rpcs + .all_connections(&authorization, Some(block_1.number()), None, None, false) + .await; + + debug!("all_connections: {:#?}", all_connections); + + assert_eq!( + all_connections.unwrap().len(), + 2, + "wrong number of connections" + ) } }