2022-08-16 01:50:56 +03:00
|
|
|
//#![warn(missing_docs)]
|
|
|
|
mod errors;
|
|
|
|
|
|
|
|
use anyhow::Context;
|
2022-09-14 09:18:13 +03:00
|
|
|
use deadpool_redis::redis::pipe;
|
2022-08-16 01:50:56 +03:00
|
|
|
use std::ops::Add;
|
2022-09-10 03:12:14 +03:00
|
|
|
use std::time::{SystemTime, UNIX_EPOCH};
|
|
|
|
use tokio::time::{Duration, Instant};
|
2022-09-06 23:12:45 +03:00
|
|
|
use tracing::{debug, trace};
|
2022-08-16 01:50:56 +03:00
|
|
|
|
2022-09-14 09:18:13 +03:00
|
|
|
pub use deadpool_redis::redis;
|
|
|
|
pub use deadpool_redis::{Config, Connection, Manager, Pool, Runtime};
|
2022-08-16 01:50:56 +03:00
|
|
|
|
2022-09-14 09:18:13 +03:00
|
|
|
// pub use crate::errors::{RedisError, RedisErrorSink};
|
|
|
|
// pub use bb8_redis::{bb8, redis, RedisConnectionManager};
|
2022-08-16 01:50:56 +03:00
|
|
|
|
|
|
|
pub struct RedisRateLimit {
|
2022-09-14 09:18:13 +03:00
|
|
|
pool: Pool,
|
2022-08-16 01:50:56 +03:00
|
|
|
key_prefix: String,
|
2022-08-30 23:01:42 +03:00
|
|
|
/// The default maximum requests allowed in a period.
|
|
|
|
max_requests_per_period: u64,
|
|
|
|
/// seconds
|
|
|
|
period: f32,
|
2022-08-16 01:50:56 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Outcome of a rate limit check.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ThrottleResult {
    /// The request fits inside the current window's limit.
    Allowed,

    /// The limit for the current window is used up; the window ends at this instant.
    RetryAt(Instant),

    /// The effective limit is zero, so waiting will not help.
    RetryNever,
}
|
|
|
|
|
|
|
|
impl RedisRateLimit {
|
|
|
|
pub fn new(
|
2022-09-14 09:18:13 +03:00
|
|
|
pool: Pool,
|
2022-08-16 01:50:56 +03:00
|
|
|
app: &str,
|
|
|
|
label: &str,
|
2022-08-30 23:01:42 +03:00
|
|
|
max_requests_per_period: u64,
|
|
|
|
period: f32,
|
2022-08-16 01:50:56 +03:00
|
|
|
) -> Self {
|
|
|
|
let key_prefix = format!("{}:rrl:{}", app, label);
|
|
|
|
|
|
|
|
Self {
|
|
|
|
pool,
|
|
|
|
key_prefix,
|
2022-08-30 23:01:42 +03:00
|
|
|
max_requests_per_period,
|
2022-08-16 01:50:56 +03:00
|
|
|
period,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-08-30 23:01:42 +03:00
|
|
|
/// label might be an ip address or a user_key id.
|
|
|
|
/// if setting max_per_period, be sure to keep the period the same for all requests to this label
|
|
|
|
/// TODO:
|
2022-08-16 01:50:56 +03:00
|
|
|
pub async fn throttle_label(
|
|
|
|
&self,
|
|
|
|
label: &str,
|
|
|
|
max_per_period: Option<u64>,
|
|
|
|
count: u64,
|
|
|
|
) -> anyhow::Result<ThrottleResult> {
|
2022-08-30 23:01:42 +03:00
|
|
|
let max_per_period = max_per_period.unwrap_or(self.max_requests_per_period);
|
2022-08-16 01:50:56 +03:00
|
|
|
|
|
|
|
if max_per_period == 0 {
|
|
|
|
return Ok(ThrottleResult::RetryNever);
|
|
|
|
}
|
|
|
|
|
|
|
|
let now = SystemTime::now()
|
|
|
|
.duration_since(UNIX_EPOCH)
|
|
|
|
.context("cannot tell the time")?
|
2022-08-30 23:01:42 +03:00
|
|
|
.as_secs_f32();
|
2022-08-16 01:50:56 +03:00
|
|
|
|
2022-08-18 00:42:45 +03:00
|
|
|
// if self.period is 60, period_id will be the minute of the current time
|
|
|
|
let period_id = (now / self.period) % self.period;
|
2022-08-16 01:50:56 +03:00
|
|
|
|
|
|
|
let throttle_key = format!("{}:{}:{}", self.key_prefix, label, period_id);
|
|
|
|
|
|
|
|
let mut conn = self.pool.get().await?;
|
|
|
|
|
2022-09-06 23:12:45 +03:00
|
|
|
// TODO: at high concurency, i think this is giving errors
|
|
|
|
// TODO: i'm starting to think that bb8 has a bug
|
2022-08-16 01:50:56 +03:00
|
|
|
let x: Vec<u64> = pipe()
|
|
|
|
// we could get the key first, but that means an extra redis call for every check. this seems better
|
|
|
|
.incr(&throttle_key, count)
|
2022-09-06 23:12:45 +03:00
|
|
|
// set expiration each time we set the key. ignore the result
|
2022-08-30 23:01:42 +03:00
|
|
|
.expire(&throttle_key, self.period as usize)
|
2022-09-06 23:12:45 +03:00
|
|
|
// TODO: NX will make it only set the expiration the first time. works in redis, but not elasticache
|
|
|
|
// .arg("NX")
|
2022-08-16 01:50:56 +03:00
|
|
|
.ignore()
|
|
|
|
// do the query
|
|
|
|
.query_async(&mut *conn)
|
|
|
|
.await
|
|
|
|
.context("increment rate limit")?;
|
|
|
|
|
2022-09-01 08:58:55 +03:00
|
|
|
let new_count = x.first().context("check rate limit result")?;
|
2022-08-16 01:50:56 +03:00
|
|
|
|
|
|
|
if new_count > &max_per_period {
|
2022-08-30 23:01:42 +03:00
|
|
|
let seconds_left_in_period = self.period - (now % self.period);
|
2022-08-16 01:50:56 +03:00
|
|
|
|
2022-08-30 23:01:42 +03:00
|
|
|
let retry_at = Instant::now().add(Duration::from_secs_f32(seconds_left_in_period));
|
|
|
|
|
2022-09-06 23:12:45 +03:00
|
|
|
debug!(%label, ?retry_at, "rate limited: {}/{}", new_count, max_per_period);
|
2022-08-16 01:50:56 +03:00
|
|
|
|
2022-09-06 23:12:45 +03:00
|
|
|
Ok(ThrottleResult::RetryAt(retry_at))
|
|
|
|
} else {
|
|
|
|
trace!(%label, "NOT rate limited: {}/{}", new_count, max_per_period);
|
|
|
|
Ok(ThrottleResult::Allowed)
|
2022-08-16 01:50:56 +03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[inline]
|
|
|
|
pub async fn throttle(&self) -> anyhow::Result<ThrottleResult> {
|
|
|
|
self.throttle_label("", None, 1).await
|
|
|
|
}
|
2022-09-03 05:59:30 +03:00
|
|
|
|
|
|
|
pub fn max_requests_per_period(&self) -> u64 {
|
|
|
|
self.max_requests_per_period
|
|
|
|
}
|
2022-08-16 01:50:56 +03:00
|
|
|
}
|