From 81c49d08cff22d9082f663e53d946bee6554c98b Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 28 Dec 2022 22:21:09 -0800 Subject: [PATCH] tx stats too --- TODO.md | 2 + web3_proxy/src/app/mod.rs | 205 +++++++++++++++-------- web3_proxy/src/frontend/authorization.rs | 52 +++--- web3_proxy/src/user_queries.rs | 1 + 4 files changed, 166 insertions(+), 94 deletions(-) diff --git a/TODO.md b/TODO.md index 25beb67c..bd0c65ff 100644 --- a/TODO.md +++ b/TODO.md @@ -584,3 +584,5 @@ in another repo: event subscriber - [ ] some internal requests should go through app.proxy_rpc_request so that they get caching! - be careful not to make an infinite loop - [ ] request timeout messages should include the request id +- [ ] have an upgrade tier that queries multiple backends at once. returns on first Ok result, collects errors. if no Ok, find the most common error and then respond with that +- [ ] give public_recent_ips_salt a better, more general, name \ No newline at end of file diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 351b035e..6edd1ac7 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -34,6 +34,7 @@ use migration::sea_orm::{self, ConnectionTrait, Database, DatabaseConnection}; use migration::sea_query::table::ColumnDef; use migration::{Alias, DbErr, Migrator, MigratorTrait, Table}; use moka::future::Cache; +use redis_rate_limiter::redis::AsyncCommands; use redis_rate_limiter::{redis, DeadpoolRuntime, RedisConfig, RedisPool, RedisRateLimiter}; use serde::Serialize; use serde_json::json; @@ -709,7 +710,7 @@ impl Web3ProxyApp { // TODO: what globals? should this be the hostname or what? // globals.insert("service", "web3_proxy"); - #[derive(Serialize)] + #[derive(Default, Serialize)] struct RecentCounts { one_day: i64, one_hour: i64, @@ -726,87 +727,119 @@ impl Web3ProxyApp { } } - let (recent_ip_counts, recent_user_counts): (RecentCounts, RecentCounts) = - match self.redis_conn().await { - Ok(mut redis_conn) => { - // TODO: delete any hash entries where - const ONE_MINUTE: i64 = 60; - const ONE_HOUR: i64 = ONE_MINUTE * 60; - const ONE_DAY: i64 = ONE_HOUR * 24; + let (recent_ip_counts, recent_user_id_counts, recent_tx_counts): ( + RecentCounts, + RecentCounts, + RecentCounts, + ) = match self.redis_conn().await { + Ok(Some(mut redis_conn)) => { + // TODO: delete any hash entries where + const ONE_MINUTE: i64 = 60; + const ONE_HOUR: i64 = ONE_MINUTE * 60; + const ONE_DAY: i64 = ONE_HOUR * 24; - let one_day_ago = Utc::now().timestamp() - ONE_DAY; - let one_hour_ago = Utc::now().timestamp() - ONE_HOUR; - let one_minute_ago = Utc::now().timestamp() - ONE_MINUTE; + let one_day_ago = Utc::now().timestamp() - ONE_DAY; + let one_hour_ago = Utc::now().timestamp() - ONE_HOUR; + let one_minute_ago = Utc::now().timestamp() - ONE_MINUTE; - let recent_users_by_user = - format!("recent_users:registered:{}", self.config.chain_id); - let recent_users_by_ip = format!("recent_users:ip:{}", self.config.chain_id); + let recent_users_by_id = format!("recent_users:id:{}", self.config.chain_id); + let recent_users_by_ip = format!("recent_users:ip:{}", self.config.chain_id); + let recent_transactions = + format!("eth_sendRawTransaction:{}", self.config.chain_id); - match redis::pipe() - .atomic() - // delete any entries older than 24 hours - .zrembyscore(&recent_users_by_user, i64::MIN, one_day_ago) - .ignore() - .zrembyscore(&recent_users_by_ip, i64::MIN, one_day_ago) - .ignore() - // get count for last day - .zcount(&recent_users_by_user, one_day_ago, i64::MAX) - .zcount(&recent_users_by_ip, one_day_ago, i64::MAX) - // get count for last hour - .zcount(&recent_users_by_user, one_hour_ago, i64::MAX) - .zcount(&recent_users_by_ip, one_hour_ago, i64::MAX) - // get count for last minute - .zcount(&recent_users_by_user, one_minute_ago, i64::MAX) - .zcount(&recent_users_by_ip, one_minute_ago, i64::MAX) - .query_async(&mut redis_conn) - .await - { - Ok(( - day_by_user, - day_by_ip, - hour_by_user, - hour_by_ip, - minute_by_user, - minute_by_ip, - )) => { - let recent_ip_counts = RecentCounts { - one_day: day_by_ip, - one_hour: hour_by_ip, - one_minute: minute_by_ip, - }; - let recent_user_counts = RecentCounts { - one_day: day_by_user, - one_hour: hour_by_user, - one_minute: minute_by_user, - }; + match redis::pipe() + .atomic() + // delete any entries older than 24 hours + .zrembyscore(&recent_users_by_id, i64::MIN, one_day_ago) + .ignore() + .zrembyscore(&recent_users_by_ip, i64::MIN, one_day_ago) + .ignore() + .zrembyscore(&recent_transactions, i64::MIN, one_day_ago) + .ignore() + // get counts for last day + .zcount(&recent_users_by_id, one_day_ago, i64::MAX) + .zcount(&recent_users_by_ip, one_day_ago, i64::MAX) + .zcount(&recent_transactions, one_day_ago, i64::MAX) + // get counts for last hour + .zcount(&recent_users_by_id, one_hour_ago, i64::MAX) + .zcount(&recent_users_by_ip, one_hour_ago, i64::MAX) + .zcount(&recent_transactions, one_hour_ago, i64::MAX) + // get counts for last minute + .zcount(&recent_users_by_id, one_minute_ago, i64::MAX) + .zcount(&recent_users_by_ip, one_minute_ago, i64::MAX) + .zcount(&recent_transactions, one_minute_ago, i64::MAX) + .query_async(&mut redis_conn) + .await + { + Ok(( + user_id_in_day, + ip_in_day, + txs_in_day, + user_id_in_hour, + ip_in_hour, + txs_in_hour, + user_id_in_minute, + ip_in_minute, + txs_in_minute, + )) => { + let recent_user_id_counts = RecentCounts { + one_day: user_id_in_day, + one_hour: user_id_in_hour, + one_minute: user_id_in_minute, + }; + let recent_ip_counts = RecentCounts { + one_day: ip_in_day, + one_hour: ip_in_hour, + one_minute: ip_in_minute, + }; + let recent_tx_counts = RecentCounts { + one_day: txs_in_day, + one_hour: txs_in_hour, + one_minute: txs_in_minute, + }; - (recent_ip_counts, recent_user_counts) - } - Err(err) => { - warn!("unable to count recent users: {}", err); - (RecentCounts::for_err(), RecentCounts::for_err()) - } + (recent_ip_counts, recent_user_id_counts, recent_tx_counts) + } + Err(err) => { + warn!("unable to count recent users: {}", err); + ( + RecentCounts::for_err(), + RecentCounts::for_err(), + RecentCounts::for_err(), + ) } } - Err(err) => { - warn!("unable to connect to redis while counting users: {:?}", err); - (RecentCounts::for_err(), RecentCounts::for_err()) - } - }; + } + Ok(None) => ( + RecentCounts::default(), + RecentCounts::default(), + RecentCounts::default(), + ), + Err(err) => { + warn!("unable to connect to redis while counting users: {:?}", err); + ( + RecentCounts::for_err(), + RecentCounts::for_err(), + RecentCounts::for_err(), + ) + } + }; #[derive(Serialize)] struct CombinedMetrics<'a> { app: &'a Web3ProxyAppMetrics, backend_rpc: &'a OpenRequestHandleMetrics, recent_ip_counts: RecentCounts, - recent_user_counts: RecentCounts, + recent_user_id_counts: RecentCounts, + recent_tx_counts: RecentCounts, } let metrics = CombinedMetrics { app: &self.app_metrics, backend_rpc: &self.open_request_handle_metrics, recent_ip_counts, - recent_user_counts, + recent_user_id_counts, + recent_tx_counts, }; serde_prometheus::to_string(&metrics, Some("web3_proxy"), globals) @@ -898,13 +931,14 @@ impl Web3ProxyApp { self.db_replica.clone() } - pub async fn redis_conn(&self) -> anyhow::Result { + pub async fn redis_conn(&self) -> anyhow::Result> { match self.vredis_pool.as_ref() { - None => Err(anyhow::anyhow!("no redis server configured")), + // TODO: don't do an error. return None + None => Ok(None), Some(redis_pool) => { let redis_conn = redis_pool.get().await?; - Ok(redis_conn) + Ok(Some(redis_conn)) } } } @@ -1136,7 +1170,42 @@ impl Web3ProxyApp { let rpcs = request_metadata.backend_requests.lock().clone(); - // TODO! STATS! + if let Some(salt) = self.config.public_recent_ips_salt.as_ref() { + if let Some(tx_hash) = response.result.clone() { + let now = Utc::now().timestamp(); + let salt = salt.clone(); + let app = self.clone(); + + let f = async move { + match app.redis_conn().await { + Ok(Some(mut redis_conn)) => { + let salted_tx_hash = format!("{}:{}", salt, tx_hash); + + let hashed_tx_hash = + Bytes::from(keccak256(salted_tx_hash.as_bytes())); + + let recent_tx_hash_key = + format!("eth_sendRawTransaction:{}", app.config.chain_id); + + redis_conn + .zadd(recent_tx_hash_key, hashed_tx_hash.to_string(), now) + .await?; + } + Ok(None) => {} + Err(err) => { + warn!( + "unable to save stats for eth_sendRawTransaction: {:?}", + err + ) + } + } + + Ok::<_, anyhow::Error>(()) + }; + + tokio::spawn(f); + } + } return Ok((response, rpcs)); } diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index 051c8bb6..335ba3f4 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -348,23 +348,23 @@ pub async fn ip_is_authorized( let f = async move { let now = Utc::now().timestamp(); - let mut redis_conn = app.redis_conn().await?; + if let Some(mut redis_conn) = app.redis_conn().await? { + let salt = app + .config + .public_recent_ips_salt + .as_ref() + .expect("public_recent_ips_salt must exist in here"); - let salt = app - .config - .public_recent_ips_salt - .as_ref() - .expect("public_recent_ips_salt must exist in here"); + let salted_ip = format!("{}:{}", salt, ip); - let salted_ip = format!("{}:{}", salt, ip); + let hashed_ip = Bytes::from(keccak256(salted_ip.as_bytes())); - let hashed_ip = Bytes::from(keccak256(salted_ip.as_bytes())); + let recent_ip_key = format!("recent_users:ip:{}", app.config.chain_id); - let recent_ip_key = format!("recent_users:ip:{}", app.config.chain_id); - - redis_conn - .zadd(recent_ip_key, hashed_ip.to_string(), now) - .await?; + redis_conn + .zadd(recent_ip_key, hashed_ip.to_string(), now) + .await?; + }; Ok::<_, anyhow::Error>(()) } @@ -410,23 +410,23 @@ pub async fn key_is_authorized( let f = async move { let now = Utc::now().timestamp(); - let mut redis_conn = app.redis_conn().await?; + if let Some(mut redis_conn) = app.redis_conn().await? { + let salt = app + .config + .public_recent_ips_salt + .as_ref() + .expect("public_recent_ips_salt must exist in here"); - let salt = app - .config - .public_recent_ips_salt - .as_ref() - .expect("public_recent_ips_salt must exist in here"); + let salted_user_id = format!("{}:{}", salt, user_id); - let salted_user_id = format!("{}:{}", salt, user_id); + let hashed_user_id = Bytes::from(keccak256(salted_user_id.as_bytes())); - let hashed_user_id = Bytes::from(keccak256(salted_user_id.as_bytes())); + let recent_user_id_key = format!("recent_users:registered:{}", app.config.chain_id); - let recent_user_id_key = format!("recent_users:registered:{}", app.config.chain_id); - - redis_conn - .zadd(recent_user_id_key, hashed_user_id.to_string(), now) - .await?; + redis_conn + .zadd(recent_user_id_key, hashed_user_id.to_string(), now) + .await?; + } Ok::<_, anyhow::Error>(()) } diff --git a/web3_proxy/src/user_queries.rs b/web3_proxy/src/user_queries.rs index c3e16177..fdf87249 100644 --- a/web3_proxy/src/user_queries.rs +++ b/web3_proxy/src/user_queries.rs @@ -268,6 +268,7 @@ pub async fn query_user_stats<'a>( let mut redis_conn = app .redis_conn() .await + .context("query_user_stats had a redis connection error")? .context("query_user_stats needs a redis")?; // get the user id first. if it is 0, we should use a cache on the app