From 65ca628cc7257b355451b64d373cb1fba19065ba Mon Sep 17 00:00:00 2001
From: yenicelik
Date: Tue, 14 Mar 2023 16:32:19 +0100
Subject: [PATCH] will modify query so we can make queries more easily

---
 scripts/generate-requests-and-stats.sh   |  2 +-
 scripts/get-stats-aggregated.sh          |  2 +-
 web3_proxy/src/stats/influxdb_queries.rs | 81 +++++++++++++++++++-----
 3 files changed, 68 insertions(+), 17 deletions(-)

diff --git a/scripts/generate-requests-and-stats.sh b/scripts/generate-requests-and-stats.sh
index 62d45953..58cdf10b 100644
--- a/scripts/generate-requests-and-stats.sh
+++ b/scripts/generate-requests-and-stats.sh
@@ -4,4 +4,4 @@
 # https://github.com/INFURA/versus
 # ./ethspam | ./versus --stop-after 100 "http://localhost:8544/"
 # Pipe into the endpoint ..., add a bearer token and all that
-./ethspam http://127.0.0.1:8544 | ./versus --stop-after 100000000 http://localhost:8544
+./ethspam http://127.0.0.1:8544 | ./versus --stop-after 100 http://localhost:8544
diff --git a/scripts/get-stats-aggregated.sh b/scripts/get-stats-aggregated.sh
index d229bec4..85b5b7f2 100644
--- a/scripts/get-stats-aggregated.sh
+++ b/scripts/get-stats-aggregated.sh
@@ -4,4 +4,4 @@
 
 curl -X GET \
-"http://localhost:8544/user/stats/aggregate?query_start=1678463411&query_window_seconds=1000000"
+"http://localhost:8544/user/stats/aggregate?query_start=1678780033&query_window_seconds=1000"
 
diff --git a/web3_proxy/src/stats/influxdb_queries.rs b/web3_proxy/src/stats/influxdb_queries.rs
index 949ef034..5fba8d0d 100644
--- a/web3_proxy/src/stats/influxdb_queries.rs
+++ b/web3_proxy/src/stats/influxdb_queries.rs
@@ -22,6 +22,7 @@ use itertools::Itertools;
 use log::info;
 use serde::Serialize;
 use serde_json::{json};
+use entities::{rpc_accounting, rpc_key};
 use crate::http_params::get_stats_column_from_params;
 
 // TODO: include chain_id, method, and some other things in this struct
@@ -30,8 +31,15 @@ pub struct AggregatedRpcAccounting {
     field: String,
     value: f64,
     time: DateTime<FixedOffset>,
+    // error_response: bool,
 }
 
+// pub struct AggregatedRpcAccountingErrors {
+//     field: String,
+//     time: DateTime<FixedOffset>,
+//     archive_needed: f64
+// }
+
 pub async fn query_user_stats<'a>(
     app: &'a Web3ProxyApp,
     bearer: Option<TypedHeader<Authorization<Bearer>>>,
@@ -98,6 +106,7 @@ pub async fn query_user_stats<'a>(
     info!("Bucket is {:?}", bucket);
 
     info!("Got this far 7");
+    // , "archive_needed", "error_response"
     let mut group_columns = vec!["_measurement", "_field"];
     let mut filter_chain_id = "".to_string();
 
@@ -125,6 +134,7 @@ pub async fn query_user_stats<'a>(
             f!(r#"|> filter(fn: (r) => r["_field"] == "{stats_column}")"#)
         },
         // TODO: Detailed should still filter it, but just "group-by" method (call it once per each method ...
+        // Or maybe it shouldn't filter it ...
         StatType::Detailed => "".to_string(),
     };
 
@@ -160,10 +170,35 @@ pub async fn query_user_stats<'a>(
         {filter_field}
         {filter_chain_id}
         {group}
-        |> aggregateWindow(every: {query_window_seconds}s, fn: sum, createEmpty: false)
-        |> yield(name: "sum")
+        |> aggregateWindow(every: {query_window_seconds}s, fn: mean, createEmpty: false)
+        |> group()
+        // |> yield(name: "mean")
     "#);
 
+    // Also make a query for archived
+    // let query_archived = f!(r#"
+    //     from(bucket: "{bucket}")
+    //     |> range(start: {query_start}, stop: {query_stop})
+    //     |> filter(fn: (r) => r["_measurement"] == "{measurement}")
+    //     |> filter(fn: (r) => r["archive_needed"] == true)
+    //     |> aggregateWindow(every: {query_window_seconds}s, fn: count, createEmpty: false)
+    //     |> yield(name: "count")
+    // "#);
+    // let query_archived = f!(r#"
+    //     from(bucket: "{bucket}")
+    //     |> range(start: {query_start}, stop: {query_stop})
+    //     |> filter(fn: (r) => r["_measurement"] == "{measurement}")
+    //     |> filter(fn: (r) => r["error_responses"] == true)
+    //     |> aggregateWindow(every: {query_window_seconds}s, fn: count, createEmpty: false)
+    //     |> yield(name: "count")
+    // "#);
+
+    // Also make a query for errors
+
+
+    // TODO: Also make a query that's detailed
+
+    info!("Raw query to db is: {:?}", query);
     let query = Query::new(query.to_string());
     info!("Query to db is: {:?}", query);
 
@@ -174,7 +209,7 @@
     // info!("Direct response is: {:?}", unparsed);
     info!("Got this far 12");
 
-    let influx_responses: Vec<AggregatedRpcAccounting> = influxdb_client.query(Some(query)).await?;
+    let influx_responses: Vec<AggregatedRpcAccounting> = influxdb_client.query::<AggregatedRpcAccounting>(Some(query)).await?;
     info!("Influx responses are {:?}", &influx_responses);
     for res in &influx_responses {
         info!("Resp is: {:?}", res);
@@ -186,7 +221,7 @@
         .group_by(|x| {
             // This looks ugly, revisit later
             // x.field.clone()
-            (x.clone().time)
+            (x.field.clone(), x.time)
         })
         .into_iter()
         .map(|(group, grouped_items)| {
@@ -196,6 +231,7 @@
 
             // Could also add a timestamp
             for x in grouped_items {
+                info!("Iterating over grouped item {:?}", x);
                 out.insert(
                     f!(r#"total_{x.field}"#),
                     // serde_json::Value::Number(serde_json::Number::from(x.value))
@@ -215,21 +251,36 @@
 
     // I suppose archive requests could be either gathered by default (then summed up), or retrieved on a second go.
     // Same with error responses ..
 
-    let mut out = HashMap::new();
-    out.insert("num_items", serde_json::Value::Number(datapoints.len().into()));
-    out.insert("result", serde_json::Value::Array(datapoints));
-    out.insert("query_window_seconds", serde_json::Value::Number(query_window_seconds.into()));
-    out.insert("query_start", serde_json::Value::Number(query_start.into()));
-    out.insert("chain_id", serde_json::Value::Number(chain_id.into()));
+    let mut response_body = HashMap::new();
+    response_body.insert("num_items", serde_json::Value::Number(datapoints.len().into()));
+    response_body.insert("result", serde_json::Value::Array(datapoints));
+    response_body.insert("query_window_seconds", serde_json::Value::Number(query_window_seconds.into()));
+    response_body.insert("query_start", serde_json::Value::Number(query_start.into()));
+    response_body.insert("chain_id", serde_json::Value::Number(chain_id.into()));
 
-    info!("Got this far 13 {:?}", out);
-    let out = Json(json!(out)).into_response();
+    if user_id == 0 {
+        // 0 means everyone. don't filter on user
+    } else {
+        // q = q.left_join(rpc_key::Entity);
+        // condition = condition.add(rpc_key::Column::UserId.eq(user_id));
+        response_body.insert("user_id", serde_json::Value::Number(user_id.into()));
+    }
+
+    // Also optionally add the rpc_key_id:
+    if let Some(rpc_key_id) = params.get("rpc_key_id") {
+        let rpc_key_id = rpc_key_id.parse::<u64>().map_err(|_| {
+            FrontendErrorResponse::BadRequest("Unable to parse rpc_key_id".to_string())
+        })?;
+        response_body.insert("rpc_key_id", serde_json::Value::Number(rpc_key_id.into()));
+    }
+
+    info!("Got this far 13 {:?}", response_body);
+    let response = Json(json!(response_body)).into_response();
 
     // Add the requests back into out
-    info!("Got this far 14 {:?}", out);
-
+    info!("Got this far 14 {:?}", response);
     // TODO: Now implement the proper response type
-    Ok(out)
+    Ok(response)
 }
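
Notes:

The rpc_key_id handling added above only validates the parameter and echoes it back in the response body for now (the Flux-side filtering is still TODO). A minimal sketch of how to exercise it, assuming the proxy is listening on localhost:8544 as in scripts/get-stats-aggregated.sh; the parameter name rpc_key_id comes from the params.get("rpc_key_id") call in this patch, while the key id 1 and the query_start timestamp are placeholder values:

    #!/bin/bash
    # Aggregated stats for a single RPC key (id 1 here); query_start is a unix
    # timestamp and query_window_seconds sets the aggregateWindow size.
    curl -X GET \
    "http://localhost:8544/user/stats/aggregate?query_start=1678780033&query_window_seconds=1000&rpc_key_id=1"

On success the handler responds with a JSON object containing num_items, result, query_window_seconds, query_start, chain_id, and, when supplied, user_id and rpc_key_id.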