diff --git a/Cargo.lock b/Cargo.lock index 0ea8ab91..0bc178b6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1298,9 +1298,9 @@ dependencies = [ [[package]] name = "deduped_broadcast" -version = "0.1.0" +version = "0.2.0" dependencies = [ - "moka", + "lru", "serde", "tokio", "tracing", @@ -3070,6 +3070,15 @@ version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +[[package]] +name = "lru" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4a83fb7698b3643a0e34f9ae6f2e8f0178c0fd42f8b59d493aa271ff3a5bf21" +dependencies = [ + "hashbrown 0.14.0", +] + [[package]] name = "mach2" version = "0.4.1" diff --git a/deduped_broadcast/Cargo.toml b/deduped_broadcast/Cargo.toml index 9b7feb66..f6e79115 100644 --- a/deduped_broadcast/Cargo.toml +++ b/deduped_broadcast/Cargo.toml @@ -1,12 +1,12 @@ [package] name = "deduped_broadcast" -version = "0.1.0" +version = "0.2.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] tracing = "0.1" -moka = { version = "0.12.0", default-features = false, features = ["atomic64", "future", "quanta"] } +lru = { version = "0.11.0" } serde = "1" tokio = { version = "1.32.0", features = ["full"] } diff --git a/deduped_broadcast/src/lib.rs b/deduped_broadcast/src/lib.rs index cf290caa..d2287f3a 100644 --- a/deduped_broadcast/src/lib.rs +++ b/deduped_broadcast/src/lib.rs @@ -1,4 +1,3 @@ -use moka::future::{Cache, CacheBuilder}; use serde::ser::SerializeStruct; use serde::{Serialize, Serializer}; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -17,7 +16,7 @@ where { unfiltered_rx: mpsc::Receiver, broadcast_filtered_tx: Arc>, - cache: Cache, + cache: lru::LruCache, total_unfiltered: Arc, total_filtered: Arc, total_broadcasts: Arc, @@ -53,7 +52,7 @@ where // we don't actually care what the return value is. we just want to send only if the cache is empty // TODO: count new vs unique self.cache - .get_with(hashed, async { + .get_or_insert(hashed, || { self.total_filtered .fetch_add(1, std::sync::atomic::Ordering::Relaxed); @@ -70,17 +69,11 @@ impl DedupedBroadcaster where T: Clone + Debug + Hash + Send + Sync + 'static, { - pub fn new(capacity: usize, cache_capacity: u64, cache_ttl: Option) -> Self { + pub fn new(capacity: usize, cache_capacity: u64) -> Self { let (unfiltered_tx, unfiltered_rx) = mpsc::channel::(capacity); let (broadcast_filtered_tx, _) = broadcast::channel(capacity); - let mut cache = CacheBuilder::new(cache_capacity); - - if let Some(cache_ttl) = cache_ttl { - cache = cache.time_to_live(cache_ttl); - } - - let cache = cache.build(); + let cache = lru::LruCache::new(cache_capacity.try_into().unwrap()); let broadcast_filtered_tx = Arc::new(broadcast_filtered_tx);