From 64505102efc9c4820c279d1db662f86dbf24ed8c Mon Sep 17 00:00:00 2001 From: yenicelik Date: Tue, 21 Mar 2023 12:07:21 +0100 Subject: [PATCH 1/6] creating a CLI endpoint for the migration --- entities/src/rpc_accounting.rs | 1 + .../bin/web3_proxy_cli/migrate_stats_to_v2.rs | 70 ++++++++++++++++++- 2 files changed, 69 insertions(+), 2 deletions(-) diff --git a/entities/src/rpc_accounting.rs b/entities/src/rpc_accounting.rs index a615e6ff..70c79ab3 100644 --- a/entities/src/rpc_accounting.rs +++ b/entities/src/rpc_accounting.rs @@ -40,6 +40,7 @@ pub struct Model { pub max_response_bytes: u64, pub archive_request: bool, pub origin: Option, + pub migrated: Option } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git a/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs b/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs index 88d19dde..137ae3b9 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs @@ -1,6 +1,7 @@ +use std::net::IpAddr; use anyhow::Context; use argh::FromArgs; -use entities::{rpc_accounting, rpc_accounting_v2, user}; +use entities::{rpc_key, rpc_accounting, rpc_accounting_v2, user}; use ethers::types::Address; use hashbrown::HashMap; use log::{debug, info, warn}; @@ -8,6 +9,8 @@ use migration::sea_orm::{ self, ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, IntoActiveModel, QueryFilter, QuerySelect }; +use web3_proxy::app::AuthorizationChecks; +use web3_proxy::frontend::authorization::{Authorization, AuthorizationType, RequestMetadata}; use web3_proxy::stats::{BufferedRpcQueryStats, RpcQueryKey}; /// change a user's address. @@ -21,7 +24,11 @@ impl MigrateStatsToV2 { while true { // (1) Load a batch of rows out of the old table until no more rows are left - let old_records = rpc_accounting::Entity::find().limit(10000).all(db_conn).await?; + let old_records = rpc_accounting::Entity::find() + .filter(rpc_accounting::Column::Migrated.is_null()) + .limit(10000) + .all(db_conn) + .await?; if old_records.len() == 0 { // Break out of while loop once all records have successfully been migrated ... warn!("All records seem to have been successfully migrated!"); @@ -38,6 +45,62 @@ impl MigrateStatsToV2 { info!("Preparing for migration: {:?}", x); + // TODO: Split up a single request into multiple requests ... + // according to frontend-requests, backend-requests, etc. + + // Get the rpc-key from the rpc_key_id + // Get the user-id from the rpc_key_id + let rpc_key_obj = rpc_key::Entity::find() + .filter(rpc_key::Column::id.eq(x.rpc_key_id)) + .one(db_conn) + .await?; + + // TODO: Create authrization + // We can probably also randomly generate this, as we don't care about the user (?) + let authorization_checks = AuthorizationChecks { + user_id: rpc_key_id.user_id, + rpc_secret_key: rpc_key_id.secret_key, + rpc_secret_key_id: x.rpc_key_id, + ..Default::default() + }; + // Then overwrite rpc_key_id and user_id (?) + let authorization_type = AuthorizationType::Frontend; + let authorization = Authorization::try_new( + authorization_checks, + None, + IpAddr::default(), + None, + None, + None, + authorization_type + ); + + // It will be like a fork basically (to simulate getting multiple single requests ...) 
+ // Iterate through all frontend requests + // For each frontend request, create one object that will be emitted (make sure the timestamp is new) + + // TODO: Create RequestMetadata + let request_metadata = RequestMetadata { + start_instant: x.period_datetime.timestamp(), + request_bytes: x.request_bytes, + archive_request: x.archive_request, + backend_requests: todo!(), + no_servers: 0, // This is not relevant in the new version + error_response: x.error_response, + response_bytes: todo!(), + response_millis: todo!(), + response_from_backup_rpc: todo!() // I think we did not record this back then // Default::default() + }; + + // I suppose method cannot be empty ... (?) + // Ok, now we basically create a state event .. + RpcQueryStats::new( + x.method.unwrap(), + authorization + request_metadata, + x.response_bytes + ) + // // For each of the old rows, create a (i) RpcQueryKey and a matching BufferedRpcQueryStats object // let key = RpcQueryKey { // response_timestamp: x.period_datetime.timestamp(), @@ -77,6 +140,9 @@ impl MigrateStatsToV2 { // // BufferedRpcQueryStats } + // (N-1) Mark the batch as migrated + break; + } From 1d72a3cd449fe239d5db5f26cf5a9109f1583c36 Mon Sep 17 00:00:00 2001 From: yenicelik Date: Tue, 21 Mar 2023 12:49:36 +0100 Subject: [PATCH 2/6] will continue after bryan response --- .../bin/web3_proxy_cli/migrate_stats_to_v2.rs | 114 +++++++----------- 1 file changed, 43 insertions(+), 71 deletions(-) diff --git a/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs b/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs index 137ae3b9..280387c9 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs @@ -1,4 +1,5 @@ -use std::net::IpAddr; +use std::net::{IpAddr, Ipv4Addr}; +use std::num::NonZeroU64; use anyhow::Context; use argh::FromArgs; use entities::{rpc_key, rpc_accounting, rpc_accounting_v2, user}; @@ -10,7 +11,7 @@ use migration::sea_orm::{ QueryFilter, QuerySelect }; use web3_proxy::app::AuthorizationChecks; -use web3_proxy::frontend::authorization::{Authorization, AuthorizationType, RequestMetadata}; +use web3_proxy::frontend::authorization::{Authorization, AuthorizationType, RequestMetadata, RpcSecretKey}; use web3_proxy::stats::{BufferedRpcQueryStats, RpcQueryKey}; /// change a user's address. @@ -50,25 +51,36 @@ impl MigrateStatsToV2 { // Get the rpc-key from the rpc_key_id // Get the user-id from the rpc_key_id - let rpc_key_obj = rpc_key::Entity::find() - .filter(rpc_key::Column::id.eq(x.rpc_key_id)) - .one(db_conn) - .await?; + let authorization_checks = match x.rpc_key_id { + Some(rpc_key_id) => { + let rpc_key_obj = rpc_key::Entity::find() + .filter(rpc_key::Column::Id.eq(rpc_key_id)) + .one(db_conn) + .await? + .unwrap(); - // TODO: Create authrization - // We can probably also randomly generate this, as we don't care about the user (?) - let authorization_checks = AuthorizationChecks { - user_id: rpc_key_id.user_id, - rpc_secret_key: rpc_key_id.secret_key, - rpc_secret_key_id: x.rpc_key_id, - ..Default::default() + // TODO: Create authrization + // We can probably also randomly generate this, as we don't care about the user (?) + AuthorizationChecks { + user_id: rpc_key_obj.user_id, + rpc_secret_key: Some(RpcSecretKey::Uuid(rpc_key_obj.secret_key)), + rpc_secret_key_id: Some(NonZeroU64::new(rpc_key_id).unwrap()), + ..Default::default() + } + }, + None => { + AuthorizationChecks { + ..Default::default() + } + } }; + // Then overwrite rpc_key_id and user_id (?) 
let authorization_type = AuthorizationType::Frontend; let authorization = Authorization::try_new( authorization_checks, None, - IpAddr::default(), + IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), None, None, None, @@ -79,65 +91,25 @@ impl MigrateStatsToV2 { // Iterate through all frontend requests // For each frontend request, create one object that will be emitted (make sure the timestamp is new) - // TODO: Create RequestMetadata - let request_metadata = RequestMetadata { - start_instant: x.period_datetime.timestamp(), - request_bytes: x.request_bytes, - archive_request: x.archive_request, - backend_requests: todo!(), - no_servers: 0, // This is not relevant in the new version - error_response: x.error_response, - response_bytes: todo!(), - response_millis: todo!(), - response_from_backup_rpc: todo!() // I think we did not record this back then // Default::default() - }; + for _ in 1..x.frontend_requests { + info!("Creating a new frontend request"); + + // TODO: Create RequestMetadata + let request_metadata = RequestMetadata { + start_instant: x.period_datetime.timestamp(), + request_bytes: x.request_bytes, + archive_request: x.archive_request, + backend_requests: todo!(), + no_servers: 0, // This is not relevant in the new version + error_response: x.error_response, + response_bytes: todo!(), + response_millis: todo!(), + response_from_backup_rpc: todo!() // I think we did not record this back then // Default::default() + }; + + } - // I suppose method cannot be empty ... (?) - // Ok, now we basically create a state event .. - RpcQueryStats::new( - x.method.unwrap(), - authorization - request_metadata, - x.response_bytes - ) - // // For each of the old rows, create a (i) RpcQueryKey and a matching BufferedRpcQueryStats object - // let key = RpcQueryKey { - // response_timestamp: x.period_datetime.timestamp(), - // archive_needed: x.archive_needed, - // error_response: x.error_response, - // period_datetime: x.period_datetime.timestamp(), - // rpc_secret_key_id: x.rpc_key_id, - // origin: x.origin, - // method: x.method - // }; - // - // // Create the corresponding BufferedRpcQueryStats object - // let val = BufferedRpcQueryStats { - // frontend_requests: x.frontend_requests, - // backend_requests: x.backend_requests, - // backend_retries: x.backend_retries, - // no_servers: x.no_servers, - // cache_misses: x.cache_misses, - // cache_hits: x.cache_hits, - // sum_request_bytes: x.sum_request_bytes, - // sum_response_bytes: x.sum_response_bytes, - // sum_response_millis: x.sum_response_millis - // }; - // - // // TODO: Create authorization, request metadata, and bytes ... but bytes we don't really keep track of! - // // We can generate dummy bytes of the same length though, this may work as well - // - // // TODO: Period datetime is also a question of what it is - // // let response_stat = RpcQueryStats::new( - // // x.method, - // // authorization.clone(), - // // request_metadata.clone(), - // // response_bytes, - // // x.period_datetime - // // ); - // - // // BufferedRpcQueryStats } // (N-1) Mark the batch as migrated From 53c7541fed9ac63acbb27631d3e3d97293f8e6f3 Mon Sep 17 00:00:00 2001 From: yenicelik Date: Sat, 25 Mar 2023 17:56:45 +0100 Subject: [PATCH 3/6] seems to wait until everything is flushed. 
I will have to debug some stuff that isnt saved properly in the mysql (and then check influx) --- web3_proxy/src/app/mod.rs | 38 ++- web3_proxy/src/bin/web3_proxy_cli/main.rs | 7 +- .../bin/web3_proxy_cli/migrate_stats_to_v2.rs | 306 +++++++++++++----- web3_proxy/src/rpcs/many.rs | 116 +++---- web3_proxy/src/stats/mod.rs | 17 + 5 files changed, 336 insertions(+), 148 deletions(-) diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 3834eb2a..424bee93 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -67,7 +67,7 @@ pub static APP_USER_AGENT: &str = concat!( ); // aggregate across 1 week -const BILLING_PERIOD_SECONDS: i64 = 60 * 60 * 24 * 7; +pub const BILLING_PERIOD_SECONDS: i64 = 60 * 60 * 24 * 7; #[derive(Debug, From)] struct ResponseCacheKey { @@ -575,7 +575,12 @@ impl Web3ProxyApp { // stats can be saved in mysql, influxdb, both, or none let stat_sender = if let Some(emitter_spawn) = StatBuffer::try_spawn( top_config.app.chain_id, - top_config.app.influxdb_bucket.clone().context("No influxdb bucket was provided")?.to_owned(), + top_config + .app + .influxdb_bucket + .clone() + .context("No influxdb bucket was provided")? + .to_owned(), db_conn.clone(), influxdb_client.clone(), 60, @@ -812,26 +817,27 @@ impl Web3ProxyApp { app_handles.push(config_handle); } -// ======= -// if important_background_handles.is_empty() { -// info!("no important background handles"); -// -// let f = tokio::spawn(async move { -// let _ = background_shutdown_receiver.recv().await; -// -// Ok(()) -// }); -// -// important_background_handles.push(f); -// >>>>>>> 77df3fa (stats v2) + // ======= + // if important_background_handles.is_empty() { + // info!("no important background handles"); + // + // let f = tokio::spawn(async move { + // let _ = background_shutdown_receiver.recv().await; + // + // Ok(()) + // }); + // + // important_background_handles.push(f); + // >>>>>>> 77df3fa (stats v2) Ok(( app, app_handles, important_background_handles, new_top_config_sender, - consensus_connections_watcher - ).into()) + consensus_connections_watcher, + ) + .into()) } pub async fn apply_top_config(&self, new_top_config: TopConfig) -> anyhow::Result<()> { diff --git a/web3_proxy/src/bin/web3_proxy_cli/main.rs b/web3_proxy/src/bin/web3_proxy_cli/main.rs index 88b64a0e..7cbeef83 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/main.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/main.rs @@ -374,12 +374,17 @@ fn main() -> anyhow::Result<()> { x.main(&db_conn).await } SubCommand::MigrateStatsToV2(x) => { + + let top_config = top_config.expect("--config is required to run the migration from stats-mysql to stats-influx"); + // let top_config_path = + // top_config_path.expect("path must be set if top_config exists"); + let db_url = cli_config .db_url .expect("'--config' (with a db) or '--db-url' is required to run the migration from stats-mysql to stats-influx"); let db_conn = get_db(db_url, 1, 1).await?; - x.main(&db_conn).await + x.main(top_config, &db_conn).await } SubCommand::Pagerduty(x) => { if cli_config.sentry_url.is_none() { diff --git a/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs b/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs index 280387c9..850182e9 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs @@ -1,18 +1,49 @@ -use std::net::{IpAddr, Ipv4Addr}; -use std::num::NonZeroU64; use anyhow::Context; use argh::FromArgs; -use entities::{rpc_key, rpc_accounting, rpc_accounting_v2, 
user}; +use chrono::{DateTime, Utc}; +use entities::{rpc_accounting, rpc_accounting_v2, rpc_key, user}; use ethers::types::Address; +use futures::stream::FuturesUnordered; +use futures::StreamExt; use hashbrown::HashMap; -use log::{debug, info, warn}; +use log::{debug, error, info, trace, warn}; use migration::sea_orm::{ self, ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, IntoActiveModel, - QueryFilter, QuerySelect + QueryFilter, QuerySelect, UpdateResult, }; -use web3_proxy::app::AuthorizationChecks; -use web3_proxy::frontend::authorization::{Authorization, AuthorizationType, RequestMetadata, RpcSecretKey}; -use web3_proxy::stats::{BufferedRpcQueryStats, RpcQueryKey}; +use migration::{Expr, Value}; +use std::mem::swap; +use std::net::{IpAddr, Ipv4Addr}; +use std::num::NonZeroU64; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use tokio::sync::broadcast; +use tokio::time::{sleep, Instant}; +use web3_proxy::app::{AuthorizationChecks, Web3ProxyApp, BILLING_PERIOD_SECONDS}; +use web3_proxy::config::TopConfig; +use web3_proxy::frontend::authorization::{ + Authorization, AuthorizationType, RequestMetadata, RpcSecretKey, +}; +use web3_proxy::stats::{BufferedRpcQueryStats, RpcQueryKey, RpcQueryStats, StatBuffer}; + +// Helper function to go from DateTime to Instant +fn datetime_utc_to_instant(datetime: DateTime) -> anyhow::Result { + let epoch = datetime.timestamp(); // Get the Unix timestamp + let nanos = datetime.timestamp_subsec_nanos(); + + let duration_since_epoch = Duration::new(epoch as u64, nanos); + // let duration_since_datetime = Duration::new(, nanos); + let instant_new = Instant::now(); + warn!("Instant new is: {:?}", instant_new); + let unix_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); + warn!("Instant since unix epoch is: {:?}", unix_epoch); + + instant_new + .checked_sub(unix_epoch) + .context("Could not subtract unix epoch from instant now")? + .checked_add(duration_since_epoch) + .context("Could not add duration since epoch for updated time") +} /// change a user's address. #[derive(FromArgs, PartialEq, Eq, Debug)] @@ -20,14 +51,80 @@ use web3_proxy::stats::{BufferedRpcQueryStats, RpcQueryKey}; pub struct MigrateStatsToV2 {} impl MigrateStatsToV2 { - pub async fn main(self, db_conn: &DatabaseConnection) -> anyhow::Result<()> { + pub async fn main( + self, + top_config: TopConfig, + db_conn: &DatabaseConnection, + ) -> anyhow::Result<()> { + // Also add influxdb container ... + // let mut spawned_app = + // Web3ProxyApp::spawn(top_config.clone(), 2, app_shutdown_sender.clone()).await?; + + // we wouldn't really need this, but let's spawn this anyways + // easier than debugging the rest I suppose + let (app_shutdown_sender, _app_shutdown_receiver) = broadcast::channel(1); + let rpc_account_shutdown_recevier = app_shutdown_sender.subscribe(); + + // we must wait for these to end on their own (and they need to subscribe to shutdown_sender) + let mut important_background_handles = FuturesUnordered::new(); + + // Spawn the influxdb + let influxdb_client = match top_config.app.influxdb_host.as_ref() { + Some(influxdb_host) => { + let influxdb_org = top_config + .app + .influxdb_org + .clone() + .expect("influxdb_org needed when influxdb_host is set"); + let influxdb_token = top_config + .app + .influxdb_token + .clone() + .expect("influxdb_token needed when influxdb_host is set"); + + let influxdb_client = + influxdb2::Client::new(influxdb_host, influxdb_org, influxdb_token); + + // TODO: test the client now. 
having a stat for "started" can be useful on graphs to mark deploys + + Some(influxdb_client) + } + None => None, + }; + + // Spawn the stat-sender + let stat_sender = if let Some(emitter_spawn) = StatBuffer::try_spawn( + top_config.app.chain_id, + top_config + .app + .influxdb_bucket + .clone() + .context("No influxdb bucket was provided")? + .to_owned(), + Some(db_conn.clone()), + influxdb_client.clone(), + 30, + 1, + BILLING_PERIOD_SECONDS, + rpc_account_shutdown_recevier, + )? { + // since the database entries are used for accounting, we want to be sure everything is saved before exiting + important_background_handles.push(emitter_spawn.background_handle); + + Some(emitter_spawn.stat_sender) + } else { + None + }; + + info!("Background handles are: {:?}", important_background_handles); + + // Basically spawn the full app, look at web3_proxy CLI while true { - // (1) Load a batch of rows out of the old table until no more rows are left let old_records = rpc_accounting::Entity::find() .filter(rpc_accounting::Column::Migrated.is_null()) - .limit(10000) + .limit(2) .all(db_conn) .await?; if old_records.len() == 0 { @@ -42,8 +139,7 @@ impl MigrateStatsToV2 { let mut accounting_db_buffer = HashMap::::new(); // Iterate through all old rows, and put them into the above objects. - for x in old_records { - + for x in old_records.iter() { info!("Preparing for migration: {:?}", x); // TODO: Split up a single request into multiple requests ... @@ -57,103 +153,163 @@ impl MigrateStatsToV2 { .filter(rpc_key::Column::Id.eq(rpc_key_id)) .one(db_conn) .await? - .unwrap(); + .context("Could not find rpc_key_obj for the given rpc_key_id")?; // TODO: Create authrization // We can probably also randomly generate this, as we don't care about the user (?) AuthorizationChecks { user_id: rpc_key_obj.user_id, rpc_secret_key: Some(RpcSecretKey::Uuid(rpc_key_obj.secret_key)), - rpc_secret_key_id: Some(NonZeroU64::new(rpc_key_id).unwrap()), - ..Default::default() - } - }, - None => { - AuthorizationChecks { + rpc_secret_key_id: Some( + NonZeroU64::new(rpc_key_id) + .context("Could not use rpc_key_id to create a u64")?, + ), ..Default::default() } } + None => AuthorizationChecks { + ..Default::default() + }, }; // Then overwrite rpc_key_id and user_id (?) let authorization_type = AuthorizationType::Frontend; - let authorization = Authorization::try_new( - authorization_checks, - None, - IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), - None, - None, - None, - authorization_type + let authorization = Arc::new( + Authorization::try_new( + authorization_checks, + None, + IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), + None, + None, + None, + authorization_type, + ) + .context("Initializing Authorization Struct was not successful")?, ); // It will be like a fork basically (to simulate getting multiple single requests ...) 
// Iterate through all frontend requests // For each frontend request, create one object that will be emitted (make sure the timestamp is new) + let n = x.frontend_requests; - for _ in 1..x.frontend_requests { - info!("Creating a new frontend request"); + for _ in 1..n { + // info!("Creating a new frontend request"); // TODO: Create RequestMetadata let request_metadata = RequestMetadata { - start_instant: x.period_datetime.timestamp(), - request_bytes: x.request_bytes, - archive_request: x.archive_request, - backend_requests: todo!(), - no_servers: 0, // This is not relevant in the new version - error_response: x.error_response, - response_bytes: todo!(), - response_millis: todo!(), - response_from_backup_rpc: todo!() // I think we did not record this back then // Default::default() + start_instant: Instant::now(), // This is overwritten later on + request_bytes: (x.sum_request_bytes / n).into(), // Get the mean of all the request bytes + archive_request: x.archive_request.into(), + backend_requests: Default::default(), // This is not used, instead we modify the field later + no_servers: 0.into(), // This is not relevant in the new version + error_response: x.error_response.into(), + response_bytes: (x.sum_response_bytes / n).into(), + response_millis: (x.sum_response_millis / n).into(), + // We just don't have this data + response_from_backup_rpc: false.into(), // I think we did not record this back then // Default::default() }; + // (3) Send through a channel to a stat emitter + // Send it to the stats sender + if let Some(stat_sender_ref) = stat_sender.as_ref() { + let mut response_stat = RpcQueryStats::new( + x.clone().method.unwrap(), + authorization.clone(), + Arc::new(request_metadata), + (x.sum_response_bytes / n) + .try_into() + .context("sum bytes average is not calculated properly")?, + ); + // Modify the timestamps .. + response_stat.modify_struct( + (x.sum_response_millis / n), + x.period_datetime.timestamp(), // I suppose timestamp is millis as well ... should check this in the database + (x.backend_requests / n), + ); + info!("Sending stats: {:?}", response_stat); + stat_sender_ref + // .send(response_stat.into()) + .send_async(response_stat.into()) + .await + .context("stat_sender sending response_stat")?; + info!("Send! {:?}", stat_sender); + } else { + panic!("Stat sender was not spawned!"); + } + + // Create a new stats object + // Add it to the StatBuffer } - + // Let the stat_sender spawn / flush ... + // spawned_app.app.stat_sender.aggregate_and_save_loop() + // Send a signal to save ... 
} - // (N-1) Mark the batch as migrated - break; + // (3) Await that all items are properly processed + // TODO: Await all the background handles + info!("Waiting for a second until all is flushed"); + // Only after this mark all the items as processed / completed + + // If the items are in rpc_v2, delete the initial items from the database + + // (4) Update the batch in the old table with the current timestamp (Mark the batch as migrated) + let old_record_ids = old_records.iter().map(|x| x.id); + let update_result: UpdateResult = rpc_accounting::Entity::update_many() + .col_expr( + rpc_accounting::Column::Migrated, + Expr::value(Value::ChronoDateTimeUtc(Some(Box::new( + chrono::offset::Utc::now(), + )))), + ) + .filter(rpc_accounting::Column::Id.is_in(old_record_ids)) + // .set(pear) + .exec(db_conn) + .await?; + + info!("Update result is: {:?}", update_result); + + // (N-1) Mark the batch as migrated } + info!( + "Background handles (2) are: {:?}", + important_background_handles + ); + // Drop the handle + // Send the shutdown signal here (?) + // important_background_handles.clear(); + // Finally also send a shutdown signal + match app_shutdown_sender.send(()) { + Err(x) => { + panic!("Could not send shutdown signal! {:?}", x); + } + _ => {} + }; + // Drop the background handle + while let Some(x) = important_background_handles.next().await { + info!("Returned item is: {:?}", x); + match x { + Err(e) => { + error!("{:?}", e); + } + Ok(Err(e)) => { + error!("{:?}", e); + } + Ok(Ok(_)) => { + // TODO: how can we know which handle exited? + info!("a background handle exited"); + // Pop it in this case? + continue; + } + } + } - // (3) Update the batch in the old table with the current timestamp - - // (4) Send through a channel to a stat emitter - - - - // let old_address: Address = self.old_address.parse()?; - // let new_address: Address = self.new_address.parse()?; - // - // let old_address: Vec = old_address.to_fixed_bytes().into(); - // let new_address: Vec = new_address.to_fixed_bytes().into(); - // - // let u = user::Entity::find() - // .filter(user::Column::Address.eq(old_address)) - // .one(db_conn) - // .await? - // .context("No user found with that address")?; - // - // debug!("initial user: {:#?}", u); - // - // if u.address == new_address { - // info!("user already has this address"); - // } else { - // let mut u = u.into_active_model(); - // - // u.address = sea_orm::Set(new_address); - // - // let u = u.save(db_conn).await?; - // - // info!("changed user address"); - // - // debug!("updated user: {:#?}", u); - // } + info!("Here (?)"); Ok(()) } diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 1a1b8354..0c2528a8 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -34,6 +34,7 @@ use std::sync::atomic::{self, Ordering}; use std::sync::Arc; use std::{cmp, fmt}; use thread_fast_rng::rand::seq::SliceRandom; +use tokio; use tokio::sync::{broadcast, watch}; use tokio::task; use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior}; @@ -167,7 +168,8 @@ impl Web3Rpcs { .max_capacity(10_000) .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); - let (watch_consensus_rpcs_sender, consensus_connections_watcher) = watch::channel(Default::default()); + let (watch_consensus_rpcs_sender, consensus_connections_watcher) = + watch::channel(Default::default()); // by_name starts empty. 
self.apply_server_configs will add to it let by_name = Default::default(); @@ -318,43 +320,43 @@ impl Web3Rpcs { } } -// <<<<<<< HEAD + // <<<<<<< HEAD Ok(()) } -// ======= -// // TODO: max_capacity and time_to_idle from config -// // all block hashes are the same size, so no need for weigher -// let block_hashes = Cache::builder() -// .time_to_idle(Duration::from_secs(600)) -// .max_capacity(10_000) -// .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); -// // all block numbers are the same size, so no need for weigher -// let block_numbers = Cache::builder() -// .time_to_idle(Duration::from_secs(600)) -// .max_capacity(10_000) -// .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); -// -// let (watch_consensus_connections_sender, consensus_connections_watcher) = -// watch::channel(Default::default()); -// -// let watch_consensus_head_receiver = -// watch_consensus_head_sender.as_ref().map(|x| x.subscribe()); -// -// let connections = Arc::new(Self { -// by_name: connections, -// watch_consensus_rpcs_sender: watch_consensus_connections_sender, -// watch_consensus_head_receiver, -// pending_transactions, -// block_hashes, -// block_numbers, -// min_sum_soft_limit, -// min_head_rpcs, -// max_block_age, -// max_block_lag, -// }); -// -// let authorization = Arc::new(Authorization::internal(db_conn.clone())?); -// >>>>>>> 77df3fa (stats v2) + // ======= + // // TODO: max_capacity and time_to_idle from config + // // all block hashes are the same size, so no need for weigher + // let block_hashes = Cache::builder() + // .time_to_idle(Duration::from_secs(600)) + // .max_capacity(10_000) + // .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); + // // all block numbers are the same size, so no need for weigher + // let block_numbers = Cache::builder() + // .time_to_idle(Duration::from_secs(600)) + // .max_capacity(10_000) + // .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); + // + // let (watch_consensus_connections_sender, consensus_connections_watcher) = + // watch::channel(Default::default()); + // + // let watch_consensus_head_receiver = + // watch_consensus_head_sender.as_ref().map(|x| x.subscribe()); + // + // let connections = Arc::new(Self { + // by_name: connections, + // watch_consensus_rpcs_sender: watch_consensus_connections_sender, + // watch_consensus_head_receiver, + // pending_transactions, + // block_hashes, + // block_numbers, + // min_sum_soft_limit, + // min_head_rpcs, + // max_block_age, + // max_block_lag, + // }); + // + // let authorization = Arc::new(Authorization::internal(db_conn.clone())?); + // >>>>>>> 77df3fa (stats v2) pub fn get(&self, conn_name: &str) -> Option> { self.by_name.read().get(conn_name).cloned() @@ -364,12 +366,12 @@ impl Web3Rpcs { self.by_name.read().len() } -// <<<<<<< HEAD + // <<<<<<< HEAD pub fn is_empty(&self) -> bool { self.by_name.read().is_empty() -// ======= -// Ok((connections, handle, consensus_connections_watcher)) -// >>>>>>> 77df3fa (stats v2) + // ======= + // Ok((connections, handle, consensus_connections_watcher)) + // >>>>>>> 77df3fa (stats v2) } pub fn min_head_rpcs(&self) -> usize { @@ -395,7 +397,7 @@ impl Web3Rpcs { let clone = self.clone(); let authorization = authorization.clone(); let pending_tx_id_receiver = self.pending_tx_id_receiver.clone(); - let handle = task::spawn(async move { + let handle = tokio::task::spawn(async move { // TODO: set up this future the same as the block funnel while let Ok((pending_tx_id, rpc)) = 
pending_tx_id_receiver.recv_async().await { let f = clone.clone().process_incoming_tx_id( @@ -418,7 +420,7 @@ impl Web3Rpcs { let connections = Arc::clone(&self); let pending_tx_sender = pending_tx_sender.clone(); - let handle = task::Builder::default() + let handle = tokio::task::Builder::default() .name("process_incoming_blocks") .spawn(async move { connections @@ -432,12 +434,14 @@ impl Web3Rpcs { if futures.is_empty() { // no transaction or block subscriptions. - let handle = task::Builder::default().name("noop").spawn(async move { - loop { - sleep(Duration::from_secs(600)).await; - // TODO: "every interval, check that the provider is still connected" - } - })?; + let handle = tokio::task::Builder::default() + .name("noop") + .spawn(async move { + loop { + sleep(Duration::from_secs(600)).await; + // TODO: "every interval, check that the provider is still connected" + } + })?; futures.push(flatten_handle(handle)); } @@ -884,11 +888,11 @@ impl Web3Rpcs { // TODO: maximum retries? right now its the total number of servers loop { -// <<<<<<< HEAD + // <<<<<<< HEAD if skip_rpcs.len() >= self.by_name.read().len() { -// ======= -// if skip_rpcs.len() == self.by_name.len() { -// >>>>>>> 77df3fa (stats v2) + // ======= + // if skip_rpcs.len() == self.by_name.len() { + // >>>>>>> 77df3fa (stats v2) break; } @@ -1159,18 +1163,18 @@ impl Web3Rpcs { request_metadata.no_servers.fetch_add(1, Ordering::Release); } -// <<<<<<< HEAD + // <<<<<<< HEAD watch_consensus_rpcs.changed().await?; watch_consensus_rpcs.borrow_and_update(); -// ======= + // ======= // TODO: i don't think this will ever happen // TODO: return a 502? if it does? // return Err(anyhow::anyhow!("no available rpcs!")); // TODO: sleep how long? // TODO: subscribe to something in ConsensusWeb3Rpcs instead sleep(Duration::from_millis(200)).await; -// >>>>>>> 77df3fa (stats v2) + // >>>>>>> 77df3fa (stats v2) continue; } @@ -1299,13 +1303,13 @@ mod tests { // TODO: why is this allow needed? does tokio::test get in the way somehow? #![allow(unused_imports)] - use std::time::{SystemTime, UNIX_EPOCH}; use super::*; use crate::rpcs::consensus::ConsensusFinder; use crate::rpcs::{blockchain::Web3ProxyBlock, provider::Web3Provider}; use ethers::types::{Block, U256}; use log::{trace, LevelFilter}; use parking_lot::RwLock; + use std::time::{SystemTime, UNIX_EPOCH}; use tokio::sync::RwLock as AsyncRwLock; #[tokio::test] diff --git a/web3_proxy/src/stats/mod.rs b/web3_proxy/src/stats/mod.rs index 99cb77df..ccdd8939 100644 --- a/web3_proxy/src/stats/mod.rs +++ b/web3_proxy/src/stats/mod.rs @@ -391,6 +391,18 @@ impl RpcQueryStats { } } + /// Only used for migration from stats_v1 to stats_v2/v3 + pub fn modify_struct( + &mut self, + response_millis: u64, + response_timestamp: i64, + backend_requests: u64 + ) { + self.response_millis = response_millis; + self.response_timestamp = response_timestamp; + self.backend_requests = backend_requests; + } + } impl StatBuffer { @@ -434,6 +446,7 @@ impl StatBuffer { stat_receiver: flume::Receiver, mut shutdown_receiver: broadcast::Receiver<()>, ) -> anyhow::Result<()> { + info!("Aggregate and save loop is running"); let mut tsdb_save_interval = interval(Duration::from_secs(self.tsdb_save_interval_seconds as u64)); let mut db_save_interval = @@ -450,6 +463,7 @@ impl StatBuffer { loop { tokio::select! 
{ stat = stat_receiver.recv_async() => { + info!("Received stat"); // save the stat to a buffer match stat { Ok(AppStat::RpcQuery(stat)) => { @@ -476,6 +490,7 @@ impl StatBuffer { } } _ = db_save_interval.tick() => { + info!("DB save internal tick"); let db_conn = self.db_conn.as_ref().expect("db connection should always exist if there are buffered stats"); // TODO: batch saves @@ -487,6 +502,7 @@ impl StatBuffer { } } _ = tsdb_save_interval.tick() => { + info!("TSDB save internal tick"); // TODO: batch saves // TODO: better bucket names let influxdb_client = self.influxdb_client.as_ref().expect("influxdb client should always exist if there are buffered stats"); @@ -506,6 +522,7 @@ impl StatBuffer { } } x = shutdown_receiver.recv() => { + info!("shutdown signal ---"); match x { Ok(_) => { info!("stat_loop shutting down"); From cef26e66cc92d73e4e8d1f592f5c4d43b5c68631 Mon Sep 17 00:00:00 2001 From: yenicelik Date: Sat, 25 Mar 2023 19:26:23 +0100 Subject: [PATCH 4/6] will create a PR for some feedback --- .../bin/web3_proxy_cli/migrate_stats_to_v2.rs | 9 +++----- web3_proxy/src/stats/mod.rs | 22 ++++--------------- 2 files changed, 7 insertions(+), 24 deletions(-) diff --git a/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs b/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs index 850182e9..c781a240 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs @@ -116,8 +116,6 @@ impl MigrateStatsToV2 { None }; - info!("Background handles are: {:?}", important_background_handles); - // Basically spawn the full app, look at web3_proxy CLI while true { @@ -134,10 +132,6 @@ impl MigrateStatsToV2 { } // (2) Create request metadata objects to match the old data - let mut global_timeseries_buffer = HashMap::::new(); - let mut opt_in_timeseries_buffer = HashMap::::new(); - let mut accounting_db_buffer = HashMap::::new(); - // Iterate through all old rows, and put them into the above objects. 
for x in old_records.iter() { info!("Preparing for migration: {:?}", x); @@ -212,6 +206,7 @@ impl MigrateStatsToV2 { // (3) Send through a channel to a stat emitter // Send it to the stats sender if let Some(stat_sender_ref) = stat_sender.as_ref() { + info!("Method is: {:?}", x.clone().method.unwrap()); let mut response_stat = RpcQueryStats::new( x.clone().method.unwrap(), authorization.clone(), @@ -254,6 +249,8 @@ impl MigrateStatsToV2 { // If the items are in rpc_v2, delete the initial items from the database + // return Ok(()); + // (4) Update the batch in the old table with the current timestamp (Mark the batch as migrated) let old_record_ids = old_records.iter().map(|x| x.id); let update_result: UpdateResult = rpc_accounting::Entity::update_many() diff --git a/web3_proxy/src/stats/mod.rs b/web3_proxy/src/stats/mod.rs index ccdd8939..d11a53ae 100644 --- a/web3_proxy/src/stats/mod.rs +++ b/web3_proxy/src/stats/mod.rs @@ -364,7 +364,7 @@ impl RpcQueryStats { method: String, authorization: Arc, metadata: Arc, - response_bytes: usize + response_bytes: usize, ) -> Self { // TODO: try_unwrap the metadata to be sure that all the stats for this request have been collected // TODO: otherwise, i think the whole thing should be in a single lock that we can "reset" when a stat is created @@ -396,13 +396,12 @@ impl RpcQueryStats { &mut self, response_millis: u64, response_timestamp: i64, - backend_requests: u64 + backend_requests: u64, ) { self.response_millis = response_millis; self.response_timestamp = response_timestamp; self.backend_requests = backend_requests; } - } impl StatBuffer { @@ -446,7 +445,6 @@ impl StatBuffer { stat_receiver: flume::Receiver, mut shutdown_receiver: broadcast::Receiver<()>, ) -> anyhow::Result<()> { - info!("Aggregate and save loop is running"); let mut tsdb_save_interval = interval(Duration::from_secs(self.tsdb_save_interval_seconds as u64)); let mut db_save_interval = @@ -561,13 +559,7 @@ impl StatBuffer { for (key, stat) in global_timeseries_buffer.drain() { if let Err(err) = stat - .save_timeseries( - &bucket, - "global_proxy", - self.chain_id, - influxdb_client, - key, - ) + .save_timeseries(&bucket, "global_proxy", self.chain_id, influxdb_client, key) .await { error!( @@ -584,13 +576,7 @@ impl StatBuffer { for (key, stat) in opt_in_timeseries_buffer.drain() { if let Err(err) = stat - .save_timeseries( - &bucket, - "opt_in_proxy", - self.chain_id, - influxdb_client, - key, - ) + .save_timeseries(&bucket, "opt_in_proxy", self.chain_id, influxdb_client, key) .await { error!( From 7390bb7910ed34505ee0a12ed0bded43a34ba877 Mon Sep 17 00:00:00 2001 From: yenicelik Date: Sun, 26 Mar 2023 15:53:17 +0200 Subject: [PATCH 5/6] non-lossy data transfer --- .../bin/web3_proxy_cli/migrate_stats_to_v2.rs | 62 ++++++++++++++----- web3_proxy/src/stats/mod.rs | 1 + 2 files changed, 47 insertions(+), 16 deletions(-) diff --git a/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs b/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs index c781a240..205e4bf7 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs @@ -50,6 +50,9 @@ fn datetime_utc_to_instant(datetime: DateTime) -> anyhow::Result { #[argh(subcommand, name = "migrate_stats_to_v2")] pub struct MigrateStatsToV2 {} +// I mean drop(sender) and then important_background_handle.await. No need for shutdown signal here I think. 
+// Don't make data lossy + impl MigrateStatsToV2 { pub async fn main( self, @@ -186,19 +189,42 @@ impl MigrateStatsToV2 { // For each frontend request, create one object that will be emitted (make sure the timestamp is new) let n = x.frontend_requests; - for _ in 1..n { + for i in 0..n { // info!("Creating a new frontend request"); + // Collect all requests here ... + let mut int_request_bytes = (x.sum_request_bytes / n); + if i == 0 { + int_request_bytes += (x.sum_request_bytes % n); + } + + let mut int_response_bytes = (x.sum_response_bytes / n); + if i == 0 { + int_response_bytes += (x.sum_response_bytes % n); + } + + let mut int_response_millis = (x.sum_response_millis / n); + if i == 0 { + int_response_millis += (x.sum_response_millis % n); + } + + let mut int_backend_requests = (x.backend_requests / n); + if i == 0 { + int_backend_requests += (x.backend_requests % n); + } + + // Add module at the last step to include for any remained that we missed ... (?) + // TODO: Create RequestMetadata let request_metadata = RequestMetadata { - start_instant: Instant::now(), // This is overwritten later on - request_bytes: (x.sum_request_bytes / n).into(), // Get the mean of all the request bytes + start_instant: Instant::now(), // This is overwritten later on + request_bytes: int_request_bytes.into(), // Get the mean of all the request bytes archive_request: x.archive_request.into(), backend_requests: Default::default(), // This is not used, instead we modify the field later no_servers: 0.into(), // This is not relevant in the new version error_response: x.error_response.into(), - response_bytes: (x.sum_response_bytes / n).into(), - response_millis: (x.sum_response_millis / n).into(), + response_bytes: int_response_bytes.into(), + response_millis: int_response_millis.into(), // We just don't have this data response_from_backup_rpc: false.into(), // I think we did not record this back then // Default::default() }; @@ -211,15 +237,15 @@ impl MigrateStatsToV2 { x.clone().method.unwrap(), authorization.clone(), Arc::new(request_metadata), - (x.sum_response_bytes / n) + (int_response_bytes) .try_into() .context("sum bytes average is not calculated properly")?, ); // Modify the timestamps .. response_stat.modify_struct( - (x.sum_response_millis / n), - x.period_datetime.timestamp(), // I suppose timestamp is millis as well ... should check this in the database - (x.backend_requests / n), + int_response_millis, + x.period_datetime.timestamp(), // I suppose timestamp is millis as well ... should check this in the (prod) database + int_backend_requests, ); info!("Sending stats: {:?}", response_stat); stat_sender_ref @@ -280,14 +306,18 @@ impl MigrateStatsToV2 { // important_background_handles.clear(); // Finally also send a shutdown signal - match app_shutdown_sender.send(()) { - Err(x) => { - panic!("Could not send shutdown signal! {:?}", x); - } - _ => {} - }; - // Drop the background handle + drop(stat_sender); + // match app_shutdown_sender.send(()) { + // Err(x) => { + // panic!("Could not send shutdown signal! {:?}", x); + // } + // _ => {} + // }; + + // TODO: Should we also write a short verifier if the migration was successful (?) 
+ + // Drop the background handle, wait for any tasks that are on-going while let Some(x) = important_background_handles.next().await { info!("Returned item is: {:?}", x); match x { diff --git a/web3_proxy/src/stats/mod.rs b/web3_proxy/src/stats/mod.rs index d11a53ae..f3e4776b 100644 --- a/web3_proxy/src/stats/mod.rs +++ b/web3_proxy/src/stats/mod.rs @@ -40,6 +40,7 @@ pub struct RpcQueryStats { pub error_response: bool, pub request_bytes: u64, /// if backend_requests is 0, there was a cache_hit + // pub frontend_request: u64, pub backend_requests: u64, pub response_bytes: u64, pub response_millis: u64, From 05463c7ee57ba881deaf06e523bc30bb27c0acf2 Mon Sep 17 00:00:00 2001 From: yenicelik Date: Thu, 30 Mar 2023 12:54:01 +0100 Subject: [PATCH 6/6] migration seems to work for the most part --- .../21-sql-migration-make-backup.sh | 1 + .../21-sql-migration-verify-test-queries.sql | 17 +++++++++++++++++ web3_proxy/src/app/mod.rs | 4 ++-- web3_proxy/src/app/ws.rs | 10 +++++----- .../bin/web3_proxy_cli/migrate_stats_to_v2.rs | 18 ++++++++++-------- web3_proxy/src/frontend/rpc_proxy_ws.rs | 2 +- web3_proxy/src/stats/mod.rs | 18 +++++++++--------- 7 files changed, 45 insertions(+), 25 deletions(-) create mode 100644 scripts/manual-tests/21-sql-migration-make-backup.sh create mode 100644 scripts/manual-tests/21-sql-migration-verify-test-queries.sql diff --git a/scripts/manual-tests/21-sql-migration-make-backup.sh b/scripts/manual-tests/21-sql-migration-make-backup.sh new file mode 100644 index 00000000..4a43d5a3 --- /dev/null +++ b/scripts/manual-tests/21-sql-migration-make-backup.sh @@ -0,0 +1 @@ +mysqldump -u root --password=dev_web3_proxy -h 127.0.0.1 --port 13306 diff --git a/scripts/manual-tests/21-sql-migration-verify-test-queries.sql b/scripts/manual-tests/21-sql-migration-verify-test-queries.sql new file mode 100644 index 00000000..248e4f5e --- /dev/null +++ b/scripts/manual-tests/21-sql-migration-verify-test-queries.sql @@ -0,0 +1,17 @@ +SELECT COUNT(*) FROM rpc_accounting WHERE migrated IS NULL; +UPDATE rpc_accounting SET migrated = NULL; + +SELECT SUM(frontend_requests) FROM rpc_accounting; +SELECT SUM(frontend_requests) FROM rpc_accounting_v2; + +SELECT SUM(backend_requests) FROM rpc_accounting; +SELECT SUM(backend_requests) FROM rpc_accounting_v2; + +SELECT SUM(sum_request_bytes) FROM rpc_accounting; +SELECT SUM(sum_request_bytes) FROM rpc_accounting_v2; + +SELECT SUM(sum_response_millis) FROM rpc_accounting; +SELECT SUM(sum_response_millis) FROM rpc_accounting_v2; + +SELECT SUM(sum_response_bytes) FROM rpc_accounting; +SELECT SUM(sum_response_bytes) FROM rpc_accounting_v2; diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 424bee93..1c32810c 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -1799,7 +1799,7 @@ impl Web3ProxyApp { if let Some(stat_sender) = self.stat_sender.as_ref() { let response_stat = RpcQueryStats::new( - method.to_string(), + Some(method.to_string()), authorization.clone(), request_metadata, response.num_bytes(), @@ -1822,7 +1822,7 @@ impl Web3ProxyApp { if let Some(stat_sender) = self.stat_sender.as_ref() { let response_stat = RpcQueryStats::new( - request_method, + Some(request_method), authorization.clone(), request_metadata, response.num_bytes(), diff --git a/web3_proxy/src/app/ws.rs b/web3_proxy/src/app/ws.rs index b69cdcc9..037e560b 100644 --- a/web3_proxy/src/app/ws.rs +++ b/web3_proxy/src/app/ws.rs @@ -96,7 +96,7 @@ impl Web3ProxyApp { if let Some(stat_sender) = stat_sender.as_ref() { let response_stat = 
RpcQueryStats::new( - "eth_subscription(newHeads)".to_string(), + Some("eth_subscription(newHeads)".to_string()), authorization.clone(), request_metadata.clone(), response_bytes, @@ -167,7 +167,7 @@ impl Web3ProxyApp { if let Some(stat_sender) = stat_sender.as_ref() { let response_stat = RpcQueryStats::new( - "eth_subscription(newPendingTransactions)".to_string(), + Some("eth_subscription(newPendingTransactions)".to_string()), authorization.clone(), request_metadata.clone(), response_bytes, @@ -243,7 +243,7 @@ impl Web3ProxyApp { if let Some(stat_sender) = stat_sender.as_ref() { let response_stat = RpcQueryStats::new( - "eth_subscription(newPendingFullTransactions)".to_string(), + Some("eth_subscription(newPendingFullTransactions)".to_string()), authorization.clone(), request_metadata.clone(), response_bytes, @@ -319,7 +319,7 @@ impl Web3ProxyApp { if let Some(stat_sender) = stat_sender.as_ref() { let response_stat = RpcQueryStats::new( - "eth_subscription(newPendingRawTransactions)".to_string(), + Some("eth_subscription(newPendingRawTransactions)".to_string()), authorization.clone(), request_metadata.clone(), response_bytes, @@ -350,7 +350,7 @@ impl Web3ProxyApp { if let Some(stat_sender) = self.stat_sender.as_ref() { let response_stat = RpcQueryStats::new( - request_json.method.clone(), + Some(request_json.method.clone()), authorization.clone(), request_metadata, response.num_bytes(), diff --git a/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs b/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs index 205e4bf7..385a6aa0 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs @@ -63,6 +63,8 @@ impl MigrateStatsToV2 { // let mut spawned_app = // Web3ProxyApp::spawn(top_config.clone(), 2, app_shutdown_sender.clone()).await?; + let number_of_rows_to_process_at_once = 500; + // we wouldn't really need this, but let's spawn this anyways // easier than debugging the rest I suppose let (app_shutdown_sender, _app_shutdown_receiver) = broadcast::channel(1); @@ -125,7 +127,7 @@ impl MigrateStatsToV2 { // (1) Load a batch of rows out of the old table until no more rows are left let old_records = rpc_accounting::Entity::find() .filter(rpc_accounting::Column::Migrated.is_null()) - .limit(2) + .limit(number_of_rows_to_process_at_once) .all(db_conn) .await?; if old_records.len() == 0 { @@ -137,7 +139,7 @@ impl MigrateStatsToV2 { // (2) Create request metadata objects to match the old data // Iterate through all old rows, and put them into the above objects. for x in old_records.iter() { - info!("Preparing for migration: {:?}", x); + // info!("Preparing for migration: {:?}", x); // TODO: Split up a single request into multiple requests ... // according to frontend-requests, backend-requests, etc. @@ -232,9 +234,9 @@ impl MigrateStatsToV2 { // (3) Send through a channel to a stat emitter // Send it to the stats sender if let Some(stat_sender_ref) = stat_sender.as_ref() { - info!("Method is: {:?}", x.clone().method.unwrap()); + // info!("Method is: {:?}", x.clone().method); let mut response_stat = RpcQueryStats::new( - x.clone().method.unwrap(), + x.clone().method, authorization.clone(), Arc::new(request_metadata), (int_response_bytes) @@ -247,13 +249,13 @@ impl MigrateStatsToV2 { x.period_datetime.timestamp(), // I suppose timestamp is millis as well ... 
should check this in the (prod) database int_backend_requests, ); - info!("Sending stats: {:?}", response_stat); + // info!("Sending stats: {:?}", response_stat); stat_sender_ref // .send(response_stat.into()) .send_async(response_stat.into()) .await .context("stat_sender sending response_stat")?; - info!("Send! {:?}", stat_sender); + // info!("Send! {:?}", stat_sender); } else { panic!("Stat sender was not spawned!"); } @@ -269,7 +271,6 @@ impl MigrateStatsToV2 { // (3) Await that all items are properly processed // TODO: Await all the background handles - info!("Waiting for a second until all is flushed"); // Only after this mark all the items as processed / completed @@ -294,6 +295,7 @@ impl MigrateStatsToV2 { info!("Update result is: {:?}", update_result); // (N-1) Mark the batch as migrated + // break; } info!( @@ -336,7 +338,7 @@ impl MigrateStatsToV2 { } } - info!("Here (?)"); + // info!("Here (?)"); Ok(()) } diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs index 072ad854..968e5eca 100644 --- a/web3_proxy/src/frontend/rpc_proxy_ws.rs +++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs @@ -400,7 +400,7 @@ async fn handle_socket_payload( if let Some(stat_sender) = app.stat_sender.as_ref() { let response_stat = RpcQueryStats::new( - json_request.method.clone(), + Some(json_request.method.clone()), authorization.clone(), request_metadata, response.num_bytes(), diff --git a/web3_proxy/src/stats/mod.rs b/web3_proxy/src/stats/mod.rs index f3e4776b..097b57e2 100644 --- a/web3_proxy/src/stats/mod.rs +++ b/web3_proxy/src/stats/mod.rs @@ -35,7 +35,7 @@ pub enum StatType { #[derive(Clone, Debug)] pub struct RpcQueryStats { pub authorization: Arc, - pub method: String, + pub method: Option, pub archive_request: bool, pub error_response: bool, pub request_bytes: u64, @@ -97,7 +97,7 @@ impl RpcQueryStats { TrackingLevel::Detailed => { // detailed tracking keeps track of the method and origin // depending on the request, the origin might still be None - let method = Some(self.method.clone()); + let method = self.method.clone(); let origin = self.authorization.origin.clone(); (method, origin) @@ -117,7 +117,7 @@ impl RpcQueryStats { /// all queries are aggregated /// TODO: should we store "anon" or "registered" as a key just to be able to split graphs? fn global_timeseries_key(&self) -> RpcQueryKey { - let method = Some(self.method.clone()); + let method = self.method.clone(); // we don't store origin in the timeseries db. its only used for optional accounting let origin = None; // everyone gets grouped together @@ -141,7 +141,7 @@ impl RpcQueryStats { TrackingLevel::None => { // this RPC key requested no tracking. this is the default. // we still want graphs though, so we just use None as the rpc_secret_key_id - (Some(self.method.clone()), None) + (self.method.clone(), None) } TrackingLevel::Aggregated => { // this RPC key requested tracking aggregated across all methods @@ -150,7 +150,7 @@ impl RpcQueryStats { TrackingLevel::Detailed => { // detailed tracking keeps track of the method ( - Some(self.method.clone()), + self.method.clone(), self.authorization.checks.rpc_secret_key_id, ) } @@ -362,7 +362,7 @@ impl BufferedRpcQueryStats { impl RpcQueryStats { pub fn new( - method: String, + method: Option, authorization: Arc, metadata: Arc, response_bytes: usize, @@ -462,7 +462,7 @@ impl StatBuffer { loop { tokio::select! 
{ stat = stat_receiver.recv_async() => { - info!("Received stat"); + // info!("Received stat"); // save the stat to a buffer match stat { Ok(AppStat::RpcQuery(stat)) => { @@ -489,7 +489,7 @@ impl StatBuffer { } } _ = db_save_interval.tick() => { - info!("DB save internal tick"); + // info!("DB save internal tick"); let db_conn = self.db_conn.as_ref().expect("db connection should always exist if there are buffered stats"); // TODO: batch saves @@ -501,7 +501,7 @@ impl StatBuffer { } } _ = tsdb_save_interval.tick() => { - info!("TSDB save internal tick"); + // info!("TSDB save internal tick"); // TODO: batch saves // TODO: better bucket names let influxdb_client = self.influxdb_client.as_ref().expect("influxdb client should always exist if there are buffered stats");
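
The central technique in the patches above is the "non-lossy" splitting added in PATCH 5/6: each aggregated rpc_accounting row is re-emitted as `frontend_requests` individual RpcQueryStats events whose per-request counters sum back to the original totals, which is exactly the property the SUM() queries in scripts/manual-tests/21-sql-migration-verify-test-queries.sql are meant to confirm. The sketch below is a minimal, standalone illustration of that arithmetic only; the struct names, the `.max(1)` guard against a zero request count, and the `main` driver are illustrative stand-ins and not the actual web3_proxy entities or CLI code.

// Sketch of the remainder-preserving split used by migrate_stats_to_v2:
// every emitted request gets the integer quotient of each summed counter,
// and the first request also absorbs the remainder, so nothing is lost.

#[derive(Debug)]
struct OldRow {
    frontend_requests: u64,
    sum_request_bytes: u64,
    sum_response_bytes: u64,
    sum_response_millis: u64,
    backend_requests: u64,
}

#[derive(Debug)]
struct PerRequest {
    request_bytes: u64,
    response_bytes: u64,
    response_millis: u64,
    backend_requests: u64,
}

fn split_evenly(total: u64, n: u64, i: u64) -> u64 {
    // quotient for everyone, remainder folded into request 0
    total / n + if i == 0 { total % n } else { 0 }
}

fn split_row(row: &OldRow) -> Vec<PerRequest> {
    // guard against a zero request count (the real migration assumes n > 0)
    let n = row.frontend_requests.max(1);
    (0..n)
        .map(|i| PerRequest {
            request_bytes: split_evenly(row.sum_request_bytes, n, i),
            response_bytes: split_evenly(row.sum_response_bytes, n, i),
            response_millis: split_evenly(row.sum_response_millis, n, i),
            backend_requests: split_evenly(row.backend_requests, n, i),
        })
        .collect()
}

fn main() {
    let row = OldRow {
        frontend_requests: 3,
        sum_request_bytes: 10,
        sum_response_bytes: 31,
        sum_response_millis: 299,
        backend_requests: 4,
    };

    let parts = split_row(&row);

    // the per-request parts sum back to the aggregated totals,
    // mirroring the SELECT SUM(...) comparisons between rpc_accounting
    // and rpc_accounting_v2 in the manual verification script
    assert_eq!(parts.iter().map(|p| p.request_bytes).sum::<u64>(), 10);
    assert_eq!(parts.iter().map(|p| p.response_bytes).sum::<u64>(), 31);
    assert_eq!(parts.iter().map(|p| p.response_millis).sum::<u64>(), 299);
    assert_eq!(parts.iter().map(|p| p.backend_requests).sum::<u64>(), 4);

    println!("{:?}", parts);
}

Folding the remainder into the first emitted request (rather than spreading it one unit at a time across requests) keeps the loop body branch-free after index 0 while still guaranteeing that the v2 totals match the v1 row exactly.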