login needs its own rate limiter

This commit is contained in:
Bryan Stitt 2022-09-24 03:59:21 +00:00
parent c12e25a210
commit 81deb1103c
9 changed files with 140 additions and 54 deletions

15
TODO.md

@ -161,16 +161,19 @@ These are roughly in order of completition
- [x] Api keys need option to lock to IP, cors header, referer, user agent, etc
- [x] /user/logout to clear bearer token and jwt
- [x] bearer tokens should expire
- [-] user login should return the bearer token, the user keys, and a jwt (jsonwebtoken rust crate should make it easy)
- [x] login endpoint needs its own rate limiter
- we don't want an rpc request limit of 0 to block logins
- for security, we want these limits low.
- [x] user login should return the bearer token and the user keys
- [x] use siwe messages and signatures for sign up and login
- [ ] active requests per second per api key
- [ ] distribution of methods per api key (eth_call, eth_getLogs, etc.)
- [-] let users choose a % to log (or maybe x/second). someone like curve logging all reverts will be a BIG database very quickly
- this must be opt-in or spawned since it will slow things down and will make their calls less private
- [ ] we currently default to 0.0 and don't expose a way to edit it. we have a database row, but we don't use it
- [-] add configurable size limits to all the Caches
- [ ] active requests per second per api key
- [ ] distribution of methods per api key (eth_call, eth_getLogs, etc.)
- [ ] endpoint for creating/modifying api keys and their advanced security features
- [ ] BUG: i think if all backend servers stop, the server doesn't properly reconnect. It appears to stop listening on 8854, but not shut down.
- [ ] revert logs should have a maximum age and a maximum count to keep the database from being huge
- [ ] Ulid instead of Uuid for user keys
- <https://discord.com/channels/873880840487206962/900758376164757555/1012942974608474142>
- since users are actively using our service, we will need to support both
@ -183,7 +186,9 @@ These are roughly in order of completition
These are not yet ordered.
- [-] use siwe messages and signatures for sign up and login
- [ ] instead of Option<...> in our frontend function signatures, use result and then the try operator so that we get our errors wrapped in json
- [ ] revert logs should have a maximum age and a maximum count to keep the database from being huge
- [ ] user login should also return a jwt (jsonwebtoken rust crate should make it easy)
- [-] if we request an old block, more servers can handle it than we currently use.
- [ ] instead of the one list of just heads, store our intermediate mappings (rpcs_by_hash, rpcs_by_num, blocks_by_hash) in SyncedConnections. this shouldn't be too much slower than what we have now
- [ ] remove the if/else where we optionally route to archive and refactor to require a BlockNumber enum

@ -19,6 +19,8 @@ where
local_cache: Cache<K, Arc<AtomicU64>, hashbrown::hash_map::DefaultHashBuilder>,
prefix: String,
rrl: RedisRateLimiter,
/// if None, defers to the max on rrl
default_max_requests_per_period: Option<u64>,
}
pub enum DeferredRateLimitResult {
@ -31,7 +33,12 @@ impl<K> DeferredRateLimiter<K>
where
K: Copy + Debug + Display + Hash + Eq + Send + Sync + 'static,
{
pub fn new(cache_size: u64, prefix: &str, rrl: RedisRateLimiter) -> Self {
pub fn new(
cache_size: u64,
prefix: &str,
rrl: RedisRateLimiter,
default_max_requests_per_second: Option<u64>,
) -> Self {
let ttl = rrl.period as u64;
// TODO: time to live is not exactly right. we want this ttl counter to start only after redis is down. this works for now
@ -45,6 +52,7 @@ where
local_cache,
prefix: prefix.to_string(),
rrl,
default_max_requests_per_period: default_max_requests_per_second,
}
}
@ -53,12 +61,15 @@ where
pub async fn throttle(
&self,
key: K,
max_per_period: Option<u64>,
max_requests_per_period: Option<u64>,
count: u64,
) -> anyhow::Result<DeferredRateLimitResult> {
let max_per_period = max_per_period.unwrap_or(self.rrl.max_requests_per_period);
let max_requests_per_period = max_requests_per_period.unwrap_or_else(|| {
self.default_max_requests_per_period
.unwrap_or(self.rrl.max_requests_per_period)
});
if max_per_period == 0 {
if max_requests_per_period == 0 {
return Ok(DeferredRateLimitResult::RetryNever);
}
@ -79,7 +90,7 @@ where
.get_with(key, async move {
// we do not use the try operator here because we want to be okay with redis errors
let redis_count = match rrl
.throttle_label(&redis_key, Some(max_per_period), count, true)
.throttle_label(&redis_key, Some(max_requests_per_period), count)
.await
{
Ok(RedisRateLimitResult::Allowed(count)) => {
@ -131,7 +142,7 @@ where
// assuming no other parallel futures incremented this key, this is the count that redis has
let expected_key_count = cached_key_count + count;
if expected_key_count > max_per_period {
if expected_key_count > max_requests_per_period {
// rate limit overshot!
let now = self.rrl.now_as_secs();
@ -149,7 +160,7 @@ where
let rrl = self.rrl.clone();
async move {
match rrl
.throttle_label(&redis_key, Some(max_per_period), count, false)
.throttle_label(&redis_key, Some(max_requests_per_period), count)
.await
{
Ok(RedisRateLimitResult::Allowed(count)) => {
@ -180,7 +191,7 @@ where
// if close to max_per_period, wait for redis
// TODO: how close should we allow? depends on max expected concurent requests from one user
if expected_key_count > max_per_period * 99 / 100 {
if expected_key_count > max_requests_per_period * 99 / 100 {
// close to period. don't risk it. wait on redis
Ok(rate_limit_f.await)
} else {

@ -1,6 +1,5 @@
//#![warn(missing_docs)]
use anyhow::Context;
use deadpool_redis::redis::AsyncCommands;
use std::ops::Add;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::time::{Duration, Instant};
@ -72,7 +71,6 @@ impl RedisRateLimiter {
label: &str,
max_per_period: Option<u64>,
count: u64,
expire: bool,
) -> anyhow::Result<RedisRateLimitResult> {
let max_per_period = max_per_period.unwrap_or(self.max_requests_per_period);
@ -96,30 +94,24 @@ impl RedisRateLimiter {
// TODO: at high concurency, this gives "connection reset by peer" errors. at least they are off the hot path
// TODO: only set expire if this is a new key
let new_count: u64 = if expire {
trace!("redis incr+expire");
// TODO: automatic retry
let x: Vec<_> = redis::pipe()
.atomic()
// we could get the key first, but that means an extra redis call for every check. this seems better
.incr(&throttle_key, count)
// set expiration each time we set the key. ignore the result
.expire(&throttle_key, 1 + self.period as usize)
// TODO: NX will make it only set the expiration the first time. works in redis, but not elasticache
// .arg("NX")
.ignore()
// do the query
.query_async(&mut *conn)
.await
.context("increment rate limit and set expiration")?;
*x.first().expect("check redis")
} else {
trace!("redis incr only");
conn.incr(&throttle_key, count)
.await
.context("increment rate limit")?
};
trace!("redis incr+expire");
// TODO: automatic retry
let x: Vec<_> = redis::pipe()
.atomic()
// we could get the key first, but that means an extra redis call for every check. this seems better
.incr(&throttle_key, count)
// set expiration each time we set the key. ignore the result
.expire(&throttle_key, 1 + self.period as usize)
// TODO: NX will make it only set the expiration the first time. works in redis, but not elasticache
// .arg("NX")
.ignore()
// do the query
.query_async(&mut *conn)
.await
.context("increment rate limit and set expiration")?;
let new_count: u64 = *x.first().expect("check redis");
if new_count > max_per_period {
// TODO: this might actually be early if we are way over the count
@ -135,7 +127,7 @@ impl RedisRateLimiter {
}
#[inline]
pub async fn throttle(&self, expire: bool) -> anyhow::Result<RedisRateLimitResult> {
self.throttle_label("", None, 1, expire).await
pub async fn throttle(&self) -> anyhow::Result<RedisRateLimitResult> {
self.throttle_label("", None, 1).await
}
}

@ -104,6 +104,7 @@ pub struct Web3ProxyApp {
pub pending_transactions: Cache<TxHash, TxStatus, hashbrown::hash_map::DefaultHashBuilder>,
pub frontend_ip_rate_limiter: Option<DeferredRateLimiter<IpAddr>>,
pub frontend_key_rate_limiter: Option<DeferredRateLimiter<Uuid>>,
pub login_rate_limiter: Option<RedisRateLimiter>,
pub redis_pool: Option<RedisPool>,
pub user_cache: Cache<Uuid, UserKeyData, hashbrown::hash_map::DefaultHashBuilder>,
}
@ -336,22 +337,37 @@ impl Web3ProxyApp {
let mut frontend_ip_rate_limiter = None;
let mut frontend_key_rate_limiter = None;
let mut login_rate_limiter = None;
if let Some(redis_pool) = redis_pool.as_ref() {
let rrl = RedisRateLimiter::new(
let rpc_rrl = RedisRateLimiter::new(
"web3_proxy",
"frontend",
top_config.app.public_rate_limit_per_minute,
top_config.app.frontend_rate_limit_per_minute,
60.0,
redis_pool.clone(),
);
// these two rate limiters can share the base limiter
// TODO: take cache_size from config
frontend_ip_rate_limiter = Some(DeferredRateLimiter::<IpAddr>::new(
10_000,
"ip",
rrl.clone(),
rpc_rrl.clone(),
None,
));
frontend_key_rate_limiter = Some(DeferredRateLimiter::<Uuid>::new(
10_000, "key", rpc_rrl, None,
));
// don't defer this one because it will have a low request per peiod
login_rate_limiter = Some(RedisRateLimiter::new(
"web3_proxy",
"login",
top_config.app.login_rate_limit_per_minute,
60.0,
redis_pool.clone(),
));
frontend_key_rate_limiter = Some(DeferredRateLimiter::<Uuid>::new(10_000, "key", rrl));
}
// keep 1GB of blocks in the cache
@ -381,6 +397,7 @@ impl Web3ProxyApp {
pending_transactions,
frontend_ip_rate_limiter,
frontend_key_rate_limiter,
login_rate_limiter,
db_conn,
redis_pool,
app_metrics,

@ -214,7 +214,7 @@ mod tests {
default_requests_per_minute: Some(6_000_000),
min_sum_soft_limit: 1,
min_synced_rpcs: 1,
public_rate_limit_per_minute: 1_000_000,
frontend_rate_limit_per_minute: 1_000_000,
response_cache_max_bytes: 10_usize.pow(7),
redirect_public_url: "example.com/".to_string(),
redirect_user_url: "example.com/{{user_id}}".to_string(),

@ -67,8 +67,10 @@ pub struct AppConfig {
#[serde(default = "default_min_synced_rpcs")]
pub min_synced_rpcs: usize,
/// Set to 0 to block all anonymous requests
#[serde(default = "default_public_rate_limit_per_minute")]
pub public_rate_limit_per_minute: u64,
#[serde(default = "default_frontend_rate_limit_per_minute")]
pub frontend_rate_limit_per_minute: u64,
#[serde(default = "default_login_rate_limit_per_minute")]
pub login_rate_limit_per_minute: u64,
pub redis_url: Option<String>,
/// maximum size of the connection pool for the cache
/// If none, the minimum * 2 is used
@ -89,11 +91,16 @@ fn default_min_synced_rpcs() -> usize {
1
}
/// 0 blocks public requests by default.
fn default_public_rate_limit_per_minute() -> u64 {
/// 0 blocks anonymous requests by default.
fn default_frontend_rate_limit_per_minute() -> u64 {
0
}
/// Having a low amount of requests per minute for login is safest.
fn default_login_rate_limit_per_minute() -> u64 {
10
}
fn default_response_cache_max_bytes() -> usize {
// TODO: default to some percentage of the system?
// 100 megabytes

@ -5,6 +5,7 @@ use axum::headers::{Origin, Referer, UserAgent};
use deferred_rate_limiter::DeferredRateLimitResult;
use entities::user_keys;
use ipnet::IpNet;
use redis_rate_limiter::RedisRateLimitResult;
use sea_orm::{prelude::Decimal, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter};
use serde::Serialize;
use std::{net::IpAddr, sync::Arc};
@ -122,6 +123,26 @@ impl AuthorizedRequest {
}
}
pub async fn login_is_authorized(
app: &Web3ProxyApp,
ip: IpAddr,
) -> Result<AuthorizedRequest, FrontendErrorResponse> {
// TODO: i think we could write an `impl From` for this
// TODO: move this to an AuthorizedUser extrator
let ip = match app.rate_limit_login(ip).await? {
RateLimitResult::AllowedIp(x) => x,
RateLimitResult::RateLimitedIp(x, retry_at) => {
return Err(FrontendErrorResponse::RateLimitedIp(x, retry_at));
}
// TODO: don't panic. give the user an error
x => unimplemented!("rate_limit_login shouldn't ever see these: {:?}", x),
};
let db = None;
Ok(AuthorizedRequest::Ip(db, ip))
}
pub async fn ip_is_authorized(
app: &Web3ProxyApp,
ip: IpAddr,
@ -169,6 +190,38 @@ pub async fn key_is_authorized(
}
impl Web3ProxyApp {
pub async fn rate_limit_login(&self, ip: IpAddr) -> anyhow::Result<RateLimitResult> {
// TODO: dry this up with rate_limit_by_key
// TODO: have a local cache because if we hit redis too hard we get errors
// TODO: query redis in the background so that users don't have to wait on this network request
if let Some(rate_limiter) = &self.login_rate_limiter {
match rate_limiter.throttle_label(&ip.to_string(), None, 1).await {
Ok(RedisRateLimitResult::Allowed(_)) => Ok(RateLimitResult::AllowedIp(ip)),
Ok(RedisRateLimitResult::RetryAt(retry_at, _)) => {
// TODO: set headers so they know when they can retry
// TODO: debug or trace?
// this is too verbose, but a stat might be good
trace!(?ip, "login rate limit exceeded until {:?}", retry_at);
Ok(RateLimitResult::RateLimitedIp(ip, Some(retry_at)))
}
Ok(RedisRateLimitResult::RetryNever) => {
// TODO: i don't think we'll get here. maybe if we ban an IP forever? seems unlikely
trace!(?ip, "login rate limit is 0");
Ok(RateLimitResult::RateLimitedIp(ip, None))
}
Err(err) => {
// internal error, not rate limit being hit
// TODO: i really want axum to do this for us in a single place.
error!(?err, "login rate limiter is unhappy. allowing ip");
Ok(RateLimitResult::AllowedIp(ip))
}
}
} else {
// TODO: if no redis, rate limit with a local cache? "warn!" probably isn't right
todo!("no rate limiter");
}
}
pub async fn rate_limit_by_ip(&self, ip: IpAddr) -> anyhow::Result<RateLimitResult> {
// TODO: dry this up with rate_limit_by_key
// TODO: have a local cache because if we hit redis too hard we get errors

@ -7,7 +7,7 @@
// I wonder how we handle payment
// probably have to do manual withdrawals
use super::authorization::ip_is_authorized;
use super::authorization::login_is_authorized;
use super::errors::FrontendResult;
use crate::{app::Web3ProxyApp, users::new_api_key};
use anyhow::Context;
@ -42,7 +42,7 @@ pub async fn get_login(
// TODO: allow ENS names here?
Path(mut params): Path<HashMap<String, String>>,
) -> FrontendResult {
let _ip = ip_is_authorized(&app, ip).await?;
let _ = login_is_authorized(&app, ip).await?;
// at first i thought about checking that user_address is in our db
// but theres no need to separate the registration and login flows
@ -119,6 +119,7 @@ pub struct PostLoginQuery {
}
/// JSON body to our `post_login` handler.
/// TODO: this should be an enum with the different login methods having different structs
#[derive(Deserialize)]
pub struct PostLogin {
address: Address,
@ -145,7 +146,7 @@ pub async fn post_login(
Json(payload): Json<PostLogin>,
Query(query): Query<PostLoginQuery>,
) -> FrontendResult {
let _ip = ip_is_authorized(&app, ip).await?;
let _ = login_is_authorized(&app, ip).await?;
if let Some(invite_code) = &app.config.invite_code {
// we don't do per-user referral codes because we shouldn't collect what we don't need.
@ -291,7 +292,7 @@ pub async fn post_user(
Extension(app): Extension<Arc<Web3ProxyApp>>,
Json(payload): Json<PostUser>,
) -> FrontendResult {
let _ip = ip_is_authorized(&app, ip).await?;
let _ = login_is_authorized(&app, ip).await?;
let user = ProtectedAction::PostUser
.verify(app.as_ref(), bearer_token, &payload.primary_address)

@ -790,7 +790,7 @@ impl Web3Connection {
// check rate limits
if let Some(ratelimiter) = self.hard_limit.as_ref() {
// TODO: how should we know if we should set expire or not?
match ratelimiter.throttle(true).await? {
match ratelimiter.throttle().await? {
RedisRateLimitResult::Allowed(_) => {
trace!("rate limit succeeded")
}