Merge remote-tracking branch 'origin/main' into devel
This commit is contained in:
commit
7707729f77
@ -134,3 +134,10 @@ curl \
|
|||||||
curl -X GET \
|
curl -X GET \
|
||||||
-H "Authorization: Bearer 01H2D5DN564M4Q2T6PETEZY83Q" \
|
-H "Authorization: Bearer 01H2D5DN564M4Q2T6PETEZY83Q" \
|
||||||
"http://localhost:8544/user/stats/detailed?query_start=1686236378&query_window_seconds=3600"
|
"http://localhost:8544/user/stats/detailed?query_start=1686236378&query_window_seconds=3600"
|
||||||
|
|
||||||
|
curl -X GET \
|
||||||
|
-H "Authorization: Bearer 01H2D5DN564M4Q2T6PETEZY83Q" \
|
||||||
|
"http://localhost:8544/user/stats/aggregate?query_start=1686236378&query_window_seconds=3600"
|
||||||
|
|
||||||
|
curl -X GET \
|
||||||
|
"http://localhost:8544/user/stats/aggregate?query_start=1686772800&query_window_seconds=3600"
|
@ -169,9 +169,28 @@ pub async fn query_user_stats<'a>(
|
|||||||
trace!("Filters are: {:?}", filter_chain_id); // filter_field
|
trace!("Filters are: {:?}", filter_chain_id); // filter_field
|
||||||
trace!("window seconds are: {:?}", query_window_seconds);
|
trace!("window seconds are: {:?}", query_window_seconds);
|
||||||
|
|
||||||
let drop_method = match stat_response_type {
|
let group_keys = match stat_response_type {
|
||||||
StatType::Aggregated => f!(r#"|> drop(columns: ["method"])"#),
|
StatType::Aggregated => {
|
||||||
StatType::Detailed => "".to_string(),
|
r#"[
|
||||||
|
"_field",
|
||||||
|
"_measurement",
|
||||||
|
"archive_needed",
|
||||||
|
"chain_id",
|
||||||
|
"error_response",
|
||||||
|
"rpc_secret_key_id",
|
||||||
|
]"#
|
||||||
|
}
|
||||||
|
StatType::Detailed => {
|
||||||
|
r#"[
|
||||||
|
"_field",
|
||||||
|
"_measurement",
|
||||||
|
"archive_needed",
|
||||||
|
"chain_id",
|
||||||
|
"error_response",
|
||||||
|
"method",
|
||||||
|
"rpc_secret_key_id",
|
||||||
|
]"#
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let join_candidates = f!(
|
let join_candidates = f!(
|
||||||
@ -184,35 +203,58 @@ pub async fn query_user_stats<'a>(
|
|||||||
]
|
]
|
||||||
);
|
);
|
||||||
|
|
||||||
let query = f!(r#"
|
let query;
|
||||||
base = () => from(bucket: "{bucket}")
|
if stat_response_type == StatType::Detailed
|
||||||
|> range(start: {query_start}, stop: {query_stop})
|
|| (stat_response_type == StatType::Aggregated && user_id != 0)
|
||||||
{drop_method}
|
{
|
||||||
{rpc_key_filter}
|
query = f!(r#"
|
||||||
{filter_chain_id}
|
base = () => from(bucket: "{bucket}")
|
||||||
|> filter(fn: (r) => r._measurement == "{measurement}")
|
|> range(start: {query_start}, stop: {query_stop})
|
||||||
|
{rpc_key_filter}
|
||||||
|
{filter_chain_id}
|
||||||
|
|> filter(fn: (r) => r._measurement == "{measurement}")
|
||||||
|
|
||||||
|
cumsum = base()
|
||||||
|
|> filter(fn: (r) => r._field == "backend_requests" or r._field == "cache_hits" or r._field == "cache_misses" or r._field == "frontend_requests" or r._field == "no_servers" or r._field == "sum_credits_used" or r._field == "sum_request_bytes" or r._field == "sum_response_bytes" or r._field == "sum_response_millis")
|
||||||
|
|> group(columns: {group_keys})
|
||||||
|
|> aggregateWindow(every: {query_window_seconds}s, fn: sum, createEmpty: false)
|
||||||
|
|> drop(columns: ["_start", "_stop"])
|
||||||
|
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|
||||||
|
|> group()
|
||||||
|
|
||||||
|
balance = base()
|
||||||
|
|> filter(fn: (r) => r["_field"] == "balance")
|
||||||
|
|> group(columns: ["_field", "_measurement", "chain_id"])
|
||||||
|
|> aggregateWindow(every: {query_window_seconds}s, fn: mean, createEmpty: false)
|
||||||
|
|> drop(columns: ["_start", "_stop"])
|
||||||
|
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|
||||||
|
|> group()
|
||||||
|
|
||||||
cumsum = base()
|
join(
|
||||||
|> filter(fn: (r) => r._field == "backend_requests" or r._field == "cache_hits" or r._field == "cache_misses" or r._field == "frontend_requests" or r._field == "no_servers" or r._field == "sum_credits_used" or r._field == "sum_request_bytes" or r._field == "sum_response_bytes" or r._field == "sum_response_millis")
|
tables: {{cumsum, balance}},
|
||||||
|> group(columns: ["_field", "_measurement", "archive_needed", "chain_id", "error_response", "method", "rpc_secret_key_id"])
|
on: {join_candidates}
|
||||||
|> aggregateWindow(every: {query_window_seconds}s, fn: sum, createEmpty: false)
|
)
|
||||||
|> drop(columns: ["_start", "_stop"])
|
"#);
|
||||||
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|
} else if stat_response_type == StatType::Aggregated && user_id == 0 {
|
||||||
|> group()
|
query = f!(r#"
|
||||||
|
from(bucket: "{bucket}")
|
||||||
balance = base()
|
|> range(start: {query_start}, stop: {query_stop})
|
||||||
|> filter(fn: (r) => r["_field"] == "balance")
|
{filter_chain_id}
|
||||||
|> group(columns: ["_field", "_measurement", "chain_id"])
|
|> filter(fn: (r) => r._measurement == "{measurement}")
|
||||||
|> aggregateWindow(every: {query_window_seconds}s, fn: mean, createEmpty: false)
|
|> filter(fn: (r) => r._field != "balance")
|
||||||
|> drop(columns: ["_start", "_stop"])
|
|> group(columns: {group_keys})
|
||||||
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|
|> aggregateWindow(every: {query_window_seconds}s, fn: sum, createEmpty: false)
|
||||||
|> group()
|
|> drop(columns: ["_start", "_stop"])
|
||||||
|
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|
||||||
|
|> group()
|
||||||
|
"#);
|
||||||
|
} else {
|
||||||
|
// In this something with our logic is wrong
|
||||||
|
return Err(Web3ProxyError::BadResponse(
|
||||||
|
"This edge-case should never occur".into(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
join(
|
|
||||||
tables: {{cumsum, balance}},
|
|
||||||
on: {join_candidates}
|
|
||||||
)
|
|
||||||
"#);
|
|
||||||
|
|
||||||
debug!("Raw query to db is: {:#?}", query);
|
debug!("Raw query to db is: {:#?}", query);
|
||||||
let query = Query::new(query.to_string());
|
let query = Query::new(query.to_string());
|
||||||
|
Loading…
Reference in New Issue
Block a user