tx stats too

This commit is contained in:
Bryan Stitt 2022-12-28 22:21:09 -08:00
parent 8868c35e95
commit 81c49d08cf
4 changed files with 166 additions and 94 deletions

@@ -584,3 +584,5 @@ in another repo: event subscriber
- [ ] some internal requests should go through app.proxy_rpc_request so that they get caching!
- be careful not to make an infinite loop
- [ ] request timeout messages should include the request id
- [ ] have an upgrade tier that queries multiple backends at once; return on the first Ok result while collecting errors. If no backend returns Ok, find the most common error and respond with that
- [ ] give public_recent_ips_salt a better, more general, name

@@ -34,6 +34,7 @@ use migration::sea_orm::{self, ConnectionTrait, Database, DatabaseConnection};
use migration::sea_query::table::ColumnDef;
use migration::{Alias, DbErr, Migrator, MigratorTrait, Table};
use moka::future::Cache;
use redis_rate_limiter::redis::AsyncCommands;
use redis_rate_limiter::{redis, DeadpoolRuntime, RedisConfig, RedisPool, RedisRateLimiter};
use serde::Serialize;
use serde_json::json;
@@ -709,7 +710,7 @@ impl Web3ProxyApp {
// TODO: what globals? should this be the hostname or what?
// globals.insert("service", "web3_proxy");
#[derive(Serialize)]
#[derive(Default, Serialize)]
struct RecentCounts {
one_day: i64,
one_hour: i64,
@@ -726,87 +727,119 @@ impl Web3ProxyApp {
}
}
let (recent_ip_counts, recent_user_counts): (RecentCounts, RecentCounts) =
match self.redis_conn().await {
Ok(mut redis_conn) => {
// TODO: delete any hash entries where
const ONE_MINUTE: i64 = 60;
const ONE_HOUR: i64 = ONE_MINUTE * 60;
const ONE_DAY: i64 = ONE_HOUR * 24;
let (recent_ip_counts, recent_user_id_counts, recent_tx_counts): (
RecentCounts,
RecentCounts,
RecentCounts,
) = match self.redis_conn().await {
Ok(Some(mut redis_conn)) => {
// TODO: delete any hash entries where
const ONE_MINUTE: i64 = 60;
const ONE_HOUR: i64 = ONE_MINUTE * 60;
const ONE_DAY: i64 = ONE_HOUR * 24;
let one_day_ago = Utc::now().timestamp() - ONE_DAY;
let one_hour_ago = Utc::now().timestamp() - ONE_HOUR;
let one_minute_ago = Utc::now().timestamp() - ONE_MINUTE;
let one_day_ago = Utc::now().timestamp() - ONE_DAY;
let one_hour_ago = Utc::now().timestamp() - ONE_HOUR;
let one_minute_ago = Utc::now().timestamp() - ONE_MINUTE;
let recent_users_by_user =
format!("recent_users:registered:{}", self.config.chain_id);
let recent_users_by_ip = format!("recent_users:ip:{}", self.config.chain_id);
let recent_users_by_id = format!("recent_users:id:{}", self.config.chain_id);
let recent_users_by_ip = format!("recent_users:ip:{}", self.config.chain_id);
let recent_transactions =
format!("eth_sendRawTransaction:{}", self.config.chain_id);
match redis::pipe()
.atomic()
// delete any entries older than 24 hours
.zrembyscore(&recent_users_by_user, i64::MIN, one_day_ago)
.ignore()
.zrembyscore(&recent_users_by_ip, i64::MIN, one_day_ago)
.ignore()
// get count for last day
.zcount(&recent_users_by_user, one_day_ago, i64::MAX)
.zcount(&recent_users_by_ip, one_day_ago, i64::MAX)
// get count for last hour
.zcount(&recent_users_by_user, one_hour_ago, i64::MAX)
.zcount(&recent_users_by_ip, one_hour_ago, i64::MAX)
// get count for last minute
.zcount(&recent_users_by_user, one_minute_ago, i64::MAX)
.zcount(&recent_users_by_ip, one_minute_ago, i64::MAX)
.query_async(&mut redis_conn)
.await
{
Ok((
day_by_user,
day_by_ip,
hour_by_user,
hour_by_ip,
minute_by_user,
minute_by_ip,
)) => {
let recent_ip_counts = RecentCounts {
one_day: day_by_ip,
one_hour: hour_by_ip,
one_minute: minute_by_ip,
};
let recent_user_counts = RecentCounts {
one_day: day_by_user,
one_hour: hour_by_user,
one_minute: minute_by_user,
};
match redis::pipe()
.atomic()
// delete any entries older than 24 hours
.zrembyscore(&recent_users_by_id, i64::MIN, one_day_ago)
.ignore()
.zrembyscore(&recent_users_by_ip, i64::MIN, one_day_ago)
.ignore()
.zrembyscore(&recent_transactions, i64::MIN, one_day_ago)
.ignore()
// get counts for last day
.zcount(&recent_users_by_id, one_day_ago, i64::MAX)
.zcount(&recent_users_by_ip, one_day_ago, i64::MAX)
.zcount(&recent_transactions, one_day_ago, i64::MAX)
// get counts for last hour
.zcount(&recent_users_by_id, one_hour_ago, i64::MAX)
.zcount(&recent_users_by_ip, one_hour_ago, i64::MAX)
.zcount(&recent_transactions, one_hour_ago, i64::MAX)
// get counts for last minute
.zcount(&recent_users_by_id, one_minute_ago, i64::MAX)
.zcount(&recent_users_by_ip, one_minute_ago, i64::MAX)
.zcount(&recent_transactions, one_minute_ago, i64::MAX)
.query_async(&mut redis_conn)
.await
{
Ok((
user_id_in_day,
ip_in_day,
txs_in_day,
user_id_in_hour,
ip_in_hour,
txs_in_hour,
user_id_in_minute,
ip_in_minute,
txs_in_minute,
)) => {
let recent_user_id_counts = RecentCounts {
one_day: user_id_in_day,
one_hour: user_id_in_hour,
one_minute: user_id_in_minute,
};
let recent_ip_counts = RecentCounts {
one_day: ip_in_day,
one_hour: ip_in_hour,
one_minute: ip_in_minute,
};
let recent_tx_counts = RecentCounts {
one_day: txs_in_day,
one_hour: txs_in_hour,
one_minute: txs_in_minute,
};
(recent_ip_counts, recent_user_counts)
}
Err(err) => {
warn!("unable to count recent users: {}", err);
(RecentCounts::for_err(), RecentCounts::for_err())
}
(recent_ip_counts, recent_user_id_counts, recent_tx_counts)
}
Err(err) => {
warn!("unable to count recent users: {}", err);
(
RecentCounts::for_err(),
RecentCounts::for_err(),
RecentCounts::for_err(),
)
}
}
Err(err) => {
warn!("unable to connect to redis while counting users: {:?}", err);
(RecentCounts::for_err(), RecentCounts::for_err())
}
};
}
Ok(None) => (
RecentCounts::default(),
RecentCounts::default(),
RecentCounts::default(),
),
Err(err) => {
warn!("unable to connect to redis while counting users: {:?}", err);
(
RecentCounts::for_err(),
RecentCounts::for_err(),
RecentCounts::for_err(),
)
}
};
#[derive(Serialize)]
struct CombinedMetrics<'a> {
app: &'a Web3ProxyAppMetrics,
backend_rpc: &'a OpenRequestHandleMetrics,
recent_ip_counts: RecentCounts,
recent_user_counts: RecentCounts,
recent_user_id_counts: RecentCounts,
recent_tx_counts: RecentCounts,
}
let metrics = CombinedMetrics {
app: &self.app_metrics,
backend_rpc: &self.open_request_handle_metrics,
recent_ip_counts,
recent_user_counts,
recent_user_id_counts,
recent_tx_counts,
};
serde_prometheus::to_string(&metrics, Some("web3_proxy"), globals)
@@ -898,13 +931,14 @@ impl Web3ProxyApp {
self.db_replica.clone()
}
pub async fn redis_conn(&self) -> anyhow::Result<redis_rate_limiter::RedisConnection> {
pub async fn redis_conn(&self) -> anyhow::Result<Option<redis_rate_limiter::RedisConnection>> {
match self.vredis_pool.as_ref() {
None => Err(anyhow::anyhow!("no redis server configured")),
// TODO: don't do an error. return None
None => Ok(None),
Some(redis_pool) => {
let redis_conn = redis_pool.get().await?;
Ok(redis_conn)
Ok(Some(redis_conn))
}
}
}
@@ -1136,7 +1170,42 @@ impl Web3ProxyApp {
let rpcs = request_metadata.backend_requests.lock().clone();
// TODO! STATS!
if let Some(salt) = self.config.public_recent_ips_salt.as_ref() {
if let Some(tx_hash) = response.result.clone() {
let now = Utc::now().timestamp();
let salt = salt.clone();
let app = self.clone();
let f = async move {
match app.redis_conn().await {
Ok(Some(mut redis_conn)) => {
let salted_tx_hash = format!("{}:{}", salt, tx_hash);
let hashed_tx_hash =
Bytes::from(keccak256(salted_tx_hash.as_bytes()));
let recent_tx_hash_key =
format!("eth_sendRawTransaction:{}", app.config.chain_id);
redis_conn
.zadd(recent_tx_hash_key, hashed_tx_hash.to_string(), now)
.await?;
}
Ok(None) => {}
Err(err) => {
warn!(
"unable to save stats for eth_sendRawTransaction: {:?}",
err
)
}
}
Ok::<_, anyhow::Error>(())
};
tokio::spawn(f);
}
}
return Ok((response, rpcs));
}

@@ -348,23 +348,23 @@ pub async fn ip_is_authorized(
let f = async move {
let now = Utc::now().timestamp();
let mut redis_conn = app.redis_conn().await?;
if let Some(mut redis_conn) = app.redis_conn().await? {
let salt = app
.config
.public_recent_ips_salt
.as_ref()
.expect("public_recent_ips_salt must exist in here");
let salt = app
.config
.public_recent_ips_salt
.as_ref()
.expect("public_recent_ips_salt must exist in here");
let salted_ip = format!("{}:{}", salt, ip);
let salted_ip = format!("{}:{}", salt, ip);
let hashed_ip = Bytes::from(keccak256(salted_ip.as_bytes()));
let hashed_ip = Bytes::from(keccak256(salted_ip.as_bytes()));
let recent_ip_key = format!("recent_users:ip:{}", app.config.chain_id);
let recent_ip_key = format!("recent_users:ip:{}", app.config.chain_id);
redis_conn
.zadd(recent_ip_key, hashed_ip.to_string(), now)
.await?;
redis_conn
.zadd(recent_ip_key, hashed_ip.to_string(), now)
.await?;
};
Ok::<_, anyhow::Error>(())
}
@@ -410,23 +410,23 @@ pub async fn key_is_authorized(
let f = async move {
let now = Utc::now().timestamp();
let mut redis_conn = app.redis_conn().await?;
if let Some(mut redis_conn) = app.redis_conn().await? {
let salt = app
.config
.public_recent_ips_salt
.as_ref()
.expect("public_recent_ips_salt must exist in here");
let salt = app
.config
.public_recent_ips_salt
.as_ref()
.expect("public_recent_ips_salt must exist in here");
let salted_user_id = format!("{}:{}", salt, user_id);
let salted_user_id = format!("{}:{}", salt, user_id);
let hashed_user_id = Bytes::from(keccak256(salted_user_id.as_bytes()));
let hashed_user_id = Bytes::from(keccak256(salted_user_id.as_bytes()));
let recent_user_id_key = format!("recent_users:registered:{}", app.config.chain_id);
let recent_user_id_key = format!("recent_users:registered:{}", app.config.chain_id);
redis_conn
.zadd(recent_user_id_key, hashed_user_id.to_string(), now)
.await?;
redis_conn
.zadd(recent_user_id_key, hashed_user_id.to_string(), now)
.await?;
}
Ok::<_, anyhow::Error>(())
}

@@ -268,6 +268,7 @@ pub async fn query_user_stats<'a>(
let mut redis_conn = app
.redis_conn()
.await
.context("query_user_stats had a redis connection error")?
.context("query_user_stats needs a redis")?;
// get the user id first. if it is 0, we should use a cache on the app