David/remove balance logic non destructive (#173)

* WIP will work on locks

* added concurrent test for referral, numbers rn still add up

* will change balance table to be a view instead of a table

* merged in devel. will proceed with migrations for rpc_stats_v2 and referral

* WIP

* WIP

* WIP compiling

* gotta add the balance_v2 view

* about to add balance view

* stupid missing closing param

* compiles again, now i will just have to implement the raw sql

* need to paste the real query and play with it

* ok now on to testing

* addresses many comments in PR

* app works for simple unauthorized access. will look into tests

* some tests pass, referral logic tests fail

* not sure why the test is failing

---------

Co-authored-by: yenicelik <david.yenicelik@gmail.com>
This commit is contained in:
Bryan Stitt 2023-07-09 19:23:32 -07:00 committed by GitHub
parent 386363e295
commit b527f5d0d6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 886 additions and 643 deletions

@ -8,7 +8,8 @@ use serde::{Deserialize, Serialize};
pub struct Model { pub struct Model {
#[sea_orm(primary_key)] #[sea_orm(primary_key)]
pub id: i32, pub id: i32,
pub credits_applied_for_referee: bool, #[sea_orm(column_type = "Decimal(Some((20, 10)))")]
pub one_time_bonus_applied_for_referee: Decimal,
#[sea_orm(column_type = "Decimal(Some((20, 10)))")] #[sea_orm(column_type = "Decimal(Some((20, 10)))")]
pub credits_applied_for_referrer: Decimal, pub credits_applied_for_referrer: Decimal,
pub referral_start_date: DateTime, pub referral_start_date: DateTime,

@ -24,6 +24,8 @@ pub struct Model {
pub sum_response_bytes: u64, pub sum_response_bytes: u64,
#[sea_orm(column_type = "Decimal(Some((20, 10)))")] #[sea_orm(column_type = "Decimal(Some((20, 10)))")]
pub sum_credits_used: Decimal, pub sum_credits_used: Decimal,
#[sea_orm(column_type = "Decimal(Some((20, 10)))")]
pub sum_incl_free_credits_used: Decimal,
} }
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]

28
example.sql Normal file

@ -0,0 +1,28 @@
SELECT
user.id AS user_id,
COALESCE(SUM(admin_receipt.amount), 0) + COALESCE(SUM(chain_receipt.amount), 0) + COALESCE(SUM(stripe_receipt.amount), 0) + COALESCE(SUM(referee.one_time_bonus_applied_for_referee), 0) + COALESCE(referrer_bonus.total_bonus, 0) AS total_deposits,
COALESCE(SUM(accounting.sum_credits_used), 0) AS total_spent_including_free_tier,
COALESCE(SUM(accounting.sum_incl_free_credits_used), 0) AS total_spent_outside_free_tier
FROM
user
LEFT JOIN
admin_increase_balance_receipt AS admin_receipt ON user.id = admin_receipt.deposit_to_user_id
LEFT JOIN
increase_on_chain_balance_receipt AS chain_receipt ON user.id = chain_receipt.deposit_to_user_id
LEFT JOIN
stripe_increase_balance_receipt AS stripe_receipt ON user.id = stripe_receipt.deposit_to_user_id
LEFT JOIN
referee ON user.id = referee.user_id
LEFT JOIN
(SELECT referrer.user_id, SUM(referee.credits_applied_for_referrer) AS total_bonus
FROM referrer
JOIN referee ON referrer.id = referee.used_referral_code
GROUP BY referrer.user_id) AS referrer_bonus ON user.id = referrer_bonus.user_id
LEFT JOIN
rpc_key ON user.id = rpc_key.user_id
LEFT JOIN
rpc_accounting_v2 AS accounting ON rpc_key.id = accounting.rpc_key_id
LEFT JOIN
user_tier ON user.user_tier_id = user_tier.id
WHERE
user.id = 1;

@ -35,6 +35,8 @@ mod m20230619_172237_default_tracking;
mod m20230622_104142_stripe_deposits; mod m20230622_104142_stripe_deposits;
mod m20230705_214013_type_fixes; mod m20230705_214013_type_fixes;
mod m20230707_211936_premium_tier_changes; mod m20230707_211936_premium_tier_changes;
mod m20230708_151756_rpc_accounting_free_usage_credits;
mod m20230708_152131_referral_track_one_time_bonus_bonus;
pub struct Migrator; pub struct Migrator;
@ -77,6 +79,8 @@ impl MigratorTrait for Migrator {
Box::new(m20230622_104142_stripe_deposits::Migration), Box::new(m20230622_104142_stripe_deposits::Migration),
Box::new(m20230705_214013_type_fixes::Migration), Box::new(m20230705_214013_type_fixes::Migration),
Box::new(m20230707_211936_premium_tier_changes::Migration), Box::new(m20230707_211936_premium_tier_changes::Migration),
Box::new(m20230708_151756_rpc_accounting_free_usage_credits::Migration),
Box::new(m20230708_152131_referral_track_one_time_bonus_bonus::Migration),
] ]
} }
} }

@ -0,0 +1,41 @@
use sea_orm_migration::prelude::*;
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
Table::alter()
.table(RpcAccountingV2::Table)
.add_column(
ColumnDef::new(RpcAccountingV2::SumInclFreeCreditsUsed)
.decimal_len(20, 10)
.default("0.0")
.not_null(),
)
.to_owned(),
)
.await
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
sea_query::Table::alter()
.table(RpcAccountingV2::Table)
.drop_column(RpcAccountingV2::SumInclFreeCreditsUsed)
.to_owned(),
)
.await
}
}
/// Learn more at https://docs.rs/sea-query#iden
#[derive(Iden)]
enum RpcAccountingV2 {
Table,
SumInclFreeCreditsUsed,
}

@ -0,0 +1,49 @@
use sea_orm_migration::prelude::*;
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
Table::alter()
.table(Referee::Table)
.drop_column(Referee::CreditsAppliedForReferee)
.add_column(
ColumnDef::new(Referee::OneTimeBonusAppliedForReferee)
.decimal_len(20, 10)
.default("0.0")
.not_null(),
)
.to_owned(),
)
.await
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
Table::alter()
.table(Referee::Table)
.drop_column(Referee::OneTimeBonusAppliedForReferee)
.add_column(
ColumnDef::new(Referee::CreditsAppliedForReferee)
.boolean()
.not_null(),
)
.to_owned(),
)
.await
}
}
/// Learn more at https://docs.rs/sea-query#iden
#[derive(Iden)]
enum Referee {
Table,
Id,
CreditsAppliedForReferee,
OneTimeBonusAppliedForReferee,
}

@ -24,14 +24,14 @@ curl -X POST http://127.0.0.1:8544/user/login \
-H 'Content-Type: application/json' \ -H 'Content-Type: application/json' \
-d '{ -d '{
"address": "0xeb3e928a2e54be013ef8241d4c9eaf4dfae94d5a", "address": "0xeb3e928a2e54be013ef8241d4c9eaf4dfae94d5a",
"msg": "0x6c6c616d616e6f6465732e636f6d2077616e747320796f7520746f207369676e20696e207769746820796f757220457468657265756d206163636f756e743a0a3078654233453932384132453534424530313345463832343164344339456146344466414539344435610a0af09fa699f09fa699f09fa699f09fa699f09fa6990a0a5552493a2068747470733a2f2f6c6c616d616e6f6465732e636f6d2f0a56657273696f6e3a20310a436861696e2049443a20310a4e6f6e63653a2030314759513445564731474b34314b42364130324a344b45384b0a4973737565642041743a20323032332d30342d32335431333a32323a30392e3533373932365a0a45787069726174696f6e2054696d653a20323032332d30342d32335431333a34323a30392e3533373932365a", "msg": "0x6c6c616d616e6f6465732e636f6d2077616e747320796f7520746f207369676e20696e207769746820796f757220457468657265756d206163636f756e743a0a3078654233453932384132453534424530313345463832343164344339456146344466414539344435610a0af09fa699f09fa699f09fa699f09fa699f09fa6990a0a5552493a2068747470733a2f2f6c6c616d616e6f6465732e636f6d2f0a56657273696f6e3a20310a436861696e2049443a203133370a4e6f6e63653a2030314834594e564b4b4a4e385737474e4b585643364137505a5a0a4973737565642041743a20323032332d30372d31305430313a31353a30352e3230323638375a0a45787069726174696f6e2054696d653a20323032332d30372d31305430313a33353a30352e3230323638375a",
"sig": "52071cc59afb427eb554126f4f9f2a445c2a539783ba45079ccc0911197062f135d6d347cf0c38fa078dc2369c32b5131b86811fc0916786d1e48252163f58131c", "sig": "de9a7c024b9fac2b46e01a5397a74b1c479f2c2fd72f674133c3b6b3e5e569981d3f9d5551fe73c52e70173b2ea82c8bc0614b538f5c74179c4cbaa2bdab3f8e1b",
"version": "3", "version": "3",
"signer": "MEW" "signer": "MEW"
}' }'
# bearer token is: 01GYQ4FMRKKWJEA2YBST3B89MJ # bearer token is: 01H4YP4AW35DBMW9CXJXTE7MBA
# scret key is: 01GYQ4FMNX9EMFBT43XEFGZV1K # scret key is: 01H4YP4AVSZGZT0WXCSMMZ1MEH
########################################### ###########################################
# Initially check balance, it should be 0 # Initially check balance, it should be 0
@ -39,7 +39,7 @@ curl -X POST http://127.0.0.1:8544/user/login \
# Check the balance of the user # Check the balance of the user
# Balance seems to be returning properly (0, in this test case) # Balance seems to be returning properly (0, in this test case)
curl \ curl \
-H "Authorization: Bearer 01GYQ4FMRKKWJEA2YBST3B89MJ" \ -H "Authorization: Bearer 01H4YP4AW35DBMW9CXJXTE7MBA" \
-X GET "127.0.0.1:8544/user/balance" -X GET "127.0.0.1:8544/user/balance"
@ -48,8 +48,8 @@ curl \
# and submits it on the endpoint # and submits it on the endpoint
########################################### ###########################################
curl \ curl \
-H "Authorization: Bearer 01GYQ4FMRKKWJEA2YBST3B89MJ" \ -H "Authorization: Bearer 01H4YP4AW35DBMW9CXJXTE7MBA" \
-X GET "127.0.0.1:8544/user/balance/0x749788a5766577431a0a4fc8721fd7cb981f55222e073ed17976f0aba5e8818a" -X POST "127.0.0.1:8544/user/balance/0x749788a5766577431a0a4fc8721fd7cb981f55222e073ed17976f0aba5e8818a"
########################################### ###########################################
# Check the balance again, it should have increased according to how much USDC was spent # Check the balance again, it should have increased according to how much USDC was spent
@ -57,32 +57,20 @@ curl \
# Check the balance of the user # Check the balance of the user
# Balance seems to be returning properly (0, in this test case) # Balance seems to be returning properly (0, in this test case)
curl \ curl \
-H "Authorization: Bearer 01GYQ4FMRKKWJEA2YBST3B89MJ" \ -H "Authorization: Bearer 01H4YP4AW35DBMW9CXJXTE7MBA" \
-X GET "127.0.0.1:8544/user/balance" -X GET "127.0.0.1:8544/user/balance"
# Get the RPC key # Get the RPC key
curl \ curl \
-X GET "127.0.0.1:8544/user/keys" \ -X GET "127.0.0.1:8544/user/keys" \
-H "Authorization: Bearer 01GYQ4FMRKKWJEA2YBST3B89MJ" -H "Authorization: Bearer 01H4YP4AW35DBMW9CXJXTE7MBA"
## Check if calling an RPC endpoint logs the stats ## Check if calling an RPC endpoint logs the stats
## This one does already even it seems ## This one does already even it seems
for i in {1..100000} for i in {1..100000}
do do
curl \ curl \
-X POST "127.0.0.1:8544/rpc/01GZHMCXGXT5Z4M8SCKCMKDAZ6" \ -X POST "127.0.0.1:8544/rpc/01H4YP4AVSZGZT0WXCSMMZ1MEH" \
-H "Content-Type: application/json" \ -H "Content-Type: application/json" \
--data '{"method":"eth_blockNumber","params":[],"id":1,"jsonrpc":"2.0"}' --data '{"method":"eth_blockNumber","params":[],"id":1,"jsonrpc":"2.0"}'
done done
for i in {1..10000}
do
curl \
-X POST "127.0.0.1:8544/" \
-H "Content-Type: application/json" \
--data '{"method":"eth_blockNumber","params":[],"id":1,"jsonrpc":"2.0"}'
done
# TODO: Now implement and test withdrawal

83
web3_proxy/src/balance.rs Normal file

@ -0,0 +1,83 @@
use crate::errors::{Web3ProxyResponse, Web3ProxyResult};
use fstrings::{f, format_args_f};
use migration::sea_orm::{
DbBackend, DbConn, FromQueryResult, JsonValue, QueryResult, SqlxMySqlPoolConnection, Statement,
};
use migration::{sea_orm, ConnectionTrait};
use rust_decimal::Decimal;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::num::NonZeroU64;
use tracing::info;
/// Implements the balance getter
#[derive(Clone, Debug, Default, Serialize, Deserialize, FromQueryResult)]
pub struct Balance {
pub user_id: u64,
pub total_spent_paid_credits: Decimal,
pub total_spent: Decimal,
pub total_deposits: Decimal,
}
impl Balance {
pub fn remaining(&self) -> Decimal {
self.total_deposits - self.total_spent_paid_credits
}
}
pub async fn try_get_balance_from_db(
db_conn: &DbConn,
user_id: u64,
) -> Web3ProxyResult<Option<Balance>> {
// Return early if user_id == 0
if user_id == 0 {
return Ok(None);
}
// Injecting the variable directly, should be fine because Rust is typesafe, especially with primitives
let raw_sql = f!(r#"
SELECT
user.id AS user_id,
COALESCE(SUM(admin_receipt.amount), 0) + COALESCE(SUM(chain_receipt.amount), 0) + COALESCE(SUM(stripe_receipt.amount), 0) + COALESCE(SUM(referee.one_time_bonus_applied_for_referee), 0) + COALESCE(referrer_bonus.total_bonus, 0) AS total_deposits,
COALESCE(SUM(accounting.sum_credits_used), 0) AS total_spent_paid_credits,
COALESCE(SUM(accounting.sum_incl_free_credits_used), 0) AS total_spent
FROM
user
LEFT JOIN
admin_increase_balance_receipt AS admin_receipt ON user.id = admin_receipt.deposit_to_user_id
LEFT JOIN
increase_on_chain_balance_receipt AS chain_receipt ON user.id = chain_receipt.deposit_to_user_id
LEFT JOIN
stripe_increase_balance_receipt AS stripe_receipt ON user.id = stripe_receipt.deposit_to_user_id
LEFT JOIN
referee ON user.id = referee.user_id
LEFT JOIN
(SELECT referrer.user_id, SUM(referee.credits_applied_for_referrer) AS total_bonus
FROM referrer
JOIN referee ON referrer.id = referee.used_referral_code
GROUP BY referrer.user_id) AS referrer_bonus ON user.id = referrer_bonus.user_id
LEFT JOIN
rpc_key ON user.id = rpc_key.user_id
LEFT JOIN
rpc_accounting_v2 AS accounting ON rpc_key.id = accounting.rpc_key_id
LEFT JOIN
user_tier ON user.user_tier_id = user_tier.id
WHERE
user.id = {};
"#, user_id).to_string();
let balance: Balance = match Balance::find_by_statement(Statement::from_string(
DbBackend::MySql,
raw_sql,
// [.into()],
))
.one(db_conn)
.await?
{
None => return Ok(None),
Some(x) => x,
};
// Return None if there is no entry
Ok(Some(balance))
}

@ -1,15 +1,16 @@
use crate::frontend::authorization::{AuthorizationChecks, Balance, RpcSecretKey}; use crate::balance::Balance;
use crate::frontend::authorization::{AuthorizationChecks, RpcSecretKey};
use moka::future::Cache; use moka::future::Cache;
use parking_lot::RwLock;
use std::fmt; use std::fmt;
use std::net::IpAddr; use std::net::IpAddr;
use std::num::NonZeroU64; use std::num::NonZeroU64;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::RwLock as AsyncRwLock;
/// Cache data from the database about rpc keys /// Cache data from the database about rpc keys
pub type RpcSecretKeyCache = Cache<RpcSecretKey, AuthorizationChecks>; pub type RpcSecretKeyCache = Cache<RpcSecretKey, AuthorizationChecks>;
/// Cache data from the database about user balances /// Cache data from the database about user balances
pub type UserBalanceCache = Cache<NonZeroU64, Arc<RwLock<Balance>>>; pub type UserBalanceCache = Cache<u64, Arc<AsyncRwLock<Balance>>>;
#[derive(Clone, Copy, Hash, Eq, PartialEq)] #[derive(Clone, Copy, Hash, Eq, PartialEq)]
pub struct RegisteredUserRateLimitKey(pub u64, pub IpAddr); pub struct RegisteredUserRateLimitKey(pub u64, pub IpAddr);

@ -6,6 +6,7 @@ use ethers::prelude::{Address, TxHash};
use ethers::types::{U256, U64}; use ethers::types::{U256, U64};
use hashbrown::HashMap; use hashbrown::HashMap;
use migration::sea_orm::DatabaseConnection; use migration::sea_orm::DatabaseConnection;
use rust_decimal::Decimal;
use sentry::types::Dsn; use sentry::types::Dsn;
use serde::Deserialize; use serde::Deserialize;
use serde_inline_default::serde_inline_default; use serde_inline_default::serde_inline_default;
@ -70,6 +71,9 @@ pub struct AppConfig {
#[serde_inline_default(1u64)] #[serde_inline_default(1u64)]
pub chain_id: u64, pub chain_id: u64,
/// Cost per computational unit
// pub cost_per_cu: Decimal,
/// Database is used for user data. /// Database is used for user data.
/// Currently supports mysql or compatible backend. /// Currently supports mysql or compatible backend.
pub db_url: Option<String>, pub db_url: Option<String>,

@ -3,6 +3,7 @@
use super::authorization::login_is_authorized; use super::authorization::login_is_authorized;
use crate::admin_queries::query_admin_modify_usertier; use crate::admin_queries::query_admin_modify_usertier;
use crate::app::Web3ProxyApp; use crate::app::Web3ProxyApp;
use crate::caches::UserBalanceCache;
use crate::errors::Web3ProxyResponse; use crate::errors::Web3ProxyResponse;
use crate::errors::{Web3ProxyError, Web3ProxyErrorContext}; use crate::errors::{Web3ProxyError, Web3ProxyErrorContext};
use crate::frontend::users::authentication::PostLogin; use crate::frontend::users::authentication::PostLogin;
@ -83,28 +84,11 @@ pub async fn admin_increase_balance(
..Default::default() ..Default::default()
}; };
increase_balance_receipt.save(&txn).await?; increase_balance_receipt.save(&txn).await?;
// update balance
let balance_entry = balance::ActiveModel {
total_deposits: sea_orm::Set(payload.amount),
user_id: sea_orm::Set(user_entry.id),
..Default::default()
};
balance::Entity::insert(balance_entry)
.on_conflict(
OnConflict::new()
.values([(
balance::Column::TotalDeposits,
Expr::col(balance::Column::TotalDeposits).add(payload.amount),
)])
.to_owned(),
)
.exec(&txn)
.await
.web3_context("admin is increasing balance")?;
txn.commit().await?; txn.commit().await?;
// Invalidate the user_balance_cache for this user:
app.user_balance_cache.invalidate(&user_entry.id).await;
let out = json!({ let out = json!({
"user": payload.user_address, "user": payload.user_address,
"amount": payload.amount, "amount": payload.amount,

@ -2,6 +2,7 @@
use super::rpc_proxy_ws::ProxyMode; use super::rpc_proxy_ws::ProxyMode;
use crate::app::{Web3ProxyApp, APP_USER_AGENT}; use crate::app::{Web3ProxyApp, APP_USER_AGENT};
use crate::balance::{try_get_balance_from_db, Balance};
use crate::caches::RegisteredUserRateLimitKey; use crate::caches::RegisteredUserRateLimitKey;
use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult}; use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult};
use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest}; use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest};
@ -42,6 +43,7 @@ use std::num::NonZeroU64;
use std::sync::atomic::{self, AtomicBool, AtomicI64, AtomicU64, AtomicUsize}; use std::sync::atomic::{self, AtomicBool, AtomicI64, AtomicU64, AtomicUsize};
use std::time::Duration; use std::time::Duration;
use std::{net::IpAddr, str::FromStr, sync::Arc}; use std::{net::IpAddr, str::FromStr, sync::Arc};
use tokio::sync::RwLock as AsyncRwLock;
use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tokio::time::Instant; use tokio::time::Instant;
@ -93,20 +95,6 @@ pub enum AuthorizationType {
Frontend, Frontend,
} }
#[derive(Clone, Debug, Default)]
pub struct Balance {
/// The total USD value deposited.
pub total_deposit: Decimal,
/// The amount spent outside the free tier.
pub total_spend: Decimal,
}
impl Balance {
pub fn remaining(&self) -> Decimal {
self.total_deposit - self.total_spend
}
}
/// TODO: move this /// TODO: move this
#[derive(Clone, Debug, Default, From)] #[derive(Clone, Debug, Default, From)]
pub struct AuthorizationChecks { pub struct AuthorizationChecks {
@ -115,7 +103,7 @@ pub struct AuthorizationChecks {
/// TODO: `Option<NonZeroU64>`? they are actual zeroes some places in the db now /// TODO: `Option<NonZeroU64>`? they are actual zeroes some places in the db now
pub user_id: u64, pub user_id: u64,
/// locally cached balance that may drift slightly if the user is on multiple servers /// locally cached balance that may drift slightly if the user is on multiple servers
pub latest_balance: Arc<RwLock<Balance>>, pub latest_balance: Arc<AsyncRwLock<Balance>>,
/// the key used (if any) /// the key used (if any)
pub rpc_secret_key: Option<RpcSecretKey>, pub rpc_secret_key: Option<RpcSecretKey>,
/// database id of the rpc key /// database id of the rpc key
@ -380,7 +368,7 @@ impl RequestMetadata {
/// this may drift slightly if multiple servers are handling the same users, but should be close /// this may drift slightly if multiple servers are handling the same users, but should be close
pub async fn latest_balance(&self) -> Option<Decimal> { pub async fn latest_balance(&self) -> Option<Decimal> {
if let Some(x) = self.authorization.as_ref() { if let Some(x) = self.authorization.as_ref() {
let x = x.checks.latest_balance.read().remaining(); let x = x.checks.latest_balance.read().await.remaining();
Some(x) Some(x)
} else { } else {
@ -1147,60 +1135,24 @@ impl Web3ProxyApp {
pub(crate) async fn balance_checks( pub(crate) async fn balance_checks(
&self, &self,
user_id: u64, user_id: u64,
) -> Web3ProxyResult<Arc<RwLock<Balance>>> { ) -> Web3ProxyResult<Arc<AsyncRwLock<Balance>>> {
match NonZeroU64::try_from(user_id) { self.user_balance_cache
Err(_) => Ok(Arc::new(Default::default())), .try_get_with(user_id, async move {
Ok(x) => self
.user_balance_cache
.try_get_with(x, async move {
let db_replica = self.db_replica()?; let db_replica = self.db_replica()?;
let x = match crate::balance::try_get_balance_from_db(db_replica.as_ref(), user_id)
loop {
match balance::Entity::find()
.filter(balance::Column::UserId.eq(user_id))
.one(db_replica.as_ref())
.await? .await?
{ {
Some(x) => {
let x = Balance {
total_deposit: x.total_deposits,
total_spend: x.total_spent_outside_free_tier,
};
trace!("Balance for cache retrieved from database is {:?}", x);
return Ok(Arc::new(RwLock::new(x)));
}
None => { None => {
// no balance row. make one now format!("user_id {:?} has no balance entry", user_id).to_owned();
let db_conn = self.db_conn()?; Err(Web3ProxyError::InvalidUserKey)
let balance_entry = balance::ActiveModel {
id: sea_orm::NotSet,
user_id: sea_orm::Set(user_id),
..Default::default()
};
balance::Entity::insert(balance_entry)
.on_conflict(
OnConflict::new()
.values([(
balance::Column::TotalDeposits,
Expr::col(balance::Column::TotalDeposits).add(0),
)])
.to_owned(),
)
.exec(db_conn)
.await
.web3_context("creating empty balance row for existing user")?;
continue;
}
};
} }
Some(x) => Ok(x),
}?;
trace!("Balance for cache retrieved from database is {:?}", x);
Ok(Arc::new(AsyncRwLock::new(x)))
}) })
.await .await
.map_err(Into::into), .map_err(Into::into)
}
} }
// check the local cache for user data, or query the database // check the local cache for user data, or query the database
@ -1308,11 +1260,11 @@ impl Web3ProxyApp {
// TODO: Do the logic here, as to how to treat the user, based on balance and initial check // TODO: Do the logic here, as to how to treat the user, based on balance and initial check
// Clear the cache (not the login!) in the stats if a tier-change happens (clear, but don't modify roles) // Clear the cache (not the login!) in the stats if a tier-change happens (clear, but don't modify roles)
if let Some(downgrade_user_tier) = user_tier_model.downgrade_tier_id { if let Some(downgrade_user_tier) = user_tier_model.downgrade_tier_id {
let balance = latest_balance.read().clone(); let balance = latest_balance.read().await.clone();
// only consider the user premium if they have paid at least $10 and have a balance > $.01 // only consider the user premium if they have paid at least $10 and have a balance > $.01
// otherwise, set user_tier_model to the downograded tier // otherwise, set user_tier_model to the downograded tier
if balance.total_deposit < Decimal::from(10) if balance.total_deposits < Decimal::from(10)
|| balance.remaining() < Decimal::new(1, 2) || balance.remaining() < Decimal::new(1, 2)
{ {
// TODO: include boolean to mark that the user is downgraded // TODO: include boolean to mark that the user is downgraded

@ -189,7 +189,7 @@ pub async fn user_login_get(
pub async fn register_new_user( pub async fn register_new_user(
txn: &DatabaseTransaction, txn: &DatabaseTransaction,
address: Address, address: Address,
) -> anyhow::Result<(user::Model, rpc_key::Model, balance::Model)> { ) -> anyhow::Result<(user::Model, rpc_key::Model)> {
// the only thing we need from them is an address // the only thing we need from them is an address
// everything else is optional // everything else is optional
// TODO: different invite codes should allow different levels // TODO: different invite codes should allow different levels
@ -217,15 +217,7 @@ pub async fn register_new_user(
.await .await
.web3_context("Failed saving new user key")?; .web3_context("Failed saving new user key")?;
// create an empty balance entry Ok((new_user, user_rpc_key))
let user_balance = balance::ActiveModel {
user_id: sea_orm::Set(new_user.id),
..Default::default()
};
let user_balance = user_balance.insert(txn).await?;
Ok((new_user, user_rpc_key, user_balance))
} }
/// `POST /user/login` - Register or login by posting a signed "siwe" message. /// `POST /user/login` - Register or login by posting a signed "siwe" message.
@ -323,7 +315,7 @@ pub async fn user_login_post(
let txn = db_conn.begin().await?; let txn = db_conn.begin().await?;
let (caller, caller_key, _) = register_new_user(&txn, our_msg.address.into()).await?; let (caller, caller_key) = register_new_user(&txn, our_msg.address.into()).await?;
txn.commit().await?; txn.commit().await?;
@ -347,7 +339,7 @@ pub async fn user_login_post(
let used_referral = referee::ActiveModel { let used_referral = referee::ActiveModel {
used_referral_code: sea_orm::Set(user_referrer.id), used_referral_code: sea_orm::Set(user_referrer.id),
user_id: sea_orm::Set(caller.id), user_id: sea_orm::Set(caller.id),
credits_applied_for_referee: sea_orm::Set(false), one_time_bonus_applied_for_referee: sea_orm::Set(Decimal::new(0, 10)),
credits_applied_for_referrer: sea_orm::Set(Decimal::new(0, 10)), credits_applied_for_referrer: sea_orm::Set(Decimal::new(0, 10)),
..Default::default() ..Default::default()
}; };
@ -380,7 +372,7 @@ pub async fn user_login_post(
let used_referral = referee::ActiveModel { let used_referral = referee::ActiveModel {
used_referral_code: sea_orm::Set(user_referrer.id), used_referral_code: sea_orm::Set(user_referrer.id),
user_id: sea_orm::Set(caller.id), user_id: sea_orm::Set(caller.id),
credits_applied_for_referee: sea_orm::Set(false), one_time_bonus_applied_for_referee: sea_orm::Set(Decimal::new(0, 10)),
credits_applied_for_referrer: sea_orm::Set(Decimal::new(0, 10)), credits_applied_for_referrer: sea_orm::Set(Decimal::new(0, 10)),
..Default::default() ..Default::default()
}; };

@ -1,4 +1,5 @@
use crate::app::Web3ProxyApp; use crate::app::Web3ProxyApp;
use crate::balance::{try_get_balance_from_db, Balance};
use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResponse, Web3ProxyResult}; use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResponse, Web3ProxyResult};
use crate::frontend::authorization::{ use crate::frontend::authorization::{
login_is_authorized, Authorization as Web3ProxyAuthorization, login_is_authorized, Authorization as Web3ProxyAuthorization,
@ -21,17 +22,18 @@ use ethers::abi::AbiEncode;
use ethers::types::{Address, Block, TransactionReceipt, TxHash, H256}; use ethers::types::{Address, Block, TransactionReceipt, TxHash, H256};
use hashbrown::{HashMap, HashSet}; use hashbrown::{HashMap, HashSet};
use http::StatusCode; use http::StatusCode;
use migration::LockType;
use migration::sea_orm::prelude::Decimal; use migration::sea_orm::prelude::Decimal;
use migration::sea_orm::{ use migration::sea_orm::{
self, ActiveModelTrait, ActiveValue, ColumnTrait, EntityTrait, IntoActiveModel, ModelTrait, self, ActiveModelTrait, ActiveValue, ColumnTrait, EntityTrait, IntoActiveModel, ModelTrait,
QueryFilter, QuerySelect, TransactionTrait, QueryFilter, QuerySelect, TransactionTrait,
}; };
use migration::LockType;
use migration::{Expr, OnConflict}; use migration::{Expr, OnConflict};
use payment_contracts::ierc20::IERC20; use payment_contracts::ierc20::IERC20;
use payment_contracts::payment_factory::{self, PaymentFactory}; use payment_contracts::payment_factory::{self, PaymentFactory};
use rdkafka::bindings::rd_kafka_AclBinding_destroy;
use serde_json::json; use serde_json::json;
use std::num::NonZeroU64; use std::num::{NonZeroU64, TryFromIntError};
use std::sync::Arc; use std::sync::Arc;
use tracing::{debug, info, trace}; use tracing::{debug, info, trace};
@ -51,20 +53,16 @@ pub async fn user_balance_get(
let db_replica = app.db_replica()?; let db_replica = app.db_replica()?;
let user_balance_row = balance::Entity::find() let user_balance = match try_get_balance_from_db(db_replica.as_ref(), user.id).await? {
.filter(balance::Column::UserId.eq(user.id)) None => Balance::default(),
.one(db_replica.as_ref()) Some(x) => x,
.await? };
.unwrap_or_default();
let user_balance =
user_balance_row.total_deposits - user_balance_row.total_spent_outside_free_tier;
let response = json!({ let response = json!({
"total_deposits": user_balance_row.total_deposits, "total_deposits": user_balance.total_deposits,
"total_spent_outside_free_tier": user_balance_row.total_spent_outside_free_tier, "total_spent_paid_credits": user_balance.total_spent_paid_credits,
"total_spent": user_balance_row.total_spent_including_free_tier, "total_spent": user_balance.total_spent,
"balance": user_balance, "balance": user_balance.remaining(),
}); });
// TODO: Gotta create a new table for the spend part // TODO: Gotta create a new table for the spend part
@ -374,7 +372,7 @@ pub async fn user_balance_post(
{ {
Some(x) => x, Some(x) => x,
None => { None => {
let (user, _, _) = register_new_user(&txn, recipient_account).await?; let (user, _) = register_new_user(&txn, recipient_account).await?;
user user
} }
@ -391,27 +389,6 @@ pub async fn user_balance_post(
payment_token_amount payment_token_amount
); );
// create or update the balance
let balance_entry = balance::ActiveModel {
id: sea_orm::NotSet,
total_deposits: sea_orm::Set(payment_token_amount),
user_id: sea_orm::Set(recipient.id),
..Default::default()
};
trace!("Trying to insert into balance entry: {:?}", balance_entry);
balance::Entity::insert(balance_entry)
.on_conflict(
OnConflict::new()
.values([(
balance::Column::TotalDeposits,
Expr::col(balance::Column::TotalDeposits).add(payment_token_amount),
)])
.to_owned(),
)
.exec(&txn)
.await
.web3_context("increasing balance")?;
trace!("Saving log {} of txid {:?}", log_index, tx_hash); trace!("Saving log {} of txid {:?}", log_index, tx_hash);
let receipt = increase_on_chain_balance_receipt::ActiveModel { let receipt = increase_on_chain_balance_receipt::ActiveModel {
id: sea_orm::ActiveValue::NotSet, id: sea_orm::ActiveValue::NotSet,
@ -433,12 +410,7 @@ pub async fn user_balance_post(
.all(&txn) .all(&txn)
.await?; .await?;
match NonZeroU64::try_from(recipient.id) { app.user_balance_cache.invalidate(&recipient.id).await;
Err(_) => {}
Ok(x) => {
app.user_balance_cache.invalidate(&x).await;
}
};
for rpc_key_entity in rpc_keys { for rpc_key_entity in rpc_keys {
app.rpc_secret_key_cache app.rpc_secret_key_cache
@ -544,22 +516,6 @@ pub async fn handle_uncle_block(
debug!("removing balances: {:#?}", reversed_balances); debug!("removing balances: {:#?}", reversed_balances);
for (user_id, reversed_balance) in reversed_balances.iter() {
if let Some(user_balance) = balance::Entity::find()
.lock(LockType::Update)
.filter(balance::Column::Id.eq(*user_id))
.one(&txn)
.await?
{
let mut user_balance = user_balance.into_active_model();
user_balance.total_deposits =
ActiveValue::Set(user_balance.total_deposits.as_ref() - reversed_balance);
user_balance.update(&txn).await?;
}
}
txn.commit().await?; txn.commit().await?;
Ok(Some(reversed_balances)) Ok(Some(reversed_balances))

@ -163,27 +163,6 @@ pub async fn user_balance_stripe_post(
// Otherwise, also increase the balance ... // Otherwise, also increase the balance ...
match recipient { match recipient {
Some(recipient) => { Some(recipient) => {
// Create a balance update as well
let balance_entry = balance::ActiveModel {
id: sea_orm::NotSet,
total_deposits: sea_orm::Set(amount),
user_id: sea_orm::Set(recipient.id),
..Default::default()
};
trace!(?balance_entry, "Trying to insert into balance entry");
balance::Entity::insert(balance_entry)
.on_conflict(
OnConflict::new()
.values([(
balance::Column::TotalDeposits,
Expr::col(balance::Column::TotalDeposits).add(amount),
)])
.to_owned(),
)
.exec(&txn)
.await
.web3_context("increasing balance")?;
let _ = insert_receipt_model.save(&txn).await; let _ = insert_receipt_model.save(&txn).await;
let recipient_rpc_keys = rpc_key::Entity::find() let recipient_rpc_keys = rpc_key::Entity::find()
@ -194,12 +173,7 @@ pub async fn user_balance_stripe_post(
txn.commit().await?; txn.commit().await?;
// Finally invalidate the cache as well // Finally invalidate the cache as well
match NonZeroU64::try_from(recipient.id) { app.user_balance_cache.invalidate(&recipient.id).await;
Err(_) => {}
Ok(x) => {
app.user_balance_cache.invalidate(&x).await;
}
};
for rpc_key_entity in recipient_rpc_keys { for rpc_key_entity in recipient_rpc_keys {
app.rpc_secret_key_cache app.rpc_secret_key_cache
.invalidate(&rpc_key_entity.secret_key.into()) .invalidate(&rpc_key_entity.secret_key.into())

@ -92,7 +92,7 @@ pub async fn user_used_referral_stats(
// For each related referral person, find the corresponding user-address // For each related referral person, find the corresponding user-address
#[derive(Debug, Serialize)] #[derive(Debug, Serialize)]
struct Info { struct Info {
credits_applied_for_referee: bool, credits_applied_for_referee: Decimal,
credits_applied_for_referrer: Decimal, credits_applied_for_referrer: Decimal,
referral_start_date: DateTime, referral_start_date: DateTime,
used_referral_code: String, used_referral_code: String,
@ -109,7 +109,7 @@ pub async fn user_used_referral_stats(
// .context("Database error, no foreign key found for referring user")?; // .context("Database error, no foreign key found for referring user")?;
let tmp = Info { let tmp = Info {
credits_applied_for_referee: referral_record.credits_applied_for_referee, credits_applied_for_referee: referral_record.one_time_bonus_applied_for_referee,
credits_applied_for_referrer: referral_record.credits_applied_for_referrer, credits_applied_for_referrer: referral_record.credits_applied_for_referrer,
referral_start_date: referral_record.referral_start_date, referral_start_date: referral_record.referral_start_date,
used_referral_code: referrer_record.referral_code, used_referral_code: referrer_record.referral_code,
@ -150,7 +150,7 @@ pub async fn user_shared_referral_stats(
// collect info about each referral // collect info about each referral
#[derive(Debug, Serialize)] #[derive(Debug, Serialize)]
struct Info { struct Info {
credits_applied_for_referee: bool, credits_applied_for_referee: Decimal,
credits_applied_for_referrer: Decimal, credits_applied_for_referrer: Decimal,
referral_start_date: DateTime, referral_start_date: DateTime,
referred_address: Address, referred_address: Address,
@ -170,7 +170,7 @@ pub async fn user_shared_referral_stats(
.context("Database error, no foreign key found for referring user")?; .context("Database error, no foreign key found for referring user")?;
let info = Info { let info = Info {
credits_applied_for_referee: referral_record.credits_applied_for_referee, credits_applied_for_referee: referral_record.one_time_bonus_applied_for_referee,
credits_applied_for_referrer: referral_record.credits_applied_for_referrer, credits_applied_for_referrer: referral_record.credits_applied_for_referrer,
referral_start_date: referral_record.referral_start_date, referral_start_date: referral_record.referral_start_date,
referred_address: Address::from_slice(&referred_user.address), referred_address: Address::from_slice(&referred_user.address),

@ -284,12 +284,6 @@ pub async fn modify_subuser(
.await .await
.web3_context("Failed saving new user key")?]; .web3_context("Failed saving new user key")?];
// We should also create the balance entry ...
let subuser_balance = balance::ActiveModel {
user_id: sea_orm::Set(subuser.id),
..Default::default()
};
subuser_balance.insert(&txn).await?;
// save the user and key to the database // save the user and key to the database
txn.commit().await?; txn.commit().await?;

@ -3,6 +3,7 @@
pub mod admin_queries; pub mod admin_queries;
pub mod app; pub mod app;
pub mod balance;
pub mod block_number; pub mod block_number;
pub mod caches; pub mod caches;
pub mod compute_units; pub mod compute_units;

@ -1,4 +1,5 @@
use super::StatType; use super::StatType;
use crate::balance::{try_get_balance_from_db, Balance};
use crate::errors::Web3ProxyErrorContext; use crate::errors::Web3ProxyErrorContext;
use crate::{ use crate::{
app::Web3ProxyApp, app::Web3ProxyApp,
@ -61,22 +62,18 @@ pub async fn query_user_stats<'a>(
// TODO: move this to a helper. it should be simple to check that a user has an active premium account // TODO: move this to a helper. it should be simple to check that a user has an active premium account
if let Some(caller_user) = &caller_user { if let Some(caller_user) = &caller_user {
// get the balance of the user whose stats we are going to fetch (might be self, but might be another user) // get the balance of the user whose stats we are going to fetch (might be self, but might be another user)
let (total_deposits, total_spent) = match balance::Entity::find() let balance = match try_get_balance_from_db(db_replica.as_ref(), user_id).await? {
.filter(balance::Column::UserId.eq(user_id)) None => {
.one(db_replica.as_ref()) return Err(Web3ProxyError::AccessDenied(
.await? "User Stats Response requires you to authorize with a bearer token".into(),
{ ));
Some(user_balance) => ( }
user_balance.total_deposits, Some(x) => x,
user_balance.total_spent_outside_free_tier,
),
None => (0.into(), 0.into()),
}; };
let balance_remaining = total_deposits - total_spent;
// TODO: We should add the threshold that determines if a user is premium into app.config. hard coding to $10 works for now // TODO: We should add the threshold that determines if a user is premium into app.config. hard coding to $10 works for now
if total_deposits < Decimal::from(10) || balance_remaining <= Decimal::from(0) { // TODO: @Bryan this condition seems off, can you double-check?
if balance.total_deposits < Decimal::from(10) || balance.remaining() <= Decimal::from(0) {
// get the user tier so we can see if it is a tier that has downgrades // get the user tier so we can see if it is a tier that has downgrades
let relevant_balance_user_tier_id = if user_id == caller_user.id { let relevant_balance_user_tier_id = if user_id == caller_user.id {
caller_user.user_tier_id caller_user.user_tier_id

@ -16,23 +16,31 @@ use axum::headers::Origin;
use chrono::{DateTime, Months, TimeZone, Utc}; use chrono::{DateTime, Months, TimeZone, Utc};
use derive_more::From; use derive_more::From;
use entities::{balance, referee, referrer, rpc_accounting_v2, rpc_key}; use entities::{balance, referee, referrer, rpc_accounting_v2, rpc_key};
use futures::TryFutureExt;
use hyper::body::Buf;
use influxdb2::models::DataPoint; use influxdb2::models::DataPoint;
use migration::sea_orm::prelude::Decimal; use migration::sea_orm::prelude::Decimal;
use migration::sea_orm::{ use migration::sea_orm::{
self, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, TransactionTrait, self, ActiveModelTrait, ColumnTrait, DatabaseConnection, DbConn, EntityTrait, IntoActiveModel,
QueryFilter, TransactionTrait,
}; };
use migration::sea_orm::{DatabaseTransaction, QuerySelect}; use migration::sea_orm::{DatabaseTransaction, QuerySelect};
use migration::{Expr, LockType, OnConflict}; use migration::{Expr, LockType, OnConflict};
use num_traits::ToPrimitive; use num_traits::{ToPrimitive, Zero};
use parking_lot::Mutex; use parking_lot::{Mutex, RwLock};
use sentry::User;
use std::borrow::Cow; use std::borrow::Cow;
use std::convert::Infallible;
use std::default::Default;
use std::mem; use std::mem;
use std::num::NonZeroU64; use std::num::{NonZeroU64, TryFromIntError};
use std::str::FromStr; use std::str::FromStr;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::sync::Arc; use std::sync::Arc;
use tracing::trace; use tokio::sync::RwLock as AsyncRwLock;
use tracing::{error, info, trace};
use crate::balance::{try_get_balance_from_db, Balance};
pub use stat_buffer::{SpawnedStatBuffer, StatBuffer}; pub use stat_buffer::{SpawnedStatBuffer, StatBuffer};
#[derive(Debug, PartialEq, Eq)] #[derive(Debug, PartialEq, Eq)]
@ -168,15 +176,14 @@ impl RpcQueryStats {
} }
} }
#[derive(Debug, Default)] // #[derive(Debug, Default)]
struct Deltas { // struct Deltas {
balance_spent_including_free_credits: Decimal, // balance_spent_including_free_credits: Decimal,
balance_spent_excluding_free_credits: Decimal, // balance_spent_excluding_free_credits: Decimal,
apply_usage_bonus_to_request_sender: bool, // apply_usage_bonus_to_request_sender: bool,
usage_bonus_to_request_sender_through_referral: Decimal, // usage_bonus_to_request_sender_through_referral: Decimal,
// bonus_to_referrer: Decimal,
bonus_to_referrer: Decimal, // }
}
/// A stat that we aggregate and then store in a database. /// A stat that we aggregate and then store in a database.
/// For now there is just one, but I think there might be others later /// For now there is just one, but I think there might be others later
@ -187,7 +194,7 @@ pub enum AppStat {
// TODO: move to stat_buffer.rs? // TODO: move to stat_buffer.rs?
impl BufferedRpcQueryStats { impl BufferedRpcQueryStats {
fn add(&mut self, stat: RpcQueryStats) { async fn add(&mut self, stat: RpcQueryStats) {
// a stat always come from just 1 frontend request // a stat always come from just 1 frontend request
self.frontend_requests += 1; self.frontend_requests += 1;
@ -210,8 +217,8 @@ impl BufferedRpcQueryStats {
self.sum_response_millis += stat.response_millis; self.sum_response_millis += stat.response_millis;
self.sum_credits_used += stat.compute_unit_cost; self.sum_credits_used += stat.compute_unit_cost;
let latest_balance = stat.authorization.checks.latest_balance.read(); let latest_balance = stat.authorization.checks.latest_balance.read().await;
self.latest_balance = latest_balance.clone(); self.approximate_latest_balance_for_influx = latest_balance.clone();
} }
async fn _save_db_stats( async fn _save_db_stats(
@ -219,9 +226,18 @@ impl BufferedRpcQueryStats {
chain_id: u64, chain_id: u64,
db_conn: &DatabaseConnection, db_conn: &DatabaseConnection,
key: &RpcQueryKey, key: &RpcQueryKey,
) -> Web3ProxyResult<()> { user_balance: Decimal,
) -> Web3ProxyResult<Decimal> {
let period_datetime = Utc.timestamp_opt(key.response_timestamp, 0).unwrap(); let period_datetime = Utc.timestamp_opt(key.response_timestamp, 0).unwrap();
// Because reading the balance and updating the stats here is not atomically locked, this may lead to a negative balance
// This negative balance shouldn't be large tough
let paid_credits_used = if self.sum_credits_used >= user_balance {
user_balance
} else {
self.sum_credits_used
};
// =============================== // // =============================== //
// UPDATE STATISTICS // // UPDATE STATISTICS //
// =============================== // // =============================== //
@ -241,7 +257,8 @@ impl BufferedRpcQueryStats {
sum_request_bytes: sea_orm::Set(self.sum_request_bytes), sum_request_bytes: sea_orm::Set(self.sum_request_bytes),
sum_response_millis: sea_orm::Set(self.sum_response_millis), sum_response_millis: sea_orm::Set(self.sum_response_millis),
sum_response_bytes: sea_orm::Set(self.sum_response_bytes), sum_response_bytes: sea_orm::Set(self.sum_response_bytes),
sum_credits_used: sea_orm::Set(self.sum_credits_used), sum_credits_used: sea_orm::Set(paid_credits_used),
sum_incl_free_credits_used: sea_orm::Set(self.sum_credits_used),
}; };
rpc_accounting_v2::Entity::insert(accounting_entry) rpc_accounting_v2::Entity::insert(accounting_entry)
@ -291,10 +308,15 @@ impl BufferedRpcQueryStats {
Expr::col(rpc_accounting_v2::Column::SumResponseBytes) Expr::col(rpc_accounting_v2::Column::SumResponseBytes)
.add(self.sum_response_bytes), .add(self.sum_response_bytes),
), ),
(
rpc_accounting_v2::Column::SumInclFreeCreditsUsed,
Expr::col(rpc_accounting_v2::Column::SumInclFreeCreditsUsed)
.add(self.sum_credits_used),
),
( (
rpc_accounting_v2::Column::SumCreditsUsed, rpc_accounting_v2::Column::SumCreditsUsed,
Expr::col(rpc_accounting_v2::Column::SumCreditsUsed) Expr::col(rpc_accounting_v2::Column::SumCreditsUsed)
.add(self.sum_credits_used), .add(paid_credits_used),
), ),
]) ])
.to_owned(), .to_owned(),
@ -302,356 +324,33 @@ impl BufferedRpcQueryStats {
.exec(db_conn) .exec(db_conn)
.await?; .await?;
Ok(()) Ok(paid_credits_used)
} }
async fn _get_relevant_entities( // TODO: This is basically a duplicate with the balance_checks, except the database
// TODO: Please refactor this. Also there are small differences, like the Error is 0
async fn _get_user_balance(
&self, &self,
rpc_secret_key_id: &NonZeroU64, user_id: u64,
txn: &DatabaseTransaction,
) -> Web3ProxyResult<(
rpc_key::Model,
balance::Model,
Option<(referee::Model, referrer::Model)>,
)> {
// Only calculate, and update the user balance
// Do not worry about referrers and all that
let sender_rpc_entity: rpc_key::Model = rpc_key::Entity::find()
.filter(rpc_key::Column::Id.eq(rpc_secret_key_id.get()))
.one(txn)
.await?
.context("We previous checked that the id exists, this is likely some race condition, or it just got deleted!")?;
let sender_balance: balance::Model = balance::Entity::find()
.filter(balance::Column::UserId.eq(sender_rpc_entity.user_id))
.one(txn)
.await?
.ok_or(Web3ProxyError::BadRequest(
"Could not find rpc key in db".into(),
))?;
// I think one lock here is fine, because only one server has access to the "credits_applied_for_referee" entry
let referral_objects: Option<(referee::Model, referrer::Model)> =
match referee::Entity::find()
.filter(referee::Column::UserId.eq(sender_rpc_entity.user_id))
.lock(LockType::Update)
.find_also_related(referrer::Entity)
.one(txn)
.await?
{
Some(x) => Some((
x.0,
x.1.context("Could not fine corresponding referrer code")?,
)),
None => None,
};
Ok((sender_rpc_entity, sender_balance, referral_objects))
}
async fn _compute_balance_deltas(
&self,
sender_balance: balance::Model,
referral_objects: Option<(referee::Model, referrer::Model)>,
) -> Web3ProxyResult<(Deltas, Option<(referee::Model, referrer::Model)>)> {
// Calculate Balance Only
let mut deltas = Deltas::default();
// Calculate a bunch using referrals as well
if let Some((referral_entity, referrer_code_entity)) = referral_objects {
deltas.apply_usage_bonus_to_request_sender =
referral_entity.credits_applied_for_referee;
// Calculate if we are above the usage threshold, and apply a bonus
// Optimally we would read this from the balance, but if we do it like this, we only have to lock a single table (much safer w.r.t. deadlocks)
// referral_entity.credits_applied_for_referrer * (Decimal::from(10) checks (atomically using this table only), whether the user has brought in >$100 to the referer
// In this case, the sender receives $100 as a bonus / gift
// Apply a 10$ bonus onto the user, if the user has spent 100$
trace!(
"Were credits applied so far? {:?} {:?}",
referral_entity.credits_applied_for_referee,
!referral_entity.credits_applied_for_referee
);
trace!(
"Credits applied for referrer so far? {:?}",
referral_entity.credits_applied_for_referrer
);
trace!("Sum credits used? {:?}", self.sum_credits_used);
trace!(
"Hello: {:?}",
(referral_entity.credits_applied_for_referrer * (Decimal::from(10))
+ self.sum_credits_used)
>= Decimal::from(100)
);
if !referral_entity.credits_applied_for_referee
&& (referral_entity.credits_applied_for_referrer * (Decimal::from(10))
+ self.sum_credits_used)
>= Decimal::from(100)
{
trace!("Adding sender bonus balance");
deltas.usage_bonus_to_request_sender_through_referral = Decimal::from(10);
deltas.apply_usage_bonus_to_request_sender = true;
}
// Calculate how much the referrer should get, limited to the last 12 months
// Apply 10% of the used balance as a bonus if applicable
let now = Utc::now();
let valid_until = DateTime::<Utc>::from_utc(referral_entity.referral_start_date, Utc)
+ Months::new(12);
if now <= valid_until {
deltas.bonus_to_referrer += self.sum_credits_used / Decimal::new(10, 0);
}
// Duplicate code, I should fix this later ...
let user_balance = sender_balance.total_deposits
- sender_balance.total_spent_outside_free_tier
+ deltas.usage_bonus_to_request_sender_through_referral;
// Split up the component of into how much of the paid component was used, and how much of the free component was used (anything after "balance")
if user_balance - self.sum_credits_used >= Decimal::from(0) {
deltas.balance_spent_including_free_credits = self.sum_credits_used;
deltas.balance_spent_excluding_free_credits = self.sum_credits_used;
} else {
deltas.balance_spent_including_free_credits = self.sum_credits_used;
deltas.balance_spent_excluding_free_credits = user_balance;
}
Ok((deltas, Some((referral_entity, referrer_code_entity))))
} else {
let user_balance = sender_balance.total_deposits
- sender_balance.total_spent_outside_free_tier
+ deltas.usage_bonus_to_request_sender_through_referral;
// Split up the component of into how much of the paid component was used, and how much of the free component was used (anything after "balance")
if user_balance - self.sum_credits_used >= Decimal::from(0) {
deltas.balance_spent_including_free_credits = self.sum_credits_used;
deltas.balance_spent_excluding_free_credits = self.sum_credits_used;
} else {
deltas.balance_spent_including_free_credits = self.sum_credits_used;
deltas.balance_spent_excluding_free_credits = user_balance;
}
Ok((deltas, None))
}
}
/// Save all referral-based objects in the database
async fn _update_balances_in_db(
&self,
deltas: &Deltas,
txn: &DatabaseTransaction,
sender_rpc_entity: &rpc_key::Model,
referral_objects: &Option<(referee::Model, referrer::Model)>,
) -> Web3ProxyResult<()> {
// Do the sender balance updates
let user_balance = balance::ActiveModel {
id: sea_orm::NotSet,
total_deposits: sea_orm::Set(deltas.usage_bonus_to_request_sender_through_referral),
total_spent_including_free_tier: sea_orm::Set(
deltas.balance_spent_including_free_credits,
),
total_spent_outside_free_tier: sea_orm::Set(
deltas.balance_spent_excluding_free_credits,
),
user_id: sea_orm::Set(sender_rpc_entity.user_id),
};
// In any case, add to the balance
trace!(
"Delta is: {:?} from credits used {:?}",
deltas,
self.sum_credits_used
);
let _ = balance::Entity::insert(user_balance)
.on_conflict(
OnConflict::new()
.values([
(
balance::Column::TotalSpentIncludingFreeTier,
Expr::col(balance::Column::TotalSpentIncludingFreeTier)
.add(deltas.balance_spent_including_free_credits),
),
(
balance::Column::TotalSpentOutsideFreeTier,
Expr::col(balance::Column::TotalSpentOutsideFreeTier)
.add(deltas.balance_spent_excluding_free_credits),
),
(
balance::Column::TotalDeposits,
Expr::col(balance::Column::TotalDeposits)
.add(deltas.usage_bonus_to_request_sender_through_referral),
),
])
.to_owned(),
)
.exec(txn)
.await?;
// Do the referrer_entry updates
if let Some((referral_entity, referrer_code_entity)) = referral_objects {
trace!("Positive referrer deposit delta");
let referee_entry = referee::ActiveModel {
id: sea_orm::Unchanged(referral_entity.id),
credits_applied_for_referee: sea_orm::Set(
deltas.apply_usage_bonus_to_request_sender,
),
credits_applied_for_referrer: sea_orm::Set(deltas.bonus_to_referrer),
referral_start_date: sea_orm::Unchanged(referral_entity.referral_start_date),
used_referral_code: sea_orm::Unchanged(referral_entity.used_referral_code),
user_id: sea_orm::Unchanged(referral_entity.user_id),
};
// If there was a referral, first of all check if credits should be applied to the sender itself (once he spent 100$)
// If these two values are not equal, that means that we have not applied the bonus just yet.
// In that case, we can apply the bonus just now.
if referral_entity.credits_applied_for_referee
!= deltas.apply_usage_bonus_to_request_sender
{
referee::Entity::insert(referee_entry.clone())
.on_conflict(
OnConflict::new()
.values([(
// TODO Make it a "Set", add is hacky (but works ..)
referee::Column::CreditsAppliedForReferee,
Expr::col(referee::Column::CreditsAppliedForReferee)
.add(deltas.apply_usage_bonus_to_request_sender),
)])
.to_owned(),
)
.exec(txn)
.await?;
// Also add a bonus to the sender (But this should already have been done with the above code!!)
}
// If the bonus to the referrer is non-empty, also apply that
if deltas.bonus_to_referrer > Decimal::from(0) {
referee::Entity::insert(referee_entry)
.on_conflict(
OnConflict::new()
.values([(
// TODO Make it a "Set", add is hacky (but works ..)
referee::Column::CreditsAppliedForReferrer,
Expr::col(referee::Column::CreditsAppliedForReferrer)
.add(deltas.bonus_to_referrer),
)])
.to_owned(),
)
.exec(txn)
.await?;
}
// Finally, add to the balance of the referrer
let user_balance = balance::ActiveModel {
id: sea_orm::NotSet,
total_deposits: sea_orm::Set(deltas.bonus_to_referrer),
user_id: sea_orm::Set(referrer_code_entity.user_id),
..Default::default()
};
let _ = balance::Entity::insert(user_balance)
.on_conflict(
OnConflict::new()
.values([(
balance::Column::TotalDeposits,
Expr::col(balance::Column::TotalDeposits).add(deltas.bonus_to_referrer),
)])
.to_owned(),
)
.exec(txn)
.await?;
};
Ok(())
}
/// Update & Invalidate cache if user is credits are low (premium downgrade condition)
/// Reduce credits if there was no issue
/// This is not atomic, so this may be an issue because it's not sequentially consistent across threads
/// It is a good-enough approximation though, and if the TTL for the balance cache is low enough, this should be ok
async fn _update_balance_in_cache(
&self,
deltas: &Deltas,
db_conn: &DatabaseConnection,
sender_rpc_entity: &rpc_key::Model,
rpc_secret_key_cache: &RpcSecretKeyCache,
user_balance_cache: &UserBalanceCache, user_balance_cache: &UserBalanceCache,
) -> Web3ProxyResult<()> { db_conn: &DbConn,
// ================== ) -> Arc<AsyncRwLock<Balance>> {
// Modify sender balance trace!("Will get it from the balance cache");
// ================== let out: Arc<AsyncRwLock<Balance>> = user_balance_cache
let user_id = NonZeroU64::try_from(sender_rpc_entity.user_id) .try_get_with(user_id, async {
.expect("database ids are always nonzero"); let x = match try_get_balance_from_db(db_conn, user_id).await? {
// We don't do an get_or_insert, because technically we don't have the most up to date balance
// Also let's keep things simple in terms of writing and getting. A single place writes it, multiple places can remove / poll it
let latest_balance = match user_balance_cache.get(&user_id) {
Some(x) => x, Some(x) => x,
// If not in cache, nothing to update None => return Err(Web3ProxyError::InvalidUserKey),
None => return Ok(()),
}; };
return Ok(Arc::new(AsyncRwLock::new(x)));
let (balance_before, latest_balance) = { })
let mut latest_balance = latest_balance.write(); .await
.unwrap_or_else(|err| {
let balance_before = latest_balance.clone(); error!("Could not find balance for user !{}", err);
// We are just instantiating this for type-safety's sake
// Now modify the balance Arc::new(AsyncRwLock::new(Balance::default()))
latest_balance.total_deposit += deltas.usage_bonus_to_request_sender_through_referral; });
latest_balance.total_spend += deltas.balance_spent_including_free_credits; out
(balance_before, latest_balance.clone())
};
// we only start subtracting once the user is first upgraded to a premium user
// consider the user premium if total_deposit > premium threshold
// If the balance is getting low, clear the cache
// TODO: configurable amount for "premium"
// TODO: configurable amount for "low"
// we check balance_before because this current request would have been handled with limits matching the balance at the start of the request
if balance_before.total_deposit > Decimal::from(10)
&& latest_balance.remaining() <= Decimal::from(1)
{
let rpc_keys = rpc_key::Entity::find()
.filter(rpc_key::Column::UserId.eq(sender_rpc_entity.user_id))
.all(db_conn)
.await?;
// clear the user from the cache
if let Ok(user_id) = NonZeroU64::try_from(sender_rpc_entity.user_id) {
user_balance_cache.invalidate(&user_id).await;
}
// clear all keys owned by this user from the cache
for rpc_key_entity in rpc_keys {
rpc_secret_key_cache
.invalidate(&rpc_key_entity.secret_key.into())
.await;
}
}
// ==================
// Modify referrer balance
// ==================
// We ignore this for performance reasons right now
// We would have to load all the RPC keys of the referrer to de-activate them
// Instead, it's fine if they wait for 60 seconds until their cache expires
// If they are getting low, they will refresh themselves if necessary and then they will see
// // If the referrer object is empty, we don't care about the cache, becase this will be fetched in a next request from the database
// if let Some((referral_entity, _)) = referral_objects {
// if let Ok(referrer_user_id) = NonZeroU64::try_from(referral_entity.user_id) {
// // If the referrer object is in the cache, we just remove it from the balance cache; it will be reloaded next time
// // Get all the RPC keys, delete them from cache
//
// // In principle, do not remove the cache for the referrer; the next reload will trigger premium
// // We don't touch the RPC keys at this stage for the refferer, a payment must be paid to reset those (we want to keep things simple here)
// // Anyways, the RPC keys will be updated in 5 min (600 seconds)
// user_balance_cache.invalidate(&referrer_user_id).await;
// }
// };
Ok(())
} }
// TODO: take a db transaction instead so that we can batch? // TODO: take a db transaction instead so that we can batch?
@ -663,6 +362,7 @@ impl BufferedRpcQueryStats {
rpc_secret_key_cache: &RpcSecretKeyCache, rpc_secret_key_cache: &RpcSecretKeyCache,
user_balance_cache: &UserBalanceCache, user_balance_cache: &UserBalanceCache,
) -> Web3ProxyResult<()> { ) -> Web3ProxyResult<()> {
// Sanity check, if we need to save stats
if key.response_timestamp == 0 { if key.response_timestamp == 0 {
return Err(Web3ProxyError::Anyhow(anyhow!( return Err(Web3ProxyError::Anyhow(anyhow!(
"no response_timestamp! This is a bug! {:?} {:?}", "no response_timestamp! This is a bug! {:?} {:?}",
@ -671,49 +371,152 @@ impl BufferedRpcQueryStats {
))); )));
} }
// First of all, save the statistics to the database: let sender_user_id = key.rpc_key_user_id.map_or(0, |x| x.get());
self._save_db_stats(chain_id, db_conn, &key).await?;
// Return early if no credits were used, or if user is anonymous // Gathering cache and database rows
let user_balance = self
._get_user_balance(sender_user_id, user_balance_cache, &db_conn)
.await;
let mut user_balance = user_balance.write().await;
// First of all, save the statistics to the database:
let paid_credits_used = self
._save_db_stats(chain_id, &db_conn, &key, user_balance.remaining())
.await?;
// No need to continue if no credits were used
if self.sum_credits_used == 0.into() { if self.sum_credits_used == 0.into() {
// write-lock is released
return Ok(()); return Ok(());
} }
let rpc_secret_key_id: &NonZeroU64 = match &key.rpc_secret_key_id {
Some(x) => x, // Update and possible invalidate rpc caches if necessary (if there was a downgrade)
None => return Ok(()), {
}; let balance_before = user_balance.remaining();
info!("Balance before is {:?}", balance_before);
user_balance.total_spent_paid_credits += paid_credits_used;
// Invalidate caches if remaining is below a threshold
// It will be re-fetched again if needed
if sender_user_id != 0
&& balance_before > Decimal::from(10)
&& user_balance.remaining() < Decimal::from(10)
{
let rpc_keys = rpc_key::Entity::find()
.filter(rpc_key::Column::UserId.eq(sender_user_id))
.all(db_conn)
.await?;
// clear all keys owned by this user from the cache
for rpc_key_entity in rpc_keys {
rpc_secret_key_cache
.invalidate(&rpc_key_entity.secret_key.into())
.await;
}
}
}
// Start a transaction // Start a transaction
let txn = db_conn.begin().await?; let txn = db_conn.begin().await?;
// Fetch any items that we will be modifying
let (sender_rpc_entity, _sender_balance, referral_objects) =
self._get_relevant_entities(rpc_secret_key_id, &txn).await?;
// Compute Changes in balance for user and referrer, incl. referral logic // Apply all the referral logic; let's keep it simple and flat for now
let (deltas, referral_objects): (Deltas, Option<(referee::Model, referrer::Model)>) = self // Calculate if we are above the usage threshold, and apply a bonus
._compute_balance_deltas(_sender_balance, referral_objects) // Optimally we would read this from the balance, but if we do it like this, we only have to lock a single table (much safer w.r.t. deadlocks)
.await?; // referral_entity.credits_applied_for_referrer * (Decimal::from(10) checks (atomically using this table only), whether the user has brought in >$100 to the referer
// In this case, the sender receives $100 as a bonus / gift
// Apply a 10$ bonus onto the user, if the user has spent 100$
match referee::Entity::find()
.filter(referee::Column::UserId.eq(sender_user_id))
.find_also_related(referrer::Entity)
.one(&txn)
.await?
{
Some((referral_entity, Some(referrer))) => {
// Get the balance for the referrer, see if they're premium or not
let referrer_balance = self
._get_user_balance(referrer.user_id, &user_balance_cache, &db_conn)
.await;
// Just to keep locking simple, keep things where they are
let referrer_balance = referrer_balance.read().await;
let mut new_referee_entity: referee::ActiveModel;
// Update balances in the database let bonus_for_user_threshold = Decimal::from(100);
self._update_balances_in_db(&deltas, &txn, &sender_rpc_entity, &referral_objects) let bonus_for_user = Decimal::from(10);
// Provide one-time bonus to user, if more than 100$ was spent,
// and if the one-time bonus was not already provided
if referral_entity.one_time_bonus_applied_for_referee.is_zero()
&& (referral_entity.credits_applied_for_referrer * Decimal::from(10)
+ self.sum_credits_used)
>= bonus_for_user_threshold
{
trace!("Adding sender bonus balance");
// TODO: Should it be Unchanged, or do we have to load all numbers there again ..
new_referee_entity = referee::ActiveModel {
id: sea_orm::Unchanged(Default::default()),
one_time_bonus_applied_for_referee: sea_orm::Set(bonus_for_user),
credits_applied_for_referrer: sea_orm::Unchanged(Default::default()),
referral_start_date: sea_orm::Unchanged(Default::default()),
used_referral_code: sea_orm::Unchanged(Default::default()),
user_id: sea_orm::Unchanged(Default::default()),
};
// Update the cache
{
// Also no need to invalidate the cache here, just by itself
user_balance.total_deposits += bonus_for_user;
}
// The resulting field will not be read again, so I will not try to turn the ActiveModel into a Model one
new_referee_entity = new_referee_entity.save(&txn).await?;
}
// Apply the bonus to the referrer if they are premium right now
{
let now = Utc::now();
let valid_until =
DateTime::<Utc>::from_utc(referral_entity.referral_start_date, Utc)
+ Months::new(12);
// There must be a conflict, if there isn't than there's a clear bug!
// If the referrer has more than $10, provide credits to them
// Also only works if the referrer referred the person less than 1 year ago
// TODO: Perhaps let's not worry about the referral cache here, to avoid deadlocks (hence only reading)
let referrer_bonus = self.sum_credits_used / Decimal::from(10);
if referrer_balance.remaining() > Decimal::from(10) && now <= valid_until {
referee::Entity::insert(referral_entity.into_active_model())
.on_conflict(
OnConflict::new()
.values([(
// Provide credits to referrer
referee::Column::CreditsAppliedForReferrer,
Expr::col(referee::Column::CreditsAppliedForReferrer)
.add(referrer_bonus),
)])
.to_owned(),
)
.exec(&txn)
.await?; .await?;
}
// No need to invalidate the referrer every single time;
// this is no major change and can wait for a bit
// Let's not worry about the referrer balance bcs possibility of deadlock
// referrer_balance.total_deposits += referrer_bonus;
}
}
Some((referee, None)) => {
error!(
"No referrer code found for this referrer, this should never happen! {:?}",
referee
);
}
_ => {}
};
// Finally commit the transaction in the database // Finally commit the transaction in the database
txn.commit() txn.commit()
.await .await
.context("Failed to update referral and balance updates")?; .context("Failed to update referral and balance updates")?;
// Update balanaces in the cache.
// do this after commiting the database so that invalidated caches definitely query commited data
self._update_balance_in_cache(
&deltas,
db_conn,
&sender_rpc_entity,
rpc_secret_key_cache,
user_balance_cache,
)
.await?;
Ok(()) Ok(())
} }
@ -734,7 +537,7 @@ impl BufferedRpcQueryStats {
builder = builder.tag("method", key.method); builder = builder.tag("method", key.method);
// Read the latest balance ... // Read the latest balance ...
let remaining = self.latest_balance.remaining(); let remaining = self.approximate_latest_balance_for_influx.remaining();
trace!("Remaining balance for influx is {:?}", remaining); trace!("Remaining balance for influx is {:?}", remaining);
builder = builder builder = builder

@ -1,8 +1,8 @@
use super::{AppStat, RpcQueryKey}; use super::{AppStat, RpcQueryKey};
use crate::app::Web3ProxyJoinHandle; use crate::app::Web3ProxyJoinHandle;
use crate::balance::Balance;
use crate::caches::{RpcSecretKeyCache, UserBalanceCache}; use crate::caches::{RpcSecretKeyCache, UserBalanceCache};
use crate::errors::Web3ProxyResult; use crate::errors::Web3ProxyResult;
use crate::frontend::authorization::Balance;
use derive_more::From; use derive_more::From;
use futures::stream; use futures::stream;
use hashbrown::HashMap; use hashbrown::HashMap;
@ -28,7 +28,7 @@ pub struct BufferedRpcQueryStats {
pub sum_credits_used: Decimal, pub sum_credits_used: Decimal,
pub sum_cu_used: Decimal, pub sum_cu_used: Decimal,
/// The user's balance at this point in time. Multiple queries might be modifying it at once. /// The user's balance at this point in time. Multiple queries might be modifying it at once.
pub latest_balance: Balance, pub approximate_latest_balance_for_influx: Balance,
} }
#[derive(From)] #[derive(From)]
@ -130,15 +130,15 @@ impl StatBuffer {
let global_timeseries_key = stat.global_timeseries_key(); let global_timeseries_key = stat.global_timeseries_key();
self.global_timeseries_buffer.entry(global_timeseries_key).or_default().add(stat.clone()); self.global_timeseries_buffer.entry(global_timeseries_key).or_default().add(stat.clone()).await;
if let Some(opt_in_timeseries_key) = stat.owned_timeseries_key() { if let Some(opt_in_timeseries_key) = stat.owned_timeseries_key() {
self.opt_in_timeseries_buffer.entry(opt_in_timeseries_key).or_default().add(stat.clone()); self.opt_in_timeseries_buffer.entry(opt_in_timeseries_key).or_default().add(stat.clone()).await;
} }
} }
if self.db_conn.is_some() { if self.db_conn.is_some() {
self.accounting_db_buffer.entry(stat.accounting_key(self.billing_period_seconds)).or_default().add(stat); self.accounting_db_buffer.entry(stat.accounting_key(self.billing_period_seconds)).or_default().add(stat).await;
} }
} }
Err(err) => { Err(err) => {

@ -13,6 +13,7 @@ use crate::common::referral::{
use crate::common::TestApp; use crate::common::TestApp;
use ethers::prelude::{Http, Provider}; use ethers::prelude::{Http, Provider};
use ethers::{signers::Signer, types::Signature}; use ethers::{signers::Signer, types::Signature};
use futures::future::select_all;
use rust_decimal::Decimal; use rust_decimal::Decimal;
use serde::Deserialize; use serde::Deserialize;
use std::str::FromStr; use std::str::FromStr;
@ -91,7 +92,7 @@ async fn test_log_in_and_out() {
#[cfg_attr(not(feature = "tests-needing-docker"), ignore)] #[cfg_attr(not(feature = "tests-needing-docker"), ignore)]
#[test_log::test(tokio::test)] #[test_log::test(tokio::test)]
async fn test_admin_balance_increase() { async fn test_admin_balance_increase() {
info!("Starting balance decreases with usage test"); info!("Starting admin can increase balance");
let x = TestApp::spawn(true).await; let x = TestApp::spawn(true).await;
let r = reqwest::Client::builder() let r = reqwest::Client::builder()
.timeout(Duration::from_secs(20)) .timeout(Duration::from_secs(20))
@ -157,7 +158,7 @@ async fn test_user_balance_decreases() {
let proxy_endpoint = format!("{}rpc/{}", x.proxy_provider.url(), rpc_keys.secret_key); let proxy_endpoint = format!("{}rpc/{}", x.proxy_provider.url(), rpc_keys.secret_key);
let proxy_provider = Provider::<Http>::try_from(proxy_endpoint).unwrap(); let proxy_provider = Provider::<Http>::try_from(proxy_endpoint).unwrap();
// Make somre requests while in the free tier, so we can test bookkeeping here // Make some requests while in the free tier, so we can test bookkeeping here
for _ in 1..10_000 { for _ in 1..10_000 {
let _ = proxy_provider let _ = proxy_provider
.request::<_, Option<ArcBlock>>("eth_getBlockByNumber", ("latest", false)) .request::<_, Option<ArcBlock>>("eth_getBlockByNumber", ("latest", false))
@ -171,6 +172,34 @@ async fn test_user_balance_decreases() {
assert_eq!(influx_count, 0); assert_eq!(influx_count, 0);
assert!(mysql_count > 0); assert!(mysql_count > 0);
// Check the balance, it should not have decreased; there should have been accounted free credits, however
let user_balance_response = user_get_balance(&x, &r, &user_login_response).await;
// Check that the balance is 0
assert_eq!(
Decimal::from_str(user_balance_response["balance"].as_str().unwrap()).unwrap(),
Decimal::from(0)
);
// Check that paid credits is 0 (because balance is 0)
assert_eq!(
Decimal::from_str(
user_balance_response["total_spent_paid_credits"]
.as_str()
.unwrap()
)
.unwrap(),
Decimal::from(0)
);
// Check that paid credits is 0 (because balance is 0)
assert_eq!(
Decimal::from_str(user_balance_response["total_deposits"].as_str().unwrap()).unwrap(),
Decimal::from(0)
);
// Check that total credits incl free used is larger than 0
assert!(
Decimal::from_str(user_balance_response["total_spent"].as_str().unwrap()).unwrap()
> Decimal::from(0)
);
// Bump both user's wallet to $20 // Bump both user's wallet to $20
admin_increase_balance( admin_increase_balance(
&x, &x,
@ -199,12 +228,21 @@ async fn test_user_balance_decreases() {
assert!(mysql_count > 0); assert!(mysql_count > 0);
// Deposits should not be affected, and should be equal to what was initially provided // Deposits should not be affected, and should be equal to what was initially provided
let user_balance_response = user_get_balance(&x, &r, &user_login_response).await;
let total_deposits = let total_deposits =
Decimal::from_str(user_balance_response["total_deposits"].as_str().unwrap()).unwrap(); Decimal::from_str(user_balance_response["total_deposits"].as_str().unwrap()).unwrap();
assert_eq!(total_deposits, Decimal::from(20)); assert_eq!(total_deposits, Decimal::from(20));
// Check that total_spent_paid credits is equal to total_spent, because we are all still inside premium
assert_eq!(
Decimal::from_str(
user_balance_response["total_spent_paid_credits"]
.as_str()
.unwrap()
)
.unwrap(),
Decimal::from_str(user_balance_response["total_spent"].as_str().unwrap()).unwrap()
);
// Get the full balance endpoint // Get the full balance endpoint
let user_balance_response = user_get_balance(&x, &r, &user_login_response).await;
let user_balance_post = let user_balance_post =
Decimal::from_str(user_balance_response["balance"].as_str().unwrap()).unwrap(); Decimal::from_str(user_balance_response["balance"].as_str().unwrap()).unwrap();
assert!(user_balance_post < user_balance_pre); assert!(user_balance_post < user_balance_pre);
@ -345,12 +383,21 @@ async fn test_referral_bonus_non_concurrent() {
let referrer_balance_post = let referrer_balance_post =
Decimal::from_str(referrer_balance_response["balance"].as_str().unwrap()).unwrap(); Decimal::from_str(referrer_balance_response["balance"].as_str().unwrap()).unwrap();
info!(
"Balances before and after are (user): {:?} {:?}",
user_balance_pre, user_balance_post
);
info!(
"Balances before and after are (referrer): {:?} {:?}",
referrer_balance_pre, referrer_balance_post
);
let difference = user_balance_pre - user_balance_post; let difference = user_balance_pre - user_balance_post;
// Make sure that the pre and post balance is not the same (i.e. some change has occurred) // Make sure that the pre and post balance is not the same (i.e. some change has occurred)
assert_ne!( assert_ne!(
user_balance_pre, user_balance_post, user_balance_pre, user_balance_post,
"Pre and post balnace is equivalent" "Pre and post balance is equivalent"
); );
assert!(user_balance_pre > user_balance_post); assert!(user_balance_pre > user_balance_post);
assert!(referrer_balance_pre < referrer_balance_post); assert!(referrer_balance_pre < referrer_balance_post);
@ -361,3 +408,345 @@ async fn test_referral_bonus_non_concurrent() {
referrer_balance_post referrer_balance_post
); );
} }
#[cfg_attr(not(feature = "tests-needing-docker"), ignore)]
#[test_log::test(tokio::test)]
async fn test_referral_bonus_concurrent_referrer_only() {
info!("Starting referral bonus test");
let x = TestApp::spawn(true).await;
let r = reqwest::Client::builder()
.timeout(Duration::from_secs(20))
.build()
.unwrap();
let user_wallet = x.wallet(0);
let referrer_wallet = x.wallet(1);
let admin_wallet = x.wallet(2);
// Create three users, one referrer, one admin who bumps both their balances
let referrer_login_response = create_user(&x, &r, &referrer_wallet, None).await;
let admin_login_response = create_user_as_admin(&x, &r, &admin_wallet).await;
// Get the first user's referral link
let referral_link = get_referral_code(&x, &r, &referrer_login_response).await;
let user_login_response = create_user(&x, &r, &user_wallet, Some(referral_link.clone())).await;
// Bump both user's wallet to $20
admin_increase_balance(
&x,
&r,
&admin_login_response,
&user_wallet,
Decimal::from(20),
)
.await;
admin_increase_balance(
&x,
&r,
&admin_login_response,
&referrer_wallet,
Decimal::from(20),
)
.await;
// Get balance before for both users
let user_balance_response = user_get_balance(&x, &r, &user_login_response).await;
let user_balance_pre =
Decimal::from_str(user_balance_response["balance"].as_str().unwrap()).unwrap();
let referrer_balance_response = user_get_balance(&x, &r, &referrer_login_response).await;
let referrer_balance_pre =
Decimal::from_str(referrer_balance_response["balance"].as_str().unwrap()).unwrap();
// Make sure they both have balance now
assert_eq!(user_balance_pre, Decimal::from(20));
assert_eq!(referrer_balance_pre, Decimal::from(20));
// Setup variables that will be used
let shared_referral_code: UserSharedReferralInfo =
get_shared_referral_codes(&x, &r, &referrer_login_response).await;
let used_referral_code: UserUsedReferralInfo =
get_used_referral_codes(&x, &r, &user_login_response).await;
// assert that the used referral code is used
assert_eq!(
format!("{:?}", user_wallet.address()),
shared_referral_code
.clone()
.referrals
.get(0)
.unwrap()
.referred_address
.clone()
.unwrap()
);
assert_eq!(
referral_link.clone(),
used_referral_code
.clone()
.referrals
.get(0)
.unwrap()
.used_referral_code
.clone()
.unwrap()
);
// Make a for-loop just spam it a bit
// Make a JSON request
let rpc_keys: RpcKey = user_get_first_rpc_key(&x, &r, &user_login_response).await;
info!("Rpc key is: {:?}", rpc_keys);
// Spawn many requests proxy_providers
let number_requests = 100;
// Spin up concurrent requests ...
let mut handles = Vec::with_capacity(number_requests);
for _ in 1..number_requests {
let url = (&x).proxy_provider.url().clone();
let secret_key = (&rpc_keys).secret_key.clone();
handles.push(tokio::spawn(async move {
let proxy_endpoint = format!("{}rpc/{}", url, secret_key);
let proxy_provider = Provider::<Http>::try_from(proxy_endpoint).unwrap();
let _proxy_result = proxy_provider
.request::<_, Option<ArcBlock>>("eth_getBlockByNumber", ("latest", false))
.await
.unwrap()
.unwrap();
_proxy_result
}));
}
let mut results = Vec::with_capacity(handles.len());
for handle in handles {
results.push(handle.await.unwrap());
}
// Flush all stats here
let (influx_count, mysql_count) = x.flush_stats().await.unwrap();
assert_eq!(influx_count, 0);
assert!(mysql_count > 0);
// Check that at least something was earned:
let shared_referral_code: UserSharedReferralInfo =
get_shared_referral_codes(&x, &r, &referrer_login_response).await;
info!("Referral code");
info!("{:?}", shared_referral_code.referrals.get(0).unwrap());
// We make sure that the referrer has $10 + 10% of the used balance
// The admin provides credits for both
let user_balance_response = user_get_balance(&x, &r, &user_login_response).await;
let user_balance_post =
Decimal::from_str(user_balance_response["balance"].as_str().unwrap()).unwrap();
let referrer_balance_response = user_get_balance(&x, &r, &referrer_login_response).await;
let referrer_balance_post =
Decimal::from_str(referrer_balance_response["balance"].as_str().unwrap()).unwrap();
info!(
"Balances before and after are (user): {:?} {:?}",
user_balance_pre, user_balance_post
);
info!(
"Balances before and after are (referrer): {:?} {:?}",
referrer_balance_pre, referrer_balance_post
);
let difference = user_balance_pre - user_balance_post;
// Make sure that the pre and post balance is not the same (i.e. some change has occurred)
assert_ne!(
user_balance_pre, user_balance_post,
"Pre and post balance is equivalent"
);
assert!(user_balance_pre > user_balance_post);
assert!(referrer_balance_pre < referrer_balance_post);
// Finally, make sure that referrer has received 10$ of balances
assert_eq!(
referrer_balance_pre + difference / Decimal::from(10),
referrer_balance_post
);
}
#[cfg_attr(not(feature = "tests-needing-docker"), ignore)]
#[test_log::test(tokio::test)]
async fn test_referral_bonus_concurrent_referrer_and_user() {
info!("Starting referral bonus test");
let x = TestApp::spawn(true).await;
let r = reqwest::Client::builder()
.timeout(Duration::from_secs(20))
.build()
.unwrap();
let user_wallet = x.wallet(0);
let referrer_wallet = x.wallet(1);
let admin_wallet = x.wallet(2);
// Create three users, one referrer, one admin who bumps both their balances
let referrer_login_response = create_user(&x, &r, &referrer_wallet, None).await;
let admin_login_response = create_user_as_admin(&x, &r, &admin_wallet).await;
// Get the first user's referral link
let referral_link = get_referral_code(&x, &r, &referrer_login_response).await;
let user_login_response = create_user(&x, &r, &user_wallet, Some(referral_link.clone())).await;
// Bump both user's wallet to $20
admin_increase_balance(
&x,
&r,
&admin_login_response,
&user_wallet,
Decimal::from(20),
)
.await;
admin_increase_balance(
&x,
&r,
&admin_login_response,
&referrer_wallet,
Decimal::from(20),
)
.await;
// Get balance before for both users
let user_balance_response = user_get_balance(&x, &r, &user_login_response).await;
let user_balance_pre =
Decimal::from_str(user_balance_response["balance"].as_str().unwrap()).unwrap();
let referrer_balance_response = user_get_balance(&x, &r, &referrer_login_response).await;
let referrer_balance_pre =
Decimal::from_str(referrer_balance_response["balance"].as_str().unwrap()).unwrap();
// Make sure they both have balance now
assert_eq!(user_balance_pre, Decimal::from(20));
assert_eq!(referrer_balance_pre, Decimal::from(20));
// Setup variables that will be used
let shared_referral_code: UserSharedReferralInfo =
get_shared_referral_codes(&x, &r, &referrer_login_response).await;
let used_referral_code: UserUsedReferralInfo =
get_used_referral_codes(&x, &r, &user_login_response).await;
// assert that the used referral code is used
assert_eq!(
format!("{:?}", user_wallet.address()),
shared_referral_code
.clone()
.referrals
.get(0)
.unwrap()
.referred_address
.clone()
.unwrap()
);
assert_eq!(
referral_link.clone(),
used_referral_code
.clone()
.referrals
.get(0)
.unwrap()
.used_referral_code
.clone()
.unwrap()
);
// Make a for-loop just spam it a bit
// Make a JSON request
let referre_rpc_keys: RpcKey = user_get_first_rpc_key(&x, &r, &referrer_login_response).await;
let user_rpc_keys: RpcKey = user_get_first_rpc_key(&x, &r, &user_login_response).await;
info!("Rpc key is: {:?}", user_rpc_keys);
// Spawn many requests proxy_providers
let number_requests = 50;
// Spin up concurrent requests ...
let mut handles = Vec::with_capacity(number_requests);
// Make one request to create the cache; this originates from no user
let url = (&x).proxy_provider.url().clone();
let proxy_endpoint = format!("{}", url);
let proxy_provider = Provider::<Http>::try_from(proxy_endpoint).unwrap();
let _proxy_result = proxy_provider
.request::<_, Option<ArcBlock>>("eth_getBlockByNumber", ("latest", false))
.await
.unwrap()
.unwrap();
for _ in 1..number_requests {
let url = (&x).proxy_provider.url().clone();
let user_secret_key = (&user_rpc_keys).secret_key.clone();
handles.push(tokio::spawn(async move {
let proxy_endpoint = format!("{}rpc/{}", url, user_secret_key);
let proxy_provider = Provider::<Http>::try_from(proxy_endpoint).unwrap();
let _proxy_result = proxy_provider
.request::<_, Option<ArcBlock>>("eth_getBlockByNumber", ("latest", false))
.await
.unwrap()
.unwrap();
_proxy_result
}));
let url = (&x).proxy_provider.url().clone();
let referrer_secret_key = (&referre_rpc_keys).secret_key.clone();
handles.push(tokio::spawn(async move {
let proxy_endpoint = format!("{}rpc/{}", url, referrer_secret_key);
let proxy_provider = Provider::<Http>::try_from(proxy_endpoint).unwrap();
let _proxy_result = proxy_provider
.request::<_, Option<ArcBlock>>("eth_getBlockByNumber", ("latest", false))
.await
.unwrap()
.unwrap();
_proxy_result
}));
}
let mut results = Vec::with_capacity(handles.len());
for handle in handles {
results.push(handle.await.unwrap());
}
// Flush all stats here
let (influx_count, mysql_count) = x.flush_stats().await.unwrap();
assert_eq!(influx_count, 0);
assert!(mysql_count > 0);
// Check that at least something was earned:
let shared_referral_code: UserSharedReferralInfo =
get_shared_referral_codes(&x, &r, &referrer_login_response).await;
info!("Referral code");
info!("{:?}", shared_referral_code.referrals.get(0).unwrap());
// We make sure that the referrer has $10 + 10% of the used balance
// The admin provides credits for both
let user_balance_response = user_get_balance(&x, &r, &user_login_response).await;
let user_balance_post =
Decimal::from_str(user_balance_response["balance"].as_str().unwrap()).unwrap();
let referrer_balance_response = user_get_balance(&x, &r, &referrer_login_response).await;
let referrer_balance_post =
Decimal::from_str(referrer_balance_response["balance"].as_str().unwrap()).unwrap();
info!(
"Balances before and after are (user): {:?} {:?}",
user_balance_pre, user_balance_post
);
info!(
"Balances before and after are (referrer): {:?} {:?}",
referrer_balance_pre, referrer_balance_post
);
let user_difference = user_balance_pre - user_balance_post;
let referrer_difference = referrer_balance_pre - referrer_balance_post;
// Make sure that the pre and post balance is not the same (i.e. some change has occurred)
assert_ne!(
user_balance_pre, user_balance_post,
"Pre and post balance is equivalent"
);
assert!(user_balance_pre > user_balance_post);
assert!(referrer_balance_pre > referrer_balance_post);
assert!(user_difference > referrer_difference);
// In this case, the referrer loses as much as the user spends (because exact same operations), but gains 10% of what the user has spent
// I'm also adding 19.9992160000 - 19.9992944000 = 0.0000784 as this is the non-cached requests cost on top (for the user)
// Because of 1 cached request, we basically give one unit tolerance
assert_eq!(
(referrer_balance_pre - user_difference + user_difference / Decimal::from(10)),
referrer_balance_post
);
}