lru instead of moka for deduped_broadcast

This commit is contained in:
Bryan Stitt 2023-09-19 14:39:59 -07:00
parent 6233faeec4
commit ca2056e16f
3 changed files with 17 additions and 15 deletions

13
Cargo.lock generated
View File

@ -1298,9 +1298,9 @@ dependencies = [
[[package]]
name = "deduped_broadcast"
version = "0.1.0"
version = "0.2.0"
dependencies = [
"moka",
"lru",
"serde",
"tokio",
"tracing",
@ -3070,6 +3070,15 @@ version = "0.4.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
[[package]]
name = "lru"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4a83fb7698b3643a0e34f9ae6f2e8f0178c0fd42f8b59d493aa271ff3a5bf21"
dependencies = [
"hashbrown 0.14.0",
]
[[package]]
name = "mach2"
version = "0.4.1"

View File

@ -1,12 +1,12 @@
[package]
name = "deduped_broadcast"
version = "0.1.0"
version = "0.2.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tracing = "0.1"
moka = { version = "0.12.0", default-features = false, features = ["atomic64", "future", "quanta"] }
lru = { version = "0.11.0" }
serde = "1"
tokio = { version = "1.32.0", features = ["full"] }

View File

@ -1,4 +1,3 @@
use moka::future::{Cache, CacheBuilder};
use serde::ser::SerializeStruct;
use serde::{Serialize, Serializer};
use std::sync::atomic::{AtomicUsize, Ordering};
@ -17,7 +16,7 @@ where
{
unfiltered_rx: mpsc::Receiver<T>,
broadcast_filtered_tx: Arc<broadcast::Sender<T>>,
cache: Cache<u64, ()>,
cache: lru::LruCache<u64, ()>,
total_unfiltered: Arc<AtomicUsize>,
total_filtered: Arc<AtomicUsize>,
total_broadcasts: Arc<AtomicUsize>,
@ -53,7 +52,7 @@ where
// we don't actually care what the return value is. we just want to send only if the cache is empty
// TODO: count new vs unique
self.cache
.get_with(hashed, async {
.get_or_insert(hashed, || {
self.total_filtered
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
@ -70,17 +69,11 @@ impl<T> DedupedBroadcaster<T>
where
T: Clone + Debug + Hash + Send + Sync + 'static,
{
pub fn new(capacity: usize, cache_capacity: u64, cache_ttl: Option<Duration>) -> Self {
pub fn new(capacity: usize, cache_capacity: u64) -> Self {
let (unfiltered_tx, unfiltered_rx) = mpsc::channel::<T>(capacity);
let (broadcast_filtered_tx, _) = broadcast::channel(capacity);
let mut cache = CacheBuilder::new(cache_capacity);
if let Some(cache_ttl) = cache_ttl {
cache = cache.time_to_live(cache_ttl);
}
let cache = cache.build();
let cache = lru::LruCache::new(cache_capacity.try_into().unwrap());
let broadcast_filtered_tx = Arc::new(broadcast_filtered_tx);