query_user_stats caching

Bryan Stitt 2022-12-15 22:32:58 -08:00
parent 9e32b02ada
commit 0a60ccd95e
4 changed files with 136 additions and 41 deletions


@@ -261,6 +261,9 @@ These are roughly in order of completion
 These are not yet ordered. There might be duplicates. We might not actually need all of these.
+- [ ] rate limiting/throttling on query_user_stats
+- [ ] minimum allowed query_start on query_user_stats
+- [ ] query_user_stats cache hit rate
 - [ ] having the whole block in status is very verbose. trim it down
 - [ ] `cost estimate` script
   - sum bytes and number of requests. prompt hosting costs. divide


@@ -48,7 +48,6 @@ impl RedisRateLimiter {
     pub fn now_as_secs(&self) -> f32 {
         // TODO: if system time doesn't match redis, this won't work great
-        // TODO: now that we fixed
         SystemTime::now()
             .duration_since(UNIX_EPOCH)
             .expect("cannot tell the time")
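The TODO that stays behind is a real caveat: now_as_secs() reads the host clock, so rate-limit windows drift whenever the host and Redis disagree about the time. One way to close that gap (not something this commit does) is to read the clock from Redis itself with the TIME command; a minimal sketch, assuming the `redis` re-export and the pooled `RedisConnection` that appear later in this diff:

```rust
use redis_rate_limiter::{redis, RedisConnection};

// Hypothetical alternative to now_as_secs(): ask Redis for its own clock so
// rate-limit windows are computed against a single time source.
async fn redis_now_as_secs(conn: &mut RedisConnection) -> redis::RedisResult<f32> {
    // TIME replies with (unix seconds, microseconds within that second)
    let (secs, micros): (u64, u64) = redis::cmd("TIME").query_async(conn).await?;

    Ok(secs as f32 + micros as f32 / 1_000_000.0)
}
```

The tradeoff is an extra network round trip on every rate-limit check, which the current SystemTime approach avoids.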


@@ -736,7 +736,7 @@ pub async fn user_revert_logs_get(
     let page = get_page_from_params(&params)?;

     // TODO: page size from config
-    let page_size = 200;
+    let page_size = 1_000;

     let mut response = HashMap::new();
@@ -756,20 +756,30 @@ pub async fn user_revert_logs_get(
     // TODO: only select the ids
     let uks: Vec<_> = uks.into_iter().map(|x| x.id).collect();

-    // get paginated logs
-    let q = revert_log::Entity::find()
+    // get revert logs
+    let mut q = revert_log::Entity::find()
         .filter(revert_log::Column::Timestamp.gte(query_start))
         .filter(revert_log::Column::RpcKeyId.is_in(uks))
         .order_by_asc(revert_log::Column::Timestamp);

-    let q = if chain_id == 0 {
+    if chain_id == 0 {
         // don't do anything
-        q
     } else {
         // filter on chain id
-        q.filter(revert_log::Column::ChainId.eq(chain_id))
-    };
+        q = q.filter(revert_log::Column::ChainId.eq(chain_id))
+    }
+
+    // query the database for number of items and pages
+    let pages_result = q
+        .clone()
+        .paginate(&db_conn, page_size)
+        .num_items_and_pages()
+        .await?;
+
+    response.insert("num_items", pages_result.number_of_items.into());
+    response.insert("num_pages", pages_result.number_of_pages.into());
+
+    // query the database for the revert logs
     let revert_logs = q.paginate(&db_conn, page_size).fetch_page(page).await?;

     response.insert("revert_logs", json!(revert_logs));
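For context, SeaORM's paginator runs two separate SQL statements here: `num_items_and_pages` issues a COUNT query and `fetch_page` issues the LIMIT/OFFSET query, which is why `q` is cloned before the count. A hypothetical helper (not in this commit) showing the pattern both endpoints in this diff now follow:

```rust
use migration::sea_orm::{
    DatabaseConnection, EntityTrait, FromQueryResult, PaginatorTrait, Select,
};
use std::collections::HashMap;

// Count on a clone first, record the totals, then fetch one page of rows.
async fn paginate_into_response<E>(
    q: Select<E>,
    db_conn: &DatabaseConnection,
    page_size: u64,
    page: u64,
    response: &mut HashMap<&'static str, serde_json::Value>,
) -> anyhow::Result<Vec<E::Model>>
where
    E: EntityTrait,
    E::Model: FromQueryResult + Send + Sync,
{
    // SELECT COUNT(*) ... on the cloned query
    let totals = q
        .clone()
        .paginate(db_conn, page_size)
        .num_items_and_pages()
        .await?;

    response.insert("num_items", totals.number_of_items.into());
    response.insert("num_pages", totals.number_of_pages.into());

    // SELECT ... LIMIT/OFFSET on the original query. fetch_page is zero-indexed.
    Ok(q.paginate(db_conn, page_size).fetch_page(page).await?)
}
```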
@@ -786,7 +796,7 @@ pub async fn user_stats_aggregated_get(
 ) -> FrontendResult {
     let response = query_user_stats(&app, bearer, &params, StatResponse::Aggregated).await?;

-    Ok(Json(response).into_response())
+    Ok(response)
 }

 /// `GET /user/stats/detailed` -- Use a bearer token to get the user's key stats such as bandwidth used and methods requested.

@@ -806,5 +816,5 @@ pub async fn user_stats_detailed_get(
 ) -> FrontendResult {
     let response = query_user_stats(&app, bearer, &params, StatResponse::Detailed).await?;

-    Ok(Json(response).into_response())
+    Ok(response)
 }
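Both stats handlers lose their `Json(...)` wrapping because `query_user_stats` (next file) now builds the complete axum response itself, Cache-Control header included, and passes it straight through. A minimal sketch of the mechanism, assuming `FrontendResult` aliases `Result<Response, FrontendErrorResponse>`:

```rust
use axum::response::{IntoResponse, Response};
use axum::Json;

// Hypothetical illustration: once the query layer owns serialization, it can
// attach headers that a plain HashMap return value could never carry.
fn body_with_max_age(body: &serde_json::Value, max_age: u64) -> Response {
    let mut response = Json(body).into_response();

    response.headers_mut().insert(
        "Cache-Control",
        format!("max-age={}", max_age)
            .parse()
            .expect("max-age should always parse"),
    );

    response
}
```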


@ -1,6 +1,8 @@
use crate::frontend::errors::FrontendErrorResponse; use crate::frontend::errors::FrontendErrorResponse;
use crate::{app::Web3ProxyApp, user_token::UserBearerToken}; use crate::{app::Web3ProxyApp, user_token::UserBearerToken};
use anyhow::Context; use anyhow::Context;
use axum::response::{IntoResponse, Response};
use axum::Json;
use axum::{ use axum::{
headers::{authorization::Bearer, Authorization}, headers::{authorization::Bearer, Authorization},
TypedHeader, TypedHeader,
@@ -15,13 +17,16 @@ use migration::sea_orm::{
     QuerySelect, Select,
 };
 use migration::{Condition, Expr, SimpleExpr};
+use redis_rate_limiter::redis;
 use redis_rate_limiter::{redis::AsyncCommands, RedisConnection};
+use serde_json::json;

 /// get the attached address for the given bearer token.
 /// First checks redis. Then checks the database.
-/// 0 means all users
+/// 0 means all users.
+/// This authenticates that the bearer is allowed to view this user_id's stats
 pub async fn get_user_id_from_params(
-    mut redis_conn: RedisConnection,
+    redis_conn: &mut RedisConnection,
     db_conn: DatabaseConnection,
     // this is a long type. should we strip it down?
     bearer: Option<TypedHeader<Authorization<Bearer>>>,
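The switch from `mut redis_conn: RedisConnection` to `redis_conn: &mut RedisConnection` is what makes one pooled connection reusable: `query_user_stats` below authenticates the bearer and then needs the same connection again for its cache reads and writes. A toy illustration of the borrow pattern (key names hypothetical):

```rust
use redis_rate_limiter::{redis::AsyncCommands, RedisConnection};

// The callee only borrows the connection...
async fn lookup_user_id(conn: &mut RedisConnection, bearer: &str) -> Option<u64> {
    conn.get(format!("bearer:{}", bearer)).await.ok()
}

async fn stats_handler(mut conn: RedisConnection, bearer: &str) -> Option<u64> {
    let user_id = lookup_user_id(&mut conn, bearer).await?;

    // ...so it is still available here for the cache lookup that follows
    let _cached: Option<String> = conn.get("query_user_stats:...").await.ok();

    Some(user_id)
}
```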
@@ -212,12 +217,10 @@ pub fn get_query_window_seconds_from_params(
 }

 pub fn filter_query_window_seconds(
-    params: &HashMap<String, String>,
+    query_window_seconds: u64,
     response: &mut HashMap<&str, serde_json::Value>,
     q: Select<rpc_accounting::Entity>,
 ) -> Result<Select<rpc_accounting::Entity>, FrontendErrorResponse> {
-    let query_window_seconds = get_query_window_seconds_from_params(params)?;
-
     if query_window_seconds == 0 {
         // TODO: order by more than this?
         // query_window_seconds is not set so we aggregate all records
@@ -256,11 +259,66 @@ pub async fn query_user_stats<'a>(
     bearer: Option<TypedHeader<Authorization<Bearer>>>,
     params: &'a HashMap<String, String>,
     stat_response_type: StatResponse,
-) -> Result<HashMap<&'a str, serde_json::Value>, FrontendErrorResponse> {
+) -> Result<Response, FrontendErrorResponse> {
     let db_conn = app.db_conn().context("connecting to db")?;
-    let redis_conn = app.redis_conn().await.context("connecting to redis")?;
+    let mut redis_conn = app.redis_conn().await.context("connecting to redis")?;

-    let mut response = HashMap::new();
+    // get the user id first. if it is 0, we should use a cache on the app
+    let user_id = get_user_id_from_params(&mut redis_conn, db_conn.clone(), bearer, params).await?;
+    // get the query window seconds now so that we can pick a cache with a good TTL
+    // TODO: for now though, just do one cache. it's easier
+    let query_window_seconds = get_query_window_seconds_from_params(params)?;
+    let query_start = get_query_start_from_params(params)?;
+    let chain_id = get_chain_id_from_params(app, params)?;
+    let page = get_page_from_params(params)?;
+
+    let cache_key = if user_id == 0 {
+        // TODO: cacheable query_window_seconds from config
+        if [60, 600, 3600, 86400, 86400 * 7, 86400 * 30].contains(&query_window_seconds)
+            && query_start.timestamp() % (query_window_seconds as i64) == 0
+        {
+            // TODO: is this a good key?
+            let redis_cache_key = format!(
+                "query_user_stats:{}:{}:{}:{}:{}",
+                chain_id, user_id, query_start, query_window_seconds, page,
+            );
+
+            let cached_result: Result<(String, u64), _> = redis::pipe()
+                .atomic()
+                // get the key and its ttl
+                .get(&redis_cache_key)
+                .ttl(&redis_cache_key)
+                // do the query
+                .query_async(&mut redis_conn)
+                .await;
+
+            // redis being down should not break the stats page!
+            if let Ok((body, ttl)) = cached_result {
+                let mut response = body.into_response();
+
+                let headers = response.headers_mut();
+
+                headers.insert(
+                    "Cache-Control",
+                    format!("max-age={}", ttl)
+                        .parse()
+                        .expect("max-age should always parse"),
+                );
+
+                info!("served response from cache");
+
+                return Ok(response);
+            }
+
+            Some(redis_cache_key)
+        } else {
+            None
+        }
+    } else {
+        None
+    };
+
+    let mut response_body = HashMap::new();

     let mut q = rpc_accounting::Entity::find()
         .select_only()
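Two details of the read path above are worth calling out: the GET and TTL are pipelined so a cache hit costs one round trip, and any Redis failure, including a plain miss (a nil GET fails to decode into `(String, u64)`), falls through to the database query. A stripped-down sketch of just that lookup, under the same imports this file now uses:

```rust
use redis_rate_limiter::{redis, RedisConnection};

// Fetch a cached body and its remaining TTL in a single round trip.
// Any error, whether connection trouble or a missing key, reads as a miss,
// so Redis being down cannot break the stats endpoint.
async fn read_cached_stats(
    redis_conn: &mut RedisConnection,
    cache_key: &str,
) -> Option<(String, u64)> {
    redis::pipe()
        .atomic()
        .get(cache_key)
        .ttl(cache_key)
        .query_async(redis_conn)
        .await
        .ok()
}
```

Serving the live TTL back as `Cache-Control: max-age` means the advertised lifetime shrinks as the cached entry ages, so every client sees the entry expire at the same moment.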
@@ -304,31 +362,27 @@ pub async fn query_user_stats<'a>(
             .group_by(rpc_accounting::Column::ArchiveRequest);
     }

-    q = filter_query_window_seconds(params, &mut response, q)?;
+    // TODO: have q be &mut?
+    q = filter_query_window_seconds(query_window_seconds, &mut response_body, q)?;

     // aggregate stats after query_start
-    // TODO: minimum query_start of 90 days?
-    let query_start = get_query_start_from_params(params)?;
+    // TODO: maximum query_start of 90 days ago?
     // TODO: if no query_start, don't add to response or condition
-    response.insert(
+    response_body.insert(
         "query_start",
         serde_json::Value::Number(query_start.timestamp().into()),
     );
     condition = condition.add(rpc_accounting::Column::PeriodDatetime.gte(query_start));

-    // filter on chain_id
-    let chain_id = get_chain_id_from_params(app, params)?;
     if chain_id == 0 {
         // fetch all the chains
     } else {
+        // filter on chain_id
         condition = condition.add(rpc_accounting::Column::ChainId.eq(chain_id));

-        response.insert("chain_id", serde_json::Value::Number(chain_id.into()));
+        response_body.insert("chain_id", serde_json::Value::Number(chain_id.into()));
     }

-    // get_user_id_from_params checks that the bearer is connected to this user_id
-    // TODO: match on user_id and rpc_key_id?
-    let user_id = get_user_id_from_params(redis_conn, db_conn.clone(), bearer, params).await?;
     if user_id == 0 {
         // 0 means everyone. don't filter on user
     } else {
@@ -336,7 +390,7 @@ pub async fn query_user_stats<'a>(
         condition = condition.add(rpc_key::Column::UserId.eq(user_id));

-        response.insert("user_id", serde_json::Value::Number(user_id.into()));
+        response_body.insert("user_id", serde_json::Value::Number(user_id.into()));
     }

     // filter on rpc_key_id
@@ -351,7 +405,7 @@ pub async fn query_user_stats<'a>(
             )
         })?;

-        response.insert("rpc_key_id", serde_json::Value::Number(rpc_key_id.into()));
+        response_body.insert("rpc_key_id", serde_json::Value::Number(rpc_key_id.into()));

         condition = condition.add(rpc_accounting::Column::RpcKeyId.eq(rpc_key_id));
@@ -372,15 +426,11 @@ pub async fn query_user_stats<'a>(
     // TODO: trace log query here? i think sea orm has a useful log level for this

     // set up pagination
-    let page = get_page_from_params(params)?;
-    response.insert("page", serde_json::to_value(page).expect("can't fail"));
+    response_body.insert("page", serde_json::Value::Number(page.into()));

     // TODO: page size from param with a max from the config
-    let page_size = 200;
-    response.insert(
-        "page_size",
-        serde_json::to_value(page_size).expect("can't fail"),
-    );
+    let page_size = 1_000;
+    response_body.insert("page_size", serde_json::Value::Number(page_size.into()));

     // query the database for number of items and pages
     let pages_result = q
@@ -389,19 +439,52 @@ pub async fn query_user_stats<'a>(
         .num_items_and_pages()
         .await?;

-    response.insert("num_items", pages_result.number_of_items.into());
-    response.insert("num_pages", pages_result.number_of_pages.into());
+    response_body.insert("num_items", pages_result.number_of_items.into());
+    response_body.insert("num_pages", pages_result.number_of_pages.into());

     // query the database (todo: combine with the pages_result query?)
     let query_response = q
         .into_json()
         .paginate(&db_conn, page_size)
         .fetch_page(page)
+        // TODO: timeouts here? or are they already set up on the connection
        .await?;

+    // TODO: be a lot smarter about caching
+    let ttl = 60;
+
     // add the query_response to the json response
-    response.insert("result", serde_json::Value::Array(query_response));
+    response_body.insert("result", serde_json::Value::Array(query_response));
+
+    let mut response = Json(&response_body).into_response();
+    let headers = response.headers_mut();
+
+    if let Some(cache_key) = cache_key {
+        headers.insert(
+            "Cache-Control",
+            format!("public, max-age={}", ttl)
+                .parse()
+                .expect("max-age should always parse"),
+        );
+
+        let cache_body = json!(response_body).to_string();
+
+        if let Err(err) = redis_conn
+            .set_ex::<_, _, ()>(cache_key, cache_body, ttl)
+            .await
+        {
+            warn!("Redis error while caching query_user_stats: {:?}", err);
+        }
+    } else {
+        headers.insert(
+            "Cache-Control",
+            format!("private, max-age={}", ttl)
+                .parse()
+                .expect("max-age should always parse"),
+        );
+    }
+
+    // TODO: Last-Modified header?

     Ok(response)
 }
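The write path is deliberately best-effort: the response is fully built before the SETEX, so a Redis failure only costs the cache entry and a warning, never the response. Note also the Cache-Control split: cacheable global queries (user_id == 0) are marked `public`, while anything tied to a bearer token is `private` so shared caches will not store it. A condensed sketch of that tail end, with `log::warn!` standing in for the `warn!` macro this file imports elsewhere:

```rust
use redis_rate_limiter::{redis::AsyncCommands, RedisConnection};

// Best-effort write-through: log and move on if Redis is unavailable.
async fn cache_stats_body(
    redis_conn: &mut RedisConnection,
    cache_key: String,
    cache_body: String,
    ttl: usize,
) {
    if let Err(err) = redis_conn.set_ex::<_, _, ()>(cache_key, cache_body, ttl).await {
        log::warn!("Redis error while caching query_user_stats: {:?}", err);
    }
}
```

A natural follow-up that the commit's own comments already hint at ("pick a cache with a good TTL", "be a lot smarter about caching") would be deriving `ttl` from `query_window_seconds` instead of hardcoding 60, so wide windows can live longer in the cache.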