From 9418d72b8b013ed6eb5f63bb690f03e5b486e424 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Tue, 22 Nov 2022 01:27:26 +0000 Subject: [PATCH] seems to work --- .../src/bin/web3_proxy_cli/user_export.rs | 4 +- .../src/bin/web3_proxy_cli/user_import.rs | 108 ++++++++++++++---- 2 files changed, 90 insertions(+), 22 deletions(-) diff --git a/web3_proxy/src/bin/web3_proxy_cli/user_export.rs b/web3_proxy/src/bin/web3_proxy_cli/user_export.rs index 0b491f28..aea08485 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/user_export.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/user_export.rs @@ -26,7 +26,7 @@ impl UserExportSubCommand { let export_dir = Path::new(&self.output_dir); // get all the users from the database (paged) - let mut user_pages = user::Entity::find().into_json().paginate(db_conn, 1000); + let mut user_pages = user::Entity::find().paginate(db_conn, 1000); // TODO: for now all user_tier tables match in all databases, but in the future we might need to export/import this @@ -50,7 +50,7 @@ impl UserExportSubCommand { ); // get all the rpc keys from the database (paged) - let mut rpc_key_pages = rpc_key::Entity::find().into_json().paginate(db_conn, 1000); + let mut rpc_key_pages = rpc_key::Entity::find().paginate(db_conn, 1000); let mut rpc_key_file_count = 0; while let Some(rpc_keys) = rpc_key_pages.fetch_and_next().await? { diff --git a/web3_proxy/src/bin/web3_proxy_cli/user_import.rs b/web3_proxy/src/bin/web3_proxy_cli/user_import.rs index 4302f7b8..9e6545a3 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/user_import.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/user_import.rs @@ -1,9 +1,16 @@ +use anyhow::Context; use argh::FromArgs; +use entities::{rpc_key, user}; use glob::glob; use hashbrown::HashMap; use log::{info, warn}; -use migration::sea_orm::DatabaseConnection; +use migration::sea_orm::ActiveValue::NotSet; +use migration::sea_orm::{ + ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, IntoActiveModel, QueryFilter, + Set, +}; use std::path::{Path, PathBuf}; +use std::{fs::File, io::BufReader}; #[derive(FromArgs, PartialEq, Eq, Debug)] /// Import users from another database. @@ -59,7 +66,7 @@ impl UserImportSubCommand { } info!( - "Imported {} user(s) from {} file(s). {} users mapped.", + "Imported {} user(s) from {} file(s). {} user(s) mapped.", imported_user_count, user_file_count, user_map.len() @@ -105,16 +112,46 @@ impl UserImportSubCommand { path: PathBuf, user_map: &mut UserMap, ) -> anyhow::Result { - let count = 0; + let mut count = 0; + // TODO: do this all inside a database transaction? - // for each file in the path, read as json - // -- for each entry in the json - // ---- let user_id = if user is in the database - // ------ add user to the database - // ---- else - // ------ add user to the database - // ---- add user to the map. - todo!() + + // TODO: do this with async things from tokio + let file = File::open(path)?; + let reader = BufReader::new(file); + + // Read the JSON contents of the file as an instance of `User` + let us = serde_json::from_reader::<_, Vec>(reader)?; + + for import_u in us.into_iter() { + // first, check if a user already exists with this address + if let Some(existing_u) = user::Entity::find() + .filter(user::Column::Address.eq(import_u.address.clone())) + .one(db_conn) + .await? + { + user_map.insert(import_u.id, existing_u.id); + + // don't increment count because the user already existed + } else { + // user address is not known to the local database. no existing_u + let import_id = import_u.id; + + let mut new_u = import_u.into_active_model(); + + new_u.id = NotSet; + + let new_u = new_u.save(db_conn).await?; + + let new_id = *new_u.id.as_ref(); + + user_map.insert(import_id, new_id); + + count += 1; + } + } + + Ok(count) } pub async fn import_rpc_key_file( @@ -123,14 +160,45 @@ impl UserImportSubCommand { path: PathBuf, user_map: &UserMap, ) -> anyhow::Result { - let count = 0; - // TODO: do this all inside a database transaction? - // for each file in the path, read as json - // -- for each entry in the json - // ---- let rpc_key_id = if rpc_key is in the database - // ------ continue - // ---- else - // ------ add rpc_key to the database - todo!() + let mut count = 0; + + // TODO: do this with async things from tokio + let file = File::open(path)?; + let reader = BufReader::new(file); + + // Read the JSON contents of the file as an instance of `User` + let rks = serde_json::from_reader::<_, Vec>(reader)?; + + for import_rk in rks.into_iter() { + let mapped_id = *user_map + .get(&import_rk.user_id) + .context("user mapping required")?; + + if let Some(existing_rk) = rpc_key::Entity::find() + .filter(rpc_key::Column::SecretKey.eq(import_rk.secret_key)) + .one(db_conn) + .await? + { + // make sure it belongs to the mapped user + if existing_rk.user_id != mapped_id { + // TODO: error or import the rest? + return Err(anyhow::anyhow!("unexpected user id")); + } + + // the key exists under the expected user. we are good to continue + } else { + // user address is not known to the local database. no existing_rk + let mut new_rk = import_rk.into_active_model(); + + new_rk.id = NotSet; + new_rk.user_id = Set(mapped_id); + + new_rk.save(db_conn).await?; + + count += 1; + } + } + + Ok(count) } }