diff --git a/web3_proxy/src/stats/influxdb_queries.rs b/web3_proxy/src/stats/influxdb_queries.rs
index 29747c65..368df1f1 100644
--- a/web3_proxy/src/stats/influxdb_queries.rs
+++ b/web3_proxy/src/stats/influxdb_queries.rs
@@ -184,7 +184,8 @@ pub async fn query_user_stats<'a>(
         {filter_chain_id}
         {drop_method}
 
-        cumsum = base
+        // cumsum = base
+        base
             |> aggregateWindow(every: {query_window_seconds}s, fn: sum, createEmpty: false)
             |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
             |> drop(columns: ["balance"])
@@ -199,19 +200,19 @@ pub async fn query_user_stats<'a>(
             |> group()
             |> sort(columns: ["_time", "_measurement", "chain_id", "method", "rpc_secret_key_id"], desc: true)
 
-        balance = base
-            |> toFloat()
-            |> aggregateWindow(every: {query_window_seconds}s, fn: mean, createEmpty: false)
-            |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
-            |> group(columns: ["_time", "_measurement", "chain_id", "method", "rpc_secret_key_id"])
-            |> mean(column: "balance")
-            |> group()
-            |> sort(columns: ["_time", "_measurement", "chain_id", "method", "rpc_secret_key_id"], desc: true)
+        // balance = base
+        //     |> toFloat()
+        //     |> aggregateWindow(every: {query_window_seconds}s, fn: mean, createEmpty: false)
+        //     |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
+        //     |> group(columns: ["_time", "_measurement", "chain_id", "method", "rpc_secret_key_id"])
+        //     |> mean(column: "balance")
+        //     |> group()
+        //     |> sort(columns: ["_time", "_measurement", "chain_id", "method", "rpc_secret_key_id"], desc: true)
 
-        join(
-            tables: {{cumsum, balance}},
-            on: {join_candidates}
-        )
+        // join(
+        //     tables: {{cumsum, balance}},
+        //     on: {join_candidates}
+        // )
     "#);
 
     info!("Raw query to db is: {:?}", query);
@@ -219,8 +220,10 @@ pub async fn query_user_stats<'a>(
     info!("Query to db is: {:?}", query);
 
     // Make the query and collect all data
-    let raw_influx_responses: Vec<FluxRecord> =
-        influxdb_client.query_raw(Some(query.clone())).await?;
+    let raw_influx_responses: Vec<FluxRecord> = influxdb_client
+        .query_raw(Some(query.clone()))
+        .await
+        .context("failed parsing query result into a FluxRecord")?;
 
     // Basically rename all items to be "total",
     // calculate number of "archive_needed" and "error_responses" through their boolean representations ...
diff --git a/web3_proxy/src/stats/stat_buffer.rs b/web3_proxy/src/stats/stat_buffer.rs
index 5de91e8b..d7a5dea1 100644
--- a/web3_proxy/src/stats/stat_buffer.rs
+++ b/web3_proxy/src/stats/stat_buffer.rs
@@ -4,7 +4,7 @@ use derive_more::From;
 use futures::stream;
 use hashbrown::HashMap;
 use influxdb2::api::write::TimestampPrecision;
-use log::{error, info, trace};
+use log::{debug, error, info, trace};
 use migration::sea_orm::prelude::Decimal;
 use migration::sea_orm::DatabaseConnection;
 use std::time::Duration;
@@ -249,17 +249,24 @@ impl StatBuffer {
            while num_left > 0 {
                let batch_size = num_left.min(max_batch_size);
 
+               // TODO: there has to be a better way to chunk this up. chunk on the stream with the stream being an iter?
                let p = points.split_off(batch_size);
 
                num_left -= batch_size;
 
                if let Err(err) = influxdb_client
-                   .write_with_precision(bucket, stream::iter(p), self.timestamp_precision)
+                   .write_with_precision(
+                       bucket,
+                       stream::iter(points),
+                       self.timestamp_precision,
+                   )
                    .await
                {
                    // TODO: if this errors, we throw away some of the pending stats! we should probably buffer them somewhere to be tried again
                    error!("unable to save {} tsdb stats! err={:?}", batch_size, err);
                }
+
+               points = p;
            }
        }
    }
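
A note on the doubled braces in the Flux string above: the query is assembled with Rust's `format!`, where `{{` and `}}` emit literal braces and single-brace names such as `{filter_chain_id}` or `{join_candidates}` are interpolated. A minimal sketch of that escaping (variable names are taken from the diff; the surrounding code and values are invented for illustration):

```rust
fn main() {
    let query_window_seconds = 300;
    let join_candidates = r#"["_time", "rpc_secret_key_id"]"#;

    // `{{` / `}}` become literal `{` / `}` in the output;
    // `{query_window_seconds}` and `{join_candidates}` are interpolated.
    let query = format!(
        r#"base
    |> aggregateWindow(every: {query_window_seconds}s, fn: sum, createEmpty: false)
join(tables: {{cumsum, balance}}, on: {join_candidates})"#
    );

    println!("{query}");
}
```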
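
The new `.context(...)` on the influx call only compiles with an error-context extension trait in scope. Assuming it is anyhow's `Context` (the usual source of that method; this crate may use a project-specific equivalent instead), the pattern looks like this sketch, with a hypothetical `query_raw` standing in for the real client call:

```rust
use anyhow::Context;

// Hypothetical stand-in for influxdb_client.query_raw(..).await.
fn query_raw(input: &str) -> Result<u64, std::num::ParseIntError> {
    input.parse()
}

fn main() -> anyhow::Result<()> {
    // `?` alone would bubble up the bare underlying error; `.context(..)`
    // wraps it with a message describing what was being attempted,
    // which is what the diff adds around query_raw.
    let n = query_raw("123").context("failed parsing query result")?;
    println!("{n}");
    Ok(())
}
```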
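
The stat_buffer hunk is a real bug fix, not a reformat: `Vec::split_off(n)` keeps the first `n` elements in `points` and returns the tail, so the old code handed the *remainder* (`p`) to `write_with_precision` and the current batch went unwritten. A standalone sketch of the corrected loop, with integers standing in for the influx `DataPoint`s and `println!` standing in for the write:

```rust
fn write_in_batches(mut points: Vec<u32>, max_batch_size: usize) {
    let mut num_left = points.len();

    while num_left > 0 {
        let batch_size = num_left.min(max_batch_size);

        // After split_off, `points` holds exactly this batch and `p` holds
        // everything after it. Writing `p` here was the original bug.
        let p = points.split_off(batch_size);

        num_left -= batch_size;

        // stand-in for influxdb_client.write_with_precision(..)
        println!("writing batch of {}: {:?}", points.len(), points);

        // continue the loop with the unwritten remainder
        points = p;
    }
}

fn main() {
    write_in_batches((1..=7).collect(), 3);
    // writing batch of 3: [1, 2, 3]
    // writing batch of 3: [4, 5, 6]
    // writing batch of 1: [7]
}
```

The in-diff TODO about chunking on a stream could plausibly be answered with `futures::StreamExt::chunks` over `stream::iter(points)`, but that is speculation beyond what this change does.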