diff --git a/Cargo.lock b/Cargo.lock index 40870c13..d5543075 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -569,30 +569,6 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a32fd6af2b5827bce66c29053ba0e7c42b9dcab01835835058558c10851a46b" -[[package]] -name = "bb8" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1627eccf3aa91405435ba240be23513eeca466b5dc33866422672264de061582" -dependencies = [ - "async-trait", - "futures-channel", - "futures-util", - "parking_lot 0.12.1", - "tokio", -] - -[[package]] -name = "bb8-redis" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b081e2864000416c5a4543fed63fb8dd979c4a0806b071fddab0e548c6cd4c74" -dependencies = [ - "async-trait", - "bb8", - "redis", -] - [[package]] name = "bech32" version = "0.7.3" @@ -1246,6 +1222,40 @@ dependencies = [ "parking_lot_core 0.9.3", ] +[[package]] +name = "deadpool" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "421fe0f90f2ab22016f32a9881be5134fdd71c65298917084b0c7477cbc3856e" +dependencies = [ + "async-trait", + "deadpool-runtime", + "num_cpus", + "retain_mut", + "serde", + "tokio", +] + +[[package]] +name = "deadpool-redis" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a62ebf187bc30bfc1a14bed4073912b988551d111208fe800b27c32df282481" +dependencies = [ + "deadpool", + "redis", + "serde", +] + +[[package]] +name = "deadpool-runtime" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eaa37046cc0f6c3cc6090fbdbf73ef0b8ef4cfcc37f6befc0020f63e8cf121e1" +dependencies = [ + "tokio", +] + [[package]] name = "der" version = "0.5.1" @@ -3746,7 +3756,7 @@ name = "redis-rate-limit" version = "0.2.0" dependencies = [ "anyhow", - "bb8-redis", + "deadpool-redis", "tokio", "tracing", ] @@ -3845,6 +3855,12 @@ dependencies = [ "winreg", ] +[[package]] +name = "retain_mut" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4389f1d5789befaf6029ebd9f7dac4af7f7e3d61b69d4f30e2ac02b57e7712b0" + [[package]] name = "rfc6979" version = "0.3.0" diff --git a/redis-rate-limit/Cargo.toml b/redis-rate-limit/Cargo.toml index 08830574..428c78a2 100644 --- a/redis-rate-limit/Cargo.toml +++ b/redis-rate-limit/Cargo.toml @@ -6,8 +6,6 @@ edition = "2021" [dependencies] anyhow = "1.0.65" -bb8-redis = "0.11.0" +deadpool-redis = { version = "0.10.2", features = ["rt_tokio_1", "serde"] } tracing = "0.1.36" - -# TODO: i'd prefer not to require tokio here, but we use tokio::time -tokio = "1.21.1" +tokio = {version = "*" } diff --git a/redis-rate-limit/src/errors.rs b/redis-rate-limit/src/errors.rs index 10da546e..43edf9dd 100644 --- a/redis-rate-limit/src/errors.rs +++ b/redis-rate-limit/src/errors.rs @@ -1,17 +1,17 @@ -pub use bb8_redis::bb8::ErrorSink as Bb8ErrorSync; -pub use bb8_redis::redis::RedisError; +// pub use bb8_redis::bb8::ErrorSink as Bb8ErrorSync; +// pub use bb8_redis::redis::RedisError; -use tracing::error; +// use tracing::error; -#[derive(Debug, Clone)] -pub struct RedisErrorSink; +// #[derive(Debug, Clone)] +// pub struct RedisErrorSink; -impl Bb8ErrorSync for RedisErrorSink { - fn sink(&self, err: RedisError) { - error!(?err, "redis error"); - } +// impl Bb8ErrorSync for RedisErrorSink { +// fn sink(&self, err: RedisError) { +// error!(?err, "redis error"); +// } - fn boxed_clone(&self) -> Box> { - Box::new(self.clone()) - } -} +// fn boxed_clone(&self) -> Box> { +// Box::new(self.clone()) +// } +// } diff --git a/redis-rate-limit/src/lib.rs b/redis-rate-limit/src/lib.rs index eb6fd041..c737eb9a 100644 --- a/redis-rate-limit/src/lib.rs +++ b/redis-rate-limit/src/lib.rs @@ -2,19 +2,20 @@ mod errors; use anyhow::Context; -use bb8_redis::redis::pipe; +use deadpool_redis::redis::pipe; use std::ops::Add; use std::time::{SystemTime, UNIX_EPOCH}; use tokio::time::{Duration, Instant}; use tracing::{debug, trace}; -pub use crate::errors::{RedisError, RedisErrorSink}; -pub use bb8_redis::{bb8, redis, RedisConnectionManager}; +pub use deadpool_redis::redis; +pub use deadpool_redis::{Config, Connection, Manager, Pool, Runtime}; -pub type RedisPool = bb8::Pool; +// pub use crate::errors::{RedisError, RedisErrorSink}; +// pub use bb8_redis::{bb8, redis, RedisConnectionManager}; pub struct RedisRateLimit { - pool: RedisPool, + pool: Pool, key_prefix: String, /// The default maximum requests allowed in a period. max_requests_per_period: u64, @@ -30,7 +31,7 @@ pub enum ThrottleResult { impl RedisRateLimit { pub fn new( - pool: RedisPool, + pool: Pool, app: &str, label: &str, max_requests_per_period: u64, diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs index 9bf53159..c959004b 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app.rs @@ -24,11 +24,7 @@ use hashbrown::HashMap; use metered::{metered, ErrorCount, HitCount, InFlight, ResponseTime, Throughput}; use migration::{Migrator, MigratorTrait}; use moka::future::Cache; -use redis_rate_limit::bb8::PooledConnection; -use redis_rate_limit::{ - bb8::{self, ErrorSink}, - RedisConnectionManager, RedisErrorSink, RedisPool, RedisRateLimit, -}; +use redis_rate_limit::{Config as RedisConfig, Pool as RedisPool, RedisRateLimit, Runtime}; use sea_orm::DatabaseConnection; use serde::Serialize; use serde_json::json; @@ -152,7 +148,7 @@ pub async fn get_migrated_db( impl Web3ProxyApp { pub async fn spawn( top_config: TopConfig, - num_workers: u32, + num_workers: usize, ) -> anyhow::Result<( Arc, Pin>>>, @@ -168,7 +164,10 @@ impl Web3ProxyApp { // first, we connect to mysql and make sure the latest migrations have run let db_conn = if let Some(db_url) = &top_config.app.db_url { - let db_min_connections = top_config.app.db_min_connections.unwrap_or(num_workers); + let db_min_connections = top_config + .app + .db_min_connections + .unwrap_or(num_workers as u32); // TODO: what default multiple? let redis_max_connections = top_config @@ -212,26 +211,26 @@ impl Web3ProxyApp { // TODO: scrub credentials and then include the redis_url in logs info!("Connecting to redis"); - let manager = RedisConnectionManager::new(redis_url.as_ref())?; - - let redis_min_connections = - top_config.app.redis_min_connections.unwrap_or(num_workers); - + // TODO: what is a good default? let redis_max_connections = top_config .app .redis_max_connections - .unwrap_or(redis_min_connections * 2); + .unwrap_or(num_workers * 2); - // TODO: min_idle? - // TODO: set max_size based on max expected concurrent connections? set based on num_workers? - let builder = bb8::Pool::builder() - .error_sink(RedisErrorSink.boxed_clone()) - .min_idle(Some(redis_min_connections)) - .max_size(redis_max_connections); + // TODO: what are reasonable timeouts + let redis_pool = RedisConfig::from_url(redis_url) + .builder()? + .create_timeout(Some(Duration::from_secs(2))) + .max_size(redis_max_connections) + .recycle_timeout(Some(Duration::from_secs(2))) + .runtime(Runtime::Tokio1) + .wait_timeout(Some(Duration::from_secs(2))) + .build()?; - let pool = builder.build(manager).await?; + // test the pool + redis_pool.get().await.context("Redis connection failed")?; - Some(pool) + Some(redis_pool) } None => { warn!("no redis connection"); @@ -611,7 +610,7 @@ impl Web3ProxyApp { } #[instrument(skip_all)] - pub async fn redis_conn(&self) -> anyhow::Result> { + pub async fn redis_conn(&self) -> anyhow::Result { match self.redis_pool.as_ref() { None => Err(anyhow::anyhow!("no redis server configured")), Some(redis_pool) => { diff --git a/web3_proxy/src/bin/web3_proxy.rs b/web3_proxy/src/bin/web3_proxy.rs index ea849cac..64940265 100644 --- a/web3_proxy/src/bin/web3_proxy.rs +++ b/web3_proxy/src/bin/web3_proxy.rs @@ -64,7 +64,7 @@ fn run( // start tokio's async runtime let rt = rt_builder.build()?; - let num_workers = rt.metrics().num_workers() as u32; + let num_workers = rt.metrics().num_workers(); debug!(?num_workers); rt.block_on(async { diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index 1f66755d..1f3477d1 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -64,12 +64,9 @@ pub struct AppConfig { #[serde(default = "default_public_rate_limit_per_minute")] pub public_rate_limit_per_minute: u64, pub redis_url: Option, - /// minimum size of the connection pool for the cache - /// If none, the number of workers are used - pub redis_min_connections: Option, /// maximum size of the connection pool for the cache /// If none, the minimum * 2 is used - pub redis_max_connections: Option, + pub redis_max_connections: Option, #[serde(default = "default_response_cache_max_bytes")] pub response_cache_max_bytes: usize, /// the stats page url for an anonymous user. @@ -119,7 +116,7 @@ impl Web3ConnectionConfig { pub async fn spawn( self, name: String, - redis_pool: Option, + redis_pool: Option, chain_id: u64, http_client: Option, http_interval_sender: Option>>, diff --git a/web3_proxy/src/frontend/errors.rs b/web3_proxy/src/frontend/errors.rs index 5ab08b26..a92f9085 100644 --- a/web3_proxy/src/frontend/errors.rs +++ b/web3_proxy/src/frontend/errors.rs @@ -5,7 +5,7 @@ use axum::{ Json, }; use derive_more::From; -use redis_rate_limit::{bb8::RunError, RedisError}; +use redis_rate_limit::redis::RedisError; use sea_orm::DbErr; use std::{error::Error, net::IpAddr}; use tokio::time::Instant; @@ -18,8 +18,9 @@ pub type FrontendResult = Result; pub enum FrontendErrorResponse { Anyhow(anyhow::Error), Box(Box), + // TODO: update these for Redis Redis(RedisError), - RedisRun(RunError), + // RedisRun(RunError), Response(Response), Database(DbErr), RateLimitedUser(u64, Option), @@ -67,17 +68,17 @@ impl IntoResponse for FrontendErrorResponse { ), ) } - Self::RedisRun(err) => { - warn!(?err, "redis run"); - ( - StatusCode::INTERNAL_SERVER_ERROR, - JsonRpcForwardedResponse::from_str( - "redis run error!", - Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), - None, - ), - ) - } + // Self::RedisRun(err) => { + // warn!(?err, "redis run"); + // ( + // StatusCode::INTERNAL_SERVER_ERROR, + // JsonRpcForwardedResponse::from_str( + // "redis run error!", + // Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), + // None, + // ), + // ) + // } Self::Response(r) => { debug_assert_ne!(r.status(), StatusCode::OK); return r; diff --git a/web3_proxy/src/frontend/rate_limit.rs b/web3_proxy/src/frontend/rate_limit.rs index 4aeb00d4..007ff97a 100644 --- a/web3_proxy/src/frontend/rate_limit.rs +++ b/web3_proxy/src/frontend/rate_limit.rs @@ -172,56 +172,50 @@ impl Web3ProxyApp { // user key is valid. now check rate limits // TODO: this is throwing errors when curve-api hits us with high concurrency. investigate i think its bb8's fault - if false { - if let Some(rate_limiter) = &self.frontend_rate_limiter { - // TODO: query redis in the background so that users don't have to wait on this network request - // TODO: better key? have a prefix so its easy to delete all of these - // TODO: we should probably hash this or something - let rate_limiter_label = format!("user-{}", user_key); + if let Some(rate_limiter) = &self.frontend_rate_limiter { + // TODO: query redis in the background so that users don't have to wait on this network request + // TODO: better key? have a prefix so its easy to delete all of these + // TODO: we should probably hash this or something + let rate_limiter_label = format!("user-{}", user_key); - match rate_limiter - .throttle_label( - &rate_limiter_label, - Some(user_data.user_count_per_period), - 1, - ) - .await - { - Ok(ThrottleResult::Allowed) => { - Ok(RateLimitResult::AllowedUser(user_data.user_id)) - } - Ok(ThrottleResult::RetryAt(retry_at)) => { - // TODO: set headers so they know when they can retry - // TODO: debug or trace? - // this is too verbose, but a stat might be good - trace!( - ?rate_limiter_label, - "rate limit exceeded until {:?}", - retry_at - ); - Ok(RateLimitResult::RateLimitedUser( - user_data.user_id, - Some(retry_at), - )) - } - Ok(ThrottleResult::RetryNever) => { - // TODO: i don't think we'll get here. maybe if we ban an IP forever? seems unlikely - debug!(?rate_limiter_label, "rate limit exceeded"); - Ok(RateLimitResult::RateLimitedUser(user_data.user_id, None)) - } - Err(err) => { - // internal error, not rate limit being hit - // TODO: i really want axum to do this for us in a single place. - error!(?err, "rate limiter is unhappy. allowing ip"); - Ok(RateLimitResult::AllowedUser(user_data.user_id)) - } + match rate_limiter + .throttle_label( + &rate_limiter_label, + Some(user_data.user_count_per_period), + 1, + ) + .await + { + Ok(ThrottleResult::Allowed) => Ok(RateLimitResult::AllowedUser(user_data.user_id)), + Ok(ThrottleResult::RetryAt(retry_at)) => { + // TODO: set headers so they know when they can retry + // TODO: debug or trace? + // this is too verbose, but a stat might be good + trace!( + ?rate_limiter_label, + "rate limit exceeded until {:?}", + retry_at + ); + Ok(RateLimitResult::RateLimitedUser( + user_data.user_id, + Some(retry_at), + )) + } + Ok(ThrottleResult::RetryNever) => { + // TODO: i don't think we'll get here. maybe if we ban an IP forever? seems unlikely + debug!(?rate_limiter_label, "rate limit exceeded"); + Ok(RateLimitResult::RateLimitedUser(user_data.user_id, None)) + } + Err(err) => { + // internal error, not rate limit being hit + // TODO: i really want axum to do this for us in a single place. + error!(?err, "rate limiter is unhappy. allowing ip"); + Ok(RateLimitResult::AllowedUser(user_data.user_id)) } - } else { - // TODO: if no redis, rate limit with a local cache? - todo!("no redis. cannot rate limit") } } else { - Ok(RateLimitResult::AllowedUser(user_data.user_id)) + // TODO: if no redis, rate limit with a local cache? + todo!("no redis. cannot rate limit") } } } diff --git a/web3_proxy/src/frontend/users.rs b/web3_proxy/src/frontend/users.rs index c4c6ba4b..ea9f6254 100644 --- a/web3_proxy/src/frontend/users.rs +++ b/web3_proxy/src/frontend/users.rs @@ -98,10 +98,9 @@ pub async fn get_login( .unwrap_or_else(|| "eip4361".to_string()); let message: String = match message_eip.as_str() { - "eip4361" => message.to_string(), - // https://github.com/spruceid/siwe/issues/98 "eip191_bytes" => Bytes::from(message.eip191_bytes().unwrap()).to_string(), "eip191_hash" => Bytes::from(&message.eip191_hash().unwrap()).to_string(), + "eip4361" => message.to_string(), _ => { // TODO: this needs the correct error code in the response return Err(anyhow::anyhow!("invalid message eip given").into()); diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs index ffaa1a2a..cdb16ed3 100644 --- a/web3_proxy/src/rpcs/connection.rs +++ b/web3_proxy/src/rpcs/connection.rs @@ -10,7 +10,7 @@ use futures::future::try_join_all; use futures::StreamExt; use parking_lot::RwLock; use rand::Rng; -use redis_rate_limit::{RedisPool, RedisRateLimit, ThrottleResult}; +use redis_rate_limit::{Pool as RedisPool, RedisRateLimit, ThrottleResult}; use serde::ser::{SerializeStruct, Serializer}; use serde::Serialize; use std::cmp::min; diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs index 86401fca..2cfaa3a7 100644 --- a/web3_proxy/src/rpcs/connections.rs +++ b/web3_proxy/src/rpcs/connections.rs @@ -56,7 +56,7 @@ impl Web3Connections { chain_id: u64, server_configs: HashMap, http_client: Option, - redis_pool: Option, + redis_pool: Option, block_map: BlockHashesMap, head_block_sender: Option>, min_sum_soft_limit: u32,