diff --git a/config/example.toml b/config/example.toml index 7de66214..0d915dc7 100644 --- a/config/example.toml +++ b/config/example.toml @@ -7,7 +7,7 @@ min_sum_soft_limit = 2000 min_synced_rpcs = 2 redis_url = "redis://dev-redis:6379/" # TODO: how do we find the optimal redis_max_connections? too high actually ends up being slower -redis_max_connections = 99 +redis_max_connections = 300 redirect_public_url = "https://llamanodes.com/free-rpc-stats" redirect_user_url = "https://llamanodes.com/user-rpc-stats/{{user_id}}" public_rate_limit_per_minute = 0 diff --git a/redis-rate-limit/src/errors.rs b/redis-rate-limit/src/errors.rs index 0f4be3e8..10da546e 100644 --- a/redis-rate-limit/src/errors.rs +++ b/redis-rate-limit/src/errors.rs @@ -1,14 +1,14 @@ pub use bb8_redis::bb8::ErrorSink as Bb8ErrorSync; pub use bb8_redis::redis::RedisError; -use tracing::warn; +use tracing::error; #[derive(Debug, Clone)] pub struct RedisErrorSink; impl Bb8ErrorSync for RedisErrorSink { fn sink(&self, err: RedisError) { - warn!(?err, "redis error"); + error!(?err, "redis error"); } fn boxed_clone(&self) -> Box> { diff --git a/redis-rate-limit/src/lib.rs b/redis-rate-limit/src/lib.rs index 132d7717..574c170b 100644 --- a/redis-rate-limit/src/lib.rs +++ b/redis-rate-limit/src/lib.rs @@ -5,7 +5,7 @@ use anyhow::Context; use bb8_redis::redis::pipe; use std::ops::Add; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; -use tracing::trace; +use tracing::{debug, trace}; pub use crate::errors::{RedisError, RedisErrorSink}; pub use bb8_redis::{bb8, redis, RedisConnectionManager}; @@ -72,12 +72,15 @@ impl RedisRateLimit { let mut conn = self.pool.get().await?; + // TODO: at high concurency, i think this is giving errors + // TODO: i'm starting to think that bb8 has a bug let x: Vec = pipe() // we could get the key first, but that means an extra redis call for every check. this seems better .incr(&throttle_key, count) - // set expiration the first time we set the key. ignore the result + // set expiration each time we set the key. ignore the result .expire(&throttle_key, self.period as usize) - // .arg("NX") // TODO: this works in redis, but not elasticache + // TODO: NX will make it only set the expiration the first time. works in redis, but not elasticache + // .arg("NX") .ignore() // do the query .query_async(&mut *conn) @@ -91,12 +94,13 @@ impl RedisRateLimit { let retry_at = Instant::now().add(Duration::from_secs_f32(seconds_left_in_period)); - trace!(%label, ?retry_at, "rate limited"); + debug!(%label, ?retry_at, "rate limited: {}/{}", new_count, max_per_period); - return Ok(ThrottleResult::RetryAt(retry_at)); + Ok(ThrottleResult::RetryAt(retry_at)) + } else { + trace!(%label, "NOT rate limited: {}/{}", new_count, max_per_period); + Ok(ThrottleResult::Allowed) } - - Ok(ThrottleResult::Allowed) } #[inline] diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs index 169b589e..7007a0b2 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app.rs @@ -39,7 +39,7 @@ use tokio::sync::{broadcast, watch}; use tokio::task::JoinHandle; use tokio::time::{timeout, Instant}; use tokio_stream::wrappers::{BroadcastStream, WatchStream}; -use tracing::{debug, info, info_span, instrument, trace, warn, Instrument}; +use tracing::{info, info_span, instrument, trace, warn, Instrument}; use uuid::Uuid; // TODO: make this customizable? @@ -78,8 +78,6 @@ pub struct Web3ProxyApp { // TODO: broadcast channel instead? head_block_receiver: watch::Receiver, pending_tx_sender: broadcast::Sender, - /// TODO: this doesn't ever get incremented! - pub active_requests: AtomicUsize, pub config: AppConfig, pub db_conn: Option, /// store pending transactions that we've seen so that we don't send duplicates to subscribers @@ -265,6 +263,7 @@ impl Web3ProxyApp { handles.push(balanced_handle); let private_rpcs = if private_rpcs.is_empty() { + // TODO: do None instead of clone? warn!("No private relays configured. Any transactions will be broadcast to the public mempool!"); balanced_rpcs.clone() } else { @@ -310,7 +309,6 @@ impl Web3ProxyApp { config: top_config.app, balanced_rpcs, private_rpcs, - active_requests: Default::default(), response_cache, head_block_receiver, pending_tx_sender, @@ -527,8 +525,7 @@ impl Web3ProxyApp { request: JsonRpcRequestEnum, ) -> anyhow::Result { // TODO: this should probably be trace level - // trace!(?request, "proxy_web3_rpc"); - debug!(?request, "proxying request"); + trace!(?request, "proxy_web3_rpc"); // even though we have timeouts on the requests to our backend providers, // we need a timeout for the incoming request so that retries don't run forever @@ -545,8 +542,7 @@ impl Web3ProxyApp { }; // TODO: this should probably be trace level - // trace!(?response, "Forwarding"); - debug!(?response.ids(), "forwarding response"); + trace!(?response, "Forwarding"); Ok(response) } diff --git a/web3_proxy/src/bin/web3_proxy.rs b/web3_proxy/src/bin/web3_proxy.rs index 8ec7ec6c..0057fb8a 100644 --- a/web3_proxy/src/bin/web3_proxy.rs +++ b/web3_proxy/src/bin/web3_proxy.rs @@ -127,7 +127,7 @@ fn main() -> anyhow::Result<()> { // if RUST_LOG isn't set, configure a default // TODO: is there a better way to do this? if std::env::var("RUST_LOG").is_err() { - std::env::set_var("RUST_LOG", "info,web3_proxy=debug"); + std::env::set_var("RUST_LOG", "info,redis_rate_limit=debug,web3_proxy=debug"); } // install global collector configured based on RUST_LOG env var. diff --git a/web3_proxy/src/frontend/http.rs b/web3_proxy/src/frontend/http.rs index 20cfba0e..5ffc807e 100644 --- a/web3_proxy/src/frontend/http.rs +++ b/web3_proxy/src/frontend/http.rs @@ -1,7 +1,7 @@ use crate::app::Web3ProxyApp; use axum::{http::StatusCode, response::IntoResponse, Extension, Json}; use serde_json::json; -use std::sync::{atomic::Ordering, Arc}; +use std::sync::Arc; /// Health check page for load balancers to use pub async fn health(Extension(app): Extension>) -> impl IntoResponse { @@ -15,15 +15,13 @@ pub async fn health(Extension(app): Extension>) -> impl IntoRe /// Very basic status page /// TODO: replace this with proper stats and monitoring pub async fn status(Extension(app): Extension>) -> impl IntoResponse { - // TODO: what else should we include? uptime? prometheus? + // TODO: what else should we include? uptime? let body = json!({ "balanced_rpcs": app.balanced_rpcs, "private_rpcs": app.private_rpcs, - "num_active_requests": app.active_requests.load(Ordering::Acquire), - // TODO: include number of items? "pending_transactions_count": app.pending_transactions.entry_count(), "pending_transactions_size": app.pending_transactions.weighted_size(), }); - (StatusCode::OK, Json(body)) + Json(body) } diff --git a/web3_proxy/src/frontend/rate_limit.rs b/web3_proxy/src/frontend/rate_limit.rs index c3d32492..50d09c7e 100644 --- a/web3_proxy/src/frontend/rate_limit.rs +++ b/web3_proxy/src/frontend/rate_limit.rs @@ -11,7 +11,7 @@ use sea_orm::{ }; use std::{net::IpAddr, time::Duration}; use tokio::time::Instant; -use tracing::debug; +use tracing::{debug, error}; use uuid::Uuid; pub enum RateLimitResult { @@ -119,6 +119,7 @@ impl TryFrom for RequestFrom { impl Web3ProxyApp { pub async fn rate_limit_by_ip(&self, ip: IpAddr) -> anyhow::Result { // TODO: dry this up with rate_limit_by_key + // TODO: have a local cache because if we hit redis too hard we get errors if let Some(rate_limiter) = &self.rate_limiter { let rate_limiter_label = format!("ip-{}", ip); @@ -136,12 +137,13 @@ impl Web3ProxyApp { } Ok(ThrottleResult::RetryNever) => { // TODO: prettier error for the user - return Err(anyhow::anyhow!("blocked by rate limiter")); + return Err(anyhow::anyhow!("ip blocked by rate limiter")); } Err(err) => { // internal error, not rate limit being hit // TODO: i really want axum to do this for us in a single place. - return Err(err); + error!(?err, "redis is unhappy. allowing ip"); + return Ok(RateLimitResult::AllowedIp(ip)); } } } else { @@ -194,7 +196,7 @@ impl Web3ProxyApp { } }; - // save for the next run + // save for the next run self.user_cache.insert(user_key, user_data).await; Ok(user_data) @@ -234,20 +236,39 @@ impl Web3ProxyApp { // user key is valid. now check rate limits if let Some(rate_limiter) = &self.rate_limiter { // TODO: query redis in the background so that users don't have to wait on this network request - if rate_limiter + // TODO: better key? have a prefix so its easy to delete all of these + let rate_limiter_label = user_key.to_string(); + + match rate_limiter .throttle_label( - &user_key.to_string(), + &rate_limiter_label, Some(user_data.user_count_per_period), 1, ) .await - .is_err() { - // TODO: set headers so they know when they can retry - // warn!(?ip, "public rate limit exceeded"); // this is too verbose, but a stat might be good - // TODO: use their id if possible - // TODO: StatusCode::TOO_MANY_REQUESTS - return Err(anyhow::anyhow!("too many requests from this key")); + Ok(ThrottleResult::Allowed) => {} + Ok(ThrottleResult::RetryAt(_retry_at)) => { + // TODO: set headers so they know when they can retry + debug!(?rate_limiter_label, "user rate limit exceeded"); // this is too verbose, but a stat might be good + // TODO: use their id if possible + return Ok(RateLimitResult::UserRateLimitExceeded(user_data.user_id)); + } + Ok(ThrottleResult::RetryNever) => { + // TODO: prettier error for the user + return Err(anyhow::anyhow!("user blocked by rate limiter")); + } + Err(err) => { + // internal error, not rate limit being hit + // rather than have downtime, i think its better to just use in-process rate limiting + // TODO: in-process rate limits that pipe into redis + error!(?err, "redis is unhappy. allowing ip"); + return Ok(RateLimitResult::AllowedUser(user_data.user_id)); + } // // TODO: set headers so they know when they can retry + // // warn!(?ip, "public rate limit exceeded"); // this is too verbose, but a stat might be good + // // TODO: use their id if possible + // // TODO: StatusCode::TOO_MANY_REQUESTS + // return Err(anyhow::anyhow!("too many requests from this key")); } } else { // TODO: if no redis, rate limit with a local cache? diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index 2c0ad614..a7233b8a 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -458,7 +458,7 @@ impl Web3Connections { debug!(con_head=%heavy_block_id, rpc_head=%rpc_head_str, %rpc, "con {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns) } else { // hash changed - info!(con_head=%heavy_block_id, rpc_head=%rpc_head_str, old=%old_block_id, %rpc, "unc {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns); + info!(con_head=%heavy_block_id, old=%old_block_id, rpc_head=%rpc_head_str, %rpc, "unc {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns); // todo!("handle equal by updating the cannonical chain"); self.save_block(&heavy_block, true).await?; diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs index 2db78290..ae809d44 100644 --- a/web3_proxy/src/rpcs/connection.rs +++ b/web3_proxy/src/rpcs/connection.rs @@ -28,6 +28,9 @@ pub struct Web3Connection { url: String, /// keep track of currently open requests. We sort on this pub(super) active_requests: AtomicU32, + /// keep track of total requests + /// TODO: is this type okay? + pub(super) total_requests: AtomicU64, /// provider is in a RwLock so that we can replace it if re-connecting /// it is an async lock because we hold it open across awaits pub(super) provider: AsyncRwLock>>, @@ -35,7 +38,7 @@ pub struct Web3Connection { hard_limit: Option, /// used for load balancing to the least loaded server pub(super) soft_limit: u32, - /// TODO: have an enum for this so that "no limit" prints pretty + /// TODO: have an enum for this so that "no limit" prints pretty? block_data_limit: AtomicU64, /// Lower weight are higher priority when sending requests pub(super) weight: u32, @@ -82,6 +85,7 @@ impl Web3Connection { name, url: url_str.clone(), active_requests: 0.into(), + total_requests: 0.into(), provider: AsyncRwLock::new(Some(Arc::new(provider))), hard_limit, soft_limit, @@ -777,13 +781,18 @@ impl Serialize for Web3Connection { S: Serializer, { // 3 is the number of fields in the struct. - let mut state = serializer.serialize_struct("Web3Connection", 5)?; + let mut state = serializer.serialize_struct("Web3Connection", 6)?; // the url is excluded because it likely includes private information. just show the name state.serialize_field("name", &self.name)?; let block_data_limit = self.block_data_limit.load(atomic::Ordering::Relaxed); - state.serialize_field("block_data_limit", &block_data_limit)?; + + if block_data_limit == u64::MAX { + state.serialize_field("block_data_limit", "None")?; + } else { + state.serialize_field("block_data_limit", &block_data_limit)?; + } state.serialize_field("soft_limit", &self.soft_limit)?; @@ -792,6 +801,11 @@ impl Serialize for Web3Connection { &self.active_requests.load(atomic::Ordering::Relaxed), )?; + state.serialize_field( + "total_requests", + &self.total_requests.load(atomic::Ordering::Relaxed), + )?; + let head_block_id = &*self.head_block_id.read(); state.serialize_field("head_block_id", head_block_id)?; diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs index 2dbba595..23496ee0 100644 --- a/web3_proxy/src/rpcs/connections.rs +++ b/web3_proxy/src/rpcs/connections.rs @@ -56,7 +56,7 @@ impl Web3Connections { chain_id: u64, server_configs: HashMap, http_client: Option, - redis_client_pool: Option, + redis_pool: Option, block_map: BlockHashesMap, head_block_sender: Option>, min_sum_soft_limit: u32, @@ -83,7 +83,7 @@ impl Web3Connections { async move { loop { - // TODO: every time a head_block arrives (maybe with a small delay), or on the interval. + // TODO: every time a head_block arrives (with a small delay for known slow servers), or on the interval. interval.tick().await; trace!("http interval ready"); @@ -108,7 +108,7 @@ impl Web3Connections { .into_iter() .map(|(server_name, server_config)| { let http_client = http_client.clone(); - let redis_client_pool = redis_client_pool.clone(); + let redis_pool = redis_pool.clone(); let http_interval_sender = http_interval_sender.clone(); let block_sender = if head_block_sender.is_some() { @@ -124,7 +124,7 @@ impl Web3Connections { server_config .spawn( server_name, - redis_client_pool, + redis_pool, chain_id, http_client, http_interval_sender, @@ -159,11 +159,16 @@ impl Web3Connections { } } - // TODO: less than 3? what should we do here? - if connections.len() < 2 { - warn!("Only {} connection(s)!", connections.len()); + if connections.len() < min_synced_rpcs { + return Err(anyhow::anyhow!( + "Only {}/{} connections!", + connections.len(), + min_synced_rpcs + )); } + // TODO: safety check on sum soft limit + let synced_connections = SyncedConnections::default(); // TODO: sizing and expiration on these caches! diff --git a/web3_proxy/src/rpcs/provider.rs b/web3_proxy/src/rpcs/provider.rs index 654d23ee..baf73ef2 100644 --- a/web3_proxy/src/rpcs/provider.rs +++ b/web3_proxy/src/rpcs/provider.rs @@ -1,8 +1,7 @@ use std::time::Duration; use derive_more::From; -use ethers::providers::Middleware; -use tracing::{error_span, info_span, instrument, Instrument}; +use tracing::{info_span, instrument, Instrument}; /// Use HTTP and WS providers. // TODO: instead of an enum, I tried to use Box, but hit diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index 32667d19..0a037090 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -22,12 +22,18 @@ pub struct OpenRequestHandle(Arc); impl OpenRequestHandle { pub fn new(connection: Arc) -> Self { - // TODO: attach a unique id to this? + // TODO: attach a unique id to this? customer requests have one, but not internal queries // TODO: what ordering?! connection .active_requests .fetch_add(1, atomic::Ordering::AcqRel); + // TODO: handle overflows? + // TODO: what ordering? + connection + .total_requests + .fetch_add(1, atomic::Ordering::Relaxed); + Self(connection) }