From ca41cb5c9e38e4e93d4b7815d76ad98076254be1 Mon Sep 17 00:00:00 2001
From: yenicelik
Date: Tue, 14 Mar 2023 20:08:16 +0100
Subject: [PATCH] will push this for short review

---
 scripts/get-stats-aggregated.sh          |   3 +
 web3_proxy/src/stats/influxdb_queries.rs | 237 ++++++++++++++++-------
 2 files changed, 170 insertions(+), 70 deletions(-)

diff --git a/scripts/get-stats-aggregated.sh b/scripts/get-stats-aggregated.sh
index 85b5b7f2..c1811988 100644
--- a/scripts/get-stats-aggregated.sh
+++ b/scripts/get-stats-aggregated.sh
@@ -5,3 +5,6 @@
 curl -X GET \
 "http://localhost:8544/user/stats/aggregate?query_start=1678780033&query_window_seconds=1000"
+
+#curl -X GET \
+#"http://localhost:8544/user/stats/detailed?query_start=1678780033&query_window_seconds=1000"
diff --git a/web3_proxy/src/stats/influxdb_queries.rs b/web3_proxy/src/stats/influxdb_queries.rs
index 5fba8d0d..c6ac31de 100644
--- a/web3_proxy/src/stats/influxdb_queries.rs
+++ b/web3_proxy/src/stats/influxdb_queries.rs
@@ -25,13 +25,25 @@
 use serde_json::json;
 use entities::{rpc_accounting, rpc_key};
 use crate::http_params::get_stats_column_from_params;
 
-// TODO: include chain_id, method, and some other things in this struct
 #[derive(Debug, Default, FromDataPoint, Serialize)]
 pub struct AggregatedRpcAccounting {
+    chain_id: u64,
     field: String,
     value: f64,
     time: DateTime<FixedOffset>,
-    // error_response: bool,
+    error_response: bool,
+    archive_needed: bool,
+}
+
+#[derive(Debug, Default, FromDataPoint, Serialize)]
+pub struct DetailedRpcAccounting {
+    chain_id: u64,
+    field: String,
+    value: f64,
+    time: DateTime<FixedOffset>,
+    error_response: bool,
+    archive_needed: bool,
+    method: String,
 }
 
 // pub struct AggregatedRpcAccountingErrors {
@@ -107,9 +119,17 @@ pub async fn query_user_stats<'a>(
     info!("Building the flux group columns");
-    let mut group_columns = vec!["_measurement", "_field"];
+    let mut group_columns = vec![
+        "chain_id",
+        "_measurement",
+        "_field",
+        "error_response",
+        "archive_needed",
+    ];
     let mut filter_chain_id = "".to_string();
 
+    // for the detailed view, also group by the rpc method
+    if let StatType::Detailed = stat_response_type {
+        group_columns.push("method");
+    }
+
-    if chain_id == 0 {
-        group_columns.push("chain_id");
-    } else {
+    // "chain_id" is always a group column now; a non-zero chain_id only adds a filter
+    if chain_id != 0 {
@@ -127,15 +147,14 @@ pub async fn query_user_stats<'a>(
     info!("Finished the group and filter clauses");
 
     let filter_field = match stat_response_type {
-        // StatType::Aggregated => f!(r#"|> filter(fn: (r) => r["_field"] == "frontend_requests")"#),
-        // Let's show all endpoints in a detailed stats
-        // StatType::Aggregated => "".to_string(), // f!(r#"|> filter(fn: (r) => r["_field"] == "frontend_requests")"#),
+        StatType::Aggregated => {
+            f!(r#"|> filter(fn: (r) => r["_field"] == "{stats_column}")"#)
+        },
+        // TODO: Detailed should probably still filter on _field and only add the
+        // per-method grouping; for now it applies no _field filter at all.
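+        // Illustrative example (assuming stats_column resolves to
+        // "frontend_requests"): the Aggregated arm above renders to
+        //   |> filter(fn: (r) => r["_field"] == "frontend_requests")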
-        StatType::Detailed => "".to_string(),
+        StatType::Detailed => {
+            "".to_string()
+        },
     };
 
     info!("Query start and stop are: {:?} {:?}", query_start, query_stop);
@@ -172,33 +191,8 @@ pub async fn query_user_stats<'a>(
         {group}
         |> aggregateWindow(every: {query_window_seconds}s, fn: mean, createEmpty: false)
         |> group()
-        // |> yield(name: "mean")
     "#);
 
-    // Also make a query for archived
-    // let query_archived = f!(r#"
-    // // from(bucket: "{bucket}")
-    //     |> range(start: {query_start}, stop: {query_stop})
-    //     |> filter(fn: (r) => r["_measurement"] == "{measurement}")
-    //     |> filter(fn: (r) => r["archive_needed"] == true)
-    //     |> aggregateWindow(every: {query_window_seconds}s, fn: count, createEmpty: false)
-    //     |> yield(name: "count")
-    // "#);
-
-    // let query_archived = f!(r#"
-    // from(bucket: "{bucket}")
-    //     |> range(start: {query_start}, stop: {query_stop})
-    //     |> filter(fn: (r) => r["_measurement"] == "{measurement}")
-    //     |> filter(fn: (r) => r["error_responses"] == true)
-    //     |> aggregateWindow(every: {query_window_seconds}s, fn: count, createEmpty: false)
-    //     |> yield(name: "count")
-    // "#);
-
-    // Also make a query for errors
-
-    // TODO: Also make a query thats detailed
-
     info!("Raw query to db is: {:?}", query);
     let query = Query::new(query.to_string());
-    info!("Query to db is: {:?}", query);
 
@@ -209,45 +203,148 @@ pub async fn query_user_stats<'a>(
     // info!("Direct response is: {:?}", unparsed);
     info!("Querying influx for the stat aggregates");
-    let influx_responses: Vec<AggregatedRpcAccounting> = influxdb_client.query::<AggregatedRpcAccounting>(Some(query)).await?;
-    info!("Influx responses are {:?}", &influx_responses);
-    for res in &influx_responses {
-        info!("Resp is: {:?}", res);
-    }
-
-    // Group by all fields together ..
-    let datapoints = influx_responses
-        .into_iter()
-        .group_by(|x| {
-            // This looks ugly, revisit later
-            // x.field.clone()
-            (x.clone().field.clone(), x.clone().time)
-        })
-        .into_iter()
-        .map(|(group, grouped_items)| {
-            // Now put all the fields next to each other
-            // (there will be exactly one field per timestamp, but we want to arrive at a new object)
-            let mut out = HashMap::new();
-            // Could also add a timestamp
-
-            for x in grouped_items {
-                info!("Iterating over grouped item {:?}", x);
-                out.insert(
-                    f!(r#"total_{x.field}"#),
-                    // serde_json::Value::Number(serde_json::Number::from(x.value))
-                    json!(x.value)
-                );
-
-                if !out.contains_key("query_window_timestamp") {
-                    out.insert(
-                        "query_window_timestamp".to_owned(),
-                        // serde_json::Value::Number(x.time.timestamp().into())
-                        json!(x.time.timestamp())
-                    );
-                }
-            }
-            json!(out)
-        }).collect::<Vec<_>>();
+    // Build a different result based on the requested stat type
+    let datapoints = match stat_response_type {
+        StatType::Aggregated => {
+            let influx_responses: Vec<AggregatedRpcAccounting> =
+                influxdb_client.query::<AggregatedRpcAccounting>(Some(query)).await?;
+            info!("Influx responses are {:?}", &influx_responses);
+
+            // Group all fields that share a timestamp into a single output object
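+            // `into_group_map` comes from itertools and collects an iterator of
+            // (key, value) pairs into a HashMap<K, Vec<V>>. A minimal sketch:
+            //   vec![(1, "a"), (2, "b"), (1, "c")].into_iter().into_group_map()
+            //   // => {1: ["a", "c"], 2: ["b"]}
+            // Here the key is the window timestamp, so each Vec collects every
+            // field recorded in that aggregation window.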
+            influx_responses
+                .into_iter()
+                .map(|x| (x.time, x))
+                .into_group_map()
+                .into_iter()
+                .map(|(group, grouped_items)| {
+                    info!("Group is: {:?}", group);
+
+                    // Put all the fields next to each other; there is exactly one
+                    // field per timestamp, but we want to arrive at a single object
+                    let mut out = HashMap::new();
+
+                    let mut archive_requests = 0;
+                    let mut error_responses = 0;
+
+                    // no method breakdown in the aggregated view; emit an explicit
+                    // null so the response shape matches the detailed view
+                    out.insert("method".to_owned(), json!(null));
+
+                    for x in grouped_items {
+                        out.insert(f!(r#"total_{x.field}"#), json!(x.value));
+
+                        if !out.contains_key("query_window_timestamp") {
+                            out.insert(
+                                "query_window_timestamp".to_owned(),
+                                json!(x.time.timestamp()),
+                            );
+                        }
+
+                        // Sum up the archive requests and error responses
+                        // TODO: double check whether errors & archive should be based
+                        // on frontend_requests or on other metrics
+                        if x.field == "frontend_requests" && x.archive_needed {
+                            archive_requests += x.value as i32; // number of requests
+                        }
+                        if x.field == "frontend_requests" && x.error_response {
+                            error_responses += x.value as i32;
+                        }
+                    }
+
+                    out.insert("archive_request".to_owned(), json!(archive_requests));
+                    out.insert("error_response".to_owned(), json!(error_responses));
+
+                    json!(out)
+                })
+                .collect::<Vec<_>>()
+        },
+        StatType::Detailed => {
+            let influx_responses: Vec<DetailedRpcAccounting> =
+                influxdb_client.query::<DetailedRpcAccounting>(Some(query)).await?;
+            info!("Influx responses are {:?}", &influx_responses);
+
+            // Group by (timestamp, method) so every method gets its own datapoint
+            influx_responses
+                .into_iter()
+                .map(|x| ((x.time, x.method.clone()), x))
+                .into_group_map()
+                .into_iter()
+                .map(|(group, grouped_items)| {
+                    let mut out = HashMap::new();
+
+                    let mut archive_requests = 0;
+                    let mut error_responses = 0;
+
+                    // TODO: could probably be hoisted out of the closure (?)
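+                    // `group` is the (time, method) key built in the map above, e.g.
+                    // (2023-03-14T20:00:00+00:00, "eth_call"); the method name is
+                    // only an illustration, it is whatever was recorded upstream.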
+                    let method = group.1;
+                    out.insert("method".to_owned(), json!(method));
+
+                    for x in grouped_items {
+                        out.insert(f!(r#"total_{x.field}"#), json!(x.value));
+
+                        if !out.contains_key("query_window_timestamp") {
+                            out.insert(
+                                "query_window_timestamp".to_owned(),
+                                json!(x.time.timestamp()),
+                            );
+                        }
+
+                        // Sum up the archive requests and error responses
+                        // TODO: double check whether errors & archive should be based
+                        // on frontend_requests or on other metrics
+                        if x.field == "frontend_requests" && x.archive_needed {
+                            archive_requests += x.value as i32; // number of requests
+                        }
+                        if x.field == "frontend_requests" && x.error_response {
+                            error_responses += x.value as i32;
+                        }
+                    }
+
+                    out.insert("archive_request".to_owned(), json!(archive_requests));
+                    out.insert("error_response".to_owned(), json!(error_responses));
+
+                    json!(out)
+                })
+                .collect::<Vec<_>>()
+        }
+    };
 
     // I suppose archive requests could either be gathered by default (then summed up),
     // or retrieved on a second pass. Same with error responses ..
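+
+    // For reference, one element of `datapoints` should serialize roughly like
+    // this (values illustrative):
+    //   {"method": "eth_call", "total_frontend_requests": 42.0,
+    //    "query_window_timestamp": 1678780033, "archive_request": 3, "error_response": 1}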