switch to deadpool

Bryan Stitt 2022-09-14 06:18:13 +00:00
parent 789672be43
commit 1730b8c7a5
12 changed files with 144 additions and 139 deletions

Cargo.lock (generated)
View File

@@ -569,30 +569,6 @@ version = "1.0.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "8a32fd6af2b5827bce66c29053ba0e7c42b9dcab01835835058558c10851a46b"
 
-[[package]]
-name = "bb8"
-version = "0.8.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1627eccf3aa91405435ba240be23513eeca466b5dc33866422672264de061582"
-dependencies = [
- "async-trait",
- "futures-channel",
- "futures-util",
- "parking_lot 0.12.1",
- "tokio",
-]
-
-[[package]]
-name = "bb8-redis"
-version = "0.11.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b081e2864000416c5a4543fed63fb8dd979c4a0806b071fddab0e548c6cd4c74"
-dependencies = [
- "async-trait",
- "bb8",
- "redis",
-]
-
 [[package]]
 name = "bech32"
 version = "0.7.3"
@@ -1246,6 +1222,40 @@ dependencies = [
  "parking_lot_core 0.9.3",
 ]
 
+[[package]]
+name = "deadpool"
+version = "0.9.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "421fe0f90f2ab22016f32a9881be5134fdd71c65298917084b0c7477cbc3856e"
+dependencies = [
+ "async-trait",
+ "deadpool-runtime",
+ "num_cpus",
+ "retain_mut",
+ "serde",
+ "tokio",
+]
+
+[[package]]
+name = "deadpool-redis"
+version = "0.10.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6a62ebf187bc30bfc1a14bed4073912b988551d111208fe800b27c32df282481"
+dependencies = [
+ "deadpool",
+ "redis",
+ "serde",
+]
+
+[[package]]
+name = "deadpool-runtime"
+version = "0.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "eaa37046cc0f6c3cc6090fbdbf73ef0b8ef4cfcc37f6befc0020f63e8cf121e1"
+dependencies = [
+ "tokio",
+]
+
 [[package]]
 name = "der"
 version = "0.5.1"
@@ -3746,7 +3756,7 @@ name = "redis-rate-limit"
 version = "0.2.0"
 dependencies = [
  "anyhow",
- "bb8-redis",
+ "deadpool-redis",
  "tokio",
  "tracing",
 ]
@@ -3845,6 +3855,12 @@ dependencies = [
  "winreg",
 ]
 
+[[package]]
+name = "retain_mut"
+version = "0.1.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4389f1d5789befaf6029ebd9f7dac4af7f7e3d61b69d4f30e2ac02b57e7712b0"
+
 [[package]]
 name = "rfc6979"
 version = "0.3.0"

View File

@@ -6,8 +6,6 @@ edition = "2021"
 
 [dependencies]
 anyhow = "1.0.65"
-bb8-redis = "0.11.0"
+deadpool-redis = { version = "0.10.2", features = ["rt_tokio_1", "serde"] }
 tracing = "0.1.36"
-
-# TODO: i'd prefer not to require tokio here, but we use tokio::time
-tokio = "1.21.1"
+tokio = {version = "*" }

View File

@@ -1,17 +1,17 @@
-pub use bb8_redis::bb8::ErrorSink as Bb8ErrorSync;
-pub use bb8_redis::redis::RedisError;
+// pub use bb8_redis::bb8::ErrorSink as Bb8ErrorSync;
+// pub use bb8_redis::redis::RedisError;
 
-use tracing::error;
+// use tracing::error;
 
-#[derive(Debug, Clone)]
-pub struct RedisErrorSink;
+// #[derive(Debug, Clone)]
+// pub struct RedisErrorSink;
 
-impl Bb8ErrorSync<RedisError> for RedisErrorSink {
-    fn sink(&self, err: RedisError) {
-        error!(?err, "redis error");
-    }
+// impl Bb8ErrorSync<RedisError> for RedisErrorSink {
+//     fn sink(&self, err: RedisError) {
+//         error!(?err, "redis error");
+//     }
 
-    fn boxed_clone(&self) -> Box<dyn Bb8ErrorSync<RedisError>> {
-        Box::new(self.clone())
-    }
-}
+//     fn boxed_clone(&self) -> Box<dyn Bb8ErrorSync<RedisError>> {
+//         Box::new(self.clone())
+//     }
+// }

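deadpool has no direct stand-in for bb8's `ErrorSink`, so pool failures now surface as the `Err` side of `Pool::get()`. A minimal sketch of logging them at the call site, using the re-exported `Pool` and `Connection` types; the helper name is made up for illustration and is not code from this commit:

use deadpool_redis::{Connection, Pool};
use tracing::error;

// hypothetical helper: check a connection out of the pool and log failures,
// roughly covering what bb8's ErrorSink used to report
async fn checked_conn(redis_pool: &Pool) -> anyhow::Result<Connection> {
    redis_pool.get().await.map_err(|err| {
        error!(?err, "redis pool error");
        anyhow::anyhow!("redis pool error: {}", err)
    })
}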
View File

@@ -2,19 +2,20 @@
 mod errors;
 
 use anyhow::Context;
-use bb8_redis::redis::pipe;
+use deadpool_redis::redis::pipe;
 use std::ops::Add;
 use std::time::{SystemTime, UNIX_EPOCH};
 use tokio::time::{Duration, Instant};
 use tracing::{debug, trace};
 
-pub use crate::errors::{RedisError, RedisErrorSink};
-pub use bb8_redis::{bb8, redis, RedisConnectionManager};
-
-pub type RedisPool = bb8::Pool<RedisConnectionManager>;
+pub use deadpool_redis::redis;
+pub use deadpool_redis::{Config, Connection, Manager, Pool, Runtime};
+
+// pub use crate::errors::{RedisError, RedisErrorSink};
+// pub use bb8_redis::{bb8, redis, RedisConnectionManager};
 
 pub struct RedisRateLimit {
-    pool: RedisPool,
+    pool: Pool,
     key_prefix: String,
     /// The default maximum requests allowed in a period.
     max_requests_per_period: u64,
@@ -30,7 +31,7 @@ pub enum ThrottleResult {
 
 impl RedisRateLimit {
     pub fn new(
-        pool: RedisPool,
+        pool: Pool,
         app: &str,
         label: &str,
         max_requests_per_period: u64,

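For context, this is roughly how a caller builds the re-exported deadpool `Pool` that `RedisRateLimit::new` now takes. The URL, pool size, and timeout values below are placeholders, not settings from this commit:

use std::time::Duration;

use redis_rate_limit::{Config, Pool, Runtime};

async fn build_pool(redis_url: &str) -> anyhow::Result<Pool> {
    // deadpool builds the pool from a URL-based Config instead of bb8's RedisConnectionManager
    let pool: Pool = Config::from_url(redis_url)
        .builder()?
        .max_size(16)
        .wait_timeout(Some(Duration::from_secs(2)))
        .runtime(Runtime::Tokio1)
        .build()?;

    // checking out one connection up front surfaces bad URLs or credentials early
    let _conn = pool.get().await?;

    Ok(pool)
}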
View File

@@ -24,11 +24,7 @@ use hashbrown::HashMap;
 use metered::{metered, ErrorCount, HitCount, InFlight, ResponseTime, Throughput};
 use migration::{Migrator, MigratorTrait};
 use moka::future::Cache;
-use redis_rate_limit::bb8::PooledConnection;
-use redis_rate_limit::{
-    bb8::{self, ErrorSink},
-    RedisConnectionManager, RedisErrorSink, RedisPool, RedisRateLimit,
-};
+use redis_rate_limit::{Config as RedisConfig, Pool as RedisPool, RedisRateLimit, Runtime};
 use sea_orm::DatabaseConnection;
 use serde::Serialize;
 use serde_json::json;
@@ -152,7 +148,7 @@ pub async fn get_migrated_db(
 impl Web3ProxyApp {
     pub async fn spawn(
         top_config: TopConfig,
-        num_workers: u32,
+        num_workers: usize,
     ) -> anyhow::Result<(
         Arc<Web3ProxyApp>,
         Pin<Box<dyn Future<Output = anyhow::Result<()>>>>,
@@ -168,7 +164,10 @@ impl Web3ProxyApp {
 
         // first, we connect to mysql and make sure the latest migrations have run
         let db_conn = if let Some(db_url) = &top_config.app.db_url {
-            let db_min_connections = top_config.app.db_min_connections.unwrap_or(num_workers);
+            let db_min_connections = top_config
+                .app
+                .db_min_connections
+                .unwrap_or(num_workers as u32);
 
             // TODO: what default multiple?
             let redis_max_connections = top_config
@@ -212,26 +211,26 @@
                 // TODO: scrub credentials and then include the redis_url in logs
                 info!("Connecting to redis");
 
-                let manager = RedisConnectionManager::new(redis_url.as_ref())?;
-
-                let redis_min_connections =
-                    top_config.app.redis_min_connections.unwrap_or(num_workers);
-
+                // TODO: what is a good default?
                 let redis_max_connections = top_config
                     .app
                     .redis_max_connections
-                    .unwrap_or(redis_min_connections * 2);
-
-                // TODO: min_idle?
-                // TODO: set max_size based on max expected concurrent connections? set based on num_workers?
-                let builder = bb8::Pool::builder()
-                    .error_sink(RedisErrorSink.boxed_clone())
-                    .min_idle(Some(redis_min_connections))
-                    .max_size(redis_max_connections);
-
-                let pool = builder.build(manager).await?;
-
-                Some(pool)
+                    .unwrap_or(num_workers * 2);
+
+                // TODO: what are reasonable timeouts
+                let redis_pool = RedisConfig::from_url(redis_url)
+                    .builder()?
+                    .create_timeout(Some(Duration::from_secs(2)))
+                    .max_size(redis_max_connections)
+                    .recycle_timeout(Some(Duration::from_secs(2)))
+                    .runtime(Runtime::Tokio1)
+                    .wait_timeout(Some(Duration::from_secs(2)))
+                    .build()?;
+
+                // test the pool
+                redis_pool.get().await.context("Redis connection failed")?;
+
+                Some(redis_pool)
             }
             None => {
                 warn!("no redis connection");
@@ -611,7 +610,7 @@
     }
 
     #[instrument(skip_all)]
-    pub async fn redis_conn(&self) -> anyhow::Result<PooledConnection<RedisConnectionManager>> {
+    pub async fn redis_conn(&self) -> anyhow::Result<redis_rate_limit::Connection> {
        match self.redis_pool.as_ref() {
            None => Err(anyhow::anyhow!("no redis server configured")),
            Some(redis_pool) => {

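The `Connection` that `redis_conn` now returns implements the async `ConnectionLike` trait from the re-exported `redis` crate, so commands run on it directly. A small sketch assuming a pool built as above; the `PING` round trip is only an illustration:

use redis_rate_limit::redis;

async fn ping(redis_pool: &redis_rate_limit::Pool) -> anyhow::Result<()> {
    // deadpool hands out a Connection guard; dropping it returns the connection to the pool
    let mut conn = redis_pool.get().await?;

    // the guard implements redis::aio::ConnectionLike, so normal redis commands work
    let pong: String = redis::cmd("PING").query_async(&mut conn).await?;
    assert_eq!(pong, "PONG");

    Ok(())
}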
View File

@@ -64,7 +64,7 @@ fn run(
     // start tokio's async runtime
     let rt = rt_builder.build()?;
 
-    let num_workers = rt.metrics().num_workers() as u32;
+    let num_workers = rt.metrics().num_workers();
     debug!(?num_workers);
 
     rt.block_on(async {

View File

@@ -64,12 +64,9 @@ pub struct AppConfig {
     #[serde(default = "default_public_rate_limit_per_minute")]
     pub public_rate_limit_per_minute: u64,
     pub redis_url: Option<String>,
-    /// minimum size of the connection pool for the cache
-    /// If none, the number of workers are used
-    pub redis_min_connections: Option<u32>,
     /// maximum size of the connection pool for the cache
     /// If none, the minimum * 2 is used
-    pub redis_max_connections: Option<u32>,
+    pub redis_max_connections: Option<usize>,
     #[serde(default = "default_response_cache_max_bytes")]
     pub response_cache_max_bytes: usize,
     /// the stats page url for an anonymous user.
@@ -119,7 +116,7 @@ impl Web3ConnectionConfig {
     pub async fn spawn(
         self,
         name: String,
-        redis_pool: Option<redis_rate_limit::RedisPool>,
+        redis_pool: Option<redis_rate_limit::Pool>,
         chain_id: u64,
         http_client: Option<reqwest::Client>,
         http_interval_sender: Option<Arc<broadcast::Sender<()>>>,

View File

@@ -5,7 +5,7 @@ use axum::{
     Json,
 };
 use derive_more::From;
-use redis_rate_limit::{bb8::RunError, RedisError};
+use redis_rate_limit::redis::RedisError;
 use sea_orm::DbErr;
 use std::{error::Error, net::IpAddr};
 use tokio::time::Instant;
@@ -18,8 +18,9 @@ pub type FrontendResult = Result<Response, FrontendErrorResponse>;
 pub enum FrontendErrorResponse {
     Anyhow(anyhow::Error),
     Box(Box<dyn Error>),
+    // TODO: update these for Redis
     Redis(RedisError),
-    RedisRun(RunError<RedisError>),
+    // RedisRun(RunError<RedisError>),
     Response(Response),
     Database(DbErr),
     RateLimitedUser(u64, Option<Instant>),
@@ -67,17 +68,17 @@ impl IntoResponse for FrontendErrorResponse {
                     ),
                 )
             }
-            Self::RedisRun(err) => {
-                warn!(?err, "redis run");
-                (
-                    StatusCode::INTERNAL_SERVER_ERROR,
-                    JsonRpcForwardedResponse::from_str(
-                        "redis run error!",
-                        Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()),
-                        None,
-                    ),
-                )
-            }
+            // Self::RedisRun(err) => {
+            //     warn!(?err, "redis run");
+            //     (
+            //         StatusCode::INTERNAL_SERVER_ERROR,
+            //         JsonRpcForwardedResponse::from_str(
+            //             "redis run error!",
+            //             Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()),
+            //             None,
+            //         ),
+            //     )
+            // }
             Self::Response(r) => {
                 debug_assert_ne!(r.status(), StatusCode::OK);
                 return r;

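With bb8's `RunError` wrapper gone, the remaining `Redis(RedisError)` variant presumably gets an arm of its own in `into_response`; a sketch of that status mapping, modeled on the commented-out `RedisRun` arm rather than taken from this commit:

use axum::http::StatusCode;
use redis_rate_limit::redis::RedisError;
use tracing::warn;

// hypothetical helper mirroring the commented-out arm: log the error,
// answer with a generic 500 so redis details never reach the client
fn redis_error_response(err: &RedisError) -> (StatusCode, &'static str) {
    warn!(?err, "redis error");

    (StatusCode::INTERNAL_SERVER_ERROR, "redis error!")
}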
View File

@@ -172,56 +172,50 @@ impl Web3ProxyApp {
 
         // user key is valid. now check rate limits
         // TODO: this is throwing errors when curve-api hits us with high concurrency. investigate i think its bb8's fault
-        if false {
-            if let Some(rate_limiter) = &self.frontend_rate_limiter {
-                // TODO: query redis in the background so that users don't have to wait on this network request
-                // TODO: better key? have a prefix so its easy to delete all of these
-                // TODO: we should probably hash this or something
-                let rate_limiter_label = format!("user-{}", user_key);
-
-                match rate_limiter
-                    .throttle_label(
-                        &rate_limiter_label,
-                        Some(user_data.user_count_per_period),
-                        1,
-                    )
-                    .await
-                {
-                    Ok(ThrottleResult::Allowed) => {
-                        Ok(RateLimitResult::AllowedUser(user_data.user_id))
-                    }
-                    Ok(ThrottleResult::RetryAt(retry_at)) => {
-                        // TODO: set headers so they know when they can retry
-                        // TODO: debug or trace?
-                        // this is too verbose, but a stat might be good
-                        trace!(
-                            ?rate_limiter_label,
-                            "rate limit exceeded until {:?}",
-                            retry_at
-                        );
-                        Ok(RateLimitResult::RateLimitedUser(
-                            user_data.user_id,
-                            Some(retry_at),
-                        ))
-                    }
-                    Ok(ThrottleResult::RetryNever) => {
-                        // TODO: i don't think we'll get here. maybe if we ban an IP forever? seems unlikely
-                        debug!(?rate_limiter_label, "rate limit exceeded");
-                        Ok(RateLimitResult::RateLimitedUser(user_data.user_id, None))
-                    }
-                    Err(err) => {
-                        // internal error, not rate limit being hit
-                        // TODO: i really want axum to do this for us in a single place.
-                        error!(?err, "rate limiter is unhappy. allowing ip");
-                        Ok(RateLimitResult::AllowedUser(user_data.user_id))
-                    }
-                }
-            } else {
-                // TODO: if no redis, rate limit with a local cache?
-                todo!("no redis. cannot rate limit")
-            }
-        } else {
-            Ok(RateLimitResult::AllowedUser(user_data.user_id))
+        if let Some(rate_limiter) = &self.frontend_rate_limiter {
+            // TODO: query redis in the background so that users don't have to wait on this network request
+            // TODO: better key? have a prefix so its easy to delete all of these
+            // TODO: we should probably hash this or something
+            let rate_limiter_label = format!("user-{}", user_key);
+
+            match rate_limiter
+                .throttle_label(
+                    &rate_limiter_label,
+                    Some(user_data.user_count_per_period),
+                    1,
+                )
+                .await
+            {
+                Ok(ThrottleResult::Allowed) => Ok(RateLimitResult::AllowedUser(user_data.user_id)),
+                Ok(ThrottleResult::RetryAt(retry_at)) => {
+                    // TODO: set headers so they know when they can retry
+                    // TODO: debug or trace?
+                    // this is too verbose, but a stat might be good
+                    trace!(
+                        ?rate_limiter_label,
+                        "rate limit exceeded until {:?}",
+                        retry_at
+                    );
+                    Ok(RateLimitResult::RateLimitedUser(
+                        user_data.user_id,
+                        Some(retry_at),
+                    ))
+                }
+                Ok(ThrottleResult::RetryNever) => {
+                    // TODO: i don't think we'll get here. maybe if we ban an IP forever? seems unlikely
+                    debug!(?rate_limiter_label, "rate limit exceeded");
+                    Ok(RateLimitResult::RateLimitedUser(user_data.user_id, None))
+                }
+                Err(err) => {
+                    // internal error, not rate limit being hit
+                    // TODO: i really want axum to do this for us in a single place.
+                    error!(?err, "rate limiter is unhappy. allowing ip");
+                    Ok(RateLimitResult::AllowedUser(user_data.user_id))
+                }
+            }
+        } else {
+            // TODO: if no redis, rate limit with a local cache?
+            todo!("no redis. cannot rate limit")
         }
     }
 }

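`throttle_label` itself is not part of this diff; for context, a sketch of the fixed-window counter the surrounding code points at (INCR plus EXPIRE per time bucket). The key format, argument names, and the plain `bool` return are simplifications, not the crate's real `ThrottleResult` API:

use std::time::{SystemTime, UNIX_EPOCH};

use redis_rate_limit::{redis::pipe, Pool};

// allow up to `max_per_period` hits per `period_secs`-second window for `label`
async fn throttle(pool: &Pool, label: &str, max_per_period: u64, period_secs: u64) -> anyhow::Result<bool> {
    let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs();
    let bucket_key = format!("rate:{}:{}", label, now / period_secs);

    let mut conn = pool.get().await?;

    // INCR and EXPIRE in one round trip; the expiry keeps old buckets from piling up
    let (count, _): (u64, bool) = pipe()
        .atomic()
        .incr(&bucket_key, 1u64)
        .expire(&bucket_key, period_secs as usize)
        .query_async(&mut conn)
        .await?;

    Ok(count <= max_per_period)
}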
View File

@@ -98,10 +98,9 @@ pub async fn get_login(
         .unwrap_or_else(|| "eip4361".to_string());
 
     let message: String = match message_eip.as_str() {
-        "eip4361" => message.to_string(),
-        // https://github.com/spruceid/siwe/issues/98
         "eip191_bytes" => Bytes::from(message.eip191_bytes().unwrap()).to_string(),
         "eip191_hash" => Bytes::from(&message.eip191_hash().unwrap()).to_string(),
+        "eip4361" => message.to_string(),
         _ => {
             // TODO: this needs the correct error code in the response
             return Err(anyhow::anyhow!("invalid message eip given").into());

View File

@@ -10,7 +10,7 @@ use futures::future::try_join_all;
 use futures::StreamExt;
 use parking_lot::RwLock;
 use rand::Rng;
-use redis_rate_limit::{RedisPool, RedisRateLimit, ThrottleResult};
+use redis_rate_limit::{Pool as RedisPool, RedisRateLimit, ThrottleResult};
 use serde::ser::{SerializeStruct, Serializer};
 use serde::Serialize;
 use std::cmp::min;

View File

@@ -56,7 +56,7 @@ impl Web3Connections {
         chain_id: u64,
         server_configs: HashMap<String, Web3ConnectionConfig>,
         http_client: Option<reqwest::Client>,
-        redis_pool: Option<redis_rate_limit::RedisPool>,
+        redis_pool: Option<redis_rate_limit::Pool>,
         block_map: BlockHashesMap,
         head_block_sender: Option<watch::Sender<ArcBlock>>,
         min_sum_soft_limit: u32,