From 5859cd8a8d35b2ebfaf71c0175e5b27d7ac636b6 Mon Sep 17 00:00:00 2001 From: David Date: Thu, 15 Jun 2023 18:38:51 +0200 Subject: [PATCH] made the query much faster (#127) --- web3_proxy/src/stats/influxdb_queries.rs | 43 ++++++++++++++---------- 1 file changed, 25 insertions(+), 18 deletions(-) diff --git a/web3_proxy/src/stats/influxdb_queries.rs b/web3_proxy/src/stats/influxdb_queries.rs index d388d44c..deb3d6d7 100644 --- a/web3_proxy/src/stats/influxdb_queries.rs +++ b/web3_proxy/src/stats/influxdb_queries.rs @@ -130,10 +130,17 @@ pub async fn query_user_stats<'a>( } // Iterate, pop and add to string - f!( - r#"|> filter(fn: (r) => contains(value: r["rpc_secret_key_id"], set: {:?}))"#, - user_rpc_keys - ) + let mut filter_subquery = "".to_string(); + + for (idx, user_key) in user_rpc_keys.iter().enumerate() { + if idx == 0 { + filter_subquery += &f!(r#"r.rpc_secret_key_id == "{}""#, user_key); + } else { + filter_subquery += &f!(r#"or r.rpc_secret_key_id == "{}""#, user_key); + } + } + + f!(r#"|> filter(fn: (r) => {})"#, filter_subquery) }; // TODO: Turn into a 500 error if bucket is not found .. @@ -147,7 +154,7 @@ pub async fn query_user_stats<'a>( trace!("Bucket is {:?}", bucket); let mut filter_chain_id = "".to_string(); if chain_id != 0 { - filter_chain_id = f!(r#"|> filter(fn: (r) => r["chain_id"] == "{chain_id}")"#); + filter_chain_id = f!(r#"|> filter(fn: (r) => r.chain_id == "{chain_id}")"#); } // Fetch and request for balance @@ -168,32 +175,32 @@ pub async fn query_user_stats<'a>( }; let join_candidates = f!( - r#"{:?}"#, - vec![ - "_time", - "_measurement", - "chain_id", - // "rpc_secret_key_id" - ] - ); + r#"{:?}"#, + vec![ + "_time", + "_measurement", + "chain_id", + // "rpc_secret_key_id" + ] + ); let query = f!(r#" - base = from(bucket: "{bucket}") + base = () => from(bucket: "{bucket}") |> range(start: {query_start}, stop: {query_stop}) {drop_method} {rpc_key_filter} {filter_chain_id} - |> filter(fn: (r) => r["_measurement"] == "{measurement}") + |> filter(fn: (r) => r._measurement == "{measurement}") - cumsum = base - |> filter(fn: (r) => r["_field"] != "balance") + cumsum = base() + |> filter(fn: (r) => r._field == "backend_requests" or r._field == "cache_hits" or r._field == "cache_misses" or r._field == "frontend_requests" or r._field == "no_servers" or r._field == "sum_credits_used" or r._field == "sum_request_bytes" or r._field == "sum_response_bytes" or r._field == "sum_response_millis") |> group(columns: ["_field", "_measurement", "archive_needed", "chain_id", "error_response", "method", "rpc_secret_key_id"]) |> aggregateWindow(every: {query_window_seconds}s, fn: sum, createEmpty: false) |> drop(columns: ["_start", "_stop"]) |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value") |> group() - balance = base + balance = base() |> filter(fn: (r) => r["_field"] == "balance") |> group(columns: ["_field", "_measurement", "chain_id"]) |> aggregateWindow(every: {query_window_seconds}s, fn: mean, createEmpty: false)