From 8b08828473846521b6d16f07e4212453e6e99a2c Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Tue, 19 Sep 2023 14:57:42 -0700 Subject: [PATCH] make it compile. my editor didnt show any errors :shrug: --- deduped_broadcast/src/lib.rs | 21 +++++++++------------ web3_proxy/src/app/mod.rs | 3 +-- 2 files changed, 10 insertions(+), 14 deletions(-) diff --git a/deduped_broadcast/src/lib.rs b/deduped_broadcast/src/lib.rs index d2287f3a..29936ca0 100644 --- a/deduped_broadcast/src/lib.rs +++ b/deduped_broadcast/src/lib.rs @@ -2,7 +2,6 @@ use serde::ser::SerializeStruct; use serde::{Serialize, Serializer}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; -use std::time::Duration; use std::{ collections::hash_map::DefaultHasher, fmt::Debug, @@ -51,16 +50,14 @@ where // we don't actually care what the return value is. we just want to send only if the cache is empty // TODO: count new vs unique - self.cache - .get_or_insert(hashed, || { - self.total_filtered - .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + self.cache.get_or_insert(hashed, || { + self.total_filtered + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); - if let Ok(x) = self.broadcast_filtered_tx.send(item) { - self.total_broadcasts.fetch_add(x, Ordering::Relaxed); - } - }) - .await; + if let Ok(x) = self.broadcast_filtered_tx.send(item) { + self.total_broadcasts.fetch_add(x, Ordering::Relaxed); + } + }); } } } @@ -69,7 +66,7 @@ impl DedupedBroadcaster where T: Clone + Debug + Hash + Send + Sync + 'static, { - pub fn new(capacity: usize, cache_capacity: u64) -> Self { + pub fn new(capacity: usize, cache_capacity: usize) -> Self { let (unfiltered_tx, unfiltered_rx) = mpsc::channel::(capacity); let (broadcast_filtered_tx, _) = broadcast::channel(capacity); @@ -175,7 +172,7 @@ mod tests { #[tokio::test] async fn test_deduped_broadcaster() { - let broadcaster = DedupedBroadcaster::new(10, 10, None); + let broadcaster = DedupedBroadcaster::new(10, 10); let mut receiver_1 = broadcaster.subscribe(); let _receiver_2 = broadcaster.subscribe(); diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 4cd8b490..7d5f0791 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -411,8 +411,7 @@ impl Web3ProxyApp { let chain_id = top_config.app.chain_id; - let deduped_txid_firehose = - DedupedBroadcaster::new(10_000, 10_000, Some(Duration::from_secs(5 * 60))); + let deduped_txid_firehose = DedupedBroadcaster::new(5_000, 5_000); // TODO: remove this. it should only be done by apply_top_config let (balanced_rpcs, balanced_handle, consensus_connections_watcher) = Web3Rpcs::spawn(