diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs
index 4b8445fd..3258cd42 100644
--- a/web3_proxy/src/app/mod.rs
+++ b/web3_proxy/src/app/mod.rs
@@ -338,6 +338,7 @@ impl Web3ProxyApp {
             10,
             flush_stat_buffer_sender.clone(),
             flush_stat_buffer_receiver,
+            top_config.app.influxdb_id.to_string(),
         )? {
             // since the database entries are used for accounting, we want to be sure everything is saved before exiting
             important_background_handles.push(spawned_stat_buffer.background_handle);
diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs
index 564b7388..8a582684 100644
--- a/web3_proxy/src/config.rs
+++ b/web3_proxy/src/config.rs
@@ -13,6 +13,7 @@ use std::sync::Arc;
 use std::time::Duration;
 use tokio::sync::mpsc;
 use tracing::warn;
+use ulid::Ulid;
 
 pub type BlockAndRpc = (Option<Web3ProxyBlock>, Arc<Web3Rpc>);
 pub type TxHashAndRpc = (TxHash, Arc<Web3Rpc>);
@@ -192,11 +193,36 @@ pub struct AppConfig {
     /// influxdb bucket to use for stats
     pub influxdb_bucket: Option<String>,
 
+    /// influxdb id to use to keep stats from different servers from being seen as duplicates of each other
+    /// be careful about this. don't let cardinality get too high!
+    ///
+    #[serde(default = "default_influxdb_id")]
+    pub influxdb_id: String,
+
     /// unknown config options get put here
     #[serde(flatten, default = "HashMap::default")]
     pub extra: HashMap<String, serde_json::Value>,
 }
 
+fn default_influxdb_id() -> String {
+    match hostname::get() {
+        Ok(x) => x.into_string().unwrap_or_else(|hostname| {
+            warn!(
+                ?hostname,
+                "hostname could not be converted to string. Using ULID for default influxdb_id"
+            );
+            Ulid::new().to_string()
+        }),
+        Err(err) => {
+            warn!(
+                ?err,
+                "no hostname for default influxdb_id. Using ULID for default influxdb_id"
+            );
+            Ulid::new().to_string()
+        }
+    }
+}
+
 impl Default for AppConfig {
     fn default() -> Self {
         serde_json::from_str("{}").unwrap()
diff --git a/web3_proxy/src/stats/stat_buffer.rs b/web3_proxy/src/stats/stat_buffer.rs
index b3158b38..1ec3a7b2 100644
--- a/web3_proxy/src/stats/stat_buffer.rs
+++ b/web3_proxy/src/stats/stat_buffer.rs
@@ -13,7 +13,6 @@ use std::time::Duration;
 use tokio::sync::{broadcast, mpsc, oneshot};
 use tokio::time::{interval, sleep};
 use tracing::{debug, error, info, trace, warn, Instrument};
-use ulid::Ulid;
 
 #[derive(Debug, Default)]
 pub struct BufferedRpcQueryStats {
@@ -50,7 +49,9 @@ pub struct StatBuffer {
     global_timeseries_buffer: HashMap<RpcQueryKey, BufferedRpcQueryStats>,
     influxdb_bucket: Option<String>,
     influxdb_client: Option<influxdb2::Client>,
-    instance_hash: String,
+    /// a globally unique name
+    /// instance names can be re-used, but they MUST only ever be used by a single server at a time!
+    instance: String,
     opt_in_timeseries_buffer: HashMap<RpcQueryKey, BufferedRpcQueryStats>,
     rpc_secret_key_cache: RpcSecretKeyCache,
     tsdb_save_interval_seconds: u32,
@@ -75,6 +76,7 @@ impl StatBuffer {
         tsdb_save_interval_seconds: u32,
         flush_sender: mpsc::Sender<oneshot::Sender<FlushedStats>>,
         flush_receiver: mpsc::Receiver<oneshot::Sender<FlushedStats>>,
+        instance: String,
     ) -> anyhow::Result<Option<SpawnedStatBuffer>> {
         if influxdb_bucket.is_none() {
             influxdb_client = None;
@@ -82,11 +84,6 @@
 
         let (stat_sender, stat_receiver) = mpsc::unbounded_channel();
 
-        // TODO: this has no chance of being re-used. we will eventually hit issues with cardinatility
-        // would be best to use `private_ip:frontend_port`. then it has a chance of being re-used (but never by 2 instances at the same time)
-        // even better would be `aws_auto_scaling_group_id:frontend_port`
-        let instance = Ulid::new().to_string();
-
         // TODO: get the frontend request timeout and add a minute buffer instead of hard coding `(5 + 1)`
         let num_tsdb_windows = ((5 + 1) * 60) / tsdb_save_interval_seconds as i64;
 
@@ -104,7 +101,7 @@ impl StatBuffer {
             global_timeseries_buffer: Default::default(),
             influxdb_bucket,
             influxdb_client,
-            instance_hash: instance,
+            instance,
             num_tsdb_windows,
             opt_in_timeseries_buffer: Default::default(),
             rpc_secret_key_cache,
@@ -437,7 +434,7 @@ impl StatBuffer {
                         "global_proxy",
                         self.chain_id,
                         key,
-                        &self.instance_hash,
+                        &self.instance,
                         self.tsdb_window,
                     )
                     .await
@@ -460,7 +457,7 @@ impl StatBuffer {
                         "opt_in_proxy",
                         self.chain_id,
                         key,
-                        &self.instance_hash,
+                        &self.instance,
                         self.tsdb_window,
                     )
                     .await
diff --git a/web3_proxy/src/sub_commands/migrate_stats_to_v2.rs b/web3_proxy/src/sub_commands/migrate_stats_to_v2.rs
index 44986bcd..c417557e 100644
--- a/web3_proxy/src/sub_commands/migrate_stats_to_v2.rs
+++ b/web3_proxy/src/sub_commands/migrate_stats_to_v2.rs
@@ -78,6 +78,8 @@ impl MigrateStatsToV2SubCommand {
         let rpc_secret_key_cache = Cache::builder().build();
         let user_balance_cache = Cache::builder().build().into();
 
+        let instance = Ulid::new().to_string();
+
         // Spawn the stat-sender
         let emitter_spawn = StatBuffer::try_spawn(
             BILLING_PERIOD_SECONDS,
@@ -91,6 +93,7 @@ impl MigrateStatsToV2SubCommand {
             60,
             flush_sender,
             flush_receiver,
+            instance,
         )
         .context("Error spawning stat buffer")?
         .context("No stat buffer spawned. Maybe missing influx or db credentials?")?;
diff --git a/web3_proxy/tests/test_stat_buffer.rs b/web3_proxy/tests/test_stat_buffer.rs
index 178e7101..093ae7c9 100644
--- a/web3_proxy/tests/test_stat_buffer.rs
+++ b/web3_proxy/tests/test_stat_buffer.rs
@@ -37,6 +37,7 @@ async fn test_two_buffers() {
         tsdb_save_interval_seconds,
         flush_sender_1,
         flush_receiver_1,
+        "buffer_1".to_string(),
     )
     .unwrap()
     .unwrap();
@@ -53,6 +54,7 @@ async fn test_two_buffers() {
         tsdb_save_interval_seconds,
         flush_sender_2,
         flush_receiver_2,
+        "buffer_2".to_string(),
     )
     .unwrap()
     .unwrap();
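For reference, here is a minimal, self-contained sketch of the `#[serde(default = "...")]` fallback that this diff adds to `AppConfig`. The struct name `InfluxIdConfig`, the simplified fallback without the `warn!` logging, and the example id `proxy-us-east-1a` are illustrative only, not part of the PR; it assumes the `serde` (with `derive`), `serde_json`, `hostname`, and `ulid` crates.

```rust
// Sketch of the serde default used for the new `influxdb_id` config option.
use serde::Deserialize;
use ulid::Ulid;

#[derive(Debug, Deserialize)]
struct InfluxIdConfig {
    /// used as the `instance` tag on influxdb stats; defaults to the hostname, then a ULID
    #[serde(default = "default_influxdb_id")]
    influxdb_id: String,
}

fn default_influxdb_id() -> String {
    // prefer the hostname so the same server re-uses the same id across restarts,
    // which keeps influxdb series cardinality bounded
    hostname::get()
        .ok()
        .and_then(|h| h.into_string().ok())
        // fall back to a random ULID if the hostname is missing or not valid UTF-8
        .unwrap_or_else(|| Ulid::new().to_string())
}

fn main() {
    // omitting the field picks up the default (hostname or ULID)
    let defaulted: InfluxIdConfig = serde_json::from_str("{}").unwrap();
    println!("default influxdb_id = {}", defaulted.influxdb_id);

    // an explicit value in the config overrides the default
    let explicit: InfluxIdConfig =
        serde_json::from_str(r#"{ "influxdb_id": "proxy-us-east-1a" }"#).unwrap();
    assert_eq!(explicit.influxdb_id, "proxy-us-east-1a");
}
```

This is the motivation behind replacing the per-process ULID that `StatBuffer::try_spawn` used to generate: a configurable, hostname-based id is re-used by the same server after a restart instead of creating a new series every time, while two live servers must still use distinct ids (set `influxdb_id` explicitly if hostnames could clash).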