Merge pull request #114 from yenicelik/devel

Devel
This commit is contained in:
David 2023-06-11 17:59:03 +02:00 committed by GitHub
commit 4b3806d59c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -167,57 +167,44 @@ pub async fn query_user_stats<'a>(
StatType::Detailed => "".to_string(), StatType::Detailed => "".to_string(),
}; };
let join_candidates = match stat_response_type { let join_candidates = f!(
StatType::Aggregated => f!(
r#"{:?}"#,
vec!["_time", "_measurement", "chain_id", "rpc_secret_key_id"]
),
StatType::Detailed => f!(
r#"{:?}"#, r#"{:?}"#,
vec![ vec![
"_time", "_time",
"_measurement", "_measurement",
"method",
"chain_id", "chain_id",
"rpc_secret_key_id" // "rpc_secret_key_id"
] ]
), );
};
let query = f!(r#" let query = f!(r#"
base = from(bucket: "{bucket}") base = from(bucket: "{bucket}")
|> range(start: {query_start}, stop: {query_stop}) |> range(start: {query_start}, stop: {query_stop})
{rpc_key_filter}
|> filter(fn: (r) => r["_measurement"] == "{measurement}")
{filter_chain_id}
{drop_method} {drop_method}
{rpc_key_filter}
{filter_chain_id}
|> filter(fn: (r) => r["_measurement"] == "{measurement}")
cumsum = base cumsum = base
|> filter(fn: (r) => r["_field"] != "balance")
|> group(columns: ["_field", "_measurement", "archive_needed", "chain_id", "error_response", "method", "rpc_secret_key_id"])
|> aggregateWindow(every: {query_window_seconds}s, fn: sum, createEmpty: false) |> aggregateWindow(every: {query_window_seconds}s, fn: sum, createEmpty: false)
|> drop(columns: ["_start", "_stop"])
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value") |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> drop(columns: ["balance"])
|> group(columns: ["_time", "_measurement", "archive_needed", "chain_id", "error_response", "method", "rpc_secret_key_id"])
|> sort(columns: ["frontend_requests"])
|> map(fn:(r) => ({{ r with "sum_credits_used": float(v: r["sum_credits_used"]) }}))
|> cumulativeSum(columns: ["backend_requests", "cache_hits", "cache_misses", "frontend_requests", "sum_credits_used", "sum_request_bytes", "sum_response_bytes", "sum_response_millis"])
|> sort(columns: ["frontend_requests"], desc: true)
|> limit(n: 1)
|> group() |> group()
balance = base balance = base
|> toFloat() |> filter(fn: (r) => r["_field"] == "balance")
|> group(columns: ["_field", "_measurement", "chain_id"])
|> aggregateWindow(every: {query_window_seconds}s, fn: mean, createEmpty: false) |> aggregateWindow(every: {query_window_seconds}s, fn: mean, createEmpty: false)
|> drop(columns: ["_start", "_stop"])
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value") |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> group(columns: ["_time", "_measurement", "chain_id", "method", "rpc_secret_key_id"])
|> mean(column: "balance")
|> group() |> group()
|> sort(columns: ["_time", "_measurement", "chain_id", "method", "rpc_secret_key_id"], desc: true)
join( join(
tables: {{cumsum, balance}}, tables: {{cumsum, balance}},
on: {join_candidates} on: {join_candidates}
) )
|> sort(columns: ["_time", "_measurement", "chain_id", "method", "rpc_secret_key_id"], desc: true)
"#); "#);
debug!("Raw query to db is: {:#?}", query); debug!("Raw query to db is: {:#?}", query);