David/66 downgrade balance logic (#89)
* tried to make the balance_entry atomic, asking for feedback * added locked select everywhere and inside the txn, it should be atomic now * forgot about overwrite (referee vs sender entities were the same, overwriting each other), fixed it * will now trigger refreshing cache when balance thresholds are met * will also invalidate cache if balance gets too low (to out of funds) * added separate balance cache with shorter TTL. need to know if i should modify it (rn its not atomic), or if low TTL is good enough, and we just fetch the value every now and then from the Database * removed UserTier struct bcs duplicate with database * removed subuser premium requirement (which would have lead to a merge conflict later on) * removed user_tier artefact * replaced cache to use AtomicF64 * a bunch of small changes * some changes * will merge with devel * changed AtomicF64 to RwLock<Decimal> * changed AtomicF64 to RwLock<Decimal> * downgrading user when at 0.1$ or if sum_credits_used is very low * changed caches to be more aggressive in being non-empty * replaced Arc::clone() by reference
This commit is contained in:
parent
fd69e6acdd
commit
63499c1564
@ -75,7 +75,7 @@ do
|
||||
--data '{"method":"eth_blockNumber","params":[],"id":1,"jsonrpc":"2.0"}'
|
||||
done
|
||||
|
||||
for i in {1..100}
|
||||
for i in {1..10000}
|
||||
do
|
||||
curl \
|
||||
-X POST "127.0.0.1:8544/" \
|
||||
|
@ -55,7 +55,7 @@ use std::num::{NonZeroU32, NonZeroU64};
|
||||
use std::str::FromStr;
|
||||
use std::sync::{atomic, Arc};
|
||||
use std::time::Duration;
|
||||
use tokio::sync::{broadcast, watch, Semaphore};
|
||||
use tokio::sync::{broadcast, watch, RwLock, Semaphore};
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::time::timeout;
|
||||
|
||||
@ -107,11 +107,11 @@ pub struct AuthorizationChecks {
|
||||
/// IMPORTANT! Once confirmed by a miner, they will be public on the blockchain!
|
||||
pub private_txs: bool,
|
||||
pub proxy_mode: ProxyMode,
|
||||
pub balance: Option<Decimal>,
|
||||
}
|
||||
|
||||
/// Cache data from the database about rpc keys
|
||||
pub type RpcSecretKeyCache = Arc<CacheWithTTL<RpcSecretKey, AuthorizationChecks>>;
|
||||
pub type UserBalanceCache = Arc<CacheWithTTL<NonZeroU64, Arc<RwLock<Decimal>>>>; // Could also be an AtomicDecimal
|
||||
|
||||
/// The application
|
||||
// TODO: i'm sure this is more arcs than necessary, but spawning futures makes references hard
|
||||
@ -160,6 +160,8 @@ pub struct Web3ProxyApp {
|
||||
/// cache authenticated users so that we don't have to query the database on the hot path
|
||||
// TODO: should the key be our RpcSecretKey class instead of Ulid?
|
||||
pub rpc_secret_key_cache: RpcSecretKeyCache,
|
||||
/// cache user balances so we don't have to check downgrade logic every single time
|
||||
pub user_balance_cache: UserBalanceCache,
|
||||
/// concurrent/parallel RPC request limits for authenticated users
|
||||
pub user_semaphores: Cache<NonZeroU64, Arc<Semaphore>>,
|
||||
/// concurrent/parallel request limits for anonymous users
|
||||
@ -403,6 +405,11 @@ impl Web3ProxyApp {
|
||||
)
|
||||
.await;
|
||||
|
||||
// TODO: TTL left low, this could also be a solution instead of modifiying the cache, that may be disgusting across threads / slow anyways
|
||||
let user_balance_cache =
|
||||
CacheWithTTL::arc_with_capacity("user_balance_cache", 10_000, Duration::from_secs(600))
|
||||
.await;
|
||||
|
||||
// create a channel for receiving stats
|
||||
// we do this in a channel so we don't slow down our response to the users
|
||||
// stats can be saved in mysql, influxdb, both, or none
|
||||
@ -416,6 +423,7 @@ impl Web3ProxyApp {
|
||||
60,
|
||||
influxdb_client.clone(),
|
||||
Some(rpc_secret_key_cache.clone()),
|
||||
Some(user_balance_cache.clone()),
|
||||
stat_buffer_shutdown_receiver,
|
||||
1,
|
||||
)? {
|
||||
@ -631,6 +639,9 @@ impl Web3ProxyApp {
|
||||
frontend_ip_rate_limiter,
|
||||
frontend_registered_user_rate_limiter,
|
||||
hostname,
|
||||
vredis_pool,
|
||||
rpc_secret_key_cache,
|
||||
user_balance_cache,
|
||||
http_client,
|
||||
influxdb_client,
|
||||
internal_provider,
|
||||
@ -641,10 +652,8 @@ impl Web3ProxyApp {
|
||||
pending_transactions,
|
||||
pending_tx_sender,
|
||||
private_rpcs,
|
||||
rpc_secret_key_cache,
|
||||
stat_sender,
|
||||
user_semaphores,
|
||||
vredis_pool,
|
||||
watch_consensus_head_receiver,
|
||||
};
|
||||
|
||||
|
@ -4,6 +4,7 @@ use entities::{rpc_accounting, rpc_key};
|
||||
use futures::stream::FuturesUnordered;
|
||||
use futures::StreamExt;
|
||||
use log::{error, info};
|
||||
use migration::sea_orm::prelude::Decimal;
|
||||
use migration::sea_orm::QueryOrder;
|
||||
use migration::sea_orm::{
|
||||
ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QuerySelect, UpdateResult,
|
||||
@ -12,7 +13,7 @@ use migration::{Expr, Value};
|
||||
use parking_lot::Mutex;
|
||||
use std::num::NonZeroU64;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::sync::{broadcast, RwLock};
|
||||
use tokio::time::Instant;
|
||||
use ulid::Ulid;
|
||||
use web3_proxy::app::BILLING_PERIOD_SECONDS;
|
||||
@ -79,6 +80,7 @@ impl MigrateStatsToV2 {
|
||||
30,
|
||||
influxdb_client.clone(),
|
||||
None,
|
||||
None,
|
||||
rpc_account_shutdown_recevier,
|
||||
1,
|
||||
)
|
||||
@ -169,6 +171,8 @@ impl MigrateStatsToV2 {
|
||||
|
||||
let request_ulid = Ulid::new();
|
||||
|
||||
let latest_balance = Arc::new(RwLock::new(Decimal::default()));
|
||||
|
||||
// Create RequestMetadata
|
||||
let request_metadata = RequestMetadata {
|
||||
archive_request: x.archive_request.into(),
|
||||
@ -191,6 +195,7 @@ impl MigrateStatsToV2 {
|
||||
start_instant: Instant::now(),
|
||||
stat_sender: Some(stat_sender.clone()),
|
||||
request_ulid,
|
||||
latest_balance,
|
||||
};
|
||||
|
||||
if let Some(x) = request_metadata.try_send_stat()? {
|
||||
|
@ -76,6 +76,7 @@ pub enum Web3ProxyError {
|
||||
UnknownReferralCode,
|
||||
InvalidReferer,
|
||||
InvalidSignatureLength,
|
||||
InvalidUserTier,
|
||||
InvalidUserAgent,
|
||||
InvalidUserKey,
|
||||
IpAddrParse(AddrParseError),
|
||||
@ -502,6 +503,17 @@ impl Web3ProxyError {
|
||||
},
|
||||
)
|
||||
}
|
||||
Self::InvalidUserTier => {
|
||||
warn!("InvalidUserTier");
|
||||
(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
JsonRpcErrorData {
|
||||
message: Cow::Borrowed("UserTier is not valid!"),
|
||||
code: StatusCode::BAD_REQUEST.as_u16().into(),
|
||||
data: None,
|
||||
},
|
||||
)
|
||||
}
|
||||
Self::JoinError(err) => {
|
||||
let code = if err.is_cancelled() {
|
||||
trace!("JoinError. likely shutting down. err={:?}", err);
|
||||
|
@ -7,7 +7,7 @@ use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest};
|
||||
use crate::rpcs::one::Web3Rpc;
|
||||
use crate::stats::{AppStat, BackendRequests, RpcQueryStats};
|
||||
use crate::user_token::UserBearerToken;
|
||||
use anyhow::Context;
|
||||
use anyhow::{Context, Error};
|
||||
use axum::headers::authorization::Bearer;
|
||||
use axum::headers::{Header, Origin, Referer, UserAgent};
|
||||
use chrono::Utc;
|
||||
@ -23,7 +23,9 @@ use hashbrown::HashMap;
|
||||
use http::HeaderValue;
|
||||
use ipnet::IpNet;
|
||||
use log::{error, trace, warn};
|
||||
use migration::sea_orm::prelude::Decimal;
|
||||
use migration::sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter};
|
||||
use num_traits::ToPrimitive;
|
||||
use rdkafka::message::{Header as KafkaHeader, OwnedHeaders as KafkaOwnedHeaders, OwnedMessage};
|
||||
use rdkafka::producer::{FutureProducer, FutureRecord};
|
||||
use rdkafka::util::Timeout as KafkaTimeout;
|
||||
@ -33,10 +35,11 @@ use std::convert::Infallible;
|
||||
use std::fmt::Display;
|
||||
use std::hash::{Hash, Hasher};
|
||||
use std::mem;
|
||||
use std::num::NonZeroU64;
|
||||
use std::sync::atomic::{self, AtomicBool, AtomicI64, AtomicU64, AtomicUsize};
|
||||
use std::time::Duration;
|
||||
use std::{net::IpAddr, str::FromStr, sync::Arc};
|
||||
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
|
||||
use tokio::sync::{OwnedSemaphorePermit, RwLock, Semaphore};
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::time::Instant;
|
||||
use ulid::Ulid;
|
||||
@ -280,6 +283,9 @@ pub struct RequestMetadata {
|
||||
|
||||
/// Cancel-safe channel for sending stats to the buffer
|
||||
pub stat_sender: Option<flume::Sender<AppStat>>,
|
||||
|
||||
/// Latest balance
|
||||
pub latest_balance: Arc<RwLock<Decimal>>,
|
||||
}
|
||||
|
||||
impl Default for Authorization {
|
||||
@ -306,6 +312,7 @@ impl Default for RequestMetadata {
|
||||
response_timestamp: Default::default(),
|
||||
start_instant: Instant::now(),
|
||||
stat_sender: Default::default(),
|
||||
latest_balance: Default::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -431,6 +438,19 @@ impl RequestMetadata {
|
||||
}
|
||||
}
|
||||
|
||||
// Get latest balance from cache
|
||||
let latest_balance = match app
|
||||
.balance_checks(authorization.checks.user_id)
|
||||
.await
|
||||
.context("Could not retrieve balance from database or cache!")
|
||||
{
|
||||
Ok(x) => x,
|
||||
Err(err) => {
|
||||
error!("{}", err);
|
||||
Arc::new(RwLock::new(Decimal::default()))
|
||||
}
|
||||
};
|
||||
|
||||
let x = Self {
|
||||
archive_request: false.into(),
|
||||
backend_requests: Default::default(),
|
||||
@ -447,6 +467,7 @@ impl RequestMetadata {
|
||||
response_timestamp: 0.into(),
|
||||
start_instant: Instant::now(),
|
||||
stat_sender: app.stat_sender.clone(),
|
||||
latest_balance,
|
||||
};
|
||||
|
||||
Arc::new(x)
|
||||
@ -1109,6 +1130,38 @@ impl Web3ProxyApp {
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the balance for the user.
|
||||
///
|
||||
/// If a subuser calls this function, the subuser needs to have first attained the user_id that the rpc key belongs to.
|
||||
/// This function should be called anywhere where balance is required (i.e. only rpc calls, I believe ... non-rpc calls don't really require balance)
|
||||
pub(crate) async fn balance_checks(
|
||||
&self,
|
||||
user_id: u64,
|
||||
) -> Web3ProxyResult<Arc<RwLock<Decimal>>> {
|
||||
match NonZeroU64::try_from(user_id) {
|
||||
Err(_) => Ok(Arc::new(RwLock::new(Decimal::default()))),
|
||||
Ok(x) => {
|
||||
self.user_balance_cache
|
||||
.try_get_or_insert_async(&x, async move {
|
||||
let db_replica = self
|
||||
.db_replica()
|
||||
.web3_context("Getting database connection")?;
|
||||
|
||||
let balance: Decimal = match balance::Entity::find()
|
||||
.filter(balance::Column::UserId.eq(user_id))
|
||||
.one(db_replica.as_ref())
|
||||
.await?
|
||||
{
|
||||
Some(x) => x.available_balance,
|
||||
None => Decimal::default(),
|
||||
};
|
||||
Ok(Arc::new(RwLock::new(balance)))
|
||||
})
|
||||
.await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// check the local cache for user data, or query the database
|
||||
pub(crate) async fn authorization_checks(
|
||||
&self,
|
||||
@ -1135,24 +1188,6 @@ impl Web3ProxyApp {
|
||||
Some(rpc_key_model) => {
|
||||
// TODO: move these splits into helper functions
|
||||
// TODO: can we have sea orm handle this for us?
|
||||
let user_model = user::Entity::find_by_id(rpc_key_model.user_id)
|
||||
.one(db_replica.as_ref())
|
||||
.await?
|
||||
.context("no related user")?;
|
||||
|
||||
let balance = balance::Entity::find()
|
||||
.filter(balance::Column::UserId.eq(user_model.id))
|
||||
.one(db_replica.as_ref())
|
||||
.await?
|
||||
.map(|x| x.available_balance)
|
||||
.unwrap_or_default();
|
||||
|
||||
let user_tier_model =
|
||||
user_tier::Entity::find_by_id(user_model.user_tier_id)
|
||||
.one(db_replica.as_ref())
|
||||
.await?
|
||||
.context("no related user tier")?;
|
||||
|
||||
let allowed_ips: Option<Vec<IpNet>> =
|
||||
if let Some(allowed_ips) = rpc_key_model.allowed_ips {
|
||||
let x = allowed_ips
|
||||
@ -1212,8 +1247,42 @@ impl Web3ProxyApp {
|
||||
None
|
||||
};
|
||||
|
||||
let rpc_key_id =
|
||||
Some(rpc_key_model.id.try_into().expect("db ids are never 0"));
|
||||
// Get the user_tier
|
||||
let user_model = user::Entity::find_by_id(rpc_key_model.user_id)
|
||||
.one(db_replica.as_ref())
|
||||
.await?
|
||||
.context("user model was not found, but every rpc_key should have a user")?;
|
||||
|
||||
let balance = match balance::Entity::find()
|
||||
.filter(balance::Column::UserId.eq(user_model.id))
|
||||
.one(db_replica.as_ref())
|
||||
.await? {
|
||||
Some(x) => x.available_balance,
|
||||
None => Decimal::default()
|
||||
};
|
||||
|
||||
let mut user_tier_model =
|
||||
user_tier::Entity::find_by_id(user_model.user_tier_id)
|
||||
.one(db_replica.as_ref())
|
||||
.await?
|
||||
.context("related user tier not found, but every user should have a tier")?;
|
||||
|
||||
// TODO: Do the logic here, as to how to treat the user, based on balance and initial check
|
||||
// Clear the cache (not the login!) in the stats if a tier-change happens (clear, but don't modify roles)
|
||||
if user_tier_model.title == "Premium" && balance < Decimal::new(1, 1) {
|
||||
// Find the equivalent downgraded user tier, and modify limits from there
|
||||
if let Some(downgrade_user_tier) = user_tier_model.downgrade_tier_id {
|
||||
user_tier_model =
|
||||
user_tier::Entity::find_by_id(downgrade_user_tier)
|
||||
.one(db_replica.as_ref())
|
||||
.await?
|
||||
.context("finding the downgrade user tier for premium did not work for premium")?;
|
||||
} else {
|
||||
return Err(Web3ProxyError::InvalidUserTier);
|
||||
}
|
||||
}
|
||||
|
||||
let rpc_key_id = Some(rpc_key_model.id.try_into().context("db ids are never 0")?);
|
||||
|
||||
Ok(AuthorizationChecks {
|
||||
user_id: rpc_key_model.user_id,
|
||||
@ -1229,7 +1298,6 @@ impl Web3ProxyApp {
|
||||
max_requests_per_period: user_tier_model.max_requests_per_period,
|
||||
private_txs: rpc_key_model.private_txs,
|
||||
proxy_mode,
|
||||
balance: Some(balance),
|
||||
})
|
||||
}
|
||||
None => Ok(AuthorizationChecks::default()),
|
||||
@ -1249,6 +1317,7 @@ impl Web3ProxyApp {
|
||||
user_agent: Option<UserAgent>,
|
||||
) -> Web3ProxyResult<RateLimitResult> {
|
||||
let authorization_checks = self.authorization_checks(proxy_mode, rpc_key).await?;
|
||||
let balance = self.balance_checks(authorization_checks.user_id).await?;
|
||||
|
||||
// if no rpc_key_id matching the given rpc was found, then we can't rate limit by key
|
||||
if authorization_checks.rpc_secret_key_id.is_none() {
|
||||
|
@ -8,11 +8,12 @@ use axum::{
|
||||
Extension, Json, TypedHeader,
|
||||
};
|
||||
use axum_macros::debug_handler;
|
||||
use entities::{balance, increase_on_chain_balance_receipt, user};
|
||||
use entities::{balance, increase_on_chain_balance_receipt, rpc_key, user};
|
||||
use ethbloom::Input as BloomInput;
|
||||
use ethers::abi::{AbiEncode, ParamType};
|
||||
use ethers::types::{Address, TransactionReceipt, ValueOrArray, H256, U256};
|
||||
use ethers::abi::AbiEncode;
|
||||
use ethers::types::{Address, TransactionReceipt, ValueOrArray, H256};
|
||||
use hashbrown::HashMap;
|
||||
// use http::StatusCode;
|
||||
use http::StatusCode;
|
||||
use log::{debug, info, trace};
|
||||
use migration::sea_orm::prelude::Decimal;
|
||||
@ -24,6 +25,7 @@ use num_traits::Pow;
|
||||
use payment_contracts::ierc20::IERC20;
|
||||
use payment_contracts::payment_factory::{self, PaymentFactory};
|
||||
use serde_json::json;
|
||||
use std::num::NonZeroU64;
|
||||
use std::sync::Arc;
|
||||
|
||||
/// Implements any logic related to payments
|
||||
@ -192,6 +194,8 @@ pub async fn user_balance_post(
|
||||
todo!("delete this transaction from the database");
|
||||
}
|
||||
|
||||
// Create a new transaction that will be used for joint transaction
|
||||
let txn = db_conn.begin().await?;
|
||||
if let Ok(event) = payment_factory_contract
|
||||
.decode_event::<payment_factory::PaymentReceivedFilter>(
|
||||
"PaymentReceived",
|
||||
@ -282,6 +286,24 @@ pub async fn user_balance_post(
|
||||
|
||||
receipt.save(&txn).await?;
|
||||
|
||||
// Remove all RPC-keys owned by this user from the cache, s.t. rate limits are re-calculated
|
||||
let rpc_keys = rpc_key::Entity::find()
|
||||
.filter(rpc_key::Column::UserId.eq(recipient.id))
|
||||
.all(&txn)
|
||||
.await?;
|
||||
|
||||
match NonZeroU64::try_from(recipient.id) {
|
||||
Err(_) => {}
|
||||
Ok(x) => {
|
||||
app.user_balance_cache.remove(&x);
|
||||
}
|
||||
};
|
||||
|
||||
for rpc_key_entity in rpc_keys {
|
||||
app.rpc_secret_key_cache
|
||||
.remove(&rpc_key_entity.secret_key.into());
|
||||
}
|
||||
|
||||
let x = json!({
|
||||
"tx_hash": tx_hash,
|
||||
"log_index": log_index,
|
||||
|
@ -28,7 +28,6 @@ use std::sync::Arc;
|
||||
|
||||
/// Create or get the existing referral link.
|
||||
/// This is the link that the user can share to third parties, and get credits.
|
||||
/// Applies to premium users only
|
||||
#[debug_handler]
|
||||
pub async fn user_referral_link_get(
|
||||
Extension(app): Extension<Arc<Web3ProxyApp>>,
|
||||
|
@ -4,11 +4,12 @@ pub mod db_queries;
|
||||
pub mod influxdb_queries;
|
||||
mod stat_buffer;
|
||||
pub use stat_buffer::{SpawnedStatBuffer, StatBuffer};
|
||||
use std::borrow::BorrowMut;
|
||||
use std::cmp;
|
||||
|
||||
use crate::app::RpcSecretKeyCache;
|
||||
use crate::app::{RpcSecretKeyCache, UserBalanceCache};
|
||||
use crate::errors::{Web3ProxyError, Web3ProxyResult};
|
||||
use crate::frontend::authorization::{Authorization, RequestMetadata};
|
||||
use crate::frontend::authorization::{Authorization, RequestMetadata, RpcSecretKey};
|
||||
use crate::rpcs::one::Web3Rpc;
|
||||
use anyhow::{anyhow, Context};
|
||||
use axum::headers::Origin;
|
||||
@ -16,20 +17,22 @@ use chrono::{DateTime, Months, TimeZone, Utc};
|
||||
use derive_more::From;
|
||||
use entities::sea_orm_active_enums::TrackingLevel;
|
||||
use entities::{balance, referee, referrer, rpc_accounting_v2, rpc_key, user};
|
||||
use ethers::core::k256::elliptic_curve::bigint::NonZero;
|
||||
use influxdb2::models::DataPoint;
|
||||
use log::trace;
|
||||
use log::{error, info, trace, warn};
|
||||
use migration::sea_orm::prelude::Decimal;
|
||||
use migration::sea_orm::QuerySelect;
|
||||
use migration::sea_orm::{
|
||||
self, ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, IntoActiveModel,
|
||||
QueryFilter, TransactionTrait,
|
||||
self, ColumnTrait, DatabaseConnection, EntityTrait, IntoActiveModel, QueryFilter,
|
||||
TransactionTrait,
|
||||
};
|
||||
use migration::{Expr, LockType, OnConflict};
|
||||
use num_traits::ToPrimitive;
|
||||
use migration::sea_orm::{DatabaseTransaction, QuerySelect};
|
||||
use migration::{Expr, LockType, OnConflict, Order};
|
||||
use num_traits::{clamp, clamp_min, ToPrimitive};
|
||||
use parking_lot::Mutex;
|
||||
use std::num::NonZeroU64;
|
||||
use std::sync::atomic::{self, Ordering};
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use self::stat_buffer::BufferedRpcQueryStats;
|
||||
|
||||
@ -57,6 +60,8 @@ pub struct RpcQueryStats {
|
||||
pub response_timestamp: i64,
|
||||
/// Credits used signifies how how much money was used up
|
||||
pub credits_used: Decimal,
|
||||
/// Last credits used
|
||||
pub latest_balance: Arc<RwLock<Decimal>>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, From, Hash, PartialEq, Eq)]
|
||||
@ -182,6 +187,13 @@ impl RpcQueryStats {
|
||||
}
|
||||
}
|
||||
|
||||
struct Deltas {
|
||||
sender_available_balance_delta: Decimal,
|
||||
sender_used_balance_delta: Decimal,
|
||||
sender_bonus_applied: bool,
|
||||
referrer_available_balance_delta: Decimal,
|
||||
}
|
||||
|
||||
/// A stat that we aggregate and then store in a database.
|
||||
/// For now there is just one, but I think there might be others later
|
||||
#[derive(Debug, From)]
|
||||
@ -215,31 +227,23 @@ impl BufferedRpcQueryStats {
|
||||
self.sum_credits_used += stat.credits_used;
|
||||
|
||||
// Also record the latest balance for this user ..
|
||||
self.latest_balance = stat
|
||||
.authorization
|
||||
.checks
|
||||
.balance
|
||||
.unwrap_or(Decimal::from(0));
|
||||
// Also subtract the used balance from the cache so we
|
||||
// TODO: We are already using the cache. We could also inject the cache into save_tsdb
|
||||
self.latest_balance = stat.latest_balance;
|
||||
}
|
||||
|
||||
// TODO: take a db transaction instead so that we can batch?
|
||||
async fn save_db(
|
||||
self,
|
||||
async fn _save_db_stats(
|
||||
&self,
|
||||
chain_id: u64,
|
||||
db_conn: &DatabaseConnection,
|
||||
key: RpcQueryKey,
|
||||
rpc_secret_key_cache: Option<&RpcSecretKeyCache>,
|
||||
key: &RpcQueryKey,
|
||||
rpc_secret_key_cache: &RpcSecretKeyCache,
|
||||
user_balance_cache: &UserBalanceCache,
|
||||
) -> Web3ProxyResult<()> {
|
||||
if key.response_timestamp == 0 {
|
||||
return Err(Web3ProxyError::Anyhow(anyhow!(
|
||||
"no response_timestamp! This is a bug! {:?} {:?}",
|
||||
key,
|
||||
self
|
||||
)));
|
||||
}
|
||||
|
||||
let period_datetime = Utc.timestamp_opt(key.response_timestamp, 0).unwrap();
|
||||
|
||||
// TODO: Could add last balance here (can take the element from the cache, and RpcQueryKey::AuthorizationCheck)
|
||||
|
||||
// =============================== //
|
||||
// UPDATE STATISTICS //
|
||||
// =============================== //
|
||||
@ -320,84 +324,80 @@ impl BufferedRpcQueryStats {
|
||||
.exec(db_conn)
|
||||
.await?;
|
||||
|
||||
// =============================== //
|
||||
// PREPARE FOR UPDATE USER BALANCE //
|
||||
// =============================== //
|
||||
let rpc_secret_key_id: u64 = match key.rpc_secret_key_id {
|
||||
Some(x) => x.into(),
|
||||
// Return early if the RPC key is not found, because then it is an anonymous user
|
||||
None => return Ok(()),
|
||||
};
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// =============================== //
|
||||
// GET ALL (STATIC) VARIABLES //
|
||||
// =============================== //
|
||||
// Get the user with that RPC key. This is also the referee
|
||||
|
||||
// Txn is not strictly necessary, but still good to keep things consistent across tables
|
||||
let txn = db_conn.begin().await?;
|
||||
|
||||
let sender_rpc_entity = rpc_key::Entity::find()
|
||||
.filter(rpc_key::Column::Id.eq(rpc_secret_key_id))
|
||||
.one(&txn)
|
||||
async fn _get_relevant_entities(
|
||||
&self,
|
||||
rpc_secret_key_id: &NonZeroU64,
|
||||
txn: &DatabaseTransaction,
|
||||
) -> Web3ProxyResult<(
|
||||
rpc_key::Model,
|
||||
balance::Model,
|
||||
Option<(referee::Model, referrer::Model)>,
|
||||
)> {
|
||||
// Only calculate, and update the user balance
|
||||
// Do not worry about referrers and all that
|
||||
let sender_rpc_entity: rpc_key::Model = rpc_key::Entity::find()
|
||||
.filter(rpc_key::Column::Id.eq(rpc_secret_key_id.get()))
|
||||
.one(txn)
|
||||
.await?
|
||||
.context("We previous checked that the id exists, this is likely some race condition, or it just got deleted!")?;
|
||||
|
||||
// (1) Do some general bookkeeping on the user
|
||||
if self.sum_credits_used == 0.into() {
|
||||
// return early because theres no need to touch the balance table
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let sender_balance = balance::Entity::find()
|
||||
let sender_balance: balance::Model = balance::Entity::find()
|
||||
.filter(balance::Column::UserId.eq(sender_rpc_entity.user_id))
|
||||
.one(db_conn)
|
||||
.one(txn)
|
||||
.await?
|
||||
.ok_or(Web3ProxyError::BadRequest(
|
||||
"Could not find rpc key in db".into(),
|
||||
))?;
|
||||
|
||||
// I think one lock here is fine, because only one server has access to the "credits_applied_for_referee" entry
|
||||
let referral_objects = match referee::Entity::find()
|
||||
.filter(referee::Column::UserId.eq(sender_rpc_entity.user_id))
|
||||
.lock(LockType::Update)
|
||||
.find_also_related(referrer::Entity)
|
||||
.one(&txn)
|
||||
.await?
|
||||
{
|
||||
Some(x) => Some((
|
||||
x.0,
|
||||
x.1.context("Could not fine corresponding referrer code")?,
|
||||
)),
|
||||
None => None,
|
||||
let referral_objects: Option<(referee::Model, referrer::Model)> =
|
||||
match referee::Entity::find()
|
||||
.filter(referee::Column::UserId.eq(sender_rpc_entity.user_id))
|
||||
.lock(LockType::Update)
|
||||
.find_also_related(referrer::Entity)
|
||||
.one(txn)
|
||||
.await?
|
||||
{
|
||||
Some(x) => Some((
|
||||
x.0,
|
||||
x.1.context("Could not fine corresponding referrer code")?,
|
||||
)),
|
||||
None => None,
|
||||
};
|
||||
|
||||
Ok((sender_rpc_entity, sender_balance, referral_objects))
|
||||
}
|
||||
|
||||
async fn _compute_balance_deltas(
|
||||
&self,
|
||||
referral_objects: Option<(referee::Model, referrer::Model)>,
|
||||
) -> Web3ProxyResult<(Deltas, Option<(referee::Model, referrer::Model)>)> {
|
||||
// Calculate Balance Only
|
||||
let mut deltas = Deltas {
|
||||
sender_available_balance_delta: -self.sum_credits_used,
|
||||
sender_used_balance_delta: self.sum_credits_used,
|
||||
sender_bonus_applied: false,
|
||||
referrer_available_balance_delta: Decimal::from(0),
|
||||
};
|
||||
|
||||
// ====================== //
|
||||
// INITIATE DELTAS //
|
||||
// ====================== //
|
||||
// Calculate Balance Only (No referrer)
|
||||
let mut sender_available_balance_delta = Decimal::from(-1) * self.sum_credits_used;
|
||||
let sender_used_balance_delta = self.sum_credits_used;
|
||||
let mut sender_bonus_applied;
|
||||
// Calculate Referrer Bonuses
|
||||
let mut referrer_balance_delta = Decimal::from(0);
|
||||
|
||||
// ============================================================ //
|
||||
// BASED ON REFERRERS, CALCULATE HOW MUCH SHOULD BE ATTRIBUTED //
|
||||
// ============================================================ //
|
||||
// If we don't lock the database as we do above on the referral_entry, we would have to do this operation on the database
|
||||
// Calculate a bunch using referrals as well
|
||||
if let Some((referral_entity, referrer_code_entity)) = referral_objects {
|
||||
sender_bonus_applied = referral_entity.credits_applied_for_referee;
|
||||
deltas.sender_bonus_applied = referral_entity.credits_applied_for_referee;
|
||||
|
||||
// Calculate if we are above the usage threshold, and apply a bonus
|
||||
// Optimally we would read this from the balance, but if we do it like this, we only have to lock a single table (much safer w.r.t. deadlocks)
|
||||
// referral_entity.credits_applied_for_referrer * (Decimal::from(10) checks (atomically using this table only), whether the user has brought in >$100 to the referer
|
||||
// In this case, the sender receives $100 as a bonus / gift
|
||||
if !referral_entity.credits_applied_for_referee
|
||||
&& (referral_entity.credits_applied_for_referrer * (Decimal::from(10))
|
||||
+ self.sum_credits_used)
|
||||
>= Decimal::from(100)
|
||||
{
|
||||
sender_available_balance_delta += Decimal::from(100);
|
||||
sender_bonus_applied = true;
|
||||
deltas.sender_available_balance_delta += Decimal::from(100);
|
||||
deltas.sender_bonus_applied = true;
|
||||
}
|
||||
|
||||
// Calculate how much the referrer should get, limited to the last 12 months
|
||||
@ -407,51 +407,29 @@ impl BufferedRpcQueryStats {
|
||||
+ Months::new(12);
|
||||
|
||||
if now <= valid_until {
|
||||
referrer_balance_delta += self.sum_credits_used / Decimal::new(10, 0);
|
||||
deltas.referrer_available_balance_delta +=
|
||||
self.sum_credits_used / Decimal::new(10, 0);
|
||||
}
|
||||
|
||||
// Do the referrer_entry updates
|
||||
if referrer_balance_delta > Decimal::from(0) {
|
||||
let referee_entry = referee::ActiveModel {
|
||||
id: sea_orm::Unchanged(referral_entity.id),
|
||||
referral_start_date: sea_orm::Unchanged(referral_entity.referral_start_date),
|
||||
used_referral_code: sea_orm::Unchanged(referral_entity.used_referral_code),
|
||||
user_id: sea_orm::Unchanged(referral_entity.user_id),
|
||||
|
||||
credits_applied_for_referee: sea_orm::Set(sender_bonus_applied),
|
||||
credits_applied_for_referrer: sea_orm::Set(referrer_balance_delta),
|
||||
};
|
||||
referee::Entity::insert(referee_entry)
|
||||
.on_conflict(
|
||||
OnConflict::new()
|
||||
.values([
|
||||
(
|
||||
referee::Column::CreditsAppliedForReferee,
|
||||
// Make it a "Set"
|
||||
Expr::col(referee::Column::CreditsAppliedForReferee)
|
||||
.eq(sender_bonus_applied),
|
||||
),
|
||||
(
|
||||
referee::Column::CreditsAppliedForReferrer,
|
||||
Expr::col(referee::Column::CreditsAppliedForReferrer)
|
||||
.add(referrer_balance_delta),
|
||||
),
|
||||
])
|
||||
.to_owned(),
|
||||
)
|
||||
.exec(&txn)
|
||||
.await?
|
||||
.last_insert_id;
|
||||
}
|
||||
return Ok((deltas, Some((referral_entity, referrer_code_entity))));
|
||||
}
|
||||
|
||||
// ================================= //
|
||||
// UPDATE REFERRER & USER BALANCE //
|
||||
// ================================= //
|
||||
Ok((deltas, None))
|
||||
}
|
||||
|
||||
/// Save all referral-based objects in the database
|
||||
async fn _update_balances_in_db(
|
||||
&self,
|
||||
deltas: &Deltas,
|
||||
txn: &DatabaseTransaction,
|
||||
sender_rpc_entity: &rpc_key::Model,
|
||||
referral_objects: &Option<(referee::Model, referrer::Model)>,
|
||||
) -> Web3ProxyResult<()> {
|
||||
// Do the user updates
|
||||
let user_balance = balance::ActiveModel {
|
||||
id: sea_orm::NotSet,
|
||||
available_balance: sea_orm::Set(sender_available_balance_delta),
|
||||
used_balance: sea_orm::Set(sender_used_balance_delta),
|
||||
available_balance: sea_orm::Set(deltas.sender_available_balance_delta),
|
||||
used_balance: sea_orm::Set(deltas.sender_used_balance_delta),
|
||||
user_id: sea_orm::Set(sender_rpc_entity.user_id),
|
||||
};
|
||||
|
||||
@ -462,45 +440,216 @@ impl BufferedRpcQueryStats {
|
||||
(
|
||||
balance::Column::AvailableBalance,
|
||||
Expr::col(balance::Column::AvailableBalance)
|
||||
.add(sender_available_balance_delta),
|
||||
.add(deltas.sender_available_balance_delta),
|
||||
),
|
||||
(
|
||||
balance::Column::UsedBalance,
|
||||
Expr::col(balance::Column::UsedBalance).add(sender_used_balance_delta),
|
||||
Expr::col(balance::Column::UsedBalance)
|
||||
.add(deltas.sender_used_balance_delta),
|
||||
),
|
||||
])
|
||||
.to_owned(),
|
||||
)
|
||||
.exec(&txn)
|
||||
.await?
|
||||
.last_insert_id;
|
||||
.exec(txn)
|
||||
.await?;
|
||||
|
||||
if referrer_balance_delta > Decimal::from(0) {
|
||||
let user_balance = balance::ActiveModel {
|
||||
id: sea_orm::NotSet,
|
||||
available_balance: sea_orm::Set(referrer_balance_delta),
|
||||
used_balance: sea_orm::Set(Decimal::from(0)),
|
||||
user_id: sea_orm::Set(sender_rpc_entity.user_id),
|
||||
};
|
||||
// Do the referrer_entry updates
|
||||
if let Some((referral_entity, referrer_code_entity)) = referral_objects {
|
||||
if deltas.referrer_available_balance_delta > Decimal::from(0) {
|
||||
let referee_entry = referee::ActiveModel {
|
||||
id: sea_orm::Unchanged(referral_entity.id),
|
||||
referral_start_date: sea_orm::Unchanged(referral_entity.referral_start_date),
|
||||
used_referral_code: sea_orm::Unchanged(referral_entity.used_referral_code),
|
||||
user_id: sea_orm::Unchanged(referral_entity.user_id),
|
||||
|
||||
let _ = balance::Entity::insert(user_balance)
|
||||
.on_conflict(
|
||||
OnConflict::new()
|
||||
.values([(
|
||||
balance::Column::AvailableBalance,
|
||||
Expr::col(balance::Column::AvailableBalance)
|
||||
.add(referrer_balance_delta),
|
||||
)])
|
||||
.to_owned(),
|
||||
)
|
||||
.exec(&txn)
|
||||
.await?
|
||||
.last_insert_id;
|
||||
credits_applied_for_referee: sea_orm::Set(deltas.sender_bonus_applied),
|
||||
credits_applied_for_referrer: sea_orm::Set(
|
||||
deltas.referrer_available_balance_delta,
|
||||
),
|
||||
};
|
||||
referee::Entity::insert(referee_entry)
|
||||
.on_conflict(
|
||||
OnConflict::new()
|
||||
.values([
|
||||
(
|
||||
referee::Column::CreditsAppliedForReferee,
|
||||
// Make it a "Set"
|
||||
Expr::col(referee::Column::CreditsAppliedForReferee)
|
||||
.eq(deltas.sender_bonus_applied),
|
||||
),
|
||||
(
|
||||
referee::Column::CreditsAppliedForReferrer,
|
||||
Expr::col(referee::Column::CreditsAppliedForReferrer)
|
||||
.add(deltas.referrer_available_balance_delta),
|
||||
),
|
||||
])
|
||||
.to_owned(),
|
||||
)
|
||||
.exec(txn)
|
||||
.await?
|
||||
.last_insert_id;
|
||||
|
||||
let user_balance = balance::ActiveModel {
|
||||
id: sea_orm::NotSet,
|
||||
available_balance: sea_orm::Set(deltas.referrer_available_balance_delta),
|
||||
used_balance: sea_orm::Set(Decimal::from(0)),
|
||||
user_id: sea_orm::Set(referral_entity.user_id),
|
||||
};
|
||||
|
||||
let _ = balance::Entity::insert(user_balance)
|
||||
.on_conflict(
|
||||
OnConflict::new()
|
||||
.values([(
|
||||
balance::Column::AvailableBalance,
|
||||
Expr::col(balance::Column::AvailableBalance)
|
||||
.add(deltas.referrer_available_balance_delta),
|
||||
)])
|
||||
.to_owned(),
|
||||
)
|
||||
.exec(txn)
|
||||
.await?;
|
||||
}
|
||||
};
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Update & Invalidate cache if user is below 10$ credits (premium downgrade condition)
|
||||
/// Reduce credits if there was no issue
|
||||
/// This is not atomic, so this may be an issue because it's not sequentially consistent across threads
|
||||
/// It is a good-enough approximation though, and if the TTL for the balance cache is high enough, this should be ok
|
||||
async fn _update_balance_in_cache(
|
||||
&self,
|
||||
deltas: &Deltas,
|
||||
txn: &DatabaseTransaction,
|
||||
sender_rpc_entity: &rpc_key::Model,
|
||||
referral_objects: &Option<(referee::Model, referrer::Model)>,
|
||||
rpc_secret_key_cache: &RpcSecretKeyCache,
|
||||
user_balance_cache: &UserBalanceCache,
|
||||
) -> Web3ProxyResult<()> {
|
||||
// ==================
|
||||
// Modify sender balance
|
||||
// ==================
|
||||
let sender_latest_balance = match NonZeroU64::try_from(sender_rpc_entity.user_id) {
|
||||
Err(_) => Err(Web3ProxyError::BadResponse(
|
||||
"Balance is not positive, although it was previously checked to be as such!"
|
||||
.to_string(),
|
||||
)),
|
||||
// We don't do an get_or_insert, because technically we don't have the most up to date balance
|
||||
// Also let's keep things simple in terms of writing and getting. A single place writes it, multiple places can remove / poll it
|
||||
Ok(x) => Ok(user_balance_cache.get(&x)),
|
||||
}?;
|
||||
let sender_latest_balance = match sender_latest_balance {
|
||||
Some(x) => x,
|
||||
// If not in cache, nothing to update theoretically
|
||||
None => return Ok(()),
|
||||
};
|
||||
let mut latest_balance = sender_latest_balance.write().await;
|
||||
let balance_before = (*latest_balance).clone();
|
||||
// Now modify the balance
|
||||
*latest_balance = *latest_balance + deltas.sender_available_balance_delta;
|
||||
if *latest_balance < Decimal::from(0) {
|
||||
*latest_balance = Decimal::from(0);
|
||||
}
|
||||
|
||||
// ================================ //
|
||||
// TODO: REFRESH USER ROLE IN CACHE //
|
||||
// ================================ //
|
||||
// Also check if the referrer is premium (thought above 10$ will always be treated as premium at least)
|
||||
// Should only refresh cache if the premium threshold is crossed
|
||||
if balance_before > Decimal::from(0) && *latest_balance == Decimal::from(0) {
|
||||
let rpc_keys = rpc_key::Entity::find()
|
||||
.filter(rpc_key::Column::UserId.eq(sender_rpc_entity.user_id))
|
||||
.all(txn)
|
||||
.await?;
|
||||
|
||||
for rpc_key_entity in rpc_keys {
|
||||
// TODO: Not sure which one was inserted, just delete both ...
|
||||
rpc_secret_key_cache.remove(&rpc_key_entity.secret_key.into());
|
||||
}
|
||||
|
||||
if let Ok(non_zero_user_id) = NonZeroU64::try_from(sender_rpc_entity.user_id) {
|
||||
user_balance_cache.remove(&non_zero_user_id);
|
||||
}
|
||||
}
|
||||
|
||||
// ==================
|
||||
// Modify referrer balance
|
||||
// ==================
|
||||
// If the referrer object is empty, we don't care about the cache, becase this will be fetched in a next request from the database
|
||||
if let Some((referral_entity, _)) = referral_objects {
|
||||
if let Ok(referrer_user_id) = NonZeroU64::try_from(referral_entity.user_id) {
|
||||
// If the referrer object is in the cache, we just remove it from the balance cache; it will be reloaded next time
|
||||
// Get all the RPC keys, delete them from cache
|
||||
|
||||
// In principle, do not remove the cache for the referrer; the next reload will trigger premium
|
||||
// We don't touch the RPC keys at this stage for the refferer, a payment must be paid to reset those (we want to keep things simple here)
|
||||
// Anyways, the RPC keys will be updated in 5 min (600 seconds)
|
||||
user_balance_cache.remove(&referrer_user_id);
|
||||
}
|
||||
};
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// TODO: take a db transaction instead so that we can batch?
|
||||
async fn save_db(
|
||||
self,
|
||||
chain_id: u64,
|
||||
db_conn: &DatabaseConnection,
|
||||
key: RpcQueryKey,
|
||||
rpc_secret_key_cache: &RpcSecretKeyCache,
|
||||
user_balance_cache: &UserBalanceCache,
|
||||
) -> Web3ProxyResult<()> {
|
||||
if key.response_timestamp == 0 {
|
||||
return Err(Web3ProxyError::Anyhow(anyhow!(
|
||||
"no response_timestamp! This is a bug! {:?} {:?}",
|
||||
key,
|
||||
self
|
||||
)));
|
||||
}
|
||||
|
||||
// First of all, save the statistics to the database:
|
||||
self._save_db_stats(
|
||||
chain_id,
|
||||
db_conn,
|
||||
&key,
|
||||
rpc_secret_key_cache,
|
||||
user_balance_cache,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Return early if no credits were used, or if user is anonymous
|
||||
if self.sum_credits_used == 0.into() {
|
||||
return Ok(());
|
||||
}
|
||||
let rpc_secret_key_id: &NonZeroU64 = match &key.rpc_secret_key_id {
|
||||
Some(x) => x.into(),
|
||||
None => return Ok(()),
|
||||
};
|
||||
|
||||
// Start a transaction
|
||||
let txn = db_conn.begin().await?;
|
||||
// Fetch any items that we will be modifying
|
||||
let (sender_rpc_entity, _sender_balance, referral_objects) =
|
||||
self._get_relevant_entities(rpc_secret_key_id, &txn).await?;
|
||||
|
||||
// Compute Changes in balance for user and referrer, incl. referral logic //
|
||||
let (deltas, referral_objects): (Deltas, Option<(referee::Model, referrer::Model)>) =
|
||||
self._compute_balance_deltas(referral_objects).await?;
|
||||
|
||||
// Update balances in the database
|
||||
self._update_balances_in_db(&deltas, &txn, &sender_rpc_entity, &referral_objects)
|
||||
.await?;
|
||||
|
||||
// Update balanaces in the cache
|
||||
self._update_balance_in_cache(
|
||||
&deltas,
|
||||
&txn,
|
||||
&sender_rpc_entity,
|
||||
&referral_objects,
|
||||
rpc_secret_key_cache,
|
||||
user_balance_cache,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Finally commit the transaction in the database
|
||||
txn.commit()
|
||||
.await
|
||||
.context("Failed to update referral and balance updates")?;
|
||||
@ -526,6 +675,12 @@ impl BufferedRpcQueryStats {
|
||||
builder = builder.tag("method", method);
|
||||
}
|
||||
|
||||
// Read the latest balance ...
|
||||
let balance;
|
||||
{
|
||||
balance = *(self.latest_balance.read().await);
|
||||
}
|
||||
|
||||
builder = builder
|
||||
.tag("archive_needed", key.archive_needed.to_string())
|
||||
.tag("error_response", key.error_response.to_string())
|
||||
@ -544,13 +699,11 @@ impl BufferedRpcQueryStats {
|
||||
"sum_credits_used",
|
||||
self.sum_credits_used
|
||||
.to_f64()
|
||||
.expect("number is really (too) large"),
|
||||
.context("number is really (too) large")?,
|
||||
)
|
||||
.field(
|
||||
"balance",
|
||||
self.latest_balance
|
||||
.to_f64()
|
||||
.expect("number is really (too) large"),
|
||||
balance.to_f64().context("number is really (too) large")?,
|
||||
);
|
||||
|
||||
// .round() as i64
|
||||
@ -630,6 +783,8 @@ impl TryFrom<RequestMetadata> for RpcQueryStats {
|
||||
response_millis,
|
||||
response_timestamp,
|
||||
credits_used,
|
||||
// To we need to clone it here ... (?)
|
||||
latest_balance: metadata.latest_balance.clone(),
|
||||
};
|
||||
|
||||
Ok(x)
|
||||
|
@ -1,5 +1,5 @@
|
||||
use super::{AppStat, RpcQueryKey};
|
||||
use crate::app::{RpcSecretKeyCache, Web3ProxyJoinHandle};
|
||||
use crate::app::{RpcSecretKeyCache, UserBalanceCache, Web3ProxyJoinHandle};
|
||||
use crate::errors::Web3ProxyResult;
|
||||
use derive_more::From;
|
||||
use futures::stream;
|
||||
@ -8,8 +8,9 @@ use influxdb2::api::write::TimestampPrecision;
|
||||
use log::{error, info, trace};
|
||||
use migration::sea_orm::prelude::Decimal;
|
||||
use migration::sea_orm::DatabaseConnection;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::sync::{broadcast, RwLock};
|
||||
use tokio::time::interval;
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
@ -25,7 +26,7 @@ pub struct BufferedRpcQueryStats {
|
||||
pub sum_response_millis: u64,
|
||||
pub sum_credits_used: Decimal,
|
||||
/// Balance tells us the user's balance at this point in time
|
||||
pub latest_balance: Decimal,
|
||||
pub latest_balance: Arc<RwLock<Decimal>>,
|
||||
}
|
||||
|
||||
#[derive(From)]
|
||||
@ -43,7 +44,8 @@ pub struct StatBuffer {
|
||||
global_timeseries_buffer: HashMap<RpcQueryKey, BufferedRpcQueryStats>,
|
||||
influxdb_client: Option<influxdb2::Client>,
|
||||
opt_in_timeseries_buffer: HashMap<RpcQueryKey, BufferedRpcQueryStats>,
|
||||
rpc_secret_key_cache: Option<RpcSecretKeyCache>,
|
||||
rpc_secret_key_cache: RpcSecretKeyCache,
|
||||
user_balance_cache: UserBalanceCache,
|
||||
timestamp_precision: TimestampPrecision,
|
||||
tsdb_save_interval_seconds: u32,
|
||||
}
|
||||
@ -58,6 +60,7 @@ impl StatBuffer {
|
||||
db_save_interval_seconds: u32,
|
||||
influxdb_client: Option<influxdb2::Client>,
|
||||
rpc_secret_key_cache: Option<RpcSecretKeyCache>,
|
||||
user_balance_cache: Option<UserBalanceCache>,
|
||||
shutdown_receiver: broadcast::Receiver<()>,
|
||||
tsdb_save_interval_seconds: u32,
|
||||
) -> anyhow::Result<Option<SpawnedStatBuffer>> {
|
||||
@ -77,7 +80,8 @@ impl StatBuffer {
|
||||
global_timeseries_buffer: Default::default(),
|
||||
influxdb_client,
|
||||
opt_in_timeseries_buffer: Default::default(),
|
||||
rpc_secret_key_cache,
|
||||
rpc_secret_key_cache: rpc_secret_key_cache.unwrap(),
|
||||
user_balance_cache: user_balance_cache.unwrap(),
|
||||
timestamp_precision,
|
||||
tsdb_save_interval_seconds,
|
||||
};
|
||||
@ -183,7 +187,8 @@ impl StatBuffer {
|
||||
self.chain_id,
|
||||
db_conn,
|
||||
key,
|
||||
self.rpc_secret_key_cache.as_ref(),
|
||||
&self.rpc_secret_key_cache,
|
||||
&self.user_balance_cache,
|
||||
)
|
||||
.await
|
||||
{
|
||||
|
Loading…
Reference in New Issue
Block a user