From 0a60ccd95e3429374ff427e1fb6408a141362cf7 Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Thu, 15 Dec 2022 22:32:58 -0800
Subject: [PATCH] query_user_stats caching

---
 TODO.md                          |   3 +
 redis-rate-limiter/src/lib.rs    |   1 -
 web3_proxy/src/frontend/users.rs |  28 ++++--
 web3_proxy/src/user_queries.rs   | 145 ++++++++++++++++++++++++-------
 4 files changed, 136 insertions(+), 41 deletions(-)

diff --git a/TODO.md b/TODO.md
index 090f7849..758db189 100644
--- a/TODO.md
+++ b/TODO.md
@@ -261,6 +261,9 @@ These are roughly in order of completition
 
 These are not yet ordered. There might be duplicates. We might not actually need all of these.
 
+- [ ] rate limiting/throttling on query_user_stats
+- [ ] minimum allowed query_start on query_user_stats
+- [ ] query_user_stats cache hit rate
 - [ ] having the whole block in status is very verbose. trim it down
 - [ ] `cost estimate` script
   - sum bytes and number of requests. prompt hosting costs. divide
diff --git a/redis-rate-limiter/src/lib.rs b/redis-rate-limiter/src/lib.rs
index ced90004..3f2fd0f3 100644
--- a/redis-rate-limiter/src/lib.rs
+++ b/redis-rate-limiter/src/lib.rs
@@ -48,7 +48,6 @@ impl RedisRateLimiter {
 
     pub fn now_as_secs(&self) -> f32 {
         // TODO: if system time doesn't match redis, this won't work great
-        // TODO: now that we fixed
         SystemTime::now()
             .duration_since(UNIX_EPOCH)
             .expect("cannot tell the time")
diff --git a/web3_proxy/src/frontend/users.rs b/web3_proxy/src/frontend/users.rs
index 4d5925df..9bfb8a45 100644
--- a/web3_proxy/src/frontend/users.rs
+++ b/web3_proxy/src/frontend/users.rs
@@ -736,7 +736,7 @@ pub async fn user_revert_logs_get(
     let page = get_page_from_params(&params)?;
 
     // TODO: page size from config
-    let page_size = 200;
+    let page_size = 1_000;
 
     let mut response = HashMap::new();
 
@@ -756,20 +756,30 @@ pub async fn user_revert_logs_get(
     // TODO: only select the ids
     let uks: Vec<_> = uks.into_iter().map(|x| x.id).collect();
 
-    // get paginated logs
-    let q = revert_log::Entity::find()
+    // get revert logs
+    let mut q = revert_log::Entity::find()
         .filter(revert_log::Column::Timestamp.gte(query_start))
         .filter(revert_log::Column::RpcKeyId.is_in(uks))
         .order_by_asc(revert_log::Column::Timestamp);
 
-    let q = if chain_id == 0 {
+    if chain_id == 0 {
         // don't do anything
-        q
     } else {
         // filter on chain id
-        q.filter(revert_log::Column::ChainId.eq(chain_id))
-    };
+        q = q.filter(revert_log::Column::ChainId.eq(chain_id))
+    }
 
+    // query the database for number of items and pages
+    let pages_result = q
+        .clone()
+        .paginate(&db_conn, page_size)
+        .num_items_and_pages()
+        .await?;
+
+    response.insert("num_items", pages_result.number_of_items.into());
+    response.insert("num_pages", pages_result.number_of_pages.into());
+
+    // query the database for the revert logs
     let revert_logs = q.paginate(&db_conn, page_size).fetch_page(page).await?;
 
     response.insert("revert_logs", json!(revert_logs));
@@ -786,7 +796,7 @@ pub async fn user_stats_aggregated_get(
 ) -> FrontendResult {
     let response = query_user_stats(&app, bearer, &params, StatResponse::Aggregated).await?;
 
-    Ok(Json(response).into_response())
+    Ok(response)
 }
 
 /// `GET /user/stats/detailed` -- Use a bearer token to get the user's key stats such as bandwidth used and methods requested.
@@ -806,5 +816,5 @@ pub async fn user_stats_detailed_get(
 ) -> FrontendResult {
     let response = query_user_stats(&app, bearer, &params, StatResponse::Detailed).await?;
 
-    Ok(Json(response).into_response())
+    Ok(response)
 }
diff --git a/web3_proxy/src/user_queries.rs b/web3_proxy/src/user_queries.rs
index 164d0f81..ffffe5e3 100644
--- a/web3_proxy/src/user_queries.rs
+++ b/web3_proxy/src/user_queries.rs
@@ -1,6 +1,8 @@
 use crate::frontend::errors::FrontendErrorResponse;
 use crate::{app::Web3ProxyApp, user_token::UserBearerToken};
 use anyhow::Context;
+use axum::response::{IntoResponse, Response};
+use axum::Json;
 use axum::{
     headers::{authorization::Bearer, Authorization},
     TypedHeader,
 };
@@ -15,13 +17,16 @@ use migration::sea_orm::{
     QuerySelect, Select,
 };
 use migration::{Condition, Expr, SimpleExpr};
+use redis_rate_limiter::redis;
 use redis_rate_limiter::{redis::AsyncCommands, RedisConnection};
+use serde_json::json;
 
 /// get the attached address for the given bearer token.
 /// First checks redis. Then checks the database.
-/// 0 means all users
+/// 0 means all users.
+/// This authenticates that the bearer is allowed to view this user_id's stats
 pub async fn get_user_id_from_params(
-    mut redis_conn: RedisConnection,
+    redis_conn: &mut RedisConnection,
     db_conn: DatabaseConnection,
     // this is a long type. should we strip it down?
     bearer: Option<TypedHeader<Authorization<Bearer>>>,
@@ -212,12 +217,10 @@
 }
 
 pub fn filter_query_window_seconds(
-    params: &HashMap<String, String>,
+    query_window_seconds: u64,
     response: &mut HashMap<&str, serde_json::Value>,
     q: Select<rpc_accounting::Entity>,
 ) -> Result<Select<rpc_accounting::Entity>, FrontendErrorResponse> {
-    let query_window_seconds = get_query_window_seconds_from_params(params)?;
-
     if query_window_seconds == 0 {
         // TODO: order by more than this?
         // query_window_seconds is not set so we aggregate all records
@@ -256,11 +259,66 @@ pub async fn query_user_stats<'a>(
     bearer: Option<TypedHeader<Authorization<Bearer>>>,
     params: &'a HashMap<String, String>,
     stat_response_type: StatResponse,
-) -> Result<HashMap<&str, serde_json::Value>, FrontendErrorResponse> {
+) -> Result<Response, FrontendErrorResponse> {
     let db_conn = app.db_conn().context("connecting to db")?;
-    let redis_conn = app.redis_conn().await.context("connecting to redis")?;
+    let mut redis_conn = app.redis_conn().await.context("connecting to redis")?;
 
-    let mut response = HashMap::new();
+    // get the user id first. if it is 0, we should use a cache on the app
+    let user_id = get_user_id_from_params(&mut redis_conn, db_conn.clone(), bearer, params).await?;
+    // get the query window seconds now so that we can pick a cache with a good TTL
+    // TODO: for now though, just do one cache. it's easier
+    let query_window_seconds = get_query_window_seconds_from_params(params)?;
+    let query_start = get_query_start_from_params(params)?;
+    let chain_id = get_chain_id_from_params(app, params)?;
+    let page = get_page_from_params(params)?;
+
+    let cache_key = if user_id == 0 {
+        // TODO: cacheable query_window_seconds from config
+        if [60, 600, 3600, 86400, 86400 * 7, 86400 * 30].contains(&query_window_seconds)
+            && query_start.timestamp() % (query_window_seconds as i64) == 0
+        {
+            None
+        } else {
+            // TODO: is this a good key?
+            let redis_cache_key = format!(
+                "query_user_stats:{}:{}:{}:{}:{}",
+                chain_id, user_id, query_start, query_window_seconds, page,
+            );
+
+            let cached_result: Result<(String, u64), _> = redis::pipe()
+                .atomic()
+                // get the key and its ttl
+                .get(&redis_cache_key)
+                .ttl(&redis_cache_key)
+                // do the query
+                .query_async(&mut redis_conn)
+                .await;
+
+            // redis being down should not break the stats page!
+            if let Ok((body, ttl)) = cached_result {
+                let mut response = body.into_response();
+
+                let headers = response.headers_mut();
+
+                headers.insert(
+                    "Cache-Control",
+                    format!("max-age={}", ttl)
+                        .parse()
+                        .expect("max-age should always parse"),
+                );
+
+                info!("served response from cache");
+
+                return Ok(response);
+            }
+
+            Some(redis_cache_key)
+        }
+    } else {
+        None
+    };
+
+    let mut response_body = HashMap::new();
 
     let mut q = rpc_accounting::Entity::find()
         .select_only()
@@ -304,31 +362,27 @@ pub async fn query_user_stats<'a>(
             .group_by(rpc_accounting::Column::ArchiveRequest);
     }
 
-    q = filter_query_window_seconds(params, &mut response, q)?;
+    // TODO: have q be &mut?
+    q = filter_query_window_seconds(query_window_seconds, &mut response_body, q)?;
 
     // aggregate stats after query_start
-    // TODO: minimum query_start of 90 days?
-    let query_start = get_query_start_from_params(params)?;
+    // TODO: maximum query_start of 90 days ago?
     // TODO: if no query_start, don't add to response or condition
-    response.insert(
+    response_body.insert(
         "query_start",
         serde_json::Value::Number(query_start.timestamp().into()),
     );
     condition = condition.add(rpc_accounting::Column::PeriodDatetime.gte(query_start));
 
-    // filter on chain_id
-    let chain_id = get_chain_id_from_params(app, params)?;
     if chain_id == 0 {
         // fetch all the chains
     } else {
+        // filter on chain_id
        condition = condition.add(rpc_accounting::Column::ChainId.eq(chain_id));
 
-        response.insert("chain_id", serde_json::Value::Number(chain_id.into()));
+        response_body.insert("chain_id", serde_json::Value::Number(chain_id.into()));
     }
 
-    // get_user_id_from_params checks that the bearer is connected to this user_id
-    // TODO: match on user_id and rpc_key_id?
-    let user_id = get_user_id_from_params(redis_conn, db_conn.clone(), bearer, params).await?;
     if user_id == 0 {
         // 0 means everyone. don't filter on user
     } else {
@@ -336,7 +390,7 @@
 
         condition = condition.add(rpc_key::Column::UserId.eq(user_id));
 
-        response.insert("user_id", serde_json::Value::Number(user_id.into()));
+        response_body.insert("user_id", serde_json::Value::Number(user_id.into()));
     }
 
     // filter on rpc_key_id
@@ -351,7 +405,7 @@
             )
         })?;
 
-        response.insert("rpc_key_id", serde_json::Value::Number(rpc_key_id.into()));
+        response_body.insert("rpc_key_id", serde_json::Value::Number(rpc_key_id.into()));
 
         condition = condition.add(rpc_accounting::Column::RpcKeyId.eq(rpc_key_id));
 
@@ -372,15 +426,11 @@
     // TODO: trace log query here? i think sea orm has a useful log level for this
 
     // set up pagination
-    let page = get_page_from_params(params)?;
-    response.insert("page", serde_json::to_value(page).expect("can't fail"));
+    response_body.insert("page", serde_json::Value::Number(page.into()));
 
     // TODO: page size from param with a max from the config
-    let page_size = 200;
-    response.insert(
-        "page_size",
-        serde_json::to_value(page_size).expect("can't fail"),
-    );
+    let page_size = 1_000;
+    response_body.insert("page_size", serde_json::Value::Number(page_size.into()));
 
     // query the database for number of items and pages
     let pages_result = q
@@ -389,19 +439,52 @@
         .num_items_and_pages()
         .await?;
 
-    response.insert("num_items", pages_result.number_of_items.into());
-    response.insert("num_pages", pages_result.number_of_pages.into());
+    response_body.insert("num_items", pages_result.number_of_items.into());
+    response_body.insert("num_pages", pages_result.number_of_pages.into());
 
     // query the database (todo: combine with the pages_result query?)
     let query_response = q
         .into_json()
        .paginate(&db_conn, page_size)
        .fetch_page(page)
-        // TODO: timeouts here? or are they already set up on the connection
        .await?;
 
+    // TODO: be a lot smarter about caching
+    let ttl = 60;
+
     // add the query_response to the json response
-    response.insert("result", serde_json::Value::Array(query_response));
+    response_body.insert("result", serde_json::Value::Array(query_response));
+
+    let mut response = Json(&response_body).into_response();
+
+    let headers = response.headers_mut();
+
+    if let Some(cache_key) = cache_key {
+        headers.insert(
+            "Cache-Control",
+            format!("public, max-age={}", ttl)
+                .parse()
+                .expect("max-age should always parse"),
+        );
+
+        let cache_body = json!(response_body).to_string();
+
+        if let Err(err) = redis_conn
+            .set_ex::<_, _, ()>(cache_key, cache_body, ttl)
+            .await
+        {
+            warn!("Redis error while caching query_user_stats: {:?}", err);
+        }
+    } else {
+        headers.insert(
+            "Cache-Control",
+            format!("private, max-age={}", ttl)
+                .parse()
+                .expect("max-age should always parse"),
+        );
+    }
+
+    // TODO: Last-Modified header?
 
     Ok(response)
 }
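
The caching added above is a plain read-through pattern: check redis first (GET and TTL in one atomic pipeline), serve a hit with a Cache-Control max-age taken from the remaining TTL, and on a miss run the database query, then SETEX the serialized body so the next request can skip the database. A minimal standalone sketch of that pattern follows; the build_body() helper stands in for the sea-orm query, a plain redis::aio::MultiplexedConnection stands in for the pooled RedisConnection, and the fixed 60 second TTL is an assumption for illustration, not part of this patch.

use redis::AsyncCommands;

/// hypothetical helper: stands in for the sea-orm query behind query_user_stats
async fn build_body() -> anyhow::Result<String> {
    Ok(r#"{"result":[]}"#.to_string())
}

/// read-through cache sketch: GET + TTL in one pipeline, SETEX on a miss.
/// returns the response body and the seconds to advertise in Cache-Control.
async fn cached_stats(
    redis_conn: &mut redis::aio::MultiplexedConnection,
    cache_key: &str,
) -> anyhow::Result<(String, u64)> {
    // try the cache first. redis being down should not break the page,
    // so any error here just falls through to the database query
    let cached: Result<(String, u64), _> = redis::pipe()
        .atomic()
        // fetch the cached body and its remaining ttl together
        .get(cache_key)
        .ttl(cache_key)
        .query_async(redis_conn)
        .await;

    if let Ok((body, ttl)) = cached {
        // cache hit: the caller turns ttl into `Cache-Control: max-age=...`
        return Ok((body, ttl));
    }

    // cache miss: run the real query, then cache it for the assumed 60s ttl
    let ttl = 60;
    let body = build_body().await?;

    if let Err(err) = redis_conn.set_ex::<_, _, ()>(cache_key, &body, ttl).await {
        // caching failures are logged and otherwise ignored
        eprintln!("redis error while caching: {:?}", err);
    }

    Ok((body, ttl as u64))
}

Serving max-age straight from the key's remaining TTL keeps the browser cache and the redis cache expiring together, so an aligned query window never shows data staler than one TTL.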