will push this for short review

This commit is contained in:
yenicelik 2023-03-14 20:08:16 +01:00
parent 65ca628cc7
commit ca41cb5c9e
2 changed files with 170 additions and 70 deletions

@ -5,3 +5,6 @@
curl -X GET \
"http://localhost:8544/user/stats/aggregate?query_start=1678780033&query_window_seconds=1000"
#curl -X GET \
#"http://localhost:8544/user/stats/detailed?query_start=1678780033&query_window_seconds=1000"

@ -25,13 +25,25 @@ use serde_json::{json};
use entities::{rpc_accounting, rpc_key};
use crate::http_params::get_stats_column_from_params;
// TODO: include chain_id, method, and some other things in this struct
#[derive(Debug, Default, FromDataPoint, Serialize)]
pub struct AggregatedRpcAccounting {
chain_id: u64,
field: String,
value: f64,
time: DateTime<FixedOffset>,
// error_response: bool,
error_response: bool,
archive_needed: bool,
}
#[derive(Debug, Default, FromDataPoint, Serialize)]
pub struct DetailedRpcAccounting {
chain_id: u64,
field: String,
value: f64,
time: DateTime<FixedOffset>,
error_response: bool,
archive_needed: bool,
method: String,
}
// pub struct AggregatedRpcAccountingErrors {
@ -107,9 +119,17 @@ pub async fn query_user_stats<'a>(
info!("Got this far 7");
// , "archive_needed", "error_response"
let mut group_columns = vec!["_measurement", "_field"];
let mut group_columns = vec!["chain_id", "_measurement", "_field", "_measurement", "error_response", "archive_needed"];
let mut filter_chain_id = "".to_string();
// Add to group columns the method, if we want the detailed view as well
match stat_response_type {
StatType::Detailed => {
group_columns.push("method");
},
_ => {}
}
if chain_id == 0 {
group_columns.push("chain_id");
} else {
@ -127,15 +147,14 @@ pub async fn query_user_stats<'a>(
info!("Got this far 10");
let filter_field = match stat_response_type {
// StatType::Aggregated => f!(r#"|> filter(fn: (r) => r["_field"] == "frontend_requests")"#),
// Let's show all endpoints in a detailed stats
// StatType::Aggregated => "".to_string(), // f!(r#"|> filter(fn: (r) => r["_field"] == "frontend_requests")"#),
StatType::Aggregated => {
f!(r#"|> filter(fn: (r) => r["_field"] == "{stats_column}")"#)
},
// TODO: Detailed should still filter it, but just "group-by" method (call it once per each method ...
// Or maybe it shouldn't filter it ...
StatType::Detailed => "".to_string(),
StatType::Detailed => {
"".to_string()
},
};
info!("Query start and stop are: {:?} {:?}", query_start, query_stop);
@ -172,33 +191,8 @@ pub async fn query_user_stats<'a>(
{group}
|> aggregateWindow(every: {query_window_seconds}s, fn: mean, createEmpty: false)
|> group()
// |> yield(name: "mean")
"#);
// Also make a query for archived
// let query_archived = f!(r#"
// // from(bucket: "{bucket}")
// |> range(start: {query_start}, stop: {query_stop})
// |> filter(fn: (r) => r["_measurement"] == "{measurement}")
// |> filter(fn: (r) => r["archive_needed"] == true)
// |> aggregateWindow(every: {query_window_seconds}s, fn: count, createEmpty: false)
// |> yield(name: "count")
// "#);
// let query_archived = f!(r#"
// from(bucket: "{bucket}")
// |> range(start: {query_start}, stop: {query_stop})
// |> filter(fn: (r) => r["_measurement"] == "{measurement}")
// |> filter(fn: (r) => r["error_responses"] == true)
// |> aggregateWindow(every: {query_window_seconds}s, fn: count, createEmpty: false)
// |> yield(name: "count")
// "#);
// Also make a query for errors
// TODO: Also make a query thats detailed
info!("Raw query to db is: {:?}", query);
let query = Query::new(query.to_string());
info!("Query to db is: {:?}", query);
@ -209,45 +203,148 @@ pub async fn query_user_stats<'a>(
// info!("Direct response is: {:?}", unparsed);
info!("Got this far 12");
let influx_responses: Vec<AggregatedRpcAccounting> = influxdb_client.query::<AggregatedRpcAccounting>(Some(query)).await?;
info!("Influx responses are {:?}", &influx_responses);
for res in &influx_responses {
info!("Resp is: {:?}", res);
}
// Group by all fields together ..
let datapoints = influx_responses
.into_iter()
.group_by(|x| {
// This looks ugly, revisit later
// x.field.clone()
(x.clone().field.clone(), x.clone().time)
})
.into_iter()
.map(|(group, grouped_items)| {
// Now put all the fields next to each other
// (there will be exactly one field per timestamp, but we want to arrive at a new object)
let mut out = HashMap::new();
// Could also add a timestamp
for x in grouped_items {
info!("Iterating over grouped item {:?}", x);
out.insert(
f!(r#"total_{x.field}"#),
// serde_json::Value::Number(serde_json::Number::from(x.value))
json!(x.value)
);
if !out.contains_key("query_window_timestamp") {
out.insert(
"query_window_timestamp".to_owned(),
// serde_json::Value::Number(x.time.timestamp().into())
json!(x.time.timestamp())
);
}
// Return a different result based on the query
let datapoints = match stat_response_type {
StatType::Aggregated => {
let influx_responses: Vec<AggregatedRpcAccounting> = influxdb_client.query::<AggregatedRpcAccounting>(Some(query)).await?;
info!("Influx responses are {:?}", &influx_responses);
for res in &influx_responses {
info!("Resp is: {:?}", res);
}
json!(out)
}).collect::<Vec<_>>();
// let tmp = influx_responses.into_iter().group_by(|x| {x.time.timestamp()}).into_iter().collect::<Vec<_>>();
// info!("Printing grouped item {}", tmp);
// Group by all fields together ..
// let influx_responses = Vec::new();
// let grouped_items = Vec::new();
// let mut grouped_items = influx_responses
// .into_iter()
// .map(|x| {
// (x.time.clone(), x)
// })
// .into_group_map();
// info!("Grouped items are {:?}", grouped_items);
influx_responses
.into_iter()
.map(|x| {
(x.time.clone(), x)
})
.into_group_map()
.into_iter()
.map(|(group, grouped_items)| {
info!("Group is: {:?}", group);
// Now put all the fields next to each other
// (there will be exactly one field per timestamp, but we want to arrive at a new object)
let mut out = HashMap::new();
// Could also add a timestamp
let mut archive_requests = 0;
let mut error_responses = 0;
out.insert("method".to_owned(), json!("null"));
for x in grouped_items {
info!("Iterating over grouped item {:?}", x);
out.insert(
f!(r#"total_{x.field}"#),
// serde_json::Value::Number(serde_json::Number::from(x.value))
json!(x.value)
);
if !out.contains_key("query_window_timestamp") {
out.insert(
"query_window_timestamp".to_owned(),
// serde_json::Value::Number(x.time.timestamp().into())
json!(x.time.timestamp())
);
}
// Add up to archive requests and error responses
// TODO: Gotta double check if errors & archive is based on frontend requests, or other metrics
if x.field == "frontend_requests" && x.archive_needed {
archive_requests += x.value as i32 // This is the number of requests
}
if x.field == "frontend_requests" && x.error_response {
error_responses += x.value as i32
}
}
out.insert("archive_request".to_owned(), json!(archive_requests));
out.insert("error_response".to_owned(), json!(error_responses));
json!(out)
}).collect::<Vec<_>>()
},
StatType::Detailed => {
let influx_responses: Vec<DetailedRpcAccounting> = influxdb_client.query::<DetailedRpcAccounting>(Some(query)).await?;
info!("Influx responses are {:?}", &influx_responses);
for res in &influx_responses {
info!("Resp is: {:?}", res);
}
// Group by all fields together ..
influx_responses
.into_iter()
.map(|x| {
((x.time.clone(), x.method.clone()), x)
})
.into_group_map()
.into_iter()
.map(|(group, grouped_items)| {
// Now put all the fields next to each other
// (there will be exactly one field per timestamp, but we want to arrive at a new object)
let mut out = HashMap::new();
// Could also add a timestamp
let mut archive_requests = 0;
let mut error_responses = 0;
// Should probably move this outside ... (?)
let method = group.1;
out.insert("method".to_owned(), json!(method));
for x in grouped_items {
info!("Iterating over grouped item {:?}", x);
out.insert(
f!(r#"total_{x.field}"#),
// serde_json::Value::Number(serde_json::Number::from(x.value))
json!(x.value)
);
if !out.contains_key("query_window_timestamp") {
out.insert(
"query_window_timestamp".to_owned(),
// serde_json::Value::Number(x.time.timestamp().into())
json!(x.time.timestamp())
);
}
// Add up to archive requests and error responses
// TODO: Gotta double check if errors & archive is based on frontend requests, or other metrics
if x.field == "frontend_requests" && x.archive_needed {
archive_requests += x.value as i32 // This is the number of requests
}
if x.field == "frontend_requests" && x.error_response {
error_responses += x.value as i32
}
}
out.insert("archive_request".to_owned(), json!(archive_requests));
out.insert("error_response".to_owned(), json!(error_responses));
json!(out)
}).collect::<Vec<_>>()
}
};
// I suppose archive requests could be either gathered by default (then summed up), or retrieved on a second go.
// Same with error responses ..