timestamps instead of tags for cardinality reduction

This commit is contained in:
Bryan Stitt 2023-07-25 21:44:48 -07:00
parent e238203bbc
commit b80f994e90
8 changed files with 34 additions and 67 deletions

@ -338,7 +338,7 @@ impl Web3ProxyApp {
10, 10,
flush_stat_buffer_sender.clone(), flush_stat_buffer_sender.clone(),
flush_stat_buffer_receiver, flush_stat_buffer_receiver,
top_config.app.influxdb_id.to_string(), top_config.app.influxdb_id,
)? { )? {
// since the database entries are used for accounting, we want to be sure everything is saved before exiting // since the database entries are used for accounting, we want to be sure everything is saved before exiting
important_background_handles.push(spawned_stat_buffer.background_handle); important_background_handles.push(spawned_stat_buffer.background_handle);

@ -13,7 +13,6 @@ use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tracing::warn; use tracing::warn;
use ulid::Ulid;
pub type BlockAndRpc = (Option<Web3ProxyBlock>, Arc<Web3Rpc>); pub type BlockAndRpc = (Option<Web3ProxyBlock>, Arc<Web3Rpc>);
pub type TxHashAndRpc = (TxHash, Arc<Web3Rpc>); pub type TxHashAndRpc = (TxHash, Arc<Web3Rpc>);
@ -193,36 +192,21 @@ pub struct AppConfig {
/// influxdb bucket to use for stats /// influxdb bucket to use for stats
pub influxdb_bucket: Option<String>, pub influxdb_bucket: Option<String>,
/// influxdb id to use to keep stats from different servers being seen as duplicates of eachother /// influxdb_id to use to keep stats from different servers being seen as duplicates of each other
/// be careful about this. don't let cardinality get too high! /// this int is used as part of the "nanoseconds" part of the influx timestamp.
/// <https://docs.influxdata.com/influxdb/v2.0/write-data/best-practices/duplicate-points/#add-an-arbitrary-tag> ///
#[serde(default = "default_influxdb_id")] /// This **MUST** be set to a unique value for each running server.
pub influxdb_id: String, /// If not set, servers will overwrite each other's stats.
///
/// <https://docs.influxdata.com/influxdb/v2.0/write-data/best-practices/duplicate-points/#increment-the-timestamp>
#[serde_inline_default(0i64)]
pub influxdb_id: i64,
/// unknown config options get put here /// unknown config options get put here
#[serde(flatten, default = "HashMap::default")] #[serde(flatten, default = "HashMap::default")]
pub extra: HashMap<String, serde_json::Value>, pub extra: HashMap<String, serde_json::Value>,
} }
fn default_influxdb_id() -> String {
match hostname::get() {
Ok(x) => x.into_string().unwrap_or_else(|hostname| {
warn!(
?hostname,
"hostname could not be converted to string. Using ULID for default influxdb_id"
);
Ulid::new().to_string()
}),
Err(err) => {
warn!(
?err,
"no hostname for default influxdb_id. Using ULID for default influxdb_id"
);
Ulid::new().to_string()
}
}
}
impl Default for AppConfig { impl Default for AppConfig {
fn default() -> Self { fn default() -> Self {
serde_json::from_str("{}").unwrap() serde_json::from_str("{}").unwrap()
@ -344,11 +328,7 @@ mod tests {
assert_eq!(a.min_synced_rpcs, 1); assert_eq!(a.min_synced_rpcs, 1);
// b is from Default // b is from Default
let b: AppConfig = AppConfig { let b = AppConfig::default();
// influxdb_id is randomized, so we clone it
influxdb_id: a.influxdb_id.clone(),
..Default::default()
};
assert_eq!(b.min_synced_rpcs, 1); assert_eq!(b.min_synced_rpcs, 1);

@ -478,14 +478,12 @@ impl BufferedRpcQueryStats {
measurement: &str, measurement: &str,
chain_id: u64, chain_id: u64,
key: RpcQueryKey, key: RpcQueryKey,
instance: &str,
uniq: i64, uniq: i64,
) -> anyhow::Result<DataPoint> { ) -> anyhow::Result<DataPoint> {
let mut builder = DataPoint::builder(measurement) let mut builder = DataPoint::builder(measurement)
.tag("archive_needed", key.archive_needed.to_string()) .tag("archive_needed", key.archive_needed.to_string())
.tag("chain_id", chain_id.to_string()) .tag("chain_id", chain_id.to_string())
.tag("error_response", key.error_response.to_string()) .tag("error_response", key.error_response.to_string())
.tag("instance", instance)
.tag("method", key.method) .tag("method", key.method)
.tag("user_error_response", key.user_error_response.to_string()) .tag("user_error_response", key.user_error_response.to_string())
.field("backend_requests", self.backend_requests as i64) .field("backend_requests", self.backend_requests as i64)

@ -49,12 +49,14 @@ pub struct StatBuffer {
global_timeseries_buffer: HashMap<RpcQueryKey, BufferedRpcQueryStats>, global_timeseries_buffer: HashMap<RpcQueryKey, BufferedRpcQueryStats>,
influxdb_bucket: Option<String>, influxdb_bucket: Option<String>,
influxdb_client: Option<influxdb2::Client>, influxdb_client: Option<influxdb2::Client>,
/// a globally unique name /// a globally unique integer. max of 1e6-1
/// instance names can be re-used but they MUST only ever be used by a single server at a time! /// ids can be re-used, but they MUST only ever be used by a single server at a time!
instance: String, /// this will be combined with tsdb_window to create a number with a max of 1e9-1
uniq_id: i64,
opt_in_timeseries_buffer: HashMap<RpcQueryKey, BufferedRpcQueryStats>, opt_in_timeseries_buffer: HashMap<RpcQueryKey, BufferedRpcQueryStats>,
rpc_secret_key_cache: RpcSecretKeyCache, rpc_secret_key_cache: RpcSecretKeyCache,
tsdb_save_interval_seconds: u32, tsdb_save_interval_seconds: u32,
/// a wrapping counter to keep stats from old times that got delayed from being seen as a duplicate
tsdb_window: i64, tsdb_window: i64,
num_tsdb_windows: i64, num_tsdb_windows: i64,
user_balance_cache: UserBalanceCache, user_balance_cache: UserBalanceCache,
@ -76,7 +78,7 @@ impl StatBuffer {
tsdb_save_interval_seconds: u32, tsdb_save_interval_seconds: u32,
flush_sender: mpsc::Sender<oneshot::Sender<FlushedStats>>, flush_sender: mpsc::Sender<oneshot::Sender<FlushedStats>>,
flush_receiver: mpsc::Receiver<oneshot::Sender<FlushedStats>>, flush_receiver: mpsc::Receiver<oneshot::Sender<FlushedStats>>,
instance: String, uniq_id: i64,
) -> anyhow::Result<Option<SpawnedStatBuffer>> { ) -> anyhow::Result<Option<SpawnedStatBuffer>> {
if influxdb_bucket.is_none() { if influxdb_bucket.is_none() {
influxdb_client = None; influxdb_client = None;
@ -84,15 +86,15 @@ impl StatBuffer {
let (stat_sender, stat_receiver) = mpsc::unbounded_channel(); let (stat_sender, stat_receiver) = mpsc::unbounded_channel();
// TODO: get the frontend request timeout and add a minute buffer instead of hard coding `(5 + 1)` let num_tsdb_windows = 1_000;
let num_tsdb_windows = ((5 + 1) * 60) / tsdb_save_interval_seconds as i64;
// make sure we have at least 4 windows
let num_tsdb_windows = num_tsdb_windows.max(4);
// start tsdb_window at num_tsdb_windows because the first run increments it at the start and wraps it back to 0 // start tsdb_window at num_tsdb_windows because the first run increments it at the start and wraps it back to 0
let tsdb_window = num_tsdb_windows; let tsdb_window = num_tsdb_windows;
let uniq_id = uniq_id * num_tsdb_windows;
assert!(uniq_id < 1_000_000_000, "uniq_id too large!");
let mut new = Self { let mut new = Self {
accounting_db_buffer: Default::default(), accounting_db_buffer: Default::default(),
billing_period_seconds, billing_period_seconds,
@ -101,7 +103,7 @@ impl StatBuffer {
global_timeseries_buffer: Default::default(), global_timeseries_buffer: Default::default(),
influxdb_bucket, influxdb_bucket,
influxdb_client, influxdb_client,
instance, uniq_id,
num_tsdb_windows, num_tsdb_windows,
opt_in_timeseries_buffer: Default::default(), opt_in_timeseries_buffer: Default::default(),
rpc_secret_key_cache, rpc_secret_key_cache,
@ -413,10 +415,12 @@ impl StatBuffer {
// this has to be done carefully or cardinality becomes a problem! // this has to be done carefully or cardinality becomes a problem!
// https://docs.influxdata.com/influxdb/v2.0/write-data/best-practices/duplicate-points/ // https://docs.influxdata.com/influxdb/v2.0/write-data/best-practices/duplicate-points/
self.tsdb_window += 1; self.tsdb_window += 1;
if self.tsdb_window > self.num_tsdb_windows { if self.tsdb_window >= self.num_tsdb_windows {
self.tsdb_window = 0; self.tsdb_window = 0;
} }
let uniq = self.uniq_id + self.tsdb_window;
let influxdb_bucket = self let influxdb_bucket = self
.influxdb_bucket .influxdb_bucket
.as_ref() .as_ref()
@ -430,13 +434,7 @@ impl StatBuffer {
let new_frontend_requests = stat.frontend_requests; let new_frontend_requests = stat.frontend_requests;
match stat match stat
.build_timeseries_point( .build_timeseries_point("global_proxy", self.chain_id, key, uniq)
"global_proxy",
self.chain_id,
key,
&self.instance,
self.tsdb_window,
)
.await .await
{ {
Ok(point) => { Ok(point) => {
@ -453,13 +451,7 @@ impl StatBuffer {
for (key, stat) in self.opt_in_timeseries_buffer.drain() { for (key, stat) in self.opt_in_timeseries_buffer.drain() {
// TODO: i don't like passing key (which came from the stat) to the function on the stat. but it works for now // TODO: i don't like passing key (which came from the stat) to the function on the stat. but it works for now
match stat match stat
.build_timeseries_point( .build_timeseries_point("opt_in_proxy", self.chain_id, key, uniq)
"opt_in_proxy",
self.chain_id,
key,
&self.instance,
self.tsdb_window,
)
.await .await
{ {
Ok(point) => { Ok(point) => {

@ -78,8 +78,6 @@ impl MigrateStatsToV2SubCommand {
let rpc_secret_key_cache = Cache::builder().build(); let rpc_secret_key_cache = Cache::builder().build();
let user_balance_cache = Cache::builder().build().into(); let user_balance_cache = Cache::builder().build().into();
let instance = Ulid::new().to_string();
// Spawn the stat-sender // Spawn the stat-sender
let emitter_spawn = StatBuffer::try_spawn( let emitter_spawn = StatBuffer::try_spawn(
BILLING_PERIOD_SECONDS, BILLING_PERIOD_SECONDS,
@ -93,7 +91,7 @@ impl MigrateStatsToV2SubCommand {
60, 60,
flush_sender, flush_sender,
flush_receiver, flush_receiver,
instance, top_config.app.influxdb_id,
) )
.context("Error spawning stat buffer")? .context("Error spawning stat buffer")?
.context("No stat buffer spawned. Maybe missing influx or db credentials?")?; .context("No stat buffer spawned. Maybe missing influx or db credentials?")?;

@ -22,7 +22,6 @@ use tokio::{
time::{sleep, Instant}, time::{sleep, Instant},
}; };
use tracing::info; use tracing::info;
use ulid::Ulid;
use web3_proxy::{ use web3_proxy::{
config::{AppConfig, TopConfig, Web3RpcConfig}, config::{AppConfig, TopConfig, Web3RpcConfig},
stats::FlushedStats, stats::FlushedStats,
@ -50,7 +49,7 @@ impl TestApp {
anvil: &TestAnvil, anvil: &TestAnvil,
db: Option<&TestMysql>, db: Option<&TestMysql>,
influx: Option<&TestInflux>, influx: Option<&TestInflux>,
influx_id: Option<String>, influx_id: Option<u64>,
) -> Self { ) -> Self {
let chain_id = anvil.instance.chain_id(); let chain_id = anvil.instance.chain_id();
let num_workers = 4; let num_workers = 4;
@ -82,7 +81,7 @@ impl TestApp {
"influxdb_org": influx_org, "influxdb_org": influx_org,
"influxdb_token": influx_token, "influxdb_token": influx_token,
"influxdb_bucket": influx_bucket, "influxdb_bucket": influx_bucket,
"influxdb_id": influx_id.unwrap_or_else(|| Ulid::new().to_string()), "influxdb_id": influx_id.unwrap_or_default(),
"default_user_max_requests_per_period": Some(6_000_000), "default_user_max_requests_per_period": Some(6_000_000),
"deposit_factory_contract": Address::from_str( "deposit_factory_contract": Address::from_str(
"4e3BC2054788De923A04936C6ADdB99A05B0Ea36", "4e3BC2054788De923A04936C6ADdB99A05B0Ea36",

@ -34,8 +34,8 @@ async fn test_multiple_proxies_stats_add_up() {
.unwrap(); .unwrap();
// Since when do indices start with 1 // Since when do indices start with 1
let x_0 = TestApp::spawn(&a, Some(&db), Some(&influx), Some("app_0".to_string())).await; let x_0 = TestApp::spawn(&a, Some(&db), Some(&influx), Some(0)).await;
let x_1 = TestApp::spawn(&a, Some(&db), Some(&influx), Some("app_1".to_string())).await; let x_1 = TestApp::spawn(&a, Some(&db), Some(&influx), Some(1)).await;
// make a user and give them credits // make a user and give them credits
let user_0_wallet = a.wallet(0); let user_0_wallet = a.wallet(0);

@ -37,7 +37,7 @@ async fn test_two_buffers() {
tsdb_save_interval_seconds, tsdb_save_interval_seconds,
flush_sender_1, flush_sender_1,
flush_receiver_1, flush_receiver_1,
"buffer_1".to_string(), 1,
) )
.unwrap() .unwrap()
.unwrap(); .unwrap();
@ -54,7 +54,7 @@ async fn test_two_buffers() {
tsdb_save_interval_seconds, tsdb_save_interval_seconds,
flush_sender_2, flush_sender_2,
flush_receiver_2, flush_receiver_2,
"buffer_2".to_string(), 2,
) )
.unwrap() .unwrap()
.unwrap(); .unwrap();