non-lossy data transfer

This commit is contained in:
yenicelik 2023-03-26 15:53:17 +02:00
parent cef26e66cc
commit 7390bb7910
2 changed files with 47 additions and 16 deletions

View File

@ -50,6 +50,9 @@ fn datetime_utc_to_instant(datetime: DateTime<Utc>) -> anyhow::Result<Instant> {
#[argh(subcommand, name = "migrate_stats_to_v2")] #[argh(subcommand, name = "migrate_stats_to_v2")]
pub struct MigrateStatsToV2 {} pub struct MigrateStatsToV2 {}
// I mean drop(sender) and then important_background_handle.await. No need for shutdown signal here I think.
// Don't make data lossy
impl MigrateStatsToV2 { impl MigrateStatsToV2 {
pub async fn main( pub async fn main(
self, self,
@ -186,19 +189,42 @@ impl MigrateStatsToV2 {
// For each frontend request, create one object that will be emitted (make sure the timestamp is new) // For each frontend request, create one object that will be emitted (make sure the timestamp is new)
let n = x.frontend_requests; let n = x.frontend_requests;
for _ in 1..n { for i in 0..n {
// info!("Creating a new frontend request"); // info!("Creating a new frontend request");
// Collect all requests here ...
let mut int_request_bytes = (x.sum_request_bytes / n);
if i == 0 {
int_request_bytes += (x.sum_request_bytes % n);
}
let mut int_response_bytes = (x.sum_response_bytes / n);
if i == 0 {
int_response_bytes += (x.sum_response_bytes % n);
}
let mut int_response_millis = (x.sum_response_millis / n);
if i == 0 {
int_response_millis += (x.sum_response_millis % n);
}
let mut int_backend_requests = (x.backend_requests / n);
if i == 0 {
int_backend_requests += (x.backend_requests % n);
}
// Add module at the last step to include for any remained that we missed ... (?)
// TODO: Create RequestMetadata // TODO: Create RequestMetadata
let request_metadata = RequestMetadata { let request_metadata = RequestMetadata {
start_instant: Instant::now(), // This is overwritten later on start_instant: Instant::now(), // This is overwritten later on
request_bytes: (x.sum_request_bytes / n).into(), // Get the mean of all the request bytes request_bytes: int_request_bytes.into(), // Get the mean of all the request bytes
archive_request: x.archive_request.into(), archive_request: x.archive_request.into(),
backend_requests: Default::default(), // This is not used, instead we modify the field later backend_requests: Default::default(), // This is not used, instead we modify the field later
no_servers: 0.into(), // This is not relevant in the new version no_servers: 0.into(), // This is not relevant in the new version
error_response: x.error_response.into(), error_response: x.error_response.into(),
response_bytes: (x.sum_response_bytes / n).into(), response_bytes: int_response_bytes.into(),
response_millis: (x.sum_response_millis / n).into(), response_millis: int_response_millis.into(),
// We just don't have this data // We just don't have this data
response_from_backup_rpc: false.into(), // I think we did not record this back then // Default::default() response_from_backup_rpc: false.into(), // I think we did not record this back then // Default::default()
}; };
@ -211,15 +237,15 @@ impl MigrateStatsToV2 {
x.clone().method.unwrap(), x.clone().method.unwrap(),
authorization.clone(), authorization.clone(),
Arc::new(request_metadata), Arc::new(request_metadata),
(x.sum_response_bytes / n) (int_response_bytes)
.try_into() .try_into()
.context("sum bytes average is not calculated properly")?, .context("sum bytes average is not calculated properly")?,
); );
// Modify the timestamps .. // Modify the timestamps ..
response_stat.modify_struct( response_stat.modify_struct(
(x.sum_response_millis / n), int_response_millis,
x.period_datetime.timestamp(), // I suppose timestamp is millis as well ... should check this in the database x.period_datetime.timestamp(), // I suppose timestamp is millis as well ... should check this in the (prod) database
(x.backend_requests / n), int_backend_requests,
); );
info!("Sending stats: {:?}", response_stat); info!("Sending stats: {:?}", response_stat);
stat_sender_ref stat_sender_ref
@ -280,14 +306,18 @@ impl MigrateStatsToV2 {
// important_background_handles.clear(); // important_background_handles.clear();
// Finally also send a shutdown signal // Finally also send a shutdown signal
match app_shutdown_sender.send(()) {
Err(x) => {
panic!("Could not send shutdown signal! {:?}", x);
}
_ => {}
};
// Drop the background handle drop(stat_sender);
// match app_shutdown_sender.send(()) {
// Err(x) => {
// panic!("Could not send shutdown signal! {:?}", x);
// }
// _ => {}
// };
// TODO: Should we also write a short verifier if the migration was successful (?)
// Drop the background handle, wait for any tasks that are on-going
while let Some(x) = important_background_handles.next().await { while let Some(x) = important_background_handles.next().await {
info!("Returned item is: {:?}", x); info!("Returned item is: {:?}", x);
match x { match x {

View File

@ -40,6 +40,7 @@ pub struct RpcQueryStats {
pub error_response: bool, pub error_response: bool,
pub request_bytes: u64, pub request_bytes: u64,
/// if backend_requests is 0, there was a cache_hit /// if backend_requests is 0, there was a cache_hit
// pub frontend_request: u64,
pub backend_requests: u64, pub backend_requests: u64,
pub response_bytes: u64, pub response_bytes: u64,
pub response_millis: u64, pub response_millis: u64,