move balance onto AuthorizationChecks (#131)

* move balance onto AuthorizationChecks

* todone

* bigger default

* flat cost for testing
This commit is contained in:
Bryan Stitt 2023-06-16 23:14:43 -07:00 committed by GitHub
parent 5d9365449f
commit 86f9e7358a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 216 additions and 185 deletions

@ -4,7 +4,8 @@ use crate::block_number::{block_needed, BlockNeeded};
use crate::config::{AppConfig, TopConfig}; use crate::config::{AppConfig, TopConfig};
use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult}; use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult};
use crate::frontend::authorization::{ use crate::frontend::authorization::{
Authorization, RequestMetadata, RequestOrMethod, ResponseOrBytes, RpcSecretKey, Authorization, AuthorizationChecks, Balance, RequestMetadata, RequestOrMethod, ResponseOrBytes,
RpcSecretKey,
}; };
use crate::frontend::rpc_proxy_ws::ProxyMode; use crate::frontend::rpc_proxy_ws::ProxyMode;
use crate::jsonrpc::{ use crate::jsonrpc::{
@ -24,12 +25,10 @@ use crate::rpcs::transactions::TxStatus;
use crate::stats::{AppStat, StatBuffer}; use crate::stats::{AppStat, StatBuffer};
use crate::user_token::UserBearerToken; use crate::user_token::UserBearerToken;
use anyhow::Context; use anyhow::Context;
use axum::headers::{Origin, Referer, UserAgent};
use axum::http::StatusCode; use axum::http::StatusCode;
use chrono::Utc; use chrono::Utc;
use deferred_rate_limiter::DeferredRateLimiter; use deferred_rate_limiter::DeferredRateLimiter;
use derive_more::From; use derive_more::From;
use entities::sea_orm_active_enums::TrackingLevel;
use entities::user; use entities::user;
use ethers::core::utils::keccak256; use ethers::core::utils::keccak256;
use ethers::prelude::{Address, Bytes, Transaction, TxHash, H256, U64}; use ethers::prelude::{Address, Bytes, Transaction, TxHash, H256, U64};
@ -38,11 +37,10 @@ use ethers::utils::rlp::{Decodable, Rlp};
use futures::future::join_all; use futures::future::join_all;
use futures::stream::{FuturesUnordered, StreamExt}; use futures::stream::{FuturesUnordered, StreamExt};
use hashbrown::{HashMap, HashSet}; use hashbrown::{HashMap, HashSet};
use ipnet::IpNet;
use log::{error, info, trace, warn, Level}; use log::{error, info, trace, warn, Level};
use migration::sea_orm::prelude::Decimal;
use migration::sea_orm::{DatabaseTransaction, EntityTrait, PaginatorTrait, TransactionTrait}; use migration::sea_orm::{DatabaseTransaction, EntityTrait, PaginatorTrait, TransactionTrait};
use moka::future::{Cache, CacheBuilder}; use moka::future::{Cache, CacheBuilder};
use parking_lot::RwLock;
use redis_rate_limiter::redis::AsyncCommands; use redis_rate_limiter::redis::AsyncCommands;
use redis_rate_limiter::{redis, DeadpoolRuntime, RedisConfig, RedisPool, RedisRateLimiter}; use redis_rate_limiter::{redis, DeadpoolRuntime, RedisConfig, RedisPool, RedisRateLimiter};
use serde::Serialize; use serde::Serialize;
@ -54,7 +52,7 @@ use std::num::NonZeroU64;
use std::str::FromStr; use std::str::FromStr;
use std::sync::{atomic, Arc}; use std::sync::{atomic, Arc};
use std::time::Duration; use std::time::Duration;
use tokio::sync::{broadcast, watch, RwLock, Semaphore}; use tokio::sync::{broadcast, watch, Semaphore};
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tokio::time::timeout; use tokio::time::timeout;
@ -67,50 +65,16 @@ pub static APP_USER_AGENT: &str = concat!(
env!("CARGO_PKG_VERSION") env!("CARGO_PKG_VERSION")
); );
// aggregate across 1 week /// aggregate across 1 week
pub const BILLING_PERIOD_SECONDS: i64 = 60 * 60 * 24 * 7; pub const BILLING_PERIOD_SECONDS: i64 = 60 * 60 * 24 * 7;
/// Convenience type
pub type Web3ProxyJoinHandle<T> = JoinHandle<Web3ProxyResult<T>>; pub type Web3ProxyJoinHandle<T> = JoinHandle<Web3ProxyResult<T>>;
/// TODO: move this
#[derive(Clone, Debug, Default, From)]
pub struct AuthorizationChecks {
/// database id of the primary user. 0 if anon
/// TODO: do we need this? its on the authorization so probably not
/// TODO: `Option<NonZeroU64>`? they are actual zeroes some places in the db now
pub user_id: u64,
/// the key used (if any)
pub rpc_secret_key: Option<RpcSecretKey>,
/// database id of the rpc key
/// if this is None, then this request is being rate limited by ip
pub rpc_secret_key_id: Option<NonZeroU64>,
/// if None, allow unlimited queries. inherited from the user_tier
pub max_requests_per_period: Option<u64>,
// if None, allow unlimited concurrent requests. inherited from the user_tier
pub max_concurrent_requests: Option<u32>,
/// if None, allow any Origin
pub allowed_origins: Option<Vec<Origin>>,
/// if None, allow any Referer
pub allowed_referers: Option<Vec<Referer>>,
/// if None, allow any UserAgent
pub allowed_user_agents: Option<Vec<UserAgent>>,
/// if None, allow any IP Address
pub allowed_ips: Option<Vec<IpNet>>,
/// how detailed any rpc account entries should be
pub tracking_level: TrackingLevel,
/// Chance to save reverting eth_call, eth_estimateGas, and eth_sendRawTransaction to the database.
/// depending on the caller, errors might be expected. this keeps us from bloating our database
/// u16::MAX == 100%
pub log_revert_chance: u16,
/// if true, transactions are broadcast only to private mempools.
/// IMPORTANT! Once confirmed by a miner, they will be public on the blockchain!
pub private_txs: bool,
pub proxy_mode: ProxyMode,
}
/// Cache data from the database about rpc keys /// Cache data from the database about rpc keys
pub type RpcSecretKeyCache = Cache<RpcSecretKey, AuthorizationChecks>; pub type RpcSecretKeyCache = Cache<RpcSecretKey, AuthorizationChecks>;
pub type UserBalanceCache = Cache<NonZeroU64, Arc<RwLock<Decimal>>>; /// Cache data from the database about user balances
pub type UserBalanceCache = Cache<NonZeroU64, Arc<RwLock<Balance>>>;
/// The application /// The application
// TODO: i'm sure this is more arcs than necessary, but spawning futures makes references hard // TODO: i'm sure this is more arcs than necessary, but spawning futures makes references hard
@ -193,7 +157,6 @@ pub async fn flatten_handles<T>(
Ok(Ok(_)) => continue, Ok(Ok(_)) => continue,
} }
} }
Ok(()) Ok(())
} }
@ -208,7 +171,7 @@ pub struct Web3ProxyAppSpawn {
pub background_handles: FuturesUnordered<Web3ProxyJoinHandle<()>>, pub background_handles: FuturesUnordered<Web3ProxyJoinHandle<()>>,
/// config changes are sent here /// config changes are sent here
pub new_top_config_sender: watch::Sender<TopConfig>, pub new_top_config_sender: watch::Sender<TopConfig>,
/// watch this to know when to start the app /// watch this to know when the app is ready to serve requests
pub consensus_connections_watcher: watch::Receiver<Option<Arc<ConsensusWeb3Rpcs>>>, pub consensus_connections_watcher: watch::Receiver<Option<Arc<ConsensusWeb3Rpcs>>>,
} }

@ -201,7 +201,6 @@ impl MigrateStatsToV2 {
start_instant: Instant::now(), start_instant: Instant::now(),
stat_sender: Some(stat_sender.clone()), stat_sender: Some(stat_sender.clone()),
request_ulid, request_ulid,
latest_balance,
}; };
if let Some(x) = request_metadata.try_send_stat()? { if let Some(x) = request_metadata.try_send_stat()? {

@ -1,7 +1,7 @@
//! Utilities for authorization of logged in and anonymous users. //! Utilities for authorization of logged in and anonymous users.
use super::rpc_proxy_ws::ProxyMode; use super::rpc_proxy_ws::ProxyMode;
use crate::app::{AuthorizationChecks, Web3ProxyApp, APP_USER_AGENT}; use crate::app::{Web3ProxyApp, APP_USER_AGENT};
use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult}; use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult};
use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest}; use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest};
use crate::rpcs::one::Web3Rpc; use crate::rpcs::one::Web3Rpc;
@ -25,6 +25,7 @@ use ipnet::IpNet;
use log::{error, trace, warn}; use log::{error, trace, warn};
use migration::sea_orm::prelude::Decimal; use migration::sea_orm::prelude::Decimal;
use migration::sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter}; use migration::sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter};
use parking_lot::RwLock;
use rdkafka::message::{Header as KafkaHeader, OwnedHeaders as KafkaOwnedHeaders, OwnedMessage}; use rdkafka::message::{Header as KafkaHeader, OwnedHeaders as KafkaOwnedHeaders, OwnedMessage};
use rdkafka::producer::{FutureProducer, FutureRecord}; use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::util::Timeout as KafkaTimeout; use rdkafka::util::Timeout as KafkaTimeout;
@ -37,7 +38,7 @@ use std::num::NonZeroU64;
use std::sync::atomic::{self, AtomicBool, AtomicI64, AtomicU64, AtomicUsize}; use std::sync::atomic::{self, AtomicBool, AtomicI64, AtomicU64, AtomicUsize};
use std::time::Duration; use std::time::Duration;
use std::{net::IpAddr, str::FromStr, sync::Arc}; use std::{net::IpAddr, str::FromStr, sync::Arc};
use tokio::sync::{OwnedSemaphorePermit, RwLock, Semaphore}; use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tokio::time::Instant; use tokio::time::Instant;
use ulid::Ulid; use ulid::Ulid;
@ -69,6 +70,56 @@ pub enum AuthorizationType {
Frontend, Frontend,
} }
#[derive(Clone, Debug, Default)]
pub struct Balance {
pub total_deposit: Decimal,
pub total_spend: Decimal,
}
impl Balance {
pub fn remaining(&self) -> Decimal {
self.total_deposit - self.total_spend
}
}
/// TODO: move this
#[derive(Clone, Debug, Default, From)]
pub struct AuthorizationChecks {
/// database id of the primary user. 0 if anon
/// TODO: do we need this? its on the authorization so probably not
/// TODO: `Option<NonZeroU64>`? they are actual zeroes some places in the db now
pub user_id: u64,
/// locally cached balance that may drift slightly if the user is on multiple servers
pub latest_balance: Arc<RwLock<Balance>>,
/// the key used (if any)
pub rpc_secret_key: Option<RpcSecretKey>,
/// database id of the rpc key
/// if this is None, then this request is being rate limited by ip
pub rpc_secret_key_id: Option<NonZeroU64>,
/// if None, allow unlimited queries. inherited from the user_tier
pub max_requests_per_period: Option<u64>,
// if None, allow unlimited concurrent requests. inherited from the user_tier
pub max_concurrent_requests: Option<u32>,
/// if None, allow any Origin
pub allowed_origins: Option<Vec<Origin>>,
/// if None, allow any Referer
pub allowed_referers: Option<Vec<Referer>>,
/// if None, allow any UserAgent
pub allowed_user_agents: Option<Vec<UserAgent>>,
/// if None, allow any IP Address
pub allowed_ips: Option<Vec<IpNet>>,
/// how detailed any rpc account entries should be
pub tracking_level: TrackingLevel,
/// Chance to save reverting eth_call, eth_estimateGas, and eth_sendRawTransaction to the database.
/// depending on the caller, errors might be expected. this keeps us from bloating our database
/// u16::MAX == 100%
pub log_revert_chance: u16,
/// if true, transactions are broadcast only to private mempools.
/// IMPORTANT! Once confirmed by a miner, they will be public on the blockchain!
pub private_txs: bool,
pub proxy_mode: ProxyMode,
}
/// TODO: include the authorization checks in this? /// TODO: include the authorization checks in this?
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct Authorization { pub struct Authorization {
@ -90,6 +141,7 @@ pub struct KafkaDebugLogger {
num_responses: AtomicUsize, num_responses: AtomicUsize,
} }
/// Ulids and Uuids matching the same bits hash the same
impl Hash for RpcSecretKey { impl Hash for RpcSecretKey {
fn hash<H: Hasher>(&self, state: &mut H) { fn hash<H: Hasher>(&self, state: &mut H) {
let x = match self { let x = match self {
@ -281,9 +333,6 @@ pub struct RequestMetadata {
/// Cancel-safe channel for sending stats to the buffer /// Cancel-safe channel for sending stats to the buffer
pub stat_sender: Option<flume::Sender<AppStat>>, pub stat_sender: Option<flume::Sender<AppStat>>,
/// Latest balance
pub latest_balance: Arc<RwLock<Decimal>>,
} }
impl Default for Authorization { impl Default for Authorization {
@ -310,7 +359,6 @@ impl Default for RequestMetadata {
response_timestamp: Default::default(), response_timestamp: Default::default(),
start_instant: Instant::now(), start_instant: Instant::now(),
stat_sender: Default::default(), stat_sender: Default::default(),
latest_balance: Default::default(),
} }
} }
} }
@ -322,6 +370,17 @@ impl RequestMetadata {
.map(|x| x.checks.proxy_mode) .map(|x| x.checks.proxy_mode)
.unwrap_or_default() .unwrap_or_default()
} }
/// this may drift slightly if multiple servers are handling the same users, but should be close
pub async fn latest_balance(&self) -> Option<Decimal> {
if let Some(x) = self.authorization.as_ref() {
let x = x.checks.latest_balance.read().remaining();
Some(x)
} else {
None
}
}
} }
#[derive(From)] #[derive(From)]
@ -436,36 +495,22 @@ impl RequestMetadata {
} }
} }
// Get latest balance from cache
let latest_balance = match app
.balance_checks(authorization.checks.user_id)
.await
.context("Could not retrieve balance from database or cache!")
{
Ok(x) => x,
Err(err) => {
error!("{}", err);
Arc::new(RwLock::new(Decimal::default()))
}
};
let x = Self { let x = Self {
archive_request: false.into(), archive_request: false.into(),
authorization: Some(authorization),
backend_requests: Default::default(), backend_requests: Default::default(),
error_response: false.into(), error_response: false.into(),
kafka_debug_logger, kafka_debug_logger,
no_servers: 0.into(),
authorization: Some(authorization),
request_bytes,
method, method,
no_servers: 0.into(),
request_bytes,
request_ulid,
response_bytes: 0.into(), response_bytes: 0.into(),
response_from_backup_rpc: false.into(), response_from_backup_rpc: false.into(),
response_millis: 0.into(), response_millis: 0.into(),
request_ulid,
response_timestamp: 0.into(), response_timestamp: 0.into(),
start_instant: Instant::now(), start_instant: Instant::now(),
stat_sender: app.stat_sender.clone(), stat_sender: app.stat_sender.clone(),
latest_balance,
}; };
Arc::new(x) Arc::new(x)
@ -1134,24 +1179,28 @@ impl Web3ProxyApp {
pub(crate) async fn balance_checks( pub(crate) async fn balance_checks(
&self, &self,
user_id: u64, user_id: u64,
) -> Web3ProxyResult<Arc<RwLock<Decimal>>> { ) -> Web3ProxyResult<Arc<RwLock<Balance>>> {
match NonZeroU64::try_from(user_id) { match NonZeroU64::try_from(user_id) {
Err(_) => Ok(Arc::new(RwLock::new(Decimal::default()))), Err(_) => Ok(Arc::new(Default::default())),
Ok(x) => self Ok(x) => self
.user_balance_cache .user_balance_cache
.try_get_with_by_ref(&x, async move { .try_get_with(x, async move {
let db_replica = self let db_replica = self
.db_replica() .db_replica()
.web3_context("Getting database connection")?; .web3_context("Getting database connection")?;
let balance: Decimal = match balance::Entity::find() let balance = match balance::Entity::find()
.filter(balance::Column::UserId.eq(user_id)) .filter(balance::Column::UserId.eq(user_id))
.one(db_replica.as_ref()) .one(db_replica.as_ref())
.await? .await?
{ {
Some(x) => x.total_deposits - x.total_spent_outside_free_tier, Some(x) => Balance {
None => Decimal::default(), total_deposit: x.total_deposits,
total_spend: x.total_spent_outside_free_tier,
},
None => Default::default(),
}; };
Ok(Arc::new(RwLock::new(balance))) Ok(Arc::new(RwLock::new(balance)))
}) })
.await .await
@ -1248,54 +1297,62 @@ impl Web3ProxyApp {
let user_model = user::Entity::find_by_id(rpc_key_model.user_id) let user_model = user::Entity::find_by_id(rpc_key_model.user_id)
.one(db_replica.as_ref()) .one(db_replica.as_ref())
.await? .await?
.context("user model was not found, but every rpc_key should have a user")?; .context(
"user model was not found, but every rpc_key should have a user",
)?;
let balance = match balance::Entity::find() let mut user_tier_model = user_tier::Entity::find_by_id(
.filter(balance::Column::UserId.eq(user_model.id)) user_model.user_tier_id,
.one(db_replica.as_ref()) )
.await? {
Some(x) => x.total_deposits - x.total_spent_outside_free_tier,
None => Decimal::default()
};
let mut user_tier_model =
user_tier::Entity::find_by_id(user_model.user_tier_id)
.one(db_replica.as_ref()) .one(db_replica.as_ref())
.await? .await?
.context("related user tier not found, but every user should have a tier")?; .context(
"related user tier not found, but every user should have a tier",
)?;
let latest_balance = self.balance_checks(rpc_key_model.user_id).await?;
// TODO: Do the logic here, as to how to treat the user, based on balance and initial check // TODO: Do the logic here, as to how to treat the user, based on balance and initial check
// Clear the cache (not the login!) in the stats if a tier-change happens (clear, but don't modify roles) // Clear the cache (not the login!) in the stats if a tier-change happens (clear, but don't modify roles)
if user_tier_model.title == "Premium" && balance < Decimal::new(1, 1) {
// Find the equivalent downgraded user tier, and modify limits from there
if let Some(downgrade_user_tier) = user_tier_model.downgrade_tier_id { if let Some(downgrade_user_tier) = user_tier_model.downgrade_tier_id {
let balance = latest_balance.read().clone();
// only consider the user premium if they have paid at least $10 and have a balance > $.01
// otherwise, set user_tier_model to the downograded tier
if balance.total_deposit < Decimal::from(10)
|| balance.remaining() < Decimal::new(1, 2)
{
user_tier_model = user_tier_model =
user_tier::Entity::find_by_id(downgrade_user_tier) user_tier::Entity::find_by_id(downgrade_user_tier)
.one(db_replica.as_ref()) .one(db_replica.as_ref())
.await? .await?
.context("finding the downgrade user tier for premium did not work for premium")?; .context(format!(
} else { "downgrade user tier ({}) is missing!",
return Err(Web3ProxyError::InvalidUserTier); downgrade_user_tier
))?;
} }
} }
let rpc_key_id = Some(rpc_key_model.id.try_into().context("db ids are never 0")?); let rpc_key_id =
Some(rpc_key_model.id.try_into().context("db ids are never 0")?);
Ok(AuthorizationChecks { Ok(AuthorizationChecks {
user_id: rpc_key_model.user_id,
rpc_secret_key: Some(rpc_secret_key),
rpc_secret_key_id: rpc_key_id,
allowed_ips, allowed_ips,
allowed_origins, allowed_origins,
allowed_referers, allowed_referers,
allowed_user_agents, allowed_user_agents,
tracking_level: rpc_key_model.log_level, latest_balance,
// TODO: is floating point math going to scale this correctly // TODO: is floating point math going to scale this correctly?
log_revert_chance: (rpc_key_model.log_revert_chance * u16::MAX as f64) as u16, log_revert_chance: (rpc_key_model.log_revert_chance * u16::MAX as f64)
as u16,
max_concurrent_requests: user_tier_model.max_concurrent_requests, max_concurrent_requests: user_tier_model.max_concurrent_requests,
max_requests_per_period: user_tier_model.max_requests_per_period, max_requests_per_period: user_tier_model.max_requests_per_period,
private_txs: rpc_key_model.private_txs, private_txs: rpc_key_model.private_txs,
proxy_mode, proxy_mode,
rpc_secret_key: Some(rpc_secret_key),
rpc_secret_key_id: rpc_key_id,
tracking_level: rpc_key_model.log_level,
user_id: rpc_key_model.user_id,
}) })
} }
None => Ok(AuthorizationChecks::default()), None => Ok(AuthorizationChecks::default()),
@ -1316,7 +1373,6 @@ impl Web3ProxyApp {
user_agent: Option<UserAgent>, user_agent: Option<UserAgent>,
) -> Web3ProxyResult<RateLimitResult> { ) -> Web3ProxyResult<RateLimitResult> {
let authorization_checks = self.authorization_checks(proxy_mode, rpc_key).await?; let authorization_checks = self.authorization_checks(proxy_mode, rpc_key).await?;
let balance = self.balance_checks(authorization_checks.user_id).await?;
// if no rpc_key_id matching the given rpc was found, then we can't rate limit by key // if no rpc_key_id matching the given rpc was found, then we can't rate limit by key
if authorization_checks.rpc_secret_key_id.is_none() { if authorization_checks.rpc_secret_key_id.is_none() {

@ -348,11 +348,12 @@ pub async fn user_balance_post(
} }
let x = json!({ let x = json!({
"tx_hash": tx_hash, "amount": payment_token_amount,
"block_hash": block_hash, "block_hash": block_hash,
"log_index": log_index, "log_index": log_index,
"recipient_account": recipient_account,
"token": payment_token_address, "token": payment_token_address,
"amount": payment_token_amount, "tx_hash": tx_hash,
}); });
debug!("deposit data: {:#?}", x); debug!("deposit data: {:#?}", x);

@ -105,17 +105,20 @@ pub async fn user_used_referral_stats(
let mut out: Vec<Info> = Vec::new(); let mut out: Vec<Info> = Vec::new();
for x in referrals.into_iter() { for x in referrals.into_iter() {
let (referral_record, referrer_record) = (x.0, x.1.context("each referral entity should have a referral code associated with it, but this is not the case!")?); let (referral_record, referrer_record) = (x.0, x.1.context("each referral entity should have a referral code associated with it, but this is not the case!")?);
// The foreign key is never optional
let referring_user = user::Entity::find_by_id(referrer_record.user_id) // // The foreign key is never optional
.one(db_replica.as_ref()) // let referring_user = user::Entity::find_by_id(referrer_record.user_id)
.await? // .one(db_replica.as_ref())
.context("Database error, no foreign key found for referring user")?; // .await?
// .context("Database error, no foreign key found for referring user")?;
let tmp = Info { let tmp = Info {
credits_applied_for_referee: referral_record.credits_applied_for_referee, credits_applied_for_referee: referral_record.credits_applied_for_referee,
credits_applied_for_referrer: referral_record.credits_applied_for_referrer, credits_applied_for_referrer: referral_record.credits_applied_for_referrer,
referral_start_date: referral_record.referral_start_date, referral_start_date: referral_record.referral_start_date,
used_referral_code: referrer_record.referral_code, used_referral_code: referrer_record.referral_code,
}; };
// Start inserting json's into this // Start inserting json's into this
out.push(tmp); out.push(tmp);
} }

@ -1331,7 +1331,6 @@ mod tests {
use log::{trace, LevelFilter}; use log::{trace, LevelFilter};
use moka::future::CacheBuilder; use moka::future::CacheBuilder;
use parking_lot::RwLock; use parking_lot::RwLock;
use tokio::sync::RwLock as AsyncRwLock;
#[cfg(test)] #[cfg(test)]
fn new_peak_latency() -> PeakEwmaLatency { fn new_peak_latency() -> PeakEwmaLatency {

@ -27,9 +27,9 @@ use migration::{Expr, LockType, OnConflict};
use num_traits::ToPrimitive; use num_traits::ToPrimitive;
use parking_lot::Mutex; use parking_lot::Mutex;
use std::num::NonZeroU64; use std::num::NonZeroU64;
use std::str::FromStr;
use std::sync::atomic::{self, Ordering}; use std::sync::atomic::{self, Ordering};
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::RwLock;
pub use stat_buffer::{SpawnedStatBuffer, StatBuffer}; pub use stat_buffer::{SpawnedStatBuffer, StatBuffer};
@ -57,26 +57,26 @@ pub struct RpcQueryStats {
pub response_timestamp: i64, pub response_timestamp: i64,
/// Credits used signifies how how much money was used up /// Credits used signifies how how much money was used up
pub credits_used: Decimal, pub credits_used: Decimal,
/// Last credits used
pub latest_balance: Arc<RwLock<Decimal>>,
} }
#[derive(Clone, Debug, From, Hash, PartialEq, Eq)] #[derive(Clone, Debug, From, Hash, PartialEq, Eq)]
pub struct RpcQueryKey { pub struct RpcQueryKey {
/// unix epoch time /// unix epoch time.
/// for the time series db, this is (close to) the time that the response was sent /// for the time series db, this is (close to) the time that the response was sent.
/// for the account database, this is rounded to the week /// for the account database, this is rounded to the week.
response_timestamp: i64, response_timestamp: i64,
/// true if an archive server was needed to serve the request /// true if an archive server was needed to serve the request.
archive_needed: bool, archive_needed: bool,
/// true if the response was some sort of JSONRPC error /// true if the response was some sort of JSONRPC error.
error_response: bool, error_response: bool,
/// method tracking is opt-in /// method tracking is opt-in.
method: Option<String>, method: Option<String>,
/// origin tracking is opt-in /// origin tracking is opt-in.
origin: Option<Origin>, origin: Option<Origin>,
/// None if the public url was used /// None if the public url was used.
rpc_secret_key_id: Option<NonZeroU64>, rpc_secret_key_id: Option<NonZeroU64>,
/// None if the public url was used.
rpc_key_user_id: Option<NonZeroU64>,
} }
/// round the unix epoch time to the start of a period /// round the unix epoch time to the start of a period
@ -126,6 +126,7 @@ impl RpcQueryStats {
error_response: self.error_response, error_response: self.error_response,
method, method,
rpc_secret_key_id, rpc_secret_key_id,
rpc_key_user_id: self.authorization.checks.user_id.try_into().ok(),
origin, origin,
} }
} }
@ -146,6 +147,7 @@ impl RpcQueryStats {
error_response: self.error_response, error_response: self.error_response,
method, method,
rpc_secret_key_id, rpc_secret_key_id,
rpc_key_user_id: self.authorization.checks.user_id.try_into().ok(),
origin, origin,
} }
} }
@ -177,6 +179,7 @@ impl RpcQueryStats {
error_response: self.error_response, error_response: self.error_response,
method, method,
rpc_secret_key_id: self.authorization.checks.rpc_secret_key_id, rpc_secret_key_id: self.authorization.checks.rpc_secret_key_id,
rpc_key_user_id: self.authorization.checks.user_id.try_into().ok(),
origin, origin,
}; };
@ -225,11 +228,6 @@ impl BufferedRpcQueryStats {
self.sum_response_bytes += stat.response_bytes; self.sum_response_bytes += stat.response_bytes;
self.sum_response_millis += stat.response_millis; self.sum_response_millis += stat.response_millis;
self.sum_credits_used += stat.credits_used; self.sum_credits_used += stat.credits_used;
// Also record the latest balance for this user ..
// Also subtract the used balance from the cache so we
// TODO: We are already using the cache. We could also inject the cache into save_tsdb
self.latest_balance = stat.latest_balance;
} }
async fn _save_db_stats( async fn _save_db_stats(
@ -242,8 +240,6 @@ impl BufferedRpcQueryStats {
) -> Web3ProxyResult<()> { ) -> Web3ProxyResult<()> {
let period_datetime = Utc.timestamp_opt(key.response_timestamp, 0).unwrap(); let period_datetime = Utc.timestamp_opt(key.response_timestamp, 0).unwrap();
// TODO: Could add last balance here (can take the element from the cache, and RpcQueryKey::AuthorizationCheck)
// =============================== // // =============================== //
// UPDATE STATISTICS // // UPDATE STATISTICS //
// =============================== // // =============================== //
@ -587,14 +583,14 @@ impl BufferedRpcQueryStats {
Ok(()) Ok(())
} }
/// Update & Invalidate cache if user is below 10$ credits (premium downgrade condition) /// Update & Invalidate cache if user is credits are low (premium downgrade condition)
/// Reduce credits if there was no issue /// Reduce credits if there was no issue
/// This is not atomic, so this may be an issue because it's not sequentially consistent across threads /// This is not atomic, so this may be an issue because it's not sequentially consistent across threads
/// It is a good-enough approximation though, and if the TTL for the balance cache is high enough, this should be ok /// It is a good-enough approximation though, and if the TTL for the balance cache is low enough, this should be ok
async fn _update_balance_in_cache( async fn _update_balance_in_cache(
&self, &self,
deltas: &Deltas, deltas: &Deltas,
txn: &DatabaseTransaction, db_conn: &DatabaseConnection,
sender_rpc_entity: &rpc_key::Model, sender_rpc_entity: &rpc_key::Model,
referral_objects: &Option<(referee::Model, referrer::Model)>, referral_objects: &Option<(referee::Model, referrer::Model)>,
rpc_secret_key_cache: &RpcSecretKeyCache, rpc_secret_key_cache: &RpcSecretKeyCache,
@ -603,47 +599,54 @@ impl BufferedRpcQueryStats {
// ================== // ==================
// Modify sender balance // Modify sender balance
// ================== // ==================
let sender_latest_balance = match NonZeroU64::try_from(sender_rpc_entity.user_id) { let user_id = NonZeroU64::try_from(sender_rpc_entity.user_id)
Err(_) => Err(Web3ProxyError::BadResponse( .expect("database ids are always nonzero");
"Balance is not positive, although it was previously checked to be as such!".into(),
)),
// We don't do an get_or_insert, because technically we don't have the most up to date balance // We don't do an get_or_insert, because technically we don't have the most up to date balance
// Also let's keep things simple in terms of writing and getting. A single place writes it, multiple places can remove / poll it // Also let's keep things simple in terms of writing and getting. A single place writes it, multiple places can remove / poll it
Ok(x) => Ok(user_balance_cache.get(&x)), let latest_balance = match user_balance_cache.get(&user_id) {
}?;
let sender_latest_balance = match sender_latest_balance {
Some(x) => x, Some(x) => x,
// If not in cache, nothing to update theoretically // If not in cache, nothing to update
None => return Ok(()), None => return Ok(()),
}; };
let mut latest_balance = sender_latest_balance.write().await;
let balance_before = *latest_balance;
// Now modify the balance
// TODO: Double check this (perhaps while testing...)
*latest_balance = *latest_balance - deltas.balance_spent_including_free_credits
+ deltas.usage_bonus_to_request_sender_through_referral;
if *latest_balance < Decimal::from(0) {
*latest_balance = Decimal::from(0);
}
// Also check if the referrer is premium (thought above 10$ will always be treated as premium at least) let (balance_before, latest_balance) = {
// Should only refresh cache if the premium threshold is crossed let mut latest_balance = latest_balance.write();
if balance_before > Decimal::from(0) && *latest_balance == Decimal::from(0) {
let balance_before = latest_balance.clone();
// Now modify the balance
latest_balance.total_deposit += deltas.usage_bonus_to_request_sender_through_referral;
latest_balance.total_spend += deltas.balance_spent_including_free_credits;
(balance_before, latest_balance.clone())
};
// we only start subtracting once the user is first upgraded to a premium user
// consider the user premium if total_deposit > premium threshold
// If the balance is getting low, clear the cache
// TODO: configurable amount for "premium"
// TODO: configurable amount for "low"
// we check balance_before because this current request would have been handled with limits matching the balance at the start of the request
if balance_before.total_deposit > Decimal::from(10)
&& latest_balance.remaining() <= Decimal::from(1)
{
let rpc_keys = rpc_key::Entity::find() let rpc_keys = rpc_key::Entity::find()
.filter(rpc_key::Column::UserId.eq(sender_rpc_entity.user_id)) .filter(rpc_key::Column::UserId.eq(sender_rpc_entity.user_id))
.all(txn) .all(db_conn)
.await?; .await?;
// clear the user from the cache
if let Ok(user_id) = NonZeroU64::try_from(sender_rpc_entity.user_id) {
user_balance_cache.invalidate(&user_id).await;
}
// clear all keys owned by this user from the cache
for rpc_key_entity in rpc_keys { for rpc_key_entity in rpc_keys {
// TODO: Not sure which one was inserted, just delete both ...
rpc_secret_key_cache rpc_secret_key_cache
.invalidate(&rpc_key_entity.secret_key.into()) .invalidate(&rpc_key_entity.secret_key.into())
.await; .await;
} }
if let Ok(non_zero_user_id) = NonZeroU64::try_from(sender_rpc_entity.user_id) {
user_balance_cache.invalidate(&non_zero_user_id).await;
}
} }
// ================== // ==================
@ -651,7 +654,8 @@ impl BufferedRpcQueryStats {
// ================== // ==================
// We ignore this for performance reasons right now // We ignore this for performance reasons right now
// We would have to load all the RPC keys of the referrer to de-activate them // We would have to load all the RPC keys of the referrer to de-activate them
// Instead, it's fine if they wait for 60 seconds until their tier reloads // Instead, it's fine if they wait for 60 seconds until their cache expires
// If they are getting low, they will refresh themselves if necessary and then they will see
// // If the referrer object is empty, we don't care about the cache, becase this will be fetched in a next request from the database // // If the referrer object is empty, we don't care about the cache, becase this will be fetched in a next request from the database
// if let Some((referral_entity, _)) = referral_objects { // if let Some((referral_entity, _)) = referral_objects {
// if let Ok(referrer_user_id) = NonZeroU64::try_from(referral_entity.user_id) { // if let Ok(referrer_user_id) = NonZeroU64::try_from(referral_entity.user_id) {
@ -710,7 +714,7 @@ impl BufferedRpcQueryStats {
let (sender_rpc_entity, _sender_balance, referral_objects) = let (sender_rpc_entity, _sender_balance, referral_objects) =
self._get_relevant_entities(rpc_secret_key_id, &txn).await?; self._get_relevant_entities(rpc_secret_key_id, &txn).await?;
// Compute Changes in balance for user and referrer, incl. referral logic // // Compute Changes in balance for user and referrer, incl. referral logic
let (deltas, referral_objects): (Deltas, Option<(referee::Model, referrer::Model)>) = self let (deltas, referral_objects): (Deltas, Option<(referee::Model, referrer::Model)>) = self
._compute_balance_deltas(_sender_balance, referral_objects) ._compute_balance_deltas(_sender_balance, referral_objects)
.await?; .await?;
@ -719,10 +723,16 @@ impl BufferedRpcQueryStats {
self._update_balances_in_db(&deltas, &txn, &sender_rpc_entity, &referral_objects) self._update_balances_in_db(&deltas, &txn, &sender_rpc_entity, &referral_objects)
.await?; .await?;
// Update balanaces in the cache // Finally commit the transaction in the database
txn.commit()
.await
.context("Failed to update referral and balance updates")?;
// Update balanaces in the cache.
// do this after commiting the database so that invalidated caches definitely query commited data
self._update_balance_in_cache( self._update_balance_in_cache(
&deltas, &deltas,
&txn, &db_conn,
&sender_rpc_entity, &sender_rpc_entity,
&referral_objects, &referral_objects,
rpc_secret_key_cache, rpc_secret_key_cache,
@ -730,11 +740,6 @@ impl BufferedRpcQueryStats {
) )
.await?; .await?;
// Finally commit the transaction in the database
txn.commit()
.await
.context("Failed to update referral and balance updates")?;
Ok(()) Ok(())
} }
@ -757,10 +762,7 @@ impl BufferedRpcQueryStats {
} }
// Read the latest balance ... // Read the latest balance ...
let balance; let remaining = self.latest_balance.read().remaining();
{
balance = *(self.latest_balance.read().await);
}
builder = builder builder = builder
.tag("archive_needed", key.archive_needed.to_string()) .tag("archive_needed", key.archive_needed.to_string())
@ -784,7 +786,7 @@ impl BufferedRpcQueryStats {
) )
.field( .field(
"balance", "balance",
balance.to_f64().context("number is really (too) large")?, remaining.to_f64().context("number is really (too) large")?,
); );
// .round() as i64 // .round() as i64
@ -864,8 +866,6 @@ impl TryFrom<RequestMetadata> for RpcQueryStats {
response_millis, response_millis,
response_timestamp, response_timestamp,
credits_used, credits_used,
// To we need to clone it here ... (?)
latest_balance: metadata.latest_balance.clone(),
}; };
Ok(x) Ok(x)
@ -882,18 +882,26 @@ impl RpcQueryStats {
cache_hit: bool, cache_hit: bool,
method: Option<&str>, method: Option<&str>,
) -> Decimal { ) -> Decimal {
// for now, always return 0 for cost
Decimal::new(0, 1)
/*
// some methods should be free. there might be cases where method isn't set (though they should be uncommon) // some methods should be free. there might be cases where method isn't set (though they should be uncommon)
// TODO: get this list from config (and add more to it) // TODO: get this list from config (and add more to it)
if let Some(method) = method.as_ref() { if let Some(method) = method.as_ref() {
if ["eth_chainId"].contains(method) { if [
"eth_chainId",
"eth_syncing",
"eth_protocolVersion",
"net_version",
"net_listening",
]
.contains(method)
{
return 0.into(); return 0.into();
} }
} }
// for now, always return a flat cost
return Decimal::from_str("0.000018").unwrap();
/*
// TODO: get cost_minimum, cost_free_bytes, cost_per_byte, cache_hit_divisor from config. each chain will be different // TODO: get cost_minimum, cost_free_bytes, cost_per_byte, cache_hit_divisor from config. each chain will be different
// pays at least $0.000018 / credits per request // pays at least $0.000018 / credits per request
let cost_minimum = Decimal::new(18, 6); let cost_minimum = Decimal::new(18, 6);

@ -1,6 +1,7 @@
use super::{AppStat, RpcQueryKey}; use super::{AppStat, RpcQueryKey};
use crate::app::{RpcSecretKeyCache, UserBalanceCache, Web3ProxyJoinHandle}; use crate::app::{RpcSecretKeyCache, UserBalanceCache, Web3ProxyJoinHandle};
use crate::errors::Web3ProxyResult; use crate::errors::Web3ProxyResult;
use crate::frontend::authorization::Balance;
use derive_more::From; use derive_more::From;
use futures::stream; use futures::stream;
use hashbrown::HashMap; use hashbrown::HashMap;
@ -8,9 +9,10 @@ use influxdb2::api::write::TimestampPrecision;
use log::{error, info, trace}; use log::{error, info, trace};
use migration::sea_orm::prelude::Decimal; use migration::sea_orm::prelude::Decimal;
use migration::sea_orm::DatabaseConnection; use migration::sea_orm::DatabaseConnection;
use parking_lot::RwLock;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tokio::sync::{broadcast, RwLock}; use tokio::sync::broadcast;
use tokio::time::interval; use tokio::time::interval;
#[derive(Debug, Default)] #[derive(Debug, Default)]
@ -25,8 +27,8 @@ pub struct BufferedRpcQueryStats {
pub sum_response_bytes: u64, pub sum_response_bytes: u64,
pub sum_response_millis: u64, pub sum_response_millis: u64,
pub sum_credits_used: Decimal, pub sum_credits_used: Decimal,
/// Balance tells us the user's balance at this point in time /// The user's balance at this point in time. Multiple queries might be modifying it at once.
pub latest_balance: Arc<RwLock<Decimal>>, pub latest_balance: Arc<RwLock<Balance>>,
} }
#[derive(From)] #[derive(From)]