Making Balance Changes Atomic (Read and Write) (#84)

* tried to make the balance_entry atomic, asking for feedback

* added locked select everywhere and inside the txn, it should be atomic now

* forgot about overwrite (referee vs sender entities were the same, overwriting each other), fixed it

* will now trigger refreshing cache when balance thresholds are met

* Revert "will now trigger refreshing cache when balance thresholds are met"

This reverts commit b1215ddc43a1358f6086bddbd106996165431165.

* will not modify userTier in database anymore

* made payment atomic

* made updates mostly atomic, locking only the referral table (bcs more complex logic)

* latest changes
This commit is contained in:
David 2023-06-04 18:32:53 +02:00 committed by GitHub
parent 79e52f4cdb
commit f66edd961b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 166 additions and 199 deletions

@ -1030,7 +1030,7 @@ impl Web3ProxyApp {
let mut collected_rpcs: Vec<Arc<Web3Rpc>> = vec![];
for response in responses {
// TODO: any way to attach the tried rpcs to the error? it is likely helpful
let (status_code, response, rpcs) = response;
let (_status_code, response, rpcs) = response;
collected.push(response);
collected_rpcs.extend(rpcs.into_iter().filter(|x| {

@ -19,7 +19,7 @@ use axum_macros::debug_handler;
use chrono::{TimeZone, Utc};
use entities::{
admin, admin_increase_balance_receipt, admin_trail, balance, login, pending_login, rpc_key,
user, user_tier,
user,
};
use ethers::{prelude::Address, types::Bytes};
use hashbrown::HashMap;
@ -124,7 +124,6 @@ pub async fn admin_increase_balance(
user_id: sea_orm::Set(user_entry.id),
..Default::default()
};
balance::Entity::insert(balance_entry)
.on_conflict(
OnConflict::new()

@ -144,7 +144,7 @@ pub async fn user_balance_post(
.into_response();
return Ok(response);
}
};
// get the transaction receipt
let transaction_receipt = app

@ -3,8 +3,8 @@
pub mod db_queries;
pub mod influxdb_queries;
mod stat_buffer;
pub use stat_buffer::{SpawnedStatBuffer, StatBuffer};
use std::cmp;
use crate::app::RpcSecretKeyCache;
use crate::errors::{Web3ProxyError, Web3ProxyResult};
@ -15,18 +15,18 @@ use axum::headers::Origin;
use chrono::{DateTime, Months, TimeZone, Utc};
use derive_more::From;
use entities::sea_orm_active_enums::TrackingLevel;
use entities::{balance, referee, referrer, rpc_accounting_v2, rpc_key, user, user_tier};
use entities::{balance, referee, referrer, rpc_accounting_v2, rpc_key, user};
use influxdb2::models::DataPoint;
use log::{error, trace, warn};
use log::trace;
use migration::sea_orm::prelude::Decimal;
use migration::sea_orm::QuerySelect;
use migration::sea_orm::{
self, ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, IntoActiveModel,
QueryFilter,
QueryFilter, TransactionTrait,
};
use migration::{Expr, OnConflict};
use migration::{Expr, LockType, OnConflict};
use num_traits::ToPrimitive;
use parking_lot::Mutex;
use std::cmp::max;
use std::num::NonZeroU64;
use std::sync::atomic::{self, Ordering};
use std::sync::Arc;
@ -240,7 +240,9 @@ impl BufferedRpcQueryStats {
let period_datetime = Utc.timestamp_opt(key.response_timestamp, 0).unwrap();
// this is a lot of variables
// =============================== //
// UPDATE STATISTICS //
// =============================== //
let accounting_entry = rpc_accounting_v2::ActiveModel {
id: sea_orm::NotSet,
rpc_key_id: sea_orm::Set(key.rpc_secret_key_id.map(Into::into)),
@ -318,35 +320,28 @@ impl BufferedRpcQueryStats {
.exec(db_conn)
.await?;
// TODO: Refactor this function a bit more just so it looks and feels nicer
// TODO: Figure out how to go around unmatching, it shouldn't return an error, but this is disgusting
// All the referral & balance arithmetic takes place here
// =============================== //
// PREPARE FOR UPDATE USER BALANCE //
// =============================== //
let rpc_secret_key_id: u64 = match key.rpc_secret_key_id {
Some(x) => x.into(),
// Return early if the RPC key is not found, because then it is an anonymous user
None => return Ok(()),
};
// (1) Get the user with that RPC key. This is the referee
let sender_rpc_key = rpc_key::Entity::find()
.filter(rpc_key::Column::Id.eq(rpc_secret_key_id))
.one(db_conn)
.await?;
// =============================== //
// GET ALL (STATIC) VARIABLES //
// =============================== //
// Get the user with that RPC key. This is also the referee
// Technicall there should always be a user ... still let's return "Ok(())" for now
let sender_user_id: u64 = match sender_rpc_key {
Some(x) => x.user_id,
// Return early if the User is not found, because then it is an anonymous user
// Let's also issue a warning because obviously the RPC key should correspond to a user
None => {
warn!(
"No user was found for the following rpc key: {:?}",
rpc_secret_key_id
);
return Ok(());
}
};
// Txn is not strictly necessary, but still good to keep things consistent across tables
let txn = db_conn.begin().await?;
let sender_rpc_entity = rpc_key::Entity::find()
.filter(rpc_key::Column::Id.eq(rpc_secret_key_id))
.one(&txn)
.await?
.context("We previous checked that the id exists, this is likely some race condition, or it just got deleted!")?;
// (1) Do some general bookkeeping on the user
if self.sum_credits_used == 0.into() {
@ -354,185 +349,161 @@ impl BufferedRpcQueryStats {
return Ok(());
}
let sender_balance = match balance::Entity::find()
.filter(balance::Column::UserId.eq(sender_user_id))
let sender_balance = balance::Entity::find()
.filter(balance::Column::UserId.eq(sender_rpc_entity.user_id))
.one(db_conn)
.await?
{
Some(x) => x,
None => {
warn!("This user id has no balance entry! {:?}", sender_user_id);
return Ok(());
}
};
let mut active_sender_balance = sender_balance.clone().into_active_model();
// Still subtract from the user in any case,
// Modify the balance of the sender completely (in mysql, next to the stats)
// In any case, add this to "spent"
// TODO! we need to do the math in mysql (like with `Expr::col` above). if we do the addition here, there is a race condition
active_sender_balance.used_balance =
sea_orm::Set(sender_balance.used_balance + self.sum_credits_used);
// Also update the available balance
// TODO! this needs to be queried from the database
let new_available_balance = max(
sender_balance.available_balance - self.sum_credits_used,
Decimal::from(0),
);
active_sender_balance.available_balance = sea_orm::Set(new_available_balance);
active_sender_balance.save(db_conn).await?;
let downgrade_user = match user::Entity::find()
.filter(user::Column::Id.eq(sender_user_id))
.one(db_conn)
.await?
{
Some(x) => x,
None => {
warn!("No user was found with this sender id!");
return Ok(());
}
};
let downgrade_user_role = user_tier::Entity::find()
.filter(user_tier::Column::Id.eq(downgrade_user.user_tier_id))
.one(db_conn)
.await?
.context(format!(
"The foreign key for the user's user_tier_id was not found! {:?}",
downgrade_user.user_tier_id
.ok_or(Web3ProxyError::BadRequest(
"Could not find rpc key in db".into(),
))?;
// Downgrade a user to premium - out of funds if there's less than 10$ in the account, and if the user was premium before
// TODO: lets let them get under $1
// TODO: instead of checking for a specific title, downgrade if the downgrade id is set to anything
if new_available_balance < Decimal::from(10u64) && downgrade_user_role.title == "Premium" {
// TODO: we could do this outside the balance low block, but I think its fine. or better, update the cache if <$10 and downgrade if <$1
if let Some(rpc_secret_key_cache) = rpc_secret_key_cache {
error!("expire (or probably better to update) the user cache now that the balance is low");
// actually i think we need to have 2 caches. otherwise users with 2 keys are going to have seperate caches
// 1. rpc_secret_key_id -> AuthorizationChecks (cuz we don't want to hit the db every time)
// 2. user_id -> Balance
// I think one lock here is fine, because only one server has access to the "credits_applied_for_referee" entry
let referral_objects = match referee::Entity::find()
.filter(referee::Column::UserId.eq(sender_rpc_entity.user_id))
.lock(LockType::Update)
.find_also_related(referrer::Entity)
.one(&txn)
.await?
{
Some(x) => Some((
x.0,
x.1.context("Could not fine corresponding referrer code")?,
)),
None => None,
};
// ====================== //
// INITIATE DELTAS //
// ====================== //
// Calculate Balance Only (No referrer)
let mut sender_available_balance_delta = Decimal::from(-1) * self.sum_credits_used;
let sender_used_balance_delta = self.sum_credits_used;
let mut sender_bonus_applied;
// Calculate Referrer Bonuses
let mut referrer_balance_delta = Decimal::from(0);
// ============================================================ //
// BASED ON REFERRERS, CALCULATE HOW MUCH SHOULD BE ATTRIBUTED //
// ============================================================ //
// If we don't lock the database as we do above on the referral_entry, we would have to do this operation on the database
if let Some((referral_entity, referrer_code_entity)) = referral_objects {
sender_bonus_applied = referral_entity.credits_applied_for_referee;
// Calculate if we are above the usage threshold, and apply a bonus
// Optimally we would read this from the balance, but if we do it like this, we only have to lock a single table (much safer w.r.t. deadlocks)
if !referral_entity.credits_applied_for_referee
&& (referral_entity.credits_applied_for_referrer * (Decimal::from(10))
+ self.sum_credits_used)
>= Decimal::from(100)
{
sender_available_balance_delta += Decimal::from(100);
sender_bonus_applied = true;
}
// Only downgrade the user in local process memory, not elsewhere
// Calculate how much the referrer should get, limited to the last 12 months
// Apply 10% of the used balance as a bonus if applicable
let now = Utc::now();
let valid_until = DateTime::<Utc>::from_utc(referral_entity.referral_start_date, Utc)
+ Months::new(12);
// let mut active_downgrade_user = downgrade_user.into_active_model();
// active_downgrade_user.user_tier_id = sea_orm::Set(downgrade_user_role.id);
// active_downgrade_user.save(db_conn).await?;
if now <= valid_until {
referrer_balance_delta += self.sum_credits_used / Decimal::new(10, 0);
}
// Do the referrer_entry updates
if referrer_balance_delta > Decimal::from(0) {
let referee_entry = referee::ActiveModel {
id: sea_orm::Unchanged(referral_entity.id),
referral_start_date: sea_orm::Unchanged(referral_entity.referral_start_date),
used_referral_code: sea_orm::Unchanged(referral_entity.used_referral_code),
user_id: sea_orm::Unchanged(referral_entity.user_id),
credits_applied_for_referee: sea_orm::Set(sender_bonus_applied),
credits_applied_for_referrer: sea_orm::Set(referrer_balance_delta),
};
referee::Entity::insert(referee_entry)
.on_conflict(
OnConflict::new()
.values([
(
referee::Column::CreditsAppliedForReferee,
// Make it a "Set"
Expr::col(referee::Column::CreditsAppliedForReferee)
.eq(sender_bonus_applied),
),
(
referee::Column::CreditsAppliedForReferrer,
Expr::col(referee::Column::CreditsAppliedForReferrer)
.add(referrer_balance_delta),
),
])
.to_owned(),
)
.exec(&txn)
.await?
.last_insert_id;
}
}
// Get the referee, and the referrer
// (2) Look up the code that this user used. This is the referee table
let referee_object = match referee::Entity::find()
.filter(referee::Column::UserId.eq(sender_user_id))
.one(db_conn)
.await?
{
Some(x) => x,
None => {
warn!(
"No referral code was found for this user: {:?}",
sender_user_id
);
return Ok(());
}
// ================================= //
// UPDATE REFERRER & USER BALANCE //
// ================================= //
let user_balance = balance::ActiveModel {
id: sea_orm::NotSet,
available_balance: sea_orm::Set(sender_available_balance_delta),
used_balance: sea_orm::Set(sender_used_balance_delta),
user_id: sea_orm::Set(sender_rpc_entity.user_id),
};
// (3) Look up the matching referrer in the referrer table
// Referral table -> Get the referee id
let user_with_that_referral_code = match referrer::Entity::find()
.filter(referrer::Column::ReferralCode.eq(referee_object.used_referral_code))
.one(db_conn)
let _ = balance::Entity::insert(user_balance)
.on_conflict(
OnConflict::new()
.values([
(
balance::Column::AvailableBalance,
Expr::col(balance::Column::AvailableBalance)
.add(sender_available_balance_delta),
),
(
balance::Column::UsedBalance,
Expr::col(balance::Column::UsedBalance).add(sender_used_balance_delta),
),
])
.to_owned(),
)
.exec(&txn)
.await?
{
Some(x) => x,
None => {
// TODO: warn seems too verbose for this. it should be fine for a user to not have a referall code, right?
warn!(
"No referrer with that referral code was found {:?}",
referee_object
);
return Ok(());
}
};
.last_insert_id;
// Ok, now we add the credits to both users if applicable...
// (4 onwards) Add balance to the referrer,
if referrer_balance_delta > Decimal::from(0) {
let user_balance = balance::ActiveModel {
id: sea_orm::NotSet,
available_balance: sea_orm::Set(referrer_balance_delta),
used_balance: sea_orm::Set(Decimal::from(0)),
user_id: sea_orm::Set(sender_rpc_entity.user_id),
};
// (5) Check if referee has used up $100.00 USD in total (Have a config item that says how many credits account to 1$)
// Get balance for the referrer (optionally make it into an active model ...)
let sender_balance = match balance::Entity::find()
.filter(balance::Column::UserId.eq(referee_object.user_id))
.one(db_conn)
.await?
{
Some(x) => x,
None => {
warn!(
"This user id has no balance entry! {:?}",
referee_object.user_id
);
return Ok(());
}
};
// TODO: don't clone on this. use the active_model later
let mut active_sender_balance = sender_balance.clone().into_active_model();
let referrer_balance = match balance::Entity::find()
.filter(balance::Column::UserId.eq(user_with_that_referral_code.user_id))
.one(db_conn)
.await?
{
Some(x) => x,
None => {
warn!(
"This user id has no balance entry! {:?}",
referee_object.user_id
);
return Ok(());
}
};
// I could try to circumvene the clone here, but let's skip that for now
let mut active_referee = referee_object.clone().into_active_model();
// (5.1) If not, go to (7). If yes, go to (6)
// Hardcode this parameter also in config, so it's easier to tune
if !referee_object.credits_applied_for_referee
&& (sender_balance.used_balance + self.sum_credits_used) >= Decimal::from(100)
{
// (6) If the credits have not yet been applied to the referee, apply 10M credits / $100.00 USD worth of credits.
// Make it into an active model, and add credits
// TODO! race condition here! we can't set. need to let the db do the math
active_sender_balance.available_balance =
sea_orm::Set(sender_balance.available_balance + Decimal::from(100));
// Also mark referral as "credits_applied_for_referee"
active_referee.credits_applied_for_referee = sea_orm::Set(true);
let _ = balance::Entity::insert(user_balance)
.on_conflict(
OnConflict::new()
.values([(
balance::Column::AvailableBalance,
Expr::col(balance::Column::AvailableBalance)
.add(referrer_balance_delta),
)])
.to_owned(),
)
.exec(&txn)
.await?
.last_insert_id;
}
// (7) If the referral-start-date has not been passed, apply 10% of the credits to the referrer.
let now = Utc::now();
let valid_until = DateTime::<Utc>::from_utc(referee_object.referral_start_date, Utc)
.checked_add_months(Months::new(12))
.unwrap();
if now <= valid_until {
let mut active_referrer_balance = referrer_balance.clone().into_active_model();
// Add 10% referral fees ...
active_referrer_balance.available_balance = sea_orm::Set(
referrer_balance.available_balance + self.sum_credits_used / Decimal::from(10),
);
// Also record how much the current referrer has "provided" / "gifted" away
active_referee.credits_applied_for_referrer =
sea_orm::Set(referee_object.credits_applied_for_referrer + self.sum_credits_used);
active_referrer_balance.save(db_conn).await?;
}
active_sender_balance.save(db_conn).await?;
active_referee.save(db_conn).await?;
// ================================ //
// TODO: REFRESH USER ROLE IN CACHE //
// ================================ //
txn.commit()
.await
.context("Failed to update referral and balance updates")?;
Ok(())
}

@ -102,9 +102,6 @@ impl StatBuffer {
let mut db_save_interval =
interval(Duration::from_secs(self.db_save_interval_seconds as u64));
// TODO: Somewhere here we should probably be updating the balance of the user
// And also update the credits used etc. for the referred user
loop {
tokio::select! {
stat = stat_receiver.recv_async() => {