diff --git a/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs b/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs
index 525536aa..12c70ade 100644
--- a/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs
+++ b/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs
@@ -1,12 +1,14 @@
 use anyhow::Context;
 use argh::FromArgs;
-use entities::user;
+use entities::{rpc_accounting, rpc_accounting_v2, user};
 use ethers::types::Address;
-use log::{debug, info};
+use hashbrown::HashMap;
+use log::{debug, info, warn};
 use migration::sea_orm::{
     self, ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, IntoActiveModel,
-    QueryFilter,
+    QueryFilter, QuerySelect,
 };
+use web3_proxy::stats::{BufferedRpcQueryStats, RpcQueryKey};
 
 /// change a user's address.
 #[derive(FromArgs, PartialEq, Eq, Debug)]
@@ -16,9 +18,77 @@ pub struct MigrateStatsToV2 {}
 
 impl MigrateStatsToV2 {
     pub async fn main(self, db_conn: &DatabaseConnection) -> anyhow::Result<()> {
-        // (1) Load a batch of rows out of the old table
+        while true {
+            // (1) Load a batch of rows out of the old table until no more rows are left
+            let old_records = rpc_accounting::Entity::find().limit(10000).all(db_conn).await?;
+            if old_records.len() == 0 {
+                // Break out of while loop once all records have successfully been migrated ...
+                warn!("All records seem to have been successfully migrated!");
+                break;
+            }
+
+            // (2) Create request metadata objects to match the old data
+            let mut global_timeseries_buffer = HashMap::<RpcQueryKey, BufferedRpcQueryStats>::new();
+            let mut opt_in_timeseries_buffer = HashMap::<RpcQueryKey, BufferedRpcQueryStats>::new();
+            let mut accounting_db_buffer = HashMap::<RpcQueryKey, BufferedRpcQueryStats>::new();
+
+            // Iterate through all old rows, and put them into the above objects.
+            for x in old_records {
+                // For each of the old rows, create a (i) RpcQueryKey and a matching BufferedRpcQueryStats object
+                let key = RpcQueryKey {
+                    response_timestamp: x.period_datetime.timestamp(),
+                    archive_needed: x.archive_needed,
+                    error_response: x.error_response,
+                    period_datetime: x.period_datetime.timestamp(),
+                    rpc_secret_key_id: x.rpc_key_id,
+                    origin: x.origin,
+                    method: x.method,
+                };
+
+                // Create the corresponding BufferedRpcQueryStats object
+                let val = BufferedRpcQueryStats {
+                    frontend_requests: x.frontend_requests,
+                    backend_requests: x.backend_requests,
+                    backend_retries: x.backend_retries,
+                    no_servers: x.no_servers,
+                    cache_misses: x.cache_misses,
+                    cache_hits: x.cache_hits,
+                    sum_request_bytes: x.sum_request_bytes,
+                    sum_response_bytes: x.sum_response_bytes,
+                    sum_response_millis: x.sum_response_millis,
+                };
+
+                // TODO: Create authorization, request metadata, and bytes ... but bytes we don't really keep track of!
+                // We can generate dummy bytes of the same length though, this may work as well
+
+                // TODO: Period datetime is also a question of what it is
+                // let response_stat = RpcQueryStats::new(
+                //     x.method,
+                //     authorization.clone(),
+                //     request_metadata.clone(),
+                //     response_bytes,
+                //     x.period_datetime
+                // );
+
+                // BufferedRpcQueryStats
+            }
+        }
 
-        // (2) Create request metadata objects to match the old data
 
         // (3) Update the batch in the old table with the current timestamp
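The loop above builds `key` and `val` for each old row but does not yet fold them into the three buffers. A minimal sketch of that fold, assuming only what the patch itself shows: the `pub` fields and `#[derive(Default)]` on `BufferedRpcQueryStats`, and the `Hash`/`Eq` derives on `RpcQueryKey`. Which of the three buffers a given row belongs in is glossed over here; `accounting_db_buffer` is just the illustrative pick.

    // inside `for x in old_records`, after `key` and `val` are built:
    // sum this row into the buffered entry for its key; `or_default()` relies on
    // the #[derive(Default)] that BufferedRpcQueryStats already has
    let entry = accounting_db_buffer.entry(key).or_default();
    entry.frontend_requests += val.frontend_requests;
    entry.backend_requests += val.backend_requests;
    entry.backend_retries += val.backend_retries;
    entry.no_servers += val.no_servers;
    entry.cache_misses += val.cache_misses;
    entry.cache_hits += val.cache_hits;
    entry.sum_request_bytes += val.sum_request_bytes;
    entry.sum_response_bytes += val.sum_response_bytes;
    entry.sum_response_millis += val.sum_response_millis;

Once a batch has been folded, the buffered rows would be written out to `rpc_accounting_v2` and the migrated rows in the old table stamped (step 3 in the comments), which is what lets the `while true` loop reach its empty-batch break.
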
diff --git a/web3_proxy/src/stats/mod.rs b/web3_proxy/src/stats/mod.rs
index e8f99154..99cb77df 100644
--- a/web3_proxy/src/stats/mod.rs
+++ b/web3_proxy/src/stats/mod.rs
@@ -30,23 +30,24 @@ pub enum StatType {
     Detailed,
 }
 
+// Pub is needed for migration ... I could also write a second constructor for this if needed
 /// TODO: better name?
 #[derive(Clone, Debug)]
 pub struct RpcQueryStats {
-    authorization: Arc<Authorization>,
-    method: String,
-    archive_request: bool,
-    error_response: bool,
-    request_bytes: u64,
+    pub authorization: Arc<Authorization>,
+    pub method: String,
+    pub archive_request: bool,
+    pub error_response: bool,
+    pub request_bytes: u64,
     /// if backend_requests is 0, there was a cache_hit
-    backend_requests: u64,
-    response_bytes: u64,
-    response_millis: u64,
-    response_timestamp: i64,
+    pub backend_requests: u64,
+    pub response_bytes: u64,
+    pub response_millis: u64,
+    pub response_timestamp: i64,
 }
 
 #[derive(Clone, From, Hash, PartialEq, Eq)]
-struct RpcQueryKey {
+pub struct RpcQueryKey {
     /// unix epoch time
     /// for the time series db, this is (close to) the time that the response was sent
     /// for the account database, this is rounded to the week
@@ -167,15 +168,15 @@ impl RpcQueryStats {
 
 #[derive(Default)]
 pub struct BufferedRpcQueryStats {
-    frontend_requests: u64,
-    backend_requests: u64,
-    backend_retries: u64,
-    no_servers: u64,
-    cache_misses: u64,
-    cache_hits: u64,
-    sum_request_bytes: u64,
-    sum_response_bytes: u64,
-    sum_response_millis: u64,
+    pub frontend_requests: u64,
+    pub backend_requests: u64,
+    pub backend_retries: u64,
+    pub no_servers: u64,
+    pub cache_misses: u64,
+    pub cache_hits: u64,
+    pub sum_request_bytes: u64,
+    pub sum_response_bytes: u64,
+    pub sum_response_millis: u64,
 }
 
 /// A stat that we aggregate and then store in a database.
@@ -363,7 +364,7 @@ impl RpcQueryStats {
         method: String,
         authorization: Arc<Authorization>,
         metadata: Arc<RequestMetadata>,
-        response_bytes: usize,
+        response_bytes: usize
     ) -> Self {
         // TODO: try_unwrap the metadata to be sure that all the stats for this request have been collected
         // TODO: otherwise, i think the whole thing should be in a single lock that we can "reset" when a stat is created
@@ -389,6 +390,7 @@ impl RpcQueryStats {
             response_timestamp,
         }
     }
+
 }
 
 impl StatBuffer {
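The comment added above `RpcQueryStats` mentions a second constructor as an alternative to making every field `pub`. A sketch of that alternative, using only the field names and types visible in this diff; the name `from_migration` is hypothetical:

    impl RpcQueryStats {
        /// Hypothetical migration helper: build a stat directly from
        /// already-aggregated values, skipping the Arc<RequestMetadata>
        /// bookkeeping that `new` does.
        pub fn from_migration(
            authorization: Arc<Authorization>,
            method: String,
            archive_request: bool,
            error_response: bool,
            request_bytes: u64,
            backend_requests: u64,
            response_bytes: u64,
            response_millis: u64,
            response_timestamp: i64,
        ) -> Self {
            Self {
                authorization,
                method,
                archive_request,
                error_response,
                request_bytes,
                backend_requests,
                response_bytes,
                response_millis,
                response_timestamp,
            }
        }
    }

A constructor like this would let the fields go back to being private once the migration no longer needs them, at the cost of a long parameter list.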