will modify query so we can make queries more easily
This commit is contained in:
parent
9417961d78
commit
65ca628cc7
@ -4,4 +4,4 @@
|
|||||||
# https://github.com/INFURA/versus
|
# https://github.com/INFURA/versus
|
||||||
# ./ethspam | ./versus --stop-after 100 "http://localhost:8544/" # Pipe into the endpoint ..., add a bearer token and all that
|
# ./ethspam | ./versus --stop-after 100 "http://localhost:8544/" # Pipe into the endpoint ..., add a bearer token and all that
|
||||||
|
|
||||||
./ethspam http://127.0.0.1:8544 | ./versus --stop-after 100000000 http://localhost:8544
|
./ethspam http://127.0.0.1:8544 | ./versus --stop-after 100 http://localhost:8544
|
||||||
|
@ -4,4 +4,4 @@
|
|||||||
|
|
||||||
|
|
||||||
curl -X GET \
|
curl -X GET \
|
||||||
"http://localhost:8544/user/stats/aggregate?query_start=1678463411&query_window_seconds=1000000"
|
"http://localhost:8544/user/stats/aggregate?query_start=1678780033&query_window_seconds=1000"
|
||||||
|
@ -22,6 +22,7 @@ use itertools::Itertools;
|
|||||||
use log::info;
|
use log::info;
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use serde_json::{json};
|
use serde_json::{json};
|
||||||
|
use entities::{rpc_accounting, rpc_key};
|
||||||
use crate::http_params::get_stats_column_from_params;
|
use crate::http_params::get_stats_column_from_params;
|
||||||
|
|
||||||
// TODO: include chain_id, method, and some other things in this struct
|
// TODO: include chain_id, method, and some other things in this struct
|
||||||
@ -30,8 +31,15 @@ pub struct AggregatedRpcAccounting {
|
|||||||
field: String,
|
field: String,
|
||||||
value: f64,
|
value: f64,
|
||||||
time: DateTime<FixedOffset>,
|
time: DateTime<FixedOffset>,
|
||||||
|
// error_response: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// pub struct AggregatedRpcAccountingErrors {
|
||||||
|
// field: String,
|
||||||
|
// time: DateTime<FixedOffset>,
|
||||||
|
// archive_needed: f64
|
||||||
|
// }
|
||||||
|
|
||||||
pub async fn query_user_stats<'a>(
|
pub async fn query_user_stats<'a>(
|
||||||
app: &'a Web3ProxyApp,
|
app: &'a Web3ProxyApp,
|
||||||
bearer: Option<TypedHeader<Authorization<Bearer>>>,
|
bearer: Option<TypedHeader<Authorization<Bearer>>>,
|
||||||
@ -98,6 +106,7 @@ pub async fn query_user_stats<'a>(
|
|||||||
info!("Bucket is {:?}", bucket);
|
info!("Bucket is {:?}", bucket);
|
||||||
|
|
||||||
info!("Got this far 7");
|
info!("Got this far 7");
|
||||||
|
// , "archive_needed", "error_response"
|
||||||
let mut group_columns = vec!["_measurement", "_field"];
|
let mut group_columns = vec!["_measurement", "_field"];
|
||||||
let mut filter_chain_id = "".to_string();
|
let mut filter_chain_id = "".to_string();
|
||||||
|
|
||||||
@ -125,6 +134,7 @@ pub async fn query_user_stats<'a>(
|
|||||||
f!(r#"|> filter(fn: (r) => r["_field"] == "{stats_column}")"#)
|
f!(r#"|> filter(fn: (r) => r["_field"] == "{stats_column}")"#)
|
||||||
},
|
},
|
||||||
// TODO: Detailed should still filter it, but just "group-by" method (call it once per each method ...
|
// TODO: Detailed should still filter it, but just "group-by" method (call it once per each method ...
|
||||||
|
// Or maybe it shouldn't filter it ...
|
||||||
StatType::Detailed => "".to_string(),
|
StatType::Detailed => "".to_string(),
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -160,10 +170,35 @@ pub async fn query_user_stats<'a>(
|
|||||||
{filter_field}
|
{filter_field}
|
||||||
{filter_chain_id}
|
{filter_chain_id}
|
||||||
{group}
|
{group}
|
||||||
|> aggregateWindow(every: {query_window_seconds}s, fn: sum, createEmpty: false)
|
|> aggregateWindow(every: {query_window_seconds}s, fn: mean, createEmpty: false)
|
||||||
|> yield(name: "sum")
|
|> group()
|
||||||
|
// |> yield(name: "mean")
|
||||||
"#);
|
"#);
|
||||||
|
|
||||||
|
// Also make a query for archived
|
||||||
|
// let query_archived = f!(r#"
|
||||||
|
// // from(bucket: "{bucket}")
|
||||||
|
// |> range(start: {query_start}, stop: {query_stop})
|
||||||
|
// |> filter(fn: (r) => r["_measurement"] == "{measurement}")
|
||||||
|
// |> filter(fn: (r) => r["archive_needed"] == true)
|
||||||
|
// |> aggregateWindow(every: {query_window_seconds}s, fn: count, createEmpty: false)
|
||||||
|
// |> yield(name: "count")
|
||||||
|
// "#);
|
||||||
|
// let query_archived = f!(r#"
|
||||||
|
// from(bucket: "{bucket}")
|
||||||
|
// |> range(start: {query_start}, stop: {query_stop})
|
||||||
|
// |> filter(fn: (r) => r["_measurement"] == "{measurement}")
|
||||||
|
// |> filter(fn: (r) => r["error_responses"] == true)
|
||||||
|
// |> aggregateWindow(every: {query_window_seconds}s, fn: count, createEmpty: false)
|
||||||
|
// |> yield(name: "count")
|
||||||
|
// "#);
|
||||||
|
|
||||||
|
// Also make a query for errors
|
||||||
|
|
||||||
|
|
||||||
|
// TODO: Also make a query thats detailed
|
||||||
|
|
||||||
|
|
||||||
info!("Raw query to db is: {:?}", query);
|
info!("Raw query to db is: {:?}", query);
|
||||||
let query = Query::new(query.to_string());
|
let query = Query::new(query.to_string());
|
||||||
info!("Query to db is: {:?}", query);
|
info!("Query to db is: {:?}", query);
|
||||||
@ -174,7 +209,7 @@ pub async fn query_user_stats<'a>(
|
|||||||
// info!("Direct response is: {:?}", unparsed);
|
// info!("Direct response is: {:?}", unparsed);
|
||||||
info!("Got this far 12");
|
info!("Got this far 12");
|
||||||
|
|
||||||
let influx_responses: Vec<AggregatedRpcAccounting> = influxdb_client.query(Some(query)).await?;
|
let influx_responses: Vec<AggregatedRpcAccounting> = influxdb_client.query::<AggregatedRpcAccounting>(Some(query)).await?;
|
||||||
info!("Influx responses are {:?}", &influx_responses);
|
info!("Influx responses are {:?}", &influx_responses);
|
||||||
for res in &influx_responses {
|
for res in &influx_responses {
|
||||||
info!("Resp is: {:?}", res);
|
info!("Resp is: {:?}", res);
|
||||||
@ -186,7 +221,7 @@ pub async fn query_user_stats<'a>(
|
|||||||
.group_by(|x| {
|
.group_by(|x| {
|
||||||
// This looks ugly, revisit later
|
// This looks ugly, revisit later
|
||||||
// x.field.clone()
|
// x.field.clone()
|
||||||
(x.clone().time)
|
(x.clone().field.clone(), x.clone().time)
|
||||||
})
|
})
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|(group, grouped_items)| {
|
.map(|(group, grouped_items)| {
|
||||||
@ -196,6 +231,7 @@ pub async fn query_user_stats<'a>(
|
|||||||
// Could also add a timestamp
|
// Could also add a timestamp
|
||||||
|
|
||||||
for x in grouped_items {
|
for x in grouped_items {
|
||||||
|
info!("Iterating over grouped item {:?}", x);
|
||||||
out.insert(
|
out.insert(
|
||||||
f!(r#"total_{x.field}"#),
|
f!(r#"total_{x.field}"#),
|
||||||
// serde_json::Value::Number(serde_json::Number::from(x.value))
|
// serde_json::Value::Number(serde_json::Number::from(x.value))
|
||||||
@ -215,21 +251,36 @@ pub async fn query_user_stats<'a>(
|
|||||||
|
|
||||||
// I suppose archive requests could be either gathered by default (then summed up), or retrieved on a second go.
|
// I suppose archive requests could be either gathered by default (then summed up), or retrieved on a second go.
|
||||||
// Same with error responses ..
|
// Same with error responses ..
|
||||||
let mut out = HashMap::new();
|
let mut response_body = HashMap::new();
|
||||||
out.insert("num_items", serde_json::Value::Number(datapoints.len().into()));
|
response_body.insert("num_items", serde_json::Value::Number(datapoints.len().into()));
|
||||||
out.insert("result", serde_json::Value::Array(datapoints));
|
response_body.insert("result", serde_json::Value::Array(datapoints));
|
||||||
out.insert("query_window_seconds", serde_json::Value::Number(query_window_seconds.into()));
|
response_body.insert("query_window_seconds", serde_json::Value::Number(query_window_seconds.into()));
|
||||||
out.insert("query_start", serde_json::Value::Number(query_start.into()));
|
response_body.insert("query_start", serde_json::Value::Number(query_start.into()));
|
||||||
out.insert("chain_id", serde_json::Value::Number(chain_id.into()));
|
response_body.insert("chain_id", serde_json::Value::Number(chain_id.into()));
|
||||||
|
|
||||||
info!("Got this far 13 {:?}", out);
|
if user_id == 0 {
|
||||||
let out = Json(json!(out)).into_response();
|
// 0 means everyone. don't filter on user
|
||||||
|
} else {
|
||||||
|
// q = q.left_join(rpc_key::Entity);
|
||||||
|
// condition = condition.add(rpc_key::Column::UserId.eq(user_id));
|
||||||
|
response_body.insert("user_id", serde_json::Value::Number(user_id.into()));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Also optionally add the rpc_key_id:
|
||||||
|
if let Some(rpc_key_id) = params.get("rpc_key_id") {
|
||||||
|
let rpc_key_id = rpc_key_id.parse::<u64>().map_err(|e| {
|
||||||
|
FrontendErrorResponse::BadRequest("Unable to parse rpc_key_id".to_string())
|
||||||
|
})?;
|
||||||
|
response_body.insert("rpc_key_id", serde_json::Value::Number(rpc_key_id.into()));
|
||||||
|
}
|
||||||
|
|
||||||
|
info!("Got this far 13 {:?}", response_body);
|
||||||
|
let response = Json(json!(response_body)).into_response();
|
||||||
// Add the requests back into out
|
// Add the requests back into out
|
||||||
|
|
||||||
info!("Got this far 14 {:?}", out);
|
info!("Got this far 14 {:?}", response);
|
||||||
|
|
||||||
|
|
||||||
// TODO: Now impplement the proper response type
|
// TODO: Now impplement the proper response type
|
||||||
|
|
||||||
Ok(out)
|
Ok(response)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user