disable balance join for now
This commit is contained in:
parent
0f367d9035
commit
24439c5143
@ -184,7 +184,8 @@ pub async fn query_user_stats<'a>(
|
|||||||
{filter_chain_id}
|
{filter_chain_id}
|
||||||
{drop_method}
|
{drop_method}
|
||||||
|
|
||||||
cumsum = base
|
// cumsum = base
|
||||||
|
base
|
||||||
|> aggregateWindow(every: {query_window_seconds}s, fn: sum, createEmpty: false)
|
|> aggregateWindow(every: {query_window_seconds}s, fn: sum, createEmpty: false)
|
||||||
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|
||||||
|> drop(columns: ["balance"])
|
|> drop(columns: ["balance"])
|
||||||
@ -199,19 +200,19 @@ pub async fn query_user_stats<'a>(
|
|||||||
|> group()
|
|> group()
|
||||||
|> sort(columns: ["_time", "_measurement", "chain_id", "method", "rpc_secret_key_id"], desc: true)
|
|> sort(columns: ["_time", "_measurement", "chain_id", "method", "rpc_secret_key_id"], desc: true)
|
||||||
|
|
||||||
balance = base
|
// balance = base
|
||||||
|> toFloat()
|
// |> toFloat()
|
||||||
|> aggregateWindow(every: {query_window_seconds}s, fn: mean, createEmpty: false)
|
// |> aggregateWindow(every: {query_window_seconds}s, fn: mean, createEmpty: false)
|
||||||
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|
// |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|
||||||
|> group(columns: ["_time", "_measurement", "chain_id", "method", "rpc_secret_key_id"])
|
// |> group(columns: ["_time", "_measurement", "chain_id", "method", "rpc_secret_key_id"])
|
||||||
|> mean(column: "balance")
|
// |> mean(column: "balance")
|
||||||
|> group()
|
// |> group()
|
||||||
|> sort(columns: ["_time", "_measurement", "chain_id", "method", "rpc_secret_key_id"], desc: true)
|
// |> sort(columns: ["_time", "_measurement", "chain_id", "method", "rpc_secret_key_id"], desc: true)
|
||||||
|
|
||||||
join(
|
// join(
|
||||||
tables: {{cumsum, balance}},
|
// tables: {{cumsum, balance}},
|
||||||
on: {join_candidates}
|
// on: {join_candidates}
|
||||||
)
|
// )
|
||||||
"#);
|
"#);
|
||||||
|
|
||||||
info!("Raw query to db is: {:?}", query);
|
info!("Raw query to db is: {:?}", query);
|
||||||
@ -219,8 +220,10 @@ pub async fn query_user_stats<'a>(
|
|||||||
info!("Query to db is: {:?}", query);
|
info!("Query to db is: {:?}", query);
|
||||||
|
|
||||||
// Make the query and collect all data
|
// Make the query and collect all data
|
||||||
let raw_influx_responses: Vec<FluxRecord> =
|
let raw_influx_responses: Vec<FluxRecord> = influxdb_client
|
||||||
influxdb_client.query_raw(Some(query.clone())).await?;
|
.query_raw(Some(query.clone()))
|
||||||
|
.await
|
||||||
|
.context("failed parsing query result into a FluxRecord")?;
|
||||||
|
|
||||||
// Basically rename all items to be "total",
|
// Basically rename all items to be "total",
|
||||||
// calculate number of "archive_needed" and "error_responses" through their boolean representations ...
|
// calculate number of "archive_needed" and "error_responses" through their boolean representations ...
|
||||||
|
@ -4,7 +4,7 @@ use derive_more::From;
|
|||||||
use futures::stream;
|
use futures::stream;
|
||||||
use hashbrown::HashMap;
|
use hashbrown::HashMap;
|
||||||
use influxdb2::api::write::TimestampPrecision;
|
use influxdb2::api::write::TimestampPrecision;
|
||||||
use log::{error, info, trace};
|
use log::{debug, error, info, trace};
|
||||||
use migration::sea_orm::prelude::Decimal;
|
use migration::sea_orm::prelude::Decimal;
|
||||||
use migration::sea_orm::DatabaseConnection;
|
use migration::sea_orm::DatabaseConnection;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
@ -249,17 +249,24 @@ impl StatBuffer {
|
|||||||
while num_left > 0 {
|
while num_left > 0 {
|
||||||
let batch_size = num_left.min(max_batch_size);
|
let batch_size = num_left.min(max_batch_size);
|
||||||
|
|
||||||
|
// TODO: there has to be a better way to chunk this up. chunk on the stream with the stream being an iter?
|
||||||
let p = points.split_off(batch_size);
|
let p = points.split_off(batch_size);
|
||||||
|
|
||||||
num_left -= batch_size;
|
num_left -= batch_size;
|
||||||
|
|
||||||
if let Err(err) = influxdb_client
|
if let Err(err) = influxdb_client
|
||||||
.write_with_precision(bucket, stream::iter(p), self.timestamp_precision)
|
.write_with_precision(
|
||||||
|
bucket,
|
||||||
|
stream::iter(points),
|
||||||
|
self.timestamp_precision,
|
||||||
|
)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
// TODO: if this errors, we throw away some of the pending stats! we should probably buffer them somewhere to be tried again
|
// TODO: if this errors, we throw away some of the pending stats! we should probably buffer them somewhere to be tried again
|
||||||
error!("unable to save {} tsdb stats! err={:?}", batch_size, err);
|
error!("unable to save {} tsdb stats! err={:?}", batch_size, err);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
points = p;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user