increment a wrapping counter every time we save stats

This commit is contained in:
Bryan Stitt 2023-07-21 22:47:22 -07:00
parent 39141c1e52
commit 252b3001c1
2 changed files with 36 additions and 20 deletions

@ -77,7 +77,7 @@ pub struct RpcQueryStats {
#[derive(Clone, Debug, From, Hash, PartialEq, Eq)]
pub struct RpcQueryKey {
/// unix epoch time.
/// unix epoch time in seconds.
/// for the time series db, this is (close to) the time that the response was sent.
/// for the account database, this is rounded to the week.
response_timestamp: i64,
@ -484,7 +484,7 @@ impl BufferedRpcQueryStats {
chain_id: u64,
key: RpcQueryKey,
instance: &str,
now: &str,
uniq: i64,
) -> anyhow::Result<DataPoint> {
let mut builder = DataPoint::builder(measurement)
.tag("archive_needed", key.archive_needed.to_string())
@ -493,8 +493,6 @@ impl BufferedRpcQueryStats {
.tag("instance", instance)
.tag("method", key.method)
.tag("user_error_response", key.user_error_response.to_string())
.tag("save_time", now)
.timestamp(key.response_timestamp)
.field("backend_requests", self.backend_requests as i64)
.field("cache_hits", self.cache_hits as i64)
.field("cache_misses", self.cache_misses as i64)
@ -527,6 +525,11 @@ impl BufferedRpcQueryStats {
builder = builder.tag("rpc_secret_key_id", rpc_secret_key_id.to_string());
}
// [add "uniq" to the timstamp](https://docs.influxdata.com/influxdb/v2.0/write-data/best-practices/duplicate-points/#increment-the-timestamp)
// i64 timestamps get us to Friday, April 11, 2262
let timestamp_ns: i64 = key.response_timestamp * 1_000_000_000 + uniq % 1_000_000_000;
builder = builder.timestamp(timestamp_ns);
let point = builder.build()?;
trace!("Datapoint saving to Influx is {:?}", point);

@ -8,7 +8,6 @@ use crate::stats::RpcQueryStats;
use derive_more::From;
use futures::stream;
use hashbrown::HashMap;
use influxdb2::api::write::TimestampPrecision;
use migration::sea_orm::prelude::Decimal;
use std::time::Duration;
use tokio::sync::{broadcast, mpsc, oneshot};
@ -54,8 +53,9 @@ pub struct StatBuffer {
instance_hash: String,
opt_in_timeseries_buffer: HashMap<RpcQueryKey, BufferedRpcQueryStats>,
rpc_secret_key_cache: RpcSecretKeyCache,
timestamp_precision: TimestampPrecision,
tsdb_save_interval_seconds: u32,
tsdb_window: i64,
num_tsdb_windows: i64,
user_balance_cache: UserBalanceCache,
_flush_sender: mpsc::Sender<oneshot::Sender<FlushedStats>>,
@ -82,9 +82,19 @@ impl StatBuffer {
let (stat_sender, stat_receiver) = mpsc::unbounded_channel();
let timestamp_precision = TimestampPrecision::Seconds;
// TODO: this has no chance of being re-used. we will eventually hit issues with cardinatility
// would be best to use `private_ip:frontend_port`. then it has a chance of being re-used (but never by 2 instances at the same time)
// even better would be `aws_auto_scaling_group_id:frontend_port`
let instance = Ulid::new().to_string();
let instance_hash = Ulid::new().to_string();
// TODO: get the frontend request timeout and add a minute buffer instead of hard coding `(5 + 1)`
let num_tsdb_windows = ((5 + 1) * 60) / tsdb_save_interval_seconds as i64;
// make sure we have at least 4 windows
let num_tsdb_windows = num_tsdb_windows.max(4);
// start tsdb_window at num_tsdb_windows because the first run increments it at the start and wraps it back to 0
let tsdb_window = num_tsdb_windows;
let mut new = Self {
accounting_db_buffer: Default::default(),
@ -94,11 +104,12 @@ impl StatBuffer {
global_timeseries_buffer: Default::default(),
influxdb_bucket,
influxdb_client,
instance_hash,
instance_hash: instance,
num_tsdb_windows,
opt_in_timeseries_buffer: Default::default(),
rpc_secret_key_cache,
timestamp_precision,
tsdb_save_interval_seconds,
tsdb_window,
user_balance_cache,
_flush_sender: flush_sender,
@ -149,7 +160,7 @@ impl StatBuffer {
}
_ = tsdb_save_interval.tick() => {
trace!("TSDB save internal tick");
let (count, new_frontend_requests) = self.save_tsdb_stats().await;
let (count, new_frontend_requests) = self.save_tsdb_stats().await;
if count > 0 {
tsdb_frontend_requests += new_frontend_requests;
debug!("Saved {} stats for {} requests to the tsdb", count, new_frontend_requests);
@ -401,6 +412,14 @@ impl StatBuffer {
let mut frontend_requests = 0;
if let Some(influxdb_client) = self.influxdb_client.as_ref() {
// every time we save, we increment the ts_db_window. this is used to ensure that stats don't overwrite others because the keys match
// this has to be done carefully or cardinality becomes a problem!
// https://docs.influxdata.com/influxdb/v2.0/write-data/best-practices/duplicate-points/
self.tsdb_window += 1;
if self.tsdb_window > self.num_tsdb_windows {
self.tsdb_window = 0;
}
let influxdb_bucket = self
.influxdb_bucket
.as_ref()
@ -409,8 +428,6 @@ impl StatBuffer {
// TODO: use stream::iter properly to avoid allocating this Vec
let mut points = vec![];
let now = chrono::Utc::now().to_rfc3339();
for (key, stat) in self.global_timeseries_buffer.drain() {
// TODO: i don't like passing key (which came from the stat) to the function on the stat. but it works for now
let new_frontend_requests = stat.frontend_requests;
@ -421,7 +438,7 @@ impl StatBuffer {
self.chain_id,
key,
&self.instance_hash,
&now,
self.tsdb_window,
)
.await
{
@ -444,7 +461,7 @@ impl StatBuffer {
self.chain_id,
key,
&self.instance_hash,
&now,
self.tsdb_window,
)
.await
{
@ -476,11 +493,7 @@ impl StatBuffer {
num_left -= batch_size;
if let Err(err) = influxdb_client
.write_with_precision(
influxdb_bucket,
stream::iter(points),
self.timestamp_precision,
)
.write(influxdb_bucket, stream::iter(points))
.await
{
// TODO: if this errors, we throw away some of the pending stats! retry any failures! (but not successes. it can have partial successes!)