Bryan devel (#176)

* refactor more balance logic

* refactor more balance checks

* doc running full test suite
This commit is contained in:
Bryan Stitt 2023-07-09 21:37:50 -07:00 committed by GitHub
parent 0ddd3662ac
commit a43fda0c32
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 152 additions and 140 deletions

@ -47,6 +47,12 @@ Quickly run tests:
RUST_LOG=web3_proxy=trace,info cargo nextest run RUST_LOG=web3_proxy=trace,info cargo nextest run
``` ```
Run more tests:
```
RUST_LOG=web3_proxy=trace,info cargo nextest run --features tests-needing-docker
```
## Common commands ## Common commands
Create a user: Create a user:

@ -15,6 +15,14 @@ pub struct Balance {
} }
impl Balance { impl Balance {
pub fn active_premium(&self) -> bool {
self.was_ever_premium() && self.total_deposits > self.total_spent_paid_credits
}
pub fn was_ever_premium(&self) -> bool {
self.user_id != 0 && self.total_deposits >= Decimal::from(10)
}
pub fn remaining(&self) -> Decimal { pub fn remaining(&self) -> Decimal {
self.total_deposits - self.total_spent_paid_credits self.total_deposits - self.total_spent_paid_credits
} }

@ -1253,13 +1253,11 @@ impl Web3ProxyApp {
// TODO: Do the logic here, as to how to treat the user, based on balance and initial check // TODO: Do the logic here, as to how to treat the user, based on balance and initial check
// Clear the cache (not the login!) in the stats if a tier-change happens (clear, but don't modify roles) // Clear the cache (not the login!) in the stats if a tier-change happens (clear, but don't modify roles)
if let Some(downgrade_user_tier) = user_tier_model.downgrade_tier_id { if let Some(downgrade_user_tier) = user_tier_model.downgrade_tier_id {
let balance = latest_balance.read().await.clone(); let active_premium = latest_balance.read().await.active_premium();
// only consider the user premium if they have paid at least $10 and have a balance > $.01 // only consider the user premium if they have paid at least $10 and have a balance > $.01
// otherwise, set user_tier_model to the downograded tier // otherwise, set user_tier_model to the downograded tier
if balance.total_deposits < Decimal::from(10) if !active_premium {
|| balance.remaining() < Decimal::new(1, 2)
{
// TODO: include boolean to mark that the user is downgraded // TODO: include boolean to mark that the user is downgraded
user_tier_model = user_tier_model =
user_tier::Entity::find_by_id(downgrade_user_tier) user_tier::Entity::find_by_id(downgrade_user_tier)

@ -21,7 +21,6 @@ use fstrings::{f, format_args_f};
use hashbrown::HashMap; use hashbrown::HashMap;
use influxdb2::api::query::FluxRecord; use influxdb2::api::query::FluxRecord;
use influxdb2::models::Query; use influxdb2::models::Query;
use migration::sea_orm::prelude::Decimal;
use migration::sea_orm::{ColumnTrait, EntityTrait, QueryFilter}; use migration::sea_orm::{ColumnTrait, EntityTrait, QueryFilter};
use serde_json::json; use serde_json::json;
use tracing::{debug, error, trace, warn}; use tracing::{debug, error, trace, warn};
@ -59,7 +58,6 @@ pub async fn query_user_stats<'a>(
.unwrap_or_else(|| caller_user.as_ref().map(|x| x.id).unwrap_or_default()); .unwrap_or_else(|| caller_user.as_ref().map(|x| x.id).unwrap_or_default());
// Only allow stats if the user has an active premium role // Only allow stats if the user has an active premium role
// TODO: move this to a helper. it should be simple to check that a user has an active premium account
if let Some(caller_user) = &caller_user { if let Some(caller_user) = &caller_user {
// get the balance of the user whose stats we are going to fetch (might be self, but might be another user) // get the balance of the user whose stats we are going to fetch (might be self, but might be another user)
let balance = match Balance::try_from_db(db_replica.as_ref(), user_id).await? { let balance = match Balance::try_from_db(db_replica.as_ref(), user_id).await? {
@ -71,29 +69,28 @@ pub async fn query_user_stats<'a>(
Some(x) => x, Some(x) => x,
}; };
// TODO: We should add the threshold that determines if a user is premium into app.config. hard coding to $10 works for now // get the user tier so we can see if it is a tier that has downgrades
// TODO: @Bryan this condition seems off, can you double-check? let relevant_balance_user_tier_id = if user_id == caller_user.id {
if balance.total_deposits < Decimal::from(10) || balance.remaining() <= Decimal::from(0) { // use the caller's tier
// get the user tier so we can see if it is a tier that has downgrades caller_user.user_tier_id
let relevant_balance_user_tier_id = if user_id == caller_user.id { } else {
caller_user.user_tier_id // use the tier of the primary user from a secondary user
} else { let user = user::Entity::find_by_id(user_id)
let user = user::Entity::find_by_id(user_id)
.one(db_replica.as_ref())
.await?
.web3_context("user_id not found")?;
user.user_tier_id
};
let user_tier = user_tier::Entity::find_by_id(relevant_balance_user_tier_id)
.one(db_replica.as_ref()) .one(db_replica.as_ref())
.await? .await?
.web3_context("user_tier not found")?; .web3_context("user_id not found")?;
if user_tier.downgrade_tier_id.is_some() { user.user_tier_id
trace!(%user_id, "User does not have enough balance to qualify for premium"); };
return Err(Web3ProxyError::PaymentRequired);
} let user_tier = user_tier::Entity::find_by_id(relevant_balance_user_tier_id)
.one(db_replica.as_ref())
.await?
.web3_context("user_tier not found")?;
if user_tier.downgrade_tier_id.is_some() && !balance.active_premium() {
trace!(%user_id, "User does not have enough balance to qualify for premium");
return Err(Web3ProxyError::PaymentRequired);
} }
if user_id != caller_user.id { if user_id != caller_user.id {

@ -20,9 +20,9 @@ use influxdb2::models::DataPoint;
use migration::sea_orm::prelude::Decimal; use migration::sea_orm::prelude::Decimal;
use migration::sea_orm::{ use migration::sea_orm::{
self, ActiveModelTrait, ColumnTrait, DatabaseConnection, DbConn, EntityTrait, IntoActiveModel, self, ActiveModelTrait, ColumnTrait, DatabaseConnection, DbConn, EntityTrait, IntoActiveModel,
QueryFilter, TransactionTrait, QueryFilter, QuerySelect, TransactionTrait,
}; };
use migration::{Expr, OnConflict}; use migration::{Expr, LockType, OnConflict};
use num_traits::ToPrimitive; use num_traits::ToPrimitive;
use parking_lot::Mutex; use parking_lot::Mutex;
use std::borrow::Cow; use std::borrow::Cow;
@ -33,7 +33,7 @@ use std::str::FromStr;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::RwLock as AsyncRwLock; use tokio::sync::RwLock as AsyncRwLock;
use tracing::{error, info, trace}; use tracing::{error, trace};
use crate::balance::Balance; use crate::balance::Balance;
pub use stat_buffer::{SpawnedStatBuffer, StatBuffer}; pub use stat_buffer::{SpawnedStatBuffer, StatBuffer};
@ -221,16 +221,18 @@ impl BufferedRpcQueryStats {
chain_id: u64, chain_id: u64,
db_conn: &DatabaseConnection, db_conn: &DatabaseConnection,
key: &RpcQueryKey, key: &RpcQueryKey,
user_balance: Decimal, active_premium: bool,
) -> Web3ProxyResult<Decimal> { ) -> Web3ProxyResult<Decimal> {
let period_datetime = Utc.timestamp_opt(key.response_timestamp, 0).unwrap(); let period_datetime = Utc.timestamp_opt(key.response_timestamp, 0).unwrap();
// Because reading the balance and updating the stats here is not atomically locked, this may lead to a negative balance // // Because reading the balance and updating the stats here is not atomically locked, this may lead to a negative balance
// This negative balance shouldn't be large tough // // This negative balance shouldn't be large tough
let paid_credits_used = if self.sum_credits_used >= user_balance { // // TODO: I'm not so sure about this. @david can you explain more? if someone spends over their balance, they **should** go slightly negative. after all, they would have received the premium limits for these queries
user_balance // // sum_credits_used is definitely correct. the balance can be slightly off. so it seems like we should trust sum_credits_used over balance
} else { let paid_credits_used = if active_premium {
self.sum_credits_used self.sum_credits_used
} else {
0.into()
}; };
// =============================== // // =============================== //
@ -319,7 +321,7 @@ impl BufferedRpcQueryStats {
.exec(db_conn) .exec(db_conn)
.await?; .await?;
Ok(paid_credits_used) Ok(self.sum_credits_used)
} }
// TODO: This is basically a duplicate with the balance_checks, except the database // TODO: This is basically a duplicate with the balance_checks, except the database
@ -329,9 +331,14 @@ impl BufferedRpcQueryStats {
user_id: u64, user_id: u64,
user_balance_cache: &UserBalanceCache, user_balance_cache: &UserBalanceCache,
db_conn: &DbConn, db_conn: &DbConn,
) -> Arc<AsyncRwLock<Balance>> { ) -> Web3ProxyResult<Arc<AsyncRwLock<Balance>>> {
if user_id == 0 {
return Ok(Arc::new(AsyncRwLock::new(Balance::default())));
}
trace!("Will get it from the balance cache"); trace!("Will get it from the balance cache");
user_balance_cache
let x = user_balance_cache
.try_get_with(user_id, async { .try_get_with(user_id, async {
let x = match Balance::try_from_db(db_conn, user_id).await? { let x = match Balance::try_from_db(db_conn, user_id).await? {
Some(x) => x, Some(x) => x,
@ -339,12 +346,9 @@ impl BufferedRpcQueryStats {
}; };
Ok(Arc::new(AsyncRwLock::new(x))) Ok(Arc::new(AsyncRwLock::new(x)))
}) })
.await .await?;
.unwrap_or_else(|err| {
error!("Could not find balance for user !{}", err); Ok(x)
// We are just instantiating this for type-safety's sake
Arc::new(AsyncRwLock::new(Balance::default()))
})
} }
// TODO: take a db transaction instead so that we can batch? // TODO: take a db transaction instead so that we can batch?
@ -365,18 +369,19 @@ impl BufferedRpcQueryStats {
))); )));
} }
// TODO: rename to owner_id?
let sender_user_id = key.rpc_key_user_id.map_or(0, |x| x.get()); let sender_user_id = key.rpc_key_user_id.map_or(0, |x| x.get());
// Gathering cache and database rows // Gathering cache and database rows
let user_balance = self let user_balance = self
._get_user_balance(sender_user_id, user_balance_cache, db_conn) ._get_user_balance(sender_user_id, user_balance_cache, db_conn)
.await; .await?;
let mut user_balance = user_balance.write().await; let mut user_balance = user_balance.write().await;
// First of all, save the statistics to the database: // First of all, save the statistics to the database:
let paid_credits_used = self let paid_credits_used = self
._save_db_stats(chain_id, db_conn, &key, user_balance.remaining()) ._save_db_stats(chain_id, db_conn, &key, user_balance.active_premium())
.await?; .await?;
// No need to continue if no credits were used // No need to continue if no credits were used
@ -387,16 +392,13 @@ impl BufferedRpcQueryStats {
// Update and possible invalidate rpc caches if necessary (if there was a downgrade) // Update and possible invalidate rpc caches if necessary (if there was a downgrade)
{ {
let balance_before = user_balance.remaining(); let premium_before = user_balance.active_premium();
info!("Balance before is {:?}", balance_before);
user_balance.total_spent_paid_credits += paid_credits_used; user_balance.total_spent_paid_credits += paid_credits_used;
// Invalidate caches if remaining is below a threshold // Invalidate caches if remaining is getting close to $0
// It will be re-fetched again if needed // It will be re-fetched again if needed
if sender_user_id != 0 if premium_before && user_balance.remaining() < Decimal::from(1) {
&& balance_before > Decimal::from(10)
&& user_balance.remaining() < Decimal::from(10)
{
let rpc_keys = rpc_key::Entity::find() let rpc_keys = rpc_key::Entity::find()
.filter(rpc_key::Column::UserId.eq(sender_user_id)) .filter(rpc_key::Column::UserId.eq(sender_user_id))
.all(db_conn) .all(db_conn)
@ -411,100 +413,101 @@ impl BufferedRpcQueryStats {
} }
} }
// Start a transaction if user_balance.active_premium() {
let txn = db_conn.begin().await?; // Start a transaction
let txn = db_conn.begin().await?;
// Apply all the referral logic; let's keep it simple and flat for now // Apply all the referral logic; let's keep it simple and flat for now
// Calculate if we are above the usage threshold, and apply a bonus // Calculate if we are above the usage threshold, and apply a bonus
// Optimally we would read this from the balance, but if we do it like this, we only have to lock a single table (much safer w.r.t. deadlocks) // Optimally we would read this from the balance, but if we do it like this, we only have to lock a single table (much safer w.r.t. deadlocks)
// referral_entity.credits_applied_for_referrer * (Decimal::from(10) checks (atomically using this table only), whether the user has brought in >$100 to the referer // referral_entity.credits_applied_for_referrer * (Decimal::from(10) checks (atomically using this table only), whether the user has brought in >$100 to the referer
// In this case, the sender receives $100 as a bonus / gift // In this case, the sender receives $100 as a bonus / gift
// Apply a 10$ bonus onto the user, if the user has spent 100$ // Apply a 10$ bonus onto the user, if the user has spent 100$
match referee::Entity::find() match referee::Entity::find()
.filter(referee::Column::UserId.eq(sender_user_id)) .filter(referee::Column::UserId.eq(sender_user_id))
.find_also_related(referrer::Entity) .find_also_related(referrer::Entity)
.one(&txn) .lock(LockType::Update)
.await? .one(&txn)
{ .await?
Some((referral_entity, Some(referrer))) => { {
// Get the balance for the referrer, see if they're premium or not Some((referral_entity, Some(referrer))) => {
let referrer_balance = self // Get the balance for the referrer, see if they're premium or not
._get_user_balance(referrer.user_id, user_balance_cache, db_conn) let referrer_balance = self
.await; ._get_user_balance(referrer.user_id, user_balance_cache, db_conn)
// Just to keep locking simple, keep things where they are .await?;
let referrer_balance = referrer_balance.read().await;
let mut new_referee_entity: referee::ActiveModel;
let bonus_for_user_threshold = Decimal::from(100); // Just to keep locking simple, read and clone. if the value is slightly delayed, that is okay
let bonus_for_user = Decimal::from(10); let referrer_balance = referrer_balance.read().await.clone();
// Provide one-time bonus to user, if more than 100$ was spent, // Apply the bonuses only if they have the necessary premium statuses
// and if the one-time bonus was not already provided if referrer_balance.was_ever_premium() {
if referral_entity.one_time_bonus_applied_for_referee.is_zero() // spend $100
&& (referral_entity.credits_applied_for_referrer * Decimal::from(10) let bonus_for_user_threshold = Decimal::from(100);
+ self.sum_credits_used) // get $10
>= bonus_for_user_threshold let bonus_for_user = Decimal::from(10);
{
trace!("Adding sender bonus balance"); let referral_start_date = referral_entity.referral_start_date;
// TODO: Should it be Unchanged, or do we have to load all numbers there again ..
let mut new_referral_entity = referral_entity.clone().into_active_model(); let mut referral_entity = referral_entity.into_active_model();
new_referral_entity.one_time_bonus_applied_for_referee =
sea_orm::Set(bonus_for_user); // Provide one-time bonus to user, if more than 100$ was spent,
// Update the cache // and if the one-time bonus was not already provided
{ // TODO: make sure that if we change the bonus from 10%, we also change this multiplication of 10!
// Also no need to invalidate the cache here, just by itself if referral_entity
user_balance.total_deposits += bonus_for_user; .one_time_bonus_applied_for_referee
.as_ref()
.is_zero()
&& (referral_entity.credits_applied_for_referrer.as_ref()
* Decimal::from(10)
+ self.sum_credits_used)
>= bonus_for_user_threshold
{
trace!("Adding sender bonus balance");
referral_entity.one_time_bonus_applied_for_referee =
sea_orm::Set(bonus_for_user);
// Update the cache
user_balance.total_deposits += bonus_for_user;
}
let now = Utc::now();
let valid_until =
DateTime::<Utc>::from_utc(referral_start_date, Utc) + Months::new(12);
// If the referrer ever had premium, provide credits to them
// Also only works if the referrer referred the person less than 1 year ago
// TODO: Perhaps let's not worry about the referral cache here, to avoid deadlocks (hence only reading)
if now <= valid_until {
let referrer_bonus = self.sum_credits_used / Decimal::from(10);
referral_entity.credits_applied_for_referrer = sea_orm::Set(
referral_entity.credits_applied_for_referrer.as_ref()
+ referrer_bonus,
);
// No need to invalidate the referrer every single time;
// this is no major change and can wait for a bit
// Let's not worry about the referrer balance bcs possibility of deadlock
// referrer_balance.total_deposits += referrer_bonus;
}
// The resulting field will not be read again, so I will not try to turn the ActiveModel into a Model one
referral_entity.save(&txn).await?;
} }
// The resulting field will not be read again, so I will not try to turn the ActiveModel into a Model one
new_referral_entity = new_referral_entity.save(&txn).await?;
} }
Some((referee, None)) => {
// Apply the bonus to the referrer if they are premium right now error!(
{ ?referee,
let now = Utc::now(); "No referrer code found for this referrer, this should never happen!",
let valid_until = );
DateTime::<Utc>::from_utc(referral_entity.referral_start_date, Utc)
+ Months::new(12);
// There must be a conflict, if there isn't than there's a clear bug!
// If the referrer has more than $10, provide credits to them
// Also only works if the referrer referred the person less than 1 year ago
// TODO: Perhaps let's not worry about the referral cache here, to avoid deadlocks (hence only reading)
let referrer_bonus = self.sum_credits_used / Decimal::from(10);
if referrer_balance.remaining() > Decimal::from(10) && now <= valid_until {
referee::Entity::insert(referral_entity.into_active_model())
.on_conflict(
OnConflict::new()
.values([(
// Provide credits to referrer
referee::Column::CreditsAppliedForReferrer,
Expr::col(referee::Column::CreditsAppliedForReferrer)
.add(referrer_bonus),
)])
.to_owned(),
)
.exec(&txn)
.await?;
}
// No need to invalidate the referrer every single time;
// this is no major change and can wait for a bit
// Let's not worry about the referrer balance bcs possibility of deadlock
// referrer_balance.total_deposits += referrer_bonus;
} }
} _ => {}
Some((referee, None)) => { };
error!(
"No referrer code found for this referrer, this should never happen! {:?}",
referee
);
}
_ => {}
};
// Finally commit the transaction in the database // Finally commit the transaction in the database
txn.commit() txn.commit()
.await .await
.context("Failed to update referral and balance updates")?; .context("Failed to update referral and balance updates")?;
}
Ok(()) Ok(())
} }