From 63499c15649433e9c7c35f23633491194d4c30e3 Mon Sep 17 00:00:00 2001 From: David Date: Wed, 7 Jun 2023 18:39:30 +0200 Subject: [PATCH] David/66 downgrade balance logic (#89) * tried to make the balance_entry atomic, asking for feedback * added locked select everywhere and inside the txn, it should be atomic now * forgot about overwrite (referee vs sender entities were the same, overwriting each other), fixed it * will now trigger refreshing cache when balance thresholds are met * will also invalidate cache if balance gets too low (to out of funds) * added separate balance cache with shorter TTL. need to know if i should modify it (rn its not atomic), or if low TTL is good enough, and we just fetch the value every now and then from the Database * removed UserTier struct bcs duplicate with database * removed subuser premium requirement (which would have lead to a merge conflict later on) * removed user_tier artefact * replaced cache to use AtomicF64 * a bunch of small changes * some changes * will merge with devel * changed AtomicF64 to RwLock * changed AtomicF64 to RwLock * downgrading user when at 0.1$ or if sum_credits_used is very low * changed caches to be more aggressive in being non-empty * replaced Arc::clone() by reference --- scripts/manual-tests/48-balance-downgrade.sh | 2 +- web3_proxy/src/app/mod.rs | 17 +- .../bin/web3_proxy_cli/migrate_stats_to_v2.rs | 7 +- web3_proxy/src/errors.rs | 12 + web3_proxy/src/frontend/authorization.rs | 115 ++++- web3_proxy/src/frontend/users/payment.rs | 28 +- web3_proxy/src/frontend/users/referral.rs | 1 - web3_proxy/src/stats/mod.rs | 461 ++++++++++++------ web3_proxy/src/stats/stat_buffer.rs | 17 +- 9 files changed, 468 insertions(+), 192 deletions(-) diff --git a/scripts/manual-tests/48-balance-downgrade.sh b/scripts/manual-tests/48-balance-downgrade.sh index 80cc4cfd..0463a339 100644 --- a/scripts/manual-tests/48-balance-downgrade.sh +++ b/scripts/manual-tests/48-balance-downgrade.sh @@ -75,7 +75,7 @@ do --data '{"method":"eth_blockNumber","params":[],"id":1,"jsonrpc":"2.0"}' done -for i in {1..100} +for i in {1..10000} do curl \ -X POST "127.0.0.1:8544/" \ diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 302d9bfe..1be2dea9 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -55,7 +55,7 @@ use std::num::{NonZeroU32, NonZeroU64}; use std::str::FromStr; use std::sync::{atomic, Arc}; use std::time::Duration; -use tokio::sync::{broadcast, watch, Semaphore}; +use tokio::sync::{broadcast, watch, RwLock, Semaphore}; use tokio::task::JoinHandle; use tokio::time::timeout; @@ -107,11 +107,11 @@ pub struct AuthorizationChecks { /// IMPORTANT! Once confirmed by a miner, they will be public on the blockchain! pub private_txs: bool, pub proxy_mode: ProxyMode, - pub balance: Option, } /// Cache data from the database about rpc keys pub type RpcSecretKeyCache = Arc>; +pub type UserBalanceCache = Arc>>>; // Could also be an AtomicDecimal /// The application // TODO: i'm sure this is more arcs than necessary, but spawning futures makes references hard @@ -160,6 +160,8 @@ pub struct Web3ProxyApp { /// cache authenticated users so that we don't have to query the database on the hot path // TODO: should the key be our RpcSecretKey class instead of Ulid? pub rpc_secret_key_cache: RpcSecretKeyCache, + /// cache user balances so we don't have to check downgrade logic every single time + pub user_balance_cache: UserBalanceCache, /// concurrent/parallel RPC request limits for authenticated users pub user_semaphores: Cache>, /// concurrent/parallel request limits for anonymous users @@ -403,6 +405,11 @@ impl Web3ProxyApp { ) .await; + // TODO: TTL left low, this could also be a solution instead of modifiying the cache, that may be disgusting across threads / slow anyways + let user_balance_cache = + CacheWithTTL::arc_with_capacity("user_balance_cache", 10_000, Duration::from_secs(600)) + .await; + // create a channel for receiving stats // we do this in a channel so we don't slow down our response to the users // stats can be saved in mysql, influxdb, both, or none @@ -416,6 +423,7 @@ impl Web3ProxyApp { 60, influxdb_client.clone(), Some(rpc_secret_key_cache.clone()), + Some(user_balance_cache.clone()), stat_buffer_shutdown_receiver, 1, )? { @@ -631,6 +639,9 @@ impl Web3ProxyApp { frontend_ip_rate_limiter, frontend_registered_user_rate_limiter, hostname, + vredis_pool, + rpc_secret_key_cache, + user_balance_cache, http_client, influxdb_client, internal_provider, @@ -641,10 +652,8 @@ impl Web3ProxyApp { pending_transactions, pending_tx_sender, private_rpcs, - rpc_secret_key_cache, stat_sender, user_semaphores, - vredis_pool, watch_consensus_head_receiver, }; diff --git a/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs b/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs index a933a86d..75823b0f 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs @@ -4,6 +4,7 @@ use entities::{rpc_accounting, rpc_key}; use futures::stream::FuturesUnordered; use futures::StreamExt; use log::{error, info}; +use migration::sea_orm::prelude::Decimal; use migration::sea_orm::QueryOrder; use migration::sea_orm::{ ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QuerySelect, UpdateResult, @@ -12,7 +13,7 @@ use migration::{Expr, Value}; use parking_lot::Mutex; use std::num::NonZeroU64; use std::sync::Arc; -use tokio::sync::broadcast; +use tokio::sync::{broadcast, RwLock}; use tokio::time::Instant; use ulid::Ulid; use web3_proxy::app::BILLING_PERIOD_SECONDS; @@ -79,6 +80,7 @@ impl MigrateStatsToV2 { 30, influxdb_client.clone(), None, + None, rpc_account_shutdown_recevier, 1, ) @@ -169,6 +171,8 @@ impl MigrateStatsToV2 { let request_ulid = Ulid::new(); + let latest_balance = Arc::new(RwLock::new(Decimal::default())); + // Create RequestMetadata let request_metadata = RequestMetadata { archive_request: x.archive_request.into(), @@ -191,6 +195,7 @@ impl MigrateStatsToV2 { start_instant: Instant::now(), stat_sender: Some(stat_sender.clone()), request_ulid, + latest_balance, }; if let Some(x) = request_metadata.try_send_stat()? { diff --git a/web3_proxy/src/errors.rs b/web3_proxy/src/errors.rs index 63257a76..3a17bbd0 100644 --- a/web3_proxy/src/errors.rs +++ b/web3_proxy/src/errors.rs @@ -76,6 +76,7 @@ pub enum Web3ProxyError { UnknownReferralCode, InvalidReferer, InvalidSignatureLength, + InvalidUserTier, InvalidUserAgent, InvalidUserKey, IpAddrParse(AddrParseError), @@ -502,6 +503,17 @@ impl Web3ProxyError { }, ) } + Self::InvalidUserTier => { + warn!("InvalidUserTier"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + JsonRpcErrorData { + message: Cow::Borrowed("UserTier is not valid!"), + code: StatusCode::BAD_REQUEST.as_u16().into(), + data: None, + }, + ) + } Self::JoinError(err) => { let code = if err.is_cancelled() { trace!("JoinError. likely shutting down. err={:?}", err); diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index 8212e055..7e9a615a 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -7,7 +7,7 @@ use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest}; use crate::rpcs::one::Web3Rpc; use crate::stats::{AppStat, BackendRequests, RpcQueryStats}; use crate::user_token::UserBearerToken; -use anyhow::Context; +use anyhow::{Context, Error}; use axum::headers::authorization::Bearer; use axum::headers::{Header, Origin, Referer, UserAgent}; use chrono::Utc; @@ -23,7 +23,9 @@ use hashbrown::HashMap; use http::HeaderValue; use ipnet::IpNet; use log::{error, trace, warn}; +use migration::sea_orm::prelude::Decimal; use migration::sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter}; +use num_traits::ToPrimitive; use rdkafka::message::{Header as KafkaHeader, OwnedHeaders as KafkaOwnedHeaders, OwnedMessage}; use rdkafka::producer::{FutureProducer, FutureRecord}; use rdkafka::util::Timeout as KafkaTimeout; @@ -33,10 +35,11 @@ use std::convert::Infallible; use std::fmt::Display; use std::hash::{Hash, Hasher}; use std::mem; +use std::num::NonZeroU64; use std::sync::atomic::{self, AtomicBool, AtomicI64, AtomicU64, AtomicUsize}; use std::time::Duration; use std::{net::IpAddr, str::FromStr, sync::Arc}; -use tokio::sync::{OwnedSemaphorePermit, Semaphore}; +use tokio::sync::{OwnedSemaphorePermit, RwLock, Semaphore}; use tokio::task::JoinHandle; use tokio::time::Instant; use ulid::Ulid; @@ -280,6 +283,9 @@ pub struct RequestMetadata { /// Cancel-safe channel for sending stats to the buffer pub stat_sender: Option>, + + /// Latest balance + pub latest_balance: Arc>, } impl Default for Authorization { @@ -306,6 +312,7 @@ impl Default for RequestMetadata { response_timestamp: Default::default(), start_instant: Instant::now(), stat_sender: Default::default(), + latest_balance: Default::default(), } } } @@ -431,6 +438,19 @@ impl RequestMetadata { } } + // Get latest balance from cache + let latest_balance = match app + .balance_checks(authorization.checks.user_id) + .await + .context("Could not retrieve balance from database or cache!") + { + Ok(x) => x, + Err(err) => { + error!("{}", err); + Arc::new(RwLock::new(Decimal::default())) + } + }; + let x = Self { archive_request: false.into(), backend_requests: Default::default(), @@ -447,6 +467,7 @@ impl RequestMetadata { response_timestamp: 0.into(), start_instant: Instant::now(), stat_sender: app.stat_sender.clone(), + latest_balance, }; Arc::new(x) @@ -1109,6 +1130,38 @@ impl Web3ProxyApp { } } + /// Get the balance for the user. + /// + /// If a subuser calls this function, the subuser needs to have first attained the user_id that the rpc key belongs to. + /// This function should be called anywhere where balance is required (i.e. only rpc calls, I believe ... non-rpc calls don't really require balance) + pub(crate) async fn balance_checks( + &self, + user_id: u64, + ) -> Web3ProxyResult>> { + match NonZeroU64::try_from(user_id) { + Err(_) => Ok(Arc::new(RwLock::new(Decimal::default()))), + Ok(x) => { + self.user_balance_cache + .try_get_or_insert_async(&x, async move { + let db_replica = self + .db_replica() + .web3_context("Getting database connection")?; + + let balance: Decimal = match balance::Entity::find() + .filter(balance::Column::UserId.eq(user_id)) + .one(db_replica.as_ref()) + .await? + { + Some(x) => x.available_balance, + None => Decimal::default(), + }; + Ok(Arc::new(RwLock::new(balance))) + }) + .await + } + } + } + // check the local cache for user data, or query the database pub(crate) async fn authorization_checks( &self, @@ -1135,24 +1188,6 @@ impl Web3ProxyApp { Some(rpc_key_model) => { // TODO: move these splits into helper functions // TODO: can we have sea orm handle this for us? - let user_model = user::Entity::find_by_id(rpc_key_model.user_id) - .one(db_replica.as_ref()) - .await? - .context("no related user")?; - - let balance = balance::Entity::find() - .filter(balance::Column::UserId.eq(user_model.id)) - .one(db_replica.as_ref()) - .await? - .map(|x| x.available_balance) - .unwrap_or_default(); - - let user_tier_model = - user_tier::Entity::find_by_id(user_model.user_tier_id) - .one(db_replica.as_ref()) - .await? - .context("no related user tier")?; - let allowed_ips: Option> = if let Some(allowed_ips) = rpc_key_model.allowed_ips { let x = allowed_ips @@ -1212,8 +1247,42 @@ impl Web3ProxyApp { None }; - let rpc_key_id = - Some(rpc_key_model.id.try_into().expect("db ids are never 0")); + // Get the user_tier + let user_model = user::Entity::find_by_id(rpc_key_model.user_id) + .one(db_replica.as_ref()) + .await? + .context("user model was not found, but every rpc_key should have a user")?; + + let balance = match balance::Entity::find() + .filter(balance::Column::UserId.eq(user_model.id)) + .one(db_replica.as_ref()) + .await? { + Some(x) => x.available_balance, + None => Decimal::default() + }; + + let mut user_tier_model = + user_tier::Entity::find_by_id(user_model.user_tier_id) + .one(db_replica.as_ref()) + .await? + .context("related user tier not found, but every user should have a tier")?; + + // TODO: Do the logic here, as to how to treat the user, based on balance and initial check + // Clear the cache (not the login!) in the stats if a tier-change happens (clear, but don't modify roles) + if user_tier_model.title == "Premium" && balance < Decimal::new(1, 1) { + // Find the equivalent downgraded user tier, and modify limits from there + if let Some(downgrade_user_tier) = user_tier_model.downgrade_tier_id { + user_tier_model = + user_tier::Entity::find_by_id(downgrade_user_tier) + .one(db_replica.as_ref()) + .await? + .context("finding the downgrade user tier for premium did not work for premium")?; + } else { + return Err(Web3ProxyError::InvalidUserTier); + } + } + + let rpc_key_id = Some(rpc_key_model.id.try_into().context("db ids are never 0")?); Ok(AuthorizationChecks { user_id: rpc_key_model.user_id, @@ -1229,7 +1298,6 @@ impl Web3ProxyApp { max_requests_per_period: user_tier_model.max_requests_per_period, private_txs: rpc_key_model.private_txs, proxy_mode, - balance: Some(balance), }) } None => Ok(AuthorizationChecks::default()), @@ -1249,6 +1317,7 @@ impl Web3ProxyApp { user_agent: Option, ) -> Web3ProxyResult { let authorization_checks = self.authorization_checks(proxy_mode, rpc_key).await?; + let balance = self.balance_checks(authorization_checks.user_id).await?; // if no rpc_key_id matching the given rpc was found, then we can't rate limit by key if authorization_checks.rpc_secret_key_id.is_none() { diff --git a/web3_proxy/src/frontend/users/payment.rs b/web3_proxy/src/frontend/users/payment.rs index 1b8772dc..58e8446d 100644 --- a/web3_proxy/src/frontend/users/payment.rs +++ b/web3_proxy/src/frontend/users/payment.rs @@ -8,11 +8,12 @@ use axum::{ Extension, Json, TypedHeader, }; use axum_macros::debug_handler; -use entities::{balance, increase_on_chain_balance_receipt, user}; +use entities::{balance, increase_on_chain_balance_receipt, rpc_key, user}; use ethbloom::Input as BloomInput; -use ethers::abi::{AbiEncode, ParamType}; -use ethers::types::{Address, TransactionReceipt, ValueOrArray, H256, U256}; +use ethers::abi::AbiEncode; +use ethers::types::{Address, TransactionReceipt, ValueOrArray, H256}; use hashbrown::HashMap; +// use http::StatusCode; use http::StatusCode; use log::{debug, info, trace}; use migration::sea_orm::prelude::Decimal; @@ -24,6 +25,7 @@ use num_traits::Pow; use payment_contracts::ierc20::IERC20; use payment_contracts::payment_factory::{self, PaymentFactory}; use serde_json::json; +use std::num::NonZeroU64; use std::sync::Arc; /// Implements any logic related to payments @@ -192,6 +194,8 @@ pub async fn user_balance_post( todo!("delete this transaction from the database"); } + // Create a new transaction that will be used for joint transaction + let txn = db_conn.begin().await?; if let Ok(event) = payment_factory_contract .decode_event::( "PaymentReceived", @@ -282,6 +286,24 @@ pub async fn user_balance_post( receipt.save(&txn).await?; + // Remove all RPC-keys owned by this user from the cache, s.t. rate limits are re-calculated + let rpc_keys = rpc_key::Entity::find() + .filter(rpc_key::Column::UserId.eq(recipient.id)) + .all(&txn) + .await?; + + match NonZeroU64::try_from(recipient.id) { + Err(_) => {} + Ok(x) => { + app.user_balance_cache.remove(&x); + } + }; + + for rpc_key_entity in rpc_keys { + app.rpc_secret_key_cache + .remove(&rpc_key_entity.secret_key.into()); + } + let x = json!({ "tx_hash": tx_hash, "log_index": log_index, diff --git a/web3_proxy/src/frontend/users/referral.rs b/web3_proxy/src/frontend/users/referral.rs index 22c0ed9d..784bbdc1 100644 --- a/web3_proxy/src/frontend/users/referral.rs +++ b/web3_proxy/src/frontend/users/referral.rs @@ -28,7 +28,6 @@ use std::sync::Arc; /// Create or get the existing referral link. /// This is the link that the user can share to third parties, and get credits. -/// Applies to premium users only #[debug_handler] pub async fn user_referral_link_get( Extension(app): Extension>, diff --git a/web3_proxy/src/stats/mod.rs b/web3_proxy/src/stats/mod.rs index 7ddace3f..a88f7717 100644 --- a/web3_proxy/src/stats/mod.rs +++ b/web3_proxy/src/stats/mod.rs @@ -4,11 +4,12 @@ pub mod db_queries; pub mod influxdb_queries; mod stat_buffer; pub use stat_buffer::{SpawnedStatBuffer, StatBuffer}; +use std::borrow::BorrowMut; use std::cmp; -use crate::app::RpcSecretKeyCache; +use crate::app::{RpcSecretKeyCache, UserBalanceCache}; use crate::errors::{Web3ProxyError, Web3ProxyResult}; -use crate::frontend::authorization::{Authorization, RequestMetadata}; +use crate::frontend::authorization::{Authorization, RequestMetadata, RpcSecretKey}; use crate::rpcs::one::Web3Rpc; use anyhow::{anyhow, Context}; use axum::headers::Origin; @@ -16,20 +17,22 @@ use chrono::{DateTime, Months, TimeZone, Utc}; use derive_more::From; use entities::sea_orm_active_enums::TrackingLevel; use entities::{balance, referee, referrer, rpc_accounting_v2, rpc_key, user}; +use ethers::core::k256::elliptic_curve::bigint::NonZero; use influxdb2::models::DataPoint; -use log::trace; +use log::{error, info, trace, warn}; use migration::sea_orm::prelude::Decimal; -use migration::sea_orm::QuerySelect; use migration::sea_orm::{ - self, ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, IntoActiveModel, - QueryFilter, TransactionTrait, + self, ColumnTrait, DatabaseConnection, EntityTrait, IntoActiveModel, QueryFilter, + TransactionTrait, }; -use migration::{Expr, LockType, OnConflict}; -use num_traits::ToPrimitive; +use migration::sea_orm::{DatabaseTransaction, QuerySelect}; +use migration::{Expr, LockType, OnConflict, Order}; +use num_traits::{clamp, clamp_min, ToPrimitive}; use parking_lot::Mutex; use std::num::NonZeroU64; use std::sync::atomic::{self, Ordering}; use std::sync::Arc; +use tokio::sync::RwLock; use self::stat_buffer::BufferedRpcQueryStats; @@ -57,6 +60,8 @@ pub struct RpcQueryStats { pub response_timestamp: i64, /// Credits used signifies how how much money was used up pub credits_used: Decimal, + /// Last credits used + pub latest_balance: Arc>, } #[derive(Clone, Debug, From, Hash, PartialEq, Eq)] @@ -182,6 +187,13 @@ impl RpcQueryStats { } } +struct Deltas { + sender_available_balance_delta: Decimal, + sender_used_balance_delta: Decimal, + sender_bonus_applied: bool, + referrer_available_balance_delta: Decimal, +} + /// A stat that we aggregate and then store in a database. /// For now there is just one, but I think there might be others later #[derive(Debug, From)] @@ -215,31 +227,23 @@ impl BufferedRpcQueryStats { self.sum_credits_used += stat.credits_used; // Also record the latest balance for this user .. - self.latest_balance = stat - .authorization - .checks - .balance - .unwrap_or(Decimal::from(0)); + // Also subtract the used balance from the cache so we + // TODO: We are already using the cache. We could also inject the cache into save_tsdb + self.latest_balance = stat.latest_balance; } - // TODO: take a db transaction instead so that we can batch? - async fn save_db( - self, + async fn _save_db_stats( + &self, chain_id: u64, db_conn: &DatabaseConnection, - key: RpcQueryKey, - rpc_secret_key_cache: Option<&RpcSecretKeyCache>, + key: &RpcQueryKey, + rpc_secret_key_cache: &RpcSecretKeyCache, + user_balance_cache: &UserBalanceCache, ) -> Web3ProxyResult<()> { - if key.response_timestamp == 0 { - return Err(Web3ProxyError::Anyhow(anyhow!( - "no response_timestamp! This is a bug! {:?} {:?}", - key, - self - ))); - } - let period_datetime = Utc.timestamp_opt(key.response_timestamp, 0).unwrap(); + // TODO: Could add last balance here (can take the element from the cache, and RpcQueryKey::AuthorizationCheck) + // =============================== // // UPDATE STATISTICS // // =============================== // @@ -320,84 +324,80 @@ impl BufferedRpcQueryStats { .exec(db_conn) .await?; - // =============================== // - // PREPARE FOR UPDATE USER BALANCE // - // =============================== // - let rpc_secret_key_id: u64 = match key.rpc_secret_key_id { - Some(x) => x.into(), - // Return early if the RPC key is not found, because then it is an anonymous user - None => return Ok(()), - }; + Ok(()) + } - // =============================== // - // GET ALL (STATIC) VARIABLES // - // =============================== // - // Get the user with that RPC key. This is also the referee - - // Txn is not strictly necessary, but still good to keep things consistent across tables - let txn = db_conn.begin().await?; - - let sender_rpc_entity = rpc_key::Entity::find() - .filter(rpc_key::Column::Id.eq(rpc_secret_key_id)) - .one(&txn) + async fn _get_relevant_entities( + &self, + rpc_secret_key_id: &NonZeroU64, + txn: &DatabaseTransaction, + ) -> Web3ProxyResult<( + rpc_key::Model, + balance::Model, + Option<(referee::Model, referrer::Model)>, + )> { + // Only calculate, and update the user balance + // Do not worry about referrers and all that + let sender_rpc_entity: rpc_key::Model = rpc_key::Entity::find() + .filter(rpc_key::Column::Id.eq(rpc_secret_key_id.get())) + .one(txn) .await? .context("We previous checked that the id exists, this is likely some race condition, or it just got deleted!")?; - // (1) Do some general bookkeeping on the user - if self.sum_credits_used == 0.into() { - // return early because theres no need to touch the balance table - return Ok(()); - } - - let sender_balance = balance::Entity::find() + let sender_balance: balance::Model = balance::Entity::find() .filter(balance::Column::UserId.eq(sender_rpc_entity.user_id)) - .one(db_conn) + .one(txn) .await? .ok_or(Web3ProxyError::BadRequest( "Could not find rpc key in db".into(), ))?; // I think one lock here is fine, because only one server has access to the "credits_applied_for_referee" entry - let referral_objects = match referee::Entity::find() - .filter(referee::Column::UserId.eq(sender_rpc_entity.user_id)) - .lock(LockType::Update) - .find_also_related(referrer::Entity) - .one(&txn) - .await? - { - Some(x) => Some(( - x.0, - x.1.context("Could not fine corresponding referrer code")?, - )), - None => None, + let referral_objects: Option<(referee::Model, referrer::Model)> = + match referee::Entity::find() + .filter(referee::Column::UserId.eq(sender_rpc_entity.user_id)) + .lock(LockType::Update) + .find_also_related(referrer::Entity) + .one(txn) + .await? + { + Some(x) => Some(( + x.0, + x.1.context("Could not fine corresponding referrer code")?, + )), + None => None, + }; + + Ok((sender_rpc_entity, sender_balance, referral_objects)) + } + + async fn _compute_balance_deltas( + &self, + referral_objects: Option<(referee::Model, referrer::Model)>, + ) -> Web3ProxyResult<(Deltas, Option<(referee::Model, referrer::Model)>)> { + // Calculate Balance Only + let mut deltas = Deltas { + sender_available_balance_delta: -self.sum_credits_used, + sender_used_balance_delta: self.sum_credits_used, + sender_bonus_applied: false, + referrer_available_balance_delta: Decimal::from(0), }; - // ====================== // - // INITIATE DELTAS // - // ====================== // - // Calculate Balance Only (No referrer) - let mut sender_available_balance_delta = Decimal::from(-1) * self.sum_credits_used; - let sender_used_balance_delta = self.sum_credits_used; - let mut sender_bonus_applied; - // Calculate Referrer Bonuses - let mut referrer_balance_delta = Decimal::from(0); - - // ============================================================ // - // BASED ON REFERRERS, CALCULATE HOW MUCH SHOULD BE ATTRIBUTED // - // ============================================================ // - // If we don't lock the database as we do above on the referral_entry, we would have to do this operation on the database + // Calculate a bunch using referrals as well if let Some((referral_entity, referrer_code_entity)) = referral_objects { - sender_bonus_applied = referral_entity.credits_applied_for_referee; + deltas.sender_bonus_applied = referral_entity.credits_applied_for_referee; // Calculate if we are above the usage threshold, and apply a bonus // Optimally we would read this from the balance, but if we do it like this, we only have to lock a single table (much safer w.r.t. deadlocks) + // referral_entity.credits_applied_for_referrer * (Decimal::from(10) checks (atomically using this table only), whether the user has brought in >$100 to the referer + // In this case, the sender receives $100 as a bonus / gift if !referral_entity.credits_applied_for_referee && (referral_entity.credits_applied_for_referrer * (Decimal::from(10)) + self.sum_credits_used) >= Decimal::from(100) { - sender_available_balance_delta += Decimal::from(100); - sender_bonus_applied = true; + deltas.sender_available_balance_delta += Decimal::from(100); + deltas.sender_bonus_applied = true; } // Calculate how much the referrer should get, limited to the last 12 months @@ -407,51 +407,29 @@ impl BufferedRpcQueryStats { + Months::new(12); if now <= valid_until { - referrer_balance_delta += self.sum_credits_used / Decimal::new(10, 0); + deltas.referrer_available_balance_delta += + self.sum_credits_used / Decimal::new(10, 0); } - // Do the referrer_entry updates - if referrer_balance_delta > Decimal::from(0) { - let referee_entry = referee::ActiveModel { - id: sea_orm::Unchanged(referral_entity.id), - referral_start_date: sea_orm::Unchanged(referral_entity.referral_start_date), - used_referral_code: sea_orm::Unchanged(referral_entity.used_referral_code), - user_id: sea_orm::Unchanged(referral_entity.user_id), - - credits_applied_for_referee: sea_orm::Set(sender_bonus_applied), - credits_applied_for_referrer: sea_orm::Set(referrer_balance_delta), - }; - referee::Entity::insert(referee_entry) - .on_conflict( - OnConflict::new() - .values([ - ( - referee::Column::CreditsAppliedForReferee, - // Make it a "Set" - Expr::col(referee::Column::CreditsAppliedForReferee) - .eq(sender_bonus_applied), - ), - ( - referee::Column::CreditsAppliedForReferrer, - Expr::col(referee::Column::CreditsAppliedForReferrer) - .add(referrer_balance_delta), - ), - ]) - .to_owned(), - ) - .exec(&txn) - .await? - .last_insert_id; - } + return Ok((deltas, Some((referral_entity, referrer_code_entity)))); } - // ================================= // - // UPDATE REFERRER & USER BALANCE // - // ================================= // + Ok((deltas, None)) + } + + /// Save all referral-based objects in the database + async fn _update_balances_in_db( + &self, + deltas: &Deltas, + txn: &DatabaseTransaction, + sender_rpc_entity: &rpc_key::Model, + referral_objects: &Option<(referee::Model, referrer::Model)>, + ) -> Web3ProxyResult<()> { + // Do the user updates let user_balance = balance::ActiveModel { id: sea_orm::NotSet, - available_balance: sea_orm::Set(sender_available_balance_delta), - used_balance: sea_orm::Set(sender_used_balance_delta), + available_balance: sea_orm::Set(deltas.sender_available_balance_delta), + used_balance: sea_orm::Set(deltas.sender_used_balance_delta), user_id: sea_orm::Set(sender_rpc_entity.user_id), }; @@ -462,45 +440,216 @@ impl BufferedRpcQueryStats { ( balance::Column::AvailableBalance, Expr::col(balance::Column::AvailableBalance) - .add(sender_available_balance_delta), + .add(deltas.sender_available_balance_delta), ), ( balance::Column::UsedBalance, - Expr::col(balance::Column::UsedBalance).add(sender_used_balance_delta), + Expr::col(balance::Column::UsedBalance) + .add(deltas.sender_used_balance_delta), ), ]) .to_owned(), ) - .exec(&txn) - .await? - .last_insert_id; + .exec(txn) + .await?; - if referrer_balance_delta > Decimal::from(0) { - let user_balance = balance::ActiveModel { - id: sea_orm::NotSet, - available_balance: sea_orm::Set(referrer_balance_delta), - used_balance: sea_orm::Set(Decimal::from(0)), - user_id: sea_orm::Set(sender_rpc_entity.user_id), - }; + // Do the referrer_entry updates + if let Some((referral_entity, referrer_code_entity)) = referral_objects { + if deltas.referrer_available_balance_delta > Decimal::from(0) { + let referee_entry = referee::ActiveModel { + id: sea_orm::Unchanged(referral_entity.id), + referral_start_date: sea_orm::Unchanged(referral_entity.referral_start_date), + used_referral_code: sea_orm::Unchanged(referral_entity.used_referral_code), + user_id: sea_orm::Unchanged(referral_entity.user_id), - let _ = balance::Entity::insert(user_balance) - .on_conflict( - OnConflict::new() - .values([( - balance::Column::AvailableBalance, - Expr::col(balance::Column::AvailableBalance) - .add(referrer_balance_delta), - )]) - .to_owned(), - ) - .exec(&txn) - .await? - .last_insert_id; + credits_applied_for_referee: sea_orm::Set(deltas.sender_bonus_applied), + credits_applied_for_referrer: sea_orm::Set( + deltas.referrer_available_balance_delta, + ), + }; + referee::Entity::insert(referee_entry) + .on_conflict( + OnConflict::new() + .values([ + ( + referee::Column::CreditsAppliedForReferee, + // Make it a "Set" + Expr::col(referee::Column::CreditsAppliedForReferee) + .eq(deltas.sender_bonus_applied), + ), + ( + referee::Column::CreditsAppliedForReferrer, + Expr::col(referee::Column::CreditsAppliedForReferrer) + .add(deltas.referrer_available_balance_delta), + ), + ]) + .to_owned(), + ) + .exec(txn) + .await? + .last_insert_id; + + let user_balance = balance::ActiveModel { + id: sea_orm::NotSet, + available_balance: sea_orm::Set(deltas.referrer_available_balance_delta), + used_balance: sea_orm::Set(Decimal::from(0)), + user_id: sea_orm::Set(referral_entity.user_id), + }; + + let _ = balance::Entity::insert(user_balance) + .on_conflict( + OnConflict::new() + .values([( + balance::Column::AvailableBalance, + Expr::col(balance::Column::AvailableBalance) + .add(deltas.referrer_available_balance_delta), + )]) + .to_owned(), + ) + .exec(txn) + .await?; + } + }; + Ok(()) + } + + /// Update & Invalidate cache if user is below 10$ credits (premium downgrade condition) + /// Reduce credits if there was no issue + /// This is not atomic, so this may be an issue because it's not sequentially consistent across threads + /// It is a good-enough approximation though, and if the TTL for the balance cache is high enough, this should be ok + async fn _update_balance_in_cache( + &self, + deltas: &Deltas, + txn: &DatabaseTransaction, + sender_rpc_entity: &rpc_key::Model, + referral_objects: &Option<(referee::Model, referrer::Model)>, + rpc_secret_key_cache: &RpcSecretKeyCache, + user_balance_cache: &UserBalanceCache, + ) -> Web3ProxyResult<()> { + // ================== + // Modify sender balance + // ================== + let sender_latest_balance = match NonZeroU64::try_from(sender_rpc_entity.user_id) { + Err(_) => Err(Web3ProxyError::BadResponse( + "Balance is not positive, although it was previously checked to be as such!" + .to_string(), + )), + // We don't do an get_or_insert, because technically we don't have the most up to date balance + // Also let's keep things simple in terms of writing and getting. A single place writes it, multiple places can remove / poll it + Ok(x) => Ok(user_balance_cache.get(&x)), + }?; + let sender_latest_balance = match sender_latest_balance { + Some(x) => x, + // If not in cache, nothing to update theoretically + None => return Ok(()), + }; + let mut latest_balance = sender_latest_balance.write().await; + let balance_before = (*latest_balance).clone(); + // Now modify the balance + *latest_balance = *latest_balance + deltas.sender_available_balance_delta; + if *latest_balance < Decimal::from(0) { + *latest_balance = Decimal::from(0); } - // ================================ // - // TODO: REFRESH USER ROLE IN CACHE // - // ================================ // + // Also check if the referrer is premium (thought above 10$ will always be treated as premium at least) + // Should only refresh cache if the premium threshold is crossed + if balance_before > Decimal::from(0) && *latest_balance == Decimal::from(0) { + let rpc_keys = rpc_key::Entity::find() + .filter(rpc_key::Column::UserId.eq(sender_rpc_entity.user_id)) + .all(txn) + .await?; + + for rpc_key_entity in rpc_keys { + // TODO: Not sure which one was inserted, just delete both ... + rpc_secret_key_cache.remove(&rpc_key_entity.secret_key.into()); + } + + if let Ok(non_zero_user_id) = NonZeroU64::try_from(sender_rpc_entity.user_id) { + user_balance_cache.remove(&non_zero_user_id); + } + } + + // ================== + // Modify referrer balance + // ================== + // If the referrer object is empty, we don't care about the cache, becase this will be fetched in a next request from the database + if let Some((referral_entity, _)) = referral_objects { + if let Ok(referrer_user_id) = NonZeroU64::try_from(referral_entity.user_id) { + // If the referrer object is in the cache, we just remove it from the balance cache; it will be reloaded next time + // Get all the RPC keys, delete them from cache + + // In principle, do not remove the cache for the referrer; the next reload will trigger premium + // We don't touch the RPC keys at this stage for the refferer, a payment must be paid to reset those (we want to keep things simple here) + // Anyways, the RPC keys will be updated in 5 min (600 seconds) + user_balance_cache.remove(&referrer_user_id); + } + }; + + Ok(()) + } + + // TODO: take a db transaction instead so that we can batch? + async fn save_db( + self, + chain_id: u64, + db_conn: &DatabaseConnection, + key: RpcQueryKey, + rpc_secret_key_cache: &RpcSecretKeyCache, + user_balance_cache: &UserBalanceCache, + ) -> Web3ProxyResult<()> { + if key.response_timestamp == 0 { + return Err(Web3ProxyError::Anyhow(anyhow!( + "no response_timestamp! This is a bug! {:?} {:?}", + key, + self + ))); + } + + // First of all, save the statistics to the database: + self._save_db_stats( + chain_id, + db_conn, + &key, + rpc_secret_key_cache, + user_balance_cache, + ) + .await?; + + // Return early if no credits were used, or if user is anonymous + if self.sum_credits_used == 0.into() { + return Ok(()); + } + let rpc_secret_key_id: &NonZeroU64 = match &key.rpc_secret_key_id { + Some(x) => x.into(), + None => return Ok(()), + }; + + // Start a transaction + let txn = db_conn.begin().await?; + // Fetch any items that we will be modifying + let (sender_rpc_entity, _sender_balance, referral_objects) = + self._get_relevant_entities(rpc_secret_key_id, &txn).await?; + + // Compute Changes in balance for user and referrer, incl. referral logic // + let (deltas, referral_objects): (Deltas, Option<(referee::Model, referrer::Model)>) = + self._compute_balance_deltas(referral_objects).await?; + + // Update balances in the database + self._update_balances_in_db(&deltas, &txn, &sender_rpc_entity, &referral_objects) + .await?; + + // Update balanaces in the cache + self._update_balance_in_cache( + &deltas, + &txn, + &sender_rpc_entity, + &referral_objects, + rpc_secret_key_cache, + user_balance_cache, + ) + .await?; + + // Finally commit the transaction in the database txn.commit() .await .context("Failed to update referral and balance updates")?; @@ -526,6 +675,12 @@ impl BufferedRpcQueryStats { builder = builder.tag("method", method); } + // Read the latest balance ... + let balance; + { + balance = *(self.latest_balance.read().await); + } + builder = builder .tag("archive_needed", key.archive_needed.to_string()) .tag("error_response", key.error_response.to_string()) @@ -544,13 +699,11 @@ impl BufferedRpcQueryStats { "sum_credits_used", self.sum_credits_used .to_f64() - .expect("number is really (too) large"), + .context("number is really (too) large")?, ) .field( "balance", - self.latest_balance - .to_f64() - .expect("number is really (too) large"), + balance.to_f64().context("number is really (too) large")?, ); // .round() as i64 @@ -630,6 +783,8 @@ impl TryFrom for RpcQueryStats { response_millis, response_timestamp, credits_used, + // To we need to clone it here ... (?) + latest_balance: metadata.latest_balance.clone(), }; Ok(x) diff --git a/web3_proxy/src/stats/stat_buffer.rs b/web3_proxy/src/stats/stat_buffer.rs index 5efd9822..5cd7d2f8 100644 --- a/web3_proxy/src/stats/stat_buffer.rs +++ b/web3_proxy/src/stats/stat_buffer.rs @@ -1,5 +1,5 @@ use super::{AppStat, RpcQueryKey}; -use crate::app::{RpcSecretKeyCache, Web3ProxyJoinHandle}; +use crate::app::{RpcSecretKeyCache, UserBalanceCache, Web3ProxyJoinHandle}; use crate::errors::Web3ProxyResult; use derive_more::From; use futures::stream; @@ -8,8 +8,9 @@ use influxdb2::api::write::TimestampPrecision; use log::{error, info, trace}; use migration::sea_orm::prelude::Decimal; use migration::sea_orm::DatabaseConnection; +use std::sync::Arc; use std::time::Duration; -use tokio::sync::broadcast; +use tokio::sync::{broadcast, RwLock}; use tokio::time::interval; #[derive(Debug, Default)] @@ -25,7 +26,7 @@ pub struct BufferedRpcQueryStats { pub sum_response_millis: u64, pub sum_credits_used: Decimal, /// Balance tells us the user's balance at this point in time - pub latest_balance: Decimal, + pub latest_balance: Arc>, } #[derive(From)] @@ -43,7 +44,8 @@ pub struct StatBuffer { global_timeseries_buffer: HashMap, influxdb_client: Option, opt_in_timeseries_buffer: HashMap, - rpc_secret_key_cache: Option, + rpc_secret_key_cache: RpcSecretKeyCache, + user_balance_cache: UserBalanceCache, timestamp_precision: TimestampPrecision, tsdb_save_interval_seconds: u32, } @@ -58,6 +60,7 @@ impl StatBuffer { db_save_interval_seconds: u32, influxdb_client: Option, rpc_secret_key_cache: Option, + user_balance_cache: Option, shutdown_receiver: broadcast::Receiver<()>, tsdb_save_interval_seconds: u32, ) -> anyhow::Result> { @@ -77,7 +80,8 @@ impl StatBuffer { global_timeseries_buffer: Default::default(), influxdb_client, opt_in_timeseries_buffer: Default::default(), - rpc_secret_key_cache, + rpc_secret_key_cache: rpc_secret_key_cache.unwrap(), + user_balance_cache: user_balance_cache.unwrap(), timestamp_precision, tsdb_save_interval_seconds, }; @@ -183,7 +187,8 @@ impl StatBuffer { self.chain_id, db_conn, key, - self.rpc_secret_key_cache.as_ref(), + &self.rpc_secret_key_cache, + &self.user_balance_cache, ) .await {