more stats and rate limit on user, not key

Bryan Stitt 2022-12-28 13:50:34 -08:00
parent 3a8f30f87d
commit 68183c34c7
3 changed files with 176 additions and 72 deletions

@@ -6,10 +6,9 @@ use crate::block_number::{block_needed, BlockNeeded};
 use crate::config::{AppConfig, TopConfig};
 use crate::frontend::authorization::{Authorization, RequestMetadata};
 use crate::frontend::errors::FrontendErrorResponse;
-use crate::jsonrpc::JsonRpcForwardedResponse;
-use crate::jsonrpc::JsonRpcForwardedResponseEnum;
-use crate::jsonrpc::JsonRpcRequest;
-use crate::jsonrpc::JsonRpcRequestEnum;
+use crate::jsonrpc::{
+    JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest, JsonRpcRequestEnum,
+};
 use crate::rpcs::blockchain::{ArcBlock, SavedBlock};
 use crate::rpcs::connection::Web3Connection;
 use crate::rpcs::connections::Web3Connections;
@@ -23,23 +22,19 @@ use deferred_rate_limiter::DeferredRateLimiter;
 use derive_more::From;
 use entities::sea_orm_active_enums::LogLevel;
 use ethers::core::utils::keccak256;
-use ethers::prelude::{Address, Block, Bytes, TxHash, H256, U64};
-use ethers::types::Transaction;
+use ethers::prelude::{Address, Block, Bytes, Transaction, TxHash, H256, U64};
 use ethers::utils::rlp::{Decodable, Rlp};
 use futures::future::join_all;
-use futures::stream::FuturesUnordered;
-use futures::stream::StreamExt;
+use futures::stream::{FuturesUnordered, StreamExt};
 use hashbrown::{HashMap, HashSet};
 use ipnet::IpNet;
-use log::Level;
-use log::{debug, error, info, warn};
+use log::{debug, error, info, warn, Level};
 use metered::{metered, ErrorCount, HitCount, ResponseTime, Throughput};
 use migration::sea_orm::{self, ConnectionTrait, Database, DatabaseConnection};
 use migration::sea_query::table::ColumnDef;
 use migration::{Alias, DbErr, Migrator, MigratorTrait, Table};
 use moka::future::Cache;
-use redis_rate_limiter::redis::AsyncCommands;
-use redis_rate_limiter::{DeadpoolRuntime, RedisConfig, RedisPool, RedisRateLimiter};
+use redis_rate_limiter::{redis, DeadpoolRuntime, RedisConfig, RedisPool, RedisRateLimiter};
 use serde::Serialize;
 use serde_json::json;
 use serde_json::value::to_raw_value;
@@ -48,8 +43,7 @@ use std::hash::{Hash, Hasher};
 use std::net::IpAddr;
 use std::num::NonZeroU64;
 use std::str::FromStr;
-use std::sync::atomic;
-use std::sync::Arc;
+use std::sync::{atomic, Arc};
 use std::time::Duration;
 use tokio::sync::{broadcast, watch, Semaphore};
 use tokio::task::JoinHandle;
@@ -194,14 +188,13 @@ pub struct Web3ProxyApp {
     /// store pending transactions that we've seen so that we don't send duplicates to subscribers
     pub pending_transactions: Cache<TxHash, TxStatus, hashbrown::hash_map::DefaultHashBuilder>,
     pub frontend_ip_rate_limiter: Option<DeferredRateLimiter<IpAddr>>,
-    // TODO: this key should be our RpcSecretKey class, not Ulid
-    pub frontend_key_rate_limiter: Option<DeferredRateLimiter<Ulid>>,
+    pub frontend_registered_user_rate_limiter: Option<DeferredRateLimiter<u64>>,
     pub login_rate_limiter: Option<RedisRateLimiter>,
     pub vredis_pool: Option<RedisPool>,
     // TODO: this key should be our RpcSecretKey class, not Ulid
     pub rpc_secret_key_cache:
         Cache<Ulid, AuthorizationChecks, hashbrown::hash_map::DefaultHashBuilder>,
-    pub rpc_key_semaphores:
+    pub registered_user_semaphores:
         Cache<NonZeroU64, Arc<Semaphore>, hashbrown::hash_map::DefaultHashBuilder>,
     pub ip_semaphores: Cache<IpAddr, Arc<Semaphore>, hashbrown::hash_map::DefaultHashBuilder>,
     pub bearer_token_semaphores:
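
The field changes above carry the commit's main idea: the frontend rate limiter and the concurrency-semaphore cache are now keyed by the registered user's numeric id instead of an individual RPC secret key's Ulid, so every key a user owns draws from one shared budget. A minimal sketch of the reshaped state, assuming only the moka and tokio crates (DeferredRateLimiter is the project's own redis-backed type, stubbed here; names are illustrative):

use std::marker::PhantomData;
use std::net::IpAddr;
use std::num::NonZeroU64;
use std::sync::Arc;

use moka::future::Cache;
use tokio::sync::Semaphore;

// stand-in for the project's redis-backed DeferredRateLimiter<K>
struct DeferredRateLimiter<K>(PhantomData<K>);

struct RateLimitState {
    // per-ip limits are unchanged
    frontend_ip_rate_limiter: Option<DeferredRateLimiter<IpAddr>>,
    // before this commit: Option<DeferredRateLimiter<Ulid>>, one budget per rpc key;
    // now one budget per registered user id
    frontend_registered_user_rate_limiter: Option<DeferredRateLimiter<u64>>,
    // one concurrency semaphore per user, shared by all of that user's keys
    registered_user_semaphores: Cache<NonZeroU64, Arc<Semaphore>>,
}
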
@@ -346,11 +339,17 @@ impl Web3ProxyApp {
         }

         if !top_config.extra.is_empty() {
-            warn!("unknown TopConfig fields!: {:?}", top_config.app.extra.keys());
+            warn!(
+                "unknown TopConfig fields!: {:?}",
+                top_config.app.extra.keys()
+            );
         }

         if !top_config.app.extra.is_empty() {
-            warn!("unknown Web3ProxyAppConfig fields!: {:?}", top_config.app.extra.keys());
+            warn!(
+                "unknown Web3ProxyAppConfig fields!: {:?}",
+                top_config.app.extra.keys()
+            );
         }

         // setup metrics
@@ -598,7 +597,7 @@ impl Web3ProxyApp {
         // create rate limiters
         // these are optional. they require redis
         let mut frontend_ip_rate_limiter = None;
-        let mut frontend_key_rate_limiter = None;
+        let mut frontend_registered_user_rate_limiter = None;
         let mut login_rate_limiter = None;

         if let Some(redis_pool) = vredis_pool.as_ref() {
@@ -623,7 +622,7 @@ impl Web3ProxyApp {
                 rpc_rrl.clone(),
                 None,
             ));

-            frontend_key_rate_limiter = Some(DeferredRateLimiter::<Ulid>::new(
+            frontend_registered_user_rate_limiter = Some(DeferredRateLimiter::<u64>::new(
                 10_000, "key", rpc_rrl, None,
             ));
@@ -673,7 +672,7 @@ impl Web3ProxyApp {
         let ip_semaphores = Cache::builder()
             .time_to_idle(Duration::from_secs(120))
             .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());

-        let rpc_key_semaphores = Cache::builder()
+        let registered_user_semaphores = Cache::builder()
             .time_to_idle(Duration::from_secs(120))
             .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
@@ -686,7 +685,7 @@ impl Web3ProxyApp {
             pending_tx_sender,
             pending_transactions,
             frontend_ip_rate_limiter,
-            frontend_key_rate_limiter,
+            frontend_registered_user_rate_limiter,
            login_rate_limiter,
             db_conn,
             db_replica,
@@ -696,7 +695,7 @@ impl Web3ProxyApp {
             rpc_secret_key_cache,
             bearer_token_semaphores,
             ip_semaphores,
-            rpc_key_semaphores,
+            registered_user_semaphores,
             stat_sender,
         };
@@ -711,47 +710,103 @@ impl Web3ProxyApp {
         // globals.insert("service", "web3_proxy");

         #[derive(Serialize)]
-        struct RecentIps(i64);
+        struct RecentCounts {
+            one_day: i64,
+            one_hour: i64,
+            one_minute: i64,
+        }

-        let recent_ips = if let Ok(mut redis_conn) = self.redis_conn().await {
-            // TODO: delete any hash entries where
-            const ONE_DAY: i64 = 86400;
-
-            let oldest = Utc::now().timestamp() - ONE_DAY;
-
-            let recent_ips_key = format!("recent_ips:{}", self.config.chain_id);
-
-            // delete any entries that are too old
-            // TODO: pipe
-            if let Err(err) = redis_conn
-                .zrembyscore::<_, _, _, u64>(recent_ips_key.clone(), i64::MIN, oldest)
-                .await
-            {
-                warn!("unable to clear recent_ips: {}", err);
-            }
-
-            match redis_conn.zcount(recent_ips_key, i64::MIN, i64::MAX).await {
-                Ok(count) => RecentIps(count),
-                Err(err) => {
-                    warn!("unable to count recent_ips: {}", err);
-                    RecentIps(-1)
+        impl RecentCounts {
+            fn for_err() -> Self {
+                Self {
+                    one_day: -1,
+                    one_hour: -1,
+                    one_minute: -1,
                 }
             }
-        } else {
-            RecentIps(-1)
-        };
+        }

+        let (recent_ip_counts, recent_user_counts): (RecentCounts, RecentCounts) =
+            match self.redis_conn().await {
+                Ok(mut redis_conn) => {
+                    // TODO: delete any hash entries where
+                    const ONE_MINUTE: i64 = 60;
+                    const ONE_HOUR: i64 = ONE_MINUTE * 60;
+                    const ONE_DAY: i64 = ONE_HOUR * 24;
+
+                    let one_day_ago = Utc::now().timestamp() - ONE_DAY;
+                    let one_hour_ago = Utc::now().timestamp() - ONE_HOUR;
+                    let one_minute_ago = Utc::now().timestamp() - ONE_MINUTE;
+
+                    let recent_users_by_user =
+                        format!("recent_users:registered:{}", self.config.chain_id);
+                    let recent_users_by_ip = format!("recent_users:ip:{}", self.config.chain_id);
+
+                    match redis::pipe()
+                        .atomic()
+                        // delete any entries older than 24 hours
+                        .zrembyscore(&recent_users_by_user, i64::MIN, one_day_ago)
+                        .ignore()
+                        .zrembyscore(&recent_users_by_ip, i64::MIN, one_day_ago)
+                        .ignore()
+                        // get count for last day
+                        .zcount(&recent_users_by_user, one_day_ago, i64::MAX)
+                        .zcount(&recent_users_by_ip, one_day_ago, i64::MAX)
+                        // get count for last hour
+                        .zcount(&recent_users_by_user, one_hour_ago, i64::MAX)
+                        .zcount(&recent_users_by_ip, one_hour_ago, i64::MAX)
+                        // get count for last minute
+                        .zcount(&recent_users_by_user, one_minute_ago, i64::MAX)
+                        .zcount(&recent_users_by_ip, one_minute_ago, i64::MAX)
+                        .query_async(&mut redis_conn)
+                        .await
+                    {
+                        Ok((
+                            day_by_user,
+                            day_by_ip,
+                            hour_by_user,
+                            hour_by_ip,
+                            minute_by_user,
+                            minute_by_ip,
+                        )) => {
+                            let recent_ip_counts = RecentCounts {
+                                one_day: day_by_ip,
+                                one_hour: hour_by_ip,
+                                one_minute: minute_by_ip,
+                            };
+                            let recent_user_counts = RecentCounts {
+                                one_day: day_by_user,
+                                one_hour: hour_by_user,
+                                one_minute: minute_by_user,
+                            };
+
+                            (recent_ip_counts, recent_user_counts)
+                        }
+                        Err(err) => {
+                            warn!("unable to count recent users: {}", err);
+                            (RecentCounts::for_err(), RecentCounts::for_err())
+                        }
+                    }
+                }
+                Err(err) => {
+                    warn!("unable to connect to redis while counting users: {:?}", err);
+                    (RecentCounts::for_err(), RecentCounts::for_err())
+                }
+            };

         #[derive(Serialize)]
         struct CombinedMetrics<'a> {
             app: &'a Web3ProxyAppMetrics,
             backend_rpc: &'a OpenRequestHandleMetrics,
-            recent_ips: RecentIps,
+            recent_ip_counts: RecentCounts,
+            recent_user_counts: RecentCounts,
         }

         let metrics = CombinedMetrics {
             app: &self.app_metrics,
             backend_rpc: &self.open_request_handle_metrics,
-            recent_ips,
+            recent_ip_counts,
+            recent_user_counts,
         };

         serde_prometheus::to_string(&metrics, Some("web3_proxy"), globals)
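
The rewritten metrics code stores recent activity in redis sorted sets whose member scores are unix timestamps, so one atomic pipeline can evict day-old members and count the one-day, one-hour, and one-minute windows in a single round trip. A self-contained sketch of that pattern against the plain redis crate (the connection type, key name, and function are illustrative, not from the commit):

/// Sketch: expire old members and count three windows in one atomic pipeline.
/// Scores are unix-timestamp seconds, as in the diff above.
async fn recent_counts(
    conn: &mut redis::aio::MultiplexedConnection,
    key: &str,
    now: i64,
) -> redis::RedisResult<(i64, i64, i64)> {
    let (one_day_ago, one_hour_ago, one_minute_ago) = (now - 86_400, now - 3_600, now - 60);

    let (day, hour, minute): (i64, i64, i64) = redis::pipe()
        .atomic()
        // drop members older than a day so the set cannot grow forever
        .zrembyscore(key, i64::MIN, one_day_ago)
        .ignore()
        // count members whose last-seen timestamp falls in each window
        .zcount(key, one_day_ago, i64::MAX)
        .zcount(key, one_hour_ago, i64::MAX)
        .zcount(key, one_minute_ago, i64::MAX)
        .query_async(conn)
        .await?;

    Ok((day, hour, minute))
}

Counting distinct users this way is cheap because ZADD on an existing member only refreshes its score, so each user or IP appears in the set at most once.
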

@@ -45,7 +45,7 @@ pub struct TopConfig {
     // TODO: instead of an option, give it a default
     pub private_rpcs: Option<HashMap<String, Web3ConnectionConfig>>,

     /// unknown config options get put here
-    #[serde(flatten, default="HashMap::default")]
+    #[serde(flatten, default = "HashMap::default")]
     pub extra: HashMap<String, serde_json::Value>,
 }
@@ -148,7 +148,7 @@ pub struct AppConfig {
     pub volatile_redis_max_connections: Option<usize>,

     /// unknown config options get put here
-    #[serde(flatten, default="HashMap::default")]
+    #[serde(flatten, default = "HashMap::default")]
     pub extra: HashMap<String, serde_json::Value>,
 }
@@ -206,7 +206,7 @@ pub struct Web3ConnectionConfig {
     #[serde(default)]
     pub subscribe_txs: Option<bool>,

     /// unknown config options get put here
-    #[serde(flatten, default="HashMap::default")]
+    #[serde(flatten, default = "HashMap::default")]
     pub extra: HashMap<String, serde_json::Value>,
 }
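
All three config structs use the same serde trick: a flattened HashMap collects any fields the deserializer does not recognize, and the warn! calls in this commit report them instead of silently dropping operator typos. A minimal standalone example of the pattern (struct, field, and the toml input are illustrative; assumes the serde, serde_json, and toml crates):

use std::collections::HashMap;

use serde::Deserialize;

#[derive(Deserialize)]
struct ExampleConfig {
    chain_id: u64,
    /// unknown config options get put here instead of failing deserialization
    #[serde(flatten, default = "HashMap::default")]
    extra: HashMap<String, serde_json::Value>,
}

fn main() {
    // "chain_idd" is a typo; flatten catches it instead of erroring
    let config: ExampleConfig = toml::from_str("chain_id = 1\nchain_idd = 2").unwrap();

    // mirrors the warn! calls above: surface probable typos to the operator
    if !config.extra.is_empty() {
        eprintln!("unknown config fields: {:?}", config.extra.keys());
    }
}
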
@@ -232,7 +232,10 @@ impl Web3ConnectionConfig {
         open_request_handle_metrics: Arc<OpenRequestHandleMetrics>,
     ) -> anyhow::Result<(Arc<Web3Connection>, AnyhowJoinHandle<()>)> {
         if !self.extra.is_empty() {
-            warn!("unknown Web3ConnectionConfig fields!: {:?}", self.extra.keys());
+            warn!(
+                "unknown Web3ConnectionConfig fields!: {:?}",
+                self.extra.keys()
+            );
         }

         let hard_limit = match (self.hard_limit, redis_pool) {

@@ -342,10 +342,9 @@ pub async fn ip_is_authorized(
        x => unimplemented!("rate_limit_by_ip shouldn't ever see these: {:?}", x),
    };

-    let app = app.clone();
-
-    // TODO: in the background, add the ip to a recent_ips map
+    // in the background, add the ip to a recent_users map
    if app.config.public_recent_ips_salt.is_some() {
+        let app = app.clone();
        let f = async move {
            let now = Utc::now().timestamp();

@@ -357,21 +356,20 @@ pub async fn ip_is_authorized(
                .as_ref()
                .expect("public_recent_ips_salt must exist in here");

-            // TODO: how should we salt and hash the ips? do it faster?
            let salted_ip = format!("{}:{}", salt, ip);

            let hashed_ip = Bytes::from(keccak256(salted_ip.as_bytes()));

-            let recent_ips_key = format!("recent_ips:{}", app.config.chain_id);
+            let recent_ip_key = format!("recent_users:ip:{}", app.config.chain_id);

            redis_conn
-                .zadd(recent_ips_key, hashed_ip.to_string(), now)
+                .zadd(recent_ip_key, hashed_ip.to_string(), now)
                .await?;

            Ok::<_, anyhow::Error>(())
        }
        .map_err(|err| {
-            warn!("background update of recent_ips failed: {}", err);
+            warn!("background update of recent_users:ip failed: {}", err);

            err
        });
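
Note that raw addresses never reach redis: the IP is joined with a server-side salt, hashed with keccak256, and the hex digest becomes the sorted-set member. A sketch of just that hashing step, using the same ethers helpers this file imports (function name is illustrative):

use std::net::IpAddr;

use ethers::core::utils::keccak256;
use ethers::types::Bytes;

/// Salt and hash an address so the value stored in redis cannot be
/// reversed back to the IP without the server's secret salt.
fn hashed_member(salt: &str, ip: IpAddr) -> String {
    let salted_ip = format!("{}:{}", salt, ip);

    // keccak256 returns [u8; 32]; Bytes renders it as 0x-prefixed hex
    Bytes::from(keccak256(salted_ip.as_bytes())).to_string()
}

// usage: ZADD recent_users:ip:<chain_id> <unix_now> <hashed_member(salt, ip)>
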
@@ -384,7 +382,7 @@ pub async fn ip_is_authorized(

/// like app.rate_limit_by_rpc_key but converts to a FrontendErrorResponse;
pub async fn key_is_authorized(
-    app: &Web3ProxyApp,
+    app: &Arc<Web3ProxyApp>,
    rpc_key: RpcSecretKey,
    ip: IpAddr,
    origin: Option<Origin>,
@@ -404,6 +402,43 @@ pub async fn key_is_authorized(
        RateLimitResult::UnknownKey => return Err(FrontendErrorResponse::UnknownKey),
    };

+    // TODO: DRY and maybe optimize the hashing
+    // in the background, add the ip to a recent_users map
+    if app.config.public_recent_ips_salt.is_some() {
+        let app = app.clone();
+        let user_id = authorization.checks.user_id;
+
+        let f = async move {
+            let now = Utc::now().timestamp();
+
+            let mut redis_conn = app.redis_conn().await?;
+
+            let salt = app
+                .config
+                .public_recent_ips_salt
+                .as_ref()
+                .expect("public_recent_ips_salt must exist in here");
+
+            let salted_user_id = format!("{}:{}", salt, user_id);
+
+            let hashed_user_id = Bytes::from(keccak256(salted_user_id.as_bytes()));
+
+            let recent_user_id_key = format!("recent_users:registered:{}", app.config.chain_id);
+
+            redis_conn
+                .zadd(recent_user_id_key, hashed_user_id.to_string(), now)
+                .await?;
+
+            Ok::<_, anyhow::Error>(())
+        }
+        .map_err(|err| {
+            warn!("background update of recent_users:ip failed: {}", err);
+
+            err
+        });
+
+        tokio::spawn(f);
+    }
+
    Ok((authorization, semaphore))
}
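
Both authorization paths record stats the same way: an async block that returns anyhow::Result, a map_err that logs the failure, and a tokio::spawn that fires it without awaiting, so a redis hiccup can never fail the caller's request. The shape of that fire-and-forget pattern, reduced to its essentials (body elided, names illustrative):

use futures::TryFutureExt;
use log::warn;

fn spawn_stat_update() {
    let f = async move {
        // ... write to redis here; any `?` error lands in map_err below ...
        Ok::<_, anyhow::Error>(())
    }
    .map_err(|err| {
        // log and swallow: stats must never fail the request being served
        warn!("background update failed: {}", err);
        err
    });

    // fire and forget; the request handler does not await this task
    tokio::spawn(f);
}
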
@@ -434,18 +469,21 @@ impl Web3ProxyApp {
     }

     /// Limit the number of concurrent requests from the given rpc key.
-    pub async fn rpc_key_semaphore(
+    pub async fn registered_user_semaphore(
         &self,
         authorization_checks: &AuthorizationChecks,
     ) -> anyhow::Result<Option<OwnedSemaphorePermit>> {
         if let Some(max_concurrent_requests) = authorization_checks.max_concurrent_requests {
-            let rpc_key_id = authorization_checks.rpc_key_id.context("no rpc_key_id")?;
+            let user_id = authorization_checks
+                .user_id
+                .try_into()
+                .context("user ids should always be non-zero")?;

             let semaphore = self
-                .rpc_key_semaphores
-                .get_with(rpc_key_id, async move {
+                .registered_user_semaphores
+                .get_with(user_id, async move {
                     let s = Semaphore::new(max_concurrent_requests as usize);
-                    // // trace!("new semaphore for rpc_key_id {}", rpc_key_id);
+                    // trace!("new semaphore for user_id {}", user_id);
                     Arc::new(s)
                 })
                 .await;
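
moka's get_with hands every concurrent caller for the same user the same Arc<Semaphore> and runs the init future only once, and the time_to_idle set at construction lets an idle user's semaphore expire after two minutes. A compact sketch of the lookup-and-acquire step under those assumptions (function name and return type are illustrative):

use std::num::NonZeroU64;
use std::sync::Arc;

use anyhow::Context;
use moka::future::Cache;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};

/// Cap a user's in-flight requests, creating their semaphore on first use.
async fn user_permit(
    semaphores: &Cache<NonZeroU64, Arc<Semaphore>>,
    user_id: u64,
    max_concurrent_requests: usize,
) -> anyhow::Result<OwnedSemaphorePermit> {
    let user_id: NonZeroU64 = user_id
        .try_into()
        .context("user ids should always be non-zero")?;

    // all concurrent callers for this user share one Arc<Semaphore>
    let semaphore = semaphores
        .get_with(user_id, async move {
            Arc::new(Semaphore::new(max_concurrent_requests))
        })
        .await;

    // the request holds this permit for its whole lifetime
    Ok(semaphore.acquire_owned().await?)
}
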
@@ -739,9 +777,13 @@ impl Web3ProxyApp {
             return Ok(RateLimitResult::UnknownKey);
         }

+        // TODO: rpc_key should have an option to rate limit by ip instead of by key
+
         // only allow this rpc_key to run a limited amount of concurrent requests
         // TODO: rate limit should be BEFORE the semaphore!
-        let semaphore = self.rpc_key_semaphore(&authorization_checks).await?;
+        let semaphore = self
+            .registered_user_semaphore(&authorization_checks)
+            .await?;

         let authorization = Authorization::try_new(
             authorization_checks,
@@ -761,9 +803,13 @@ impl Web3ProxyApp {
         };

         // user key is valid. now check rate limits
-        if let Some(rate_limiter) = &self.frontend_key_rate_limiter {
+        if let Some(rate_limiter) = &self.frontend_registered_user_rate_limiter {
             match rate_limiter
-                .throttle(rpc_key.into(), Some(user_max_requests_per_period), 1)
+                .throttle(
+                    authorization.checks.user_id,
+                    Some(user_max_requests_per_period),
+                    1,
+                )
                 .await
             {
                 Ok(DeferredRateLimitResult::Allowed) => {
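
The throttle call now charges the request to authorization.checks.user_id (a u64) where it previously charged rpc_key.into() (a Ulid). The project's DeferredRateLimiter wraps redis with its own caching, so as a generic stand-in here is a fixed-window counter keyed the same way; it illustrates only the keying idea, not the commit's implementation (assumes the redis, chrono, and anyhow crates):

use anyhow::Context;

/// Stand-in only: allow up to `max_requests_per_period` per user per window.
async fn throttle_user(
    conn: &mut redis::aio::MultiplexedConnection,
    user_id: u64,
    max_requests_per_period: u64,
    period_secs: u64,
) -> anyhow::Result<bool> {
    // every rpc key belonging to this user lands on the same counter
    let window = chrono::Utc::now().timestamp() as u64 / period_secs;
    let key = format!("rate:user:{}:{}", user_id, window);

    // count this request and let the window key expire on its own
    let (count,): (u64,) = redis::pipe()
        .atomic()
        .incr(&key, 1u64)
        .cmd("EXPIRE")
        .arg(&key)
        .arg(period_secs * 2)
        .ignore()
        .query_async(conn)
        .await
        .context("rate limit check failed")?;

    Ok(count <= max_requests_per_period)
}
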