diff --git a/deferred-rate-limiter/src/lib.rs b/deferred-rate-limiter/src/lib.rs index eef1640d..2543b0be 100644 --- a/deferred-rate-limiter/src/lib.rs +++ b/deferred-rate-limiter/src/lib.rs @@ -199,6 +199,7 @@ where Ok(rate_limit_f.await) } else { // rate limit has enough headroom that it should be safe to do this in the background + // TODO: send an error here somewhere tokio::spawn(rate_limit_f); Ok(DeferredRateLimitResult::Allowed) diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index b6c63d6b..16f46f5b 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -18,6 +18,7 @@ use crate::rpcs::transactions::TxStatus; use crate::user_token::UserBearerToken; use anyhow::Context; use axum::headers::{Origin, Referer, UserAgent}; +use chrono::Utc; use deferred_rate_limiter::DeferredRateLimiter; use derive_more::From; use entities::sea_orm_active_enums::LogLevel; @@ -37,6 +38,7 @@ use migration::sea_orm::{self, ConnectionTrait, Database, DatabaseConnection}; use migration::sea_query::table::ColumnDef; use migration::{Alias, DbErr, Migrator, MigratorTrait, Table}; use moka::future::Cache; +use redis_rate_limiter::redis::AsyncCommands; use redis_rate_limiter::{DeadpoolRuntime, RedisConfig, RedisPool, RedisRateLimiter}; use serde::Serialize; use serde_json::json; @@ -695,20 +697,53 @@ impl Web3ProxyApp { Ok((app, cancellable_handles, important_background_handles).into()) } - pub fn prometheus_metrics(&self) -> String { + pub async fn prometheus_metrics(&self) -> String { let globals = HashMap::new(); // TODO: what globals? should this be the hostname or what? // globals.insert("service", "web3_proxy"); + #[derive(Serialize)] + struct RecentIps(i64); + + let recent_ips = if let Ok(mut redis_conn) = self.redis_conn().await { + // TODO: delete any hash entries where + const ONE_DAY: i64 = 86400; + + let oldest = Utc::now().timestamp() - ONE_DAY; + + let recent_ips_key = format!("recent_ips:{}", self.config.chain_id); + + // delete any entries that are too old + // TODO: pipe + if let Err(err) = redis_conn + .zrembyscore::<_, _, _, u64>(recent_ips_key, i64::MIN, oldest) + .await + { + warn!("unable to clear recent_ips: {}", err); + } + + match redis_conn.zcount(recent_ips_key, i64::MIN, i64::MAX).await { + Ok(count) => RecentIps(count), + Err(err) => { + warn!("unable to count recent_ips: {}", err); + RecentIps(-1) + } + } + } else { + RecentIps(-1) + }; + #[derive(Serialize)] struct CombinedMetrics<'a> { app: &'a Web3ProxyAppMetrics, backend_rpc: &'a OpenRequestHandleMetrics, + recent_ips: RecentIps, } let metrics = CombinedMetrics { app: &self.app_metrics, backend_rpc: &self.open_request_handle_metrics, + recent_ips, }; serde_prometheus::to_string(&metrics, Some("web3_proxy"), globals) diff --git a/web3_proxy/src/app_stats.rs b/web3_proxy/src/app_stats.rs index 9028db07..204effd5 100644 --- a/web3_proxy/src/app_stats.rs +++ b/web3_proxy/src/app_stats.rs @@ -315,6 +315,7 @@ impl StatEmitter { period_seconds, }; + // TODO: send any errors somewhere let handle = tokio::spawn(async move { new.stat_loop(stat_receiver, shutdown_receiver).await }); diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index 0e7bd188..d8574289 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -120,6 +120,9 @@ pub struct AppConfig { /// None = allow all requests pub public_requests_per_period: Option, + /// Salt for hashing recent ips + pub public_recent_ips_salt: Option, + /// RPC responses are cached locally #[serde(default = "default_response_cache_max_bytes")] pub response_cache_max_bytes: usize, diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index b05ec64e..0b55cb41 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -10,12 +10,16 @@ use axum::headers::{Header, Origin, Referer, UserAgent}; use chrono::Utc; use deferred_rate_limiter::DeferredRateLimitResult; use entities::{login, rpc_key, user, user_tier}; +use ethers::types::Bytes; +use ethers::utils::keccak256; +use futures::TryFutureExt; use hashbrown::HashMap; use http::HeaderValue; use ipnet::IpNet; -use log::error; +use log::{error, warn, debug}; use migration::sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter}; use parking_lot::Mutex; +use redis_rate_limiter::redis::AsyncCommands; use redis_rate_limiter::RedisRateLimitResult; use std::fmt::Display; use std::sync::atomic::{AtomicBool, AtomicU64}; @@ -319,7 +323,7 @@ pub async fn login_is_authorized( /// semaphore won't ever be None, but its easier if key auth and ip auth work the same way pub async fn ip_is_authorized( - app: &Web3ProxyApp, + app: &Arc, ip: IpAddr, origin: Option, ) -> Result<(Authorization, Option), FrontendErrorResponse> { @@ -338,7 +342,42 @@ pub async fn ip_is_authorized( x => unimplemented!("rate_limit_by_ip shouldn't ever see these: {:?}", x), }; - // TODO: in the background, add the ip to a recent_users map + let app = app.clone(); + + // TODO: in the background, add the ip to a recent_ips map + if app.config.public_recent_ips_salt.is_some() { + let f = async move { + let now = Utc::now().timestamp(); + + let mut redis_conn = app.redis_conn().await?; + + let salt = app + .config + .public_recent_ips_salt + .as_ref() + .expect("public_recent_ips_salt must exist in here"); + + // TODO: how should we salt and hash the ips? do it faster? + let salted_ip = format!("{}:{}", salt, ip); + + let hashed_ip = Bytes::from(keccak256(salted_ip.as_bytes())); + + let recent_ips_key = format!("recent_ips:{}", app.config.chain_id); + + redis_conn + .zadd(recent_ips_key, hashed_ip.to_string(), now) + .await?; + + Ok::<_, anyhow::Error>(()) + } + .map_err(|err| { + warn!("background update of recent_ips failed: {}", err); + + err + }); + + tokio::spawn(f); + } Ok((authorization, semaphore)) } diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs index e4afd657..cd249f85 100644 --- a/web3_proxy/src/frontend/mod.rs +++ b/web3_proxy/src/frontend/mod.rs @@ -79,8 +79,6 @@ pub async fn serve(port: u16, proxy_app: Arc) -> anyhow::Result<() .route("/user/stats/detailed", get(users::user_stats_detailed_get)) .route("/user/logout", post(users::user_logout_post)) .route("/status", get(status::status)) - // TODO: make this optional or remove it since it is available on another port - .route("/prometheus", get(status::prometheus)) // layers are ordered bottom up // the last layer is first for requests and last for responses // Mark the `Authorization` request header as sensitive so it doesn't show in logs diff --git a/web3_proxy/src/frontend/rpc_proxy_http.rs b/web3_proxy/src/frontend/rpc_proxy_http.rs index cf590fe1..8844eaa4 100644 --- a/web3_proxy/src/frontend/rpc_proxy_http.rs +++ b/web3_proxy/src/frontend/rpc_proxy_http.rs @@ -30,7 +30,8 @@ pub async fn proxy_web3_rpc( let authorization = Arc::new(authorization); - let (response, rpcs, _semaphore) = app.proxy_web3_rpc(authorization, payload) + let (response, rpcs, _semaphore) = app + .proxy_web3_rpc(authorization, payload) .await .map(|(x, y)| (x, y, semaphore))?; @@ -80,7 +81,8 @@ pub async fn proxy_web3_rpc_with_key( let authorization = Arc::new(authorization); - let (response, rpcs, _semaphore) = app.proxy_web3_rpc(authorization, payload) + let (response, rpcs, _semaphore) = app + .proxy_web3_rpc(authorization, payload) .await .map(|(x, y)| (x, y, semaphore))?; diff --git a/web3_proxy/src/frontend/status.rs b/web3_proxy/src/frontend/status.rs index 5c1a1423..e18d60c5 100644 --- a/web3_proxy/src/frontend/status.rs +++ b/web3_proxy/src/frontend/status.rs @@ -22,14 +22,6 @@ pub async fn health(Extension(app): Extension>) -> impl IntoRe } } -/// Prometheus metrics. -/// -/// TODO: when done debugging, remove this and only allow access on a different port -#[debug_handler] -pub async fn prometheus(Extension(app): Extension>) -> impl IntoResponse { - app.prometheus_metrics() -} - /// Very basic status page. /// /// TODO: replace this with proper stats and monitoring diff --git a/web3_proxy/src/metrics_frontend.rs b/web3_proxy/src/metrics_frontend.rs index 48b47553..2eb2170a 100644 --- a/web3_proxy/src/metrics_frontend.rs +++ b/web3_proxy/src/metrics_frontend.rs @@ -43,7 +43,7 @@ pub async fn serve(app: Arc, port: u16) -> anyhow::Result<()> { } async fn root(Extension(app): Extension>) -> Response { - let serialized = app.prometheus_metrics(); + let serialized = app.prometheus_metrics().await; let mut r = serialized.into_response();