From cef26e66cc92d73e4e8d1f592f5c4d43b5c68631 Mon Sep 17 00:00:00 2001 From: yenicelik Date: Sat, 25 Mar 2023 19:26:23 +0100 Subject: [PATCH] will create a PR for some feedback --- .../bin/web3_proxy_cli/migrate_stats_to_v2.rs | 9 +++----- web3_proxy/src/stats/mod.rs | 22 ++++--------------- 2 files changed, 7 insertions(+), 24 deletions(-) diff --git a/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs b/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs index 850182e9..c781a240 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs @@ -116,8 +116,6 @@ impl MigrateStatsToV2 { None }; - info!("Background handles are: {:?}", important_background_handles); - // Basically spawn the full app, look at web3_proxy CLI while true { @@ -134,10 +132,6 @@ impl MigrateStatsToV2 { } // (2) Create request metadata objects to match the old data - let mut global_timeseries_buffer = HashMap::::new(); - let mut opt_in_timeseries_buffer = HashMap::::new(); - let mut accounting_db_buffer = HashMap::::new(); - // Iterate through all old rows, and put them into the above objects. for x in old_records.iter() { info!("Preparing for migration: {:?}", x); @@ -212,6 +206,7 @@ impl MigrateStatsToV2 { // (3) Send through a channel to a stat emitter // Send it to the stats sender if let Some(stat_sender_ref) = stat_sender.as_ref() { + info!("Method is: {:?}", x.clone().method.unwrap()); let mut response_stat = RpcQueryStats::new( x.clone().method.unwrap(), authorization.clone(), @@ -254,6 +249,8 @@ impl MigrateStatsToV2 { // If the items are in rpc_v2, delete the initial items from the database + // return Ok(()); + // (4) Update the batch in the old table with the current timestamp (Mark the batch as migrated) let old_record_ids = old_records.iter().map(|x| x.id); let update_result: UpdateResult = rpc_accounting::Entity::update_many() diff --git a/web3_proxy/src/stats/mod.rs b/web3_proxy/src/stats/mod.rs index ccdd8939..d11a53ae 100644 --- a/web3_proxy/src/stats/mod.rs +++ b/web3_proxy/src/stats/mod.rs @@ -364,7 +364,7 @@ impl RpcQueryStats { method: String, authorization: Arc, metadata: Arc, - response_bytes: usize + response_bytes: usize, ) -> Self { // TODO: try_unwrap the metadata to be sure that all the stats for this request have been collected // TODO: otherwise, i think the whole thing should be in a single lock that we can "reset" when a stat is created @@ -396,13 +396,12 @@ impl RpcQueryStats { &mut self, response_millis: u64, response_timestamp: i64, - backend_requests: u64 + backend_requests: u64, ) { self.response_millis = response_millis; self.response_timestamp = response_timestamp; self.backend_requests = backend_requests; } - } impl StatBuffer { @@ -446,7 +445,6 @@ impl StatBuffer { stat_receiver: flume::Receiver, mut shutdown_receiver: broadcast::Receiver<()>, ) -> anyhow::Result<()> { - info!("Aggregate and save loop is running"); let mut tsdb_save_interval = interval(Duration::from_secs(self.tsdb_save_interval_seconds as u64)); let mut db_save_interval = @@ -561,13 +559,7 @@ impl StatBuffer { for (key, stat) in global_timeseries_buffer.drain() { if let Err(err) = stat - .save_timeseries( - &bucket, - "global_proxy", - self.chain_id, - influxdb_client, - key, - ) + .save_timeseries(&bucket, "global_proxy", self.chain_id, influxdb_client, key) .await { error!( @@ -584,13 +576,7 @@ impl StatBuffer { for (key, stat) in opt_in_timeseries_buffer.drain() { if let Err(err) = stat - .save_timeseries( - &bucket, - "opt_in_proxy", - self.chain_id, - influxdb_client, - key, - ) + .save_timeseries(&bucket, "opt_in_proxy", self.chain_id, influxdb_client, key) .await { error!(