From 86f9e7358a47d782c13ef398b15bb6d7cafee9f3 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Fri, 16 Jun 2023 23:14:43 -0700 Subject: [PATCH] move balance onto AuthorizationChecks (#131) * move balance onto AuthorizationChecks * todone * bigger default * flat cost for testing --- web3_proxy/src/app/mod.rs | 55 +----- .../bin/web3_proxy_cli/migrate_stats_to_v2.rs | 1 - web3_proxy/src/frontend/authorization.rs | 170 ++++++++++++------ web3_proxy/src/frontend/users/payment.rs | 5 +- web3_proxy/src/frontend/users/referral.rs | 13 +- web3_proxy/src/rpcs/many.rs | 1 - web3_proxy/src/stats/mod.rs | 148 +++++++-------- web3_proxy/src/stats/stat_buffer.rs | 8 +- 8 files changed, 216 insertions(+), 185 deletions(-) diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index c8a6d3d8..7c5ae9c6 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -4,7 +4,8 @@ use crate::block_number::{block_needed, BlockNeeded}; use crate::config::{AppConfig, TopConfig}; use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult}; use crate::frontend::authorization::{ - Authorization, RequestMetadata, RequestOrMethod, ResponseOrBytes, RpcSecretKey, + Authorization, AuthorizationChecks, Balance, RequestMetadata, RequestOrMethod, ResponseOrBytes, + RpcSecretKey, }; use crate::frontend::rpc_proxy_ws::ProxyMode; use crate::jsonrpc::{ @@ -24,12 +25,10 @@ use crate::rpcs::transactions::TxStatus; use crate::stats::{AppStat, StatBuffer}; use crate::user_token::UserBearerToken; use anyhow::Context; -use axum::headers::{Origin, Referer, UserAgent}; use axum::http::StatusCode; use chrono::Utc; use deferred_rate_limiter::DeferredRateLimiter; use derive_more::From; -use entities::sea_orm_active_enums::TrackingLevel; use entities::user; use ethers::core::utils::keccak256; use ethers::prelude::{Address, Bytes, Transaction, TxHash, H256, U64}; @@ -38,11 +37,10 @@ use ethers::utils::rlp::{Decodable, Rlp}; use futures::future::join_all; use futures::stream::{FuturesUnordered, StreamExt}; use hashbrown::{HashMap, HashSet}; -use ipnet::IpNet; use log::{error, info, trace, warn, Level}; -use migration::sea_orm::prelude::Decimal; use migration::sea_orm::{DatabaseTransaction, EntityTrait, PaginatorTrait, TransactionTrait}; use moka::future::{Cache, CacheBuilder}; +use parking_lot::RwLock; use redis_rate_limiter::redis::AsyncCommands; use redis_rate_limiter::{redis, DeadpoolRuntime, RedisConfig, RedisPool, RedisRateLimiter}; use serde::Serialize; @@ -54,7 +52,7 @@ use std::num::NonZeroU64; use std::str::FromStr; use std::sync::{atomic, Arc}; use std::time::Duration; -use tokio::sync::{broadcast, watch, RwLock, Semaphore}; +use tokio::sync::{broadcast, watch, Semaphore}; use tokio::task::JoinHandle; use tokio::time::timeout; @@ -67,50 +65,16 @@ pub static APP_USER_AGENT: &str = concat!( env!("CARGO_PKG_VERSION") ); -// aggregate across 1 week +/// aggregate across 1 week pub const BILLING_PERIOD_SECONDS: i64 = 60 * 60 * 24 * 7; +/// Convenience type pub type Web3ProxyJoinHandle = JoinHandle>; -/// TODO: move this -#[derive(Clone, Debug, Default, From)] -pub struct AuthorizationChecks { - /// database id of the primary user. 0 if anon - /// TODO: do we need this? its on the authorization so probably not - /// TODO: `Option`? they are actual zeroes some places in the db now - pub user_id: u64, - /// the key used (if any) - pub rpc_secret_key: Option, - /// database id of the rpc key - /// if this is None, then this request is being rate limited by ip - pub rpc_secret_key_id: Option, - /// if None, allow unlimited queries. inherited from the user_tier - pub max_requests_per_period: Option, - // if None, allow unlimited concurrent requests. inherited from the user_tier - pub max_concurrent_requests: Option, - /// if None, allow any Origin - pub allowed_origins: Option>, - /// if None, allow any Referer - pub allowed_referers: Option>, - /// if None, allow any UserAgent - pub allowed_user_agents: Option>, - /// if None, allow any IP Address - pub allowed_ips: Option>, - /// how detailed any rpc account entries should be - pub tracking_level: TrackingLevel, - /// Chance to save reverting eth_call, eth_estimateGas, and eth_sendRawTransaction to the database. - /// depending on the caller, errors might be expected. this keeps us from bloating our database - /// u16::MAX == 100% - pub log_revert_chance: u16, - /// if true, transactions are broadcast only to private mempools. - /// IMPORTANT! Once confirmed by a miner, they will be public on the blockchain! - pub private_txs: bool, - pub proxy_mode: ProxyMode, -} - /// Cache data from the database about rpc keys pub type RpcSecretKeyCache = Cache; -pub type UserBalanceCache = Cache>>; +/// Cache data from the database about user balances +pub type UserBalanceCache = Cache>>; /// The application // TODO: i'm sure this is more arcs than necessary, but spawning futures makes references hard @@ -193,7 +157,6 @@ pub async fn flatten_handles( Ok(Ok(_)) => continue, } } - Ok(()) } @@ -208,7 +171,7 @@ pub struct Web3ProxyAppSpawn { pub background_handles: FuturesUnordered>, /// config changes are sent here pub new_top_config_sender: watch::Sender, - /// watch this to know when to start the app + /// watch this to know when the app is ready to serve requests pub consensus_connections_watcher: watch::Receiver>>, } diff --git a/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs b/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs index 05ad957b..d915c733 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/migrate_stats_to_v2.rs @@ -201,7 +201,6 @@ impl MigrateStatsToV2 { start_instant: Instant::now(), stat_sender: Some(stat_sender.clone()), request_ulid, - latest_balance, }; if let Some(x) = request_metadata.try_send_stat()? { diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index 6e80df5f..d201d248 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -1,7 +1,7 @@ //! Utilities for authorization of logged in and anonymous users. use super::rpc_proxy_ws::ProxyMode; -use crate::app::{AuthorizationChecks, Web3ProxyApp, APP_USER_AGENT}; +use crate::app::{Web3ProxyApp, APP_USER_AGENT}; use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult}; use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest}; use crate::rpcs::one::Web3Rpc; @@ -25,6 +25,7 @@ use ipnet::IpNet; use log::{error, trace, warn}; use migration::sea_orm::prelude::Decimal; use migration::sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter}; +use parking_lot::RwLock; use rdkafka::message::{Header as KafkaHeader, OwnedHeaders as KafkaOwnedHeaders, OwnedMessage}; use rdkafka::producer::{FutureProducer, FutureRecord}; use rdkafka::util::Timeout as KafkaTimeout; @@ -37,7 +38,7 @@ use std::num::NonZeroU64; use std::sync::atomic::{self, AtomicBool, AtomicI64, AtomicU64, AtomicUsize}; use std::time::Duration; use std::{net::IpAddr, str::FromStr, sync::Arc}; -use tokio::sync::{OwnedSemaphorePermit, RwLock, Semaphore}; +use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use tokio::task::JoinHandle; use tokio::time::Instant; use ulid::Ulid; @@ -69,6 +70,56 @@ pub enum AuthorizationType { Frontend, } +#[derive(Clone, Debug, Default)] +pub struct Balance { + pub total_deposit: Decimal, + pub total_spend: Decimal, +} + +impl Balance { + pub fn remaining(&self) -> Decimal { + self.total_deposit - self.total_spend + } +} + +/// TODO: move this +#[derive(Clone, Debug, Default, From)] +pub struct AuthorizationChecks { + /// database id of the primary user. 0 if anon + /// TODO: do we need this? its on the authorization so probably not + /// TODO: `Option`? they are actual zeroes some places in the db now + pub user_id: u64, + /// locally cached balance that may drift slightly if the user is on multiple servers + pub latest_balance: Arc>, + /// the key used (if any) + pub rpc_secret_key: Option, + /// database id of the rpc key + /// if this is None, then this request is being rate limited by ip + pub rpc_secret_key_id: Option, + /// if None, allow unlimited queries. inherited from the user_tier + pub max_requests_per_period: Option, + // if None, allow unlimited concurrent requests. inherited from the user_tier + pub max_concurrent_requests: Option, + /// if None, allow any Origin + pub allowed_origins: Option>, + /// if None, allow any Referer + pub allowed_referers: Option>, + /// if None, allow any UserAgent + pub allowed_user_agents: Option>, + /// if None, allow any IP Address + pub allowed_ips: Option>, + /// how detailed any rpc account entries should be + pub tracking_level: TrackingLevel, + /// Chance to save reverting eth_call, eth_estimateGas, and eth_sendRawTransaction to the database. + /// depending on the caller, errors might be expected. this keeps us from bloating our database + /// u16::MAX == 100% + pub log_revert_chance: u16, + /// if true, transactions are broadcast only to private mempools. + /// IMPORTANT! Once confirmed by a miner, they will be public on the blockchain! + pub private_txs: bool, + pub proxy_mode: ProxyMode, +} + /// TODO: include the authorization checks in this? #[derive(Clone, Debug)] pub struct Authorization { @@ -90,6 +141,7 @@ pub struct KafkaDebugLogger { num_responses: AtomicUsize, } +/// Ulids and Uuids matching the same bits hash the same impl Hash for RpcSecretKey { fn hash(&self, state: &mut H) { let x = match self { @@ -281,9 +333,6 @@ pub struct RequestMetadata { /// Cancel-safe channel for sending stats to the buffer pub stat_sender: Option>, - - /// Latest balance - pub latest_balance: Arc>, } impl Default for Authorization { @@ -310,7 +359,6 @@ impl Default for RequestMetadata { response_timestamp: Default::default(), start_instant: Instant::now(), stat_sender: Default::default(), - latest_balance: Default::default(), } } } @@ -322,6 +370,17 @@ impl RequestMetadata { .map(|x| x.checks.proxy_mode) .unwrap_or_default() } + + /// this may drift slightly if multiple servers are handling the same users, but should be close + pub async fn latest_balance(&self) -> Option { + if let Some(x) = self.authorization.as_ref() { + let x = x.checks.latest_balance.read().remaining(); + + Some(x) + } else { + None + } + } } #[derive(From)] @@ -436,36 +495,22 @@ impl RequestMetadata { } } - // Get latest balance from cache - let latest_balance = match app - .balance_checks(authorization.checks.user_id) - .await - .context("Could not retrieve balance from database or cache!") - { - Ok(x) => x, - Err(err) => { - error!("{}", err); - Arc::new(RwLock::new(Decimal::default())) - } - }; - let x = Self { archive_request: false.into(), + authorization: Some(authorization), backend_requests: Default::default(), error_response: false.into(), kafka_debug_logger, - no_servers: 0.into(), - authorization: Some(authorization), - request_bytes, method, + no_servers: 0.into(), + request_bytes, + request_ulid, response_bytes: 0.into(), response_from_backup_rpc: false.into(), response_millis: 0.into(), - request_ulid, response_timestamp: 0.into(), start_instant: Instant::now(), stat_sender: app.stat_sender.clone(), - latest_balance, }; Arc::new(x) @@ -1134,24 +1179,28 @@ impl Web3ProxyApp { pub(crate) async fn balance_checks( &self, user_id: u64, - ) -> Web3ProxyResult>> { + ) -> Web3ProxyResult>> { match NonZeroU64::try_from(user_id) { - Err(_) => Ok(Arc::new(RwLock::new(Decimal::default()))), + Err(_) => Ok(Arc::new(Default::default())), Ok(x) => self .user_balance_cache - .try_get_with_by_ref(&x, async move { + .try_get_with(x, async move { let db_replica = self .db_replica() .web3_context("Getting database connection")?; - let balance: Decimal = match balance::Entity::find() + let balance = match balance::Entity::find() .filter(balance::Column::UserId.eq(user_id)) .one(db_replica.as_ref()) .await? { - Some(x) => x.total_deposits - x.total_spent_outside_free_tier, - None => Decimal::default(), + Some(x) => Balance { + total_deposit: x.total_deposits, + total_spend: x.total_spent_outside_free_tier, + }, + None => Default::default(), }; + Ok(Arc::new(RwLock::new(balance))) }) .await @@ -1248,54 +1297,62 @@ impl Web3ProxyApp { let user_model = user::Entity::find_by_id(rpc_key_model.user_id) .one(db_replica.as_ref()) .await? - .context("user model was not found, but every rpc_key should have a user")?; + .context( + "user model was not found, but every rpc_key should have a user", + )?; - let balance = match balance::Entity::find() - .filter(balance::Column::UserId.eq(user_model.id)) - .one(db_replica.as_ref()) - .await? { - Some(x) => x.total_deposits - x.total_spent_outside_free_tier, - None => Decimal::default() - }; + let mut user_tier_model = user_tier::Entity::find_by_id( + user_model.user_tier_id, + ) + .one(db_replica.as_ref()) + .await? + .context( + "related user tier not found, but every user should have a tier", + )?; - let mut user_tier_model = - user_tier::Entity::find_by_id(user_model.user_tier_id) - .one(db_replica.as_ref()) - .await? - .context("related user tier not found, but every user should have a tier")?; + let latest_balance = self.balance_checks(rpc_key_model.user_id).await?; // TODO: Do the logic here, as to how to treat the user, based on balance and initial check // Clear the cache (not the login!) in the stats if a tier-change happens (clear, but don't modify roles) - if user_tier_model.title == "Premium" && balance < Decimal::new(1, 1) { - // Find the equivalent downgraded user tier, and modify limits from there - if let Some(downgrade_user_tier) = user_tier_model.downgrade_tier_id { + if let Some(downgrade_user_tier) = user_tier_model.downgrade_tier_id { + let balance = latest_balance.read().clone(); + + // only consider the user premium if they have paid at least $10 and have a balance > $.01 + // otherwise, set user_tier_model to the downograded tier + if balance.total_deposit < Decimal::from(10) + || balance.remaining() < Decimal::new(1, 2) + { user_tier_model = user_tier::Entity::find_by_id(downgrade_user_tier) .one(db_replica.as_ref()) .await? - .context("finding the downgrade user tier for premium did not work for premium")?; - } else { - return Err(Web3ProxyError::InvalidUserTier); + .context(format!( + "downgrade user tier ({}) is missing!", + downgrade_user_tier + ))?; } } - let rpc_key_id = Some(rpc_key_model.id.try_into().context("db ids are never 0")?); + let rpc_key_id = + Some(rpc_key_model.id.try_into().context("db ids are never 0")?); Ok(AuthorizationChecks { - user_id: rpc_key_model.user_id, - rpc_secret_key: Some(rpc_secret_key), - rpc_secret_key_id: rpc_key_id, allowed_ips, allowed_origins, allowed_referers, allowed_user_agents, - tracking_level: rpc_key_model.log_level, - // TODO: is floating point math going to scale this correctly - log_revert_chance: (rpc_key_model.log_revert_chance * u16::MAX as f64) as u16, + latest_balance, + // TODO: is floating point math going to scale this correctly? + log_revert_chance: (rpc_key_model.log_revert_chance * u16::MAX as f64) + as u16, max_concurrent_requests: user_tier_model.max_concurrent_requests, max_requests_per_period: user_tier_model.max_requests_per_period, private_txs: rpc_key_model.private_txs, proxy_mode, + rpc_secret_key: Some(rpc_secret_key), + rpc_secret_key_id: rpc_key_id, + tracking_level: rpc_key_model.log_level, + user_id: rpc_key_model.user_id, }) } None => Ok(AuthorizationChecks::default()), @@ -1316,7 +1373,6 @@ impl Web3ProxyApp { user_agent: Option, ) -> Web3ProxyResult { let authorization_checks = self.authorization_checks(proxy_mode, rpc_key).await?; - let balance = self.balance_checks(authorization_checks.user_id).await?; // if no rpc_key_id matching the given rpc was found, then we can't rate limit by key if authorization_checks.rpc_secret_key_id.is_none() { diff --git a/web3_proxy/src/frontend/users/payment.rs b/web3_proxy/src/frontend/users/payment.rs index 73a55b6c..a5a1dae9 100644 --- a/web3_proxy/src/frontend/users/payment.rs +++ b/web3_proxy/src/frontend/users/payment.rs @@ -348,11 +348,12 @@ pub async fn user_balance_post( } let x = json!({ - "tx_hash": tx_hash, + "amount": payment_token_amount, "block_hash": block_hash, "log_index": log_index, + "recipient_account": recipient_account, "token": payment_token_address, - "amount": payment_token_amount, + "tx_hash": tx_hash, }); debug!("deposit data: {:#?}", x); diff --git a/web3_proxy/src/frontend/users/referral.rs b/web3_proxy/src/frontend/users/referral.rs index 9afeb186..5daa09a0 100644 --- a/web3_proxy/src/frontend/users/referral.rs +++ b/web3_proxy/src/frontend/users/referral.rs @@ -105,17 +105,20 @@ pub async fn user_used_referral_stats( let mut out: Vec = Vec::new(); for x in referrals.into_iter() { let (referral_record, referrer_record) = (x.0, x.1.context("each referral entity should have a referral code associated with it, but this is not the case!")?); - // The foreign key is never optional - let referring_user = user::Entity::find_by_id(referrer_record.user_id) - .one(db_replica.as_ref()) - .await? - .context("Database error, no foreign key found for referring user")?; + + // // The foreign key is never optional + // let referring_user = user::Entity::find_by_id(referrer_record.user_id) + // .one(db_replica.as_ref()) + // .await? + // .context("Database error, no foreign key found for referring user")?; + let tmp = Info { credits_applied_for_referee: referral_record.credits_applied_for_referee, credits_applied_for_referrer: referral_record.credits_applied_for_referrer, referral_start_date: referral_record.referral_start_date, used_referral_code: referrer_record.referral_code, }; + // Start inserting json's into this out.push(tmp); } diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 48cbdeac..54e8f61d 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -1331,7 +1331,6 @@ mod tests { use log::{trace, LevelFilter}; use moka::future::CacheBuilder; use parking_lot::RwLock; - use tokio::sync::RwLock as AsyncRwLock; #[cfg(test)] fn new_peak_latency() -> PeakEwmaLatency { diff --git a/web3_proxy/src/stats/mod.rs b/web3_proxy/src/stats/mod.rs index 8b1740f2..a9faf5f2 100644 --- a/web3_proxy/src/stats/mod.rs +++ b/web3_proxy/src/stats/mod.rs @@ -27,9 +27,9 @@ use migration::{Expr, LockType, OnConflict}; use num_traits::ToPrimitive; use parking_lot::Mutex; use std::num::NonZeroU64; +use std::str::FromStr; use std::sync::atomic::{self, Ordering}; use std::sync::Arc; -use tokio::sync::RwLock; pub use stat_buffer::{SpawnedStatBuffer, StatBuffer}; @@ -57,26 +57,26 @@ pub struct RpcQueryStats { pub response_timestamp: i64, /// Credits used signifies how how much money was used up pub credits_used: Decimal, - /// Last credits used - pub latest_balance: Arc>, } #[derive(Clone, Debug, From, Hash, PartialEq, Eq)] pub struct RpcQueryKey { - /// unix epoch time - /// for the time series db, this is (close to) the time that the response was sent - /// for the account database, this is rounded to the week + /// unix epoch time. + /// for the time series db, this is (close to) the time that the response was sent. + /// for the account database, this is rounded to the week. response_timestamp: i64, - /// true if an archive server was needed to serve the request + /// true if an archive server was needed to serve the request. archive_needed: bool, - /// true if the response was some sort of JSONRPC error + /// true if the response was some sort of JSONRPC error. error_response: bool, - /// method tracking is opt-in + /// method tracking is opt-in. method: Option, - /// origin tracking is opt-in + /// origin tracking is opt-in. origin: Option, - /// None if the public url was used + /// None if the public url was used. rpc_secret_key_id: Option, + /// None if the public url was used. + rpc_key_user_id: Option, } /// round the unix epoch time to the start of a period @@ -126,6 +126,7 @@ impl RpcQueryStats { error_response: self.error_response, method, rpc_secret_key_id, + rpc_key_user_id: self.authorization.checks.user_id.try_into().ok(), origin, } } @@ -146,6 +147,7 @@ impl RpcQueryStats { error_response: self.error_response, method, rpc_secret_key_id, + rpc_key_user_id: self.authorization.checks.user_id.try_into().ok(), origin, } } @@ -177,6 +179,7 @@ impl RpcQueryStats { error_response: self.error_response, method, rpc_secret_key_id: self.authorization.checks.rpc_secret_key_id, + rpc_key_user_id: self.authorization.checks.user_id.try_into().ok(), origin, }; @@ -225,11 +228,6 @@ impl BufferedRpcQueryStats { self.sum_response_bytes += stat.response_bytes; self.sum_response_millis += stat.response_millis; self.sum_credits_used += stat.credits_used; - - // Also record the latest balance for this user .. - // Also subtract the used balance from the cache so we - // TODO: We are already using the cache. We could also inject the cache into save_tsdb - self.latest_balance = stat.latest_balance; } async fn _save_db_stats( @@ -242,8 +240,6 @@ impl BufferedRpcQueryStats { ) -> Web3ProxyResult<()> { let period_datetime = Utc.timestamp_opt(key.response_timestamp, 0).unwrap(); - // TODO: Could add last balance here (can take the element from the cache, and RpcQueryKey::AuthorizationCheck) - // =============================== // // UPDATE STATISTICS // // =============================== // @@ -587,14 +583,14 @@ impl BufferedRpcQueryStats { Ok(()) } - /// Update & Invalidate cache if user is below 10$ credits (premium downgrade condition) + /// Update & Invalidate cache if user is credits are low (premium downgrade condition) /// Reduce credits if there was no issue /// This is not atomic, so this may be an issue because it's not sequentially consistent across threads - /// It is a good-enough approximation though, and if the TTL for the balance cache is high enough, this should be ok + /// It is a good-enough approximation though, and if the TTL for the balance cache is low enough, this should be ok async fn _update_balance_in_cache( &self, deltas: &Deltas, - txn: &DatabaseTransaction, + db_conn: &DatabaseConnection, sender_rpc_entity: &rpc_key::Model, referral_objects: &Option<(referee::Model, referrer::Model)>, rpc_secret_key_cache: &RpcSecretKeyCache, @@ -603,47 +599,54 @@ impl BufferedRpcQueryStats { // ================== // Modify sender balance // ================== - let sender_latest_balance = match NonZeroU64::try_from(sender_rpc_entity.user_id) { - Err(_) => Err(Web3ProxyError::BadResponse( - "Balance is not positive, although it was previously checked to be as such!".into(), - )), - // We don't do an get_or_insert, because technically we don't have the most up to date balance - // Also let's keep things simple in terms of writing and getting. A single place writes it, multiple places can remove / poll it - Ok(x) => Ok(user_balance_cache.get(&x)), - }?; - let sender_latest_balance = match sender_latest_balance { + let user_id = NonZeroU64::try_from(sender_rpc_entity.user_id) + .expect("database ids are always nonzero"); + + // We don't do an get_or_insert, because technically we don't have the most up to date balance + // Also let's keep things simple in terms of writing and getting. A single place writes it, multiple places can remove / poll it + let latest_balance = match user_balance_cache.get(&user_id) { Some(x) => x, - // If not in cache, nothing to update theoretically + // If not in cache, nothing to update None => return Ok(()), }; - let mut latest_balance = sender_latest_balance.write().await; - let balance_before = *latest_balance; - // Now modify the balance - // TODO: Double check this (perhaps while testing...) - *latest_balance = *latest_balance - deltas.balance_spent_including_free_credits - + deltas.usage_bonus_to_request_sender_through_referral; - if *latest_balance < Decimal::from(0) { - *latest_balance = Decimal::from(0); - } - // Also check if the referrer is premium (thought above 10$ will always be treated as premium at least) - // Should only refresh cache if the premium threshold is crossed - if balance_before > Decimal::from(0) && *latest_balance == Decimal::from(0) { + let (balance_before, latest_balance) = { + let mut latest_balance = latest_balance.write(); + + let balance_before = latest_balance.clone(); + + // Now modify the balance + latest_balance.total_deposit += deltas.usage_bonus_to_request_sender_through_referral; + latest_balance.total_spend += deltas.balance_spent_including_free_credits; + + (balance_before, latest_balance.clone()) + }; + + // we only start subtracting once the user is first upgraded to a premium user + // consider the user premium if total_deposit > premium threshold + // If the balance is getting low, clear the cache + // TODO: configurable amount for "premium" + // TODO: configurable amount for "low" + // we check balance_before because this current request would have been handled with limits matching the balance at the start of the request + if balance_before.total_deposit > Decimal::from(10) + && latest_balance.remaining() <= Decimal::from(1) + { let rpc_keys = rpc_key::Entity::find() .filter(rpc_key::Column::UserId.eq(sender_rpc_entity.user_id)) - .all(txn) + .all(db_conn) .await?; + // clear the user from the cache + if let Ok(user_id) = NonZeroU64::try_from(sender_rpc_entity.user_id) { + user_balance_cache.invalidate(&user_id).await; + } + + // clear all keys owned by this user from the cache for rpc_key_entity in rpc_keys { - // TODO: Not sure which one was inserted, just delete both ... rpc_secret_key_cache .invalidate(&rpc_key_entity.secret_key.into()) .await; } - - if let Ok(non_zero_user_id) = NonZeroU64::try_from(sender_rpc_entity.user_id) { - user_balance_cache.invalidate(&non_zero_user_id).await; - } } // ================== @@ -651,7 +654,8 @@ impl BufferedRpcQueryStats { // ================== // We ignore this for performance reasons right now // We would have to load all the RPC keys of the referrer to de-activate them - // Instead, it's fine if they wait for 60 seconds until their tier reloads + // Instead, it's fine if they wait for 60 seconds until their cache expires + // If they are getting low, they will refresh themselves if necessary and then they will see // // If the referrer object is empty, we don't care about the cache, becase this will be fetched in a next request from the database // if let Some((referral_entity, _)) = referral_objects { // if let Ok(referrer_user_id) = NonZeroU64::try_from(referral_entity.user_id) { @@ -710,7 +714,7 @@ impl BufferedRpcQueryStats { let (sender_rpc_entity, _sender_balance, referral_objects) = self._get_relevant_entities(rpc_secret_key_id, &txn).await?; - // Compute Changes in balance for user and referrer, incl. referral logic // + // Compute Changes in balance for user and referrer, incl. referral logic let (deltas, referral_objects): (Deltas, Option<(referee::Model, referrer::Model)>) = self ._compute_balance_deltas(_sender_balance, referral_objects) .await?; @@ -719,10 +723,16 @@ impl BufferedRpcQueryStats { self._update_balances_in_db(&deltas, &txn, &sender_rpc_entity, &referral_objects) .await?; - // Update balanaces in the cache + // Finally commit the transaction in the database + txn.commit() + .await + .context("Failed to update referral and balance updates")?; + + // Update balanaces in the cache. + // do this after commiting the database so that invalidated caches definitely query commited data self._update_balance_in_cache( &deltas, - &txn, + &db_conn, &sender_rpc_entity, &referral_objects, rpc_secret_key_cache, @@ -730,11 +740,6 @@ impl BufferedRpcQueryStats { ) .await?; - // Finally commit the transaction in the database - txn.commit() - .await - .context("Failed to update referral and balance updates")?; - Ok(()) } @@ -757,10 +762,7 @@ impl BufferedRpcQueryStats { } // Read the latest balance ... - let balance; - { - balance = *(self.latest_balance.read().await); - } + let remaining = self.latest_balance.read().remaining(); builder = builder .tag("archive_needed", key.archive_needed.to_string()) @@ -784,7 +786,7 @@ impl BufferedRpcQueryStats { ) .field( "balance", - balance.to_f64().context("number is really (too) large")?, + remaining.to_f64().context("number is really (too) large")?, ); // .round() as i64 @@ -864,8 +866,6 @@ impl TryFrom for RpcQueryStats { response_millis, response_timestamp, credits_used, - // To we need to clone it here ... (?) - latest_balance: metadata.latest_balance.clone(), }; Ok(x) @@ -882,18 +882,26 @@ impl RpcQueryStats { cache_hit: bool, method: Option<&str>, ) -> Decimal { - // for now, always return 0 for cost - Decimal::new(0, 1) - - /* // some methods should be free. there might be cases where method isn't set (though they should be uncommon) // TODO: get this list from config (and add more to it) if let Some(method) = method.as_ref() { - if ["eth_chainId"].contains(method) { + if [ + "eth_chainId", + "eth_syncing", + "eth_protocolVersion", + "net_version", + "net_listening", + ] + .contains(method) + { return 0.into(); } } + // for now, always return a flat cost + return Decimal::from_str("0.000018").unwrap(); + + /* // TODO: get cost_minimum, cost_free_bytes, cost_per_byte, cache_hit_divisor from config. each chain will be different // pays at least $0.000018 / credits per request let cost_minimum = Decimal::new(18, 6); diff --git a/web3_proxy/src/stats/stat_buffer.rs b/web3_proxy/src/stats/stat_buffer.rs index 71412f49..40300b82 100644 --- a/web3_proxy/src/stats/stat_buffer.rs +++ b/web3_proxy/src/stats/stat_buffer.rs @@ -1,6 +1,7 @@ use super::{AppStat, RpcQueryKey}; use crate::app::{RpcSecretKeyCache, UserBalanceCache, Web3ProxyJoinHandle}; use crate::errors::Web3ProxyResult; +use crate::frontend::authorization::Balance; use derive_more::From; use futures::stream; use hashbrown::HashMap; @@ -8,9 +9,10 @@ use influxdb2::api::write::TimestampPrecision; use log::{error, info, trace}; use migration::sea_orm::prelude::Decimal; use migration::sea_orm::DatabaseConnection; +use parking_lot::RwLock; use std::sync::Arc; use std::time::Duration; -use tokio::sync::{broadcast, RwLock}; +use tokio::sync::broadcast; use tokio::time::interval; #[derive(Debug, Default)] @@ -25,8 +27,8 @@ pub struct BufferedRpcQueryStats { pub sum_response_bytes: u64, pub sum_response_millis: u64, pub sum_credits_used: Decimal, - /// Balance tells us the user's balance at this point in time - pub latest_balance: Arc>, + /// The user's balance at this point in time. Multiple queries might be modifying it at once. + pub latest_balance: Arc>, } #[derive(From)]