diff --git a/scripts/manual-tests/101-balance-referral-stats.sh b/scripts/manual-tests/101-balance-referral-stats.sh index 40b4fbba..d0bc3096 100644 --- a/scripts/manual-tests/101-balance-referral-stats.sh +++ b/scripts/manual-tests/101-balance-referral-stats.sh @@ -134,3 +134,10 @@ curl \ curl -X GET \ -H "Authorization: Bearer 01H2D5DN564M4Q2T6PETEZY83Q" \ "http://localhost:8544/user/stats/detailed?query_start=1686236378&query_window_seconds=3600" + +curl -X GET \ +-H "Authorization: Bearer 01H2D5DN564M4Q2T6PETEZY83Q" \ +"http://localhost:8544/user/stats/aggregate?query_start=1686236378&query_window_seconds=3600" + +curl -X GET \ +"http://localhost:8544/user/stats/aggregate?query_start=1686772800&query_window_seconds=3600" \ No newline at end of file diff --git a/web3_proxy/src/stats/influxdb_queries.rs b/web3_proxy/src/stats/influxdb_queries.rs index deb3d6d7..3634536c 100644 --- a/web3_proxy/src/stats/influxdb_queries.rs +++ b/web3_proxy/src/stats/influxdb_queries.rs @@ -169,9 +169,28 @@ pub async fn query_user_stats<'a>( trace!("Filters are: {:?}", filter_chain_id); // filter_field trace!("window seconds are: {:?}", query_window_seconds); - let drop_method = match stat_response_type { - StatType::Aggregated => f!(r#"|> drop(columns: ["method"])"#), - StatType::Detailed => "".to_string(), + let group_keys = match stat_response_type { + StatType::Aggregated => { + r#"[ + "_field", + "_measurement", + "archive_needed", + "chain_id", + "error_response", + "rpc_secret_key_id", + ]"# + } + StatType::Detailed => { + r#"[ + "_field", + "_measurement", + "archive_needed", + "chain_id", + "error_response", + "method", + "rpc_secret_key_id", + ]"# + } }; let join_candidates = f!( @@ -184,35 +203,58 @@ pub async fn query_user_stats<'a>( ] ); - let query = f!(r#" - base = () => from(bucket: "{bucket}") - |> range(start: {query_start}, stop: {query_stop}) - {drop_method} - {rpc_key_filter} - {filter_chain_id} - |> filter(fn: (r) => r._measurement == "{measurement}") + let query; + if stat_response_type == StatType::Detailed + || (stat_response_type == StatType::Aggregated && user_id != 0) + { + query = f!(r#" + base = () => from(bucket: "{bucket}") + |> range(start: {query_start}, stop: {query_stop}) + {rpc_key_filter} + {filter_chain_id} + |> filter(fn: (r) => r._measurement == "{measurement}") + + cumsum = base() + |> filter(fn: (r) => r._field == "backend_requests" or r._field == "cache_hits" or r._field == "cache_misses" or r._field == "frontend_requests" or r._field == "no_servers" or r._field == "sum_credits_used" or r._field == "sum_request_bytes" or r._field == "sum_response_bytes" or r._field == "sum_response_millis") + |> group(columns: {group_keys}) + |> aggregateWindow(every: {query_window_seconds}s, fn: sum, createEmpty: false) + |> drop(columns: ["_start", "_stop"]) + |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value") + |> group() + + balance = base() + |> filter(fn: (r) => r["_field"] == "balance") + |> group(columns: ["_field", "_measurement", "chain_id"]) + |> aggregateWindow(every: {query_window_seconds}s, fn: mean, createEmpty: false) + |> drop(columns: ["_start", "_stop"]) + |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value") + |> group() - cumsum = base() - |> filter(fn: (r) => r._field == "backend_requests" or r._field == "cache_hits" or r._field == "cache_misses" or r._field == "frontend_requests" or r._field == "no_servers" or r._field == "sum_credits_used" or r._field == "sum_request_bytes" or r._field == "sum_response_bytes" or r._field == "sum_response_millis") - |> group(columns: ["_field", "_measurement", "archive_needed", "chain_id", "error_response", "method", "rpc_secret_key_id"]) - |> aggregateWindow(every: {query_window_seconds}s, fn: sum, createEmpty: false) - |> drop(columns: ["_start", "_stop"]) - |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value") - |> group() - - balance = base() - |> filter(fn: (r) => r["_field"] == "balance") - |> group(columns: ["_field", "_measurement", "chain_id"]) - |> aggregateWindow(every: {query_window_seconds}s, fn: mean, createEmpty: false) - |> drop(columns: ["_start", "_stop"]) - |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value") - |> group() + join( + tables: {{cumsum, balance}}, + on: {join_candidates} + ) + "#); + } else if stat_response_type == StatType::Aggregated && user_id == 0 { + query = f!(r#" + from(bucket: "{bucket}") + |> range(start: {query_start}, stop: {query_stop}) + {filter_chain_id} + |> filter(fn: (r) => r._measurement == "{measurement}") + |> filter(fn: (r) => r._field != "balance") + |> group(columns: {group_keys}) + |> aggregateWindow(every: {query_window_seconds}s, fn: sum, createEmpty: false) + |> drop(columns: ["_start", "_stop"]) + |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value") + |> group() + "#); + } else { + // In this something with our logic is wrong + return Err(Web3ProxyError::BadResponse( + "This edge-case should never occur".into(), + )); + } - join( - tables: {{cumsum, balance}}, - on: {join_candidates} - ) - "#); debug!("Raw query to db is: {:#?}", query); let query = Query::new(query.to_string());