made the query much faster (#127)

This commit is contained in:
David 2023-06-15 18:38:51 +02:00 committed by GitHub
parent 957ba161b5
commit 5859cd8a8d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -130,10 +130,17 @@ pub async fn query_user_stats<'a>(
} }
// Iterate, pop and add to string // Iterate, pop and add to string
f!( let mut filter_subquery = "".to_string();
r#"|> filter(fn: (r) => contains(value: r["rpc_secret_key_id"], set: {:?}))"#,
user_rpc_keys for (idx, user_key) in user_rpc_keys.iter().enumerate() {
) if idx == 0 {
filter_subquery += &f!(r#"r.rpc_secret_key_id == "{}""#, user_key);
} else {
filter_subquery += &f!(r#"or r.rpc_secret_key_id == "{}""#, user_key);
}
}
f!(r#"|> filter(fn: (r) => {})"#, filter_subquery)
}; };
// TODO: Turn into a 500 error if bucket is not found .. // TODO: Turn into a 500 error if bucket is not found ..
@ -147,7 +154,7 @@ pub async fn query_user_stats<'a>(
trace!("Bucket is {:?}", bucket); trace!("Bucket is {:?}", bucket);
let mut filter_chain_id = "".to_string(); let mut filter_chain_id = "".to_string();
if chain_id != 0 { if chain_id != 0 {
filter_chain_id = f!(r#"|> filter(fn: (r) => r["chain_id"] == "{chain_id}")"#); filter_chain_id = f!(r#"|> filter(fn: (r) => r.chain_id == "{chain_id}")"#);
} }
// Fetch and request for balance // Fetch and request for balance
@ -178,22 +185,22 @@ pub async fn query_user_stats<'a>(
); );
let query = f!(r#" let query = f!(r#"
base = from(bucket: "{bucket}") base = () => from(bucket: "{bucket}")
|> range(start: {query_start}, stop: {query_stop}) |> range(start: {query_start}, stop: {query_stop})
{drop_method} {drop_method}
{rpc_key_filter} {rpc_key_filter}
{filter_chain_id} {filter_chain_id}
|> filter(fn: (r) => r["_measurement"] == "{measurement}") |> filter(fn: (r) => r._measurement == "{measurement}")
cumsum = base cumsum = base()
|> filter(fn: (r) => r["_field"] != "balance") |> filter(fn: (r) => r._field == "backend_requests" or r._field == "cache_hits" or r._field == "cache_misses" or r._field == "frontend_requests" or r._field == "no_servers" or r._field == "sum_credits_used" or r._field == "sum_request_bytes" or r._field == "sum_response_bytes" or r._field == "sum_response_millis")
|> group(columns: ["_field", "_measurement", "archive_needed", "chain_id", "error_response", "method", "rpc_secret_key_id"]) |> group(columns: ["_field", "_measurement", "archive_needed", "chain_id", "error_response", "method", "rpc_secret_key_id"])
|> aggregateWindow(every: {query_window_seconds}s, fn: sum, createEmpty: false) |> aggregateWindow(every: {query_window_seconds}s, fn: sum, createEmpty: false)
|> drop(columns: ["_start", "_stop"]) |> drop(columns: ["_start", "_stop"])
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value") |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> group() |> group()
balance = base balance = base()
|> filter(fn: (r) => r["_field"] == "balance") |> filter(fn: (r) => r["_field"] == "balance")
|> group(columns: ["_field", "_measurement", "chain_id"]) |> group(columns: ["_field", "_measurement", "chain_id"])
|> aggregateWindow(every: {query_window_seconds}s, fn: mean, createEmpty: false) |> aggregateWindow(every: {query_window_seconds}s, fn: mean, createEmpty: false)