make it compile. my editor didnt show any errors 🤷
This commit is contained in:
parent
416d179c09
commit
8b08828473
@ -2,7 +2,6 @@ use serde::ser::SerializeStruct;
|
|||||||
use serde::{Serialize, Serializer};
|
use serde::{Serialize, Serializer};
|
||||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
|
||||||
use std::{
|
use std::{
|
||||||
collections::hash_map::DefaultHasher,
|
collections::hash_map::DefaultHasher,
|
||||||
fmt::Debug,
|
fmt::Debug,
|
||||||
@ -51,16 +50,14 @@ where
|
|||||||
|
|
||||||
// we don't actually care what the return value is. we just want to send only if the cache is empty
|
// we don't actually care what the return value is. we just want to send only if the cache is empty
|
||||||
// TODO: count new vs unique
|
// TODO: count new vs unique
|
||||||
self.cache
|
self.cache.get_or_insert(hashed, || {
|
||||||
.get_or_insert(hashed, || {
|
self.total_filtered
|
||||||
self.total_filtered
|
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||||
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
|
||||||
|
|
||||||
if let Ok(x) = self.broadcast_filtered_tx.send(item) {
|
if let Ok(x) = self.broadcast_filtered_tx.send(item) {
|
||||||
self.total_broadcasts.fetch_add(x, Ordering::Relaxed);
|
self.total_broadcasts.fetch_add(x, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
})
|
});
|
||||||
.await;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -69,7 +66,7 @@ impl<T> DedupedBroadcaster<T>
|
|||||||
where
|
where
|
||||||
T: Clone + Debug + Hash + Send + Sync + 'static,
|
T: Clone + Debug + Hash + Send + Sync + 'static,
|
||||||
{
|
{
|
||||||
pub fn new(capacity: usize, cache_capacity: u64) -> Self {
|
pub fn new(capacity: usize, cache_capacity: usize) -> Self {
|
||||||
let (unfiltered_tx, unfiltered_rx) = mpsc::channel::<T>(capacity);
|
let (unfiltered_tx, unfiltered_rx) = mpsc::channel::<T>(capacity);
|
||||||
let (broadcast_filtered_tx, _) = broadcast::channel(capacity);
|
let (broadcast_filtered_tx, _) = broadcast::channel(capacity);
|
||||||
|
|
||||||
@ -175,7 +172,7 @@ mod tests {
|
|||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_deduped_broadcaster() {
|
async fn test_deduped_broadcaster() {
|
||||||
let broadcaster = DedupedBroadcaster::new(10, 10, None);
|
let broadcaster = DedupedBroadcaster::new(10, 10);
|
||||||
|
|
||||||
let mut receiver_1 = broadcaster.subscribe();
|
let mut receiver_1 = broadcaster.subscribe();
|
||||||
let _receiver_2 = broadcaster.subscribe();
|
let _receiver_2 = broadcaster.subscribe();
|
||||||
|
@ -411,8 +411,7 @@ impl Web3ProxyApp {
|
|||||||
|
|
||||||
let chain_id = top_config.app.chain_id;
|
let chain_id = top_config.app.chain_id;
|
||||||
|
|
||||||
let deduped_txid_firehose =
|
let deduped_txid_firehose = DedupedBroadcaster::new(5_000, 5_000);
|
||||||
DedupedBroadcaster::new(10_000, 10_000, Some(Duration::from_secs(5 * 60)));
|
|
||||||
|
|
||||||
// TODO: remove this. it should only be done by apply_top_config
|
// TODO: remove this. it should only be done by apply_top_config
|
||||||
let (balanced_rpcs, balanced_handle, consensus_connections_watcher) = Web3Rpcs::spawn(
|
let (balanced_rpcs, balanced_handle, consensus_connections_watcher) = Web3Rpcs::spawn(
|
||||||
|
Loading…
Reference in New Issue
Block a user