From a43fda0c3211d328d1ab709185d391ea2aeb8b80 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Sun, 9 Jul 2023 21:37:50 -0700 Subject: [PATCH] Bryan devel (#176) * refactor more balance logic * refactor more balance checks * doc running full test suite --- README.md | 6 + web3_proxy/src/balance.rs | 8 + web3_proxy/src/frontend/authorization.rs | 6 +- web3_proxy/src/stats/influxdb_queries.rs | 41 ++-- web3_proxy/src/stats/mod.rs | 231 ++++++++++++----------- 5 files changed, 152 insertions(+), 140 deletions(-) diff --git a/README.md b/README.md index d2c940cd..c7ada706 100644 --- a/README.md +++ b/README.md @@ -47,6 +47,12 @@ Quickly run tests: RUST_LOG=web3_proxy=trace,info cargo nextest run ``` +Run more tests: + +``` +RUST_LOG=web3_proxy=trace,info cargo nextest run --features tests-needing-docker +``` + ## Common commands Create a user: diff --git a/web3_proxy/src/balance.rs b/web3_proxy/src/balance.rs index 503076df..a1dcf702 100644 --- a/web3_proxy/src/balance.rs +++ b/web3_proxy/src/balance.rs @@ -15,6 +15,14 @@ pub struct Balance { } impl Balance { + pub fn active_premium(&self) -> bool { + self.was_ever_premium() && self.total_deposits > self.total_spent_paid_credits + } + + pub fn was_ever_premium(&self) -> bool { + self.user_id != 0 && self.total_deposits >= Decimal::from(10) + } + pub fn remaining(&self) -> Decimal { self.total_deposits - self.total_spent_paid_credits } diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index a0f7126e..5b56182a 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -1253,13 +1253,11 @@ impl Web3ProxyApp { // TODO: Do the logic here, as to how to treat the user, based on balance and initial check // Clear the cache (not the login!) in the stats if a tier-change happens (clear, but don't modify roles) if let Some(downgrade_user_tier) = user_tier_model.downgrade_tier_id { - let balance = latest_balance.read().await.clone(); + let active_premium = latest_balance.read().await.active_premium(); // only consider the user premium if they have paid at least $10 and have a balance > $.01 // otherwise, set user_tier_model to the downograded tier - if balance.total_deposits < Decimal::from(10) - || balance.remaining() < Decimal::new(1, 2) - { + if !active_premium { // TODO: include boolean to mark that the user is downgraded user_tier_model = user_tier::Entity::find_by_id(downgrade_user_tier) diff --git a/web3_proxy/src/stats/influxdb_queries.rs b/web3_proxy/src/stats/influxdb_queries.rs index f658ff91..be6e3cf4 100644 --- a/web3_proxy/src/stats/influxdb_queries.rs +++ b/web3_proxy/src/stats/influxdb_queries.rs @@ -21,7 +21,6 @@ use fstrings::{f, format_args_f}; use hashbrown::HashMap; use influxdb2::api::query::FluxRecord; use influxdb2::models::Query; -use migration::sea_orm::prelude::Decimal; use migration::sea_orm::{ColumnTrait, EntityTrait, QueryFilter}; use serde_json::json; use tracing::{debug, error, trace, warn}; @@ -59,7 +58,6 @@ pub async fn query_user_stats<'a>( .unwrap_or_else(|| caller_user.as_ref().map(|x| x.id).unwrap_or_default()); // Only allow stats if the user has an active premium role - // TODO: move this to a helper. it should be simple to check that a user has an active premium account if let Some(caller_user) = &caller_user { // get the balance of the user whose stats we are going to fetch (might be self, but might be another user) let balance = match Balance::try_from_db(db_replica.as_ref(), user_id).await? { @@ -71,29 +69,28 @@ pub async fn query_user_stats<'a>( Some(x) => x, }; - // TODO: We should add the threshold that determines if a user is premium into app.config. hard coding to $10 works for now - // TODO: @Bryan this condition seems off, can you double-check? - if balance.total_deposits < Decimal::from(10) || balance.remaining() <= Decimal::from(0) { - // get the user tier so we can see if it is a tier that has downgrades - let relevant_balance_user_tier_id = if user_id == caller_user.id { - caller_user.user_tier_id - } else { - let user = user::Entity::find_by_id(user_id) - .one(db_replica.as_ref()) - .await? - .web3_context("user_id not found")?; - - user.user_tier_id - }; - let user_tier = user_tier::Entity::find_by_id(relevant_balance_user_tier_id) + // get the user tier so we can see if it is a tier that has downgrades + let relevant_balance_user_tier_id = if user_id == caller_user.id { + // use the caller's tier + caller_user.user_tier_id + } else { + // use the tier of the primary user from a secondary user + let user = user::Entity::find_by_id(user_id) .one(db_replica.as_ref()) .await? - .web3_context("user_tier not found")?; + .web3_context("user_id not found")?; - if user_tier.downgrade_tier_id.is_some() { - trace!(%user_id, "User does not have enough balance to qualify for premium"); - return Err(Web3ProxyError::PaymentRequired); - } + user.user_tier_id + }; + + let user_tier = user_tier::Entity::find_by_id(relevant_balance_user_tier_id) + .one(db_replica.as_ref()) + .await? + .web3_context("user_tier not found")?; + + if user_tier.downgrade_tier_id.is_some() && !balance.active_premium() { + trace!(%user_id, "User does not have enough balance to qualify for premium"); + return Err(Web3ProxyError::PaymentRequired); } if user_id != caller_user.id { diff --git a/web3_proxy/src/stats/mod.rs b/web3_proxy/src/stats/mod.rs index bc70ecb2..bcde95d1 100644 --- a/web3_proxy/src/stats/mod.rs +++ b/web3_proxy/src/stats/mod.rs @@ -20,9 +20,9 @@ use influxdb2::models::DataPoint; use migration::sea_orm::prelude::Decimal; use migration::sea_orm::{ self, ActiveModelTrait, ColumnTrait, DatabaseConnection, DbConn, EntityTrait, IntoActiveModel, - QueryFilter, TransactionTrait, + QueryFilter, QuerySelect, TransactionTrait, }; -use migration::{Expr, OnConflict}; +use migration::{Expr, LockType, OnConflict}; use num_traits::ToPrimitive; use parking_lot::Mutex; use std::borrow::Cow; @@ -33,7 +33,7 @@ use std::str::FromStr; use std::sync::atomic::Ordering; use std::sync::Arc; use tokio::sync::RwLock as AsyncRwLock; -use tracing::{error, info, trace}; +use tracing::{error, trace}; use crate::balance::Balance; pub use stat_buffer::{SpawnedStatBuffer, StatBuffer}; @@ -221,16 +221,18 @@ impl BufferedRpcQueryStats { chain_id: u64, db_conn: &DatabaseConnection, key: &RpcQueryKey, - user_balance: Decimal, + active_premium: bool, ) -> Web3ProxyResult { let period_datetime = Utc.timestamp_opt(key.response_timestamp, 0).unwrap(); - // Because reading the balance and updating the stats here is not atomically locked, this may lead to a negative balance - // This negative balance shouldn't be large tough - let paid_credits_used = if self.sum_credits_used >= user_balance { - user_balance - } else { + // // Because reading the balance and updating the stats here is not atomically locked, this may lead to a negative balance + // // This negative balance shouldn't be large tough + // // TODO: I'm not so sure about this. @david can you explain more? if someone spends over their balance, they **should** go slightly negative. after all, they would have received the premium limits for these queries + // // sum_credits_used is definitely correct. the balance can be slightly off. so it seems like we should trust sum_credits_used over balance + let paid_credits_used = if active_premium { self.sum_credits_used + } else { + 0.into() }; // =============================== // @@ -319,7 +321,7 @@ impl BufferedRpcQueryStats { .exec(db_conn) .await?; - Ok(paid_credits_used) + Ok(self.sum_credits_used) } // TODO: This is basically a duplicate with the balance_checks, except the database @@ -329,9 +331,14 @@ impl BufferedRpcQueryStats { user_id: u64, user_balance_cache: &UserBalanceCache, db_conn: &DbConn, - ) -> Arc> { + ) -> Web3ProxyResult>> { + if user_id == 0 { + return Ok(Arc::new(AsyncRwLock::new(Balance::default()))); + } + trace!("Will get it from the balance cache"); - user_balance_cache + + let x = user_balance_cache .try_get_with(user_id, async { let x = match Balance::try_from_db(db_conn, user_id).await? { Some(x) => x, @@ -339,12 +346,9 @@ impl BufferedRpcQueryStats { }; Ok(Arc::new(AsyncRwLock::new(x))) }) - .await - .unwrap_or_else(|err| { - error!("Could not find balance for user !{}", err); - // We are just instantiating this for type-safety's sake - Arc::new(AsyncRwLock::new(Balance::default())) - }) + .await?; + + Ok(x) } // TODO: take a db transaction instead so that we can batch? @@ -365,18 +369,19 @@ impl BufferedRpcQueryStats { ))); } + // TODO: rename to owner_id? let sender_user_id = key.rpc_key_user_id.map_or(0, |x| x.get()); // Gathering cache and database rows let user_balance = self ._get_user_balance(sender_user_id, user_balance_cache, db_conn) - .await; + .await?; let mut user_balance = user_balance.write().await; // First of all, save the statistics to the database: let paid_credits_used = self - ._save_db_stats(chain_id, db_conn, &key, user_balance.remaining()) + ._save_db_stats(chain_id, db_conn, &key, user_balance.active_premium()) .await?; // No need to continue if no credits were used @@ -387,16 +392,13 @@ impl BufferedRpcQueryStats { // Update and possible invalidate rpc caches if necessary (if there was a downgrade) { - let balance_before = user_balance.remaining(); - info!("Balance before is {:?}", balance_before); + let premium_before = user_balance.active_premium(); + user_balance.total_spent_paid_credits += paid_credits_used; - // Invalidate caches if remaining is below a threshold + // Invalidate caches if remaining is getting close to $0 // It will be re-fetched again if needed - if sender_user_id != 0 - && balance_before > Decimal::from(10) - && user_balance.remaining() < Decimal::from(10) - { + if premium_before && user_balance.remaining() < Decimal::from(1) { let rpc_keys = rpc_key::Entity::find() .filter(rpc_key::Column::UserId.eq(sender_user_id)) .all(db_conn) @@ -411,100 +413,101 @@ impl BufferedRpcQueryStats { } } - // Start a transaction - let txn = db_conn.begin().await?; + if user_balance.active_premium() { + // Start a transaction + let txn = db_conn.begin().await?; - // Apply all the referral logic; let's keep it simple and flat for now - // Calculate if we are above the usage threshold, and apply a bonus - // Optimally we would read this from the balance, but if we do it like this, we only have to lock a single table (much safer w.r.t. deadlocks) - // referral_entity.credits_applied_for_referrer * (Decimal::from(10) checks (atomically using this table only), whether the user has brought in >$100 to the referer - // In this case, the sender receives $100 as a bonus / gift - // Apply a 10$ bonus onto the user, if the user has spent 100$ - match referee::Entity::find() - .filter(referee::Column::UserId.eq(sender_user_id)) - .find_also_related(referrer::Entity) - .one(&txn) - .await? - { - Some((referral_entity, Some(referrer))) => { - // Get the balance for the referrer, see if they're premium or not - let referrer_balance = self - ._get_user_balance(referrer.user_id, user_balance_cache, db_conn) - .await; - // Just to keep locking simple, keep things where they are - let referrer_balance = referrer_balance.read().await; - let mut new_referee_entity: referee::ActiveModel; + // Apply all the referral logic; let's keep it simple and flat for now + // Calculate if we are above the usage threshold, and apply a bonus + // Optimally we would read this from the balance, but if we do it like this, we only have to lock a single table (much safer w.r.t. deadlocks) + // referral_entity.credits_applied_for_referrer * (Decimal::from(10) checks (atomically using this table only), whether the user has brought in >$100 to the referer + // In this case, the sender receives $100 as a bonus / gift + // Apply a 10$ bonus onto the user, if the user has spent 100$ + match referee::Entity::find() + .filter(referee::Column::UserId.eq(sender_user_id)) + .find_also_related(referrer::Entity) + .lock(LockType::Update) + .one(&txn) + .await? + { + Some((referral_entity, Some(referrer))) => { + // Get the balance for the referrer, see if they're premium or not + let referrer_balance = self + ._get_user_balance(referrer.user_id, user_balance_cache, db_conn) + .await?; - let bonus_for_user_threshold = Decimal::from(100); - let bonus_for_user = Decimal::from(10); + // Just to keep locking simple, read and clone. if the value is slightly delayed, that is okay + let referrer_balance = referrer_balance.read().await.clone(); - // Provide one-time bonus to user, if more than 100$ was spent, - // and if the one-time bonus was not already provided - if referral_entity.one_time_bonus_applied_for_referee.is_zero() - && (referral_entity.credits_applied_for_referrer * Decimal::from(10) - + self.sum_credits_used) - >= bonus_for_user_threshold - { - trace!("Adding sender bonus balance"); - // TODO: Should it be Unchanged, or do we have to load all numbers there again .. - let mut new_referral_entity = referral_entity.clone().into_active_model(); - new_referral_entity.one_time_bonus_applied_for_referee = - sea_orm::Set(bonus_for_user); - // Update the cache - { - // Also no need to invalidate the cache here, just by itself - user_balance.total_deposits += bonus_for_user; + // Apply the bonuses only if they have the necessary premium statuses + if referrer_balance.was_ever_premium() { + // spend $100 + let bonus_for_user_threshold = Decimal::from(100); + // get $10 + let bonus_for_user = Decimal::from(10); + + let referral_start_date = referral_entity.referral_start_date; + + let mut referral_entity = referral_entity.into_active_model(); + + // Provide one-time bonus to user, if more than 100$ was spent, + // and if the one-time bonus was not already provided + // TODO: make sure that if we change the bonus from 10%, we also change this multiplication of 10! + if referral_entity + .one_time_bonus_applied_for_referee + .as_ref() + .is_zero() + && (referral_entity.credits_applied_for_referrer.as_ref() + * Decimal::from(10) + + self.sum_credits_used) + >= bonus_for_user_threshold + { + trace!("Adding sender bonus balance"); + + referral_entity.one_time_bonus_applied_for_referee = + sea_orm::Set(bonus_for_user); + // Update the cache + user_balance.total_deposits += bonus_for_user; + } + + let now = Utc::now(); + let valid_until = + DateTime::::from_utc(referral_start_date, Utc) + Months::new(12); + + // If the referrer ever had premium, provide credits to them + // Also only works if the referrer referred the person less than 1 year ago + // TODO: Perhaps let's not worry about the referral cache here, to avoid deadlocks (hence only reading) + + if now <= valid_until { + let referrer_bonus = self.sum_credits_used / Decimal::from(10); + referral_entity.credits_applied_for_referrer = sea_orm::Set( + referral_entity.credits_applied_for_referrer.as_ref() + + referrer_bonus, + ); + // No need to invalidate the referrer every single time; + // this is no major change and can wait for a bit + // Let's not worry about the referrer balance bcs possibility of deadlock + // referrer_balance.total_deposits += referrer_bonus; + } + + // The resulting field will not be read again, so I will not try to turn the ActiveModel into a Model one + referral_entity.save(&txn).await?; } - // The resulting field will not be read again, so I will not try to turn the ActiveModel into a Model one - new_referral_entity = new_referral_entity.save(&txn).await?; } - - // Apply the bonus to the referrer if they are premium right now - { - let now = Utc::now(); - let valid_until = - DateTime::::from_utc(referral_entity.referral_start_date, Utc) - + Months::new(12); - - // There must be a conflict, if there isn't than there's a clear bug! - // If the referrer has more than $10, provide credits to them - // Also only works if the referrer referred the person less than 1 year ago - // TODO: Perhaps let's not worry about the referral cache here, to avoid deadlocks (hence only reading) - let referrer_bonus = self.sum_credits_used / Decimal::from(10); - if referrer_balance.remaining() > Decimal::from(10) && now <= valid_until { - referee::Entity::insert(referral_entity.into_active_model()) - .on_conflict( - OnConflict::new() - .values([( - // Provide credits to referrer - referee::Column::CreditsAppliedForReferrer, - Expr::col(referee::Column::CreditsAppliedForReferrer) - .add(referrer_bonus), - )]) - .to_owned(), - ) - .exec(&txn) - .await?; - } - // No need to invalidate the referrer every single time; - // this is no major change and can wait for a bit - // Let's not worry about the referrer balance bcs possibility of deadlock - // referrer_balance.total_deposits += referrer_bonus; + Some((referee, None)) => { + error!( + ?referee, + "No referrer code found for this referrer, this should never happen!", + ); } - } - Some((referee, None)) => { - error!( - "No referrer code found for this referrer, this should never happen! {:?}", - referee - ); - } - _ => {} - }; + _ => {} + }; - // Finally commit the transaction in the database - txn.commit() - .await - .context("Failed to update referral and balance updates")?; + // Finally commit the transaction in the database + txn.commit() + .await + .context("Failed to update referral and balance updates")?; + } Ok(()) }