This commit is contained in:
Bryan Stitt 2023-07-09 20:13:03 -07:00
parent 000a1326c3
commit a560d47c48
15 changed files with 109 additions and 141 deletions

@ -43,7 +43,6 @@ impl MigrationTrait for Migration {
#[derive(Iden)]
enum Referee {
Table,
Id,
CreditsAppliedForReferee,
OneTimeBonusAppliedForReferee,
}

@ -1,14 +1,9 @@
use crate::errors::{Web3ProxyResponse, Web3ProxyResult};
use crate::errors::Web3ProxyResult;
use fstrings::{f, format_args_f};
use migration::sea_orm::{
DbBackend, DbConn, FromQueryResult, JsonValue, QueryResult, SqlxMySqlPoolConnection, Statement,
};
use migration::{sea_orm, ConnectionTrait};
use rust_decimal::Decimal;
use migration::sea_orm;
use migration::sea_orm::prelude::Decimal;
use migration::sea_orm::{DbBackend, DbConn, FromQueryResult, Statement};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::num::NonZeroU64;
use tracing::info;
/// Implements the balance getter
#[derive(Clone, Debug, Default, Serialize, Deserialize, FromQueryResult)]
@ -23,61 +18,58 @@ impl Balance {
pub fn remaining(&self) -> Decimal {
self.total_deposits - self.total_spent_paid_credits
}
}
pub async fn try_get_balance_from_db(
db_conn: &DbConn,
user_id: u64,
) -> Web3ProxyResult<Option<Balance>> {
// Return early if user_id == 0
if user_id == 0 {
return Ok(None);
pub async fn try_from_db(db_conn: &DbConn, user_id: u64) -> Web3ProxyResult<Option<Self>> {
// Return early if user_id == 0
if user_id == 0 {
return Ok(None);
}
// Injecting the variable directly, should be fine because Rust is typesafe, especially with primitives
let raw_sql = f!(r#"
SELECT
user.id AS user_id,
COALESCE(SUM(admin_receipt.amount), 0) + COALESCE(SUM(chain_receipt.amount), 0) + COALESCE(SUM(stripe_receipt.amount), 0) + COALESCE(SUM(referee.one_time_bonus_applied_for_referee), 0) + COALESCE(referrer_bonus.total_bonus, 0) AS total_deposits,
COALESCE(SUM(accounting.sum_credits_used), 0) AS total_spent_paid_credits,
COALESCE(SUM(accounting.sum_incl_free_credits_used), 0) AS total_spent
FROM
user
LEFT JOIN
admin_increase_balance_receipt AS admin_receipt ON user.id = admin_receipt.deposit_to_user_id
LEFT JOIN
increase_on_chain_balance_receipt AS chain_receipt ON user.id = chain_receipt.deposit_to_user_id
LEFT JOIN
stripe_increase_balance_receipt AS stripe_receipt ON user.id = stripe_receipt.deposit_to_user_id
LEFT JOIN
referee ON user.id = referee.user_id
LEFT JOIN
(SELECT referrer.user_id, SUM(referee.credits_applied_for_referrer) AS total_bonus
FROM referrer
JOIN referee ON referrer.id = referee.used_referral_code
GROUP BY referrer.user_id) AS referrer_bonus ON user.id = referrer_bonus.user_id
LEFT JOIN
rpc_key ON user.id = rpc_key.user_id
LEFT JOIN
rpc_accounting_v2 AS accounting ON rpc_key.id = accounting.rpc_key_id
LEFT JOIN
user_tier ON user.user_tier_id = user_tier.id
WHERE
user.id = {user_id};
"#);
let balance: Balance = match Self::find_by_statement(Statement::from_string(
DbBackend::MySql,
raw_sql,
// [.into()],
))
.one(db_conn)
.await?
{
None => return Ok(None),
Some(x) => x,
};
// Return None if there is no entry
Ok(Some(balance))
}
// Injecting the variable directly, should be fine because Rust is typesafe, especially with primitives
let raw_sql = f!(r#"
SELECT
user.id AS user_id,
COALESCE(SUM(admin_receipt.amount), 0) + COALESCE(SUM(chain_receipt.amount), 0) + COALESCE(SUM(stripe_receipt.amount), 0) + COALESCE(SUM(referee.one_time_bonus_applied_for_referee), 0) + COALESCE(referrer_bonus.total_bonus, 0) AS total_deposits,
COALESCE(SUM(accounting.sum_credits_used), 0) AS total_spent_paid_credits,
COALESCE(SUM(accounting.sum_incl_free_credits_used), 0) AS total_spent
FROM
user
LEFT JOIN
admin_increase_balance_receipt AS admin_receipt ON user.id = admin_receipt.deposit_to_user_id
LEFT JOIN
increase_on_chain_balance_receipt AS chain_receipt ON user.id = chain_receipt.deposit_to_user_id
LEFT JOIN
stripe_increase_balance_receipt AS stripe_receipt ON user.id = stripe_receipt.deposit_to_user_id
LEFT JOIN
referee ON user.id = referee.user_id
LEFT JOIN
(SELECT referrer.user_id, SUM(referee.credits_applied_for_referrer) AS total_bonus
FROM referrer
JOIN referee ON referrer.id = referee.used_referral_code
GROUP BY referrer.user_id) AS referrer_bonus ON user.id = referrer_bonus.user_id
LEFT JOIN
rpc_key ON user.id = rpc_key.user_id
LEFT JOIN
rpc_accounting_v2 AS accounting ON rpc_key.id = accounting.rpc_key_id
LEFT JOIN
user_tier ON user.user_tier_id = user_tier.id
WHERE
user.id = {};
"#, user_id).to_string();
let balance: Balance = match Balance::find_by_statement(Statement::from_string(
DbBackend::MySql,
raw_sql,
// [.into()],
))
.one(db_conn)
.await?
{
None => return Ok(None),
Some(x) => x,
};
// Return None if there is no entry
Ok(Some(balance))
}

@ -3,7 +3,6 @@ use crate::frontend::authorization::{AuthorizationChecks, RpcSecretKey};
use moka::future::Cache;
use std::fmt;
use std::net::IpAddr;
use std::num::NonZeroU64;
use std::sync::Arc;
use tokio::sync::RwLock as AsyncRwLock;

@ -6,7 +6,6 @@ use ethers::prelude::{Address, TxHash};
use ethers::types::{U256, U64};
use hashbrown::HashMap;
use migration::sea_orm::DatabaseConnection;
use rust_decimal::Decimal;
use sentry::types::Dsn;
use serde::Deserialize;
use serde_inline_default::serde_inline_default;
@ -311,7 +310,8 @@ mod tests {
fn expected_app_defaults() {
let a: AppConfig = serde_json::from_value(json!({
"chain_id": 1,
})).unwrap();
}))
.unwrap();
assert_eq!(a.min_synced_rpcs, 1);

@ -3,7 +3,6 @@
use super::authorization::login_is_authorized;
use crate::admin_queries::query_admin_modify_usertier;
use crate::app::Web3ProxyApp;
use crate::caches::UserBalanceCache;
use crate::errors::Web3ProxyResponse;
use crate::errors::{Web3ProxyError, Web3ProxyErrorContext};
use crate::frontend::users::authentication::PostLogin;
@ -18,7 +17,7 @@ use axum_client_ip::InsecureClientIp;
use axum_macros::debug_handler;
use chrono::{TimeZone, Utc};
use entities::{
admin, admin_increase_balance_receipt, admin_trail, balance, login, pending_login, rpc_key,
admin, admin_increase_balance_receipt, admin_trail, login, pending_login, rpc_key,
user,
};
use ethers::{prelude::Address, types::Bytes};
@ -28,7 +27,6 @@ use migration::sea_orm::prelude::{Decimal, Uuid};
use migration::sea_orm::{
self, ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter,
};
use migration::{Expr, OnConflict};
use serde::{Deserialize, Serialize};
use serde_json::json;
use siwe::{Message, VerificationOpts};

@ -2,7 +2,7 @@
use super::rpc_proxy_ws::ProxyMode;
use crate::app::{Web3ProxyApp, APP_USER_AGENT};
use crate::balance::{try_get_balance_from_db, Balance};
use crate::balance::Balance;
use crate::caches::RegisteredUserRateLimitKey;
use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult};
use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest};
@ -18,7 +18,7 @@ use core::fmt;
use deferred_rate_limiter::DeferredRateLimitResult;
use derivative::Derivative;
use derive_more::From;
use entities::{balance, login, rpc_key, user, user_tier};
use entities::{login, rpc_key, user, user_tier};
use ethers::types::{Bytes, U64};
use ethers::utils::keccak256;
use futures::TryFutureExt;
@ -26,9 +26,7 @@ use hashbrown::HashMap;
use http::HeaderValue;
use ipnet::IpNet;
use migration::sea_orm::prelude::Decimal;
use migration::sea_orm::{self, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter};
use migration::{Expr, OnConflict};
use parking_lot::RwLock;
use migration::sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter};
use rdkafka::message::{Header as KafkaHeader, OwnedHeaders as KafkaOwnedHeaders, OwnedMessage};
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::util::Timeout as KafkaTimeout;
@ -1139,13 +1137,8 @@ impl Web3ProxyApp {
self.user_balance_cache
.try_get_with(user_id, async move {
let db_replica = self.db_replica()?;
let x = match crate::balance::try_get_balance_from_db(db_replica.as_ref(), user_id)
.await?
{
None => {
format!("user_id {:?} has no balance entry", user_id).to_owned();
Err(Web3ProxyError::InvalidUserKey)
}
let x = match Balance::try_from_db(db_replica.as_ref(), user_id).await? {
None => Err(Web3ProxyError::InvalidUserKey),
Some(x) => Ok(x),
}?;
trace!("Balance for cache retrieved from database is {:?}", x);

@ -12,7 +12,7 @@ use axum::{
use axum_client_ip::InsecureClientIp;
use axum_macros::debug_handler;
use chrono::{TimeZone, Utc};
use entities::{self, balance, login, pending_login, referee, referrer, rpc_key, user};
use entities::{self, login, pending_login, referee, referrer, rpc_key, user};
use ethers::{prelude::Address, types::Bytes};
use hashbrown::HashMap;
use http::StatusCode;

@ -1,6 +1,6 @@
use crate::app::Web3ProxyApp;
use crate::balance::{try_get_balance_from_db, Balance};
use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResponse, Web3ProxyResult};
use crate::balance::Balance;
use crate::errors::{Web3ProxyError, Web3ProxyResponse, Web3ProxyResult};
use crate::frontend::authorization::{
login_is_authorized, Authorization as Web3ProxyAuthorization,
};
@ -15,7 +15,7 @@ use axum::{
use axum_client_ip::InsecureClientIp;
use axum_macros::debug_handler;
use entities::{
admin_increase_balance_receipt, balance, increase_on_chain_balance_receipt, rpc_key,
admin_increase_balance_receipt, increase_on_chain_balance_receipt, rpc_key,
stripe_increase_balance_receipt, user,
};
use ethers::abi::AbiEncode;
@ -24,16 +24,13 @@ use hashbrown::{HashMap, HashSet};
use http::StatusCode;
use migration::sea_orm::prelude::Decimal;
use migration::sea_orm::{
self, ActiveModelTrait, ActiveValue, ColumnTrait, EntityTrait, IntoActiveModel, ModelTrait,
QueryFilter, QuerySelect, TransactionTrait,
self, ActiveModelTrait, ColumnTrait, EntityTrait, ModelTrait, QueryFilter, QuerySelect,
TransactionTrait,
};
use migration::LockType;
use migration::{Expr, OnConflict};
use payment_contracts::ierc20::IERC20;
use payment_contracts::payment_factory::{self, PaymentFactory};
use rdkafka::bindings::rd_kafka_AclBinding_destroy;
use serde_json::json;
use std::num::{NonZeroU64, TryFromIntError};
use std::sync::Arc;
use tracing::{debug, info, trace};
@ -53,7 +50,7 @@ pub async fn user_balance_get(
let db_replica = app.db_replica()?;
let user_balance = match try_get_balance_from_db(db_replica.as_ref(), user.id).await? {
let user_balance = match Balance::try_from_db(db_replica.as_ref(), user.id).await? {
None => Balance::default(),
Some(x) => x,
};

@ -7,19 +7,17 @@ use axum::{
Extension, Json, TypedHeader,
};
use axum_macros::debug_handler;
use entities::{balance, rpc_key, stripe_increase_balance_receipt, user};
use entities::{rpc_key, stripe_increase_balance_receipt, user};
use ethers::types::Address;
use http::HeaderMap;
use migration::sea_orm::prelude::Decimal;
use migration::sea_orm::{
self, ActiveModelTrait, ColumnTrait, EntityTrait, QueryFilter, TransactionTrait,
};
use migration::{Expr, OnConflict};
use serde_json::json;
use std::num::NonZeroU64;
use std::sync::Arc;
use stripe::Webhook;
use tracing::{debug, error, trace};
use tracing::{debug, error};
/// `GET /user/balance/stripe` -- Use a bearer token to get the user's balance and spend.
///

@ -11,7 +11,7 @@ use axum::{
};
use axum_macros::debug_handler;
use entities::sea_orm_active_enums::Role;
use entities::{balance, rpc_key, secondary_user, user};
use entities::{rpc_key, secondary_user, user};
use ethers::types::Address;
use hashbrown::HashMap;
use http::StatusCode;

@ -1,5 +1,5 @@
use super::StatType;
use crate::balance::{try_get_balance_from_db, Balance};
use crate::balance::Balance;
use crate::errors::Web3ProxyErrorContext;
use crate::{
app::Web3ProxyApp,
@ -16,13 +16,13 @@ use axum::{
Json, TypedHeader,
};
use entities::sea_orm_active_enums::Role;
use entities::{balance, rpc_key, secondary_user, user, user_tier};
use entities::{rpc_key, secondary_user, user, user_tier};
use fstrings::{f, format_args_f};
use hashbrown::HashMap;
use influxdb2::api::query::FluxRecord;
use influxdb2::models::Query;
use migration::sea_orm::prelude::Decimal;
use migration::sea_orm::{ColumnTrait, EntityTrait, QueryFilter};
use rust_decimal::Decimal;
use serde_json::json;
use tracing::{debug, error, trace, warn};
use ulid::Ulid;
@ -62,7 +62,7 @@ pub async fn query_user_stats<'a>(
// TODO: move this to a helper. it should be simple to check that a user has an active premium account
if let Some(caller_user) = &caller_user {
// get the balance of the user whose stats we are going to fetch (might be self, but might be another user)
let balance = match try_get_balance_from_db(db_replica.as_ref(), user_id).await? {
let balance = match Balance::try_from_db(db_replica.as_ref(), user_id).await? {
None => {
return Err(Web3ProxyError::AccessDenied(
"User Stats Response requires you to authorize with a bearer token".into(),

@ -15,32 +15,27 @@ use anyhow::{anyhow, Context};
use axum::headers::Origin;
use chrono::{DateTime, Months, TimeZone, Utc};
use derive_more::From;
use entities::{balance, referee, referrer, rpc_accounting_v2, rpc_key};
use futures::TryFutureExt;
use hyper::body::Buf;
use entities::{referee, referrer, rpc_accounting_v2, rpc_key};
use influxdb2::models::DataPoint;
use migration::sea_orm::prelude::Decimal;
use migration::sea_orm::{
self, ActiveModelTrait, ColumnTrait, DatabaseConnection, DbConn, EntityTrait, IntoActiveModel,
QueryFilter, TransactionTrait,
};
use migration::sea_orm::{DatabaseTransaction, QuerySelect};
use migration::{Expr, LockType, OnConflict};
use num_traits::{ToPrimitive, Zero};
use parking_lot::{Mutex, RwLock};
use sentry::User;
use migration::{Expr, OnConflict};
use num_traits::ToPrimitive;
use parking_lot::Mutex;
use std::borrow::Cow;
use std::convert::Infallible;
use std::default::Default;
use std::mem;
use std::num::{NonZeroU64, TryFromIntError};
use std::num::NonZeroU64;
use std::str::FromStr;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use tokio::sync::RwLock as AsyncRwLock;
use tracing::{error, info, trace};
use crate::balance::{try_get_balance_from_db, Balance};
use crate::balance::Balance;
pub use stat_buffer::{SpawnedStatBuffer, StatBuffer};
#[derive(Debug, PartialEq, Eq)]
@ -336,21 +331,20 @@ impl BufferedRpcQueryStats {
db_conn: &DbConn,
) -> Arc<AsyncRwLock<Balance>> {
trace!("Will get it from the balance cache");
let out: Arc<AsyncRwLock<Balance>> = user_balance_cache
user_balance_cache
.try_get_with(user_id, async {
let x = match try_get_balance_from_db(db_conn, user_id).await? {
let x = match Balance::try_from_db(db_conn, user_id).await? {
Some(x) => x,
None => return Err(Web3ProxyError::InvalidUserKey),
};
return Ok(Arc::new(AsyncRwLock::new(x)));
Ok(Arc::new(AsyncRwLock::new(x)))
})
.await
.unwrap_or_else(|err| {
error!("Could not find balance for user !{}", err);
// We are just instantiating this for type-safety's sake
Arc::new(AsyncRwLock::new(Balance::default()))
});
out
})
}
// TODO: take a db transaction instead so that we can batch?
@ -375,14 +369,14 @@ impl BufferedRpcQueryStats {
// Gathering cache and database rows
let user_balance = self
._get_user_balance(sender_user_id, user_balance_cache, &db_conn)
._get_user_balance(sender_user_id, user_balance_cache, db_conn)
.await;
let mut user_balance = user_balance.write().await;
// First of all, save the statistics to the database:
let paid_credits_used = self
._save_db_stats(chain_id, &db_conn, &key, user_balance.remaining())
._save_db_stats(chain_id, db_conn, &key, user_balance.remaining())
.await?;
// No need to continue if no credits were used
@ -435,7 +429,7 @@ impl BufferedRpcQueryStats {
Some((referral_entity, Some(referrer))) => {
// Get the balance for the referrer, see if they're premium or not
let referrer_balance = self
._get_user_balance(referrer.user_id, &user_balance_cache, &db_conn)
._get_user_balance(referrer.user_id, user_balance_cache, db_conn)
.await;
// Just to keep locking simple, keep things where they are
let referrer_balance = referrer_balance.read().await;

@ -1,6 +1,6 @@
use crate::TestApp;
use ethers::prelude::{LocalWallet, Signer};
use rust_decimal::Decimal;
use migration::sea_orm::prelude::Decimal;
use tracing::info;
use web3_proxy::frontend::admin::AdminIncreaseBalancePost;
use web3_proxy::frontend::users::authentication::LoginPostResponse;

@ -8,7 +8,7 @@ use crate::common::create_admin::create_user_as_admin;
use crate::common::create_user::create_user;
use crate::common::get_user_balance::user_get_balance;
use crate::common::TestApp;
use rust_decimal::Decimal;
use migration::sea_orm::prelude::Decimal;
use tracing::info;
// #[cfg_attr(not(feature = "tests-needing-docker"), ignore)]

@ -13,8 +13,7 @@ use crate::common::referral::{
use crate::common::TestApp;
use ethers::prelude::{Http, Provider};
use ethers::{signers::Signer, types::Signature};
use futures::future::select_all;
use rust_decimal::Decimal;
use migration::sea_orm::prelude::Decimal;
use serde::Deserialize;
use std::str::FromStr;
use std::time::Duration;
@ -501,17 +500,18 @@ async fn test_referral_bonus_concurrent_referrer_only() {
// Spin up concurrent requests ...
let mut handles = Vec::with_capacity(number_requests);
for _ in 1..number_requests {
let url = (&x).proxy_provider.url().clone();
let secret_key = (&rpc_keys).secret_key.clone();
let url = x.proxy_provider.url().clone();
let secret_key = rpc_keys.secret_key;
handles.push(tokio::spawn(async move {
let proxy_endpoint = format!("{}rpc/{}", url, secret_key);
let proxy_provider = Provider::<Http>::try_from(proxy_endpoint).unwrap();
let _proxy_result = proxy_provider
proxy_provider
.request::<_, Option<ArcBlock>>("eth_getBlockByNumber", ("latest", false))
.await
.unwrap()
.unwrap();
_proxy_result
.unwrap()
}));
}
@ -660,7 +660,7 @@ async fn test_referral_bonus_concurrent_referrer_and_user() {
let mut handles = Vec::with_capacity(number_requests);
// Make one request to create the cache; this originates from no user
let url = (&x).proxy_provider.url().clone();
let url = x.proxy_provider.url().clone();
let proxy_endpoint = format!("{}", url);
let proxy_provider = Provider::<Http>::try_from(proxy_endpoint).unwrap();
let _proxy_result = proxy_provider
@ -670,29 +670,27 @@ async fn test_referral_bonus_concurrent_referrer_and_user() {
.unwrap();
for _ in 1..number_requests {
let url = (&x).proxy_provider.url().clone();
let user_secret_key = (&user_rpc_keys).secret_key.clone();
let url = x.proxy_provider.url().clone();
let user_secret_key = user_rpc_keys.secret_key;
handles.push(tokio::spawn(async move {
let proxy_endpoint = format!("{}rpc/{}", url, user_secret_key);
let proxy_provider = Provider::<Http>::try_from(proxy_endpoint).unwrap();
let _proxy_result = proxy_provider
proxy_provider
.request::<_, Option<ArcBlock>>("eth_getBlockByNumber", ("latest", false))
.await
.unwrap()
.unwrap();
_proxy_result
.unwrap()
}));
let url = (&x).proxy_provider.url().clone();
let referrer_secret_key = (&referre_rpc_keys).secret_key.clone();
let url = x.proxy_provider.url().clone();
let referrer_secret_key = referre_rpc_keys.secret_key;
handles.push(tokio::spawn(async move {
let proxy_endpoint = format!("{}rpc/{}", url, referrer_secret_key);
let proxy_provider = Provider::<Http>::try_from(proxy_endpoint).unwrap();
let _proxy_result = proxy_provider
proxy_provider
.request::<_, Option<ArcBlock>>("eth_getBlockByNumber", ("latest", false))
.await
.unwrap()
.unwrap();
_proxy_result
.unwrap()
}));
}