drop redis-cell in favor of simpler query

parent 9fcf84c0e0
commit e9000d1f61

Cargo.lock | 41 (generated)
@@ -1915,9 +1915,9 @@ checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c"
 
 [[package]]
 name = "futures"
-version = "0.3.21"
+version = "0.3.23"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f73fe65f54d1e12b726f517d3e2135ca3125a437b6d998caf1962961f7172d9e"
+checksum = "ab30e97ab6aacfe635fad58f22c2bb06c8b685f7421eb1e064a729e2a5f481fa"
 dependencies = [
  "futures-channel",
  "futures-core",
@@ -1930,9 +1930,9 @@ dependencies = [
 
 [[package]]
 name = "futures-channel"
-version = "0.3.21"
+version = "0.3.23"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c3083ce4b914124575708913bca19bfe887522d6e2e6d0952943f5eac4a74010"
+checksum = "2bfc52cbddcfd745bf1740338492bb0bd83d76c67b445f91c5fb29fae29ecaa1"
 dependencies = [
  "futures-core",
  "futures-sink",
@@ -1940,15 +1940,15 @@ dependencies = [
 
 [[package]]
 name = "futures-core"
-version = "0.3.21"
+version = "0.3.23"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3"
+checksum = "d2acedae88d38235936c3922476b10fced7b2b68136f5e3c03c2d5be348a1115"
 
 [[package]]
 name = "futures-executor"
-version = "0.3.21"
+version = "0.3.23"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9420b90cfa29e327d0429f19be13e7ddb68fa1cccb09d65e5706b8c7a749b8a6"
+checksum = "1d11aa21b5b587a64682c0094c2bdd4df0076c5324961a40cc3abd7f37930528"
 dependencies = [
  "futures-core",
  "futures-task",
@@ -1969,9 +1969,9 @@ dependencies = [
 
 [[package]]
 name = "futures-io"
-version = "0.3.21"
+version = "0.3.23"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b"
+checksum = "93a66fc6d035a26a3ae255a6d2bca35eda63ae4c5512bef54449113f7a1228e5"
 
 [[package]]
 name = "futures-lite"
@@ -2001,9 +2001,9 @@ dependencies = [
 
 [[package]]
 name = "futures-macro"
-version = "0.3.21"
+version = "0.3.23"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "33c1e13800337f4d4d7a316bf45a567dbcb6ffe087f16424852d97e97a91f512"
+checksum = "0db9cce532b0eae2ccf2766ab246f114b56b9cf6d445e00c2549fbc100ca045d"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -2012,15 +2012,15 @@ dependencies = [
 
 [[package]]
 name = "futures-sink"
-version = "0.3.21"
+version = "0.3.23"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "21163e139fa306126e6eedaf49ecdb4588f939600f0b1e770f4205ee4b7fa868"
+checksum = "ca0bae1fe9752cf7fd9b0064c674ae63f97b37bc714d745cbde0afb7ec4e6765"
 
 [[package]]
 name = "futures-task"
-version = "0.3.21"
+version = "0.3.23"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "57c66a976bf5909d801bbef33416c41372779507e7a6b3a5e25e4749c58f776a"
+checksum = "842fc63b931f4056a24d59de13fb1272134ce261816e063e634ad0c15cdc5306"
 
 [[package]]
 name = "futures-timer"
@@ -2030,9 +2030,9 @@ checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c"
 
 [[package]]
 name = "futures-util"
-version = "0.3.21"
+version = "0.3.23"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a"
+checksum = "f0828a5471e340229c11c77ca80017937ce3c58cb788a17e5f1c2d5c485a9577"
 dependencies = [
  "futures-channel",
  "futures-core",
@@ -3646,11 +3646,12 @@ dependencies = [
 ]
 
 [[package]]
-name = "redis-cell-client"
+name = "redis-rate-limit"
 version = "0.2.0"
 dependencies = [
  "anyhow",
  "bb8-redis",
+ "tracing",
 ]
 
 [[package]]
@@ -5336,7 +5337,7 @@ dependencies = [
  "proctitle",
  "prometheus-client",
  "rand",
- "redis-cell-client",
+ "redis-rate-limit",
  "regex",
  "reqwest",
  "rustc-hash",
@@ -1,10 +1,10 @@
 [workspace]
 members = [
     "entities",
-    "migration",
     "fifomap",
     "linkedhashmap",
-    "redis-cell-client",
+    "migration",
+    "redis-rate-limit",
     "web3_proxy",
 ]
 
@@ -2,7 +2,7 @@
 name = "linkedhashmap"
 version = "0.2.0"
 authors = ["quininer <quininer@live.com>"]
-edition = "2018"
+edition = "2021"
 
 [features]
 inline-more = [ "hashbrown" ]
@@ -1,130 +0,0 @@
-//#![warn(missing_docs)]
-
-use bb8_redis::redis::cmd;
-
-pub use bb8_redis::redis::RedisError;
-pub use bb8_redis::{bb8, RedisConnectionManager};
-
-use std::ops::Add;
-use std::time::{Duration, Instant};
-
-pub type RedisPool = bb8::Pool<RedisConnectionManager>;
-
-pub struct RedisCell {
-    pool: RedisPool,
-    key: String,
-    default_max_burst: u32,
-    default_count_per_period: u32,
-    default_period: u32,
-}
-
-pub enum ThrottleResult {
-    Allowed,
-    RetryAt(Instant),
-}
-
-impl RedisCell {
-    // todo: seems like this could be derived
-    // TODO: take something generic for conn
-    // TODO: use r2d2 for connection pooling?
-    pub fn new(
-        pool: RedisPool,
-        app: &str,
-        key: &str,
-        default_max_burst: u32,
-        default_count_per_period: u32,
-        default_period: u32,
-    ) -> Self {
-        let key = format!("{}:redis-cell:{}", app, key);
-
-        Self {
-            pool,
-            key,
-            default_max_burst,
-            default_count_per_period,
-            default_period,
-        }
-    }
-
-    #[inline]
-    async fn _throttle(
-        &self,
-        key: &str,
-        max_burst: Option<u32>,
-        count_per_period: Option<u32>,
-        period: Option<u32>,
-        quantity: u32,
-    ) -> anyhow::Result<ThrottleResult> {
-        let mut conn = self.pool.get().await?;
-
-        let count_per_period = count_per_period.unwrap_or(self.default_count_per_period);
-
-        if count_per_period == 0 {
-            return Ok(ThrottleResult::Allowed);
-        }
-
-        let max_burst = max_burst.unwrap_or(self.default_max_burst);
-        let period = period.unwrap_or(self.default_period);
-
-        /*
-        https://github.com/brandur/redis-cell#response
-
-        CL.THROTTLE <key> <max_burst> <count per period> <period> [<quantity>]
-
-        0. Whether the action was limited:
-            0 indicates the action is allowed.
-            1 indicates that the action was limited/blocked.
-        1. The total limit of the key (max_burst + 1). This is equivalent to the common X-RateLimit-Limit HTTP header.
-        2. The remaining limit of the key. Equivalent to X-RateLimit-Remaining.
-        3. The number of seconds until the user should retry, and always -1 if the action was allowed. Equivalent to Retry-After.
-        4. The number of seconds until the limit will reset to its maximum capacity. Equivalent to X-RateLimit-Reset.
-        */
-        // TODO: don't unwrap. maybe return anyhow::Result<Result<(), Duration>>
-        // TODO: should we return more error info?
-        let x: Vec<isize> = cmd("CL.THROTTLE")
-            .arg(&(key, max_burst, count_per_period, period, quantity))
-            .query_async(&mut *conn)
-            .await?;
-
-        // TODO: trace log the result?
-
-        if x.len() != 5 {
-            return Err(anyhow::anyhow!("unexpected redis result"));
-        }
-
-        let retry_after = *x.get(3).expect("index exists above");
-
-        if retry_after == -1 {
-            Ok(ThrottleResult::Allowed)
-        } else {
-            // don't return a duration, return an instant
-            let retry_at = Instant::now().add(Duration::from_secs(retry_after as u64));
-
-            Ok(ThrottleResult::RetryAt(retry_at))
-        }
-    }
-
-    #[inline]
-    pub async fn throttle(&self) -> anyhow::Result<ThrottleResult> {
-        self._throttle(&self.key, None, None, None, 1).await
-    }
-
-    #[inline]
-    pub async fn throttle_key(
-        &self,
-        key: &str,
-        max_burst: Option<u32>,
-        count_per_period: Option<u32>,
-        period: Option<u32>,
-    ) -> anyhow::Result<ThrottleResult> {
-        let key = format!("{}:{}", self.key, key);
-
-        self._throttle(key.as_ref(), max_burst, count_per_period, period, 1)
-            .await
-    }
-
-    #[inline]
-    pub async fn throttle_quantity(&self, quantity: u32) -> anyhow::Result<ThrottleResult> {
-        self._throttle(&self.key, None, None, None, quantity).await
-    }
-}
@@ -1,18 +0,0 @@
-# A redis server with the libredis_cell module installed
-FROM rust:1-bullseye as builder
-
-WORKDIR /usr/src/redis-cell
-RUN \
-    --mount=type=cache,target=/usr/local/cargo/registry \
-    --mount=type=cache,target=/usr/src/web3_proxy/target \
-    { \
-        set -eux; \
-        git clone -b v0.3.0 https://github.com/brandur/redis-cell .; \
-        cargo build --release; \
-    }
-
-FROM redis:bullseye
-
-COPY --from=builder /usr/src/redis-cell/target/release/libredis_cell.so /usr/lib/redis/modules/
-
-CMD ["redis-server", "--loadmodule", "/usr/lib/redis/modules/libredis_cell.so"]
@@ -1,9 +1,10 @@
 [package]
-name = "redis-cell-client"
+name = "redis-rate-limit"
 version = "0.2.0"
 authors = ["Bryan Stitt <bryan@stitthappens.com>"]
-edition = "2018"
+edition = "2021"
 
 [dependencies]
 anyhow = "1.0.61"
 bb8-redis = "0.11.0"
+tracing = "0.1.36"
redis-rate-limit/src/errors.rs | 17 (new file)
@@ -0,0 +1,17 @@
+pub use bb8_redis::bb8::ErrorSink as Bb8ErrorSync;
+pub use bb8_redis::redis::RedisError;
+
+use tracing::warn;
+
+#[derive(Debug, Clone)]
+pub struct RedisErrorSink;
+
+impl Bb8ErrorSync<RedisError> for RedisErrorSink {
+    fn sink(&self, err: RedisError) {
+        warn!(?err, "redis error");
+    }
+
+    fn boxed_clone(&self) -> Box<dyn Bb8ErrorSync<RedisError>> {
+        Box::new(self.clone())
+    }
+}
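
Not part of the commit, for context only: a minimal sketch of how this sink is meant to be plugged into the bb8 pool builder (the same .error_sink(RedisErrorSink.boxed_clone()) call appears in the app.rs hunk further down). The redis_url parameter, the helper name, and the pool size are placeholder assumptions.

use redis_rate_limit::{
    bb8::{self, ErrorSink},
    RedisConnectionManager, RedisErrorSink, RedisPool,
};

async fn build_redis_pool(redis_url: &str) -> anyhow::Result<RedisPool> {
    let manager = RedisConnectionManager::new(redis_url)?;

    // route connection errors to the tracing-based sink instead of bb8's default handling
    let pool = bb8::Pool::builder()
        .error_sink(RedisErrorSink.boxed_clone())
        .max_size(16) // placeholder size
        .build(manager)
        .await?;

    Ok(pool)
}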
redis-rate-limit/src/lib.rs | 100 (new file)
@@ -0,0 +1,100 @@
+//#![warn(missing_docs)]
+mod errors;
+
+use anyhow::Context;
+use bb8_redis::redis::pipe;
+use std::ops::Add;
+use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
+
+pub use crate::errors::{RedisError, RedisErrorSink};
+pub use bb8_redis::{bb8, RedisConnectionManager};
+
+pub type RedisPool = bb8::Pool<RedisConnectionManager>;
+
+pub struct RedisRateLimit {
+    pool: RedisPool,
+    key_prefix: String,
+    default_max_per_period: u64,
+    period: u64,
+}
+
+pub enum ThrottleResult {
+    Allowed,
+    RetryAt(Instant),
+    RetryNever,
+}
+
+impl RedisRateLimit {
+    pub fn new(
+        pool: RedisPool,
+        app: &str,
+        label: &str,
+        default_max_per_period: u64,
+        period: u64,
+    ) -> Self {
+        let key_prefix = format!("{}:rrl:{}", app, label);
+
+        Self {
+            pool,
+            key_prefix,
+            default_max_per_period,
+            period,
+        }
+    }
+
+    /// label might be an ip address or a user_key id
+    pub async fn throttle_label(
+        &self,
+        label: &str,
+        max_per_period: Option<u64>,
+        count: u64,
+    ) -> anyhow::Result<ThrottleResult> {
+        let max_per_period = max_per_period.unwrap_or(self.default_max_per_period);
+
+        if max_per_period == 0 {
+            return Ok(ThrottleResult::RetryNever);
+        }
+
+        let now = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .context("cannot tell the time")?
+            .as_secs();
+
+        let period_id = now % self.period;
+
+        let throttle_key = format!("{}:{}:{}", self.key_prefix, label, period_id);
+
+        let mut conn = self.pool.get().await?;
+
+        let x: Vec<u64> = pipe()
+            // we could get the key first, but that means an extra redis call for every check. this seems better
+            .incr(&throttle_key, count)
+            // set expiration the first time we set the key. ignore the result
+            .expire(&throttle_key, self.period.try_into().unwrap())
+            .arg("NX")
+            .ignore()
+            // do the query
+            .query_async(&mut *conn)
+            .await
+            .context("increment rate limit")?;
+
+        let new_count = x
+            .first()
+            .ok_or_else(|| anyhow::anyhow!("check rate limit result"))?;
+
+        if new_count > &max_per_period {
+            let seconds_left_in_period = self.period - now / self.period;
+
+            let retry_at = Instant::now().add(Duration::from_secs(seconds_left_in_period));
+
+            return Ok(ThrottleResult::RetryAt(retry_at));
+        }
+
+        Ok(ThrottleResult::Allowed)
+    }
+
+    #[inline]
+    pub async fn throttle(&self) -> anyhow::Result<ThrottleResult> {
+        self.throttle_label("", None, 1).await
+    }
+}
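
Not part of the commit, for context only: a rough usage sketch of the new fixed-window limiter, mirroring how app.rs below builds the frontend limiter (app "web3_proxy", label "frontend", 60 second period). The pool setup, the 100 request budget, the demo function, and the "ip-127.0.0.1" label are placeholder assumptions.

use redis_rate_limit::{bb8, RedisConnectionManager, RedisRateLimit, ThrottleResult};

async fn demo(redis_url: &str) -> anyhow::Result<()> {
    let manager = RedisConnectionManager::new(redis_url)?;
    let pool = bb8::Pool::builder().build(manager).await?;

    // allow up to 100 hits per 60 second window; keys look like "web3_proxy:rrl:frontend:<label>:<period_id>"
    let limiter = RedisRateLimit::new(pool, "web3_proxy", "frontend", 100, 60);

    match limiter.throttle_label("ip-127.0.0.1", None, 1).await? {
        ThrottleResult::Allowed => { /* serve the request */ }
        ThrottleResult::RetryAt(retry_at) => {
            // over the budget for this window; the caller can retry once the window rolls over
            println!("rate limited until {:?}", retry_at);
        }
        ThrottleResult::RetryNever => {
            // a max_per_period of 0 means the label is always blocked
            println!("blocked");
        }
    }

    Ok(())
}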
@@ -12,9 +12,9 @@ deadlock_detection = ["parking_lot/deadlock_detection"]
 verbose_db = ["sea-orm/debug-print"]
 
 [dependencies]
-redis-cell-client = { path = "../redis-cell-client" }
 entities = { path = "../entities" }
 migration = { path = "../migration" }
+redis-rate-limit = { path = "../redis-rate-limit" }
 
 anyhow = { version = "1.0.61", features = ["backtrace"] }
 arc-swap = "1.5.1"
@@ -28,7 +28,7 @@ dotenv = "0.15.0"
 ethers = { version = "0.17.0", features = ["rustls", "ws"] }
 fdlimit = "0.2.1"
 flume = "0.10.14"
-futures = { version = "0.3.21", features = ["thread-pool"] }
+futures = { version = "0.3.23", features = ["thread-pool"] }
 hashbrown = { version = "0.12.3", features = ["serde"] }
 indexmap = "1.9.1"
 fifomap = { path = "../fifomap" }
@@ -15,8 +15,10 @@ use futures::stream::StreamExt;
 use futures::Future;
 use migration::{Migrator, MigratorTrait};
 use parking_lot::RwLock;
-use redis_cell_client::bb8::ErrorSink;
-use redis_cell_client::{bb8, RedisCell, RedisConnectionManager, RedisPool};
+use redis_rate_limit::{
+    bb8::{self, ErrorSink},
+    RedisConnectionManager, RedisErrorSink, RedisPool, RedisRateLimit,
+};
 use sea_orm::DatabaseConnection;
 use serde_json::json;
 use std::fmt;
@@ -32,7 +34,6 @@ use tokio_stream::wrappers::{BroadcastStream, WatchStream};
 use tracing::{info, info_span, instrument, trace, warn, Instrument};
 use uuid::Uuid;
 
-use crate::bb8_helpers;
 use crate::block_helpers::block_needed;
 use crate::config::{AppConfig, TopConfig};
 use crate::connections::Web3Connections;
@@ -71,7 +72,7 @@ pub enum TxState {
 pub struct UserCacheValue {
     pub expires_at: Instant,
     pub user_id: u64,
-    pub user_count_per_period: u32,
+    pub user_count_per_period: u64,
 }
 
 /// flatten a JoinError into an anyhow error
@@ -143,7 +144,7 @@ pub struct Web3ProxyApp {
     pub config: AppConfig,
     pub db_conn: Option<sea_orm::DatabaseConnection>,
     pub pending_transactions: Arc<DashMap<TxHash, TxState>>,
-    pub rate_limiter: Option<RedisCell>,
+    pub rate_limiter: Option<RedisRateLimit>,
     pub redis_pool: Option<RedisPool>,
     pub stats: AppStats,
     pub user_cache: RwLock<FifoCountMap<Uuid, UserCacheValue>>,
@@ -217,7 +218,7 @@ impl Web3ProxyApp {
         // TODO: min_idle?
         // TODO: set max_size based on max expected concurrent connections? set based on num_workers?
         let builder = bb8::Pool::builder()
-            .error_sink(bb8_helpers::RedisErrorSink.boxed_clone())
+            .error_sink(RedisErrorSink.boxed_clone())
             .min_idle(Some(min_size))
             .max_size(max_size);
 
@@ -284,16 +285,11 @@ impl Web3ProxyApp {
             private_rpcs
         };
 
-        // TODO: how much should we allow?
-        // TODO: im seeing errors in redis. just use the redis FAQ on rate limiting. its really simple
-        let public_max_burst = top_config.app.public_rate_limit_per_minute / 3;
-
         let frontend_rate_limiter = redis_pool.as_ref().map(|redis_pool| {
-            RedisCell::new(
+            RedisRateLimit::new(
                 redis_pool.clone(),
                 "web3_proxy",
                 "frontend",
-                public_max_burst,
                 top_config.app.public_rate_limit_per_minute,
                 60,
             )
@@ -1,17 +0,0 @@
-// TODO: move this into redis-cell-client
-
-use redis_cell_client::bb8;
-use tracing::warn;
-
-#[derive(Debug, Clone)]
-pub struct RedisErrorSink;
-
-impl bb8::ErrorSink<redis_cell_client::RedisError> for RedisErrorSink {
-    fn sink(&self, err: redis_cell_client::RedisError) {
-        warn!(?err, "redis error");
-    }
-
-    fn boxed_clone(&self) -> Box<dyn bb8::ErrorSink<redis_cell_client::RedisError>> {
-        Box::new(self.clone())
-    }
-}
@@ -145,7 +145,7 @@ fn main() -> anyhow::Result<()> {
     let cli_config: CliConfig = argh::from_env();
 
     // advanced configuration is on disk
-    info!("Loading rpc config @ {}", cli_config.config);
+    info!("Loading config @ {}", cli_config.config);
     let top_config: String = fs::read_to_string(cli_config.config.clone())?;
     let top_config: TopConfig = toml::from_str(&top_config)?;
 
@@ -1,16 +1,25 @@
 use argh::FromArgs;
+use std::fs;
+use tracing::info;
+use web3_proxy::config::TopConfig;
 
 #[derive(FromArgs, PartialEq, Eq, Debug)]
 /// Second subcommand.
 #[argh(subcommand, name = "check_config")]
 pub struct CheckConfigSubCommand {
-    #[argh(switch)]
-    /// whether to fooey
-    fooey: bool,
+    #[argh(option)]
+    /// path to the configuration toml.
+    path: String,
 }
 
 impl CheckConfigSubCommand {
     pub async fn main(self) -> anyhow::Result<()> {
-        todo!()
+        info!("Loading config @ {}", self.path);
+        let top_config: String = fs::read_to_string(self.path)?;
+        let top_config: TopConfig = toml::from_str(&top_config)?;
+
+        info!("config: {:?}", top_config);
+
+        Ok(())
     }
 }
@@ -46,7 +46,7 @@ pub struct AppConfig {
     pub db_url: Option<String>,
     pub redis_url: Option<String>,
     #[serde(default = "default_public_rate_limit_per_minute")]
-    pub public_rate_limit_per_minute: u32,
+    pub public_rate_limit_per_minute: u64,
     #[serde(default = "default_response_cache_max_bytes")]
     pub response_cache_max_bytes: usize,
     /// the stats page url for an anonymous user.
@@ -55,7 +55,7 @@ pub struct AppConfig {
     pub redirect_user_url: String,
 }
 
-fn default_public_rate_limit_per_minute() -> u32 {
+fn default_public_rate_limit_per_minute() -> u64 {
     0
 }
 
@@ -69,7 +69,7 @@ fn default_response_cache_max_bytes() -> usize {
 pub struct Web3ConnectionConfig {
     url: String,
     soft_limit: u32,
-    hard_limit: Option<u32>,
+    hard_limit: Option<u64>,
     weight: u32,
 }
 
@@ -80,14 +80,14 @@ impl Web3ConnectionConfig {
     pub async fn spawn(
         self,
         name: String,
-        redis_client_pool: Option<redis_cell_client::RedisPool>,
+        redis_pool: Option<redis_rate_limit::RedisPool>,
         chain_id: u64,
         http_client: Option<reqwest::Client>,
        http_interval_sender: Option<Arc<broadcast::Sender<()>>>,
        block_sender: Option<flume::Sender<BlockAndRpc>>,
        tx_id_sender: Option<flume::Sender<(TxHash, Arc<Web3Connection>)>>,
    ) -> anyhow::Result<(Arc<Web3Connection>, AnyhowJoinHandle<()>)> {
-        let hard_limit = match (self.hard_limit, redis_client_pool) {
+        let hard_limit = match (self.hard_limit, redis_pool) {
            (None, None) => None,
            (Some(hard_limit), Some(redis_client_pool)) => Some((hard_limit, redis_client_pool)),
            (None, Some(_)) => None,
@@ -5,7 +5,7 @@ use ethers::prelude::{Block, Bytes, Middleware, ProviderError, TxHash, H256, U64
 use futures::future::try_join_all;
 use futures::StreamExt;
 use parking_lot::RwLock;
-use redis_cell_client::{RedisCell, ThrottleResult};
+use redis_rate_limit::{RedisPool, RedisRateLimit, ThrottleResult};
 use serde::ser::{SerializeStruct, Serializer};
 use serde::Serialize;
 use std::fmt;
@@ -82,7 +82,7 @@ pub struct Web3Connection {
     /// it is an async lock because we hold it open across awaits
     provider: AsyncRwLock<Option<Arc<Web3Provider>>>,
     /// rate limits are stored in a central redis so that multiple proxies can share their rate limits
-    hard_limit: Option<redis_cell_client::RedisCell>,
+    hard_limit: Option<RedisRateLimit>,
     /// used for load balancing to the least loaded server
     pub soft_limit: u32,
     block_data_limit: AtomicU64,
@@ -102,7 +102,8 @@ impl Web3Connection {
         // optional because this is only used for http providers. websocket providers don't use it
         http_client: Option<reqwest::Client>,
         http_interval_sender: Option<Arc<broadcast::Sender<()>>>,
-        hard_limit: Option<(u32, redis_cell_client::RedisPool)>,
+        // TODO: have a builder struct for this.
+        hard_limit: Option<(u64, RedisPool)>,
         // TODO: think more about this type
         soft_limit: u32,
         block_sender: Option<flume::Sender<BlockAndRpc>>,
@@ -113,12 +114,11 @@ impl Web3Connection {
         let hard_limit = hard_limit.map(|(hard_rate_limit, redis_conection)| {
             // TODO: allow configurable period and max_burst
             let period = 1;
-            RedisCell::new(
+            RedisRateLimit::new(
                 redis_conection,
                 "web3_proxy",
                 &format!("{}:{}", chain_id, url_str),
                 hard_rate_limit,
-                hard_rate_limit,
                 period,
             )
         });
@@ -614,6 +614,9 @@ impl Web3Connection {
 
                 return Ok(HandleResult::RetryAt(retry_at.into()));
             }
+            Ok(ThrottleResult::RetryNever) => {
+                return Err(anyhow::anyhow!("Rate limit failed."));
+            }
             Err(err) => {
                 return Err(err);
             }
@@ -137,7 +137,7 @@ impl Web3Connections {
         chain_id: u64,
         server_configs: HashMap<String, Web3ConnectionConfig>,
         http_client: Option<reqwest::Client>,
-        redis_client_pool: Option<redis_cell_client::RedisPool>,
+        redis_client_pool: Option<redis_rate_limit::RedisPool>,
         head_block_sender: Option<watch::Sender<Arc<Block<TxHash>>>>,
         pending_tx_sender: Option<broadcast::Sender<TxState>>,
         pending_transactions: Arc<DashMap<TxHash, TxState>>,
@@ -1,6 +1,6 @@
 use axum::response::Response;
 use entities::user_keys;
-use redis_cell_client::ThrottleResult;
+use redis_rate_limit::ThrottleResult;
 use reqwest::StatusCode;
 use sea_orm::{
     ColumnTrait, DeriveColumn, EntityTrait, EnumIter, IdenStatic, QueryFilter, QuerySelect,
@@ -52,21 +52,24 @@ impl RateLimitResult {
 
 impl Web3ProxyApp {
     pub async fn rate_limit_by_ip(&self, ip: IpAddr) -> anyhow::Result<RateLimitResult> {
-        let rate_limiter_key = format!("ip-{}", ip);
+        let rate_limiter_label = format!("ip-{}", ip);
 
         // TODO: dry this up with rate_limit_by_key
         if let Some(rate_limiter) = &self.rate_limiter {
             match rate_limiter
-                .throttle_key(&rate_limiter_key, None, None, None)
+                .throttle_label(&rate_limiter_label, None, 1)
                 .await
             {
                 Ok(ThrottleResult::Allowed) => {}
                 Ok(ThrottleResult::RetryAt(_retry_at)) => {
                     // TODO: set headers so they know when they can retry
-                    debug!(?rate_limiter_key, "rate limit exceeded"); // this is too verbose, but a stat might be good
-                    // TODO: use their id if possible
+                    debug!(?rate_limiter_label, "rate limit exceeded"); // this is too verbose, but a stat might be good
+                    // TODO: use their id if possible
                     return Ok(RateLimitResult::IpRateLimitExceeded(ip));
                 }
+                Ok(ThrottleResult::RetryNever) => {
+                    return Err(anyhow::anyhow!("blocked by rate limiter"))
+                }
                 Err(err) => {
                     // internal error, not rate limit being hit
                     // TODO: i really want axum to do this for us in a single place.
@@ -148,16 +151,11 @@ impl Web3ProxyApp {
 
         // user key is valid. now check rate limits
         if let Some(rate_limiter) = &self.rate_limiter {
-            // TODO: how does max burst actually work? what should it be?
-            let user_max_burst = user_data.user_count_per_period / 3;
-            let user_period = 60;
-
             if rate_limiter
-                .throttle_key(
+                .throttle_label(
                     &user_key.to_string(),
-                    Some(user_max_burst),
                     Some(user_data.user_count_per_period),
-                    Some(user_period),
+                    1,
                 )
                 .await
                 .is_err()
@@ -1,5 +1,4 @@
 pub mod app;
-pub mod bb8_helpers;
 pub mod block_helpers;
 pub mod config;
 pub mod connection;