influx id from config. default to the hostname or fallback to ulid

This commit is contained in:
Bryan Stitt 2023-07-22 00:26:02 -07:00
parent 352726872a
commit 4091e05f3d
5 changed files with 39 additions and 10 deletions

@ -338,6 +338,7 @@ impl Web3ProxyApp {
10, 10,
flush_stat_buffer_sender.clone(), flush_stat_buffer_sender.clone(),
flush_stat_buffer_receiver, flush_stat_buffer_receiver,
top_config.app.influxdb_id.to_string(),
)? { )? {
// since the database entries are used for accounting, we want to be sure everything is saved before exiting // since the database entries are used for accounting, we want to be sure everything is saved before exiting
important_background_handles.push(spawned_stat_buffer.background_handle); important_background_handles.push(spawned_stat_buffer.background_handle);

@ -13,6 +13,7 @@ use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tracing::warn; use tracing::warn;
use ulid::Ulid;
pub type BlockAndRpc = (Option<Web3ProxyBlock>, Arc<Web3Rpc>); pub type BlockAndRpc = (Option<Web3ProxyBlock>, Arc<Web3Rpc>);
pub type TxHashAndRpc = (TxHash, Arc<Web3Rpc>); pub type TxHashAndRpc = (TxHash, Arc<Web3Rpc>);
@ -192,11 +193,36 @@ pub struct AppConfig {
/// influxdb bucket to use for stats /// influxdb bucket to use for stats
pub influxdb_bucket: Option<String>, pub influxdb_bucket: Option<String>,
/// influxdb id to use to keep stats from different servers being seen as duplicates of each other
/// be careful about this. don't let cardinality get too high!
/// <https://docs.influxdata.com/influxdb/v2.0/write-data/best-practices/duplicate-points/#add-an-arbitrary-tag>
#[serde(default = "default_influxdb_id")]
pub influxdb_id: String,
/// unknown config options get put here /// unknown config options get put here
#[serde(flatten, default = "HashMap::default")] #[serde(flatten, default = "HashMap::default")]
pub extra: HashMap<String, serde_json::Value>, pub extra: HashMap<String, serde_json::Value>,
} }
/// Default value for `influxdb_id`: the machine's hostname when it can be
/// represented as valid UTF-8, otherwise a freshly generated ULID.
fn default_influxdb_id() -> String {
    let host = match hostname::get() {
        Ok(os_name) => os_name.into_string(),
        Err(err) => {
            // couldn't query the OS for a hostname at all
            warn!(
                ?err,
                "no hostname for default influxdb_id. Using ULID for default influxdb_id"
            );
            return Ulid::new().to_string();
        }
    };

    match host {
        Ok(name) => name,
        // hostname exists but is not valid UTF-8; fall back to a random id
        Err(hostname) => {
            warn!(
                ?hostname,
                "hostname could not be converted to string. Using ULID for default influxdb_id"
            );
            Ulid::new().to_string()
        }
    }
}
impl Default for AppConfig { impl Default for AppConfig {
fn default() -> Self { fn default() -> Self {
serde_json::from_str("{}").unwrap() serde_json::from_str("{}").unwrap()

@ -13,7 +13,6 @@ use std::time::Duration;
use tokio::sync::{broadcast, mpsc, oneshot}; use tokio::sync::{broadcast, mpsc, oneshot};
use tokio::time::{interval, sleep}; use tokio::time::{interval, sleep};
use tracing::{debug, error, info, trace, warn, Instrument}; use tracing::{debug, error, info, trace, warn, Instrument};
use ulid::Ulid;
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub struct BufferedRpcQueryStats { pub struct BufferedRpcQueryStats {
@ -50,7 +49,9 @@ pub struct StatBuffer {
global_timeseries_buffer: HashMap<RpcQueryKey, BufferedRpcQueryStats>, global_timeseries_buffer: HashMap<RpcQueryKey, BufferedRpcQueryStats>,
influxdb_bucket: Option<String>, influxdb_bucket: Option<String>,
influxdb_client: Option<influxdb2::Client>, influxdb_client: Option<influxdb2::Client>,
instance_hash: String, /// a globally unique name
/// instance names can be re-used but they MUST only ever be used by a single server at a time!
instance: String,
opt_in_timeseries_buffer: HashMap<RpcQueryKey, BufferedRpcQueryStats>, opt_in_timeseries_buffer: HashMap<RpcQueryKey, BufferedRpcQueryStats>,
rpc_secret_key_cache: RpcSecretKeyCache, rpc_secret_key_cache: RpcSecretKeyCache,
tsdb_save_interval_seconds: u32, tsdb_save_interval_seconds: u32,
@ -75,6 +76,7 @@ impl StatBuffer {
tsdb_save_interval_seconds: u32, tsdb_save_interval_seconds: u32,
flush_sender: mpsc::Sender<oneshot::Sender<FlushedStats>>, flush_sender: mpsc::Sender<oneshot::Sender<FlushedStats>>,
flush_receiver: mpsc::Receiver<oneshot::Sender<FlushedStats>>, flush_receiver: mpsc::Receiver<oneshot::Sender<FlushedStats>>,
instance: String,
) -> anyhow::Result<Option<SpawnedStatBuffer>> { ) -> anyhow::Result<Option<SpawnedStatBuffer>> {
if influxdb_bucket.is_none() { if influxdb_bucket.is_none() {
influxdb_client = None; influxdb_client = None;
@ -82,11 +84,6 @@ impl StatBuffer {
let (stat_sender, stat_receiver) = mpsc::unbounded_channel(); let (stat_sender, stat_receiver) = mpsc::unbounded_channel();
// TODO: this has no chance of being re-used. we will eventually hit issues with cardinatility
// would be best to use `private_ip:frontend_port`. then it has a chance of being re-used (but never by 2 instances at the same time)
// even better would be `aws_auto_scaling_group_id:frontend_port`
let instance = Ulid::new().to_string();
// TODO: get the frontend request timeout and add a minute buffer instead of hard coding `(5 + 1)` // TODO: get the frontend request timeout and add a minute buffer instead of hard coding `(5 + 1)`
let num_tsdb_windows = ((5 + 1) * 60) / tsdb_save_interval_seconds as i64; let num_tsdb_windows = ((5 + 1) * 60) / tsdb_save_interval_seconds as i64;
@ -104,7 +101,7 @@ impl StatBuffer {
global_timeseries_buffer: Default::default(), global_timeseries_buffer: Default::default(),
influxdb_bucket, influxdb_bucket,
influxdb_client, influxdb_client,
instance_hash: instance, instance,
num_tsdb_windows, num_tsdb_windows,
opt_in_timeseries_buffer: Default::default(), opt_in_timeseries_buffer: Default::default(),
rpc_secret_key_cache, rpc_secret_key_cache,
@ -437,7 +434,7 @@ impl StatBuffer {
"global_proxy", "global_proxy",
self.chain_id, self.chain_id,
key, key,
&self.instance_hash, &self.instance,
self.tsdb_window, self.tsdb_window,
) )
.await .await
@ -460,7 +457,7 @@ impl StatBuffer {
"opt_in_proxy", "opt_in_proxy",
self.chain_id, self.chain_id,
key, key,
&self.instance_hash, &self.instance,
self.tsdb_window, self.tsdb_window,
) )
.await .await

@ -78,6 +78,8 @@ impl MigrateStatsToV2SubCommand {
let rpc_secret_key_cache = Cache::builder().build(); let rpc_secret_key_cache = Cache::builder().build();
let user_balance_cache = Cache::builder().build().into(); let user_balance_cache = Cache::builder().build().into();
let instance = Ulid::new().to_string();
// Spawn the stat-sender // Spawn the stat-sender
let emitter_spawn = StatBuffer::try_spawn( let emitter_spawn = StatBuffer::try_spawn(
BILLING_PERIOD_SECONDS, BILLING_PERIOD_SECONDS,
@ -91,6 +93,7 @@ impl MigrateStatsToV2SubCommand {
60, 60,
flush_sender, flush_sender,
flush_receiver, flush_receiver,
instance,
) )
.context("Error spawning stat buffer")? .context("Error spawning stat buffer")?
.context("No stat buffer spawned. Maybe missing influx or db credentials?")?; .context("No stat buffer spawned. Maybe missing influx or db credentials?")?;

@ -37,6 +37,7 @@ async fn test_two_buffers() {
tsdb_save_interval_seconds, tsdb_save_interval_seconds,
flush_sender_1, flush_sender_1,
flush_receiver_1, flush_receiver_1,
"buffer_1".to_string(),
) )
.unwrap() .unwrap()
.unwrap(); .unwrap();
@ -53,6 +54,7 @@ async fn test_two_buffers() {
tsdb_save_interval_seconds, tsdb_save_interval_seconds,
flush_sender_2, flush_sender_2,
flush_receiver_2, flush_receiver_2,
"buffer_2".to_string(),
) )
.unwrap() .unwrap()
.unwrap(); .unwrap();