web3-proxy/redis-cell-client/src/lib.rs

131 lines
3.8 KiB
Rust
Raw Normal View History

2022-07-25 21:41:38 +03:00
//#![warn(missing_docs)]
2022-07-25 21:21:58 +03:00
2022-07-07 06:22:09 +03:00
use bb8_redis::redis::cmd;
2022-07-09 02:02:32 +03:00
pub use bb8_redis::redis::RedisError;
2022-07-07 06:22:09 +03:00
pub use bb8_redis::{bb8, RedisConnectionManager};
2022-08-07 09:48:57 +03:00
use std::ops::Add;
use std::time::{Duration, Instant};
2022-07-07 06:22:09 +03:00
2022-08-10 05:37:34 +03:00
pub type RedisPool = bb8::Pool<RedisConnectionManager>;
2022-07-07 06:22:09 +03:00
2022-08-10 05:37:34 +03:00
pub struct RedisCell {
pool: RedisPool,
2022-07-07 06:22:09 +03:00
key: String,
2022-08-06 05:29:55 +03:00
default_max_burst: u32,
default_count_per_period: u32,
default_period: u32,
2022-07-07 06:22:09 +03:00
}
2022-08-07 09:48:57 +03:00
pub enum ThrottleResult {
Allowed,
RetryAt(Instant),
}
2022-08-10 05:37:34 +03:00
impl RedisCell {
2022-07-07 06:22:09 +03:00
// todo: seems like this could be derived
// TODO: take something generic for conn
// TODO: use r2d2 for connection pooling?
pub fn new(
2022-08-10 05:37:34 +03:00
pool: RedisPool,
2022-08-06 05:29:55 +03:00
app: &str,
key: &str,
default_max_burst: u32,
default_count_per_period: u32,
default_period: u32,
2022-07-07 06:22:09 +03:00
) -> Self {
2022-08-06 05:29:55 +03:00
let key = format!("{}:redis-cell:{}", app, key);
2022-07-07 06:22:09 +03:00
Self {
pool,
2022-08-06 05:29:55 +03:00
key,
default_max_burst,
default_count_per_period,
default_period,
2022-07-07 06:22:09 +03:00
}
}
#[inline]
2022-08-06 05:29:55 +03:00
async fn _throttle(
&self,
key: &str,
max_burst: Option<u32>,
count_per_period: Option<u32>,
period: Option<u32>,
quantity: u32,
2022-08-07 09:48:57 +03:00
) -> anyhow::Result<ThrottleResult> {
let mut conn = self.pool.get().await?;
2022-07-07 06:22:09 +03:00
2022-08-06 05:29:55 +03:00
let count_per_period = count_per_period.unwrap_or(self.default_count_per_period);
2022-08-06 08:26:43 +03:00
if count_per_period == 0 {
2022-08-07 09:48:57 +03:00
return Ok(ThrottleResult::Allowed);
2022-08-06 08:26:43 +03:00
}
let max_burst = max_burst.unwrap_or(self.default_max_burst);
2022-08-06 05:29:55 +03:00
let period = period.unwrap_or(self.default_period);
2022-07-07 06:22:09 +03:00
/*
https://github.com/brandur/redis-cell#response
CL.THROTTLE <key> <max_burst> <count per period> <period> [<quantity>]
0. Whether the action was limited:
0 indicates the action is allowed.
1 indicates that the action was limited/blocked.
1. The total limit of the key (max_burst + 1). This is equivalent to the common X-RateLimit-Limit HTTP header.
2. The remaining limit of the key. Equivalent to X-RateLimit-Remaining.
3. The number of seconds until the user should retry, and always -1 if the action was allowed. Equivalent to Retry-After.
4. The number of seconds until the limit will reset to its maximum capacity. Equivalent to X-RateLimit-Reset.
*/
// TODO: don't unwrap. maybe return anyhow::Result<Result<(), Duration>>
// TODO: should we return more error info?
let x: Vec<isize> = cmd("CL.THROTTLE")
2022-08-06 05:29:55 +03:00
.arg(&(key, max_burst, count_per_period, period, quantity))
2022-07-07 06:22:09 +03:00
.query_async(&mut *conn)
2022-08-07 09:48:57 +03:00
.await?;
2022-07-07 06:22:09 +03:00
2022-08-07 09:48:57 +03:00
// TODO: trace log the result?
2022-07-07 06:22:09 +03:00
2022-08-07 09:48:57 +03:00
if x.len() != 5 {
return Err(anyhow::anyhow!("unexpected redis result"));
}
2022-07-07 06:22:09 +03:00
2022-08-07 09:48:57 +03:00
let retry_after = *x.get(3).expect("index exists above");
2022-07-07 06:22:09 +03:00
if retry_after == -1 {
2022-08-07 09:48:57 +03:00
Ok(ThrottleResult::Allowed)
2022-07-07 06:22:09 +03:00
} else {
2022-08-07 09:48:57 +03:00
// don't return a duration, return an instant
let retry_at = Instant::now().add(Duration::from_secs(retry_after as u64));
Ok(ThrottleResult::RetryAt(retry_at))
2022-07-07 06:22:09 +03:00
}
}
#[inline]
2022-08-07 09:48:57 +03:00
pub async fn throttle(&self) -> anyhow::Result<ThrottleResult> {
2022-08-06 05:29:55 +03:00
self._throttle(&self.key, None, None, None, 1).await
2022-07-07 06:22:09 +03:00
}
#[inline]
2022-08-06 08:26:43 +03:00
pub async fn throttle_key(
&self,
key: &str,
max_burst: Option<u32>,
count_per_period: Option<u32>,
period: Option<u32>,
2022-08-07 09:48:57 +03:00
) -> anyhow::Result<ThrottleResult> {
2022-07-09 03:00:31 +03:00
let key = format!("{}:{}", self.key, key);
2022-07-07 06:22:09 +03:00
2022-08-06 08:26:43 +03:00
self._throttle(key.as_ref(), max_burst, count_per_period, period, 1)
.await
2022-07-07 06:22:09 +03:00
}
#[inline]
2022-08-07 09:48:57 +03:00
pub async fn throttle_quantity(&self, quantity: u32) -> anyhow::Result<ThrottleResult> {
2022-08-06 05:29:55 +03:00
self._throttle(&self.key, None, None, None, quantity).await
2022-07-07 06:22:09 +03:00
}
}