diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index b58e07e8..f826dbb1 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -338,7 +338,7 @@ impl Web3ProxyApp { 10, flush_stat_buffer_sender.clone(), flush_stat_buffer_receiver, - top_config.app.influxdb_id.to_string(), + top_config.app.influxdb_id, )? { // since the database entries are used for accounting, we want to be sure everything is saved before exiting important_background_handles.push(spawned_stat_buffer.background_handle); diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index 8557c3a9..4dfa83ad 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -13,7 +13,6 @@ use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; use tracing::warn; -use ulid::Ulid; pub type BlockAndRpc = (Option, Arc); pub type TxHashAndRpc = (TxHash, Arc); @@ -193,36 +192,21 @@ pub struct AppConfig { /// influxdb bucket to use for stats pub influxdb_bucket: Option, - /// influxdb id to use to keep stats from different servers being seen as duplicates of eachother - /// be careful about this. don't let cardinality get too high! - /// - #[serde(default = "default_influxdb_id")] - pub influxdb_id: String, + /// influxdb_id to use to keep stats from different servers being seen as duplicates of each other + /// this int is used as part of the "nanoseconds" part of the influx timestamp. + /// + /// This **MUST** be set to a unique value for each running server. + /// If not set, servers will overwrite each other's stats. + /// + /// + #[serde_inline_default(0i64)] + pub influxdb_id: i64, /// unknown config options get put here #[serde(flatten, default = "HashMap::default")] pub extra: HashMap, } -fn default_influxdb_id() -> String { - match hostname::get() { - Ok(x) => x.into_string().unwrap_or_else(|hostname| { - warn!( - ?hostname, - "hostname could not be converted to string. 
Using ULID for default influxdb_id" - ); - Ulid::new().to_string() - }), - Err(err) => { - warn!( - ?err, - "no hostname for default influxdb_id. Using ULID for default influxdb_id" - ); - Ulid::new().to_string() - } - } -} - impl Default for AppConfig { fn default() -> Self { serde_json::from_str("{}").unwrap() @@ -344,11 +328,7 @@ mod tests { assert_eq!(a.min_synced_rpcs, 1); // b is from Default - let b: AppConfig = AppConfig { - // influxdb_id is randomized, so we clone it - influxdb_id: a.influxdb_id.clone(), - ..Default::default() - }; + let b = AppConfig::default(); assert_eq!(b.min_synced_rpcs, 1); diff --git a/web3_proxy/src/stats/mod.rs b/web3_proxy/src/stats/mod.rs index 2e35ea1f..351addf2 100644 --- a/web3_proxy/src/stats/mod.rs +++ b/web3_proxy/src/stats/mod.rs @@ -478,14 +478,12 @@ impl BufferedRpcQueryStats { measurement: &str, chain_id: u64, key: RpcQueryKey, - instance: &str, uniq: i64, ) -> anyhow::Result { let mut builder = DataPoint::builder(measurement) .tag("archive_needed", key.archive_needed.to_string()) .tag("chain_id", chain_id.to_string()) .tag("error_response", key.error_response.to_string()) - .tag("instance", instance) .tag("method", key.method) .tag("user_error_response", key.user_error_response.to_string()) .field("backend_requests", self.backend_requests as i64) diff --git a/web3_proxy/src/stats/stat_buffer.rs b/web3_proxy/src/stats/stat_buffer.rs index 18b360c2..4694f56a 100644 --- a/web3_proxy/src/stats/stat_buffer.rs +++ b/web3_proxy/src/stats/stat_buffer.rs @@ -49,12 +49,14 @@ pub struct StatBuffer { global_timeseries_buffer: HashMap, influxdb_bucket: Option, influxdb_client: Option, - /// a globally unique name + /// a globally unique integer. max of 1e6-1 /// instance names can be re-used but they MUST only ever be used by a single server at a time! 
- instance: String, + /// this will be combined with tsdb_window to create a number with a max of 1e9-1 + uniq_id: i64, opt_in_timeseries_buffer: HashMap, rpc_secret_key_cache: RpcSecretKeyCache, tsdb_save_interval_seconds: u32, + /// a wrapping counter to keep stats from old times that got delayed from being seen as a duplicate tsdb_window: i64, num_tsdb_windows: i64, user_balance_cache: UserBalanceCache, @@ -76,7 +78,7 @@ impl StatBuffer { tsdb_save_interval_seconds: u32, flush_sender: mpsc::Sender>, flush_receiver: mpsc::Receiver>, - instance: String, + uniq_id: i64, ) -> anyhow::Result> { if influxdb_bucket.is_none() { influxdb_client = None; @@ -84,15 +86,15 @@ impl StatBuffer { let (stat_sender, stat_receiver) = mpsc::unbounded_channel(); - // TODO: get the frontend request timeout and add a minute buffer instead of hard coding `(5 + 1)` - let num_tsdb_windows = ((5 + 1) * 60) / tsdb_save_interval_seconds as i64; - - // make sure we have at least 4 windows - let num_tsdb_windows = num_tsdb_windows.max(4); + let num_tsdb_windows = 1_000; // start tsdb_window at num_tsdb_windows because the first run increments it at the start and wraps it back to 0 let tsdb_window = num_tsdb_windows; + let uniq_id = uniq_id * num_tsdb_windows; + + assert!(uniq_id < 1_000_000_000, "uniq_id too large!"); + let mut new = Self { accounting_db_buffer: Default::default(), billing_period_seconds, @@ -101,7 +103,7 @@ impl StatBuffer { global_timeseries_buffer: Default::default(), influxdb_bucket, influxdb_client, - instance, + uniq_id, num_tsdb_windows, opt_in_timeseries_buffer: Default::default(), rpc_secret_key_cache, @@ -413,10 +415,12 @@ impl StatBuffer { // this has to be done carefully or cardinality becomes a problem! 
// https://docs.influxdata.com/influxdb/v2.0/write-data/best-practices/duplicate-points/ self.tsdb_window += 1; - if self.tsdb_window > self.num_tsdb_windows { + if self.tsdb_window >= self.num_tsdb_windows { self.tsdb_window = 0; } + let uniq = self.uniq_id + self.tsdb_window; + let influxdb_bucket = self .influxdb_bucket .as_ref() @@ -430,13 +434,7 @@ impl StatBuffer { let new_frontend_requests = stat.frontend_requests; match stat - .build_timeseries_point( - "global_proxy", - self.chain_id, - key, - &self.instance, - self.tsdb_window, - ) + .build_timeseries_point("global_proxy", self.chain_id, key, uniq) .await { Ok(point) => { @@ -453,13 +451,7 @@ impl StatBuffer { for (key, stat) in self.opt_in_timeseries_buffer.drain() { // TODO: i don't like passing key (which came from the stat) to the function on the stat. but it works for now match stat - .build_timeseries_point( - "opt_in_proxy", - self.chain_id, - key, - &self.instance, - self.tsdb_window, - ) + .build_timeseries_point("opt_in_proxy", self.chain_id, key, uniq) .await { Ok(point) => { diff --git a/web3_proxy/src/sub_commands/migrate_stats_to_v2.rs b/web3_proxy/src/sub_commands/migrate_stats_to_v2.rs index c417557e..7d5afb11 100644 --- a/web3_proxy/src/sub_commands/migrate_stats_to_v2.rs +++ b/web3_proxy/src/sub_commands/migrate_stats_to_v2.rs @@ -78,8 +78,6 @@ impl MigrateStatsToV2SubCommand { let rpc_secret_key_cache = Cache::builder().build(); let user_balance_cache = Cache::builder().build().into(); - let instance = Ulid::new().to_string(); - // Spawn the stat-sender let emitter_spawn = StatBuffer::try_spawn( BILLING_PERIOD_SECONDS, @@ -93,7 +91,7 @@ impl MigrateStatsToV2SubCommand { 60, flush_sender, flush_receiver, - instance, + top_config.app.influxdb_id, ) .context("Error spawning stat buffer")? .context("No stat buffer spawned. 
Maybe missing influx or db credentials?")?; diff --git a/web3_proxy/tests/common/app.rs b/web3_proxy/tests/common/app.rs index 79bc1afe..1bf1ce96 100644 --- a/web3_proxy/tests/common/app.rs +++ b/web3_proxy/tests/common/app.rs @@ -22,7 +22,6 @@ use tokio::{ time::{sleep, Instant}, }; use tracing::info; -use ulid::Ulid; use web3_proxy::{ config::{AppConfig, TopConfig, Web3RpcConfig}, stats::FlushedStats, @@ -50,7 +49,7 @@ impl TestApp { anvil: &TestAnvil, db: Option<&TestMysql>, influx: Option<&TestInflux>, - influx_id: Option, + influx_id: Option, ) -> Self { let chain_id = anvil.instance.chain_id(); let num_workers = 4; @@ -82,7 +81,7 @@ impl TestApp { "influxdb_org": influx_org, "influxdb_token": influx_token, "influxdb_bucket": influx_bucket, - "influxdb_id": influx_id.unwrap_or_else(|| Ulid::new().to_string()), + "influxdb_id": influx_id.unwrap_or_default(), "default_user_max_requests_per_period": Some(6_000_000), "deposit_factory_contract": Address::from_str( "4e3BC2054788De923A04936C6ADdB99A05B0Ea36", diff --git a/web3_proxy/tests/test_multiple_proxy.rs b/web3_proxy/tests/test_multiple_proxy.rs index 33d67474..531a956e 100644 --- a/web3_proxy/tests/test_multiple_proxy.rs +++ b/web3_proxy/tests/test_multiple_proxy.rs @@ -34,8 +34,8 @@ async fn test_multiple_proxies_stats_add_up() { .unwrap(); // Since when do indices start with 1 - let x_0 = TestApp::spawn(&a, Some(&db), Some(&influx), Some("app_0".to_string())).await; - let x_1 = TestApp::spawn(&a, Some(&db), Some(&influx), Some("app_1".to_string())).await; + let x_0 = TestApp::spawn(&a, Some(&db), Some(&influx), Some(0)).await; + let x_1 = TestApp::spawn(&a, Some(&db), Some(&influx), Some(1)).await; // make a user and give them credits let user_0_wallet = a.wallet(0); diff --git a/web3_proxy/tests/test_stat_buffer.rs b/web3_proxy/tests/test_stat_buffer.rs index 093ae7c9..faf1fb79 100644 --- a/web3_proxy/tests/test_stat_buffer.rs +++ b/web3_proxy/tests/test_stat_buffer.rs @@ -37,7 +37,7 @@ async fn 
test_two_buffers() { tsdb_save_interval_seconds, flush_sender_1, flush_receiver_1, - "buffer_1".to_string(), + 1, ) .unwrap() .unwrap(); @@ -54,7 +54,7 @@ async fn test_two_buffers() { tsdb_save_interval_seconds, flush_sender_2, flush_receiver_2, - "buffer_2".to_string(), + 2, ) .unwrap() .unwrap();