diff --git a/web3_proxy/src/stats/mod.rs b/web3_proxy/src/stats/mod.rs
index 6b960219..c391ee9f 100644
--- a/web3_proxy/src/stats/mod.rs
+++ b/web3_proxy/src/stats/mod.rs
@@ -14,7 +14,7 @@ use futures::stream;
 use hashbrown::HashMap;
 use influxdb2::api::write::TimestampPrecision;
 use influxdb2::models::DataPoint;
-use log::{error, info};
+use log::{error, info, trace};
 use migration::sea_orm::{self, DatabaseConnection, EntityTrait};
 use migration::{Expr, OnConflict};
 use std::num::NonZeroU64;
@@ -114,9 +114,10 @@ impl RpcQueryStats {
         }
     }
 
-    /// all queries are aggregated
+    /// all rpc keys are aggregated in the global stats
     /// TODO: should we store "anon" or "registered" as a key just to be able to split graphs?
     fn global_timeseries_key(&self) -> RpcQueryKey {
+        // we include the method because that can be helpful for predicting load
         let method = self.method.clone();
         // we don't store origin in the timeseries db. its only used for optional accounting
         let origin = None;
@@ -133,15 +134,15 @@ impl RpcQueryStats {
         }
     }
 
-    fn opt_in_timeseries_key(&self) -> RpcQueryKey {
+    /// rpc keys can opt into more detailed tracking
+    fn opt_in_timeseries_key(&self) -> Option<RpcQueryKey> {
         // we don't store origin in the timeseries db. its only optionaly used for accounting
         let origin = None;
 
         let (method, rpc_secret_key_id) = match self.authorization.checks.tracking_level {
             TrackingLevel::None => {
                 // this RPC key requested no tracking. this is the default.
-                // we still want graphs though, so we just use None as the rpc_secret_key_id
-                (self.method.clone(), None)
+                return None;
             }
             TrackingLevel::Aggregated => {
                 // this RPC key requested tracking aggregated across all methods
@@ -156,14 +157,16 @@ impl RpcQueryStats {
             }
         };
 
-        RpcQueryKey {
+        let key = RpcQueryKey {
             response_timestamp: self.response_timestamp,
             archive_needed: self.archive_request,
             error_response: self.error_response,
             method,
             rpc_secret_key_id,
             origin,
-        }
+        };
+
+        Some(key)
     }
 }
 
@@ -201,6 +204,10 @@ pub struct StatBuffer {
     tsdb_save_interval_seconds: u32,
     db_save_interval_seconds: u32,
     billing_period_seconds: i64,
+    global_timeseries_buffer: HashMap<RpcQueryKey, BufferedRpcQueryStats>,
+    opt_in_timeseries_buffer: HashMap<RpcQueryKey, BufferedRpcQueryStats>,
+    accounting_db_buffer: HashMap<RpcQueryKey, BufferedRpcQueryStats>,
+    timestamp_precision: TimestampPrecision,
 }
 
 impl BufferedRpcQueryStats {
@@ -310,18 +317,12 @@ impl BufferedRpcQueryStats {
         Ok(())
     }
 
-    // TODO: change this to return a DataPoint?
-    async fn save_timeseries(
+    async fn build_timeseries_point(
         self,
-        bucket: &str,
         measurement: &str,
         chain_id: u64,
-        influxdb2_clent: &influxdb2::Client,
         key: RpcQueryKey,
-    ) -> anyhow::Result<()> {
-        // TODO: error if key.origin is set?
-
-        // TODO: what name?
+    ) -> anyhow::Result<DataPoint> {
         let mut builder = DataPoint::builder(measurement);
 
         builder = builder.tag("chain_id", chain_id.to_string());
@@ -347,16 +348,10 @@ impl BufferedRpcQueryStats {
             .field("sum_response_bytes", self.sum_response_bytes as i64);
 
         builder = builder.timestamp(key.response_timestamp);
-        let timestamp_precision = TimestampPrecision::Seconds;
 
-        let points = [builder.build()?];
+        let point = builder.build()?;
 
-        // TODO: bucket should be an enum so that we don't risk typos
-        influxdb2_clent
-            .write_with_precision(bucket, stream::iter(points), timestamp_precision)
-            .await?;
-
-        Ok(())
+        Ok(point)
     }
 }
 
@@ -423,6 +418,7 @@ impl StatBuffer {
 
         let (stat_sender, stat_receiver) = flume::unbounded();
 
+        let timestamp_precision = TimestampPrecision::Seconds;
         let mut new = Self {
             chain_id,
             db_conn,
@@ -430,6 +426,10 @@ impl StatBuffer {
             db_save_interval_seconds,
             tsdb_save_interval_seconds,
             billing_period_seconds,
+            global_timeseries_buffer: Default::default(),
+            opt_in_timeseries_buffer: Default::default(),
+            accounting_db_buffer: Default::default(),
+            timestamp_precision,
         };
 
         // any errors inside this task will cause the application to exit
@@ -452,11 +452,6 @@ impl StatBuffer {
         let mut db_save_interval =
             interval(Duration::from_secs(self.db_save_interval_seconds as u64));
 
-        // TODO: this is used for rpc_accounting_v2 and influxdb. give it a name to match that? "stat" of some kind?
-        let mut global_timeseries_buffer = HashMap::<RpcQueryKey, BufferedRpcQueryStats>::new();
-        let mut opt_in_timeseries_buffer = HashMap::<RpcQueryKey, BufferedRpcQueryStats>::new();
-        let mut accounting_db_buffer = HashMap::<RpcQueryKey, BufferedRpcQueryStats>::new();
-
         // TODO: Somewhere here we should probably be updating the balance of the user
         // And also update the credits used etc. for the referred user
 
@@ -472,15 +467,15 @@
 
                             let global_timeseries_key = stat.global_timeseries_key();
 
-                            global_timeseries_buffer.entry(global_timeseries_key).or_default().add(stat.clone());
+                            self.global_timeseries_buffer.entry(global_timeseries_key).or_default().add(stat.clone());
 
-                            let opt_in_timeseries_key = stat.opt_in_timeseries_key();
-
-                            opt_in_timeseries_buffer.entry(opt_in_timeseries_key).or_default().add(stat.clone());
+                            if let Some(opt_in_timeseries_key) = stat.opt_in_timeseries_key() {
+                                self.opt_in_timeseries_buffer.entry(opt_in_timeseries_key).or_default().add(stat.clone());
+                            }
                         }
 
                         if self.db_conn.is_some() {
-                            accounting_db_buffer.entry(stat.accounting_key(self.billing_period_seconds)).or_default().add(stat);
+                            self.accounting_db_buffer.entry(stat.accounting_key(self.billing_period_seconds)).or_default().add(stat);
                         }
                     }
                     Err(err) => {
@@ -491,34 +486,16 @@ impl StatBuffer {
                 }
                 _ = db_save_interval.tick() => {
                     // info!("DB save internal tick");
-                    let db_conn = self.db_conn.as_ref().expect("db connection should always exist if there are buffered stats");
-
-                    // TODO: batch saves
-                    for (key, stat) in accounting_db_buffer.drain() {
-                        // TODO: i don't like passing key (which came from the stat) to the function on the stat. but it works for now
-                        if let Err(err) = stat.save_db(self.chain_id, db_conn, key).await {
-                            error!("unable to save accounting entry! err={:?}", err);
-                        };
+                    let count = self.save_relational_stats().await;
+                    if count > 0 {
+                        trace!("Saved {} stats to the relational db", count);
                     }
                 }
                 _ = tsdb_save_interval.tick() => {
                     // info!("TSDB save internal tick");
-                    // TODO: batch saves
-                    // TODO: better bucket names
-                    let influxdb_client = self.influxdb_client.as_ref().expect("influxdb client should always exist if there are buffered stats");
-
-                    for (key, stat) in global_timeseries_buffer.drain() {
-                        // TODO: i don't like passing key (which came from the stat) to the function on the stat. but it works for now
-                        if let Err(err) = stat.save_timeseries(bucket.clone().as_ref(), "global_proxy", self.chain_id, influxdb_client, key).await {
-                            error!("unable to save global stat! err={:?}", err);
-                        };
-                    }
-
-                    for (key, stat) in opt_in_timeseries_buffer.drain() {
-                        // TODO: i don't like passing key (which came from the stat) to the function on the stat. but it works for now
-                        if let Err(err) = stat.save_timeseries(bucket.clone().as_ref(), "opt_in_proxy", self.chain_id, influxdb_client, key).await {
-                            error!("unable to save opt-in stat! err={:?}", err);
-                        };
+                    let count = self.save_tsdb_stats(&bucket).await;
+                    if count > 0 {
+                        trace!("Saved {} stats to the tsdb", count);
                     }
                 }
                 x = shutdown_receiver.recv() => {
@@ -534,62 +511,104 @@ impl StatBuffer {
             }
         }
 
-        // TODO: dry
-        if let Some(db_conn) = self.db_conn.as_ref() {
-            info!(
-                "saving {} buffered accounting entries",
-                accounting_db_buffer.len(),
-            );
+        let saved_relational = self.save_relational_stats().await;
 
-            for (key, stat) in accounting_db_buffer.drain() {
-                if let Err(err) = stat.save_db(self.chain_id, db_conn, key).await {
-                    error!(
-                        "Unable to save accounting entry while shutting down! err={:?}",
-                        err
-                    );
-                };
-            }
-        }
+        info!("saved {} pending relational stats", saved_relational);
 
-        // TODO: dry
-        if let Some(influxdb_client) = self.influxdb_client.as_ref() {
-            info!(
-                "saving {} buffered global stats",
-                global_timeseries_buffer.len(),
-            );
+        let saved_tsdb = self.save_tsdb_stats(&bucket).await;
 
-            for (key, stat) in global_timeseries_buffer.drain() {
-                if let Err(err) = stat
-                    .save_timeseries(&bucket, "global_proxy", self.chain_id, influxdb_client, key)
-                    .await
-                {
-                    error!(
-                        "Unable to save global stat while shutting down! err={:?}",
-                        err
-                    );
-                };
-            }
-
-            info!(
-                "saving {} buffered opt-in stats",
-                opt_in_timeseries_buffer.len(),
-            );
-
-            for (key, stat) in opt_in_timeseries_buffer.drain() {
-                if let Err(err) = stat
-                    .save_timeseries(&bucket, "opt_in_proxy", self.chain_id, influxdb_client, key)
-                    .await
-                {
-                    error!(
-                        "unable to save opt-in stat while shutting down! err={:?}",
-                        err
-                    );
-                };
-            }
-        }
+        info!("saved {} pending tsdb stats", saved_tsdb);
 
         info!("accounting and stat save loop complete");
 
         Ok(())
     }
+
+    async fn save_relational_stats(&mut self) -> usize {
+        let mut count = 0;
+
+        if let Some(db_conn) = self.db_conn.as_ref() {
+            count = self.accounting_db_buffer.len();
+            for (key, stat) in self.accounting_db_buffer.drain() {
+                // TODO: batch saves
+                // TODO: i don't like passing key (which came from the stat) to the function on the stat. but it works for now
+                if let Err(err) = stat.save_db(self.chain_id, db_conn, key).await {
+                    error!("unable to save accounting entry! err={:?}", err);
+                };
+            }
+        }
+
+        count
+    }
+
+    // TODO: bucket should be an enum so that we don't risk typos
+    async fn save_tsdb_stats(&mut self, bucket: &str) -> usize {
+        let mut count = 0;
+
+        if let Some(influxdb_client) = self.influxdb_client.as_ref() {
+            // TODO: use stream::iter properly to avoid allocating this Vec
+            let mut points = vec![];
+
+            for (key, stat) in self.global_timeseries_buffer.drain() {
+                // TODO: i don't like passing key (which came from the stat) to the function on the stat. but it works for now
+                match stat
+                    .build_timeseries_point("global_proxy", self.chain_id, key)
+                    .await
+                {
+                    Ok(point) => {
+                        points.push(point);
+                    }
+                    Err(err) => {
+                        error!("unable to build global stat! err={:?}", err);
+                    }
+                };
+            }
+
+            for (key, stat) in self.opt_in_timeseries_buffer.drain() {
+                // TODO: i don't like passing key (which came from the stat) to the function on the stat. but it works for now
+                match stat
+                    .build_timeseries_point("opt_in_proxy", self.chain_id, key)
+                    .await
+                {
+                    Ok(point) => {
+                        points.push(point);
+                    }
+                    Err(err) => {
+                        // TODO: if this errors, we throw away some of the pending stats! we should probably buffer them somewhere to be tried again
+                        error!("unable to build opt-in stat! err={:?}", err);
+                    }
+                };
+            }
+
+            count = points.len();
+
+            if count > 0 {
+                // TODO: put max_batch_size in config?
+                // TODO: i think the real limit is the byte size of the http request. so, a simple line count won't work very well
+                let max_batch_size = 100;
+
+                let mut num_left = count;
+
+                while num_left > 0 {
+                    let batch_size = num_left.min(max_batch_size);
+
+                    num_left -= batch_size;
+
+                    // `split_off(num_left)` returns the last `batch_size` points and
+                    // leaves the not-yet-written points in `points` for the next pass
+                    let p = points.split_off(num_left);
+
+                    if let Err(err) = influxdb_client
+                        .write_with_precision(bucket, stream::iter(p), self.timestamp_precision)
+                        .await
+                    {
+                        // TODO: if this errors, we throw away some of the pending stats! we should probably buffer them somewhere to be tried again
+                        error!("unable to save {} tsdb stats! err={:?}", batch_size, err);
+                    }
+                }
+            }
+        }
+
+        count
+    }
 }
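
Note on the batching in `save_tsdb_stats`: because `num_left` is decremented before the split, `Vec::split_off(num_left)` returns the last `batch_size` points and keeps the first `num_left` in `points` for the next iteration, so each `write_with_precision` call sends at most `max_batch_size` points and every buffered point is written exactly once. A minimal sketch of that chunking, with plain integers standing in for influxdb2 `DataPoint`s and a `println!` standing in for the actual influxdb write:

fn main() {
    // 250 stand-in data points; the real code collects DataPoints from the stat buffers
    let mut points: Vec<u32> = (0..250).collect();

    // the same hardcoded limit used in save_tsdb_stats
    let max_batch_size = 100;

    let mut num_left = points.len();

    while num_left > 0 {
        let batch_size = num_left.min(max_batch_size);

        num_left -= batch_size;

        // take this batch off the tail; the unwritten head stays in `points`
        let batch = points.split_off(num_left);
        assert_eq!(batch.len(), batch_size);

        // stand-in for influxdb_client.write_with_precision(bucket, stream::iter(batch), ...)
        println!("writing {} points ({} still buffered)", batch.len(), num_left);
    }

    // every point was taken by exactly one batch
    assert!(points.is_empty());
}

The batches go out tail-first (100, 100, then 50 in this example), which is harmless for InfluxDB since each point carries its own timestamp.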