From f66edd961b7f8c4bcfdccae6ac8c95981d2644df Mon Sep 17 00:00:00 2001 From: David Date: Sun, 4 Jun 2023 18:32:53 +0200 Subject: [PATCH] Making Balance Changes Atomic (Read and Write) (#84) * tried to make the balance_entry atomic, asking for feedback * added locked select everywhere and inside the txn, it should be atomic now * forgot about overwrite (referee vs sender entities were the same, overwriting each other), fixed it * will now trigger refreshing cache when balance thresholds are met * Revert "will now trigger refreshing cache when balance thresholds are met" This reverts commit b1215ddc43a1358f6086bddbd106996165431165. * will not modify userTier in database anymore * made payment atomic * made updates mostly atomic, locking only the referral table (bcs more complex logic) * latest changes --- web3_proxy/src/app/mod.rs | 2 +- web3_proxy/src/frontend/admin.rs | 3 +- web3_proxy/src/frontend/users/payment.rs | 2 +- web3_proxy/src/stats/mod.rs | 355 +++++++++++------------ web3_proxy/src/stats/stat_buffer.rs | 3 - 5 files changed, 166 insertions(+), 199 deletions(-) diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index f7009192..302d9bfe 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -1030,7 +1030,7 @@ impl Web3ProxyApp { let mut collected_rpcs: Vec> = vec![]; for response in responses { // TODO: any way to attach the tried rpcs to the error? it is likely helpful - let (status_code, response, rpcs) = response; + let (_status_code, response, rpcs) = response; collected.push(response); collected_rpcs.extend(rpcs.into_iter().filter(|x| { diff --git a/web3_proxy/src/frontend/admin.rs b/web3_proxy/src/frontend/admin.rs index d361c3ca..628f9263 100644 --- a/web3_proxy/src/frontend/admin.rs +++ b/web3_proxy/src/frontend/admin.rs @@ -19,7 +19,7 @@ use axum_macros::debug_handler; use chrono::{TimeZone, Utc}; use entities::{ admin, admin_increase_balance_receipt, admin_trail, balance, login, pending_login, rpc_key, - user, user_tier, + user, }; use ethers::{prelude::Address, types::Bytes}; use hashbrown::HashMap; @@ -124,7 +124,6 @@ pub async fn admin_increase_balance( user_id: sea_orm::Set(user_entry.id), ..Default::default() }; - balance::Entity::insert(balance_entry) .on_conflict( OnConflict::new() diff --git a/web3_proxy/src/frontend/users/payment.rs b/web3_proxy/src/frontend/users/payment.rs index e64c4b9a..1b8772dc 100644 --- a/web3_proxy/src/frontend/users/payment.rs +++ b/web3_proxy/src/frontend/users/payment.rs @@ -144,7 +144,7 @@ pub async fn user_balance_post( .into_response(); return Ok(response); - } + }; // get the transaction receipt let transaction_receipt = app diff --git a/web3_proxy/src/stats/mod.rs b/web3_proxy/src/stats/mod.rs index 719d8519..7ddace3f 100644 --- a/web3_proxy/src/stats/mod.rs +++ b/web3_proxy/src/stats/mod.rs @@ -3,8 +3,8 @@ pub mod db_queries; pub mod influxdb_queries; mod stat_buffer; - pub use stat_buffer::{SpawnedStatBuffer, StatBuffer}; +use std::cmp; use crate::app::RpcSecretKeyCache; use crate::errors::{Web3ProxyError, Web3ProxyResult}; @@ -15,18 +15,18 @@ use axum::headers::Origin; use chrono::{DateTime, Months, TimeZone, Utc}; use derive_more::From; use entities::sea_orm_active_enums::TrackingLevel; -use entities::{balance, referee, referrer, rpc_accounting_v2, rpc_key, user, user_tier}; +use entities::{balance, referee, referrer, rpc_accounting_v2, rpc_key, user}; use influxdb2::models::DataPoint; -use log::{error, trace, warn}; +use log::trace; use migration::sea_orm::prelude::Decimal; +use migration::sea_orm::QuerySelect; use migration::sea_orm::{ self, ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, IntoActiveModel, - QueryFilter, + QueryFilter, TransactionTrait, }; -use migration::{Expr, OnConflict}; +use migration::{Expr, LockType, OnConflict}; use num_traits::ToPrimitive; use parking_lot::Mutex; -use std::cmp::max; use std::num::NonZeroU64; use std::sync::atomic::{self, Ordering}; use std::sync::Arc; @@ -240,7 +240,9 @@ impl BufferedRpcQueryStats { let period_datetime = Utc.timestamp_opt(key.response_timestamp, 0).unwrap(); - // this is a lot of variables + // =============================== // + // UPDATE STATISTICS // + // =============================== // let accounting_entry = rpc_accounting_v2::ActiveModel { id: sea_orm::NotSet, rpc_key_id: sea_orm::Set(key.rpc_secret_key_id.map(Into::into)), @@ -318,35 +320,28 @@ impl BufferedRpcQueryStats { .exec(db_conn) .await?; - // TODO: Refactor this function a bit more just so it looks and feels nicer - // TODO: Figure out how to go around unmatching, it shouldn't return an error, but this is disgusting - - // All the referral & balance arithmetic takes place here + // =============================== // + // PREPARE FOR UPDATE USER BALANCE // + // =============================== // let rpc_secret_key_id: u64 = match key.rpc_secret_key_id { Some(x) => x.into(), // Return early if the RPC key is not found, because then it is an anonymous user None => return Ok(()), }; - // (1) Get the user with that RPC key. This is the referee - let sender_rpc_key = rpc_key::Entity::find() - .filter(rpc_key::Column::Id.eq(rpc_secret_key_id)) - .one(db_conn) - .await?; + // =============================== // + // GET ALL (STATIC) VARIABLES // + // =============================== // + // Get the user with that RPC key. This is also the referee - // Technicall there should always be a user ... still let's return "Ok(())" for now - let sender_user_id: u64 = match sender_rpc_key { - Some(x) => x.user_id, - // Return early if the User is not found, because then it is an anonymous user - // Let's also issue a warning because obviously the RPC key should correspond to a user - None => { - warn!( - "No user was found for the following rpc key: {:?}", - rpc_secret_key_id - ); - return Ok(()); - } - }; + // Txn is not strictly necessary, but still good to keep things consistent across tables + let txn = db_conn.begin().await?; + + let sender_rpc_entity = rpc_key::Entity::find() + .filter(rpc_key::Column::Id.eq(rpc_secret_key_id)) + .one(&txn) + .await? + .context("We previous checked that the id exists, this is likely some race condition, or it just got deleted!")?; // (1) Do some general bookkeeping on the user if self.sum_credits_used == 0.into() { @@ -354,185 +349,161 @@ impl BufferedRpcQueryStats { return Ok(()); } - let sender_balance = match balance::Entity::find() - .filter(balance::Column::UserId.eq(sender_user_id)) + let sender_balance = balance::Entity::find() + .filter(balance::Column::UserId.eq(sender_rpc_entity.user_id)) .one(db_conn) .await? - { - Some(x) => x, - None => { - warn!("This user id has no balance entry! {:?}", sender_user_id); - return Ok(()); - } - }; - - let mut active_sender_balance = sender_balance.clone().into_active_model(); - - // Still subtract from the user in any case, - // Modify the balance of the sender completely (in mysql, next to the stats) - // In any case, add this to "spent" - // TODO! we need to do the math in mysql (like with `Expr::col` above). if we do the addition here, there is a race condition - active_sender_balance.used_balance = - sea_orm::Set(sender_balance.used_balance + self.sum_credits_used); - - // Also update the available balance - // TODO! this needs to be queried from the database - let new_available_balance = max( - sender_balance.available_balance - self.sum_credits_used, - Decimal::from(0), - ); - active_sender_balance.available_balance = sea_orm::Set(new_available_balance); - - active_sender_balance.save(db_conn).await?; - - let downgrade_user = match user::Entity::find() - .filter(user::Column::Id.eq(sender_user_id)) - .one(db_conn) - .await? - { - Some(x) => x, - None => { - warn!("No user was found with this sender id!"); - return Ok(()); - } - }; - - let downgrade_user_role = user_tier::Entity::find() - .filter(user_tier::Column::Id.eq(downgrade_user.user_tier_id)) - .one(db_conn) - .await? - .context(format!( - "The foreign key for the user's user_tier_id was not found! {:?}", - downgrade_user.user_tier_id + .ok_or(Web3ProxyError::BadRequest( + "Could not find rpc key in db".into(), ))?; - // Downgrade a user to premium - out of funds if there's less than 10$ in the account, and if the user was premium before - // TODO: lets let them get under $1 - // TODO: instead of checking for a specific title, downgrade if the downgrade id is set to anything - if new_available_balance < Decimal::from(10u64) && downgrade_user_role.title == "Premium" { - // TODO: we could do this outside the balance low block, but I think its fine. or better, update the cache if <$10 and downgrade if <$1 - if let Some(rpc_secret_key_cache) = rpc_secret_key_cache { - error!("expire (or probably better to update) the user cache now that the balance is low"); - // actually i think we need to have 2 caches. otherwise users with 2 keys are going to have seperate caches - // 1. rpc_secret_key_id -> AuthorizationChecks (cuz we don't want to hit the db every time) - // 2. user_id -> Balance + // I think one lock here is fine, because only one server has access to the "credits_applied_for_referee" entry + let referral_objects = match referee::Entity::find() + .filter(referee::Column::UserId.eq(sender_rpc_entity.user_id)) + .lock(LockType::Update) + .find_also_related(referrer::Entity) + .one(&txn) + .await? + { + Some(x) => Some(( + x.0, + x.1.context("Could not fine corresponding referrer code")?, + )), + None => None, + }; + + // ====================== // + // INITIATE DELTAS // + // ====================== // + // Calculate Balance Only (No referrer) + let mut sender_available_balance_delta = Decimal::from(-1) * self.sum_credits_used; + let sender_used_balance_delta = self.sum_credits_used; + let mut sender_bonus_applied; + // Calculate Referrer Bonuses + let mut referrer_balance_delta = Decimal::from(0); + + // ============================================================ // + // BASED ON REFERRERS, CALCULATE HOW MUCH SHOULD BE ATTRIBUTED // + // ============================================================ // + // If we don't lock the database as we do above on the referral_entry, we would have to do this operation on the database + if let Some((referral_entity, referrer_code_entity)) = referral_objects { + sender_bonus_applied = referral_entity.credits_applied_for_referee; + + // Calculate if we are above the usage threshold, and apply a bonus + // Optimally we would read this from the balance, but if we do it like this, we only have to lock a single table (much safer w.r.t. deadlocks) + if !referral_entity.credits_applied_for_referee + && (referral_entity.credits_applied_for_referrer * (Decimal::from(10)) + + self.sum_credits_used) + >= Decimal::from(100) + { + sender_available_balance_delta += Decimal::from(100); + sender_bonus_applied = true; } - // Only downgrade the user in local process memory, not elsewhere + // Calculate how much the referrer should get, limited to the last 12 months + // Apply 10% of the used balance as a bonus if applicable + let now = Utc::now(); + let valid_until = DateTime::::from_utc(referral_entity.referral_start_date, Utc) + + Months::new(12); - // let mut active_downgrade_user = downgrade_user.into_active_model(); - // active_downgrade_user.user_tier_id = sea_orm::Set(downgrade_user_role.id); - // active_downgrade_user.save(db_conn).await?; + if now <= valid_until { + referrer_balance_delta += self.sum_credits_used / Decimal::new(10, 0); + } + + // Do the referrer_entry updates + if referrer_balance_delta > Decimal::from(0) { + let referee_entry = referee::ActiveModel { + id: sea_orm::Unchanged(referral_entity.id), + referral_start_date: sea_orm::Unchanged(referral_entity.referral_start_date), + used_referral_code: sea_orm::Unchanged(referral_entity.used_referral_code), + user_id: sea_orm::Unchanged(referral_entity.user_id), + + credits_applied_for_referee: sea_orm::Set(sender_bonus_applied), + credits_applied_for_referrer: sea_orm::Set(referrer_balance_delta), + }; + referee::Entity::insert(referee_entry) + .on_conflict( + OnConflict::new() + .values([ + ( + referee::Column::CreditsAppliedForReferee, + // Make it a "Set" + Expr::col(referee::Column::CreditsAppliedForReferee) + .eq(sender_bonus_applied), + ), + ( + referee::Column::CreditsAppliedForReferrer, + Expr::col(referee::Column::CreditsAppliedForReferrer) + .add(referrer_balance_delta), + ), + ]) + .to_owned(), + ) + .exec(&txn) + .await? + .last_insert_id; + } } - // Get the referee, and the referrer - // (2) Look up the code that this user used. This is the referee table - let referee_object = match referee::Entity::find() - .filter(referee::Column::UserId.eq(sender_user_id)) - .one(db_conn) - .await? - { - Some(x) => x, - None => { - warn!( - "No referral code was found for this user: {:?}", - sender_user_id - ); - return Ok(()); - } + // ================================= // + // UPDATE REFERRER & USER BALANCE // + // ================================= // + let user_balance = balance::ActiveModel { + id: sea_orm::NotSet, + available_balance: sea_orm::Set(sender_available_balance_delta), + used_balance: sea_orm::Set(sender_used_balance_delta), + user_id: sea_orm::Set(sender_rpc_entity.user_id), }; - // (3) Look up the matching referrer in the referrer table - // Referral table -> Get the referee id - let user_with_that_referral_code = match referrer::Entity::find() - .filter(referrer::Column::ReferralCode.eq(referee_object.used_referral_code)) - .one(db_conn) + let _ = balance::Entity::insert(user_balance) + .on_conflict( + OnConflict::new() + .values([ + ( + balance::Column::AvailableBalance, + Expr::col(balance::Column::AvailableBalance) + .add(sender_available_balance_delta), + ), + ( + balance::Column::UsedBalance, + Expr::col(balance::Column::UsedBalance).add(sender_used_balance_delta), + ), + ]) + .to_owned(), + ) + .exec(&txn) .await? - { - Some(x) => x, - None => { - // TODO: warn seems too verbose for this. it should be fine for a user to not have a referall code, right? - warn!( - "No referrer with that referral code was found {:?}", - referee_object - ); - return Ok(()); - } - }; + .last_insert_id; - // Ok, now we add the credits to both users if applicable... - // (4 onwards) Add balance to the referrer, + if referrer_balance_delta > Decimal::from(0) { + let user_balance = balance::ActiveModel { + id: sea_orm::NotSet, + available_balance: sea_orm::Set(referrer_balance_delta), + used_balance: sea_orm::Set(Decimal::from(0)), + user_id: sea_orm::Set(sender_rpc_entity.user_id), + }; - // (5) Check if referee has used up $100.00 USD in total (Have a config item that says how many credits account to 1$) - // Get balance for the referrer (optionally make it into an active model ...) - let sender_balance = match balance::Entity::find() - .filter(balance::Column::UserId.eq(referee_object.user_id)) - .one(db_conn) - .await? - { - Some(x) => x, - None => { - warn!( - "This user id has no balance entry! {:?}", - referee_object.user_id - ); - return Ok(()); - } - }; - - // TODO: don't clone on this. use the active_model later - let mut active_sender_balance = sender_balance.clone().into_active_model(); - let referrer_balance = match balance::Entity::find() - .filter(balance::Column::UserId.eq(user_with_that_referral_code.user_id)) - .one(db_conn) - .await? - { - Some(x) => x, - None => { - warn!( - "This user id has no balance entry! {:?}", - referee_object.user_id - ); - return Ok(()); - } - }; - - // I could try to circumvene the clone here, but let's skip that for now - let mut active_referee = referee_object.clone().into_active_model(); - - // (5.1) If not, go to (7). If yes, go to (6) - // Hardcode this parameter also in config, so it's easier to tune - if !referee_object.credits_applied_for_referee - && (sender_balance.used_balance + self.sum_credits_used) >= Decimal::from(100) - { - // (6) If the credits have not yet been applied to the referee, apply 10M credits / $100.00 USD worth of credits. - // Make it into an active model, and add credits - // TODO! race condition here! we can't set. need to let the db do the math - active_sender_balance.available_balance = - sea_orm::Set(sender_balance.available_balance + Decimal::from(100)); - // Also mark referral as "credits_applied_for_referee" - active_referee.credits_applied_for_referee = sea_orm::Set(true); + let _ = balance::Entity::insert(user_balance) + .on_conflict( + OnConflict::new() + .values([( + balance::Column::AvailableBalance, + Expr::col(balance::Column::AvailableBalance) + .add(referrer_balance_delta), + )]) + .to_owned(), + ) + .exec(&txn) + .await? + .last_insert_id; } - // (7) If the referral-start-date has not been passed, apply 10% of the credits to the referrer. - let now = Utc::now(); - let valid_until = DateTime::::from_utc(referee_object.referral_start_date, Utc) - .checked_add_months(Months::new(12)) - .unwrap(); - if now <= valid_until { - let mut active_referrer_balance = referrer_balance.clone().into_active_model(); - // Add 10% referral fees ... - active_referrer_balance.available_balance = sea_orm::Set( - referrer_balance.available_balance + self.sum_credits_used / Decimal::from(10), - ); - // Also record how much the current referrer has "provided" / "gifted" away - active_referee.credits_applied_for_referrer = - sea_orm::Set(referee_object.credits_applied_for_referrer + self.sum_credits_used); - active_referrer_balance.save(db_conn).await?; - } - - active_sender_balance.save(db_conn).await?; - active_referee.save(db_conn).await?; + // ================================ // + // TODO: REFRESH USER ROLE IN CACHE // + // ================================ // + txn.commit() + .await + .context("Failed to update referral and balance updates")?; Ok(()) } diff --git a/web3_proxy/src/stats/stat_buffer.rs b/web3_proxy/src/stats/stat_buffer.rs index 9d59f0d5..5efd9822 100644 --- a/web3_proxy/src/stats/stat_buffer.rs +++ b/web3_proxy/src/stats/stat_buffer.rs @@ -102,9 +102,6 @@ impl StatBuffer { let mut db_save_interval = interval(Duration::from_secs(self.db_save_interval_seconds as u64)); - // TODO: Somewhere here we should probably be updating the balance of the user - // And also update the credits used etc. for the referred user - loop { tokio::select! { stat = stat_receiver.recv_async() => {