From 9621cfdccd5ffc0f4be5fdd658f49e30608cf1fc Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Sat, 13 May 2023 16:28:27 -0700 Subject: [PATCH] add quick cache wrapper for ttl expiration --- Cargo.lock | 9 +++ Cargo.toml | 1 + quick_cache_ttl/Cargo.toml | 14 +++++ quick_cache_ttl/src/lib.rs | 125 +++++++++++++++++++++++++++++++++++++ 4 files changed, 149 insertions(+) create mode 100644 quick_cache_ttl/Cargo.toml create mode 100644 quick_cache_ttl/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 15b0d7ab..0cdee203 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4225,6 +4225,15 @@ dependencies = [ "parking_lot 0.12.1", ] +[[package]] +name = "quick_cache_ttl" +version = "0.1.0" +dependencies = [ + "flume", + "quick_cache", + "tokio", +] + [[package]] name = "quote" version = "1.0.27" diff --git a/Cargo.toml b/Cargo.toml index 3c0a9c22..12175b98 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ members = [ "entities", "latency", "migration", + "quick_cache_ttl", "rate-counter", "redis-rate-limiter", "thread-fast-rng", diff --git a/quick_cache_ttl/Cargo.toml b/quick_cache_ttl/Cargo.toml new file mode 100644 index 00000000..d0102b72 --- /dev/null +++ b/quick_cache_ttl/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "quick_cache_ttl" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +flume = "0.10.14" +quick_cache = "0.3.0" +tokio = { version = "1.28.1", features = ["full"] } + +[dev-dependencies] +tokio = { version = "1.28.1", features = ["full", "test-util"] } diff --git a/quick_cache_ttl/src/lib.rs b/quick_cache_ttl/src/lib.rs new file mode 100644 index 00000000..11184357 --- /dev/null +++ b/quick_cache_ttl/src/lib.rs @@ -0,0 +1,125 @@ +use quick_cache::sync::Cache; +use quick_cache::{PlaceholderGuard, Weighter}; +use std::hash::{BuildHasher, Hash}; +use std::sync::Arc; +use std::time::Duration; +use tokio::task::JoinHandle; +use tokio::time::{sleep_until, Instant}; + +pub struct QuickCache { + cache: Arc>, + pub task_handle: JoinHandle<()>, + ttl: Duration, + tx: flume::Sender<(Instant, Key)>, +} + +// TODO: join handle that +struct QuickCacheTask { + cache: Arc>, + rx: flume::Receiver<(Instant, Key)>, +} + +pub struct Guard<'a, Key, Qey, Val, We, B> { + inner: PlaceholderGuard<'a, Key, Qey, Val, We, B>, + key: Key, + ttl: Duration, + tx: &'a flume::Sender<(Instant, Key)>, +} + +impl< + 'a, + Key: Clone + Eq + Hash + PartialEq, + Qey: Eq + Hash, + Val: Clone, + We: Weighter, + B: BuildHasher, + > Guard<'a, Key, Qey, Val, We, B> +{ + pub fn insert(self, val: Val) { + let expire_at = Instant::now() + self.ttl; + + self.inner.insert(val); + + self.tx.send((expire_at, self.key)).unwrap(); + } +} + +impl< + Key: Clone + Eq + Hash + Send + Sync + 'static, + Val: Clone + Send + Sync + 'static, + We: Weighter + Clone + Send + Sync + 'static, + B: BuildHasher + Clone + Send + Sync + 'static, + > QuickCache +{ + pub async fn spawn( + estimated_items_capacity: usize, + weight_capacity: u64, + weighter: We, + hash_builder: B, + ttl: Duration, + ) -> Self { + let (tx, rx) = flume::unbounded(); + + let cache = Cache::with( + estimated_items_capacity, + weight_capacity, + weighter, + hash_builder, + ); + + let cache = Arc::new(cache); + + let task = QuickCacheTask { + cache: cache.clone(), + rx, + }; + + let task_handle = tokio::spawn(task.run()); + + Self { + cache, + task_handle, + ttl, + tx, + } + } + + pub fn insert(&self, key: Key, val: Val) { + let expire_at = Instant::now() + self.ttl; + + self.cache.insert(key.clone(), val); + + self.tx.send((expire_at, key)).unwrap(); + } + + pub async fn get_value_or_guard_async( + &self, + key: Key, + ) -> Result> { + match self.cache.get_value_or_guard_async(&key).await { + Ok(x) => Ok(x), + Err(inner) => Err(Guard { + inner, + key, + ttl: self.ttl, + tx: &self.tx, + }), + } + } + + pub fn remove(&self, key: &Key) -> bool { + self.cache.remove(key) + } +} + +impl + Clone, B: BuildHasher + Clone> + QuickCacheTask +{ + async fn run(self) { + while let Ok((expire_at, key)) = self.rx.recv_async().await { + sleep_until(expire_at).await; + + self.cache.remove(&key); + } + } +}