work in progress

Bryan Stitt 2022-09-15 17:57:24 +00:00
parent 369187452f
commit 12b6d01434
18 changed files with 243 additions and 118 deletions

Cargo.lock generated
View File

@@ -1256,6 +1256,17 @@ dependencies = [
  "tokio",
 ]
+[[package]]
+name = "deferred-rate-limiter"
+version = "0.2.0"
+dependencies = [
+ "anyhow",
+ "moka",
+ "redis-rate-limiter",
+ "tokio",
+ "tracing",
+]
 [[package]]
 name = "der"
 version = "0.5.1"
@@ -3752,7 +3763,7 @@ dependencies = [
 ]
 [[package]]
-name = "redis-rate-limit"
+name = "redis-rate-limiter"
 version = "0.2.0"
 dependencies = [
  "anyhow",
@@ -5529,6 +5540,7 @@ dependencies = [
  "axum-macros",
  "counter",
  "dashmap",
+ "deferred-rate-limiter",
  "derive_more",
  "dotenv",
  "entities",
@@ -5548,7 +5560,7 @@ dependencies = [
  "petgraph",
  "proctitle",
  "rand 0.8.5",
- "redis-rate-limit",
+ "redis-rate-limiter",
  "regex",
  "reqwest",
  "rustc-hash",

View File

@ -1,8 +1,9 @@
[workspace] [workspace]
members = [ members = [
"deferred-rate-limiter",
"entities", "entities",
"migration", "migration",
"redis-rate-limit", "redis-rate-limiter",
"web3_proxy", "web3_proxy",
] ]

View File

@@ -206,6 +206,8 @@ These are not yet ordered.
 - [ ] on ETH, we no longer use total difficulty, but other chains might
   - if total difficulty is not on the block and we aren't on ETH, fetch the full block instead of just the header
   - if total difficulty is set and non-zero, use it for consensus instead of just the number
+- [ ] if we subscribe to a server that is syncing, it gives us null block_data_limit. when it catches up, we don't ever send queries to it. we need to recheck block_data_limit
 
 new endpoints for users (not totally sure about the exact paths, but these features are all needed):
 - [x] GET /u/:api_key

View File

@@ -0,0 +1,13 @@
[package]
name = "deferred-rate-limiter"
version = "0.2.0"
authors = ["Bryan Stitt <bryan@stitthappens.com>"]
edition = "2021"
[dependencies]
redis-rate-limiter = { path = "../redis-rate-limiter" }
anyhow = "1.0.65"
moka = { version = "0.9.4", default-features = false, features = ["future"] }
tokio = "1.21.1"
tracing = "0.1.36"

View File

@@ -0,0 +1,119 @@
//#![warn(missing_docs)]
use moka::future::Cache;
use redis_rate_limiter::{RedisRateLimitResult, RedisRateLimiter};
use std::cell::Cell;
use std::cmp::Eq;
use std::fmt::Display;
use std::hash::Hash;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{atomic::AtomicU64, Arc};
use tokio::time::Instant;

/// A local cache that sits in front of a RedisRateLimiter
/// Generic across the key so it is simple to use with IPs or user keys
pub struct DeferredRateLimiter<K>
where
    K: Send + Sync,
{
    local_cache: Cache<K, Arc<AtomicU64>>,
    prefix: String,
    rrl: RedisRateLimiter,
}

pub enum DeferredRateLimitResult {
    Allowed,
    RetryAt(Instant),
    RetryNever,
}

impl<K> DeferredRateLimiter<K>
where
    K: Copy + Display + Hash + Eq + Send + Sync + 'static,
{
    pub fn new(cache_size: u64, prefix: &str, rrl: RedisRateLimiter) -> Self {
        Self {
            local_cache: Cache::new(cache_size),
            prefix: prefix.to_string(),
            rrl,
        }
    }

    /// if setting max_per_period, be sure to keep the period the same for all requests to this label
    pub async fn throttle(
        &self,
        key: &K,
        max_per_period: Option<u64>,
        count: u64,
    ) -> anyhow::Result<DeferredRateLimitResult> {
        let max_per_period = max_per_period.unwrap_or(self.rrl.max_requests_per_period);

        if max_per_period == 0 {
            return Ok(DeferredRateLimitResult::RetryNever);
        }

        let arc_new_entry = Arc::new(AtomicBool::new(false));
        let arc_retry_at = Arc::new(Cell::new(None));

        // TODO: DO NOT UNWRAP HERE. figure out how to handle anyhow error being wrapped in an Arc
        // TODO: i'm sure this could be a lot better. but race conditions make this hard to think through. brain needs sleep
        let key_count = {
            let arc_new_entry = arc_new_entry.clone();
            let arc_retry_at = arc_retry_at.clone();

            self.local_cache
                .try_get_with(*key, async move {
                    // this init future only runs the first time we see this key. check redis now and cache the count
                    arc_new_entry.store(true, Ordering::Release);

                    let label = format!("{}:{}", self.prefix, key);

                    let redis_count = match self
                        .rrl
                        .throttle_label(&label, Some(max_per_period), count)
                        .await?
                    {
                        RedisRateLimitResult::Allowed(count) => count,
                        RedisRateLimitResult::RetryAt(retry_at, count) => {
                            arc_retry_at.set(Some(retry_at));
                            count
                        }
                        RedisRateLimitResult::RetryNever => unimplemented!(),
                    };

                    Ok::<_, anyhow::Error>(Arc::new(AtomicU64::new(redis_count)))
                })
                .await
                .unwrap()
        };

        if arc_new_entry.load(Ordering::Acquire) {
            // new entry
            if let Some(retry_at) = arc_retry_at.get() {
                Ok(DeferredRateLimitResult::RetryAt(retry_at))
            } else {
                Ok(DeferredRateLimitResult::Allowed)
            }
        } else {
            // we have a cached amount here
            // increment our local count if
            let f = async move {
                let label = format!("{}:{}", self.prefix, key);

                let redis_count = match self
                    .rrl
                    .throttle_label(&label, Some(max_per_period), count)
                    .await?
                {
                    RedisRateLimitResult::Allowed(count) => todo!("do something with allow"),
                    RedisRateLimitResult::RetryAt(retry_at, count) => {
                        todo!("do something with retry at")
                    }
                    RedisRateLimitResult::RetryNever => unimplemented!(),
                };

                Ok::<_, anyhow::Error>(())
            };

            todo!("write more");
        }
    }
}
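
The new crate is wired into web3_proxy later in this commit; as a quick orientation, here is a minimal standalone sketch of how the new types fit together. The redis URL, limits, and cache size here are hypothetical, and the non-first-hit path still ends in `todo!()` in this commit, so only the first call per key is fully implemented.

```rust
use deferred_rate_limiter::{DeferredRateLimitResult, DeferredRateLimiter};
use redis_rate_limiter::{DeadpoolRuntime, RedisConfig, RedisRateLimiter};
use std::net::IpAddr;

async fn demo() -> anyhow::Result<()> {
    // hypothetical redis url; web3_proxy builds this pool from its own config instead
    let redis_pool = RedisConfig::from_url("redis://127.0.0.1:6379")
        .builder()?
        .runtime(DeadpoolRuntime::Tokio1)
        .build()?;

    // shared fixed-window limiter: 100 requests per 60 second period
    let rrl = RedisRateLimiter::new("web3_proxy", "frontend", 100, 60.0, redis_pool);

    // local moka cache keyed by IP, deferring to redis for the shared count
    let ip_limiter = DeferredRateLimiter::<IpAddr>::new(10_000, "ip", rrl);

    let ip: IpAddr = "10.1.2.3".parse()?;
    match ip_limiter.throttle(&ip, None, 1).await? {
        DeferredRateLimitResult::Allowed => { /* serve the request */ }
        DeferredRateLimitResult::RetryAt(_retry_at) => { /* tell the client when to retry */ }
        DeferredRateLimitResult::RetryNever => { /* limit is 0, reject outright */ }
    }

    Ok(())
}
```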

View File

@@ -1,17 +0,0 @@
// pub use bb8_redis::bb8::ErrorSink as Bb8ErrorSync;
// pub use bb8_redis::redis::RedisError;
// use tracing::error;
// #[derive(Debug, Clone)]
// pub struct RedisErrorSink;
// impl Bb8ErrorSync<RedisError> for RedisErrorSink {
// fn sink(&self, err: RedisError) {
// error!(?err, "redis error");
// }
// fn boxed_clone(&self) -> Box<dyn Bb8ErrorSync<RedisError>> {
// Box::new(self.clone())
// }
// }

View File

@ -1,5 +1,5 @@
[package] [package]
name = "redis-rate-limit" name = "redis-rate-limiter"
version = "0.2.0" version = "0.2.0"
authors = ["Bryan Stitt <bryan@stitthappens.com>"] authors = ["Bryan Stitt <bryan@stitthappens.com>"]
edition = "2021" edition = "2021"

View File

@@ -1,41 +1,39 @@
 //#![warn(missing_docs)]
-mod errors;
 use anyhow::Context;
-use deadpool_redis::redis::pipe;
 use std::ops::Add;
 use std::time::{SystemTime, UNIX_EPOCH};
 use tokio::time::{Duration, Instant};
 use tracing::{debug, trace};
 pub use deadpool_redis::redis;
-pub use deadpool_redis::{Config, Connection, Manager, Pool, Runtime};
-// pub use crate::errors::{RedisError, RedisErrorSink};
-// pub use bb8_redis::{bb8, redis, RedisConnectionManager};
+pub use deadpool_redis::{
+    Config as RedisConfig, Connection as RedisConnection, Manager as RedisManager,
+    Pool as RedisPool, Runtime as DeadpoolRuntime,
+};
-pub struct RedisRateLimit {
-    pool: Pool,
+#[derive(Clone)]
+pub struct RedisRateLimiter {
     key_prefix: String,
     /// The default maximum requests allowed in a period.
-    max_requests_per_period: u64,
+    pub max_requests_per_period: u64,
     /// seconds
-    period: f32,
+    pub period: f32,
+    pool: RedisPool,
 }
-pub enum ThrottleResult {
-    Allowed,
-    RetryAt(Instant),
+pub enum RedisRateLimitResult {
+    Allowed(u64),
+    RetryAt(Instant, u64),
     RetryNever,
 }
-impl RedisRateLimit {
+impl RedisRateLimiter {
     pub fn new(
-        pool: Pool,
         app: &str,
         label: &str,
         max_requests_per_period: u64,
         period: f32,
+        pool: RedisPool,
     ) -> Self {
         let key_prefix = format!("{}:rrl:{}", app, label);
@@ -49,17 +47,16 @@ impl RedisRateLimit {
     /// label might be an ip address or a user_key id.
     /// if setting max_per_period, be sure to keep the period the same for all requests to this label
-    /// TODO:
     pub async fn throttle_label(
         &self,
         label: &str,
         max_per_period: Option<u64>,
         count: u64,
-    ) -> anyhow::Result<ThrottleResult> {
+    ) -> anyhow::Result<RedisRateLimitResult> {
         let max_per_period = max_per_period.unwrap_or(self.max_requests_per_period);
         if max_per_period == 0 {
-            return Ok(ThrottleResult::RetryNever);
+            return Ok(RedisRateLimitResult::RetryNever);
         }
         let now = SystemTime::now()
@@ -70,13 +67,14 @@ impl RedisRateLimit {
         // if self.period is 60, period_id will be the minute of the current time
         let period_id = (now / self.period) % self.period;
+        // TODO: include max per period in the throttle key?
         let throttle_key = format!("{}:{}:{}", self.key_prefix, label, period_id);
         let mut conn = self.pool.get().await.context("throttle")?;
         // TODO: at high concurency, i think this is giving errors
         // TODO: i'm starting to think that bb8 has a bug
-        let x: Vec<u64> = pipe()
+        let x: Vec<u64> = redis::pipe()
             // we could get the key first, but that means an extra redis call for every check. this seems better
             .incr(&throttle_key, count)
             // set expiration each time we set the key. ignore the result
@@ -89,28 +87,25 @@ impl RedisRateLimit {
             .await
             .context("increment rate limit")?;
-        let new_count = x.first().context("check rate limit result")?;
+        // TODO: is there a better way to do this?
+        let new_count = *x.first().context("check rate limit result")?;
-        if new_count > &max_per_period {
+        if new_count > max_per_period {
             let seconds_left_in_period = self.period - (now % self.period);
             let retry_at = Instant::now().add(Duration::from_secs_f32(seconds_left_in_period));
             debug!(%label, ?retry_at, "rate limited: {}/{}", new_count, max_per_period);
-            Ok(ThrottleResult::RetryAt(retry_at))
+            Ok(RedisRateLimitResult::RetryAt(retry_at, new_count))
         } else {
             trace!(%label, "NOT rate limited: {}/{}", new_count, max_per_period);
-            Ok(ThrottleResult::Allowed)
+            Ok(RedisRateLimitResult::Allowed(new_count))
         }
     }
     #[inline]
-    pub async fn throttle(&self) -> anyhow::Result<ThrottleResult> {
+    pub async fn throttle(&self) -> anyhow::Result<RedisRateLimitResult> {
         self.throttle_label("", None, 1).await
     }
-    pub fn max_requests_per_period(&self) -> u64 {
-        self.max_requests_per_period
-    }
 }
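
For context, the limiter is a fixed-window counter: each (label, period bucket) pair maps to one redis key that is INCRed and given a short expiry, so every proxy sharing the redis shares the count. A small sketch of the keying math above, with hypothetical numbers (this helper is not part of the crate, just an illustration):

```rust
// illustration only: mirrors the period_id / throttle_key construction in throttle_label above
fn throttle_key(key_prefix: &str, label: &str, now_secs: f32, period: f32) -> String {
    // with period = 60.0 this bucket changes once per minute
    let period_id = (now_secs / period) % period;
    format!("{}:{}:{}", key_prefix, label, period_id)
}

fn main() {
    // key_prefix is "{app}:rrl:{label}" from RedisRateLimiter::new, e.g. "web3_proxy:rrl:frontend";
    // the label here is what DeferredRateLimiter builds: "{prefix}:{key}"
    let key = throttle_key("web3_proxy:rrl:frontend", "ip:10.1.2.3", 120.0, 60.0);
    assert_eq!(key, "web3_proxy:rrl:frontend:ip:10.1.2.3:2");
    // every INCR within the same period hits this one key
}
```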

View File

@@ -11,10 +11,13 @@ default = ["deadlock_detection", "verbose_db"]
 deadlock_detection = ["parking_lot/deadlock_detection"]
 verbose_db = ["sea-orm/debug-print"]
+# TODO: turn tokio-console on with a feature. console-subscriber = { version = "0.1.7" }
 [dependencies]
+deferred-rate-limiter = { path = "../deferred-rate-limiter" }
 entities = { path = "../entities" }
 migration = { path = "../migration" }
-redis-rate-limit = { path = "../redis-rate-limit" }
+redis-rate-limiter = { path = "../redis-rate-limiter" }
 anyhow = { version = "1.0.65", features = ["backtrace"] }
 arc-swap = "1.5.1"
@@ -23,7 +26,6 @@ axum = { version = "0.5.16", features = ["headers", "serde_json", "tokio-tungste
 axum-auth = "0.3.0"
 axum-client-ip = "0.2.0"
 axum-macros = "0.2.3"
-# TODO: turn tokio-console on with a feature. console-subscriber = { version = "0.1.7" }
 counter = "0.5.6"
 dashmap = "5.4.0"
 derive_more = "0.99.17"

View File

@@ -12,6 +12,7 @@ use crate::rpcs::request::OpenRequestHandleMetrics;
 use crate::rpcs::transactions::TxStatus;
 use anyhow::Context;
 use axum::extract::ws::Message;
+use deferred_rate_limiter::DeferredRateLimiter;
 use derive_more::From;
 use ethers::core::utils::keccak256;
 use ethers::prelude::{Address, Block, Bytes, TxHash, H256, U64};
@@ -24,11 +25,12 @@ use hashbrown::HashMap;
 use metered::{metered, ErrorCount, HitCount, ResponseTime, Throughput};
 use migration::{Migrator, MigratorTrait};
 use moka::future::Cache;
-use redis_rate_limit::{Config as RedisConfig, Pool as RedisPool, RedisRateLimit, Runtime};
+use redis_rate_limiter::{DeadpoolRuntime, RedisConfig, RedisPool, RedisRateLimiter};
 use sea_orm::DatabaseConnection;
 use serde::Serialize;
 use serde_json::json;
 use std::fmt;
+use std::net::IpAddr;
 use std::pin::Pin;
 use std::str::FromStr;
 use std::sync::atomic::{self, AtomicUsize};
@@ -83,7 +85,8 @@ pub struct Web3ProxyApp {
     open_request_handle_metrics: Arc<OpenRequestHandleMetrics>,
     /// store pending transactions that we've seen so that we don't send duplicates to subscribers
     pub pending_transactions: Cache<TxHash, TxStatus>,
-    pub frontend_rate_limiter: Option<RedisRateLimit>,
+    pub frontend_ip_rate_limiter: Option<DeferredRateLimiter<IpAddr>>,
+    pub frontend_key_rate_limiter: Option<DeferredRateLimiter<Uuid>>,
     pub redis_pool: Option<RedisPool>,
     pub user_cache: Cache<Uuid, UserCacheValue>,
 }
@@ -224,7 +227,7 @@ impl Web3ProxyApp {
                 .create_timeout(Some(Duration::from_secs(5)))
                 .max_size(redis_max_connections)
                 .recycle_timeout(Some(Duration::from_secs(5)))
-                .runtime(Runtime::Tokio1)
+                .runtime(DeadpoolRuntime::Tokio1)
                 .build()?;
             // test the pool
@@ -306,18 +309,29 @@ impl Web3ProxyApp {
             Some(private_rpcs)
         };
-        let frontend_rate_limiter = redis_pool.as_ref().map(|redis_pool| {
-            RedisRateLimit::new(
-                redis_pool.clone(),
+        let mut frontend_ip_rate_limiter = None;
+        let mut frontend_key_rate_limiter = None;
+        if let Some(redis_pool) = redis_pool.as_ref() {
+            let rrl = RedisRateLimiter::new(
                 "web3_proxy",
                 "frontend",
                 top_config.app.public_rate_limit_per_minute,
                 60.0,
-            )
-        });
+                redis_pool.clone(),
+            );
+            // TODO: take cache_size from config
+            frontend_ip_rate_limiter = Some(DeferredRateLimiter::<IpAddr>::new(
+                10_000,
+                "ip",
+                rrl.clone(),
+            ));
+            frontend_key_rate_limiter = Some(DeferredRateLimiter::<Uuid>::new(10_000, "key", rrl));
+        }
-        // TODO: change this to a sized cache
+        // TODO: change this to a sized cache. theres some potentially giant responses that will break things
         let response_cache = Cache::new(10_000);
         let user_cache = Cache::new(10_000);
         let app = Self {
@@ -328,7 +342,8 @@ impl Web3ProxyApp {
             head_block_receiver,
             pending_tx_sender,
             pending_transactions,
-            frontend_rate_limiter,
+            frontend_ip_rate_limiter,
+            frontend_key_rate_limiter,
             db_conn,
             redis_pool,
             app_metrics,
@@ -610,7 +625,7 @@ impl Web3ProxyApp {
     }
     #[instrument(skip_all)]
-    pub async fn redis_conn(&self) -> anyhow::Result<redis_rate_limit::Connection> {
+    pub async fn redis_conn(&self) -> anyhow::Result<redis_rate_limiter::RedisConnection> {
         match self.redis_pool.as_ref() {
             None => Err(anyhow::anyhow!("no redis server configured")),
             Some(redis_pool) => {

View File

@@ -116,7 +116,7 @@ impl Web3ConnectionConfig {
     pub async fn spawn(
         self,
         name: String,
-        redis_pool: Option<redis_rate_limit::Pool>,
+        redis_pool: Option<redis_rate_limiter::RedisPool>,
         chain_id: u64,
         http_client: Option<reqwest::Client>,
         http_interval_sender: Option<Arc<broadcast::Sender<()>>>,

View File

@@ -5,7 +5,7 @@ use axum::{
     Json,
 };
 use derive_more::From;
-use redis_rate_limit::redis::RedisError;
+use redis_rate_limiter::redis::RedisError;
 use sea_orm::DbErr;
 use std::{error::Error, net::IpAddr};
 use tokio::time::Instant;
@@ -18,9 +18,7 @@ pub type FrontendResult = Result<Response, FrontendErrorResponse>;
 pub enum FrontendErrorResponse {
     Anyhow(anyhow::Error),
     Box(Box<dyn Error>),
-    // TODO: update these for Redis
     Redis(RedisError),
-    // RedisRun(RunError<RedisError>),
     Response(Response),
     Database(DbErr),
     RateLimitedUser(u64, Option<Instant>),

View File

@@ -1,14 +1,14 @@
 use super::errors::FrontendErrorResponse;
 use crate::app::{UserCacheValue, Web3ProxyApp};
 use anyhow::Context;
+use deferred_rate_limiter::DeferredRateLimitResult;
 use entities::user_keys;
-use redis_rate_limit::ThrottleResult;
 use sea_orm::{
     ColumnTrait, DeriveColumn, EntityTrait, EnumIter, IdenStatic, QueryFilter, QuerySelect,
 };
 use std::{net::IpAddr, time::Duration};
 use tokio::time::Instant;
-use tracing::{debug, error, trace};
+use tracing::{error, trace};
 use uuid::Uuid;
 #[derive(Debug)]
@@ -55,28 +55,19 @@ impl Web3ProxyApp {
         // TODO: dry this up with rate_limit_by_key
         // TODO: have a local cache because if we hit redis too hard we get errors
         // TODO: query redis in the background so that users don't have to wait on this network request
-        if let Some(rate_limiter) = &self.frontend_rate_limiter {
-            let rate_limiter_label = format!("ip-{}", ip);
-            match rate_limiter
-                .throttle_label(&rate_limiter_label, None, 1)
-                .await
-            {
-                Ok(ThrottleResult::Allowed) => Ok(RateLimitResult::AllowedIp(ip)),
-                Ok(ThrottleResult::RetryAt(retry_at)) => {
+        if let Some(rate_limiter) = &self.frontend_ip_rate_limiter {
+            match rate_limiter.throttle(&ip, None, 1).await {
+                Ok(DeferredRateLimitResult::Allowed) => Ok(RateLimitResult::AllowedIp(ip)),
+                Ok(DeferredRateLimitResult::RetryAt(retry_at)) => {
                     // TODO: set headers so they know when they can retry
                     // TODO: debug or trace?
                     // this is too verbose, but a stat might be good
-                    trace!(
-                        ?rate_limiter_label,
-                        "rate limit exceeded until {:?}",
-                        retry_at
-                    );
+                    trace!(?ip, "rate limit exceeded until {:?}", retry_at);
                     Ok(RateLimitResult::RateLimitedIp(ip, Some(retry_at)))
                 }
-                Ok(ThrottleResult::RetryNever) => {
+                Ok(DeferredRateLimitResult::RetryNever) => {
                     // TODO: i don't think we'll get here. maybe if we ban an IP forever? seems unlikely
-                    debug!(?rate_limiter_label, "rate limit exceeded");
+                    trace!(?ip, "rate limit is 0");
                     Ok(RateLimitResult::RateLimitedIp(ip, None))
                 }
                 Err(err) => {
@@ -175,38 +166,32 @@ impl Web3ProxyApp {
         // user key is valid. now check rate limits
         // TODO: this is throwing errors when curve-api hits us with high concurrency. investigate i think its bb8's fault
-        if let Some(rate_limiter) = &self.frontend_rate_limiter {
+        if let Some(rate_limiter) = &self.frontend_key_rate_limiter {
             // TODO: query redis in the background so that users don't have to wait on this network request
             // TODO: better key? have a prefix so its easy to delete all of these
             // TODO: we should probably hash this or something
-            let rate_limiter_label = format!("user-{}", user_key);
             match rate_limiter
-                .throttle_label(
-                    &rate_limiter_label,
-                    Some(user_data.user_count_per_period),
-                    1,
-                )
+                .throttle(&user_key, Some(user_data.user_count_per_period), 1)
                 .await
             {
-                Ok(ThrottleResult::Allowed) => Ok(RateLimitResult::AllowedUser(user_data.user_id)),
-                Ok(ThrottleResult::RetryAt(retry_at)) => {
+                Ok(DeferredRateLimitResult::Allowed) => {
+                    Ok(RateLimitResult::AllowedUser(user_data.user_id))
+                }
+                Ok(DeferredRateLimitResult::RetryAt(retry_at)) => {
                     // TODO: set headers so they know when they can retry
                     // TODO: debug or trace?
                     // this is too verbose, but a stat might be good
-                    trace!(
-                        ?rate_limiter_label,
-                        "rate limit exceeded until {:?}",
-                        retry_at
-                    );
+                    // TODO: keys are secrets! use the id instead
+                    trace!(?user_key, "rate limit exceeded until {:?}", retry_at);
                     Ok(RateLimitResult::RateLimitedUser(
                         user_data.user_id,
                         Some(retry_at),
                     ))
                 }
-                Ok(ThrottleResult::RetryNever) => {
+                Ok(DeferredRateLimitResult::RetryNever) => {
                     // TODO: i don't think we'll get here. maybe if we ban an IP forever? seems unlikely
-                    debug!(?rate_limiter_label, "rate limit exceeded");
+                    // TODO: keys are secret. don't log them!
+                    trace!(?user_key, "rate limit is 0");
                     Ok(RateLimitResult::RateLimitedUser(user_data.user_id, None))
                 }
                 Err(err) => {

View File

@@ -19,13 +19,11 @@ use axum::{
 use axum_auth::AuthBearer;
 use axum_client_ip::ClientIp;
 use axum_macros::debug_handler;
-use http::StatusCode;
-use uuid::Uuid;
-// use entities::sea_orm_active_enums::Role;
 use entities::{user, user_keys};
 use ethers::{prelude::Address, types::Bytes};
 use hashbrown::HashMap;
-use redis_rate_limit::redis::AsyncCommands;
+use http::StatusCode;
+use redis_rate_limiter::redis::AsyncCommands;
 use sea_orm::{ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, TransactionTrait};
 use serde::{Deserialize, Serialize};
 use siwe::Message;
@@ -33,6 +31,7 @@ use std::ops::Add;
 use std::sync::Arc;
 use time::{Duration, OffsetDateTime};
 use ulid::Ulid;
+use uuid::Uuid;
 // TODO: how do we customize axum's error response? I think we probably want an enum that implements IntoResponse instead
 #[debug_handler]

View File

@@ -4,6 +4,5 @@ pub mod config;
 pub mod frontend;
 pub mod jsonrpc;
 pub mod metrics;
-pub mod rate_limiter;
 pub mod rpcs;
 pub mod users;

View File

@@ -10,7 +10,7 @@ use futures::future::try_join_all;
 use futures::StreamExt;
 use parking_lot::RwLock;
 use rand::Rng;
-use redis_rate_limit::{Pool as RedisPool, RedisRateLimit, ThrottleResult};
+use redis_rate_limiter::{RedisPool, RedisRateLimitResult, RedisRateLimiter};
 use serde::ser::{SerializeStruct, Serializer};
 use serde::Serialize;
 use std::cmp::min;
@@ -23,11 +23,12 @@ use tokio::sync::RwLock as AsyncRwLock;
 use tokio::time::{interval, sleep, sleep_until, Duration, MissedTickBehavior};
 use tracing::{debug, error, info, instrument, trace, warn};
-/// An active connection to a Web3Rpc
+/// An active connection to a Web3 RPC server like geth or erigon.
 pub struct Web3Connection {
     pub name: String,
     /// TODO: can we get this from the provider? do we even need it?
     url: String,
+    /// Some connections use an http_client. we keep a clone for reconnecting
     http_client: Option<reqwest::Client>,
     /// keep track of currently open requests. We sort on this
     pub(super) active_requests: AtomicU32,
@@ -39,7 +40,8 @@ pub struct Web3Connection {
     /// it is an async lock because we hold it open across awaits
     pub(super) provider: AsyncRwLock<Option<Arc<Web3Provider>>>,
     /// rate limits are stored in a central redis so that multiple proxies can share their rate limits
-    hard_limit: Option<RedisRateLimit>,
+    /// We do not use the deferred rate limiter because going over limits would cause errors
+    hard_limit: Option<RedisRateLimiter>,
     /// used for load balancing to the least loaded server
     pub(super) soft_limit: u32,
     /// TODO: have an enum for this so that "no limit" prints pretty?
@@ -74,14 +76,14 @@ impl Web3Connection {
         weight: u32,
         open_request_handle_metrics: Arc<OpenRequestHandleMetrics>,
     ) -> anyhow::Result<(Arc<Web3Connection>, AnyhowJoinHandle<()>)> {
-        let hard_limit = hard_limit.map(|(hard_rate_limit, redis_conection)| {
-            // TODO: allow configurable period and max_burst
-            RedisRateLimit::new(
-                redis_conection,
+        let hard_limit = hard_limit.map(|(hard_rate_limit, redis_pool)| {
+            // TODO: is cache size 1 okay? i think we need
+            RedisRateLimiter::new(
                 "web3_proxy",
                 &format!("{}:{}", chain_id, url_str),
                 hard_rate_limit,
                 60.0,
+                redis_pool,
             )
         });
@@ -755,10 +757,10 @@ impl Web3Connection {
         // check rate limits
         if let Some(ratelimiter) = self.hard_limit.as_ref() {
             match ratelimiter.throttle().await? {
-                ThrottleResult::Allowed => {
+                RedisRateLimitResult::Allowed(_) => {
                     trace!("rate limit succeeded")
                 }
-                ThrottleResult::RetryAt(retry_at) => {
+                RedisRateLimitResult::RetryAt(retry_at, _) => {
                     // rate limit failed
                     // save the smallest retry_after. if nothing succeeds, return an Err with retry_after in it
                     // TODO: use tracing better
@@ -767,7 +769,7 @@ impl Web3Connection {
                     return Ok(OpenRequestResult::RetryAt(retry_at));
                 }
-                ThrottleResult::RetryNever => {
+                RedisRateLimitResult::RetryNever => {
                     return Ok(OpenRequestResult::RetryNever);
                 }
             }

View File

@@ -56,7 +56,7 @@ impl Web3Connections {
         chain_id: u64,
         server_configs: HashMap<String, Web3ConnectionConfig>,
         http_client: Option<reqwest::Client>,
-        redis_pool: Option<redis_rate_limit::Pool>,
+        redis_pool: Option<redis_rate_limiter::RedisPool>,
         block_map: BlockHashesMap,
         head_block_sender: Option<watch::Sender<ArcBlock>>,
         min_sum_soft_limit: u32,