From 252b3001c140ce1da782b218ebd233fe8e406022 Mon Sep 17 00:00:00 2001
From: Bryan Stitt
Date: Fri, 21 Jul 2023 22:47:22 -0700
Subject: [PATCH] increment a wrapping counter every time we save stats

---
 web3_proxy/src/stats/mod.rs         | 11 ++++---
 web3_proxy/src/stats/stat_buffer.rs | 45 +++++++++++++++++++----------
 2 files changed, 36 insertions(+), 20 deletions(-)

diff --git a/web3_proxy/src/stats/mod.rs b/web3_proxy/src/stats/mod.rs
index b852ca2f..2e457873 100644
--- a/web3_proxy/src/stats/mod.rs
+++ b/web3_proxy/src/stats/mod.rs
@@ -77,7 +77,7 @@ pub struct RpcQueryStats {
 
 #[derive(Clone, Debug, From, Hash, PartialEq, Eq)]
 pub struct RpcQueryKey {
-    /// unix epoch time.
+    /// unix epoch time in seconds.
     /// for the time series db, this is (close to) the time that the response was sent.
     /// for the account database, this is rounded to the week.
     response_timestamp: i64,
@@ -484,7 +484,7 @@ impl BufferedRpcQueryStats {
         chain_id: u64,
         key: RpcQueryKey,
         instance: &str,
-        now: &str,
+        uniq: i64,
     ) -> anyhow::Result {
         let mut builder = DataPoint::builder(measurement)
             .tag("archive_needed", key.archive_needed.to_string())
             .tag("chain_id", chain_id.to_string())
             .tag("instance", instance)
             .tag("method", key.method)
             .tag("user_error_response", key.user_error_response.to_string())
-            .tag("save_time", now)
-            .timestamp(key.response_timestamp)
             .field("backend_requests", self.backend_requests as i64)
             .field("cache_hits", self.cache_hits as i64)
             .field("cache_misses", self.cache_misses as i64)
@@ -527,6 +525,11 @@ impl BufferedRpcQueryStats {
             builder = builder.tag("rpc_secret_key_id", rpc_secret_key_id.to_string());
         }
 
+        // [add "uniq" to the timestamp](https://docs.influxdata.com/influxdb/v2.0/write-data/best-practices/duplicate-points/#increment-the-timestamp)
+        // i64 timestamps get us to Friday, April 11, 2262
+        let timestamp_ns: i64 = key.response_timestamp * 1_000_000_000 + uniq % 1_000_000_000;
+        builder = builder.timestamp(timestamp_ns);
+
         let point = builder.build()?;
 
         trace!("Datapoint saving to Influx is {:?}", point);
diff --git a/web3_proxy/src/stats/stat_buffer.rs b/web3_proxy/src/stats/stat_buffer.rs
index ba77288b..b3158b38 100644
--- a/web3_proxy/src/stats/stat_buffer.rs
+++ b/web3_proxy/src/stats/stat_buffer.rs
@@ -8,7 +8,6 @@ use crate::stats::RpcQueryStats;
 use derive_more::From;
 use futures::stream;
 use hashbrown::HashMap;
-use influxdb2::api::write::TimestampPrecision;
 use migration::sea_orm::prelude::Decimal;
 use std::time::Duration;
 use tokio::sync::{broadcast, mpsc, oneshot};
@@ -54,8 +53,9 @@ pub struct StatBuffer {
     instance_hash: String,
     opt_in_timeseries_buffer: HashMap,
     rpc_secret_key_cache: RpcSecretKeyCache,
-    timestamp_precision: TimestampPrecision,
     tsdb_save_interval_seconds: u32,
+    tsdb_window: i64,
+    num_tsdb_windows: i64,
     user_balance_cache: UserBalanceCache,
 
     _flush_sender: mpsc::Sender>,
@@ -82,9 +82,19 @@ impl StatBuffer {
 
         let (stat_sender, stat_receiver) = mpsc::unbounded_channel();
 
-        let timestamp_precision = TimestampPrecision::Seconds;
+        // TODO: this has no chance of being re-used. we will eventually hit issues with cardinality
+        // would be best to use `private_ip:frontend_port`. then it has a chance of being re-used (but never by 2 instances at the same time)
+        // even better would be `aws_auto_scaling_group_id:frontend_port`
+        let instance = Ulid::new().to_string();
 
-        let instance_hash = Ulid::new().to_string();
+        // TODO: get the frontend request timeout and add a minute buffer instead of hard coding `(5 + 1)`
+        let num_tsdb_windows = ((5 + 1) * 60) / tsdb_save_interval_seconds as i64;
+
+        // make sure we have at least 4 windows
+        let num_tsdb_windows = num_tsdb_windows.max(4);
+
+        // start tsdb_window at num_tsdb_windows because the first run increments it at the start and wraps it back to 0
+        let tsdb_window = num_tsdb_windows;
 
         let mut new = Self {
             accounting_db_buffer: Default::default(),
@@ -94,11 +104,12 @@ impl StatBuffer {
             global_timeseries_buffer: Default::default(),
             influxdb_bucket,
             influxdb_client,
-            instance_hash,
+            instance_hash: instance,
+            num_tsdb_windows,
             opt_in_timeseries_buffer: Default::default(),
             rpc_secret_key_cache,
-            timestamp_precision,
             tsdb_save_interval_seconds,
+            tsdb_window,
             user_balance_cache,
 
             _flush_sender: flush_sender,
@@ -149,7 +160,7 @@ impl StatBuffer {
                 }
                 _ = tsdb_save_interval.tick() => {
                     trace!("TSDB save internal tick");
-                    let (count, new_frontend_requests) = self.save_tsdb_stats().await;
+                    let (count, new_frontend_requests) = self.save_tsdb_stats().await;
                     if count > 0 {
                         tsdb_frontend_requests += new_frontend_requests;
                         debug!("Saved {} stats for {} requests to the tsdb", count, new_frontend_requests);
@@ -401,6 +412,14 @@ impl StatBuffer {
         let mut frontend_requests = 0;
 
         if let Some(influxdb_client) = self.influxdb_client.as_ref() {
+            // every time we save, we increment the tsdb_window. this is used to ensure that stats don't overwrite each other when their keys match
+            // this has to be done carefully or cardinality becomes a problem!
+            // https://docs.influxdata.com/influxdb/v2.0/write-data/best-practices/duplicate-points/
+            self.tsdb_window += 1;
+            if self.tsdb_window > self.num_tsdb_windows {
+                self.tsdb_window = 0;
+            }
+
             let influxdb_bucket = self
                 .influxdb_bucket
                 .as_ref()
@@ -409,8 +428,6 @@ impl StatBuffer {
             // TODO: use stream::iter properly to avoid allocating this Vec
             let mut points = vec![];
 
-            let now = chrono::Utc::now().to_rfc3339();
-
             for (key, stat) in self.global_timeseries_buffer.drain() {
                 // TODO: i don't like passing key (which came from the stat) to the function on the stat. but it works for now
                 let new_frontend_requests = stat.frontend_requests;
@@ -421,7 +438,7 @@ impl StatBuffer {
                         self.chain_id,
                         key,
                         &self.instance_hash,
-                        &now,
+                        self.tsdb_window,
                     )
                     .await
                 {
@@ -444,7 +461,7 @@ impl StatBuffer {
                         self.chain_id,
                         key,
                         &self.instance_hash,
-                        &now,
+                        self.tsdb_window,
                     )
                     .await
                 {
@@ -476,11 +493,7 @@ impl StatBuffer {
                 num_left -= batch_size;
 
                 if let Err(err) = influxdb_client
-                    .write_with_precision(
-                        influxdb_bucket,
-                        stream::iter(points),
-                        self.timestamp_precision,
-                    )
+                    .write(influxdb_bucket, stream::iter(points))
                     .await
                 {
                     // TODO: if this errors, we throw away some of the pending stats! retry any failures! (but not successes. it can have partial successes!)
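
The whole patch leans on InfluxDB's duplicate-points rule: two points that share a measurement, tag set, and timestamp silently overwrite each other, so every save cycle shifts its points by a wrapping nanosecond offset instead of tagging them with a save time. The standalone sketch below restates that idea outside of web3_proxy; the `TsdbWindow` type, the `FRONTEND_TIMEOUT_MINUTES` constant, and the `main` demo are illustrative assumptions rather than code from the repository.

// standalone sketch of the wrapping save-counter, not the web3_proxy implementation
const FRONTEND_TIMEOUT_MINUTES: i64 = 5; // assumed frontend request timeout, in minutes

struct TsdbWindow {
    /// how many save cycles happen before the counter wraps back to 0
    num_windows: i64,
    /// the current window; incremented (and wrapped) at the start of every save
    current: i64,
}

impl TsdbWindow {
    fn new(tsdb_save_interval_seconds: i64) -> Self {
        // enough windows to cover the frontend timeout plus a one minute buffer...
        let num_windows = ((FRONTEND_TIMEOUT_MINUTES + 1) * 60) / tsdb_save_interval_seconds;
        // ...but always at least 4
        let num_windows = num_windows.max(4);

        // start at num_windows so the very first save wraps the counter to 0
        Self {
            num_windows,
            current: num_windows,
        }
    }

    /// advance to the next window. called once per save, before building any points.
    fn advance(&mut self) -> i64 {
        self.current += 1;
        if self.current > self.num_windows {
            self.current = 0;
        }
        self.current
    }

    /// fold the window into the point timestamp as a nanosecond offset.
    /// i64 nanoseconds keep working until April 2262, so this does not overflow
    /// for any realistic unix timestamp.
    fn timestamp_ns(&self, response_timestamp_secs: i64) -> i64 {
        response_timestamp_secs * 1_000_000_000 + self.current % 1_000_000_000
    }
}

fn main() {
    let mut window = TsdbWindow::new(10);
    let response_timestamp = 1_689_984_000; // some second-precision unix time

    // two consecutive saves of stats that happen to share the same key and the same
    // second still produce distinct timestamps, one nanosecond apart per window step
    window.advance();
    let first = window.timestamp_ns(response_timestamp);
    window.advance();
    let second = window.timestamp_ns(response_timestamp);

    assert_ne!(first, second);
    println!("first save:  {first}");
    println!("second save: {second}");
}

The counter only needs enough distinct values to cover every save cycle that can still receive stats for the same response second (the frontend timeout plus a buffer), which is why the patch wraps it instead of letting it grow without bound.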