web3-proxy/deferred-rate-limiter/src/lib.rs

212 lines
8.8 KiB
Rust
Raw Normal View History

2022-09-15 20:57:24 +03:00
//#![warn(missing_docs)]
2022-11-12 11:24:32 +03:00
use log::error;
2022-09-15 20:57:24 +03:00
use moka::future::Cache;
use redis_rate_limiter::{RedisRateLimitResult, RedisRateLimiter};
use std::cmp::Eq;
use std::fmt::{Debug, Display};
2022-09-15 20:57:24 +03:00
use std::hash::Hash;
2022-09-20 01:17:24 +03:00
use std::sync::atomic::Ordering;
2022-09-15 20:57:24 +03:00
use std::sync::{atomic::AtomicU64, Arc};
2022-09-17 04:06:10 +03:00
use tokio::sync::Mutex;
2022-09-17 04:19:11 +03:00
use tokio::time::{Duration, Instant};
2022-09-15 20:57:24 +03:00
/// A local cache that sits in front of a RedisRateLimiter
/// Generic accross the key so it is simple to use with IPs or user keys
pub struct DeferredRateLimiter<K>
where
K: Send + Sync,
{
local_cache: Cache<K, Arc<AtomicU64>, hashbrown::hash_map::DefaultHashBuilder>,
2022-09-15 20:57:24 +03:00
prefix: String,
rrl: RedisRateLimiter,
2022-09-24 06:59:21 +03:00
/// if None, defers to the max on rrl
default_max_requests_per_period: Option<u64>,
2022-09-15 20:57:24 +03:00
}
/// The outcome of a rate limit check.
///
/// Derives `Debug` and the comparison traits so results can be logged with `{:?}`
/// and compared directly in tests.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DeferredRateLimitResult {
    /// The request may proceed.
    Allowed,
    /// The request is over the limit. Try again at the given instant.
    RetryAt(Instant),
    /// The request will never be allowed (the effective limit is zero).
    RetryNever,
}
impl<K> DeferredRateLimiter<K>
where
K: Copy + Debug + Display + Hash + Eq + Send + Sync + 'static,
2022-09-15 20:57:24 +03:00
{
2022-09-24 06:59:21 +03:00
pub fn new(
2022-11-16 23:18:37 +03:00
// TODO: change this to cache_size in bytes
2022-09-24 06:59:21 +03:00
cache_size: u64,
prefix: &str,
rrl: RedisRateLimiter,
default_max_requests_per_second: Option<u64>,
) -> Self {
2022-09-17 04:19:11 +03:00
let ttl = rrl.period as u64;
// TODO: time to live is not exactly right. we want this ttl counter to start only after redis is down. this works for now
2022-11-16 23:18:37 +03:00
// TODO: what do these weigh?
2022-12-29 00:53:36 +03:00
// TODO: allow skipping max_capacity
2022-09-17 04:19:11 +03:00
let local_cache = Cache::builder()
.time_to_live(Duration::from_secs(ttl))
.max_capacity(cache_size)
.name(prefix)
2022-11-11 21:40:52 +03:00
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
2022-09-17 04:19:11 +03:00
2022-09-15 20:57:24 +03:00
Self {
2022-09-17 04:19:11 +03:00
local_cache,
2022-09-15 20:57:24 +03:00
prefix: prefix.to_string(),
rrl,
2022-09-24 06:59:21 +03:00
default_max_requests_per_period: default_max_requests_per_second,
2022-09-15 20:57:24 +03:00
}
}
/// if setting max_per_period, be sure to keep the period the same for all requests to this label
2022-09-20 01:17:24 +03:00
/// TODO: max_per_period being None means two things. some places it means unlimited, but here it means to use the default. make an enum
2022-09-15 20:57:24 +03:00
pub async fn throttle(
&self,
key: K,
2022-09-24 06:59:21 +03:00
max_requests_per_period: Option<u64>,
2022-09-15 20:57:24 +03:00
count: u64,
) -> anyhow::Result<DeferredRateLimitResult> {
2022-09-24 06:59:21 +03:00
let max_requests_per_period = max_requests_per_period.unwrap_or_else(|| {
self.default_max_requests_per_period
.unwrap_or(self.rrl.max_requests_per_period)
});
2022-09-15 20:57:24 +03:00
2022-09-24 06:59:21 +03:00
if max_requests_per_period == 0 {
2022-09-15 20:57:24 +03:00
return Ok(DeferredRateLimitResult::RetryNever);
}
let deferred_rate_limit_result = Arc::new(Mutex::new(None));
2022-09-15 20:57:24 +03:00
let redis_key = format!("{}:{}", self.prefix, key);
2022-09-15 20:57:24 +03:00
// TODO: i'm sure this could be a lot better. but race conditions make this hard to think through. brain needs sleep
2022-09-20 01:41:53 +03:00
let local_key_count: Arc<AtomicU64> = {
2022-09-20 01:17:24 +03:00
// clone things outside of the `async move`
let deferred_rate_limit_result = deferred_rate_limit_result.clone();
let redis_key = redis_key.clone();
let rrl = Arc::new(self.rrl.clone());
2022-09-20 01:17:24 +03:00
// set arc_deferred_rate_limit_result and return the coun
2022-09-17 04:06:10 +03:00
self.local_cache
.get_with_by_ref(&key, async move {
2022-09-17 04:06:10 +03:00
// we do not use the try operator here because we want to be okay with redis errors
let redis_count = match rrl
2022-09-24 06:59:21 +03:00
.throttle_label(&redis_key, Some(max_requests_per_period), count)
2022-09-17 04:06:10 +03:00
.await
{
2022-09-20 01:17:24 +03:00
Ok(RedisRateLimitResult::Allowed(count)) => {
2022-09-20 01:41:53 +03:00
let _ = deferred_rate_limit_result
2022-09-20 01:17:24 +03:00
.lock()
.await
.insert(DeferredRateLimitResult::Allowed);
count
}
2022-09-17 04:06:10 +03:00
Ok(RedisRateLimitResult::RetryAt(retry_at, count)) => {
2022-09-20 01:41:53 +03:00
let _ = deferred_rate_limit_result
2022-09-20 01:17:24 +03:00
.lock()
.await
.insert(DeferredRateLimitResult::RetryAt(retry_at));
2022-09-17 04:06:10 +03:00
count
}
2022-09-20 01:17:24 +03:00
Ok(RedisRateLimitResult::RetryNever) => {
unreachable!();
2022-09-20 01:17:24 +03:00
}
2022-09-17 04:06:10 +03:00
Err(err) => {
2022-09-20 01:41:53 +03:00
let _ = deferred_rate_limit_result
2022-09-20 01:17:24 +03:00
.lock()
.await
.insert(DeferredRateLimitResult::Allowed);
// if we get a redis error, just let the user through.
// if users are sticky on a server, local caches will work well enough
// though now that we do this, we need to reset rate limits every minute! cache must have ttl!
2022-11-12 11:24:32 +03:00
error!("unable to rate limit! creating empty cache. err={:?}", err);
2022-09-17 04:06:10 +03:00
0
}
};
Arc::new(AtomicU64::new(redis_count))
2022-09-15 20:57:24 +03:00
})
2022-09-17 04:06:10 +03:00
.await
2022-09-15 20:57:24 +03:00
};
let mut locked = deferred_rate_limit_result.lock().await;
2022-09-20 01:17:24 +03:00
if let Some(deferred_rate_limit_result) = locked.take() {
// new entry. redis was already incremented
// return the retry_at that we got from
2022-09-20 01:17:24 +03:00
Ok(deferred_rate_limit_result)
2022-09-15 20:57:24 +03:00
} else {
// we have a cached amount here
2022-09-20 01:41:53 +03:00
let cached_key_count = local_key_count.fetch_add(count, Ordering::Acquire);
// assuming no other parallel futures incremented this key, this is the count that redis has
let expected_key_count = cached_key_count + count;
2022-09-24 06:59:21 +03:00
if expected_key_count > max_requests_per_period {
// rate limit overshot!
let now = self.rrl.now_as_secs();
// do not fetch_sub
// another row might have queued a redis throttle_label to keep our count accurate
2022-09-15 20:57:24 +03:00
// show that we are rate limited without even querying redis
let retry_at = self.rrl.next_period(now);
2022-09-17 04:06:10 +03:00
Ok(DeferredRateLimitResult::RetryAt(retry_at))
} else {
// local caches think rate limit should be okay
2022-09-17 04:06:10 +03:00
// prepare a future to update redis
let rate_limit_f = {
let rrl = self.rrl.clone();
async move {
2022-09-17 04:06:10 +03:00
match rrl
2022-09-24 06:59:21 +03:00
.throttle_label(&redis_key, Some(max_requests_per_period), count)
2022-09-17 04:06:10 +03:00
.await
{
Ok(RedisRateLimitResult::Allowed(count)) => {
2022-09-20 01:41:53 +03:00
local_key_count.store(count, Ordering::Release);
2022-09-17 04:06:10 +03:00
DeferredRateLimitResult::Allowed
}
Ok(RedisRateLimitResult::RetryAt(retry_at, count)) => {
2022-09-20 01:41:53 +03:00
local_key_count.store(count, Ordering::Release);
2022-09-17 04:06:10 +03:00
DeferredRateLimitResult::RetryAt(retry_at)
}
Ok(RedisRateLimitResult::RetryNever) => {
// TODO: what should we do to arc_key_count?
DeferredRateLimitResult::RetryNever
}
Err(err) => {
// don't let redis errors block our users!
error!(
2022-11-12 11:24:32 +03:00
"unable to query rate limits, but local cache is available. key={:?} err={:?}",
key,
err,
2022-09-17 04:06:10 +03:00
);
// TODO: we need to start a timer that resets this count every minute
DeferredRateLimitResult::Allowed
}
}
}
2022-09-15 20:57:24 +03:00
};
// if close to max_per_period, wait for redis
// TODO: how close should we allow? depends on max expected concurent requests from one user
2022-12-24 03:14:10 +03:00
let limit: f64 = (max_requests_per_period as f64 * 0.99)
.min(max_requests_per_period as f64 - 1.0);
if expected_key_count > limit as u64 {
// close to period. don't risk it. wait on redis
2022-09-17 04:06:10 +03:00
Ok(rate_limit_f.await)
} else {
// rate limit has enough headroom that it should be safe to do this in the background
2022-12-28 09:11:18 +03:00
// TODO: send an error here somewhere
2022-11-12 11:24:32 +03:00
tokio::spawn(rate_limit_f);
2022-09-17 04:06:10 +03:00
Ok(DeferredRateLimitResult::Allowed)
}
}
2022-09-15 20:57:24 +03:00
}
}
}