diff --git a/scripts/manual-tests/21-sql-migration-make-backup.sh b/scripts/manual-tests/21-sql-migration-make-backup.sh new file mode 100644 index 00000000..4a43d5a3 --- /dev/null +++ b/scripts/manual-tests/21-sql-migration-make-backup.sh @@ -0,0 +1 @@ +mysqldump -u root --password=dev_web3_proxy -h 127.0.0.1 --port 13306 diff --git a/scripts/manual-tests/21-sql-migration-verify-test-queries.sql b/scripts/manual-tests/21-sql-migration-verify-test-queries.sql new file mode 100644 index 00000000..248e4f5e --- /dev/null +++ b/scripts/manual-tests/21-sql-migration-verify-test-queries.sql @@ -0,0 +1,17 @@ +SELECT COUNT(*) FROM rpc_accounting WHERE migrated IS NULL; +UPDATE rpc_accounting SET migrated = NULL; + +SELECT SUM(frontend_requests) FROM rpc_accounting; +SELECT SUM(frontend_requests) FROM rpc_accounting_v2; + +SELECT SUM(backend_requests) FROM rpc_accounting; +SELECT SUM(backend_requests) FROM rpc_accounting_v2; + +SELECT SUM(sum_request_bytes) FROM rpc_accounting; +SELECT SUM(sum_request_bytes) FROM rpc_accounting_v2; + +SELECT SUM(sum_response_millis) FROM rpc_accounting; +SELECT SUM(sum_response_millis) FROM rpc_accounting_v2; + +SELECT SUM(sum_response_bytes) FROM rpc_accounting; +SELECT SUM(sum_response_bytes) FROM rpc_accounting_v2; diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 424bee93..1c32810c 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -1799,7 +1799,7 @@ impl Web3ProxyApp { if let Some(stat_sender) = self.stat_sender.as_ref() { let response_stat = RpcQueryStats::new( - method.to_string(), + Some(method.to_string()), authorization.clone(), request_metadata, response.num_bytes(), @@ -1822,7 +1822,7 @@ impl Web3ProxyApp { if let Some(stat_sender) = self.stat_sender.as_ref() { let response_stat = RpcQueryStats::new( - request_method, + Some(request_method), authorization.clone(), request_metadata, response.num_bytes(), diff --git a/web3_proxy/src/app/ws.rs b/web3_proxy/src/app/ws.rs index b69cdcc9..037e560b 100644 --- a/web3_proxy/src/app/ws.rs +++ b/web3_proxy/src/app/ws.rs @@ -96,7 +96,7 @@ impl Web3ProxyApp { if let Some(stat_sender) = stat_sender.as_ref() { let response_stat = RpcQueryStats::new( - "eth_subscription(newHeads)".to_string(), + Some("eth_subscription(newHeads)".to_string()), authorization.clone(), request_metadata.clone(), response_bytes, @@ -167,7 +167,7 @@ impl Web3ProxyApp { if let Some(stat_sender) = stat_sender.as_ref() { let response_stat = RpcQueryStats::new( - "eth_subscription(newPendingTransactions)".to_string(), + Some("eth_subscription(newPendingTransactions)".to_string()), authorization.clone(), request_metadata.clone(), response_bytes, @@ -243,7 +243,7 @@ impl Web3ProxyApp { if let Some(stat_sender) = stat_sender.as_ref() { let response_stat = RpcQueryStats::new( - "eth_subscription(newPendingFullTransactions)".to_string(), + Some("eth_subscription(newPendingFullTransactions)".to_string()), authorization.clone(), request_metadata.clone(), response_bytes, @@ -319,7 +319,7 @@ impl Web3ProxyApp { if let Some(stat_sender) = stat_sender.as_ref() { let response_stat = RpcQueryStats::new( - "eth_subscription(newPendingRawTransactions)".to_string(), + Some("eth_subscription(newPendingRawTransactions)".to_string()), authorization.clone(), request_metadata.clone(), response_bytes, @@ -350,7 +350,7 @@ impl Web3ProxyApp { if let Some(stat_sender) = self.stat_sender.as_ref() { let response_stat = RpcQueryStats::new( - request_json.method.clone(), + Some(request_json.method.clone()), authorization.clone(), request_metadata, response.num_bytes(), diff --git a/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs b/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs index 205e4bf7..385a6aa0 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs @@ -63,6 +63,8 @@ impl MigrateStatsToV2 { // let mut spawned_app = // Web3ProxyApp::spawn(top_config.clone(), 2, app_shutdown_sender.clone()).await?; + let number_of_rows_to_process_at_once = 500; + // we wouldn't really need this, but let's spawn this anyways // easier than debugging the rest I suppose let (app_shutdown_sender, _app_shutdown_receiver) = broadcast::channel(1); @@ -125,7 +127,7 @@ impl MigrateStatsToV2 { // (1) Load a batch of rows out of the old table until no more rows are left let old_records = rpc_accounting::Entity::find() .filter(rpc_accounting::Column::Migrated.is_null()) - .limit(2) + .limit(number_of_rows_to_process_at_once) .all(db_conn) .await?; if old_records.len() == 0 { @@ -137,7 +139,7 @@ impl MigrateStatsToV2 { // (2) Create request metadata objects to match the old data // Iterate through all old rows, and put them into the above objects. for x in old_records.iter() { - info!("Preparing for migration: {:?}", x); + // info!("Preparing for migration: {:?}", x); // TODO: Split up a single request into multiple requests ... // according to frontend-requests, backend-requests, etc. @@ -232,9 +234,9 @@ impl MigrateStatsToV2 { // (3) Send through a channel to a stat emitter // Send it to the stats sender if let Some(stat_sender_ref) = stat_sender.as_ref() { - info!("Method is: {:?}", x.clone().method.unwrap()); + // info!("Method is: {:?}", x.clone().method); let mut response_stat = RpcQueryStats::new( - x.clone().method.unwrap(), + x.clone().method, authorization.clone(), Arc::new(request_metadata), (int_response_bytes) @@ -247,13 +249,13 @@ impl MigrateStatsToV2 { x.period_datetime.timestamp(), // I suppose timestamp is millis as well ... should check this in the (prod) database int_backend_requests, ); - info!("Sending stats: {:?}", response_stat); + // info!("Sending stats: {:?}", response_stat); stat_sender_ref // .send(response_stat.into()) .send_async(response_stat.into()) .await .context("stat_sender sending response_stat")?; - info!("Send! {:?}", stat_sender); + // info!("Send! {:?}", stat_sender); } else { panic!("Stat sender was not spawned!"); } @@ -269,7 +271,6 @@ impl MigrateStatsToV2 { // (3) Await that all items are properly processed // TODO: Await all the background handles - info!("Waiting for a second until all is flushed"); // Only after this mark all the items as processed / completed @@ -294,6 +295,7 @@ impl MigrateStatsToV2 { info!("Update result is: {:?}", update_result); // (N-1) Mark the batch as migrated + // break; } info!( @@ -336,7 +338,7 @@ impl MigrateStatsToV2 { } } - info!("Here (?)"); + // info!("Here (?)"); Ok(()) } diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs index 072ad854..968e5eca 100644 --- a/web3_proxy/src/frontend/rpc_proxy_ws.rs +++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs @@ -400,7 +400,7 @@ async fn handle_socket_payload( if let Some(stat_sender) = app.stat_sender.as_ref() { let response_stat = RpcQueryStats::new( - json_request.method.clone(), + Some(json_request.method.clone()), authorization.clone(), request_metadata, response.num_bytes(), diff --git a/web3_proxy/src/stats/mod.rs b/web3_proxy/src/stats/mod.rs index f3e4776b..097b57e2 100644 --- a/web3_proxy/src/stats/mod.rs +++ b/web3_proxy/src/stats/mod.rs @@ -35,7 +35,7 @@ pub enum StatType { #[derive(Clone, Debug)] pub struct RpcQueryStats { pub authorization: Arc, - pub method: String, + pub method: Option, pub archive_request: bool, pub error_response: bool, pub request_bytes: u64, @@ -97,7 +97,7 @@ impl RpcQueryStats { TrackingLevel::Detailed => { // detailed tracking keeps track of the method and origin // depending on the request, the origin might still be None - let method = Some(self.method.clone()); + let method = self.method.clone(); let origin = self.authorization.origin.clone(); (method, origin) @@ -117,7 +117,7 @@ impl RpcQueryStats { /// all queries are aggregated /// TODO: should we store "anon" or "registered" as a key just to be able to split graphs? fn global_timeseries_key(&self) -> RpcQueryKey { - let method = Some(self.method.clone()); + let method = self.method.clone(); // we don't store origin in the timeseries db. its only used for optional accounting let origin = None; // everyone gets grouped together @@ -141,7 +141,7 @@ impl RpcQueryStats { TrackingLevel::None => { // this RPC key requested no tracking. this is the default. // we still want graphs though, so we just use None as the rpc_secret_key_id - (Some(self.method.clone()), None) + (self.method.clone(), None) } TrackingLevel::Aggregated => { // this RPC key requested tracking aggregated across all methods @@ -150,7 +150,7 @@ impl RpcQueryStats { TrackingLevel::Detailed => { // detailed tracking keeps track of the method ( - Some(self.method.clone()), + self.method.clone(), self.authorization.checks.rpc_secret_key_id, ) } @@ -362,7 +362,7 @@ impl BufferedRpcQueryStats { impl RpcQueryStats { pub fn new( - method: String, + method: Option, authorization: Arc, metadata: Arc, response_bytes: usize, @@ -462,7 +462,7 @@ impl StatBuffer { loop { tokio::select! { stat = stat_receiver.recv_async() => { - info!("Received stat"); + // info!("Received stat"); // save the stat to a buffer match stat { Ok(AppStat::RpcQuery(stat)) => { @@ -489,7 +489,7 @@ impl StatBuffer { } } _ = db_save_interval.tick() => { - info!("DB save internal tick"); + // info!("DB save internal tick"); let db_conn = self.db_conn.as_ref().expect("db connection should always exist if there are buffered stats"); // TODO: batch saves @@ -501,7 +501,7 @@ impl StatBuffer { } } _ = tsdb_save_interval.tick() => { - info!("TSDB save internal tick"); + // info!("TSDB save internal tick"); // TODO: batch saves // TODO: better bucket names let influxdb_client = self.influxdb_client.as_ref().expect("influxdb client should always exist if there are buffered stats");