More balance tests (#182)

* fix popularity contest

* more info in the Debug for Web3Rpc

* add frontend_requests and cache_misses to the Balance query

* add more to balance and stats flushing and improved test coverage

* it compiles

* deserializer for Ulid to Uuid

I think a wrapper type on Ulid that implements sea_orm::Value is probably better

* rename variable to match struct name

* add deserializer for Address -> Vec<u8>

* sql sum returns a Decimal. need to convert to u64

* assert more

* one log and assert more

* log more

* use a helper to get the user's rpc provider

* this should be 2 now that we have a public and authed call

* this should be zero. the public has the cache miss

* instrument cu calcs

* trace the value we took, not the default that replaced it

* move usd_per_chain into config

* remove some extra logging

* use Arc::into_inner to maybe avoid a race

* off by 1

* pass paid credits used instead of returning it

this lets us use it to write to our user balance cache first. importantly, this keeps us from holding a write lock while writing to mysql

* no cache misses expected in this test

* actually check the admin

* put the balance checks back now that the rest of the test works

* archive request is being set incorrectly

* wow howd we manage flipping the greater than sign on archive depth

* move latest_balance and premium_credits_used to before any stats are emitted

* lint

* and build undoes the linting. fun

i didnt even want to lint them in the first place, so this is fine

* missed incrementing total_spent when not incrementing total_spent_paid_credits

* use the credits on self

* use the credits on self (pt 2)

* fix type for 10 cu query

* convert the requestmetadata on the other side of the channel

* logs

* viewing stats is allowed even without a balance

* move paid_credits_used to AuthorizationChecks

* wip

* test_sum_credits_used finally passes

* UserBalanceCache::get_or_insert

* re-enable rpc_secret_key_cache

* move invalidate to a helper function

and always call it **after** the db is commited

* fix PartialEq and Eq on RpcSecretKey

* cargo upgrade
This commit is contained in:
Bryan Stitt 2023-07-12 00:35:07 -07:00 committed by GitHub
parent 5f215facab
commit df2f3d340f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
45 changed files with 1011 additions and 564 deletions

16
Cargo.lock generated

@ -3460,9 +3460,9 @@ dependencies = [
[[package]]
name = "num"
version = "0.4.0"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43db66d1170d347f9a065114077f7dccb00c1b9478c89384490a3425279a4606"
checksum = "b05180d69e3da0e530ba2a1dae5110317e49e3b7f3d41be227dc5f92e49ee7af"
dependencies = [
"num-bigint",
"num-complex",
@ -4133,9 +4133,9 @@ dependencies = [
[[package]]
name = "portable-atomic"
version = "1.3.3"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "767eb9f07d4a5ebcb39bbf2d452058a93c011373abf6832e24194a1c3f004794"
checksum = "d220334a184db82b31b83f5ff093e3315280fb2b6bbc032022b2304a509aab7a"
[[package]]
name = "ppv-lite86"
@ -4238,9 +4238,9 @@ checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068"
[[package]]
name = "proc-macro2"
version = "1.0.63"
version = "1.0.64"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b368fba921b0dce7e60f5e04ec15e565b3303972b42bcfde1d0713b881959eb"
checksum = "78803b62cbf1f46fde80d7c0e803111524b9877184cfe7c3033659490ac7a7da"
dependencies = [
"unicode-ident",
]
@ -5463,9 +5463,9 @@ dependencies = [
[[package]]
name = "serde_json"
version = "1.0.100"
version = "1.0.102"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f1e14e89be7aa4c4b78bdbdc9eb5bf8517829a600ae8eaa39a6e1d960b5185c"
checksum = "b5062a995d481b2308b6064e9af76011f2921c35f97b0468811ed9f6cd91dfed"
dependencies = [
"itoa",
"ryu",

@ -10,7 +10,10 @@ pub struct Model {
#[sea_orm(primary_key)]
pub id: u64,
#[sea_orm(column_type = "Binary(BlobSize::Blob(Some(16)))", unique)]
#[serde(serialize_with = "serialization::uuid_as_ulid")]
#[serde(
serialize_with = "serialization::uuid_as_ulid",
deserialize_with = "serialization::ulid_to_uuid"
)]
pub bearer_token: Uuid,
pub user_id: u64,
pub expires_at: DateTimeUtc,

@ -10,7 +10,10 @@ pub struct Model {
#[sea_orm(primary_key)]
pub id: u64,
#[sea_orm(column_type = "Binary(BlobSize::Blob(Some(16)))", unique)]
#[serde(serialize_with = "serialization::uuid_as_ulid")]
#[serde(
serialize_with = "serialization::uuid_as_ulid",
deserialize_with = "serialization::ulid_to_uuid"
)]
pub nonce: Uuid,
#[sea_orm(column_type = "Text")]
pub message: String,

@ -14,7 +14,10 @@ pub struct Model {
pub timestamp: DateTimeUtc,
pub method: Method,
#[sea_orm(column_type = "Binary(BlobSize::Blob(Some(20)))")]
#[serde(serialize_with = "serialization::vec_as_address")]
#[serde(
serialize_with = "serialization::vec_as_address",
deserialize_with = "serialization::address_to_vec"
)]
pub to: Vec<u8>,
#[sea_orm(column_type = "Text", nullable)]
pub call_data: Option<String>,

@ -11,7 +11,10 @@ pub struct Model {
pub id: u64,
pub user_id: u64,
#[sea_orm(column_type = "Binary(BlobSize::Blob(Some(16)))", unique)]
#[serde(serialize_with = "serialization::uuid_as_ulid")]
#[serde(
serialize_with = "serialization::uuid_as_ulid",
deserialize_with = "serialization::ulid_to_uuid"
)]
pub secret_key: Uuid,
pub description: Option<String>,
pub private_txs: bool,

@ -1,7 +1,7 @@
//! sea-orm types don't always serialize how we want. this helps that, though it won't help every case.
use ethers::prelude::Address;
use sea_orm::prelude::Uuid;
use serde::{Serialize, Serializer};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::convert::TryInto;
use ulid::Ulid;
@ -19,6 +19,12 @@ where
x.serialize(s)
}
pub fn address_to_vec<'de, D: Deserializer<'de>>(deserializer: D) -> Result<Vec<u8>, D::Error> {
let address = Address::deserialize(deserializer)?;
Ok(address.to_fixed_bytes().into())
}
pub fn uuid_as_ulid<S>(x: &Uuid, s: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
@ -28,3 +34,9 @@ where
// TODO: to_string shouldn't be needed, but i'm still seeing Uuid length
x.to_string().serialize(s)
}
pub fn ulid_to_uuid<'de, D: Deserializer<'de>>(deserializer: D) -> Result<Uuid, D::Error> {
let ulid = Ulid::deserialize(deserializer)?;
Ok(ulid.into())
}

@ -10,7 +10,10 @@ pub struct Model {
#[sea_orm(primary_key)]
pub id: u64,
#[sea_orm(column_type = "Binary(BlobSize::Blob(Some(20)))", unique)]
#[serde(serialize_with = "serialization::vec_as_address")]
#[serde(
serialize_with = "serialization::vec_as_address",
deserialize_with = "serialization::address_to_vec"
)]
pub address: Vec<u8>,
pub description: Option<String>,
pub email: Option<String>,

@ -7,7 +7,7 @@ edition = "2021"
[dependencies]
log = "0.4.19"
portable-atomic = { version = "1.3.3", features = ["float"] }
portable-atomic = { version = "1.4.0", features = ["float"] }
serde = { version = "1.0.171", features = [] }
tokio = { version = "1.29.1", features = ["full"] }
tracing = "0.1.37"

@ -67,7 +67,7 @@ listenfd = "1.0.1"
mimalloc = { version = "0.1.37", optional = true}
moka = { version = "0.11.2", default-features = false, features = ["atomic64", "future", "parking_lot", "quanta", "triomphe"] }
nanorand = { version = "0.7.0", default-features = false, features = ["std", "tls", "wyrand"] }
num = { version = "0.4.0" }
num = { version = "0.4.1" }
num-traits = "0.2.15"
once_cell = { version = "1.18.0" }
ordered-float = {version = "3.7.0" }
@ -83,7 +83,7 @@ rust_decimal = { version = "1.30.0", features = ["maths"] }
sentry = { version = "0.31.5", default-features = false, features = ["anyhow", "backtrace", "contexts", "panic", "reqwest", "rustls", "serde_json", "tracing"] }
sentry-tracing = "0.31.5"
serde = { version = "1.0.171" }
serde_json = { version = "1.0.100", default-features = false, features = ["raw_value"] }
serde_json = { version = "1.0.102", default-features = false, features = ["raw_value"] }
serde_prometheus = "0.2.3"
strum = { version = "0.25.0", features = ["derive"] }
time_01 = { package = "time", version = "0.1.45" }

@ -22,7 +22,7 @@ use crate::rpcs::many::Web3Rpcs;
use crate::rpcs::one::Web3Rpc;
use crate::rpcs::provider::{connect_http, EthersHttpProvider};
use crate::rpcs::transactions::TxStatus;
use crate::stats::{AppStat, StatBuffer};
use crate::stats::{AppStat, FlushedStats, StatBuffer};
use anyhow::Context;
use axum::http::StatusCode;
use chrono::Utc;
@ -50,7 +50,7 @@ use std::str::FromStr;
use std::sync::atomic::{AtomicU16, Ordering};
use std::sync::{atomic, Arc};
use std::time::Duration;
use tokio::sync::{broadcast, mpsc, watch, Semaphore, oneshot};
use tokio::sync::{broadcast, mpsc, oneshot, watch, Semaphore};
use tokio::task::JoinHandle;
use tokio::time::timeout;
use tracing::{error, info, trace, warn, Level};
@ -99,7 +99,8 @@ pub struct Web3ProxyApp {
/// rate limit anonymous users
pub frontend_ip_rate_limiter: Option<DeferredRateLimiter<IpAddr>>,
/// rate limit authenticated users
pub frontend_registered_user_rate_limiter: Option<DeferredRateLimiter<RegisteredUserRateLimitKey>>,
pub frontend_registered_user_rate_limiter:
Option<DeferredRateLimiter<RegisteredUserRateLimitKey>>,
/// concurrent/parallel request limits for anonymous users
pub ip_semaphores: Cache<IpAddr, Arc<Semaphore>>,
pub kafka_producer: Option<rdkafka::producer::FutureProducer>,
@ -179,8 +180,8 @@ impl Web3ProxyApp {
top_config: TopConfig,
num_workers: usize,
shutdown_sender: broadcast::Sender<()>,
flush_stat_buffer_sender: mpsc::Sender<oneshot::Sender<(usize, usize)>>,
flush_stat_buffer_receiver: mpsc::Receiver<oneshot::Sender<(usize, usize)>>,
flush_stat_buffer_sender: mpsc::Sender<oneshot::Sender<FlushedStats>>,
flush_stat_buffer_receiver: mpsc::Receiver<oneshot::Sender<FlushedStats>>,
) -> anyhow::Result<Web3ProxyAppSpawn> {
let stat_buffer_shutdown_receiver = shutdown_sender.subscribe();
let mut background_shutdown_receiver = shutdown_sender.subscribe();
@ -372,10 +373,11 @@ impl Web3ProxyApp {
.build();
// TODO: TTL left low, this could also be a solution instead of modifiying the cache, that may be disgusting across threads / slow anyways
let user_balance_cache = CacheBuilder::new(10_000)
let user_balance_cache: UserBalanceCache = CacheBuilder::new(10_000)
.name("user_balance")
.time_to_live(Duration::from_secs(600))
.build();
.build()
.into();
// create a channel for receiving stats
// we do this in a channel so we don't slow down our response to the users
@ -434,9 +436,8 @@ impl Web3ProxyApp {
// these two rate limiters can share the base limiter
// these are deferred rate limiters because we don't want redis network requests on the hot path
// TODO: take cache_size from config
frontend_ip_rate_limiter = Some(
DeferredRateLimiter::new(20_000, "ip", rpc_rrl.clone(), None).await,
);
frontend_ip_rate_limiter =
Some(DeferredRateLimiter::new(20_000, "ip", rpc_rrl.clone(), None).await);
frontend_registered_user_rate_limiter =
Some(DeferredRateLimiter::new(20_000, "key", rpc_rrl, None).await);
}
@ -698,7 +699,9 @@ impl Web3ProxyApp {
}
pub fn influxdb_client(&self) -> Web3ProxyResult<&influxdb2::Client> {
self.influxdb_client.as_ref().ok_or(Web3ProxyError::NoDatabase)
self.influxdb_client
.as_ref()
.ok_or(Web3ProxyError::NoDatabase)
}
/// an ethers provider that you can use with ether's abigen.
@ -1140,15 +1143,19 @@ impl Web3ProxyApp {
.await
{
Ok(response_data) => {
request_metadata.error_response.store(false, Ordering::Release);
request_metadata
.error_response
.store(false, Ordering::Release);
(StatusCode::OK, response_data)
},
}
Err(err) => {
request_metadata.error_response.store(true, Ordering::Release);
request_metadata
.error_response
.store(true, Ordering::Release);
err.as_response_parts()
},
}
};
let response = JsonRpcForwardedResponse::from_response_data(response_data, response_id);
@ -1158,6 +1165,9 @@ impl Web3ProxyApp {
let rpcs = request_metadata.backend_rpcs_used();
// there might be clones in the background, so this isn't a sure thing
let _ = request_metadata.try_send_arc_stat();
(code, response, rpcs)
}
@ -1486,7 +1496,7 @@ impl Web3ProxyApp {
Err(Web3ProxyError::NoDatabase) => {},
Err(err) => {
warn!(
?err,
?err,
"unable to save stats for eth_sendRawTransaction",
)
}
@ -1620,7 +1630,9 @@ impl Web3ProxyApp {
} => {
let block_depth = (head_block.number().saturating_sub(*block.num())).as_u64();
if block_depth < self.config.archive_depth {
if block_depth > self.config.archive_depth {
trace!(%block_depth, archive_depth=%self.config.archive_depth);
request_metadata
.archive_request
.store(true, atomic::Ordering::Release);
@ -1641,7 +1653,9 @@ impl Web3ProxyApp {
} => {
let block_depth = (head_block.number().saturating_sub(*from_block.num())).as_u64();
if block_depth < self.config.archive_depth {
if block_depth > self.config.archive_depth {
trace!(%block_depth, archive_depth=%self.config.archive_depth);
request_metadata
.archive_request
.store(true, atomic::Ordering::Release);

@ -8,16 +8,22 @@ use migration::sea_orm::DbConn;
use migration::sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QuerySelect};
use migration::{Func, SimpleExpr};
use serde::ser::SerializeStruct;
use serde::Serialize;
use serde::{Deserialize, Serialize};
use serde_json::json;
use tracing::info;
/// Implements the balance getter which combines data from several tables
#[derive(Clone, Debug, Default)]
#[derive(Clone, Debug, Default, Deserialize)]
pub struct Balance {
pub admin_deposits: Decimal,
pub chain_deposits: Decimal,
pub referal_bonus: Decimal,
pub one_time_referee_bonus: Decimal,
pub referal_bonus: Decimal,
pub stripe_deposits: Decimal,
pub total_cache_misses: u64,
pub total_frontend_requests: u64,
/// this includes credits spent inside a "free" or "downgraded" tier
/// this always increments and so will always be >= total_spent_paid_credits
pub total_spent: Decimal,
pub total_spent_paid_credits: Decimal,
pub user_id: u64,
@ -28,13 +34,15 @@ impl Serialize for Balance {
where
S: serde::Serializer,
{
let mut state = serializer.serialize_struct("balance", 12)?;
let mut state = serializer.serialize_struct("balance", 14)?;
state.serialize_field("admin_deposits", &self.admin_deposits)?;
state.serialize_field("chain_deposits", &self.chain_deposits)?;
state.serialize_field("referal_bonus", &self.referal_bonus)?;
state.serialize_field("one_time_referee_bonus", &self.one_time_referee_bonus)?;
state.serialize_field("referal_bonus", &self.referal_bonus)?;
state.serialize_field("stripe_deposits", &self.stripe_deposits)?;
state.serialize_field("total_cache_misses", &self.total_cache_misses)?;
state.serialize_field("total_frontend_requests", &self.total_frontend_requests)?;
state.serialize_field("total_spent", &self.total_spent)?;
state.serialize_field("total_spent_paid_credits", &self.total_spent_paid_credits)?;
state.serialize_field("user_id", &self.user_id)?;
@ -124,30 +132,45 @@ impl Balance {
.web3_context("fetching stripe deposits")?
.unwrap_or_default();
let (total_spent_paid_credits, total_spent) = rpc_accounting_v2::Entity::find()
.select_only()
.column_as(
SimpleExpr::from(Func::coalesce([
rpc_accounting_v2::Column::SumCreditsUsed.sum(),
0.into(),
])),
"total_spent_paid_credits",
)
.column_as(
SimpleExpr::from(Func::coalesce([
rpc_accounting_v2::Column::SumInclFreeCreditsUsed.sum(),
0.into(),
])),
"total_spent",
)
.inner_join(rpc_key::Entity)
// .filter(rpc_key::Column::Id.eq(rpc_accounting_v2::Column::RpcKeyId)) // TODO: i think the inner_join function handles this
.filter(rpc_key::Column::UserId.eq(user_id))
.into_tuple()
.one(db_conn)
.await
.web3_context("fetching total_spent_paid_credits and total_spent")?
.unwrap_or_default();
let (total_cache_misses, total_frontend_requests, total_spent_paid_credits, total_spent) =
rpc_accounting_v2::Entity::find()
.select_only()
.column_as(
SimpleExpr::from(Func::coalesce([
rpc_accounting_v2::Column::CacheMisses.sum(),
0.into(),
])),
"total_cache_misses",
)
.column_as(
SimpleExpr::from(Func::coalesce([
rpc_accounting_v2::Column::FrontendRequests.sum(),
0.into(),
])),
"total_frontend_requests",
)
.column_as(
SimpleExpr::from(Func::coalesce([
rpc_accounting_v2::Column::SumCreditsUsed.sum(),
0.into(),
])),
"total_spent_paid_credits",
)
.column_as(
SimpleExpr::from(Func::coalesce([
rpc_accounting_v2::Column::SumInclFreeCreditsUsed.sum(),
0.into(),
])),
"total_spent",
)
.inner_join(rpc_key::Entity)
// .filter(rpc_key::Column::Id.eq(rpc_accounting_v2::Column::RpcKeyId)) // TODO: i think the inner_join function handles this
.filter(rpc_key::Column::UserId.eq(user_id))
.into_tuple::<(Decimal, Decimal, Decimal, Decimal)>()
.one(db_conn)
.await
.web3_context("fetching total_spent_paid_credits and total_spent")?
.unwrap_or_default();
let one_time_referee_bonus = referee::Entity::find()
.select_only()
@ -179,17 +202,25 @@ impl Balance {
.web3_context("fetching referal bonus")?
.unwrap_or_default();
let total_cache_misses: u64 = total_cache_misses.try_into()?;
let total_frontend_requests: u64 = total_frontend_requests.try_into()?;
let balance = Self {
admin_deposits,
chain_deposits,
referal_bonus,
one_time_referee_bonus,
stripe_deposits,
total_cache_misses,
total_frontend_requests,
total_spent,
total_spent_paid_credits,
user_id,
};
// TODO: lower log level
info!("balance: {:#}", json!(&balance));
// Return None if there is no entry
Ok(Some(balance))
}

@ -1,15 +1,19 @@
use crate::balance::Balance;
use crate::errors::Web3ProxyResult;
use crate::frontend::authorization::{AuthorizationChecks, RpcSecretKey};
use moka::future::Cache;
use derive_more::From;
use entities::rpc_key;
use migration::sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter};
use moka::future::{Cache, ConcurrentCacheExt};
use std::fmt;
use std::net::IpAddr;
use std::sync::Arc;
use tokio::sync::RwLock as AsyncRwLock;
use tracing::trace;
/// Cache data from the database about rpc keys
/// TODO: try Ulid/u128 instead of RpcSecretKey in case my hash method is broken
pub type RpcSecretKeyCache = Cache<RpcSecretKey, AuthorizationChecks>;
/// Cache data from the database about user balances
pub type UserBalanceCache = Cache<u64, Arc<AsyncRwLock<Balance>>>;
#[derive(Clone, Copy, Hash, Eq, PartialEq)]
pub struct RegisteredUserRateLimitKey(pub u64, pub IpAddr);
@ -19,3 +23,65 @@ impl std::fmt::Display for RegisteredUserRateLimitKey {
write!(f, "{}-{}", self.0, self.1)
}
}
/// Cache data from the database about user balances
#[derive(Clone, From)]
pub struct UserBalanceCache(pub Cache<u64, Arc<AsyncRwLock<Balance>>>);
impl UserBalanceCache {
pub async fn get_or_insert(
&self,
db_conn: &DatabaseConnection,
user_id: u64,
) -> Web3ProxyResult<Arc<AsyncRwLock<Balance>>> {
if user_id == 0 {
return Ok(Arc::new(AsyncRwLock::new(Balance::default())));
}
let x = self
.0
.try_get_with(user_id, async move {
let x = match Balance::try_from_db(db_conn, user_id).await? {
None => Balance {
user_id,
..Default::default()
},
Some(x) => x,
};
trace!(?x, "from database");
Ok(Arc::new(AsyncRwLock::new(x)))
})
.await?;
Ok(x)
}
pub async fn invalidate(
&self,
user_id: &u64,
db_conn: &DatabaseConnection,
rpc_secret_key_cache: &RpcSecretKeyCache,
) -> Web3ProxyResult<()> {
self.0.invalidate(user_id).await;
trace!(%user_id, "invalidating");
// Remove all RPC-keys owned by this user from the cache, s.t. rate limits are re-calculated
let rpc_keys = rpc_key::Entity::find()
.filter(rpc_key::Column::UserId.eq(*user_id))
.all(db_conn)
.await?;
for rpc_key_entity in rpc_keys {
let rpc_key_id = rpc_key_entity.id;
let secret_key = rpc_key_entity.secret_key.into();
trace!(%user_id, %rpc_key_id, ?secret_key, "invalidating");
rpc_secret_key_cache.invalidate(&secret_key).await;
}
Ok(())
}
}

@ -8,12 +8,23 @@
use migration::sea_orm::prelude::Decimal;
use std::str::FromStr;
use tracing::warn;
use tracing::{instrument, trace, warn};
pub fn default_usd_per_cu(chain_id: u64) -> Decimal {
match chain_id {
// TODO: only include if `cfg(test)`?
999_001_999 => Decimal::from_str("0.10").unwrap(),
137 => Decimal::from_str("0.000000533333333333333").unwrap(),
_ => Decimal::from_str("0.000000400000000000000").unwrap(),
}
}
#[derive(Debug)]
pub struct ComputeUnit(Decimal);
impl ComputeUnit {
/// costs can vary widely depending on method and chain
#[instrument(level = "trace")]
pub fn new(method: &str, chain_id: u64, response_bytes: u64) -> Self {
// TODO: this works, but this is fragile. think of a better way to check the method is a subscription
if method.ends_with(')') {
@ -123,11 +134,14 @@ impl ComputeUnit {
let cu = Decimal::from(cu);
trace!(%cu);
Self(cu)
}
/// notifications and subscription responses cost per-byte
pub fn subscription_response<D: Into<Decimal>>(num_bytes: D) -> Self {
#[instrument(level = "trace")]
pub fn subscription_response<D: Into<Decimal> + std::fmt::Debug>(num_bytes: D) -> Self {
let cu = num_bytes.into() * Decimal::new(4, 2);
Self(cu)
@ -141,28 +155,40 @@ impl ComputeUnit {
/// Compute cost per request
/// All methods cost the same
/// The number of bytes are based on input, and output bytes
#[instrument(level = "trace")]
pub fn cost(
&self,
archive_request: bool,
cache_hit: bool,
error_response: bool,
usd_per_cu: Decimal,
usd_per_cu: &Decimal,
) -> Decimal {
if error_response {
trace!("error responses are free");
return 0.into();
}
let mut cost = self.0 * usd_per_cu;
trace!(%cost, "base");
if archive_request {
// TODO: get from config
cost *= Decimal::from_str("2.5").unwrap();
trace!(%cost, "archive_request");
}
// cache hits get a 25% discount
if cache_hit {
cost *= Decimal::from_str("0.75").unwrap()
// cache hits get a 25% discount
// TODO: get from config
cost *= Decimal::from_str("0.75").unwrap();
trace!(%cost, "cache_hit");
}
trace!(%cost, "final");
cost
}
}

@ -5,6 +5,7 @@ use argh::FromArgs;
use ethers::prelude::{Address, TxHash};
use ethers::types::{U256, U64};
use hashbrown::HashMap;
use migration::sea_orm::prelude::Decimal;
use migration::sea_orm::DatabaseConnection;
use sentry::types::Dsn;
use serde::Deserialize;
@ -170,6 +171,8 @@ pub struct AppConfig {
/// Stripe api key for checking validity of webhooks
pub stripe_whsec_key: Option<String>,
pub usd_per_cu: Option<Decimal>,
/// Track rate limits in a redis (or compatible backend)
/// It is okay if this data is lost.
pub volatile_redis_url: Option<String>,

@ -17,8 +17,7 @@ use axum_client_ip::InsecureClientIp;
use axum_macros::debug_handler;
use chrono::{TimeZone, Utc};
use entities::{
admin, admin_increase_balance_receipt, admin_trail, login, pending_login, rpc_key,
user,
admin, admin_increase_balance_receipt, admin_trail, login, pending_login, rpc_key, user,
};
use ethers::{prelude::Address, types::Bytes};
use hashbrown::HashMap;
@ -85,7 +84,13 @@ pub async fn admin_increase_balance(
txn.commit().await?;
// Invalidate the user_balance_cache for this user:
app.user_balance_cache.invalidate(&user_entry.id).await;
if let Err(err) = app
.user_balance_cache
.invalidate(&user_entry.id, app.db_conn()?, &app.rpc_secret_key_cache)
.await
{
warn!(?err, "unable to invalidate caches");
};
let out = json!({
"user": payload.user_address,
@ -327,7 +332,7 @@ pub async fn admin_imitate_login_post(
let db_replica = app.db_replica()?;
let user_pending_login = pending_login::Entity::find()
.filter(pending_login::Column::Nonce.eq(Uuid::from(login_nonce.clone())))
.filter(pending_login::Column::Nonce.eq(Uuid::from(login_nonce)))
.one(db_replica.as_ref())
.await
.web3_context("database error while finding pending_login")?

@ -8,7 +8,7 @@ use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult};
use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest};
use crate::rpcs::blockchain::Web3ProxyBlock;
use crate::rpcs::one::Web3Rpc;
use crate::stats::{AppStat, BackendRequests, RpcQueryStats};
use crate::stats::{AppStat, BackendRequests};
use crate::user_token::UserBearerToken;
use anyhow::Context;
use axum::headers::authorization::Bearer;
@ -27,6 +27,7 @@ use http::HeaderValue;
use ipnet::IpNet;
use migration::sea_orm::prelude::Decimal;
use migration::sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter};
use parking_lot::Mutex;
use rdkafka::message::{Header as KafkaHeader, OwnedHeaders as KafkaOwnedHeaders, OwnedMessage};
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::util::Timeout as KafkaTimeout;
@ -34,6 +35,7 @@ use redis_rate_limiter::redis::AsyncCommands;
use redis_rate_limiter::RedisRateLimitResult;
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
use std::fmt::Debug;
use std::fmt::Display;
use std::hash::{Hash, Hasher};
use std::mem;
@ -45,18 +47,52 @@ use tokio::sync::RwLock as AsyncRwLock;
use tokio::sync::{mpsc, OwnedSemaphorePermit, Semaphore};
use tokio::task::JoinHandle;
use tokio::time::Instant;
use tracing::{error, trace, warn};
use tracing::{error, info, trace, warn};
use ulid::Ulid;
use uuid::Uuid;
/// This lets us use UUID and ULID while we transition to only ULIDs
/// TODO: custom deserialize that can also go from String to Ulid
#[derive(Copy, Clone, Debug, Deserialize, Eq, PartialEq)]
#[derive(Copy, Clone, Deserialize)]
pub enum RpcSecretKey {
Ulid(Ulid),
Uuid(Uuid),
}
impl RpcSecretKey {
pub fn new() -> Self {
Ulid::new().into()
}
fn as_128(&self) -> u128 {
match self {
Self::Ulid(x) => x.0,
Self::Uuid(x) => x.as_u128(),
}
}
}
impl PartialEq for RpcSecretKey {
fn eq(&self, other: &Self) -> bool {
self.as_128() == other.as_128()
}
}
impl Eq for RpcSecretKey {}
impl Debug for RpcSecretKey {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Ulid(x) => Debug::fmt(x, f),
Self::Uuid(x) => {
let x = Ulid::from(x.as_u128());
Debug::fmt(&x, f)
}
}
}
}
/// always serialize as a ULID.
impl Serialize for RpcSecretKey {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
@ -127,6 +163,10 @@ pub struct AuthorizationChecks {
/// IMPORTANT! Once confirmed by a miner, they will be public on the blockchain!
pub private_txs: bool,
pub proxy_mode: ProxyMode,
/// if the account had premium when this request metadata was created
/// they might spend slightly more than they've paid, but we are okay with that
/// TODO: we could price the request now and if its too high, downgrade. but thats more complex than we need
pub paid_credits_used: bool,
}
/// TODO: include the authorization checks in this?
@ -153,10 +193,7 @@ pub struct KafkaDebugLogger {
/// Ulids and Uuids matching the same bits hash the same
impl Hash for RpcSecretKey {
fn hash<H: Hasher>(&self, state: &mut H) {
let x = match self {
Self::Ulid(x) => x.0,
Self::Uuid(x) => x.as_u128(),
};
let x = self.as_128();
x.hash(state);
}
@ -308,6 +345,8 @@ pub struct RequestMetadata {
pub chain_id: u64,
pub usd_per_cu: Decimal,
pub request_ulid: Ulid,
/// Size of the JSON request. Does not include headers or things like that.
@ -362,17 +401,6 @@ impl RequestMetadata {
.map(|x| x.checks.proxy_mode)
.unwrap_or_default()
}
/// this may drift slightly if multiple servers are handling the same users, but should be close
pub async fn latest_balance(&self) -> Option<Decimal> {
if let Some(x) = self.authorization.as_ref() {
let x = x.checks.latest_balance.read().await.remaining();
Some(x)
} else {
None
}
}
}
#[derive(From)]
@ -482,11 +510,13 @@ impl RequestMetadata {
}
}
let chain_id = app.config.chain_id;
let x = Self {
archive_request: false.into(),
authorization: Some(authorization),
backend_requests: Default::default(),
chain_id: app.config.chain_id,
chain_id,
error_response: false.into(),
kafka_debug_logger,
method,
@ -499,6 +529,7 @@ impl RequestMetadata {
response_timestamp: 0.into(),
start_instant: Instant::now(),
stat_sender: app.stat_sender.clone(),
usd_per_cu: app.config.usd_per_cu.unwrap_or_default(),
user_error_response: false.into(),
};
@ -509,13 +540,11 @@ impl RequestMetadata {
self.backend_requests.lock().clone()
}
pub fn try_send_stat(mut self) -> Web3ProxyResult<Option<Self>> {
pub fn try_send_stat(mut self) -> Web3ProxyResult<()> {
if let Some(stat_sender) = self.stat_sender.take() {
trace!(?self, "sending stat");
let stat: RpcQueryStats = self.try_into()?;
let stat: AppStat = stat.into();
let stat: AppStat = self.into();
if let Err(err) = stat_sender.send(stat) {
error!(?err, "failed sending stat");
@ -524,11 +553,9 @@ impl RequestMetadata {
};
trace!("stat sent successfully");
Ok(None)
} else {
Ok(Some(self))
}
Ok(())
}
pub fn add_response<'a, R: Into<ResponseOrBytes<'a>>>(&'a self, response: R) {
@ -556,18 +583,12 @@ impl RequestMetadata {
}
}
pub fn try_send_arc_stat(self: Arc<Self>) -> anyhow::Result<Option<Arc<Self>>> {
match Arc::try_unwrap(self) {
Ok(x) => {
let not_sent = x.try_send_stat()?.map(Arc::new);
Ok(not_sent)
}
Err(not_sent) => {
trace!(
"could not send stat while {} arcs are active",
Arc::strong_count(&not_sent)
);
Ok(Some(not_sent))
pub fn try_send_arc_stat(self: Arc<Self>) -> Web3ProxyResult<()> {
match Arc::into_inner(self) {
Some(x) => x.try_send_stat(),
None => {
trace!("could not send stat while other arcs are active");
Ok(())
}
}
}
@ -582,18 +603,12 @@ impl Drop for RequestMetadata {
// turn `&mut self` into `self`
let x = mem::take(self);
trace!(?self, "request metadata dropped without stat send");
trace!(?x, "request metadata dropped without stat send");
let _ = x.try_send_stat();
}
}
}
impl RpcSecretKey {
pub fn new() -> Self {
Ulid::new().into()
}
}
impl Default for RpcSecretKey {
fn default() -> Self {
Self::new()
@ -605,7 +620,7 @@ impl Display for RpcSecretKey {
// TODO: do this without dereferencing
let ulid: Ulid = (*self).into();
ulid.fmt(f)
Display::fmt(&ulid, f)
}
}
@ -1126,37 +1141,25 @@ impl Web3ProxyApp {
}
}
/// Get the balance for the user.
///
/// If a subuser calls this function, the subuser needs to have first attained the user_id that the rpc key belongs to.
/// This function should be called anywhere where balance is required (i.e. only rpc calls, I believe ... non-rpc calls don't really require balance)
pub(crate) async fn balance_checks(
&self,
user_id: u64,
) -> Web3ProxyResult<Arc<AsyncRwLock<Balance>>> {
self.user_balance_cache
.try_get_with(user_id, async move {
let db_replica = self.db_replica()?;
let x = match Balance::try_from_db(db_replica.as_ref(), user_id).await? {
None => Err(Web3ProxyError::InvalidUserKey),
Some(x) => Ok(x),
}?;
trace!("Balance for cache retrieved from database is {:?}", x);
Ok(Arc::new(AsyncRwLock::new(x)))
})
.await
.map_err(Into::into)
}
// check the local cache for user data, or query the database
pub(crate) async fn authorization_checks(
&self,
proxy_mode: ProxyMode,
rpc_secret_key: &RpcSecretKey,
) -> Web3ProxyResult<AuthorizationChecks> {
self.rpc_secret_key_cache
// TODO: move onto a helper function
let fresh = Arc::new(Mutex::new(false));
let fresh_clone = fresh.clone();
let x = self
.rpc_secret_key_cache
.try_get_with_by_ref(rpc_secret_key, async move {
// trace!(?rpc_secret_key, "user cache miss");
{
let mut f = fresh.lock_arc();
*f = true;
}
let db_replica = self.db_replica()?;
@ -1248,16 +1251,24 @@ impl Web3ProxyApp {
"related user tier not found, but every user should have a tier",
)?;
let latest_balance = self.balance_checks(rpc_key_model.user_id).await?;
let latest_balance = self
.user_balance_cache
.get_or_insert(db_replica.as_ref(), rpc_key_model.user_id)
.await?;
// TODO: Do the logic here, as to how to treat the user, based on balance and initial check
// Clear the cache (not the login!) in the stats if a tier-change happens (clear, but don't modify roles)
let paid_credits_used: bool;
if let Some(downgrade_user_tier) = user_tier_model.downgrade_tier_id {
trace!("user belongs to a premium tier. checking balance");
let active_premium = latest_balance.read().await.active_premium();
// only consider the user premium if they have paid at least $10 and have a balance > $.01
// otherwise, set user_tier_model to the downograded tier
if !active_premium {
if active_premium {
paid_credits_used = true;
} else {
paid_credits_used = false;
// TODO: include boolean to mark that the user is downgraded
user_tier_model =
user_tier::Entity::find_by_id(downgrade_user_tier)
@ -1268,6 +1279,8 @@ impl Web3ProxyApp {
downgrade_user_tier
))?;
}
} else {
paid_credits_used = false;
}
let rpc_key_id =
@ -1289,13 +1302,21 @@ impl Web3ProxyApp {
rpc_secret_key: Some(*rpc_secret_key),
rpc_secret_key_id: rpc_key_id,
user_id: rpc_key_model.user_id,
paid_credits_used,
})
}
None => Ok(AuthorizationChecks::default()),
}
})
.await
.map_err(Into::into)
.await?;
if *fresh_clone.lock() {
info!(?rpc_secret_key, "authorization_checks miss");
} else {
info!(?rpc_secret_key, "authorization_checks hit");
}
Ok(x)
}
/// Authorized the ip/origin/referer/useragent and rate limit and concurrency

@ -76,7 +76,7 @@ async fn _proxy_web3_rpc(
let mut response = (status_code, Json(response)).into_response();
// TODO: DRY this up. same for public and private queries
// TODO: DRY this up. it is the same code for public and private queries
let response_headers = response.headers_mut();
// TODO: this might be slow. think about this more

@ -199,7 +199,7 @@ async fn _status(app: Arc<Web3ProxyApp>) -> (StatusCode, &'static str, Bytes) {
MokaCacheSerializer(&app.ip_semaphores),
MokaCacheSerializer(&app.jsonrpc_response_cache),
MokaCacheSerializer(&app.rpc_secret_key_cache),
MokaCacheSerializer(&app.user_balance_cache),
MokaCacheSerializer(&app.user_balance_cache.0),
MokaCacheSerializer(&app.user_semaphores),
],
"chain_id": app.config.chain_id,

@ -22,8 +22,8 @@ use migration::sea_orm::{
QueryFilter, TransactionTrait,
};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use siwe::{Message, VerificationOpts};
use std::collections::BTreeMap;
use std::ops::Add;
use std::str::FromStr;
use std::sync::Arc;
@ -49,14 +49,11 @@ pub struct PostLogin {
pub referral_code: Option<String>,
}
/// TODO: use this type in the frontend
#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Serialize)]
pub struct LoginPostResponse {
pub bearer_token: Ulid,
pub rpc_keys: Value,
/// unknown data gets put here
#[serde(flatten, default = "HashMap::default")]
pub extra: HashMap<String, serde_json::Value>,
pub bearer_token: UserBearerToken,
pub rpc_keys: BTreeMap<u64, rpc_key::Model>,
pub user: user::Model,
}
/// `GET /user/login/:user_address` or `GET /user/login/:user_address/:message_eip` -- Start the "Sign In with Ethereum" (siwe) login flow.
@ -267,7 +264,7 @@ pub async fn user_login_post(
let db_replica = app.db_replica()?;
let user_pending_login = pending_login::Entity::find()
.filter(pending_login::Column::Nonce.eq(Uuid::from(login_nonce.clone())))
.filter(pending_login::Column::Nonce.eq(Uuid::from(login_nonce)))
.one(db_replica.as_ref())
.await
.web3_context("database error while finding pending_login")?
@ -325,7 +322,7 @@ pub async fn user_login_post(
trace!(?payload.referral_code);
if let Some(referral_code) = payload.referral_code.as_ref() {
// If it is not inside, also check in the database
trace!("Using register referral code: {:?}", referral_code);
trace!("Using register referral code: {:?}", referral_code);
let user_referrer = referrer::Entity::find()
.filter(referrer::Column::ReferralCode.eq(referral_code))
.one(&txn)
@ -394,19 +391,6 @@ pub async fn user_login_post(
// create a bearer token for the user.
let user_bearer_token = UserBearerToken::default();
// json response with everything in it
// we could return just the bearer token, but I think they will always request api keys and the user profile
let response_json = json!({
"rpc_keys": user_rpc_keys
.into_iter()
.map(|user_rpc_key| (user_rpc_key.id, user_rpc_key))
.collect::<HashMap<_, _>>(),
"bearer_token": user_bearer_token,
"user": caller,
});
let response = (status_code, Json(response_json)).into_response();
// add bearer to the database
// expire in 4 weeks
@ -431,6 +415,19 @@ pub async fn user_login_post(
error!("Failed to delete nonce:{}: {}", login_nonce, err);
}
// json response with everything in it
// we could return just the bearer token, but I think they will always request api keys and the user profile
let response_json = LoginPostResponse {
rpc_keys: user_rpc_keys
.into_iter()
.map(|user_rpc_key| (user_rpc_key.id, user_rpc_key))
.collect(),
bearer_token: user_bearer_token,
user: caller,
};
let response = (status_code, Json(response_json)).into_response();
Ok(response)
}

@ -15,7 +15,7 @@ use axum::{
use axum_client_ip::InsecureClientIp;
use axum_macros::debug_handler;
use entities::{
admin_increase_balance_receipt, increase_on_chain_balance_receipt, rpc_key,
admin_increase_balance_receipt, increase_on_chain_balance_receipt,
stripe_increase_balance_receipt, user,
};
use ethers::abi::AbiEncode;
@ -30,7 +30,7 @@ use payment_contracts::ierc20::IERC20;
use payment_contracts::payment_factory::{self, PaymentFactory};
use serde_json::json;
use std::sync::Arc;
use tracing::{debug, info, trace};
use tracing::{debug, info, trace, warn};
/// Implements any logic related to payments
/// Removed this mainly from "user" as this was getting clogged
@ -304,6 +304,7 @@ pub async fn user_balance_post(
// the transaction might contain multiple relevant logs. collect them all
let mut response_data = vec![];
let mut user_ids_to_invalidate = HashSet::new();
for log in transaction_receipt.logs {
if let Some(true) = log.removed {
// TODO: do we need to make sure this row is deleted? it should be handled by `handle_uncle_block`
@ -390,19 +391,7 @@ pub async fn user_balance_post(
receipt.save(&txn).await?;
// Remove all RPC-keys owned by this user from the cache, s.t. rate limits are re-calculated
let rpc_keys = rpc_key::Entity::find()
.filter(rpc_key::Column::UserId.eq(recipient.id))
.all(&txn)
.await?;
app.user_balance_cache.invalidate(&recipient.id).await;
for rpc_key_entity in rpc_keys {
app.rpc_secret_key_cache
.invalidate(&rpc_key_entity.secret_key.into())
.await;
}
user_ids_to_invalidate.insert(recipient.id);
let x = json!({
"amount": payment_token_amount,
@ -421,6 +410,17 @@ pub async fn user_balance_post(
txn.commit().await?;
for user_id in user_ids_to_invalidate.into_iter() {
// Finally invalidate the cache as well
if let Err(err) = app
.user_balance_cache
.invalidate(&user_id, db_conn, &app.rpc_secret_key_cache)
.await
{
warn!(?err, "unable to invalidate caches");
};
}
let response = (StatusCode::CREATED, Json(json!(response_data))).into_response();
Ok(response)

@ -7,7 +7,7 @@ use axum::{
Extension, Json, TypedHeader,
};
use axum_macros::debug_handler;
use entities::{rpc_key, stripe_increase_balance_receipt, user};
use entities::{stripe_increase_balance_receipt, user};
use ethers::types::Address;
use http::HeaderMap;
use migration::sea_orm::prelude::Decimal;
@ -17,7 +17,7 @@ use migration::sea_orm::{
use serde_json::json;
use std::sync::Arc;
use stripe::Webhook;
use tracing::{debug, error};
use tracing::{debug, error, warn};
/// `GET /user/balance/stripe` -- Use a bearer token to get the user's balance and spend.
///
@ -163,20 +163,16 @@ pub async fn user_balance_stripe_post(
Some(recipient) => {
let _ = insert_receipt_model.save(&txn).await;
let recipient_rpc_keys = rpc_key::Entity::find()
.filter(rpc_key::Column::UserId.eq(recipient.id))
.all(&txn)
.await?;
txn.commit().await?;
// Finally invalidate the cache as well
app.user_balance_cache.invalidate(&recipient.id).await;
for rpc_key_entity in recipient_rpc_keys {
app.rpc_secret_key_cache
.invalidate(&rpc_key_entity.secret_key.into())
.await;
}
if let Err(err) = app
.user_balance_cache
.invalidate(&recipient.id, db_conn, &app.rpc_secret_key_cache)
.await
{
warn!(?err, "unable to invalidate caches");
};
}
None => {
return Err(Web3ProxyError::BadResponse(

@ -1,5 +1,6 @@
#![feature(let_chains)]
#![feature(trait_alias)]
#![forbid(unsafe_code)]
pub mod admin_queries;
pub mod app;

@ -1,10 +1,8 @@
use crate::{
block_number::BlockNumAndHash, errors::Web3ProxyError, jsonrpc::JsonRpcErrorData,
};
use crate::{block_number::BlockNumAndHash, errors::Web3ProxyError, jsonrpc::JsonRpcErrorData};
use derive_more::From;
use ethers::{
providers::{HttpClientError, JsonRpcError, ProviderError, WsClientError},
types::{U64},
types::U64,
};
use hashbrown::hash_map::DefaultHashBuilder;
use moka::future::Cache;

@ -14,14 +14,13 @@ use itertools::{Itertools, MinMaxResult};
use moka::future::Cache;
use serde::Serialize;
use std::cmp::{Ordering, Reverse};
use std::fmt;
use std::sync::{atomic, Arc};
use std::time::Duration;
use tokio::sync::broadcast;
use tokio::time::Instant;
use tracing::{debug, enabled, info, trace, warn, Level};
#[derive(Clone, Serialize)]
#[derive(Clone, Debug, Serialize)]
struct ConsensusRpcData {
head_block_num: U64,
// TODO: this is too simple. erigon has 4 prune levels (hrct)
@ -98,7 +97,7 @@ pub enum ShouldWaitForBlock {
/// TODO: remove head_block/head_rpcs/tier and replace with one RankedRpcMap
/// TODO: add `best_rpc(method_data_kind, min_block_needed, max_block_needed, include_backups)`
/// TODO: make serializing work. the key needs to be a string. I think we need `serialize_with`
#[derive(Clone, Serialize)]
#[derive(Clone, Debug, Serialize)]
pub struct RankedRpcs {
pub head_block: Web3ProxyBlock,
pub num_synced: usize,
@ -212,9 +211,9 @@ impl RankedRpcs {
self.inner.is_empty()
}
/// TODO! we should also keep the number on the head block saved
#[inline]
pub fn num_consensus_rpcs(&self) -> usize {
// TODO! wrong! this is now the total num of rpcs. we should keep the number on the head block saved
pub fn num_active_rpcs(&self) -> usize {
self.inner.len()
}
@ -340,13 +339,6 @@ impl RankedRpcs {
// TODO: sum_hard_limit?
}
impl fmt::Debug for RankedRpcs {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// TODO: the default formatter takes forever to write. we need to print something though
f.debug_struct("RankedRpcs").finish_non_exhaustive()
}
}
// TODO: refs for all of these. borrow on a Sender is cheap enough
impl Web3Rpcs {
pub fn head_block(&self) -> Option<Web3ProxyBlock> {
@ -435,7 +427,7 @@ impl ConsensusFinder {
rpc: Option<&Arc<Web3Rpc>>,
new_block: Option<Web3ProxyBlock>,
) -> Web3ProxyResult<bool> {
let new_consensus_rpcs = match self
let new_ranked_rpcs = match self
.find_consensus_connections(authorization, web3_rpcs)
.await
.web3_context("error while finding consensus head block!")?
@ -444,23 +436,23 @@ impl ConsensusFinder {
Some(x) => x,
};
trace!(?new_consensus_rpcs);
trace!(?new_ranked_rpcs);
let watch_consensus_head_sender = web3_rpcs.watch_head_block.as_ref().unwrap();
// TODO: think more about the default for tiers
let best_tier = self.best_tier().unwrap_or_default();
let worst_tier = self.worst_tier().unwrap_or_default();
let backups_needed = new_consensus_rpcs.backups_needed;
let consensus_head_block = new_consensus_rpcs.head_block.clone();
let num_consensus_rpcs = new_consensus_rpcs.num_consensus_rpcs();
let backups_needed = new_ranked_rpcs.backups_needed;
let consensus_head_block = new_ranked_rpcs.head_block.clone();
let num_consensus_rpcs = new_ranked_rpcs.num_active_rpcs();
let num_active_rpcs = self.len();
let total_rpcs = web3_rpcs.len();
let new_consensus_rpcs = Arc::new(new_consensus_rpcs);
let new_ranked_rpcs = Arc::new(new_ranked_rpcs);
let old_consensus_head_connections = web3_rpcs
let old_ranked_rpcs = web3_rpcs
.watch_ranked_rpcs
.send_replace(Some(new_consensus_rpcs.clone()));
.send_replace(Some(new_ranked_rpcs.clone()));
let backups_voted_str = if backups_needed { "B " } else { "" };
@ -476,7 +468,7 @@ impl ConsensusFinder {
"None".to_string()
};
match old_consensus_head_connections.as_ref() {
match old_ranked_rpcs.as_ref() {
None => {
info!(
"first {}/{} {}{}/{}/{} block={}, rpc={}",

@ -1267,6 +1267,22 @@ impl fmt::Debug for Web3Rpc {
f.field("blocks", &block_data_limit);
}
f.field("backup", &self.backup);
f.field("tier", &self.tier.load(atomic::Ordering::Relaxed));
f.field("weighted_ms", &self.weighted_peak_latency().as_millis());
if let Some(head_block_watch) = self.head_block.as_ref() {
if let Some(head_block) = head_block_watch.borrow().as_ref() {
f.field("head_num", head_block.number());
f.field("head_hash", head_block.hash());
} else {
f.field("head_num", &None::<()>);
f.field("head_hash", &None::<()>);
}
}
f.finish_non_exhaustive()
}
}

@ -1,5 +1,4 @@
use super::StatType;
use crate::balance::Balance;
use crate::errors::Web3ProxyErrorContext;
use crate::{
app::Web3ProxyApp,
@ -16,7 +15,7 @@ use axum::{
Json, TypedHeader,
};
use entities::sea_orm_active_enums::Role;
use entities::{rpc_key, secondary_user, user, user_tier};
use entities::{rpc_key, secondary_user};
use fstrings::{f, format_args_f};
use hashbrown::HashMap;
use influxdb2::api::query::FluxRecord;
@ -59,40 +58,6 @@ pub async fn query_user_stats<'a>(
// Only allow stats if the user has an active premium role
if let Some(caller_user) = &caller_user {
// get the balance of the user whose stats we are going to fetch (might be self, but might be another user)
let balance = match Balance::try_from_db(db_replica.as_ref(), user_id).await? {
None => {
return Err(Web3ProxyError::AccessDenied(
"User Stats Response requires you to authorize with a bearer token".into(),
));
}
Some(x) => x,
};
// get the user tier so we can see if it is a tier that has downgrades
let relevant_balance_user_tier_id = if user_id == caller_user.id {
// use the caller's tier
caller_user.user_tier_id
} else {
// use the tier of the primary user from a secondary user
let user = user::Entity::find_by_id(user_id)
.one(db_replica.as_ref())
.await?
.web3_context("user_id not found")?;
user.user_tier_id
};
let user_tier = user_tier::Entity::find_by_id(relevant_balance_user_tier_id)
.one(db_replica.as_ref())
.await?
.web3_context("user_tier not found")?;
if user_tier.downgrade_tier_id.is_some() && !balance.active_premium() {
trace!(%user_id, "User does not have enough balance to qualify for premium");
return Err(Web3ProxyError::PaymentRequired);
}
if user_id != caller_user.id {
// check that there is at least on rpc-keys owned by the requested user and related to the caller user
let user_rpc_key_ids: Vec<u64> = rpc_key::Entity::find()

@ -15,27 +15,23 @@ use anyhow::{anyhow, Context};
use axum::headers::Origin;
use chrono::{DateTime, Months, TimeZone, Utc};
use derive_more::From;
use entities::{referee, referrer, rpc_accounting_v2, rpc_key};
use entities::{referee, referrer, rpc_accounting_v2};
use influxdb2::models::DataPoint;
use migration::sea_orm::prelude::Decimal;
use migration::sea_orm::{
self, ActiveModelTrait, ColumnTrait, DatabaseConnection, DbConn, EntityTrait, IntoActiveModel,
QueryFilter, TransactionTrait,
self, ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, IntoActiveModel,
QueryFilter, QuerySelect, TransactionTrait,
};
use migration::{Expr, OnConflict};
use migration::{Expr, LockType, OnConflict};
use num_traits::ToPrimitive;
use parking_lot::Mutex;
use std::borrow::Cow;
use std::default::Default;
use std::mem;
use std::num::NonZeroU64;
use std::str::FromStr;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use tokio::sync::RwLock as AsyncRwLock;
use tracing::{error, trace};
use tracing::{error, instrument, trace, warn};
use crate::balance::Balance;
pub use stat_buffer::{SpawnedStatBuffer, StatBuffer};
#[derive(Debug, PartialEq, Eq)]
@ -46,6 +42,12 @@ pub enum StatType {
pub type BackendRequests = Mutex<Vec<Arc<Web3Rpc>>>;
#[derive(Copy, Clone, Debug)]
pub struct FlushedStats {
pub relational: usize,
pub timeseries: usize,
}
/// TODO: better name? RpcQueryStatBuilder?
#[derive(Clone, Debug)]
pub struct RpcQueryStats {
@ -66,6 +68,8 @@ pub struct RpcQueryStats {
pub compute_unit_cost: Decimal,
/// If the request is invalid or received a jsonrpc error response (excluding reverts)
pub user_error_response: bool,
/// If premium was active at the start of the request
pub paid_credits_used: bool,
}
#[derive(Clone, Debug, From, Hash, PartialEq, Eq)]
@ -103,6 +107,7 @@ impl RpcQueryStats {
fn accounting_key(&self, period_seconds: i64) -> RpcQueryKey {
let response_timestamp = round_timestamp(self.response_timestamp, period_seconds);
// TODO: change this to use 0 for anonymous queries
let rpc_secret_key_id = self.authorization.checks.rpc_secret_key_id;
let method = self.method.clone();
@ -151,6 +156,10 @@ impl RpcQueryStats {
/// stats for a single key
fn owned_timeseries_key(&self) -> Option<RpcQueryKey> {
if !self.paid_credits_used {
return None;
}
// we don't store origin in the timeseries db. its only optionaly used for accounting
let origin = None;
@ -184,12 +193,13 @@ impl RpcQueryStats {
/// For now there is just one, but I think there might be others later
#[derive(Debug, From)]
pub enum AppStat {
RpcQuery(RpcQueryStats),
RpcQuery(RequestMetadata),
}
// TODO: move to stat_buffer.rs?
impl BufferedRpcQueryStats {
async fn add(&mut self, stat: RpcQueryStats) {
#[instrument(level = "trace")]
async fn add(&mut self, stat: RpcQueryStats, approximate_balance_remaining: Decimal) {
// a stat always come from just 1 frontend request
self.frontend_requests += 1;
@ -212,8 +222,13 @@ impl BufferedRpcQueryStats {
self.sum_response_millis += stat.response_millis;
self.sum_credits_used += stat.compute_unit_cost;
let latest_balance = stat.authorization.checks.latest_balance.read().await;
self.approximate_latest_balance_for_influx = latest_balance.clone();
if stat.authorization.checks.paid_credits_used {
self.paid_credits_used += stat.compute_unit_cost;
}
self.approximate_balance_remaining = approximate_balance_remaining;
trace!("added");
}
async fn _save_db_stats(
@ -221,20 +236,9 @@ impl BufferedRpcQueryStats {
chain_id: u64,
db_conn: &DatabaseConnection,
key: &RpcQueryKey,
active_premium: bool,
) -> Web3ProxyResult<Decimal> {
) -> Web3ProxyResult<()> {
let period_datetime = Utc.timestamp_opt(key.response_timestamp, 0).unwrap();
// // Because reading the balance and updating the stats here is not atomically locked, this may lead to a negative balance
// // This negative balance shouldn't be large tough
// // TODO: I'm not so sure about this. @david can you explain more? if someone spends over their balance, they **should** go slightly negative. after all, they would have received the premium limits for these queries
// // sum_credits_used is definitely correct. the balance can be slightly off. so it seems like we should trust sum_credits_used over balance
let paid_credits_used = if active_premium {
self.sum_credits_used
} else {
0.into()
};
// =============================== //
// UPDATE STATISTICS //
// =============================== //
@ -254,7 +258,7 @@ impl BufferedRpcQueryStats {
sum_request_bytes: sea_orm::Set(self.sum_request_bytes),
sum_response_millis: sea_orm::Set(self.sum_response_millis),
sum_response_bytes: sea_orm::Set(self.sum_response_bytes),
sum_credits_used: sea_orm::Set(paid_credits_used),
sum_credits_used: sea_orm::Set(self.paid_credits_used),
sum_incl_free_credits_used: sea_orm::Set(self.sum_credits_used),
};
@ -313,7 +317,7 @@ impl BufferedRpcQueryStats {
(
rpc_accounting_v2::Column::SumCreditsUsed,
Expr::col(rpc_accounting_v2::Column::SumCreditsUsed)
.add(paid_credits_used),
.add(self.paid_credits_used),
),
])
.to_owned(),
@ -321,34 +325,7 @@ impl BufferedRpcQueryStats {
.exec(db_conn)
.await?;
Ok(self.sum_credits_used)
}
// TODO: This is basically a duplicate with the balance_checks, except the database
// TODO: Please refactor this. Also there are small differences, like the Error is 0
async fn _get_user_balance(
&self,
user_id: u64,
user_balance_cache: &UserBalanceCache,
db_conn: &DbConn,
) -> Web3ProxyResult<Arc<AsyncRwLock<Balance>>> {
if user_id == 0 {
return Ok(Arc::new(AsyncRwLock::new(Balance::default())));
}
trace!("Will get it from the balance cache");
let x = user_balance_cache
.try_get_with(user_id, async {
let x = match Balance::try_from_db(db_conn, user_id).await? {
Some(x) => x,
None => return Err(Web3ProxyError::InvalidUserKey),
};
Ok(Arc::new(AsyncRwLock::new(x)))
})
.await?;
Ok(x)
Ok(())
}
// TODO: take a db transaction instead so that we can batch?
@ -357,8 +334,8 @@ impl BufferedRpcQueryStats {
chain_id: u64,
db_conn: &DatabaseConnection,
key: RpcQueryKey,
rpc_secret_key_cache: &RpcSecretKeyCache,
user_balance_cache: &UserBalanceCache,
rpc_secret_key_cache: &RpcSecretKeyCache,
) -> Web3ProxyResult<()> {
// Sanity check, if we need to save stats
if key.response_timestamp == 0 {
@ -372,52 +349,16 @@ impl BufferedRpcQueryStats {
// TODO: rename to owner_id?
let sender_user_id = key.rpc_key_user_id.map_or(0, |x| x.get());
// Gathering cache and database rows
let user_balance = self
._get_user_balance(sender_user_id, user_balance_cache, db_conn)
.await?;
// save the statistics to the database:
self._save_db_stats(chain_id, db_conn, &key).await?;
let mut user_balance = user_balance.write().await;
let premium_before = user_balance.active_premium();
// First of all, save the statistics to the database:
let paid_credits_used = self
._save_db_stats(chain_id, db_conn, &key, premium_before)
.await?;
// No need to continue if no credits were used
if self.sum_credits_used == 0.into() {
// write-lock is released
return Ok(());
}
// Update and possible invalidate rpc caches if necessary (if there was a downgrade)
{
user_balance.total_spent_paid_credits += paid_credits_used;
// Invalidate caches if remaining is getting close to $0
// It will be re-fetched again if needed
if premium_before && user_balance.remaining() < Decimal::from(1) {
let rpc_keys = rpc_key::Entity::find()
.filter(rpc_key::Column::UserId.eq(sender_user_id))
.all(db_conn)
.await?;
// clear all keys owned by this user from the cache
for rpc_key_entity in rpc_keys {
rpc_secret_key_cache
.invalidate(&rpc_key_entity.secret_key.into())
.await;
}
}
}
if premium_before {
// Apply all the referral logic; let's keep it simple and flat for now
if self.paid_credits_used > 0.into() {
// Start a transaction
let txn = db_conn.begin().await?;
// Apply all the referral logic; let's keep it simple and flat for now
let mut invalidate_caches = false;
// Calculate if we are above the usage threshold, and apply a bonus
// Optimally we would read this from the balance, but if we do it like this, we only have to lock a single table (much safer w.r.t. deadlocks)
// referral_entity.credits_applied_for_referrer * (Decimal::from(10) checks (atomically using this table only), whether the user has brought in >$100 to the referer
@ -425,6 +366,7 @@ impl BufferedRpcQueryStats {
// Apply a 10$ bonus onto the user, if the user has spent 100$
// TODO: i think we do want a LockType::Update on this
match referee::Entity::find()
.lock(LockType::Update)
.filter(referee::Column::UserId.eq(sender_user_id))
.find_also_related(referrer::Entity)
.one(&txn)
@ -432,8 +374,8 @@ impl BufferedRpcQueryStats {
{
Some((referral_entity, Some(referrer))) => {
// Get the balance for the referrer, see if they're premium or not
let referrer_balance = self
._get_user_balance(referrer.user_id, user_balance_cache, db_conn)
let referrer_balance = user_balance_cache
.get_or_insert(db_conn, referrer.user_id)
.await?;
// Just to keep locking simple, read and clone. if the value is slightly delayed, that is okay
@ -467,9 +409,9 @@ impl BufferedRpcQueryStats {
referral_entity.one_time_bonus_applied_for_referee =
sea_orm::Set(bonus_for_user);
// Update the cache
// TODO: race condition here?
user_balance.one_time_referee_bonus += bonus_for_user;
// writing here with `+= 10` has a race unless we lock outside of the mysql query (and thats just too slow)
// so instead we just invalidate the cache (after writing to mysql)
invalidate_caches = true;
}
let now = Utc::now();
@ -481,7 +423,10 @@ impl BufferedRpcQueryStats {
// TODO: Perhaps let's not worry about the referral cache here, to avoid deadlocks (hence only reading)
if now <= valid_until {
let referrer_bonus = self.sum_credits_used / Decimal::from(10);
// TODO: make this configurable (and change all the other hard coded places for 10%)
let referrer_bonus = self.paid_credits_used / Decimal::from(10);
// there is a LockType::Update on this that should keep any raises incrementing this
referral_entity.credits_applied_for_referrer = sea_orm::Set(
referral_entity.credits_applied_for_referrer.as_ref()
+ referrer_bonus,
@ -505,10 +450,19 @@ impl BufferedRpcQueryStats {
_ => {}
};
// Finally commit the transaction in the database
// Finally, commit the transaction in the database
txn.commit()
.await
.context("Failed to update referral and balance updates")?;
if invalidate_caches {
if let Err(err) = user_balance_cache
.invalidate(&sender_user_id, db_conn, rpc_secret_key_cache)
.await
{
warn!(?err, "unable to invalidate caches");
};
}
}
Ok(())
@ -530,10 +484,6 @@ impl BufferedRpcQueryStats {
builder = builder.tag("method", key.method);
// Read the latest balance ...
let remaining = self.approximate_latest_balance_for_influx.remaining();
trace!("Remaining balance for influx is {:?}", remaining);
builder = builder
.tag("archive_needed", key.archive_needed.to_string())
.tag("error_response", key.error_response.to_string())
@ -554,7 +504,7 @@ impl BufferedRpcQueryStats {
)
.field(
"balance",
remaining
self.approximate_balance_remaining
.to_f64()
.context("balance is really (too) large")?,
);
@ -569,10 +519,11 @@ impl BufferedRpcQueryStats {
}
}
impl TryFrom<RequestMetadata> for RpcQueryStats {
type Error = Web3ProxyError;
fn try_from(mut metadata: RequestMetadata) -> Result<Self, Self::Error> {
/// this is **intentionally** not a TryFrom<Arc<RequestMetadata>>
/// We want this to run when there is **one and only one** copy of this RequestMetadata left
/// There are often multiple copies if a request is being sent to multiple servers in parallel
impl RpcQueryStats {
fn try_from_metadata(mut metadata: RequestMetadata) -> Web3ProxyResult<Self> {
let mut authorization = metadata.authorization.take();
if authorization.is_none() {
@ -620,18 +571,19 @@ impl TryFrom<RequestMetadata> for RpcQueryStats {
let cu = ComputeUnit::new(&metadata.method, metadata.chain_id, response_bytes);
// TODO: get from config? a helper function? how should we pick this?
let usd_per_cu = match metadata.chain_id {
137 => Decimal::from_str("0.000000533333333333333"),
_ => Decimal::from_str("0.000000400000000000000"),
}?;
let cache_hit = backend_rpcs_used.is_empty();
let cache_hit = !backend_rpcs_used.is_empty();
let compute_unit_cost = cu.cost(archive_request, cache_hit, error_response, usd_per_cu);
let compute_unit_cost = cu.cost(
archive_request,
cache_hit,
error_response,
&metadata.usd_per_cu,
);
let method = mem::take(&mut metadata.method);
let paid_credits_used = authorization.checks.paid_credits_used;
let x = Self {
archive_request,
authorization,
@ -640,6 +592,7 @@ impl TryFrom<RequestMetadata> for RpcQueryStats {
compute_unit_cost,
error_response,
method,
paid_credits_used,
request_bytes,
response_bytes,
response_millis,

@ -1,8 +1,8 @@
use super::{AppStat, RpcQueryKey};
use super::{AppStat, FlushedStats, RpcQueryKey};
use crate::app::Web3ProxyJoinHandle;
use crate::balance::Balance;
use crate::caches::{RpcSecretKeyCache, UserBalanceCache};
use crate::errors::Web3ProxyResult;
use crate::stats::RpcQueryStats;
use derive_more::From;
use futures::stream;
use hashbrown::HashMap;
@ -12,7 +12,7 @@ use migration::sea_orm::DatabaseConnection;
use std::time::Duration;
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio::time::{interval, sleep};
use tracing::{error, info, trace};
use tracing::{error, info, trace, warn};
#[derive(Debug, Default)]
pub struct BufferedRpcQueryStats {
@ -27,8 +27,11 @@ pub struct BufferedRpcQueryStats {
pub sum_response_millis: u64,
pub sum_credits_used: Decimal,
pub sum_cu_used: Decimal,
/// The user's balance at this point in time. Multiple queries might be modifying it at once.
pub approximate_latest_balance_for_influx: Balance,
pub paid_credits_used: Decimal,
/// The user's balance at this point in time.
/// Multiple queries might be modifying it at once, so this is a copy of it when received
/// None if this is an unauthenticated request
pub approximate_balance_remaining: Decimal,
}
#[derive(From)]
@ -53,7 +56,7 @@ pub struct StatBuffer {
tsdb_save_interval_seconds: u32,
user_balance_cache: UserBalanceCache,
_flush_sender: mpsc::Sender<oneshot::Sender<(usize, usize)>>,
_flush_sender: mpsc::Sender<oneshot::Sender<FlushedStats>>,
}
impl StatBuffer {
@ -69,8 +72,8 @@ impl StatBuffer {
user_balance_cache: Option<UserBalanceCache>,
shutdown_receiver: broadcast::Receiver<()>,
tsdb_save_interval_seconds: u32,
flush_sender: mpsc::Sender<oneshot::Sender<(usize, usize)>>,
flush_receiver: mpsc::Receiver<oneshot::Sender<(usize, usize)>>,
flush_sender: mpsc::Sender<oneshot::Sender<FlushedStats>>,
flush_receiver: mpsc::Receiver<oneshot::Sender<FlushedStats>>,
) -> anyhow::Result<Option<SpawnedStatBuffer>> {
if influxdb_bucket.is_none() {
influxdb_client = None;
@ -115,7 +118,7 @@ impl StatBuffer {
&mut self,
mut stat_receiver: mpsc::UnboundedReceiver<AppStat>,
mut shutdown_receiver: broadcast::Receiver<()>,
mut flush_receiver: mpsc::Receiver<oneshot::Sender<(usize, usize)>>,
mut flush_receiver: mpsc::Receiver<oneshot::Sender<FlushedStats>>,
) -> Web3ProxyResult<()> {
let mut tsdb_save_interval =
interval(Duration::from_secs(self.tsdb_save_interval_seconds as u64));
@ -127,22 +130,71 @@ impl StatBuffer {
stat = stat_receiver.recv() => {
// trace!("Received stat");
// save the stat to a buffer
// TODO: tokio spawn this!
match stat {
Some(AppStat::RpcQuery(stat)) => {
Some(AppStat::RpcQuery(request_metadata)) => {
// we convert on this side of the channel so that we don't slow down the request
let stat = RpcQueryStats::try_from_metadata(request_metadata)?;
// update the latest balance
// do this BEFORE emitting any stats
let mut approximate_balance_remaining = 0.into();
if let Some(db_conn) = self.db_conn.as_ref() {
let user_id = stat.authorization.checks.user_id;
// update the user's balance
if user_id != 0 {
// update the user's cached balance
let mut user_balance = stat.authorization.checks.latest_balance.write().await;
// TODO: move this to a helper function
user_balance.total_frontend_requests += 1;
user_balance.total_spent += stat.compute_unit_cost;
if !stat.backend_rpcs_used.is_empty() {
user_balance.total_cache_misses += 1;
}
// if paid_credits_used is true, then they were premium at the start of the request
if stat.authorization.checks.paid_credits_used {
// TODO: this lets them get a negative remaining balance. we should clear if close to 0
user_balance.total_spent_paid_credits += stat.compute_unit_cost;
// check if they still have premium
if user_balance.active_premium() {
// TODO: referall credits here? i think in the save_db section still makes sense for those
} else if let Err(err) = self.user_balance_cache.invalidate(&user_balance.user_id, db_conn, &self.rpc_secret_key_cache).await {
// was premium, but isn't anymore due to paying for this query. clear the cache
// TODO: stop at <$0.000001 instead of negative?
warn!(?err, "unable to clear caches");
}
} else if user_balance.active_premium() {
// paid credits were not used, but now we have active premium. invalidate the caches
// TODO: this seems unliekly. should we warn if this happens so we can investigate?
if let Err(err) = self.user_balance_cache.invalidate(&user_balance.user_id, db_conn, &self.rpc_secret_key_cache).await {
// was premium, but isn't anymore due to paying for this query. clear the cache
// TODO: stop at <$0.000001 instead of negative?
warn!(?err, "unable to clear caches");
}
}
approximate_balance_remaining = user_balance.remaining();
}
self.accounting_db_buffer.entry(stat.accounting_key(self.billing_period_seconds)).or_default().add(stat.clone(), approximate_balance_remaining).await;
}
if self.influxdb_client.is_some() {
// TODO: round the timestamp at all?
if let Some(opt_in_timeseries_key) = stat.owned_timeseries_key() {
self.opt_in_timeseries_buffer.entry(opt_in_timeseries_key).or_default().add(stat.clone(), approximate_balance_remaining).await;
}
let global_timeseries_key = stat.global_timeseries_key();
self.global_timeseries_buffer.entry(global_timeseries_key).or_default().add(stat.clone()).await;
if let Some(opt_in_timeseries_key) = stat.owned_timeseries_key() {
self.opt_in_timeseries_buffer.entry(opt_in_timeseries_key).or_default().add(stat.clone()).await;
}
}
if self.db_conn.is_some() {
self.accounting_db_buffer.entry(stat.accounting_key(self.billing_period_seconds)).or_default().add(stat).await;
self.global_timeseries_buffer.entry(global_timeseries_key).or_default().add(stat, approximate_balance_remaining).await;
}
}
None => {
@ -152,6 +204,7 @@ impl StatBuffer {
}
}
_ = db_save_interval.tick() => {
// TODO: tokio spawn this! (but with a semaphore on db_save_interval)
trace!("DB save internal tick");
let count = self.save_relational_stats().await;
if count > 0 {
@ -171,17 +224,15 @@ impl StatBuffer {
trace!("flush");
let tsdb_count = self.save_tsdb_stats().await;
if tsdb_count > 0 {
trace!("Flushed {} stats to the tsdb", tsdb_count);
}
let relational_count = self.save_relational_stats().await;
if relational_count > 0 {
trace!("Flushed {} stats to the relational db", relational_count);
}
if let Err(err) = x.send((tsdb_count, relational_count)) {
error!(%tsdb_count, %relational_count, ?err, "unable to notify about flushed stats");
let flushed_stats = FlushedStats{ timeseries: tsdb_count, relational: relational_count};
trace!(?flushed_stats);
if let Err(err) = x.send(flushed_stats) {
error!(?flushed_stats, ?err, "unable to notify about flushed stats");
}
}
None => {
@ -244,8 +295,8 @@ impl StatBuffer {
self.chain_id,
db_conn,
key,
&self.rpc_secret_key_cache,
&self.user_balance_cache,
&self.rpc_secret_key_cache,
)
.await
{

@ -3,7 +3,7 @@ use crate::config::TopConfig;
use crate::frontend::authorization::{Authorization, RequestMetadata, RpcSecretKey};
use crate::rpcs::one::Web3Rpc;
use crate::stats::StatBuffer;
use anyhow::{anyhow, Context};
use anyhow::Context;
use argh::FromArgs;
use entities::{rpc_accounting, rpc_key};
use futures::stream::FuturesUnordered;
@ -176,12 +176,14 @@ impl MigrateStatsToV2SubCommand {
let request_ulid = Ulid::new();
let chain_id = x.chain_id;
// Create RequestMetadata
let request_metadata = RequestMetadata {
archive_request: x.archive_request.into(),
authorization: Some(authorization.clone()),
backend_requests: Mutex::new(backend_rpcs),
chain_id: x.chain_id,
chain_id,
error_response: x.error_response.into(),
// debug data is in kafka, not mysql or influx
kafka_debug_logger: None,
@ -204,11 +206,10 @@ impl MigrateStatsToV2SubCommand {
stat_sender: Some(stat_sender.clone()),
request_ulid,
user_error_response: false.into(),
usd_per_cu: top_config.app.usd_per_cu.unwrap_or_default(),
};
if let Some(x) = request_metadata.try_send_stat()? {
return Err(anyhow!("failed saving stat! {:?}", x));
}
request_metadata.try_send_stat()?;
}
}

@ -95,7 +95,11 @@ impl PopularityContestSubCommand {
highest_block = highest_block.max(head_block);
let head_delay_ms = conn.get("head_delay_ms").unwrap().as_f64().unwrap();
// TODO: this was moved to an async lock and so serialize can't fetch it
let head_delay_ms = conn
.get("head_delay_ms")
.and_then(|x| x.as_f64())
.unwrap_or_default();
let peak_latency_ms = conn
.get("peak_latency_ms")

@ -1,7 +1,7 @@
#![forbid(unsafe_code)]
use crate::app::{flatten_handle, flatten_handles, Web3ProxyApp};
use crate::compute_units::default_usd_per_cu;
use crate::config::TopConfig;
use crate::stats::FlushedStats;
use crate::{frontend, prometheus};
use argh::FromArgs;
use futures::StreamExt;
@ -60,15 +60,20 @@ impl ProxydSubCommand {
/// this shouldn't really be pub except it makes test fixtures easier
#[allow(clippy::too_many_arguments)]
pub async fn _main(
top_config: TopConfig,
mut top_config: TopConfig,
top_config_path: Option<PathBuf>,
frontend_port: Arc<AtomicU16>,
prometheus_port: Arc<AtomicU16>,
num_workers: usize,
frontend_shutdown_sender: broadcast::Sender<()>,
flush_stat_buffer_sender: mpsc::Sender<oneshot::Sender<(usize, usize)>>,
flush_stat_buffer_receiver: mpsc::Receiver<oneshot::Sender<(usize, usize)>>,
flush_stat_buffer_sender: mpsc::Sender<oneshot::Sender<FlushedStats>>,
flush_stat_buffer_receiver: mpsc::Receiver<oneshot::Sender<FlushedStats>>,
) -> anyhow::Result<()> {
// TODO: this is gross but it works. i'd rather it be called by serde, but it needs to know the chain id
if top_config.app.usd_per_cu.is_none() {
top_config.app.usd_per_cu = Some(default_usd_per_cu(top_config.app.chain_id));
}
// tokio has code for catching ctrl+c so we use that to shut down in most cases
// frontend_shutdown_sender is currently only used in tests, but we might make a /shutdown endpoint or something
// we do not need this receiver. new receivers are made by `shutdown_sender.subscribe()`
@ -104,7 +109,12 @@ impl ProxydSubCommand {
thread::spawn(move || loop {
match fs::read_to_string(&top_config_path) {
Ok(new_top_config) => match toml::from_str::<TopConfig>(&new_top_config) {
Ok(new_top_config) => {
Ok(mut new_top_config) => {
if new_top_config.app.usd_per_cu.is_none() {
new_top_config.app.usd_per_cu =
Some(default_usd_per_cu(new_top_config.app.chain_id));
}
if new_top_config != current_config {
trace!("current_config: {:#?}", current_config);
trace!("new_top_config: {:#?}", new_top_config);

@ -10,7 +10,7 @@ use migration::sea_orm::{
};
use std::path::{Path, PathBuf};
use std::{fs::File, io::BufReader};
use tracing::{info};
use tracing::info;
#[derive(FromArgs, PartialEq, Eq, Debug)]
/// Import users from another database.

@ -1,12 +1,12 @@
use axum::headers::authorization::Bearer;
use migration::sea_orm::prelude::Uuid;
use serde::Serialize;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::str::FromStr;
use ulid::Ulid;
/// Key used for caching the user's login
#[derive(Clone, Hash, PartialEq, Eq, Serialize)]
#[derive(Copy, Clone, Debug, Deserialize, Hash, PartialEq, Eq, Serialize)]
#[serde(transparent)]
pub struct UserBearerToken(pub Ulid);

@ -36,6 +36,7 @@ pub async fn admin_increase_balance(
.unwrap();
info!(?increase_balance_response, "http response");
// TODO: use a struct here
let increase_balance_response = increase_balance_response
.json::<serde_json::Value>()
.await

@ -32,6 +32,7 @@ use tracing::{info, trace, warn};
use web3_proxy::{
config::{AppConfig, TopConfig, Web3RpcConfig},
relational_db::get_migrated_db,
stats::FlushedStats,
sub_commands::ProxydSubCommand,
};
@ -59,23 +60,26 @@ pub struct TestApp {
pub proxy_provider: Provider<Http>,
/// tell the app to flush stats to the database
flush_stat_buffer_sender: mpsc::Sender<oneshot::Sender<(usize, usize)>>,
flush_stat_buffer_sender: mpsc::Sender<oneshot::Sender<FlushedStats>>,
/// tell the app to shut down (use `self.stop()`).
shutdown_sender: broadcast::Sender<()>,
}
impl TestApp {
pub async fn spawn(setup_db: bool) -> Self {
pub async fn spawn(chain_id: u64, setup_db: bool) -> Self {
info!(?chain_id);
let num_workers = 2;
// TODO: move basic setup into a test fixture
let path = env::var("PATH").unwrap();
info!("path: {}", path);
info!(%path);
// TODO: configurable rpc and block
let anvil = Anvil::new()
.chain_id(chain_id)
// .fork("https://polygon.llamarpc.com@44300000")
.spawn();
@ -244,7 +248,7 @@ impl TestApp {
// TODO: test influx
// TODO: test redis
let app_config: AppConfig = serde_json::from_value(json!({
"chain_id": 31337,
"chain_id": chain_id,
"db_url": db_url,
"default_user_max_requests_per_period": Some(6_000_000),
"deposit_factory_contract": Address::from_str(
@ -328,7 +332,7 @@ impl TestApp {
}
#[allow(unused)]
pub async fn flush_stats(&self) -> anyhow::Result<(usize, usize)> {
pub async fn flush_stats(&self) -> anyhow::Result<FlushedStats> {
let (tx, rx) = oneshot::channel();
self.flush_stat_buffer_sender.send(tx).await?;

@ -38,15 +38,20 @@ pub async fn create_user_as_admin(
};
info!(?admin_post_login_data);
let admin_login_response = r
let admin_post_login_data = r
.post(&login_post_url)
.json(&admin_post_login_data)
.send()
.await
.unwrap()
.json::<LoginPostResponse>()
.text()
.await
.unwrap();
info!("admin_post_login_data: {:#}", admin_post_login_data);
let admin_login_response: LoginPostResponse =
serde_json::from_str(&admin_post_login_data).unwrap();
info!(?admin_login_response);
// Upgrade the account to admin

@ -1,7 +1,12 @@
use crate::TestApp;
use entities::{user, user_tier};
use ethers::prelude::{LocalWallet, Signer};
use ethers::types::Signature;
use migration::sea_orm::{
self, ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter,
};
use tracing::info;
use web3_proxy::errors::Web3ProxyResult;
use web3_proxy::frontend::users::authentication::{LoginPostResponse, PostLogin};
/// Helper function to create an "ordinary" user
@ -32,7 +37,7 @@ pub async fn create_user(
};
info!(?user_post_login_data);
let user_login_response = r
let mut user_login_response = r
.post(&login_post_url)
.json(&user_post_login_data)
.send()
@ -45,3 +50,27 @@ pub async fn create_user(
user_login_response
}
/// TODO: use an admin user to do this instead
#[allow(unused)]
pub async fn set_user_tier(
x: &TestApp,
user: user::Model,
tier_name: &str,
) -> Web3ProxyResult<user_tier::Model> {
let db_conn = x.db_conn();
let ut = user_tier::Entity::find()
.filter(user_tier::Column::Title.like(tier_name))
.one(db_conn)
.await?
.unwrap();
let mut user = user.into_active_model();
user.user_tier_id = sea_orm::Set(ut.id);
user.save(db_conn).await?;
Ok(ut)
}

@ -1,10 +1,10 @@
pub mod admin_deposits;
pub mod admin_increases_balance;
pub mod app;
pub mod create_admin;
pub mod create_user;
pub mod get_admin_deposits;
pub mod get_rpc_key;
pub mod get_user_balance;
pub mod referral;
pub mod rpc_key;
pub mod user_balance;
pub use self::app::TestApp;

@ -1,8 +1,13 @@
use std::time::Duration;
use crate::TestApp;
use serde::Deserialize;
use tracing::info;
use ulid::Ulid;
use web3_proxy::frontend::users::authentication::LoginPostResponse;
use web3_proxy::{
frontend::users::authentication::LoginPostResponse,
rpcs::provider::{connect_http, EthersHttpProvider},
};
#[derive(Debug, Deserialize)]
pub struct RpcKeyResponse {
@ -33,6 +38,7 @@ pub async fn user_get_first_rpc_key(
r: &reqwest::Client,
login_response: &LoginPostResponse,
) -> RpcKey {
// TODO: refactor to use login_response? or compare?
let get_keys = format!("{}user/keys", x.proxy_provider.url());
info!("Get balance");
@ -47,9 +53,30 @@ pub async fn user_get_first_rpc_key(
let rpc_key_response = rpc_key_response.json::<serde_json::Value>().await.unwrap();
info!(?rpc_key_response);
info!("Rpc Key");
info!("Parsing rpc key as json");
let rpc_key: RpcKeyResponse = serde_json::from_value(rpc_key_response).unwrap();
info!(?rpc_key);
rpc_key.user_rpc_keys.into_iter().next().unwrap().1
}
#[allow(unused)]
pub async fn user_get_provider(
x: &TestApp,
r: &reqwest::Client,
login_response: &LoginPostResponse,
) -> anyhow::Result<EthersHttpProvider> {
let first_key = login_response.rpc_keys.iter().next().unwrap().1;
let rpc_url = format!(
"{}rpc/{}",
x.proxy_provider.url(),
Ulid::from(first_key.secret_key)
);
connect_http(
rpc_url.parse().unwrap(),
Some(r.clone()),
Duration::from_secs(1),
)
}

@ -1,5 +1,7 @@
use crate::TestApp;
use tracing::info;
use serde_json::json;
use tracing::{info, trace};
use web3_proxy::balance::Balance;
use web3_proxy::frontend::users::authentication::LoginPostResponse;
/// Helper function to get the user's balance
@ -8,19 +10,24 @@ pub async fn user_get_balance(
x: &TestApp,
r: &reqwest::Client,
login_response: &LoginPostResponse,
) -> (serde_json::Value) {
) -> Balance {
let get_user_balance = format!("{}user/balance", x.proxy_provider.url());
info!("Get balance");
let balance_response = r
.get(get_user_balance)
.bearer_auth(login_response.bearer_token)
.send()
.await
.unwrap();
info!(?balance_response);
trace!(
?balance_response,
"get balance for user #{}",
login_response.user.id
);
let balance_response = balance_response.json::<serde_json::Value>().await.unwrap();
info!(?balance_response);
let balance = balance_response.json().await.unwrap();
balance_response
info!("balance: {:#}", json!(&balance));
balance
}

@ -5,8 +5,8 @@ use std::time::Duration;
use crate::common::admin_increases_balance::admin_increase_balance;
use crate::common::create_admin::create_user_as_admin;
use crate::common::create_user::create_user;
use crate::common::get_user_balance::user_get_balance;
use crate::common::create_user::{create_user, set_user_tier};
use crate::common::user_balance::user_get_balance;
use crate::common::TestApp;
use migration::sea_orm::prelude::Decimal;
use tracing::info;
@ -15,7 +15,7 @@ use tracing::info;
#[ignore = "under construction"]
#[test_log::test(tokio::test)]
async fn test_admin_imitate_user() {
let x = TestApp::spawn(true).await;
let x = TestApp::spawn(31337, true).await;
todo!();
}
@ -24,7 +24,7 @@ async fn test_admin_imitate_user() {
#[test_log::test(tokio::test)]
async fn test_admin_grant_credits() {
info!("Starting admin grant credits test");
let x = TestApp::spawn(true).await;
let x = TestApp::spawn(31337, true).await;
let r = reqwest::Client::builder()
.timeout(Duration::from_secs(3))
.build()
@ -39,6 +39,8 @@ async fn test_admin_grant_credits() {
let admin_login_response = create_user_as_admin(&x, &r, &admin_wallet).await;
info!(?admin_login_response);
set_user_tier(&x, user_login_response.user.clone(), "Premium").await.unwrap();
let increase_balance_response = admin_increase_balance(
&x,
&r,
@ -53,11 +55,8 @@ async fn test_admin_grant_credits() {
Decimal::from(100)
);
let user_balance_response = user_get_balance(&x, &r, &user_login_response).await;
assert_eq!(
Decimal::from_str(user_balance_response["balance"].as_str().unwrap()).unwrap(),
Decimal::from(100)
);
let user_balance = user_get_balance(&x, &r, &user_login_response).await;
assert_eq!(user_balance.remaining(), Decimal::from(100));
x.wait().await;
}
@ -66,6 +65,6 @@ async fn test_admin_grant_credits() {
#[ignore = "under construction"]
#[test_log::test(tokio::test)]
async fn test_admin_change_user_tier() {
let x = TestApp::spawn(true).await;
let x = TestApp::spawn(31337, true).await;
todo!();
}

@ -13,7 +13,7 @@ use web3_proxy::rpcs::blockchain::ArcBlock;
#[cfg_attr(not(feature = "tests-needing-docker"), ignore)]
#[test_log::test(tokio::test)]
async fn it_migrates_the_db() {
let x = TestApp::spawn(true).await;
let x = TestApp::spawn(31337, true).await;
// we call flush stats more to be sure it works than because we expect it to save any stats
x.flush_stats().await.unwrap();
@ -21,7 +21,7 @@ async fn it_migrates_the_db() {
#[test_log::test(tokio::test)]
async fn it_starts_and_stops() {
let x = TestApp::spawn(false).await;
let x = TestApp::spawn(31337, false).await;
let anvil_provider = &x.anvil_provider;
let proxy_provider = &x.proxy_provider;

@ -0,0 +1,205 @@
mod common;
use crate::common::{
admin_increases_balance::admin_increase_balance,
create_admin::create_user_as_admin,
create_user::{create_user, set_user_tier},
rpc_key::user_get_provider,
user_balance::user_get_balance,
TestApp,
};
use ethers::prelude::U64;
use migration::sea_orm::prelude::Decimal;
use std::time::Duration;
use tracing::info;
use web3_proxy::balance::Balance;
// TODO: #[cfg_attr(not(feature = "tests-needing-docker"), ignore)]
#[test_log::test(tokio::test)]
async fn test_sum_credits_used() {
// chain_id 999_001_999 costs $.10/CU
let x = TestApp::spawn(999_001_999, true).await;
let r = reqwest::Client::builder()
.timeout(Duration::from_secs(3))
.build()
.unwrap();
// create wallets for users
let user_wallet = x.wallet(0);
let admin_wallet = x.wallet(1);
// log in to create users
let admin_login_response = create_user_as_admin(&x, &r, &admin_wallet).await;
let user_login_response = create_user(&x, &r, &user_wallet, None).await;
set_user_tier(&x, user_login_response.user.clone(), "Premium")
.await
.unwrap();
// TODO: set the user's user_id to the "Premium" tier
info!("starting balance");
let balance: Balance = user_get_balance(&x, &r, &user_login_response).await;
assert_eq!(
balance.total_frontend_requests, 0,
"total_frontend_requests"
);
assert_eq!(balance.total_cache_misses, 0, "total_cache_misses");
assert_eq!(
balance.total_spent_paid_credits,
0.into(),
"total_spent_paid_credits"
);
assert_eq!(balance.total_spent, 0.into(), "total_spent");
assert_eq!(balance.remaining(), 0.into(), "remaining");
assert!(!balance.active_premium(), "active_premium");
assert!(!balance.was_ever_premium(), "was_ever_premium");
info!("make one free request against the public RPC of 16 CU");
x.proxy_provider
.request::<_, Option<U64>>("eth_blockNumber", ())
.await
.unwrap();
// connect to the user's rpc url
let user_proxy_provider = user_get_provider(&x, &r, &user_login_response)
.await
.unwrap();
info!("make one cached authenticated (but out of funds) rpc request of 16 CU");
user_proxy_provider
.request::<_, Option<U64>>("eth_blockNumber", ())
.await
.unwrap();
let query_cost: Decimal = "1.00".parse().unwrap();
// let archive_multiplier: Decimal = "2.5".parse().unwrap();
let cache_multipler: Decimal = "0.75".parse().unwrap();
let cached_query_cost: Decimal = query_cost * cache_multipler;
// flush stats
let flushed = x.flush_stats().await.unwrap();
assert_eq!(flushed.relational, 2, "relational");
assert_eq!(flushed.timeseries, 0, "timeseries");
// Give user wallet $1000
admin_increase_balance(&x, &r, &admin_login_response, &user_wallet, 1000.into()).await;
// check balance
let balance: Balance = user_get_balance(&x, &r, &user_login_response).await;
assert_eq!(
balance.total_frontend_requests, 1,
"total_frontend_requests"
);
assert_eq!(balance.total_cache_misses, 0, "total_cache_misses");
assert_eq!(
balance.total_spent_paid_credits,
0.into(),
"total_spent_paid_credits"
);
assert_eq!(balance.total_spent, cached_query_cost, "total_spent"); // TODO: not sure what this should be
assert_eq!(balance.remaining(), 1000.into(), "remaining");
assert!(balance.active_premium(), "active_premium");
assert!(balance.was_ever_premium(), "was_ever_premium");
info!("make one cached authenticated rpc request of 16 CU");
user_proxy_provider
.request::<_, Option<U64>>("eth_blockNumber", ())
.await
.unwrap();
// flush stats
let flushed = x.flush_stats().await.unwrap();
assert_eq!(flushed.relational, 1);
// check balance
let balance: Balance = user_get_balance(&x, &r, &user_login_response).await;
assert_eq!(
balance.total_frontend_requests, 2,
"total_frontend_requests"
);
assert_eq!(balance.total_cache_misses, 0, "total_cache_misses");
assert_eq!(
balance.total_spent,
cached_query_cost * Decimal::from(2),
"total_spent"
);
assert_eq!(
balance.total_spent_paid_credits, cached_query_cost,
"total_spent_paid_credits"
);
assert_eq!(
balance.remaining(),
Decimal::from(1000) - cached_query_cost,
"remaining"
);
assert!(balance.active_premium(), "active_premium");
assert!(balance.was_ever_premium(), "was_ever_premium");
info!("make ten cached authenticated requests of 16 CU");
for _ in 0..10 {
user_proxy_provider
.request::<_, Option<U64>>("eth_blockNumber", ())
.await
.unwrap();
}
// flush stats
let flushed = x.flush_stats().await.unwrap();
assert_eq!(flushed.relational, 1);
// check balance
info!("checking the final balance");
let balance: Balance = user_get_balance(&x, &r, &user_login_response).await;
// the first of our 12 total requests request was on the free tier, so paid_credits should only count 11
let expected_total_spent_paid_credits = Decimal::from(11) * cached_query_cost;
assert_eq!(
balance.total_frontend_requests, 12,
"total_frontend_requests"
);
assert_eq!(balance.total_cache_misses, 0, "total_cache_misses");
assert_eq!(
balance.total_spent_paid_credits, expected_total_spent_paid_credits,
"total_spent_paid_credits"
);
assert_eq!(
balance.total_spent,
expected_total_spent_paid_credits + cached_query_cost,
"total_spent"
);
assert_eq!(
balance.remaining(),
Decimal::from(1000) - expected_total_spent_paid_credits
);
assert!(balance.active_premium(), "active_premium");
assert!(balance.was_ever_premium(), "was_ever_premium");
// TODO: make enough queries to push the user balance negative
// check admin's balance to make sure nothing is leaking
info!("checking the admin");
let admin_balance: Balance = user_get_balance(&x, &r, &admin_login_response).await;
assert!(!admin_balance.active_premium(), "active_premium");
assert!(!admin_balance.was_ever_premium(), "was_ever_premium");
assert_eq!(
admin_balance.total_frontend_requests, 0,
"total_frontend_requests"
);
assert_eq!(admin_balance.total_cache_misses, 0, "total_cache_misses");
assert_eq!(
admin_balance.total_spent_paid_credits,
0.into(),
"total_spent_paid_credits"
);
assert_eq!(admin_balance.total_spent, 0.into(), "total_spent");
assert_eq!(admin_balance.remaining(), 0.into(), "remaining");
// TODO: query "user 0" to get the public counts
}

@ -1,15 +1,15 @@
mod common;
use crate::common::admin_deposits::get_admin_deposits;
use crate::common::admin_increases_balance::admin_increase_balance;
use crate::common::create_admin::create_user_as_admin;
use crate::common::create_user::create_user;
use crate::common::get_admin_deposits::get_admin_deposits;
use crate::common::get_rpc_key::{user_get_first_rpc_key, RpcKey};
use crate::common::get_user_balance::user_get_balance;
use crate::common::create_user::{create_user, set_user_tier};
use crate::common::referral::{
get_referral_code, get_shared_referral_codes, get_used_referral_codes, UserSharedReferralInfo,
UserUsedReferralInfo,
};
use crate::common::rpc_key::{user_get_first_rpc_key, RpcKey};
use crate::common::user_balance::user_get_balance;
use crate::common::TestApp;
use ethers::prelude::{Http, Provider};
use ethers::{signers::Signer, types::Signature};
@ -36,7 +36,7 @@ struct LoginPostResponse {
#[cfg_attr(not(feature = "tests-needing-docker"), ignore)]
#[test_log::test(tokio::test)]
async fn test_log_in_and_out() {
let x = TestApp::spawn(true).await;
let x = TestApp::spawn(31337, true).await;
let r = reqwest::Client::new();
@ -92,7 +92,7 @@ async fn test_log_in_and_out() {
#[test_log::test(tokio::test)]
async fn test_admin_balance_increase() {
info!("Starting admin can increase balance");
let x = TestApp::spawn(true).await;
let x = TestApp::spawn(31337, true).await;
let r = reqwest::Client::builder()
.timeout(Duration::from_secs(20))
.build()
@ -105,6 +105,10 @@ async fn test_admin_balance_increase() {
let admin_login_response = create_user_as_admin(&x, &r, &admin_wallet).await;
let user_login_response = create_user(&x, &r, &user_wallet, None).await;
set_user_tier(&x, user_login_response.user.clone(), "Premium")
.await
.unwrap();
// Bump both user's wallet to $20
admin_increase_balance(
&x,
@ -139,7 +143,7 @@ async fn test_admin_balance_increase() {
#[test_log::test(tokio::test)]
async fn test_user_balance_decreases() {
info!("Starting balance decreases with usage test");
let x = TestApp::spawn(true).await;
let x = TestApp::spawn(31337, true).await;
let r = reqwest::Client::builder()
.timeout(Duration::from_secs(20))
.build()
@ -152,13 +156,17 @@ async fn test_user_balance_decreases() {
let admin_login_response = create_user_as_admin(&x, &r, &admin_wallet).await;
let user_login_response = create_user(&x, &r, &user_wallet, None).await;
set_user_tier(&x, user_login_response.user.clone(), "Premium")
.await
.unwrap();
// Get the rpc keys for this user
let rpc_keys: RpcKey = user_get_first_rpc_key(&x, &r, &user_login_response).await;
let proxy_endpoint = format!("{}rpc/{}", x.proxy_provider.url(), rpc_keys.secret_key);
let proxy_provider = Provider::<Http>::try_from(proxy_endpoint).unwrap();
// Make some requests while in the free tier, so we can test bookkeeping here
for _ in 1..10_000 {
for _ in 1..=10_000 {
let _ = proxy_provider
.request::<_, Option<ArcBlock>>("eth_getBlockByNumber", ("latest", false))
.await
@ -167,35 +175,20 @@ async fn test_user_balance_decreases() {
}
// Flush all stats here
let (influx_count, mysql_count) = x.flush_stats().await.unwrap();
assert_eq!(influx_count, 0);
assert!(mysql_count > 0);
let flush_count = x.flush_stats().await.unwrap();
assert_eq!(flush_count.timeseries, 0);
assert!(flush_count.relational > 0);
// Check the balance, it should not have decreased; there should have been accounted free credits, however
let user_balance_response = user_get_balance(&x, &r, &user_login_response).await;
let user_balance = user_get_balance(&x, &r, &user_login_response).await;
// Check that the balance is 0
assert_eq!(
Decimal::from_str(user_balance_response["balance"].as_str().unwrap()).unwrap(),
Decimal::from(0)
);
assert_eq!(user_balance.remaining(), Decimal::from(0));
// Check that paid credits is 0 (because balance is 0)
assert_eq!(
Decimal::from_str(
user_balance_response["total_spent_paid_credits"]
.as_str()
.unwrap()
)
.unwrap(),
Decimal::from(0)
);
assert_eq!(user_balance.total_spent_paid_credits, Decimal::from(0));
// Check that paid credits is 0 (because balance is 0)
assert_eq!(
Decimal::from_str(user_balance_response["total_deposits"].as_str().unwrap()).unwrap(),
Decimal::from(0)
);
assert_eq!(user_balance.total_deposits(), Decimal::from(0));
// Check that total credits incl free used is larger than 0
let previously_free_spent =
Decimal::from_str(user_balance_response["total_spent"].as_str().unwrap()).unwrap();
let previously_free_spent = user_balance.total_spent;
assert!(previously_free_spent > Decimal::from(0));
// Bump both user's wallet to $20
@ -208,11 +201,10 @@ async fn test_user_balance_decreases() {
)
.await;
let user_balance_response = user_get_balance(&x, &r, &user_login_response).await;
let user_balance_pre =
Decimal::from_str(user_balance_response["balance"].as_str().unwrap()).unwrap();
let user_balance_pre = user_balance_response.remaining();
assert_eq!(user_balance_pre, Decimal::from(20));
for _ in 1..10_000 {
for _ in 1..=10_000 {
let _ = proxy_provider
.request::<_, Option<ArcBlock>>("eth_getBlockByNumber", ("latest", false))
.await
@ -221,54 +213,43 @@ async fn test_user_balance_decreases() {
}
// Flush all stats here
let (influx_count, mysql_count) = x.flush_stats().await.unwrap();
assert_eq!(influx_count, 0);
assert!(mysql_count > 0);
let flush_count = x.flush_stats().await.unwrap();
assert_eq!(flush_count.timeseries, 0);
assert!(flush_count.relational == 1);
// Deposits should not be affected, and should be equal to what was initially provided
let user_balance_response = user_get_balance(&x, &r, &user_login_response).await;
let total_deposits =
Decimal::from_str(user_balance_response["total_deposits"].as_str().unwrap()).unwrap();
let user_balance = user_get_balance(&x, &r, &user_login_response).await;
let total_deposits = user_balance.total_deposits();
assert_eq!(total_deposits, Decimal::from(20));
// Check that total_spent_paid credits is equal to total_spent, because we are all still inside premium
assert_eq!(
Decimal::from_str(
user_balance_response["total_spent_paid_credits"]
.as_str()
.unwrap()
)
.unwrap()
+ previously_free_spent,
Decimal::from_str(user_balance_response["total_spent"].as_str().unwrap()).unwrap()
user_balance.total_spent_paid_credits + previously_free_spent,
user_balance.total_spent,
);
// Get the full balance endpoint
let user_balance_post =
Decimal::from_str(user_balance_response["balance"].as_str().unwrap()).unwrap();
let user_balance_post = user_balance.remaining();
assert!(user_balance_post < user_balance_pre);
// 10k while free, 10k while premium
assert_eq!(user_balance.total_frontend_requests, 20_000);
// Balance should be total deposits - usage while in the paid tier
let total_spent_in_paid_credits = Decimal::from_str(
user_balance_response["total_spent_paid_credits"]
.as_str()
.unwrap(),
)
.unwrap();
let total_spent_in_paid_credits = user_balance.total_spent_paid_credits;
assert_eq!(
total_deposits - total_spent_in_paid_credits,
user_balance_post
);
// This should never be negative
let user_balance_total_spent =
Decimal::from_str(user_balance_response["total_spent"].as_str().unwrap()).unwrap();
assert!(user_balance_total_spent > Decimal::from(0));
assert!(user_balance.total_spent > Decimal::from(0));
}
#[cfg_attr(not(feature = "tests-needing-docker"), ignore)]
#[test_log::test(tokio::test)]
async fn test_referral_bonus_non_concurrent() {
info!("Starting referral bonus test");
let x = TestApp::spawn(true).await;
let x = TestApp::spawn(31337, true).await;
let r = reqwest::Client::builder()
.timeout(Duration::from_secs(20))
.build()
@ -286,6 +267,13 @@ async fn test_referral_bonus_non_concurrent() {
let user_login_response = create_user(&x, &r, &user_wallet, Some(referral_link.clone())).await;
set_user_tier(&x, referrer_login_response.user.clone(), "Premium")
.await
.unwrap();
set_user_tier(&x, user_login_response.user.clone(), "Premium")
.await
.unwrap();
// Bump both user's wallet to $20
admin_increase_balance(
&x,
@ -306,11 +294,9 @@ async fn test_referral_bonus_non_concurrent() {
// Get balance before for both users
let user_balance_response = user_get_balance(&x, &r, &user_login_response).await;
let user_balance_pre =
Decimal::from_str(user_balance_response["balance"].as_str().unwrap()).unwrap();
let user_balance_pre = user_balance_response.remaining();
let referrer_balance_response = user_get_balance(&x, &r, &referrer_login_response).await;
let referrer_balance_pre =
Decimal::from_str(referrer_balance_response["balance"].as_str().unwrap()).unwrap();
let referrer_balance_pre = referrer_balance_response.remaining();
// Make sure they both have balance now
assert_eq!(user_balance_pre, Decimal::from(20));
@ -354,7 +340,7 @@ async fn test_referral_bonus_non_concurrent() {
let proxy_endpoint = format!("{}rpc/{}", x.proxy_provider.url(), rpc_keys.secret_key);
let proxy_provider = Provider::<Http>::try_from(proxy_endpoint).unwrap();
for _ in 1..20_000 {
for _ in 1..=20_000 {
let _proxy_result = proxy_provider
.request::<_, Option<ArcBlock>>("eth_getBlockByNumber", ("latest", false))
.await
@ -363,24 +349,25 @@ async fn test_referral_bonus_non_concurrent() {
}
// Flush all stats here
let (influx_count, mysql_count) = x.flush_stats().await.unwrap();
assert_eq!(influx_count, 0);
assert!(mysql_count > 0);
let flush_count = x.flush_stats().await.unwrap();
assert_eq!(flush_count.timeseries, 0);
assert!(flush_count.relational > 0);
// Check that at least something was earned:
let shared_referral_code: UserSharedReferralInfo =
get_shared_referral_codes(&x, &r, &referrer_login_response).await;
info!("Referral code");
info!("{:?}", shared_referral_code.referrals.get(0).unwrap());
info!(referrals=?shared_referral_code.referrals.get(0).unwrap(), "Referral code");
let user_balance = user_get_balance(&x, &r, &user_login_response).await;
// first, make sure that 20k requests were saved to the db
assert_eq!(user_balance.total_frontend_requests, 20_000);
// We make sure that the referrer has $10 + 10% of the used balance
// The admin provides credits for both
let user_balance_response = user_get_balance(&x, &r, &user_login_response).await;
let user_balance_post =
Decimal::from_str(user_balance_response["balance"].as_str().unwrap()).unwrap();
let referrer_balance_response = user_get_balance(&x, &r, &referrer_login_response).await;
let referrer_balance_post =
Decimal::from_str(referrer_balance_response["balance"].as_str().unwrap()).unwrap();
let user_balance_post = user_balance.remaining();
let referrer_balance = user_get_balance(&x, &r, &referrer_login_response).await;
let referrer_balance_post = referrer_balance.remaining();
info!(
"Balances before and after are (user): {:?} {:?}",
@ -412,7 +399,7 @@ async fn test_referral_bonus_non_concurrent() {
#[test_log::test(tokio::test)]
async fn test_referral_bonus_concurrent_referrer_only() {
info!("Starting referral bonus test");
let x = TestApp::spawn(true).await;
let x = TestApp::spawn(31337, true).await;
let r = reqwest::Client::builder()
.timeout(Duration::from_secs(20))
.build()
@ -430,6 +417,13 @@ async fn test_referral_bonus_concurrent_referrer_only() {
let user_login_response = create_user(&x, &r, &user_wallet, Some(referral_link.clone())).await;
set_user_tier(&x, referrer_login_response.user.clone(), "Premium")
.await
.unwrap();
set_user_tier(&x, user_login_response.user.clone(), "Premium")
.await
.unwrap();
// Bump both user's wallet to $20
admin_increase_balance(
&x,
@ -450,11 +444,9 @@ async fn test_referral_bonus_concurrent_referrer_only() {
// Get balance before for both users
let user_balance_response = user_get_balance(&x, &r, &user_login_response).await;
let user_balance_pre =
Decimal::from_str(user_balance_response["balance"].as_str().unwrap()).unwrap();
let user_balance_pre = user_balance_response.remaining();
let referrer_balance_response = user_get_balance(&x, &r, &referrer_login_response).await;
let referrer_balance_pre =
Decimal::from_str(referrer_balance_response["balance"].as_str().unwrap()).unwrap();
let referrer_balance_pre = referrer_balance_response.remaining();
// Make sure they both have balance now
assert_eq!(user_balance_pre, Decimal::from(20));
@ -521,9 +513,9 @@ async fn test_referral_bonus_concurrent_referrer_only() {
}
// Flush all stats here
let (influx_count, mysql_count) = x.flush_stats().await.unwrap();
assert_eq!(influx_count, 0);
assert!(mysql_count > 0);
let flush_count = x.flush_stats().await.unwrap();
assert_eq!(flush_count.timeseries, 0);
assert!(flush_count.relational > 0);
// Check that at least something was earned:
let shared_referral_code: UserSharedReferralInfo =
@ -534,11 +526,9 @@ async fn test_referral_bonus_concurrent_referrer_only() {
// We make sure that the referrer has $10 + 10% of the used balance
// The admin provides credits for both
let user_balance_response = user_get_balance(&x, &r, &user_login_response).await;
let user_balance_post =
Decimal::from_str(user_balance_response["balance"].as_str().unwrap()).unwrap();
let user_balance_post = user_balance_response.remaining();
let referrer_balance_response = user_get_balance(&x, &r, &referrer_login_response).await;
let referrer_balance_post =
Decimal::from_str(referrer_balance_response["balance"].as_str().unwrap()).unwrap();
let referrer_balance_post = referrer_balance_response.remaining();
info!(
"Balances before and after are (user): {:?} {:?}",
@ -570,7 +560,7 @@ async fn test_referral_bonus_concurrent_referrer_only() {
#[test_log::test(tokio::test)]
async fn test_referral_bonus_concurrent_referrer_and_user() {
info!("Starting referral bonus test");
let x = TestApp::spawn(true).await;
let x = TestApp::spawn(31337, true).await;
let r = reqwest::Client::builder()
.timeout(Duration::from_secs(20))
.build()
@ -588,6 +578,13 @@ async fn test_referral_bonus_concurrent_referrer_and_user() {
let user_login_response = create_user(&x, &r, &user_wallet, Some(referral_link.clone())).await;
set_user_tier(&x, referrer_login_response.user.clone(), "Premium")
.await
.unwrap();
set_user_tier(&x, user_login_response.user.clone(), "Premium")
.await
.unwrap();
// Bump both user's wallet to $20
admin_increase_balance(
&x,
@ -608,11 +605,9 @@ async fn test_referral_bonus_concurrent_referrer_and_user() {
// Get balance before for both users
let user_balance_response = user_get_balance(&x, &r, &user_login_response).await;
let user_balance_pre =
Decimal::from_str(user_balance_response["balance"].as_str().unwrap()).unwrap();
let user_balance_pre = user_balance_response.remaining();
let referrer_balance_response = user_get_balance(&x, &r, &referrer_login_response).await;
let referrer_balance_pre =
Decimal::from_str(referrer_balance_response["balance"].as_str().unwrap()).unwrap();
let referrer_balance_pre = referrer_balance_response.remaining();
// Make sure they both have balance now
assert_eq!(user_balance_pre, Decimal::from(20));
@ -700,9 +695,9 @@ async fn test_referral_bonus_concurrent_referrer_and_user() {
}
// Flush all stats here
let (influx_count, mysql_count) = x.flush_stats().await.unwrap();
assert_eq!(influx_count, 0);
assert!(mysql_count > 0);
let flush_count = x.flush_stats().await.unwrap();
assert_eq!(flush_count.timeseries, 0);
assert!(flush_count.relational > 0);
// Check that at least something was earned:
let shared_referral_code: UserSharedReferralInfo =
@ -713,11 +708,9 @@ async fn test_referral_bonus_concurrent_referrer_and_user() {
// We make sure that the referrer has $10 + 10% of the used balance
// The admin provides credits for both
let user_balance_response = user_get_balance(&x, &r, &user_login_response).await;
let user_balance_post =
Decimal::from_str(user_balance_response["balance"].as_str().unwrap()).unwrap();
let user_balance_post = user_balance_response.remaining();
let referrer_balance_response = user_get_balance(&x, &r, &referrer_login_response).await;
let referrer_balance_post =
Decimal::from_str(referrer_balance_response["balance"].as_str().unwrap()).unwrap();
let referrer_balance_post = referrer_balance_response.remaining();
info!(
"Balances before and after are (user): {:?} {:?}",