From b527f5d0d6cfd2f7951bf26a77bbd58bc68dc9ba Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Sun, 9 Jul 2023 19:23:32 -0700 Subject: [PATCH] David/remove balance logic non destructive (#173) * WIP will work on locks * added concurrent test for referral, numbers rn still add up * will change balance table to be a view instead of a table * merged in devel. will proceed with migrations for rpc_stats_v2 and referral * WIP * WIP * WIP compiling * gotta add the balance_v2 view * about to add balance view * stupid missing closing param * compiles again, now i will just have to implement the raw sql * need to paste the real query and play with it * ok now on to testing * addresses many comments in PR * app works for simple unauthorized access. will look into tests * some tests pass, referral logic tests fail * not sure why the test is failing --------- Co-authored-by: yenicelik --- entities/src/referee.rs | 3 +- entities/src/rpc_accounting_v2.rs | 2 + example.sql | 28 + migration/src/lib.rs | 4 + ...51756_rpc_accounting_free_usage_credits.rs | 41 ++ ...131_referral_track_one_time_bonus_bonus.rs | 49 ++ scripts/manual-tests/48-balance-downgrade.sh | 32 +- web3_proxy/src/balance.rs | 83 +++ web3_proxy/src/caches.rs | 7 +- web3_proxy/src/config.rs | 4 + web3_proxy/src/frontend/admin.rs | 24 +- web3_proxy/src/frontend/authorization.rs | 94 +-- .../src/frontend/users/authentication.rs | 18 +- web3_proxy/src/frontend/users/payment.rs | 72 +-- .../src/frontend/users/payment_stripe.rs | 28 +- web3_proxy/src/frontend/users/referral.rs | 8 +- web3_proxy/src/frontend/users/subuser.rs | 6 - web3_proxy/src/lib.rs | 1 + web3_proxy/src/stats/influxdb_queries.rs | 23 +- web3_proxy/src/stats/mod.rs | 593 ++++++------------ web3_proxy/src/stats/stat_buffer.rs | 10 +- web3_proxy/tests/test_users.rs | 399 +++++++++++- 22 files changed, 886 insertions(+), 643 deletions(-) create mode 100644 example.sql create mode 100644 migration/src/m20230708_151756_rpc_accounting_free_usage_credits.rs create mode 100644 migration/src/m20230708_152131_referral_track_one_time_bonus_bonus.rs create mode 100644 web3_proxy/src/balance.rs diff --git a/entities/src/referee.rs b/entities/src/referee.rs index 018527e7..0a42cbb9 100644 --- a/entities/src/referee.rs +++ b/entities/src/referee.rs @@ -8,7 +8,8 @@ use serde::{Deserialize, Serialize}; pub struct Model { #[sea_orm(primary_key)] pub id: i32, - pub credits_applied_for_referee: bool, + #[sea_orm(column_type = "Decimal(Some((20, 10)))")] + pub one_time_bonus_applied_for_referee: Decimal, #[sea_orm(column_type = "Decimal(Some((20, 10)))")] pub credits_applied_for_referrer: Decimal, pub referral_start_date: DateTime, diff --git a/entities/src/rpc_accounting_v2.rs b/entities/src/rpc_accounting_v2.rs index 324154c4..1a67ccfc 100644 --- a/entities/src/rpc_accounting_v2.rs +++ b/entities/src/rpc_accounting_v2.rs @@ -24,6 +24,8 @@ pub struct Model { pub sum_response_bytes: u64, #[sea_orm(column_type = "Decimal(Some((20, 10)))")] pub sum_credits_used: Decimal, + #[sea_orm(column_type = "Decimal(Some((20, 10)))")] + pub sum_incl_free_credits_used: Decimal, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git a/example.sql b/example.sql new file mode 100644 index 00000000..51624ace --- /dev/null +++ b/example.sql @@ -0,0 +1,28 @@ +SELECT + user.id AS user_id, + COALESCE(SUM(admin_receipt.amount), 0) + COALESCE(SUM(chain_receipt.amount), 0) + COALESCE(SUM(stripe_receipt.amount), 0) + COALESCE(SUM(referee.one_time_bonus_applied_for_referee), 0) + COALESCE(referrer_bonus.total_bonus, 0) AS total_deposits, + COALESCE(SUM(accounting.sum_credits_used), 0) AS total_spent_including_free_tier, + COALESCE(SUM(accounting.sum_incl_free_credits_used), 0) AS total_spent_outside_free_tier +FROM + user + LEFT JOIN + admin_increase_balance_receipt AS admin_receipt ON user.id = admin_receipt.deposit_to_user_id + LEFT JOIN + increase_on_chain_balance_receipt AS chain_receipt ON user.id = chain_receipt.deposit_to_user_id + LEFT JOIN + stripe_increase_balance_receipt AS stripe_receipt ON user.id = stripe_receipt.deposit_to_user_id + LEFT JOIN + referee ON user.id = referee.user_id + LEFT JOIN + (SELECT referrer.user_id, SUM(referee.credits_applied_for_referrer) AS total_bonus + FROM referrer + JOIN referee ON referrer.id = referee.used_referral_code + GROUP BY referrer.user_id) AS referrer_bonus ON user.id = referrer_bonus.user_id + LEFT JOIN + rpc_key ON user.id = rpc_key.user_id + LEFT JOIN + rpc_accounting_v2 AS accounting ON rpc_key.id = accounting.rpc_key_id + LEFT JOIN + user_tier ON user.user_tier_id = user_tier.id +WHERE + user.id = 1; \ No newline at end of file diff --git a/migration/src/lib.rs b/migration/src/lib.rs index bfab2ca8..1d2cd626 100644 --- a/migration/src/lib.rs +++ b/migration/src/lib.rs @@ -35,6 +35,8 @@ mod m20230619_172237_default_tracking; mod m20230622_104142_stripe_deposits; mod m20230705_214013_type_fixes; mod m20230707_211936_premium_tier_changes; +mod m20230708_151756_rpc_accounting_free_usage_credits; +mod m20230708_152131_referral_track_one_time_bonus_bonus; pub struct Migrator; @@ -77,6 +79,8 @@ impl MigratorTrait for Migrator { Box::new(m20230622_104142_stripe_deposits::Migration), Box::new(m20230705_214013_type_fixes::Migration), Box::new(m20230707_211936_premium_tier_changes::Migration), + Box::new(m20230708_151756_rpc_accounting_free_usage_credits::Migration), + Box::new(m20230708_152131_referral_track_one_time_bonus_bonus::Migration), ] } } diff --git a/migration/src/m20230708_151756_rpc_accounting_free_usage_credits.rs b/migration/src/m20230708_151756_rpc_accounting_free_usage_credits.rs new file mode 100644 index 00000000..ce10403f --- /dev/null +++ b/migration/src/m20230708_151756_rpc_accounting_free_usage_credits.rs @@ -0,0 +1,41 @@ +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + Table::alter() + .table(RpcAccountingV2::Table) + .add_column( + ColumnDef::new(RpcAccountingV2::SumInclFreeCreditsUsed) + .decimal_len(20, 10) + .default("0.0") + .not_null(), + ) + .to_owned(), + ) + .await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + sea_query::Table::alter() + .table(RpcAccountingV2::Table) + .drop_column(RpcAccountingV2::SumInclFreeCreditsUsed) + .to_owned(), + ) + .await + } +} + +/// Learn more at https://docs.rs/sea-query#iden +#[derive(Iden)] +enum RpcAccountingV2 { + Table, + SumInclFreeCreditsUsed, +} diff --git a/migration/src/m20230708_152131_referral_track_one_time_bonus_bonus.rs b/migration/src/m20230708_152131_referral_track_one_time_bonus_bonus.rs new file mode 100644 index 00000000..b0faecaf --- /dev/null +++ b/migration/src/m20230708_152131_referral_track_one_time_bonus_bonus.rs @@ -0,0 +1,49 @@ +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + Table::alter() + .table(Referee::Table) + .drop_column(Referee::CreditsAppliedForReferee) + .add_column( + ColumnDef::new(Referee::OneTimeBonusAppliedForReferee) + .decimal_len(20, 10) + .default("0.0") + .not_null(), + ) + .to_owned(), + ) + .await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + Table::alter() + .table(Referee::Table) + .drop_column(Referee::OneTimeBonusAppliedForReferee) + .add_column( + ColumnDef::new(Referee::CreditsAppliedForReferee) + .boolean() + .not_null(), + ) + .to_owned(), + ) + .await + } +} + +/// Learn more at https://docs.rs/sea-query#iden +#[derive(Iden)] +enum Referee { + Table, + Id, + CreditsAppliedForReferee, + OneTimeBonusAppliedForReferee, +} diff --git a/scripts/manual-tests/48-balance-downgrade.sh b/scripts/manual-tests/48-balance-downgrade.sh index 0463a339..c18425cd 100644 --- a/scripts/manual-tests/48-balance-downgrade.sh +++ b/scripts/manual-tests/48-balance-downgrade.sh @@ -24,14 +24,14 @@ curl -X POST http://127.0.0.1:8544/user/login \ -H 'Content-Type: application/json' \ -d '{ "address": "0xeb3e928a2e54be013ef8241d4c9eaf4dfae94d5a", - "msg": "0x6c6c616d616e6f6465732e636f6d2077616e747320796f7520746f207369676e20696e207769746820796f757220457468657265756d206163636f756e743a0a3078654233453932384132453534424530313345463832343164344339456146344466414539344435610a0af09fa699f09fa699f09fa699f09fa699f09fa6990a0a5552493a2068747470733a2f2f6c6c616d616e6f6465732e636f6d2f0a56657273696f6e3a20310a436861696e2049443a20310a4e6f6e63653a2030314759513445564731474b34314b42364130324a344b45384b0a4973737565642041743a20323032332d30342d32335431333a32323a30392e3533373932365a0a45787069726174696f6e2054696d653a20323032332d30342d32335431333a34323a30392e3533373932365a", - "sig": "52071cc59afb427eb554126f4f9f2a445c2a539783ba45079ccc0911197062f135d6d347cf0c38fa078dc2369c32b5131b86811fc0916786d1e48252163f58131c", + "msg": "0x6c6c616d616e6f6465732e636f6d2077616e747320796f7520746f207369676e20696e207769746820796f757220457468657265756d206163636f756e743a0a3078654233453932384132453534424530313345463832343164344339456146344466414539344435610a0af09fa699f09fa699f09fa699f09fa699f09fa6990a0a5552493a2068747470733a2f2f6c6c616d616e6f6465732e636f6d2f0a56657273696f6e3a20310a436861696e2049443a203133370a4e6f6e63653a2030314834594e564b4b4a4e385737474e4b585643364137505a5a0a4973737565642041743a20323032332d30372d31305430313a31353a30352e3230323638375a0a45787069726174696f6e2054696d653a20323032332d30372d31305430313a33353a30352e3230323638375a", + "sig": "de9a7c024b9fac2b46e01a5397a74b1c479f2c2fd72f674133c3b6b3e5e569981d3f9d5551fe73c52e70173b2ea82c8bc0614b538f5c74179c4cbaa2bdab3f8e1b", "version": "3", "signer": "MEW" }' -# bearer token is: 01GYQ4FMRKKWJEA2YBST3B89MJ -# scret key is: 01GYQ4FMNX9EMFBT43XEFGZV1K +# bearer token is: 01H4YP4AW35DBMW9CXJXTE7MBA +# scret key is: 01H4YP4AVSZGZT0WXCSMMZ1MEH ########################################### # Initially check balance, it should be 0 @@ -39,7 +39,7 @@ curl -X POST http://127.0.0.1:8544/user/login \ # Check the balance of the user # Balance seems to be returning properly (0, in this test case) curl \ --H "Authorization: Bearer 01GYQ4FMRKKWJEA2YBST3B89MJ" \ +-H "Authorization: Bearer 01H4YP4AW35DBMW9CXJXTE7MBA" \ -X GET "127.0.0.1:8544/user/balance" @@ -48,8 +48,8 @@ curl \ # and submits it on the endpoint ########################################### curl \ --H "Authorization: Bearer 01GYQ4FMRKKWJEA2YBST3B89MJ" \ --X GET "127.0.0.1:8544/user/balance/0x749788a5766577431a0a4fc8721fd7cb981f55222e073ed17976f0aba5e8818a" +-H "Authorization: Bearer 01H4YP4AW35DBMW9CXJXTE7MBA" \ +-X POST "127.0.0.1:8544/user/balance/0x749788a5766577431a0a4fc8721fd7cb981f55222e073ed17976f0aba5e8818a" ########################################### # Check the balance again, it should have increased according to how much USDC was spent @@ -57,32 +57,20 @@ curl \ # Check the balance of the user # Balance seems to be returning properly (0, in this test case) curl \ --H "Authorization: Bearer 01GYQ4FMRKKWJEA2YBST3B89MJ" \ +-H "Authorization: Bearer 01H4YP4AW35DBMW9CXJXTE7MBA" \ -X GET "127.0.0.1:8544/user/balance" # Get the RPC key curl \ -X GET "127.0.0.1:8544/user/keys" \ - -H "Authorization: Bearer 01GYQ4FMRKKWJEA2YBST3B89MJ" + -H "Authorization: Bearer 01H4YP4AW35DBMW9CXJXTE7MBA" ## Check if calling an RPC endpoint logs the stats ## This one does already even it seems for i in {1..100000} do curl \ - -X POST "127.0.0.1:8544/rpc/01GZHMCXGXT5Z4M8SCKCMKDAZ6" \ + -X POST "127.0.0.1:8544/rpc/01H4YP4AVSZGZT0WXCSMMZ1MEH" \ -H "Content-Type: application/json" \ --data '{"method":"eth_blockNumber","params":[],"id":1,"jsonrpc":"2.0"}' done - -for i in {1..10000} -do - curl \ - -X POST "127.0.0.1:8544/" \ - -H "Content-Type: application/json" \ - --data '{"method":"eth_blockNumber","params":[],"id":1,"jsonrpc":"2.0"}' -done - - -# TODO: Now implement and test withdrawal - diff --git a/web3_proxy/src/balance.rs b/web3_proxy/src/balance.rs new file mode 100644 index 00000000..9a2fcd19 --- /dev/null +++ b/web3_proxy/src/balance.rs @@ -0,0 +1,83 @@ +use crate::errors::{Web3ProxyResponse, Web3ProxyResult}; +use fstrings::{f, format_args_f}; +use migration::sea_orm::{ + DbBackend, DbConn, FromQueryResult, JsonValue, QueryResult, SqlxMySqlPoolConnection, Statement, +}; +use migration::{sea_orm, ConnectionTrait}; +use rust_decimal::Decimal; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use std::num::NonZeroU64; +use tracing::info; + +/// Implements the balance getter +#[derive(Clone, Debug, Default, Serialize, Deserialize, FromQueryResult)] +pub struct Balance { + pub user_id: u64, + pub total_spent_paid_credits: Decimal, + pub total_spent: Decimal, + pub total_deposits: Decimal, +} + +impl Balance { + pub fn remaining(&self) -> Decimal { + self.total_deposits - self.total_spent_paid_credits + } +} + +pub async fn try_get_balance_from_db( + db_conn: &DbConn, + user_id: u64, +) -> Web3ProxyResult> { + // Return early if user_id == 0 + if user_id == 0 { + return Ok(None); + } + + // Injecting the variable directly, should be fine because Rust is typesafe, especially with primitives + let raw_sql = f!(r#" + SELECT + user.id AS user_id, + COALESCE(SUM(admin_receipt.amount), 0) + COALESCE(SUM(chain_receipt.amount), 0) + COALESCE(SUM(stripe_receipt.amount), 0) + COALESCE(SUM(referee.one_time_bonus_applied_for_referee), 0) + COALESCE(referrer_bonus.total_bonus, 0) AS total_deposits, + COALESCE(SUM(accounting.sum_credits_used), 0) AS total_spent_paid_credits, + COALESCE(SUM(accounting.sum_incl_free_credits_used), 0) AS total_spent + FROM + user + LEFT JOIN + admin_increase_balance_receipt AS admin_receipt ON user.id = admin_receipt.deposit_to_user_id + LEFT JOIN + increase_on_chain_balance_receipt AS chain_receipt ON user.id = chain_receipt.deposit_to_user_id + LEFT JOIN + stripe_increase_balance_receipt AS stripe_receipt ON user.id = stripe_receipt.deposit_to_user_id + LEFT JOIN + referee ON user.id = referee.user_id + LEFT JOIN + (SELECT referrer.user_id, SUM(referee.credits_applied_for_referrer) AS total_bonus + FROM referrer + JOIN referee ON referrer.id = referee.used_referral_code + GROUP BY referrer.user_id) AS referrer_bonus ON user.id = referrer_bonus.user_id + LEFT JOIN + rpc_key ON user.id = rpc_key.user_id + LEFT JOIN + rpc_accounting_v2 AS accounting ON rpc_key.id = accounting.rpc_key_id + LEFT JOIN + user_tier ON user.user_tier_id = user_tier.id + WHERE + user.id = {}; + "#, user_id).to_string(); + + let balance: Balance = match Balance::find_by_statement(Statement::from_string( + DbBackend::MySql, + raw_sql, + // [.into()], + )) + .one(db_conn) + .await? + { + None => return Ok(None), + Some(x) => x, + }; + + // Return None if there is no entry + Ok(Some(balance)) +} diff --git a/web3_proxy/src/caches.rs b/web3_proxy/src/caches.rs index bf30f47e..627c1ae5 100644 --- a/web3_proxy/src/caches.rs +++ b/web3_proxy/src/caches.rs @@ -1,15 +1,16 @@ -use crate::frontend::authorization::{AuthorizationChecks, Balance, RpcSecretKey}; +use crate::balance::Balance; +use crate::frontend::authorization::{AuthorizationChecks, RpcSecretKey}; use moka::future::Cache; -use parking_lot::RwLock; use std::fmt; use std::net::IpAddr; use std::num::NonZeroU64; use std::sync::Arc; +use tokio::sync::RwLock as AsyncRwLock; /// Cache data from the database about rpc keys pub type RpcSecretKeyCache = Cache; /// Cache data from the database about user balances -pub type UserBalanceCache = Cache>>; +pub type UserBalanceCache = Cache>>; #[derive(Clone, Copy, Hash, Eq, PartialEq)] pub struct RegisteredUserRateLimitKey(pub u64, pub IpAddr); diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index d69d7563..74c7ff02 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -6,6 +6,7 @@ use ethers::prelude::{Address, TxHash}; use ethers::types::{U256, U64}; use hashbrown::HashMap; use migration::sea_orm::DatabaseConnection; +use rust_decimal::Decimal; use sentry::types::Dsn; use serde::Deserialize; use serde_inline_default::serde_inline_default; @@ -70,6 +71,9 @@ pub struct AppConfig { #[serde_inline_default(1u64)] pub chain_id: u64, + /// Cost per computational unit + // pub cost_per_cu: Decimal, + /// Database is used for user data. /// Currently supports mysql or compatible backend. pub db_url: Option, diff --git a/web3_proxy/src/frontend/admin.rs b/web3_proxy/src/frontend/admin.rs index 9eca41a4..cbcec1f4 100644 --- a/web3_proxy/src/frontend/admin.rs +++ b/web3_proxy/src/frontend/admin.rs @@ -3,6 +3,7 @@ use super::authorization::login_is_authorized; use crate::admin_queries::query_admin_modify_usertier; use crate::app::Web3ProxyApp; +use crate::caches::UserBalanceCache; use crate::errors::Web3ProxyResponse; use crate::errors::{Web3ProxyError, Web3ProxyErrorContext}; use crate::frontend::users::authentication::PostLogin; @@ -83,28 +84,11 @@ pub async fn admin_increase_balance( ..Default::default() }; increase_balance_receipt.save(&txn).await?; - - // update balance - let balance_entry = balance::ActiveModel { - total_deposits: sea_orm::Set(payload.amount), - user_id: sea_orm::Set(user_entry.id), - ..Default::default() - }; - balance::Entity::insert(balance_entry) - .on_conflict( - OnConflict::new() - .values([( - balance::Column::TotalDeposits, - Expr::col(balance::Column::TotalDeposits).add(payload.amount), - )]) - .to_owned(), - ) - .exec(&txn) - .await - .web3_context("admin is increasing balance")?; - txn.commit().await?; + // Invalidate the user_balance_cache for this user: + app.user_balance_cache.invalidate(&user_entry.id).await; + let out = json!({ "user": payload.user_address, "amount": payload.amount, diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index 85c93f91..6df9b8f4 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -2,6 +2,7 @@ use super::rpc_proxy_ws::ProxyMode; use crate::app::{Web3ProxyApp, APP_USER_AGENT}; +use crate::balance::{try_get_balance_from_db, Balance}; use crate::caches::RegisteredUserRateLimitKey; use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult}; use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest}; @@ -42,6 +43,7 @@ use std::num::NonZeroU64; use std::sync::atomic::{self, AtomicBool, AtomicI64, AtomicU64, AtomicUsize}; use std::time::Duration; use std::{net::IpAddr, str::FromStr, sync::Arc}; +use tokio::sync::RwLock as AsyncRwLock; use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use tokio::task::JoinHandle; use tokio::time::Instant; @@ -93,20 +95,6 @@ pub enum AuthorizationType { Frontend, } -#[derive(Clone, Debug, Default)] -pub struct Balance { - /// The total USD value deposited. - pub total_deposit: Decimal, - /// The amount spent outside the free tier. - pub total_spend: Decimal, -} - -impl Balance { - pub fn remaining(&self) -> Decimal { - self.total_deposit - self.total_spend - } -} - /// TODO: move this #[derive(Clone, Debug, Default, From)] pub struct AuthorizationChecks { @@ -115,7 +103,7 @@ pub struct AuthorizationChecks { /// TODO: `Option`? they are actual zeroes some places in the db now pub user_id: u64, /// locally cached balance that may drift slightly if the user is on multiple servers - pub latest_balance: Arc>, + pub latest_balance: Arc>, /// the key used (if any) pub rpc_secret_key: Option, /// database id of the rpc key @@ -380,7 +368,7 @@ impl RequestMetadata { /// this may drift slightly if multiple servers are handling the same users, but should be close pub async fn latest_balance(&self) -> Option { if let Some(x) = self.authorization.as_ref() { - let x = x.checks.latest_balance.read().remaining(); + let x = x.checks.latest_balance.read().await.remaining(); Some(x) } else { @@ -1147,60 +1135,24 @@ impl Web3ProxyApp { pub(crate) async fn balance_checks( &self, user_id: u64, - ) -> Web3ProxyResult>> { - match NonZeroU64::try_from(user_id) { - Err(_) => Ok(Arc::new(Default::default())), - Ok(x) => self - .user_balance_cache - .try_get_with(x, async move { - let db_replica = self.db_replica()?; - - loop { - match balance::Entity::find() - .filter(balance::Column::UserId.eq(user_id)) - .one(db_replica.as_ref()) - .await? - { - Some(x) => { - let x = Balance { - total_deposit: x.total_deposits, - total_spend: x.total_spent_outside_free_tier, - }; - trace!("Balance for cache retrieved from database is {:?}", x); - - return Ok(Arc::new(RwLock::new(x))); - } - None => { - // no balance row. make one now - let db_conn = self.db_conn()?; - - let balance_entry = balance::ActiveModel { - id: sea_orm::NotSet, - user_id: sea_orm::Set(user_id), - ..Default::default() - }; - - balance::Entity::insert(balance_entry) - .on_conflict( - OnConflict::new() - .values([( - balance::Column::TotalDeposits, - Expr::col(balance::Column::TotalDeposits).add(0), - )]) - .to_owned(), - ) - .exec(db_conn) - .await - .web3_context("creating empty balance row for existing user")?; - - continue; - } - }; + ) -> Web3ProxyResult>> { + self.user_balance_cache + .try_get_with(user_id, async move { + let db_replica = self.db_replica()?; + let x = match crate::balance::try_get_balance_from_db(db_replica.as_ref(), user_id) + .await? + { + None => { + format!("user_id {:?} has no balance entry", user_id).to_owned(); + Err(Web3ProxyError::InvalidUserKey) } - }) - .await - .map_err(Into::into), - } + Some(x) => Ok(x), + }?; + trace!("Balance for cache retrieved from database is {:?}", x); + Ok(Arc::new(AsyncRwLock::new(x))) + }) + .await + .map_err(Into::into) } // check the local cache for user data, or query the database @@ -1308,11 +1260,11 @@ impl Web3ProxyApp { // TODO: Do the logic here, as to how to treat the user, based on balance and initial check // Clear the cache (not the login!) in the stats if a tier-change happens (clear, but don't modify roles) if let Some(downgrade_user_tier) = user_tier_model.downgrade_tier_id { - let balance = latest_balance.read().clone(); + let balance = latest_balance.read().await.clone(); // only consider the user premium if they have paid at least $10 and have a balance > $.01 // otherwise, set user_tier_model to the downograded tier - if balance.total_deposit < Decimal::from(10) + if balance.total_deposits < Decimal::from(10) || balance.remaining() < Decimal::new(1, 2) { // TODO: include boolean to mark that the user is downgraded diff --git a/web3_proxy/src/frontend/users/authentication.rs b/web3_proxy/src/frontend/users/authentication.rs index 54b33551..ad2a57ae 100644 --- a/web3_proxy/src/frontend/users/authentication.rs +++ b/web3_proxy/src/frontend/users/authentication.rs @@ -189,7 +189,7 @@ pub async fn user_login_get( pub async fn register_new_user( txn: &DatabaseTransaction, address: Address, -) -> anyhow::Result<(user::Model, rpc_key::Model, balance::Model)> { +) -> anyhow::Result<(user::Model, rpc_key::Model)> { // the only thing we need from them is an address // everything else is optional // TODO: different invite codes should allow different levels @@ -217,15 +217,7 @@ pub async fn register_new_user( .await .web3_context("Failed saving new user key")?; - // create an empty balance entry - let user_balance = balance::ActiveModel { - user_id: sea_orm::Set(new_user.id), - ..Default::default() - }; - - let user_balance = user_balance.insert(txn).await?; - - Ok((new_user, user_rpc_key, user_balance)) + Ok((new_user, user_rpc_key)) } /// `POST /user/login` - Register or login by posting a signed "siwe" message. @@ -323,7 +315,7 @@ pub async fn user_login_post( let txn = db_conn.begin().await?; - let (caller, caller_key, _) = register_new_user(&txn, our_msg.address.into()).await?; + let (caller, caller_key) = register_new_user(&txn, our_msg.address.into()).await?; txn.commit().await?; @@ -347,7 +339,7 @@ pub async fn user_login_post( let used_referral = referee::ActiveModel { used_referral_code: sea_orm::Set(user_referrer.id), user_id: sea_orm::Set(caller.id), - credits_applied_for_referee: sea_orm::Set(false), + one_time_bonus_applied_for_referee: sea_orm::Set(Decimal::new(0, 10)), credits_applied_for_referrer: sea_orm::Set(Decimal::new(0, 10)), ..Default::default() }; @@ -380,7 +372,7 @@ pub async fn user_login_post( let used_referral = referee::ActiveModel { used_referral_code: sea_orm::Set(user_referrer.id), user_id: sea_orm::Set(caller.id), - credits_applied_for_referee: sea_orm::Set(false), + one_time_bonus_applied_for_referee: sea_orm::Set(Decimal::new(0, 10)), credits_applied_for_referrer: sea_orm::Set(Decimal::new(0, 10)), ..Default::default() }; diff --git a/web3_proxy/src/frontend/users/payment.rs b/web3_proxy/src/frontend/users/payment.rs index 5155e955..ae42c42f 100644 --- a/web3_proxy/src/frontend/users/payment.rs +++ b/web3_proxy/src/frontend/users/payment.rs @@ -1,4 +1,5 @@ use crate::app::Web3ProxyApp; +use crate::balance::{try_get_balance_from_db, Balance}; use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResponse, Web3ProxyResult}; use crate::frontend::authorization::{ login_is_authorized, Authorization as Web3ProxyAuthorization, @@ -21,17 +22,18 @@ use ethers::abi::AbiEncode; use ethers::types::{Address, Block, TransactionReceipt, TxHash, H256}; use hashbrown::{HashMap, HashSet}; use http::StatusCode; -use migration::LockType; use migration::sea_orm::prelude::Decimal; use migration::sea_orm::{ self, ActiveModelTrait, ActiveValue, ColumnTrait, EntityTrait, IntoActiveModel, ModelTrait, QueryFilter, QuerySelect, TransactionTrait, }; +use migration::LockType; use migration::{Expr, OnConflict}; use payment_contracts::ierc20::IERC20; use payment_contracts::payment_factory::{self, PaymentFactory}; +use rdkafka::bindings::rd_kafka_AclBinding_destroy; use serde_json::json; -use std::num::NonZeroU64; +use std::num::{NonZeroU64, TryFromIntError}; use std::sync::Arc; use tracing::{debug, info, trace}; @@ -51,20 +53,16 @@ pub async fn user_balance_get( let db_replica = app.db_replica()?; - let user_balance_row = balance::Entity::find() - .filter(balance::Column::UserId.eq(user.id)) - .one(db_replica.as_ref()) - .await? - .unwrap_or_default(); - - let user_balance = - user_balance_row.total_deposits - user_balance_row.total_spent_outside_free_tier; + let user_balance = match try_get_balance_from_db(db_replica.as_ref(), user.id).await? { + None => Balance::default(), + Some(x) => x, + }; let response = json!({ - "total_deposits": user_balance_row.total_deposits, - "total_spent_outside_free_tier": user_balance_row.total_spent_outside_free_tier, - "total_spent": user_balance_row.total_spent_including_free_tier, - "balance": user_balance, + "total_deposits": user_balance.total_deposits, + "total_spent_paid_credits": user_balance.total_spent_paid_credits, + "total_spent": user_balance.total_spent, + "balance": user_balance.remaining(), }); // TODO: Gotta create a new table for the spend part @@ -374,7 +372,7 @@ pub async fn user_balance_post( { Some(x) => x, None => { - let (user, _, _) = register_new_user(&txn, recipient_account).await?; + let (user, _) = register_new_user(&txn, recipient_account).await?; user } @@ -391,27 +389,6 @@ pub async fn user_balance_post( payment_token_amount ); - // create or update the balance - let balance_entry = balance::ActiveModel { - id: sea_orm::NotSet, - total_deposits: sea_orm::Set(payment_token_amount), - user_id: sea_orm::Set(recipient.id), - ..Default::default() - }; - trace!("Trying to insert into balance entry: {:?}", balance_entry); - balance::Entity::insert(balance_entry) - .on_conflict( - OnConflict::new() - .values([( - balance::Column::TotalDeposits, - Expr::col(balance::Column::TotalDeposits).add(payment_token_amount), - )]) - .to_owned(), - ) - .exec(&txn) - .await - .web3_context("increasing balance")?; - trace!("Saving log {} of txid {:?}", log_index, tx_hash); let receipt = increase_on_chain_balance_receipt::ActiveModel { id: sea_orm::ActiveValue::NotSet, @@ -433,12 +410,7 @@ pub async fn user_balance_post( .all(&txn) .await?; - match NonZeroU64::try_from(recipient.id) { - Err(_) => {} - Ok(x) => { - app.user_balance_cache.invalidate(&x).await; - } - }; + app.user_balance_cache.invalidate(&recipient.id).await; for rpc_key_entity in rpc_keys { app.rpc_secret_key_cache @@ -544,22 +516,6 @@ pub async fn handle_uncle_block( debug!("removing balances: {:#?}", reversed_balances); - for (user_id, reversed_balance) in reversed_balances.iter() { - if let Some(user_balance) = balance::Entity::find() - .lock(LockType::Update) - .filter(balance::Column::Id.eq(*user_id)) - .one(&txn) - .await? - { - let mut user_balance = user_balance.into_active_model(); - - user_balance.total_deposits = - ActiveValue::Set(user_balance.total_deposits.as_ref() - reversed_balance); - - user_balance.update(&txn).await?; - } - } - txn.commit().await?; Ok(Some(reversed_balances)) diff --git a/web3_proxy/src/frontend/users/payment_stripe.rs b/web3_proxy/src/frontend/users/payment_stripe.rs index 4e6e31d2..6ca6a2ce 100644 --- a/web3_proxy/src/frontend/users/payment_stripe.rs +++ b/web3_proxy/src/frontend/users/payment_stripe.rs @@ -163,27 +163,6 @@ pub async fn user_balance_stripe_post( // Otherwise, also increase the balance ... match recipient { Some(recipient) => { - // Create a balance update as well - let balance_entry = balance::ActiveModel { - id: sea_orm::NotSet, - total_deposits: sea_orm::Set(amount), - user_id: sea_orm::Set(recipient.id), - ..Default::default() - }; - trace!(?balance_entry, "Trying to insert into balance entry"); - balance::Entity::insert(balance_entry) - .on_conflict( - OnConflict::new() - .values([( - balance::Column::TotalDeposits, - Expr::col(balance::Column::TotalDeposits).add(amount), - )]) - .to_owned(), - ) - .exec(&txn) - .await - .web3_context("increasing balance")?; - let _ = insert_receipt_model.save(&txn).await; let recipient_rpc_keys = rpc_key::Entity::find() @@ -194,12 +173,7 @@ pub async fn user_balance_stripe_post( txn.commit().await?; // Finally invalidate the cache as well - match NonZeroU64::try_from(recipient.id) { - Err(_) => {} - Ok(x) => { - app.user_balance_cache.invalidate(&x).await; - } - }; + app.user_balance_cache.invalidate(&recipient.id).await; for rpc_key_entity in recipient_rpc_keys { app.rpc_secret_key_cache .invalidate(&rpc_key_entity.secret_key.into()) diff --git a/web3_proxy/src/frontend/users/referral.rs b/web3_proxy/src/frontend/users/referral.rs index 0e59fbec..29ee5fba 100644 --- a/web3_proxy/src/frontend/users/referral.rs +++ b/web3_proxy/src/frontend/users/referral.rs @@ -92,7 +92,7 @@ pub async fn user_used_referral_stats( // For each related referral person, find the corresponding user-address #[derive(Debug, Serialize)] struct Info { - credits_applied_for_referee: bool, + credits_applied_for_referee: Decimal, credits_applied_for_referrer: Decimal, referral_start_date: DateTime, used_referral_code: String, @@ -109,7 +109,7 @@ pub async fn user_used_referral_stats( // .context("Database error, no foreign key found for referring user")?; let tmp = Info { - credits_applied_for_referee: referral_record.credits_applied_for_referee, + credits_applied_for_referee: referral_record.one_time_bonus_applied_for_referee, credits_applied_for_referrer: referral_record.credits_applied_for_referrer, referral_start_date: referral_record.referral_start_date, used_referral_code: referrer_record.referral_code, @@ -150,7 +150,7 @@ pub async fn user_shared_referral_stats( // collect info about each referral #[derive(Debug, Serialize)] struct Info { - credits_applied_for_referee: bool, + credits_applied_for_referee: Decimal, credits_applied_for_referrer: Decimal, referral_start_date: DateTime, referred_address: Address, @@ -170,7 +170,7 @@ pub async fn user_shared_referral_stats( .context("Database error, no foreign key found for referring user")?; let info = Info { - credits_applied_for_referee: referral_record.credits_applied_for_referee, + credits_applied_for_referee: referral_record.one_time_bonus_applied_for_referee, credits_applied_for_referrer: referral_record.credits_applied_for_referrer, referral_start_date: referral_record.referral_start_date, referred_address: Address::from_slice(&referred_user.address), diff --git a/web3_proxy/src/frontend/users/subuser.rs b/web3_proxy/src/frontend/users/subuser.rs index ced8f6b5..41e614a4 100644 --- a/web3_proxy/src/frontend/users/subuser.rs +++ b/web3_proxy/src/frontend/users/subuser.rs @@ -284,12 +284,6 @@ pub async fn modify_subuser( .await .web3_context("Failed saving new user key")?]; - // We should also create the balance entry ... - let subuser_balance = balance::ActiveModel { - user_id: sea_orm::Set(subuser.id), - ..Default::default() - }; - subuser_balance.insert(&txn).await?; // save the user and key to the database txn.commit().await?; diff --git a/web3_proxy/src/lib.rs b/web3_proxy/src/lib.rs index 48dccd1a..8d00b691 100644 --- a/web3_proxy/src/lib.rs +++ b/web3_proxy/src/lib.rs @@ -3,6 +3,7 @@ pub mod admin_queries; pub mod app; +pub mod balance; pub mod block_number; pub mod caches; pub mod compute_units; diff --git a/web3_proxy/src/stats/influxdb_queries.rs b/web3_proxy/src/stats/influxdb_queries.rs index ee97486b..f39b4ca1 100644 --- a/web3_proxy/src/stats/influxdb_queries.rs +++ b/web3_proxy/src/stats/influxdb_queries.rs @@ -1,4 +1,5 @@ use super::StatType; +use crate::balance::{try_get_balance_from_db, Balance}; use crate::errors::Web3ProxyErrorContext; use crate::{ app::Web3ProxyApp, @@ -61,22 +62,18 @@ pub async fn query_user_stats<'a>( // TODO: move this to a helper. it should be simple to check that a user has an active premium account if let Some(caller_user) = &caller_user { // get the balance of the user whose stats we are going to fetch (might be self, but might be another user) - let (total_deposits, total_spent) = match balance::Entity::find() - .filter(balance::Column::UserId.eq(user_id)) - .one(db_replica.as_ref()) - .await? - { - Some(user_balance) => ( - user_balance.total_deposits, - user_balance.total_spent_outside_free_tier, - ), - None => (0.into(), 0.into()), + let balance = match try_get_balance_from_db(db_replica.as_ref(), user_id).await? { + None => { + return Err(Web3ProxyError::AccessDenied( + "User Stats Response requires you to authorize with a bearer token".into(), + )); + } + Some(x) => x, }; - let balance_remaining = total_deposits - total_spent; - // TODO: We should add the threshold that determines if a user is premium into app.config. hard coding to $10 works for now - if total_deposits < Decimal::from(10) || balance_remaining <= Decimal::from(0) { + // TODO: @Bryan this condition seems off, can you double-check? + if balance.total_deposits < Decimal::from(10) || balance.remaining() <= Decimal::from(0) { // get the user tier so we can see if it is a tier that has downgrades let relevant_balance_user_tier_id = if user_id == caller_user.id { caller_user.user_tier_id diff --git a/web3_proxy/src/stats/mod.rs b/web3_proxy/src/stats/mod.rs index 0a1cb899..56b59d3a 100644 --- a/web3_proxy/src/stats/mod.rs +++ b/web3_proxy/src/stats/mod.rs @@ -16,23 +16,31 @@ use axum::headers::Origin; use chrono::{DateTime, Months, TimeZone, Utc}; use derive_more::From; use entities::{balance, referee, referrer, rpc_accounting_v2, rpc_key}; +use futures::TryFutureExt; +use hyper::body::Buf; use influxdb2::models::DataPoint; use migration::sea_orm::prelude::Decimal; use migration::sea_orm::{ - self, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, TransactionTrait, + self, ActiveModelTrait, ColumnTrait, DatabaseConnection, DbConn, EntityTrait, IntoActiveModel, + QueryFilter, TransactionTrait, }; use migration::sea_orm::{DatabaseTransaction, QuerySelect}; use migration::{Expr, LockType, OnConflict}; -use num_traits::ToPrimitive; -use parking_lot::Mutex; +use num_traits::{ToPrimitive, Zero}; +use parking_lot::{Mutex, RwLock}; +use sentry::User; use std::borrow::Cow; +use std::convert::Infallible; +use std::default::Default; use std::mem; -use std::num::NonZeroU64; +use std::num::{NonZeroU64, TryFromIntError}; use std::str::FromStr; use std::sync::atomic::Ordering; use std::sync::Arc; -use tracing::trace; +use tokio::sync::RwLock as AsyncRwLock; +use tracing::{error, info, trace}; +use crate::balance::{try_get_balance_from_db, Balance}; pub use stat_buffer::{SpawnedStatBuffer, StatBuffer}; #[derive(Debug, PartialEq, Eq)] @@ -168,15 +176,14 @@ impl RpcQueryStats { } } -#[derive(Debug, Default)] -struct Deltas { - balance_spent_including_free_credits: Decimal, - balance_spent_excluding_free_credits: Decimal, - apply_usage_bonus_to_request_sender: bool, - usage_bonus_to_request_sender_through_referral: Decimal, - - bonus_to_referrer: Decimal, -} +// #[derive(Debug, Default)] +// struct Deltas { +// balance_spent_including_free_credits: Decimal, +// balance_spent_excluding_free_credits: Decimal, +// apply_usage_bonus_to_request_sender: bool, +// usage_bonus_to_request_sender_through_referral: Decimal, +// bonus_to_referrer: Decimal, +// } /// A stat that we aggregate and then store in a database. /// For now there is just one, but I think there might be others later @@ -187,7 +194,7 @@ pub enum AppStat { // TODO: move to stat_buffer.rs? impl BufferedRpcQueryStats { - fn add(&mut self, stat: RpcQueryStats) { + async fn add(&mut self, stat: RpcQueryStats) { // a stat always come from just 1 frontend request self.frontend_requests += 1; @@ -210,8 +217,8 @@ impl BufferedRpcQueryStats { self.sum_response_millis += stat.response_millis; self.sum_credits_used += stat.compute_unit_cost; - let latest_balance = stat.authorization.checks.latest_balance.read(); - self.latest_balance = latest_balance.clone(); + let latest_balance = stat.authorization.checks.latest_balance.read().await; + self.approximate_latest_balance_for_influx = latest_balance.clone(); } async fn _save_db_stats( @@ -219,9 +226,18 @@ impl BufferedRpcQueryStats { chain_id: u64, db_conn: &DatabaseConnection, key: &RpcQueryKey, - ) -> Web3ProxyResult<()> { + user_balance: Decimal, + ) -> Web3ProxyResult { let period_datetime = Utc.timestamp_opt(key.response_timestamp, 0).unwrap(); + // Because reading the balance and updating the stats here is not atomically locked, this may lead to a negative balance + // This negative balance shouldn't be large tough + let paid_credits_used = if self.sum_credits_used >= user_balance { + user_balance + } else { + self.sum_credits_used + }; + // =============================== // // UPDATE STATISTICS // // =============================== // @@ -241,7 +257,8 @@ impl BufferedRpcQueryStats { sum_request_bytes: sea_orm::Set(self.sum_request_bytes), sum_response_millis: sea_orm::Set(self.sum_response_millis), sum_response_bytes: sea_orm::Set(self.sum_response_bytes), - sum_credits_used: sea_orm::Set(self.sum_credits_used), + sum_credits_used: sea_orm::Set(paid_credits_used), + sum_incl_free_credits_used: sea_orm::Set(self.sum_credits_used), }; rpc_accounting_v2::Entity::insert(accounting_entry) @@ -291,10 +308,15 @@ impl BufferedRpcQueryStats { Expr::col(rpc_accounting_v2::Column::SumResponseBytes) .add(self.sum_response_bytes), ), + ( + rpc_accounting_v2::Column::SumInclFreeCreditsUsed, + Expr::col(rpc_accounting_v2::Column::SumInclFreeCreditsUsed) + .add(self.sum_credits_used), + ), ( rpc_accounting_v2::Column::SumCreditsUsed, Expr::col(rpc_accounting_v2::Column::SumCreditsUsed) - .add(self.sum_credits_used), + .add(paid_credits_used), ), ]) .to_owned(), @@ -302,356 +324,33 @@ impl BufferedRpcQueryStats { .exec(db_conn) .await?; - Ok(()) + Ok(paid_credits_used) } - async fn _get_relevant_entities( + // TODO: This is basically a duplicate with the balance_checks, except the database + // TODO: Please refactor this. Also there are small differences, like the Error is 0 + async fn _get_user_balance( &self, - rpc_secret_key_id: &NonZeroU64, - txn: &DatabaseTransaction, - ) -> Web3ProxyResult<( - rpc_key::Model, - balance::Model, - Option<(referee::Model, referrer::Model)>, - )> { - // Only calculate, and update the user balance - // Do not worry about referrers and all that - let sender_rpc_entity: rpc_key::Model = rpc_key::Entity::find() - .filter(rpc_key::Column::Id.eq(rpc_secret_key_id.get())) - .one(txn) - .await? - .context("We previous checked that the id exists, this is likely some race condition, or it just got deleted!")?; - - let sender_balance: balance::Model = balance::Entity::find() - .filter(balance::Column::UserId.eq(sender_rpc_entity.user_id)) - .one(txn) - .await? - .ok_or(Web3ProxyError::BadRequest( - "Could not find rpc key in db".into(), - ))?; - - // I think one lock here is fine, because only one server has access to the "credits_applied_for_referee" entry - let referral_objects: Option<(referee::Model, referrer::Model)> = - match referee::Entity::find() - .filter(referee::Column::UserId.eq(sender_rpc_entity.user_id)) - .lock(LockType::Update) - .find_also_related(referrer::Entity) - .one(txn) - .await? - { - Some(x) => Some(( - x.0, - x.1.context("Could not fine corresponding referrer code")?, - )), - None => None, - }; - - Ok((sender_rpc_entity, sender_balance, referral_objects)) - } - - async fn _compute_balance_deltas( - &self, - sender_balance: balance::Model, - referral_objects: Option<(referee::Model, referrer::Model)>, - ) -> Web3ProxyResult<(Deltas, Option<(referee::Model, referrer::Model)>)> { - // Calculate Balance Only - let mut deltas = Deltas::default(); - - // Calculate a bunch using referrals as well - if let Some((referral_entity, referrer_code_entity)) = referral_objects { - deltas.apply_usage_bonus_to_request_sender = - referral_entity.credits_applied_for_referee; - - // Calculate if we are above the usage threshold, and apply a bonus - // Optimally we would read this from the balance, but if we do it like this, we only have to lock a single table (much safer w.r.t. deadlocks) - // referral_entity.credits_applied_for_referrer * (Decimal::from(10) checks (atomically using this table only), whether the user has brought in >$100 to the referer - // In this case, the sender receives $100 as a bonus / gift - // Apply a 10$ bonus onto the user, if the user has spent 100$ - trace!( - "Were credits applied so far? {:?} {:?}", - referral_entity.credits_applied_for_referee, - !referral_entity.credits_applied_for_referee - ); - trace!( - "Credits applied for referrer so far? {:?}", - referral_entity.credits_applied_for_referrer - ); - trace!("Sum credits used? {:?}", self.sum_credits_used); - trace!( - "Hello: {:?}", - (referral_entity.credits_applied_for_referrer * (Decimal::from(10)) - + self.sum_credits_used) - >= Decimal::from(100) - ); - if !referral_entity.credits_applied_for_referee - && (referral_entity.credits_applied_for_referrer * (Decimal::from(10)) - + self.sum_credits_used) - >= Decimal::from(100) - { - trace!("Adding sender bonus balance"); - deltas.usage_bonus_to_request_sender_through_referral = Decimal::from(10); - deltas.apply_usage_bonus_to_request_sender = true; - } - - // Calculate how much the referrer should get, limited to the last 12 months - // Apply 10% of the used balance as a bonus if applicable - let now = Utc::now(); - let valid_until = DateTime::::from_utc(referral_entity.referral_start_date, Utc) - + Months::new(12); - - if now <= valid_until { - deltas.bonus_to_referrer += self.sum_credits_used / Decimal::new(10, 0); - } - - // Duplicate code, I should fix this later ... - let user_balance = sender_balance.total_deposits - - sender_balance.total_spent_outside_free_tier - + deltas.usage_bonus_to_request_sender_through_referral; - - // Split up the component of into how much of the paid component was used, and how much of the free component was used (anything after "balance") - if user_balance - self.sum_credits_used >= Decimal::from(0) { - deltas.balance_spent_including_free_credits = self.sum_credits_used; - deltas.balance_spent_excluding_free_credits = self.sum_credits_used; - } else { - deltas.balance_spent_including_free_credits = self.sum_credits_used; - deltas.balance_spent_excluding_free_credits = user_balance; - } - - Ok((deltas, Some((referral_entity, referrer_code_entity)))) - } else { - let user_balance = sender_balance.total_deposits - - sender_balance.total_spent_outside_free_tier - + deltas.usage_bonus_to_request_sender_through_referral; - - // Split up the component of into how much of the paid component was used, and how much of the free component was used (anything after "balance") - if user_balance - self.sum_credits_used >= Decimal::from(0) { - deltas.balance_spent_including_free_credits = self.sum_credits_used; - deltas.balance_spent_excluding_free_credits = self.sum_credits_used; - } else { - deltas.balance_spent_including_free_credits = self.sum_credits_used; - deltas.balance_spent_excluding_free_credits = user_balance; - } - - Ok((deltas, None)) - } - } - - /// Save all referral-based objects in the database - async fn _update_balances_in_db( - &self, - deltas: &Deltas, - txn: &DatabaseTransaction, - sender_rpc_entity: &rpc_key::Model, - referral_objects: &Option<(referee::Model, referrer::Model)>, - ) -> Web3ProxyResult<()> { - // Do the sender balance updates - let user_balance = balance::ActiveModel { - id: sea_orm::NotSet, - total_deposits: sea_orm::Set(deltas.usage_bonus_to_request_sender_through_referral), - total_spent_including_free_tier: sea_orm::Set( - deltas.balance_spent_including_free_credits, - ), - total_spent_outside_free_tier: sea_orm::Set( - deltas.balance_spent_excluding_free_credits, - ), - user_id: sea_orm::Set(sender_rpc_entity.user_id), - }; - - // In any case, add to the balance - trace!( - "Delta is: {:?} from credits used {:?}", - deltas, - self.sum_credits_used - ); - let _ = balance::Entity::insert(user_balance) - .on_conflict( - OnConflict::new() - .values([ - ( - balance::Column::TotalSpentIncludingFreeTier, - Expr::col(balance::Column::TotalSpentIncludingFreeTier) - .add(deltas.balance_spent_including_free_credits), - ), - ( - balance::Column::TotalSpentOutsideFreeTier, - Expr::col(balance::Column::TotalSpentOutsideFreeTier) - .add(deltas.balance_spent_excluding_free_credits), - ), - ( - balance::Column::TotalDeposits, - Expr::col(balance::Column::TotalDeposits) - .add(deltas.usage_bonus_to_request_sender_through_referral), - ), - ]) - .to_owned(), - ) - .exec(txn) - .await?; - - // Do the referrer_entry updates - if let Some((referral_entity, referrer_code_entity)) = referral_objects { - trace!("Positive referrer deposit delta"); - let referee_entry = referee::ActiveModel { - id: sea_orm::Unchanged(referral_entity.id), - credits_applied_for_referee: sea_orm::Set( - deltas.apply_usage_bonus_to_request_sender, - ), - credits_applied_for_referrer: sea_orm::Set(deltas.bonus_to_referrer), - - referral_start_date: sea_orm::Unchanged(referral_entity.referral_start_date), - used_referral_code: sea_orm::Unchanged(referral_entity.used_referral_code), - user_id: sea_orm::Unchanged(referral_entity.user_id), - }; - - // If there was a referral, first of all check if credits should be applied to the sender itself (once he spent 100$) - // If these two values are not equal, that means that we have not applied the bonus just yet. - // In that case, we can apply the bonus just now. - if referral_entity.credits_applied_for_referee - != deltas.apply_usage_bonus_to_request_sender - { - referee::Entity::insert(referee_entry.clone()) - .on_conflict( - OnConflict::new() - .values([( - // TODO Make it a "Set", add is hacky (but works ..) - referee::Column::CreditsAppliedForReferee, - Expr::col(referee::Column::CreditsAppliedForReferee) - .add(deltas.apply_usage_bonus_to_request_sender), - )]) - .to_owned(), - ) - .exec(txn) - .await?; - - // Also add a bonus to the sender (But this should already have been done with the above code!!) - } - - // If the bonus to the referrer is non-empty, also apply that - if deltas.bonus_to_referrer > Decimal::from(0) { - referee::Entity::insert(referee_entry) - .on_conflict( - OnConflict::new() - .values([( - // TODO Make it a "Set", add is hacky (but works ..) - referee::Column::CreditsAppliedForReferrer, - Expr::col(referee::Column::CreditsAppliedForReferrer) - .add(deltas.bonus_to_referrer), - )]) - .to_owned(), - ) - .exec(txn) - .await?; - } - - // Finally, add to the balance of the referrer - let user_balance = balance::ActiveModel { - id: sea_orm::NotSet, - total_deposits: sea_orm::Set(deltas.bonus_to_referrer), - user_id: sea_orm::Set(referrer_code_entity.user_id), - ..Default::default() - }; - - let _ = balance::Entity::insert(user_balance) - .on_conflict( - OnConflict::new() - .values([( - balance::Column::TotalDeposits, - Expr::col(balance::Column::TotalDeposits).add(deltas.bonus_to_referrer), - )]) - .to_owned(), - ) - .exec(txn) - .await?; - }; - Ok(()) - } - - /// Update & Invalidate cache if user is credits are low (premium downgrade condition) - /// Reduce credits if there was no issue - /// This is not atomic, so this may be an issue because it's not sequentially consistent across threads - /// It is a good-enough approximation though, and if the TTL for the balance cache is low enough, this should be ok - async fn _update_balance_in_cache( - &self, - deltas: &Deltas, - db_conn: &DatabaseConnection, - sender_rpc_entity: &rpc_key::Model, - rpc_secret_key_cache: &RpcSecretKeyCache, + user_id: u64, user_balance_cache: &UserBalanceCache, - ) -> Web3ProxyResult<()> { - // ================== - // Modify sender balance - // ================== - let user_id = NonZeroU64::try_from(sender_rpc_entity.user_id) - .expect("database ids are always nonzero"); - - // We don't do an get_or_insert, because technically we don't have the most up to date balance - // Also let's keep things simple in terms of writing and getting. A single place writes it, multiple places can remove / poll it - let latest_balance = match user_balance_cache.get(&user_id) { - Some(x) => x, - // If not in cache, nothing to update - None => return Ok(()), - }; - - let (balance_before, latest_balance) = { - let mut latest_balance = latest_balance.write(); - - let balance_before = latest_balance.clone(); - - // Now modify the balance - latest_balance.total_deposit += deltas.usage_bonus_to_request_sender_through_referral; - latest_balance.total_spend += deltas.balance_spent_including_free_credits; - - (balance_before, latest_balance.clone()) - }; - - // we only start subtracting once the user is first upgraded to a premium user - // consider the user premium if total_deposit > premium threshold - // If the balance is getting low, clear the cache - // TODO: configurable amount for "premium" - // TODO: configurable amount for "low" - // we check balance_before because this current request would have been handled with limits matching the balance at the start of the request - if balance_before.total_deposit > Decimal::from(10) - && latest_balance.remaining() <= Decimal::from(1) - { - let rpc_keys = rpc_key::Entity::find() - .filter(rpc_key::Column::UserId.eq(sender_rpc_entity.user_id)) - .all(db_conn) - .await?; - - // clear the user from the cache - if let Ok(user_id) = NonZeroU64::try_from(sender_rpc_entity.user_id) { - user_balance_cache.invalidate(&user_id).await; - } - - // clear all keys owned by this user from the cache - for rpc_key_entity in rpc_keys { - rpc_secret_key_cache - .invalidate(&rpc_key_entity.secret_key.into()) - .await; - } - } - - // ================== - // Modify referrer balance - // ================== - // We ignore this for performance reasons right now - // We would have to load all the RPC keys of the referrer to de-activate them - // Instead, it's fine if they wait for 60 seconds until their cache expires - // If they are getting low, they will refresh themselves if necessary and then they will see - // // If the referrer object is empty, we don't care about the cache, becase this will be fetched in a next request from the database - // if let Some((referral_entity, _)) = referral_objects { - // if let Ok(referrer_user_id) = NonZeroU64::try_from(referral_entity.user_id) { - // // If the referrer object is in the cache, we just remove it from the balance cache; it will be reloaded next time - // // Get all the RPC keys, delete them from cache - // - // // In principle, do not remove the cache for the referrer; the next reload will trigger premium - // // We don't touch the RPC keys at this stage for the refferer, a payment must be paid to reset those (we want to keep things simple here) - // // Anyways, the RPC keys will be updated in 5 min (600 seconds) - // user_balance_cache.invalidate(&referrer_user_id).await; - // } - // }; - - Ok(()) + db_conn: &DbConn, + ) -> Arc> { + trace!("Will get it from the balance cache"); + let out: Arc> = user_balance_cache + .try_get_with(user_id, async { + let x = match try_get_balance_from_db(db_conn, user_id).await? { + Some(x) => x, + None => return Err(Web3ProxyError::InvalidUserKey), + }; + return Ok(Arc::new(AsyncRwLock::new(x))); + }) + .await + .unwrap_or_else(|err| { + error!("Could not find balance for user !{}", err); + // We are just instantiating this for type-safety's sake + Arc::new(AsyncRwLock::new(Balance::default())) + }); + out } // TODO: take a db transaction instead so that we can batch? @@ -663,6 +362,7 @@ impl BufferedRpcQueryStats { rpc_secret_key_cache: &RpcSecretKeyCache, user_balance_cache: &UserBalanceCache, ) -> Web3ProxyResult<()> { + // Sanity check, if we need to save stats if key.response_timestamp == 0 { return Err(Web3ProxyError::Anyhow(anyhow!( "no response_timestamp! This is a bug! {:?} {:?}", @@ -671,49 +371,152 @@ impl BufferedRpcQueryStats { ))); } - // First of all, save the statistics to the database: - self._save_db_stats(chain_id, db_conn, &key).await?; + let sender_user_id = key.rpc_key_user_id.map_or(0, |x| x.get()); - // Return early if no credits were used, or if user is anonymous + // Gathering cache and database rows + let user_balance = self + ._get_user_balance(sender_user_id, user_balance_cache, &db_conn) + .await; + + let mut user_balance = user_balance.write().await; + + // First of all, save the statistics to the database: + let paid_credits_used = self + ._save_db_stats(chain_id, &db_conn, &key, user_balance.remaining()) + .await?; + + // No need to continue if no credits were used if self.sum_credits_used == 0.into() { + // write-lock is released return Ok(()); } - let rpc_secret_key_id: &NonZeroU64 = match &key.rpc_secret_key_id { - Some(x) => x, - None => return Ok(()), - }; + + // Update and possible invalidate rpc caches if necessary (if there was a downgrade) + { + let balance_before = user_balance.remaining(); + info!("Balance before is {:?}", balance_before); + user_balance.total_spent_paid_credits += paid_credits_used; + + // Invalidate caches if remaining is below a threshold + // It will be re-fetched again if needed + if sender_user_id != 0 + && balance_before > Decimal::from(10) + && user_balance.remaining() < Decimal::from(10) + { + let rpc_keys = rpc_key::Entity::find() + .filter(rpc_key::Column::UserId.eq(sender_user_id)) + .all(db_conn) + .await?; + + // clear all keys owned by this user from the cache + for rpc_key_entity in rpc_keys { + rpc_secret_key_cache + .invalidate(&rpc_key_entity.secret_key.into()) + .await; + } + } + } // Start a transaction let txn = db_conn.begin().await?; - // Fetch any items that we will be modifying - let (sender_rpc_entity, _sender_balance, referral_objects) = - self._get_relevant_entities(rpc_secret_key_id, &txn).await?; - // Compute Changes in balance for user and referrer, incl. referral logic - let (deltas, referral_objects): (Deltas, Option<(referee::Model, referrer::Model)>) = self - ._compute_balance_deltas(_sender_balance, referral_objects) - .await?; + // Apply all the referral logic; let's keep it simple and flat for now + // Calculate if we are above the usage threshold, and apply a bonus + // Optimally we would read this from the balance, but if we do it like this, we only have to lock a single table (much safer w.r.t. deadlocks) + // referral_entity.credits_applied_for_referrer * (Decimal::from(10) checks (atomically using this table only), whether the user has brought in >$100 to the referer + // In this case, the sender receives $100 as a bonus / gift + // Apply a 10$ bonus onto the user, if the user has spent 100$ + match referee::Entity::find() + .filter(referee::Column::UserId.eq(sender_user_id)) + .find_also_related(referrer::Entity) + .one(&txn) + .await? + { + Some((referral_entity, Some(referrer))) => { + // Get the balance for the referrer, see if they're premium or not + let referrer_balance = self + ._get_user_balance(referrer.user_id, &user_balance_cache, &db_conn) + .await; + // Just to keep locking simple, keep things where they are + let referrer_balance = referrer_balance.read().await; + let mut new_referee_entity: referee::ActiveModel; - // Update balances in the database - self._update_balances_in_db(&deltas, &txn, &sender_rpc_entity, &referral_objects) - .await?; + let bonus_for_user_threshold = Decimal::from(100); + let bonus_for_user = Decimal::from(10); + + // Provide one-time bonus to user, if more than 100$ was spent, + // and if the one-time bonus was not already provided + if referral_entity.one_time_bonus_applied_for_referee.is_zero() + && (referral_entity.credits_applied_for_referrer * Decimal::from(10) + + self.sum_credits_used) + >= bonus_for_user_threshold + { + trace!("Adding sender bonus balance"); + // TODO: Should it be Unchanged, or do we have to load all numbers there again .. + new_referee_entity = referee::ActiveModel { + id: sea_orm::Unchanged(Default::default()), + one_time_bonus_applied_for_referee: sea_orm::Set(bonus_for_user), + credits_applied_for_referrer: sea_orm::Unchanged(Default::default()), + referral_start_date: sea_orm::Unchanged(Default::default()), + used_referral_code: sea_orm::Unchanged(Default::default()), + user_id: sea_orm::Unchanged(Default::default()), + }; + // Update the cache + { + // Also no need to invalidate the cache here, just by itself + user_balance.total_deposits += bonus_for_user; + } + // The resulting field will not be read again, so I will not try to turn the ActiveModel into a Model one + new_referee_entity = new_referee_entity.save(&txn).await?; + } + + // Apply the bonus to the referrer if they are premium right now + { + let now = Utc::now(); + let valid_until = + DateTime::::from_utc(referral_entity.referral_start_date, Utc) + + Months::new(12); + + // There must be a conflict, if there isn't than there's a clear bug! + // If the referrer has more than $10, provide credits to them + // Also only works if the referrer referred the person less than 1 year ago + // TODO: Perhaps let's not worry about the referral cache here, to avoid deadlocks (hence only reading) + let referrer_bonus = self.sum_credits_used / Decimal::from(10); + if referrer_balance.remaining() > Decimal::from(10) && now <= valid_until { + referee::Entity::insert(referral_entity.into_active_model()) + .on_conflict( + OnConflict::new() + .values([( + // Provide credits to referrer + referee::Column::CreditsAppliedForReferrer, + Expr::col(referee::Column::CreditsAppliedForReferrer) + .add(referrer_bonus), + )]) + .to_owned(), + ) + .exec(&txn) + .await?; + } + // No need to invalidate the referrer every single time; + // this is no major change and can wait for a bit + // Let's not worry about the referrer balance bcs possibility of deadlock + // referrer_balance.total_deposits += referrer_bonus; + } + } + Some((referee, None)) => { + error!( + "No referrer code found for this referrer, this should never happen! {:?}", + referee + ); + } + _ => {} + }; // Finally commit the transaction in the database txn.commit() .await .context("Failed to update referral and balance updates")?; - // Update balanaces in the cache. - // do this after commiting the database so that invalidated caches definitely query commited data - self._update_balance_in_cache( - &deltas, - db_conn, - &sender_rpc_entity, - rpc_secret_key_cache, - user_balance_cache, - ) - .await?; - Ok(()) } @@ -734,7 +537,7 @@ impl BufferedRpcQueryStats { builder = builder.tag("method", key.method); // Read the latest balance ... - let remaining = self.latest_balance.remaining(); + let remaining = self.approximate_latest_balance_for_influx.remaining(); trace!("Remaining balance for influx is {:?}", remaining); builder = builder diff --git a/web3_proxy/src/stats/stat_buffer.rs b/web3_proxy/src/stats/stat_buffer.rs index 775687fa..84093d77 100644 --- a/web3_proxy/src/stats/stat_buffer.rs +++ b/web3_proxy/src/stats/stat_buffer.rs @@ -1,8 +1,8 @@ use super::{AppStat, RpcQueryKey}; use crate::app::Web3ProxyJoinHandle; +use crate::balance::Balance; use crate::caches::{RpcSecretKeyCache, UserBalanceCache}; use crate::errors::Web3ProxyResult; -use crate::frontend::authorization::Balance; use derive_more::From; use futures::stream; use hashbrown::HashMap; @@ -28,7 +28,7 @@ pub struct BufferedRpcQueryStats { pub sum_credits_used: Decimal, pub sum_cu_used: Decimal, /// The user's balance at this point in time. Multiple queries might be modifying it at once. - pub latest_balance: Balance, + pub approximate_latest_balance_for_influx: Balance, } #[derive(From)] @@ -130,15 +130,15 @@ impl StatBuffer { let global_timeseries_key = stat.global_timeseries_key(); - self.global_timeseries_buffer.entry(global_timeseries_key).or_default().add(stat.clone()); + self.global_timeseries_buffer.entry(global_timeseries_key).or_default().add(stat.clone()).await; if let Some(opt_in_timeseries_key) = stat.owned_timeseries_key() { - self.opt_in_timeseries_buffer.entry(opt_in_timeseries_key).or_default().add(stat.clone()); + self.opt_in_timeseries_buffer.entry(opt_in_timeseries_key).or_default().add(stat.clone()).await; } } if self.db_conn.is_some() { - self.accounting_db_buffer.entry(stat.accounting_key(self.billing_period_seconds)).or_default().add(stat); + self.accounting_db_buffer.entry(stat.accounting_key(self.billing_period_seconds)).or_default().add(stat).await; } } Err(err) => { diff --git a/web3_proxy/tests/test_users.rs b/web3_proxy/tests/test_users.rs index ea130dcf..82d8c000 100644 --- a/web3_proxy/tests/test_users.rs +++ b/web3_proxy/tests/test_users.rs @@ -13,6 +13,7 @@ use crate::common::referral::{ use crate::common::TestApp; use ethers::prelude::{Http, Provider}; use ethers::{signers::Signer, types::Signature}; +use futures::future::select_all; use rust_decimal::Decimal; use serde::Deserialize; use std::str::FromStr; @@ -91,7 +92,7 @@ async fn test_log_in_and_out() { #[cfg_attr(not(feature = "tests-needing-docker"), ignore)] #[test_log::test(tokio::test)] async fn test_admin_balance_increase() { - info!("Starting balance decreases with usage test"); + info!("Starting admin can increase balance"); let x = TestApp::spawn(true).await; let r = reqwest::Client::builder() .timeout(Duration::from_secs(20)) @@ -157,7 +158,7 @@ async fn test_user_balance_decreases() { let proxy_endpoint = format!("{}rpc/{}", x.proxy_provider.url(), rpc_keys.secret_key); let proxy_provider = Provider::::try_from(proxy_endpoint).unwrap(); - // Make somre requests while in the free tier, so we can test bookkeeping here + // Make some requests while in the free tier, so we can test bookkeeping here for _ in 1..10_000 { let _ = proxy_provider .request::<_, Option>("eth_getBlockByNumber", ("latest", false)) @@ -171,6 +172,34 @@ async fn test_user_balance_decreases() { assert_eq!(influx_count, 0); assert!(mysql_count > 0); + // Check the balance, it should not have decreased; there should have been accounted free credits, however + let user_balance_response = user_get_balance(&x, &r, &user_login_response).await; + // Check that the balance is 0 + assert_eq!( + Decimal::from_str(user_balance_response["balance"].as_str().unwrap()).unwrap(), + Decimal::from(0) + ); + // Check that paid credits is 0 (because balance is 0) + assert_eq!( + Decimal::from_str( + user_balance_response["total_spent_paid_credits"] + .as_str() + .unwrap() + ) + .unwrap(), + Decimal::from(0) + ); + // Check that paid credits is 0 (because balance is 0) + assert_eq!( + Decimal::from_str(user_balance_response["total_deposits"].as_str().unwrap()).unwrap(), + Decimal::from(0) + ); + // Check that total credits incl free used is larger than 0 + assert!( + Decimal::from_str(user_balance_response["total_spent"].as_str().unwrap()).unwrap() + > Decimal::from(0) + ); + // Bump both user's wallet to $20 admin_increase_balance( &x, @@ -199,12 +228,21 @@ async fn test_user_balance_decreases() { assert!(mysql_count > 0); // Deposits should not be affected, and should be equal to what was initially provided + let user_balance_response = user_get_balance(&x, &r, &user_login_response).await; let total_deposits = Decimal::from_str(user_balance_response["total_deposits"].as_str().unwrap()).unwrap(); assert_eq!(total_deposits, Decimal::from(20)); - + // Check that total_spent_paid credits is equal to total_spent, because we are all still inside premium + assert_eq!( + Decimal::from_str( + user_balance_response["total_spent_paid_credits"] + .as_str() + .unwrap() + ) + .unwrap(), + Decimal::from_str(user_balance_response["total_spent"].as_str().unwrap()).unwrap() + ); // Get the full balance endpoint - let user_balance_response = user_get_balance(&x, &r, &user_login_response).await; let user_balance_post = Decimal::from_str(user_balance_response["balance"].as_str().unwrap()).unwrap(); assert!(user_balance_post < user_balance_pre); @@ -345,12 +383,21 @@ async fn test_referral_bonus_non_concurrent() { let referrer_balance_post = Decimal::from_str(referrer_balance_response["balance"].as_str().unwrap()).unwrap(); + info!( + "Balances before and after are (user): {:?} {:?}", + user_balance_pre, user_balance_post + ); + info!( + "Balances before and after are (referrer): {:?} {:?}", + referrer_balance_pre, referrer_balance_post + ); + let difference = user_balance_pre - user_balance_post; // Make sure that the pre and post balance is not the same (i.e. some change has occurred) assert_ne!( user_balance_pre, user_balance_post, - "Pre and post balnace is equivalent" + "Pre and post balance is equivalent" ); assert!(user_balance_pre > user_balance_post); assert!(referrer_balance_pre < referrer_balance_post); @@ -361,3 +408,345 @@ async fn test_referral_bonus_non_concurrent() { referrer_balance_post ); } + +#[cfg_attr(not(feature = "tests-needing-docker"), ignore)] +#[test_log::test(tokio::test)] +async fn test_referral_bonus_concurrent_referrer_only() { + info!("Starting referral bonus test"); + let x = TestApp::spawn(true).await; + let r = reqwest::Client::builder() + .timeout(Duration::from_secs(20)) + .build() + .unwrap(); + + let user_wallet = x.wallet(0); + let referrer_wallet = x.wallet(1); + let admin_wallet = x.wallet(2); + + // Create three users, one referrer, one admin who bumps both their balances + let referrer_login_response = create_user(&x, &r, &referrer_wallet, None).await; + let admin_login_response = create_user_as_admin(&x, &r, &admin_wallet).await; + // Get the first user's referral link + let referral_link = get_referral_code(&x, &r, &referrer_login_response).await; + + let user_login_response = create_user(&x, &r, &user_wallet, Some(referral_link.clone())).await; + + // Bump both user's wallet to $20 + admin_increase_balance( + &x, + &r, + &admin_login_response, + &user_wallet, + Decimal::from(20), + ) + .await; + admin_increase_balance( + &x, + &r, + &admin_login_response, + &referrer_wallet, + Decimal::from(20), + ) + .await; + + // Get balance before for both users + let user_balance_response = user_get_balance(&x, &r, &user_login_response).await; + let user_balance_pre = + Decimal::from_str(user_balance_response["balance"].as_str().unwrap()).unwrap(); + let referrer_balance_response = user_get_balance(&x, &r, &referrer_login_response).await; + let referrer_balance_pre = + Decimal::from_str(referrer_balance_response["balance"].as_str().unwrap()).unwrap(); + + // Make sure they both have balance now + assert_eq!(user_balance_pre, Decimal::from(20)); + assert_eq!(referrer_balance_pre, Decimal::from(20)); + + // Setup variables that will be used + let shared_referral_code: UserSharedReferralInfo = + get_shared_referral_codes(&x, &r, &referrer_login_response).await; + let used_referral_code: UserUsedReferralInfo = + get_used_referral_codes(&x, &r, &user_login_response).await; + + // assert that the used referral code is used + assert_eq!( + format!("{:?}", user_wallet.address()), + shared_referral_code + .clone() + .referrals + .get(0) + .unwrap() + .referred_address + .clone() + .unwrap() + ); + assert_eq!( + referral_link.clone(), + used_referral_code + .clone() + .referrals + .get(0) + .unwrap() + .used_referral_code + .clone() + .unwrap() + ); + + // Make a for-loop just spam it a bit + // Make a JSON request + let rpc_keys: RpcKey = user_get_first_rpc_key(&x, &r, &user_login_response).await; + info!("Rpc key is: {:?}", rpc_keys); + + // Spawn many requests proxy_providers + let number_requests = 100; + // Spin up concurrent requests ... + let mut handles = Vec::with_capacity(number_requests); + for _ in 1..number_requests { + let url = (&x).proxy_provider.url().clone(); + let secret_key = (&rpc_keys).secret_key.clone(); + handles.push(tokio::spawn(async move { + let proxy_endpoint = format!("{}rpc/{}", url, secret_key); + let proxy_provider = Provider::::try_from(proxy_endpoint).unwrap(); + let _proxy_result = proxy_provider + .request::<_, Option>("eth_getBlockByNumber", ("latest", false)) + .await + .unwrap() + .unwrap(); + _proxy_result + })); + } + + let mut results = Vec::with_capacity(handles.len()); + for handle in handles { + results.push(handle.await.unwrap()); + } + + // Flush all stats here + let (influx_count, mysql_count) = x.flush_stats().await.unwrap(); + assert_eq!(influx_count, 0); + assert!(mysql_count > 0); + + // Check that at least something was earned: + let shared_referral_code: UserSharedReferralInfo = + get_shared_referral_codes(&x, &r, &referrer_login_response).await; + info!("Referral code"); + info!("{:?}", shared_referral_code.referrals.get(0).unwrap()); + + // We make sure that the referrer has $10 + 10% of the used balance + // The admin provides credits for both + let user_balance_response = user_get_balance(&x, &r, &user_login_response).await; + let user_balance_post = + Decimal::from_str(user_balance_response["balance"].as_str().unwrap()).unwrap(); + let referrer_balance_response = user_get_balance(&x, &r, &referrer_login_response).await; + let referrer_balance_post = + Decimal::from_str(referrer_balance_response["balance"].as_str().unwrap()).unwrap(); + + info!( + "Balances before and after are (user): {:?} {:?}", + user_balance_pre, user_balance_post + ); + info!( + "Balances before and after are (referrer): {:?} {:?}", + referrer_balance_pre, referrer_balance_post + ); + + let difference = user_balance_pre - user_balance_post; + + // Make sure that the pre and post balance is not the same (i.e. some change has occurred) + assert_ne!( + user_balance_pre, user_balance_post, + "Pre and post balance is equivalent" + ); + assert!(user_balance_pre > user_balance_post); + assert!(referrer_balance_pre < referrer_balance_post); + + // Finally, make sure that referrer has received 10$ of balances + assert_eq!( + referrer_balance_pre + difference / Decimal::from(10), + referrer_balance_post + ); +} + +#[cfg_attr(not(feature = "tests-needing-docker"), ignore)] +#[test_log::test(tokio::test)] +async fn test_referral_bonus_concurrent_referrer_and_user() { + info!("Starting referral bonus test"); + let x = TestApp::spawn(true).await; + let r = reqwest::Client::builder() + .timeout(Duration::from_secs(20)) + .build() + .unwrap(); + + let user_wallet = x.wallet(0); + let referrer_wallet = x.wallet(1); + let admin_wallet = x.wallet(2); + + // Create three users, one referrer, one admin who bumps both their balances + let referrer_login_response = create_user(&x, &r, &referrer_wallet, None).await; + let admin_login_response = create_user_as_admin(&x, &r, &admin_wallet).await; + // Get the first user's referral link + let referral_link = get_referral_code(&x, &r, &referrer_login_response).await; + + let user_login_response = create_user(&x, &r, &user_wallet, Some(referral_link.clone())).await; + + // Bump both user's wallet to $20 + admin_increase_balance( + &x, + &r, + &admin_login_response, + &user_wallet, + Decimal::from(20), + ) + .await; + admin_increase_balance( + &x, + &r, + &admin_login_response, + &referrer_wallet, + Decimal::from(20), + ) + .await; + + // Get balance before for both users + let user_balance_response = user_get_balance(&x, &r, &user_login_response).await; + let user_balance_pre = + Decimal::from_str(user_balance_response["balance"].as_str().unwrap()).unwrap(); + let referrer_balance_response = user_get_balance(&x, &r, &referrer_login_response).await; + let referrer_balance_pre = + Decimal::from_str(referrer_balance_response["balance"].as_str().unwrap()).unwrap(); + + // Make sure they both have balance now + assert_eq!(user_balance_pre, Decimal::from(20)); + assert_eq!(referrer_balance_pre, Decimal::from(20)); + + // Setup variables that will be used + let shared_referral_code: UserSharedReferralInfo = + get_shared_referral_codes(&x, &r, &referrer_login_response).await; + let used_referral_code: UserUsedReferralInfo = + get_used_referral_codes(&x, &r, &user_login_response).await; + + // assert that the used referral code is used + assert_eq!( + format!("{:?}", user_wallet.address()), + shared_referral_code + .clone() + .referrals + .get(0) + .unwrap() + .referred_address + .clone() + .unwrap() + ); + assert_eq!( + referral_link.clone(), + used_referral_code + .clone() + .referrals + .get(0) + .unwrap() + .used_referral_code + .clone() + .unwrap() + ); + + // Make a for-loop just spam it a bit + // Make a JSON request + let referre_rpc_keys: RpcKey = user_get_first_rpc_key(&x, &r, &referrer_login_response).await; + let user_rpc_keys: RpcKey = user_get_first_rpc_key(&x, &r, &user_login_response).await; + info!("Rpc key is: {:?}", user_rpc_keys); + + // Spawn many requests proxy_providers + let number_requests = 50; + // Spin up concurrent requests ... + let mut handles = Vec::with_capacity(number_requests); + + // Make one request to create the cache; this originates from no user + let url = (&x).proxy_provider.url().clone(); + let proxy_endpoint = format!("{}", url); + let proxy_provider = Provider::::try_from(proxy_endpoint).unwrap(); + let _proxy_result = proxy_provider + .request::<_, Option>("eth_getBlockByNumber", ("latest", false)) + .await + .unwrap() + .unwrap(); + + for _ in 1..number_requests { + let url = (&x).proxy_provider.url().clone(); + let user_secret_key = (&user_rpc_keys).secret_key.clone(); + handles.push(tokio::spawn(async move { + let proxy_endpoint = format!("{}rpc/{}", url, user_secret_key); + let proxy_provider = Provider::::try_from(proxy_endpoint).unwrap(); + let _proxy_result = proxy_provider + .request::<_, Option>("eth_getBlockByNumber", ("latest", false)) + .await + .unwrap() + .unwrap(); + _proxy_result + })); + let url = (&x).proxy_provider.url().clone(); + let referrer_secret_key = (&referre_rpc_keys).secret_key.clone(); + handles.push(tokio::spawn(async move { + let proxy_endpoint = format!("{}rpc/{}", url, referrer_secret_key); + let proxy_provider = Provider::::try_from(proxy_endpoint).unwrap(); + let _proxy_result = proxy_provider + .request::<_, Option>("eth_getBlockByNumber", ("latest", false)) + .await + .unwrap() + .unwrap(); + _proxy_result + })); + } + + let mut results = Vec::with_capacity(handles.len()); + for handle in handles { + results.push(handle.await.unwrap()); + } + + // Flush all stats here + let (influx_count, mysql_count) = x.flush_stats().await.unwrap(); + assert_eq!(influx_count, 0); + assert!(mysql_count > 0); + + // Check that at least something was earned: + let shared_referral_code: UserSharedReferralInfo = + get_shared_referral_codes(&x, &r, &referrer_login_response).await; + info!("Referral code"); + info!("{:?}", shared_referral_code.referrals.get(0).unwrap()); + + // We make sure that the referrer has $10 + 10% of the used balance + // The admin provides credits for both + let user_balance_response = user_get_balance(&x, &r, &user_login_response).await; + let user_balance_post = + Decimal::from_str(user_balance_response["balance"].as_str().unwrap()).unwrap(); + let referrer_balance_response = user_get_balance(&x, &r, &referrer_login_response).await; + let referrer_balance_post = + Decimal::from_str(referrer_balance_response["balance"].as_str().unwrap()).unwrap(); + + info!( + "Balances before and after are (user): {:?} {:?}", + user_balance_pre, user_balance_post + ); + info!( + "Balances before and after are (referrer): {:?} {:?}", + referrer_balance_pre, referrer_balance_post + ); + + let user_difference = user_balance_pre - user_balance_post; + let referrer_difference = referrer_balance_pre - referrer_balance_post; + + // Make sure that the pre and post balance is not the same (i.e. some change has occurred) + assert_ne!( + user_balance_pre, user_balance_post, + "Pre and post balance is equivalent" + ); + assert!(user_balance_pre > user_balance_post); + assert!(referrer_balance_pre > referrer_balance_post); + assert!(user_difference > referrer_difference); + + // In this case, the referrer loses as much as the user spends (because exact same operations), but gains 10% of what the user has spent + // I'm also adding 19.9992160000 - 19.9992944000 = 0.0000784 as this is the non-cached requests cost on top (for the user) + // Because of 1 cached request, we basically give one unit tolerance + assert_eq!( + (referrer_balance_pre - user_difference + user_difference / Decimal::from(10)), + referrer_balance_post + ); +}