salted recent ip tracking

This commit is contained in:
Bryan Stitt 2022-12-27 22:11:18 -08:00
parent 853ee354c9
commit d9c01a59a3
9 changed files with 88 additions and 17 deletions

@ -199,6 +199,7 @@ where
Ok(rate_limit_f.await)
} else {
// rate limit has enough headroom that it should be safe to do this in the background
// TODO: send an error here somewhere
tokio::spawn(rate_limit_f);
Ok(DeferredRateLimitResult::Allowed)

@ -18,6 +18,7 @@ use crate::rpcs::transactions::TxStatus;
use crate::user_token::UserBearerToken;
use anyhow::Context;
use axum::headers::{Origin, Referer, UserAgent};
use chrono::Utc;
use deferred_rate_limiter::DeferredRateLimiter;
use derive_more::From;
use entities::sea_orm_active_enums::LogLevel;
@ -37,6 +38,7 @@ use migration::sea_orm::{self, ConnectionTrait, Database, DatabaseConnection};
use migration::sea_query::table::ColumnDef;
use migration::{Alias, DbErr, Migrator, MigratorTrait, Table};
use moka::future::Cache;
use redis_rate_limiter::redis::AsyncCommands;
use redis_rate_limiter::{DeadpoolRuntime, RedisConfig, RedisPool, RedisRateLimiter};
use serde::Serialize;
use serde_json::json;
@ -695,20 +697,53 @@ impl Web3ProxyApp {
Ok((app, cancellable_handles, important_background_handles).into())
}
pub fn prometheus_metrics(&self) -> String {
pub async fn prometheus_metrics(&self) -> String {
let globals = HashMap::new();
// TODO: what globals? should this be the hostname or what?
// globals.insert("service", "web3_proxy");
#[derive(Serialize)]
struct RecentIps(i64);
let recent_ips = if let Ok(mut redis_conn) = self.redis_conn().await {
// TODO: delete any hash entries where
const ONE_DAY: i64 = 86400;
let oldest = Utc::now().timestamp() - ONE_DAY;
let recent_ips_key = format!("recent_ips:{}", self.config.chain_id);
// delete any entries that are too old
// TODO: pipe
if let Err(err) = redis_conn
.zrembyscore::<_, _, _, u64>(recent_ips_key, i64::MIN, oldest)
.await
{
warn!("unable to clear recent_ips: {}", err);
}
match redis_conn.zcount(recent_ips_key, i64::MIN, i64::MAX).await {
Ok(count) => RecentIps(count),
Err(err) => {
warn!("unable to count recent_ips: {}", err);
RecentIps(-1)
}
}
} else {
RecentIps(-1)
};
#[derive(Serialize)]
struct CombinedMetrics<'a> {
app: &'a Web3ProxyAppMetrics,
backend_rpc: &'a OpenRequestHandleMetrics,
recent_ips: RecentIps,
}
let metrics = CombinedMetrics {
app: &self.app_metrics,
backend_rpc: &self.open_request_handle_metrics,
recent_ips,
};
serde_prometheus::to_string(&metrics, Some("web3_proxy"), globals)

@ -315,6 +315,7 @@ impl StatEmitter {
period_seconds,
};
// TODO: send any errors somewhere
let handle =
tokio::spawn(async move { new.stat_loop(stat_receiver, shutdown_receiver).await });

@ -120,6 +120,9 @@ pub struct AppConfig {
/// None = allow all requests
pub public_requests_per_period: Option<u64>,
/// Salt for hashing recent ips
pub public_recent_ips_salt: Option<String>,
/// RPC responses are cached locally
#[serde(default = "default_response_cache_max_bytes")]
pub response_cache_max_bytes: usize,

@ -10,12 +10,16 @@ use axum::headers::{Header, Origin, Referer, UserAgent};
use chrono::Utc;
use deferred_rate_limiter::DeferredRateLimitResult;
use entities::{login, rpc_key, user, user_tier};
use ethers::types::Bytes;
use ethers::utils::keccak256;
use futures::TryFutureExt;
use hashbrown::HashMap;
use http::HeaderValue;
use ipnet::IpNet;
use log::error;
use log::{error, warn, debug};
use migration::sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter};
use parking_lot::Mutex;
use redis_rate_limiter::redis::AsyncCommands;
use redis_rate_limiter::RedisRateLimitResult;
use std::fmt::Display;
use std::sync::atomic::{AtomicBool, AtomicU64};
@ -319,7 +323,7 @@ pub async fn login_is_authorized(
/// semaphore won't ever be None, but its easier if key auth and ip auth work the same way
pub async fn ip_is_authorized(
app: &Web3ProxyApp,
app: &Arc<Web3ProxyApp>,
ip: IpAddr,
origin: Option<Origin>,
) -> Result<(Authorization, Option<OwnedSemaphorePermit>), FrontendErrorResponse> {
@ -338,7 +342,42 @@ pub async fn ip_is_authorized(
x => unimplemented!("rate_limit_by_ip shouldn't ever see these: {:?}", x),
};
// TODO: in the background, add the ip to a recent_users map
let app = app.clone();
// TODO: in the background, add the ip to a recent_ips map
if app.config.public_recent_ips_salt.is_some() {
let f = async move {
let now = Utc::now().timestamp();
let mut redis_conn = app.redis_conn().await?;
let salt = app
.config
.public_recent_ips_salt
.as_ref()
.expect("public_recent_ips_salt must exist in here");
// TODO: how should we salt and hash the ips? do it faster?
let salted_ip = format!("{}:{}", salt, ip);
let hashed_ip = Bytes::from(keccak256(salted_ip.as_bytes()));
let recent_ips_key = format!("recent_ips:{}", app.config.chain_id);
redis_conn
.zadd(recent_ips_key, hashed_ip.to_string(), now)
.await?;
Ok::<_, anyhow::Error>(())
}
.map_err(|err| {
warn!("background update of recent_ips failed: {}", err);
err
});
tokio::spawn(f);
}
Ok((authorization, semaphore))
}

@ -79,8 +79,6 @@ pub async fn serve(port: u16, proxy_app: Arc<Web3ProxyApp>) -> anyhow::Result<()
.route("/user/stats/detailed", get(users::user_stats_detailed_get))
.route("/user/logout", post(users::user_logout_post))
.route("/status", get(status::status))
// TODO: make this optional or remove it since it is available on another port
.route("/prometheus", get(status::prometheus))
// layers are ordered bottom up
// the last layer is first for requests and last for responses
// Mark the `Authorization` request header as sensitive so it doesn't show in logs

@ -30,7 +30,8 @@ pub async fn proxy_web3_rpc(
let authorization = Arc::new(authorization);
let (response, rpcs, _semaphore) = app.proxy_web3_rpc(authorization, payload)
let (response, rpcs, _semaphore) = app
.proxy_web3_rpc(authorization, payload)
.await
.map(|(x, y)| (x, y, semaphore))?;
@ -80,7 +81,8 @@ pub async fn proxy_web3_rpc_with_key(
let authorization = Arc::new(authorization);
let (response, rpcs, _semaphore) = app.proxy_web3_rpc(authorization, payload)
let (response, rpcs, _semaphore) = app
.proxy_web3_rpc(authorization, payload)
.await
.map(|(x, y)| (x, y, semaphore))?;

@ -22,14 +22,6 @@ pub async fn health(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoRe
}
}
/// Prometheus metrics.
///
/// TODO: when done debugging, remove this and only allow access on a different port
#[debug_handler]
pub async fn prometheus(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse {
app.prometheus_metrics()
}
/// Very basic status page.
///
/// TODO: replace this with proper stats and monitoring

@ -43,7 +43,7 @@ pub async fn serve(app: Arc<Web3ProxyApp>, port: u16) -> anyhow::Result<()> {
}
async fn root(Extension(app): Extension<Arc<Web3ProxyApp>>) -> Response {
let serialized = app.prometheus_metrics();
let serialized = app.prometheus_metrics().await;
let mut r = serialized.into_response();