diff --git a/entities/src/rpc_accounting.rs b/entities/src/rpc_accounting.rs index a615e6ff..70c79ab3 100644 --- a/entities/src/rpc_accounting.rs +++ b/entities/src/rpc_accounting.rs @@ -40,6 +40,7 @@ pub struct Model { pub max_response_bytes: u64, pub archive_request: bool, pub origin: Option, + pub migrated: Option } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git a/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs b/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs index 88d19dde..137ae3b9 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs @@ -1,6 +1,7 @@ +use std::net::IpAddr; use anyhow::Context; use argh::FromArgs; -use entities::{rpc_accounting, rpc_accounting_v2, user}; +use entities::{rpc_key, rpc_accounting, rpc_accounting_v2, user}; use ethers::types::Address; use hashbrown::HashMap; use log::{debug, info, warn}; @@ -8,6 +9,8 @@ use migration::sea_orm::{ self, ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, IntoActiveModel, QueryFilter, QuerySelect }; +use web3_proxy::app::AuthorizationChecks; +use web3_proxy::frontend::authorization::{Authorization, AuthorizationType, RequestMetadata}; use web3_proxy::stats::{BufferedRpcQueryStats, RpcQueryKey}; /// change a user's address. @@ -21,7 +24,11 @@ impl MigrateStatsToV2 { while true { // (1) Load a batch of rows out of the old table until no more rows are left - let old_records = rpc_accounting::Entity::find().limit(10000).all(db_conn).await?; + let old_records = rpc_accounting::Entity::find() + .filter(rpc_accounting::Column::Migrated.is_null()) + .limit(10000) + .all(db_conn) + .await?; if old_records.len() == 0 { // Break out of while loop once all records have successfully been migrated ... warn!("All records seem to have been successfully migrated!"); @@ -38,6 +45,62 @@ impl MigrateStatsToV2 { info!("Preparing for migration: {:?}", x); + // TODO: Split up a single request into multiple requests ... + // according to frontend-requests, backend-requests, etc. + + // Get the rpc-key from the rpc_key_id + // Get the user-id from the rpc_key_id + let rpc_key_obj = rpc_key::Entity::find() + .filter(rpc_key::Column::id.eq(x.rpc_key_id)) + .one(db_conn) + .await?; + + // TODO: Create authrization + // We can probably also randomly generate this, as we don't care about the user (?) + let authorization_checks = AuthorizationChecks { + user_id: rpc_key_id.user_id, + rpc_secret_key: rpc_key_id.secret_key, + rpc_secret_key_id: x.rpc_key_id, + ..Default::default() + }; + // Then overwrite rpc_key_id and user_id (?) + let authorization_type = AuthorizationType::Frontend; + let authorization = Authorization::try_new( + authorization_checks, + None, + IpAddr::default(), + None, + None, + None, + authorization_type + ); + + // It will be like a fork basically (to simulate getting multiple single requests ...) + // Iterate through all frontend requests + // For each frontend request, create one object that will be emitted (make sure the timestamp is new) + + // TODO: Create RequestMetadata + let request_metadata = RequestMetadata { + start_instant: x.period_datetime.timestamp(), + request_bytes: x.request_bytes, + archive_request: x.archive_request, + backend_requests: todo!(), + no_servers: 0, // This is not relevant in the new version + error_response: x.error_response, + response_bytes: todo!(), + response_millis: todo!(), + response_from_backup_rpc: todo!() // I think we did not record this back then // Default::default() + }; + + // I suppose method cannot be empty ... (?) + // Ok, now we basically create a state event .. + RpcQueryStats::new( + x.method.unwrap(), + authorization + request_metadata, + x.response_bytes + ) + // // For each of the old rows, create a (i) RpcQueryKey and a matching BufferedRpcQueryStats object // let key = RpcQueryKey { // response_timestamp: x.period_datetime.timestamp(), @@ -77,6 +140,9 @@ impl MigrateStatsToV2 { // // BufferedRpcQueryStats } + // (N-1) Mark the batch as migrated + break; + }