creating a CLI endpoint for the migration

This commit is contained in:
yenicelik 2023-03-21 12:07:21 +01:00
parent 9beddc43a4
commit 64505102ef
2 changed files with 69 additions and 2 deletions

@ -40,6 +40,7 @@ pub struct Model {
pub max_response_bytes: u64,
pub archive_request: bool,
pub origin: Option<String>,
pub migrated: Option<DateTime>
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]

@ -1,6 +1,7 @@
use std::net::IpAddr;
use anyhow::Context;
use argh::FromArgs;
use entities::{rpc_accounting, rpc_accounting_v2, user};
use entities::{rpc_key, rpc_accounting, rpc_accounting_v2, user};
use ethers::types::Address;
use hashbrown::HashMap;
use log::{debug, info, warn};
@ -8,6 +9,8 @@ use migration::sea_orm::{
self, ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, IntoActiveModel,
QueryFilter, QuerySelect
};
use web3_proxy::app::AuthorizationChecks;
use web3_proxy::frontend::authorization::{Authorization, AuthorizationType, RequestMetadata};
use web3_proxy::stats::{BufferedRpcQueryStats, RpcQueryKey};
/// change a user's address.
@ -21,7 +24,11 @@ impl MigrateStatsToV2 {
while true {
// (1) Load a batch of rows out of the old table until no more rows are left
let old_records = rpc_accounting::Entity::find().limit(10000).all(db_conn).await?;
let old_records = rpc_accounting::Entity::find()
.filter(rpc_accounting::Column::Migrated.is_null())
.limit(10000)
.all(db_conn)
.await?;
if old_records.len() == 0 {
// Break out of while loop once all records have successfully been migrated ...
warn!("All records seem to have been successfully migrated!");
@ -38,6 +45,62 @@ impl MigrateStatsToV2 {
info!("Preparing for migration: {:?}", x);
// TODO: Split up a single request into multiple requests ...
// according to frontend-requests, backend-requests, etc.
// Get the rpc-key from the rpc_key_id
// Get the user-id from the rpc_key_id
let rpc_key_obj = rpc_key::Entity::find()
.filter(rpc_key::Column::id.eq(x.rpc_key_id))
.one(db_conn)
.await?;
// TODO: Create authrization
// We can probably also randomly generate this, as we don't care about the user (?)
let authorization_checks = AuthorizationChecks {
user_id: rpc_key_id.user_id,
rpc_secret_key: rpc_key_id.secret_key,
rpc_secret_key_id: x.rpc_key_id,
..Default::default()
};
// Then overwrite rpc_key_id and user_id (?)
let authorization_type = AuthorizationType::Frontend;
let authorization = Authorization::try_new(
authorization_checks,
None,
IpAddr::default(),
None,
None,
None,
authorization_type
);
// It will be like a fork basically (to simulate getting multiple single requests ...)
// Iterate through all frontend requests
// For each frontend request, create one object that will be emitted (make sure the timestamp is new)
// TODO: Create RequestMetadata
let request_metadata = RequestMetadata {
start_instant: x.period_datetime.timestamp(),
request_bytes: x.request_bytes,
archive_request: x.archive_request,
backend_requests: todo!(),
no_servers: 0, // This is not relevant in the new version
error_response: x.error_response,
response_bytes: todo!(),
response_millis: todo!(),
response_from_backup_rpc: todo!() // I think we did not record this back then // Default::default()
};
// I suppose method cannot be empty ... (?)
// Ok, now we basically create a state event ..
RpcQueryStats::new(
x.method.unwrap(),
authorization
request_metadata,
x.response_bytes
)
// // For each of the old rows, create a (i) RpcQueryKey and a matching BufferedRpcQueryStats object
// let key = RpcQueryKey {
// response_timestamp: x.period_datetime.timestamp(),
@ -77,6 +140,9 @@ impl MigrateStatsToV2 {
// // BufferedRpcQueryStats
}
// (N-1) Mark the batch as migrated
break;
}