From 7390bb7910ed34505ee0a12ed0bded43a34ba877 Mon Sep 17 00:00:00 2001 From: yenicelik Date: Sun, 26 Mar 2023 15:53:17 +0200 Subject: [PATCH] non-lossy data transfer --- .../bin/web3_proxy_cli/migrate_stats_to_v2.rs | 62 ++++++++++++++----- web3_proxy/src/stats/mod.rs | 1 + 2 files changed, 47 insertions(+), 16 deletions(-) diff --git a/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs b/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs index c781a240..205e4bf7 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs @@ -50,6 +50,9 @@ fn datetime_utc_to_instant(datetime: DateTime) -> anyhow::Result { #[argh(subcommand, name = "migrate_stats_to_v2")] pub struct MigrateStatsToV2 {} +// I mean drop(sender) and then important_background_handle.await. No need for shutdown signal here I think. +// Don't make data lossy + impl MigrateStatsToV2 { pub async fn main( self, @@ -186,19 +189,42 @@ impl MigrateStatsToV2 { // For each frontend request, create one object that will be emitted (make sure the timestamp is new) let n = x.frontend_requests; - for _ in 1..n { + for i in 0..n { // info!("Creating a new frontend request"); + // Collect all requests here ... + let mut int_request_bytes = (x.sum_request_bytes / n); + if i == 0 { + int_request_bytes += (x.sum_request_bytes % n); + } + + let mut int_response_bytes = (x.sum_response_bytes / n); + if i == 0 { + int_response_bytes += (x.sum_response_bytes % n); + } + + let mut int_response_millis = (x.sum_response_millis / n); + if i == 0 { + int_response_millis += (x.sum_response_millis % n); + } + + let mut int_backend_requests = (x.backend_requests / n); + if i == 0 { + int_backend_requests += (x.backend_requests % n); + } + + // Add module at the last step to include for any remained that we missed ... (?) + // TODO: Create RequestMetadata let request_metadata = RequestMetadata { - start_instant: Instant::now(), // This is overwritten later on - request_bytes: (x.sum_request_bytes / n).into(), // Get the mean of all the request bytes + start_instant: Instant::now(), // This is overwritten later on + request_bytes: int_request_bytes.into(), // Get the mean of all the request bytes archive_request: x.archive_request.into(), backend_requests: Default::default(), // This is not used, instead we modify the field later no_servers: 0.into(), // This is not relevant in the new version error_response: x.error_response.into(), - response_bytes: (x.sum_response_bytes / n).into(), - response_millis: (x.sum_response_millis / n).into(), + response_bytes: int_response_bytes.into(), + response_millis: int_response_millis.into(), // We just don't have this data response_from_backup_rpc: false.into(), // I think we did not record this back then // Default::default() }; @@ -211,15 +237,15 @@ impl MigrateStatsToV2 { x.clone().method.unwrap(), authorization.clone(), Arc::new(request_metadata), - (x.sum_response_bytes / n) + (int_response_bytes) .try_into() .context("sum bytes average is not calculated properly")?, ); // Modify the timestamps .. response_stat.modify_struct( - (x.sum_response_millis / n), - x.period_datetime.timestamp(), // I suppose timestamp is millis as well ... should check this in the database - (x.backend_requests / n), + int_response_millis, + x.period_datetime.timestamp(), // I suppose timestamp is millis as well ... should check this in the (prod) database + int_backend_requests, ); info!("Sending stats: {:?}", response_stat); stat_sender_ref @@ -280,14 +306,18 @@ impl MigrateStatsToV2 { // important_background_handles.clear(); // Finally also send a shutdown signal - match app_shutdown_sender.send(()) { - Err(x) => { - panic!("Could not send shutdown signal! {:?}", x); - } - _ => {} - }; - // Drop the background handle + drop(stat_sender); + // match app_shutdown_sender.send(()) { + // Err(x) => { + // panic!("Could not send shutdown signal! {:?}", x); + // } + // _ => {} + // }; + + // TODO: Should we also write a short verifier if the migration was successful (?) + + // Drop the background handle, wait for any tasks that are on-going while let Some(x) = important_background_handles.next().await { info!("Returned item is: {:?}", x); match x { diff --git a/web3_proxy/src/stats/mod.rs b/web3_proxy/src/stats/mod.rs index d11a53ae..f3e4776b 100644 --- a/web3_proxy/src/stats/mod.rs +++ b/web3_proxy/src/stats/mod.rs @@ -40,6 +40,7 @@ pub struct RpcQueryStats { pub error_response: bool, pub request_bytes: u64, /// if backend_requests is 0, there was a cache_hit + // pub frontend_request: u64, pub backend_requests: u64, pub response_bytes: u64, pub response_millis: u64,