From 2333b8bbb653e5b9219bb1d5e8497b1ac0d024fc Mon Sep 17 00:00:00 2001 From: yenicelik Date: Sun, 11 Jun 2023 17:57:02 +0200 Subject: [PATCH] influx query is not 20-40% faster --- web3_proxy/src/stats/influxdb_queries.rs | 39 ++++++++---------------- 1 file changed, 13 insertions(+), 26 deletions(-) diff --git a/web3_proxy/src/stats/influxdb_queries.rs b/web3_proxy/src/stats/influxdb_queries.rs index dd0dc37b..d388d44c 100644 --- a/web3_proxy/src/stats/influxdb_queries.rs +++ b/web3_proxy/src/stats/influxdb_queries.rs @@ -167,57 +167,44 @@ pub async fn query_user_stats<'a>( StatType::Detailed => "".to_string(), }; - let join_candidates = match stat_response_type { - StatType::Aggregated => f!( - r#"{:?}"#, - vec!["_time", "_measurement", "chain_id", "rpc_secret_key_id"] - ), - StatType::Detailed => f!( + let join_candidates = f!( r#"{:?}"#, vec![ "_time", "_measurement", - "method", "chain_id", - "rpc_secret_key_id" + // "rpc_secret_key_id" ] - ), - }; + ); let query = f!(r#" base = from(bucket: "{bucket}") |> range(start: {query_start}, stop: {query_stop}) - {rpc_key_filter} - |> filter(fn: (r) => r["_measurement"] == "{measurement}") - {filter_chain_id} {drop_method} + {rpc_key_filter} + {filter_chain_id} + |> filter(fn: (r) => r["_measurement"] == "{measurement}") - cumsum = base + cumsum = base + |> filter(fn: (r) => r["_field"] != "balance") + |> group(columns: ["_field", "_measurement", "archive_needed", "chain_id", "error_response", "method", "rpc_secret_key_id"]) |> aggregateWindow(every: {query_window_seconds}s, fn: sum, createEmpty: false) + |> drop(columns: ["_start", "_stop"]) |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value") - |> drop(columns: ["balance"]) - |> group(columns: ["_time", "_measurement", "archive_needed", "chain_id", "error_response", "method", "rpc_secret_key_id"]) - |> sort(columns: ["frontend_requests"]) - |> map(fn:(r) => ({{ r with "sum_credits_used": float(v: r["sum_credits_used"]) }})) - |> cumulativeSum(columns: ["backend_requests", "cache_hits", "cache_misses", "frontend_requests", "sum_credits_used", "sum_request_bytes", "sum_response_bytes", "sum_response_millis"]) - |> sort(columns: ["frontend_requests"], desc: true) - |> limit(n: 1) |> group() balance = base - |> toFloat() + |> filter(fn: (r) => r["_field"] == "balance") + |> group(columns: ["_field", "_measurement", "chain_id"]) |> aggregateWindow(every: {query_window_seconds}s, fn: mean, createEmpty: false) + |> drop(columns: ["_start", "_stop"]) |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value") - |> group(columns: ["_time", "_measurement", "chain_id", "method", "rpc_secret_key_id"]) - |> mean(column: "balance") |> group() - |> sort(columns: ["_time", "_measurement", "chain_id", "method", "rpc_secret_key_id"], desc: true) join( tables: {{cumsum, balance}}, on: {join_candidates} ) - |> sort(columns: ["_time", "_measurement", "chain_id", "method", "rpc_secret_key_id"], desc: true) "#); debug!("Raw query to db is: {:#?}", query);