From df2f3d340f3d5c889c8d803e176010f51e2b8fd4 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 12 Jul 2023 00:35:07 -0700 Subject: [PATCH] More balance tests (#182) * fix popularity contest * more info in the Debug for Web3Rpc * add frontend_requests and cache_misses to the Balance query * add more to balance and stats flushing and improved test coverage * it compiles * deserializer for Ulid to Uuid I think a wrapper type on Ulid that implements sea_orm::Value is probably better * rename variable to match struct name * add deserializer for Address -> Vec * sql sum returns a Decimal. need to convert to u64 * assert more * one log and assert more * log more * use a helper to get the user's rpc provider * this should be 2 now that we have a public and authed call * this should be zero. the public has the cache miss * instrument cu calcs * trace the value we took, not the default that replaced it * move usd_per_chain into config * remove some extra logging * use Arc::into_inner to maybe avoid a race * off by 1 * pass paid credits used instead of returning it this lets us use it to write to our user balance cache first. importantly, this keeps us from holding a write lock while writing to mysql * no cache misses expected in this test * actually check the admin * put the balance checks back now that the rest of the test works * archive request is being set incorrectly * wow howd we manage flipping the greater than sign on archive depth * move latest_balance and premium_credits_used to before any stats are emitted * lint * and build undoes the linting. fun i didnt even want to lint them in the first place, so this is fine * missed incrementing total_spent when not incrementing total_spent_paid_credits * use the credits on self * use the credits on self (pt 2) * fix type for 10 cu query * convert the requestmetadata on the other side of the channel * logs * viewing stats is allowed even without a balance * move paid_credits_used to AuthorizationChecks * wip * test_sum_credits_used finally passes * UserBalanceCache::get_or_insert * re-enable rpc_secret_key_cache * move invalidate to a helper function and always call it **after** the db is commited * fix PartialEq and Eq on RpcSecretKey * cargo upgrade --- Cargo.lock | 16 +- entities/src/login.rs | 5 +- entities/src/pending_login.rs | 5 +- entities/src/revert_log.rs | 5 +- entities/src/rpc_key.rs | 5 +- entities/src/serialization.rs | 14 +- entities/src/user.rs | 5 +- latency/Cargo.toml | 2 +- web3_proxy/Cargo.toml | 4 +- web3_proxy/src/app/mod.rs | 50 +++-- web3_proxy/src/balance.rs | 89 +++++--- web3_proxy/src/caches.rs | 72 +++++- web3_proxy/src/compute_units.rs | 36 ++- web3_proxy/src/config.rs | 3 + web3_proxy/src/frontend/admin.rs | 13 +- web3_proxy/src/frontend/authorization.rs | 175 ++++++++------- web3_proxy/src/frontend/rpc_proxy_http.rs | 2 +- web3_proxy/src/frontend/status.rs | 2 +- .../src/frontend/users/authentication.rs | 43 ++-- web3_proxy/src/frontend/users/payment.rs | 30 +-- .../src/frontend/users/payment_stripe.rs | 22 +- web3_proxy/src/lib.rs | 1 + web3_proxy/src/response_cache.rs | 6 +- web3_proxy/src/rpcs/consensus.rs | 34 ++- web3_proxy/src/rpcs/one.rs | 16 ++ web3_proxy/src/stats/influxdb_queries.rs | 37 +--- web3_proxy/src/stats/mod.rs | 197 +++++++---------- web3_proxy/src/stats/stat_buffer.rs | 107 ++++++--- .../src/sub_commands/migrate_stats_to_v2.rs | 11 +- .../src/sub_commands/popularity_contest.rs | 6 +- web3_proxy/src/sub_commands/proxyd.rs | 22 +- web3_proxy/src/sub_commands/user_import.rs | 2 +- web3_proxy/src/user_token.rs | 4 +- ...et_admin_deposits.rs => admin_deposits.rs} | 0 .../tests/common/admin_increases_balance.rs | 1 + web3_proxy/tests/common/app.rs | 14 +- web3_proxy/tests/common/create_admin.rs | 9 +- web3_proxy/tests/common/create_user.rs | 31 ++- web3_proxy/tests/common/mod.rs | 6 +- .../common/{get_rpc_key.rs => rpc_key.rs} | 31 ++- .../{get_user_balance.rs => user_balance.rs} | 21 +- web3_proxy/tests/test_admins.rs | 19 +- web3_proxy/tests/test_proxy.rs | 4 +- web3_proxy/tests/test_sum_credits_used.rs | 205 ++++++++++++++++++ web3_proxy/tests/test_users.rs | 193 ++++++++--------- 45 files changed, 1011 insertions(+), 564 deletions(-) rename web3_proxy/tests/common/{get_admin_deposits.rs => admin_deposits.rs} (100%) rename web3_proxy/tests/common/{get_rpc_key.rs => rpc_key.rs} (66%) rename web3_proxy/tests/common/{get_user_balance.rs => user_balance.rs} (60%) create mode 100644 web3_proxy/tests/test_sum_credits_used.rs diff --git a/Cargo.lock b/Cargo.lock index 113e1202..d3baa4cf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3460,9 +3460,9 @@ dependencies = [ [[package]] name = "num" -version = "0.4.0" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43db66d1170d347f9a065114077f7dccb00c1b9478c89384490a3425279a4606" +checksum = "b05180d69e3da0e530ba2a1dae5110317e49e3b7f3d41be227dc5f92e49ee7af" dependencies = [ "num-bigint", "num-complex", @@ -4133,9 +4133,9 @@ dependencies = [ [[package]] name = "portable-atomic" -version = "1.3.3" +version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "767eb9f07d4a5ebcb39bbf2d452058a93c011373abf6832e24194a1c3f004794" +checksum = "d220334a184db82b31b83f5ff093e3315280fb2b6bbc032022b2304a509aab7a" [[package]] name = "ppv-lite86" @@ -4238,9 +4238,9 @@ checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068" [[package]] name = "proc-macro2" -version = "1.0.63" +version = "1.0.64" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b368fba921b0dce7e60f5e04ec15e565b3303972b42bcfde1d0713b881959eb" +checksum = "78803b62cbf1f46fde80d7c0e803111524b9877184cfe7c3033659490ac7a7da" dependencies = [ "unicode-ident", ] @@ -5463,9 +5463,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.100" +version = "1.0.102" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f1e14e89be7aa4c4b78bdbdc9eb5bf8517829a600ae8eaa39a6e1d960b5185c" +checksum = "b5062a995d481b2308b6064e9af76011f2921c35f97b0468811ed9f6cd91dfed" dependencies = [ "itoa", "ryu", diff --git a/entities/src/login.rs b/entities/src/login.rs index 83d04cf6..094ef91f 100644 --- a/entities/src/login.rs +++ b/entities/src/login.rs @@ -10,7 +10,10 @@ pub struct Model { #[sea_orm(primary_key)] pub id: u64, #[sea_orm(column_type = "Binary(BlobSize::Blob(Some(16)))", unique)] - #[serde(serialize_with = "serialization::uuid_as_ulid")] + #[serde( + serialize_with = "serialization::uuid_as_ulid", + deserialize_with = "serialization::ulid_to_uuid" + )] pub bearer_token: Uuid, pub user_id: u64, pub expires_at: DateTimeUtc, diff --git a/entities/src/pending_login.rs b/entities/src/pending_login.rs index 6e99d842..344bd7d9 100644 --- a/entities/src/pending_login.rs +++ b/entities/src/pending_login.rs @@ -10,7 +10,10 @@ pub struct Model { #[sea_orm(primary_key)] pub id: u64, #[sea_orm(column_type = "Binary(BlobSize::Blob(Some(16)))", unique)] - #[serde(serialize_with = "serialization::uuid_as_ulid")] + #[serde( + serialize_with = "serialization::uuid_as_ulid", + deserialize_with = "serialization::ulid_to_uuid" + )] pub nonce: Uuid, #[sea_orm(column_type = "Text")] pub message: String, diff --git a/entities/src/revert_log.rs b/entities/src/revert_log.rs index 8f96ed9f..f377bbbe 100644 --- a/entities/src/revert_log.rs +++ b/entities/src/revert_log.rs @@ -14,7 +14,10 @@ pub struct Model { pub timestamp: DateTimeUtc, pub method: Method, #[sea_orm(column_type = "Binary(BlobSize::Blob(Some(20)))")] - #[serde(serialize_with = "serialization::vec_as_address")] + #[serde( + serialize_with = "serialization::vec_as_address", + deserialize_with = "serialization::address_to_vec" + )] pub to: Vec, #[sea_orm(column_type = "Text", nullable)] pub call_data: Option, diff --git a/entities/src/rpc_key.rs b/entities/src/rpc_key.rs index 6a202b52..e9114a2b 100644 --- a/entities/src/rpc_key.rs +++ b/entities/src/rpc_key.rs @@ -11,7 +11,10 @@ pub struct Model { pub id: u64, pub user_id: u64, #[sea_orm(column_type = "Binary(BlobSize::Blob(Some(16)))", unique)] - #[serde(serialize_with = "serialization::uuid_as_ulid")] + #[serde( + serialize_with = "serialization::uuid_as_ulid", + deserialize_with = "serialization::ulid_to_uuid" + )] pub secret_key: Uuid, pub description: Option, pub private_txs: bool, diff --git a/entities/src/serialization.rs b/entities/src/serialization.rs index 53182c25..7d8609ca 100644 --- a/entities/src/serialization.rs +++ b/entities/src/serialization.rs @@ -1,7 +1,7 @@ //! sea-orm types don't always serialize how we want. this helps that, though it won't help every case. use ethers::prelude::Address; use sea_orm::prelude::Uuid; -use serde::{Serialize, Serializer}; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; use std::convert::TryInto; use ulid::Ulid; @@ -19,6 +19,12 @@ where x.serialize(s) } +pub fn address_to_vec<'de, D: Deserializer<'de>>(deserializer: D) -> Result, D::Error> { + let address = Address::deserialize(deserializer)?; + + Ok(address.to_fixed_bytes().into()) +} + pub fn uuid_as_ulid(x: &Uuid, s: S) -> Result where S: Serializer, @@ -28,3 +34,9 @@ where // TODO: to_string shouldn't be needed, but i'm still seeing Uuid length x.to_string().serialize(s) } + +pub fn ulid_to_uuid<'de, D: Deserializer<'de>>(deserializer: D) -> Result { + let ulid = Ulid::deserialize(deserializer)?; + + Ok(ulid.into()) +} diff --git a/entities/src/user.rs b/entities/src/user.rs index da9432c9..1cbe7815 100644 --- a/entities/src/user.rs +++ b/entities/src/user.rs @@ -10,7 +10,10 @@ pub struct Model { #[sea_orm(primary_key)] pub id: u64, #[sea_orm(column_type = "Binary(BlobSize::Blob(Some(20)))", unique)] - #[serde(serialize_with = "serialization::vec_as_address")] + #[serde( + serialize_with = "serialization::vec_as_address", + deserialize_with = "serialization::address_to_vec" + )] pub address: Vec, pub description: Option, pub email: Option, diff --git a/latency/Cargo.toml b/latency/Cargo.toml index c1199f38..3d3c8fb8 100644 --- a/latency/Cargo.toml +++ b/latency/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" [dependencies] log = "0.4.19" -portable-atomic = { version = "1.3.3", features = ["float"] } +portable-atomic = { version = "1.4.0", features = ["float"] } serde = { version = "1.0.171", features = [] } tokio = { version = "1.29.1", features = ["full"] } tracing = "0.1.37" diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index 1d95b568..950326fa 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -67,7 +67,7 @@ listenfd = "1.0.1" mimalloc = { version = "0.1.37", optional = true} moka = { version = "0.11.2", default-features = false, features = ["atomic64", "future", "parking_lot", "quanta", "triomphe"] } nanorand = { version = "0.7.0", default-features = false, features = ["std", "tls", "wyrand"] } -num = { version = "0.4.0" } +num = { version = "0.4.1" } num-traits = "0.2.15" once_cell = { version = "1.18.0" } ordered-float = {version = "3.7.0" } @@ -83,7 +83,7 @@ rust_decimal = { version = "1.30.0", features = ["maths"] } sentry = { version = "0.31.5", default-features = false, features = ["anyhow", "backtrace", "contexts", "panic", "reqwest", "rustls", "serde_json", "tracing"] } sentry-tracing = "0.31.5" serde = { version = "1.0.171" } -serde_json = { version = "1.0.100", default-features = false, features = ["raw_value"] } +serde_json = { version = "1.0.102", default-features = false, features = ["raw_value"] } serde_prometheus = "0.2.3" strum = { version = "0.25.0", features = ["derive"] } time_01 = { package = "time", version = "0.1.45" } diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 7b652616..ec1d5802 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -22,7 +22,7 @@ use crate::rpcs::many::Web3Rpcs; use crate::rpcs::one::Web3Rpc; use crate::rpcs::provider::{connect_http, EthersHttpProvider}; use crate::rpcs::transactions::TxStatus; -use crate::stats::{AppStat, StatBuffer}; +use crate::stats::{AppStat, FlushedStats, StatBuffer}; use anyhow::Context; use axum::http::StatusCode; use chrono::Utc; @@ -50,7 +50,7 @@ use std::str::FromStr; use std::sync::atomic::{AtomicU16, Ordering}; use std::sync::{atomic, Arc}; use std::time::Duration; -use tokio::sync::{broadcast, mpsc, watch, Semaphore, oneshot}; +use tokio::sync::{broadcast, mpsc, oneshot, watch, Semaphore}; use tokio::task::JoinHandle; use tokio::time::timeout; use tracing::{error, info, trace, warn, Level}; @@ -99,7 +99,8 @@ pub struct Web3ProxyApp { /// rate limit anonymous users pub frontend_ip_rate_limiter: Option>, /// rate limit authenticated users - pub frontend_registered_user_rate_limiter: Option>, + pub frontend_registered_user_rate_limiter: + Option>, /// concurrent/parallel request limits for anonymous users pub ip_semaphores: Cache>, pub kafka_producer: Option, @@ -179,8 +180,8 @@ impl Web3ProxyApp { top_config: TopConfig, num_workers: usize, shutdown_sender: broadcast::Sender<()>, - flush_stat_buffer_sender: mpsc::Sender>, - flush_stat_buffer_receiver: mpsc::Receiver>, + flush_stat_buffer_sender: mpsc::Sender>, + flush_stat_buffer_receiver: mpsc::Receiver>, ) -> anyhow::Result { let stat_buffer_shutdown_receiver = shutdown_sender.subscribe(); let mut background_shutdown_receiver = shutdown_sender.subscribe(); @@ -372,10 +373,11 @@ impl Web3ProxyApp { .build(); // TODO: TTL left low, this could also be a solution instead of modifiying the cache, that may be disgusting across threads / slow anyways - let user_balance_cache = CacheBuilder::new(10_000) + let user_balance_cache: UserBalanceCache = CacheBuilder::new(10_000) .name("user_balance") .time_to_live(Duration::from_secs(600)) - .build(); + .build() + .into(); // create a channel for receiving stats // we do this in a channel so we don't slow down our response to the users @@ -434,9 +436,8 @@ impl Web3ProxyApp { // these two rate limiters can share the base limiter // these are deferred rate limiters because we don't want redis network requests on the hot path // TODO: take cache_size from config - frontend_ip_rate_limiter = Some( - DeferredRateLimiter::new(20_000, "ip", rpc_rrl.clone(), None).await, - ); + frontend_ip_rate_limiter = + Some(DeferredRateLimiter::new(20_000, "ip", rpc_rrl.clone(), None).await); frontend_registered_user_rate_limiter = Some(DeferredRateLimiter::new(20_000, "key", rpc_rrl, None).await); } @@ -698,7 +699,9 @@ impl Web3ProxyApp { } pub fn influxdb_client(&self) -> Web3ProxyResult<&influxdb2::Client> { - self.influxdb_client.as_ref().ok_or(Web3ProxyError::NoDatabase) + self.influxdb_client + .as_ref() + .ok_or(Web3ProxyError::NoDatabase) } /// an ethers provider that you can use with ether's abigen. @@ -1140,15 +1143,19 @@ impl Web3ProxyApp { .await { Ok(response_data) => { - request_metadata.error_response.store(false, Ordering::Release); + request_metadata + .error_response + .store(false, Ordering::Release); (StatusCode::OK, response_data) - }, + } Err(err) => { - request_metadata.error_response.store(true, Ordering::Release); + request_metadata + .error_response + .store(true, Ordering::Release); err.as_response_parts() - }, + } }; let response = JsonRpcForwardedResponse::from_response_data(response_data, response_id); @@ -1158,6 +1165,9 @@ impl Web3ProxyApp { let rpcs = request_metadata.backend_rpcs_used(); + // there might be clones in the background, so this isn't a sure thing + let _ = request_metadata.try_send_arc_stat(); + (code, response, rpcs) } @@ -1486,7 +1496,7 @@ impl Web3ProxyApp { Err(Web3ProxyError::NoDatabase) => {}, Err(err) => { warn!( - ?err, + ?err, "unable to save stats for eth_sendRawTransaction", ) } @@ -1620,7 +1630,9 @@ impl Web3ProxyApp { } => { let block_depth = (head_block.number().saturating_sub(*block.num())).as_u64(); - if block_depth < self.config.archive_depth { + if block_depth > self.config.archive_depth { + trace!(%block_depth, archive_depth=%self.config.archive_depth); + request_metadata .archive_request .store(true, atomic::Ordering::Release); @@ -1641,7 +1653,9 @@ impl Web3ProxyApp { } => { let block_depth = (head_block.number().saturating_sub(*from_block.num())).as_u64(); - if block_depth < self.config.archive_depth { + if block_depth > self.config.archive_depth { + trace!(%block_depth, archive_depth=%self.config.archive_depth); + request_metadata .archive_request .store(true, atomic::Ordering::Release); diff --git a/web3_proxy/src/balance.rs b/web3_proxy/src/balance.rs index b8dde440..b9f03a9b 100644 --- a/web3_proxy/src/balance.rs +++ b/web3_proxy/src/balance.rs @@ -8,16 +8,22 @@ use migration::sea_orm::DbConn; use migration::sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QuerySelect}; use migration::{Func, SimpleExpr}; use serde::ser::SerializeStruct; -use serde::Serialize; +use serde::{Deserialize, Serialize}; +use serde_json::json; +use tracing::info; /// Implements the balance getter which combines data from several tables -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug, Default, Deserialize)] pub struct Balance { pub admin_deposits: Decimal, pub chain_deposits: Decimal, - pub referal_bonus: Decimal, pub one_time_referee_bonus: Decimal, + pub referal_bonus: Decimal, pub stripe_deposits: Decimal, + pub total_cache_misses: u64, + pub total_frontend_requests: u64, + /// this includes credits spent inside a "free" or "downgraded" tier + /// this always increments and so will always be >= total_spent_paid_credits pub total_spent: Decimal, pub total_spent_paid_credits: Decimal, pub user_id: u64, @@ -28,13 +34,15 @@ impl Serialize for Balance { where S: serde::Serializer, { - let mut state = serializer.serialize_struct("balance", 12)?; + let mut state = serializer.serialize_struct("balance", 14)?; state.serialize_field("admin_deposits", &self.admin_deposits)?; state.serialize_field("chain_deposits", &self.chain_deposits)?; - state.serialize_field("referal_bonus", &self.referal_bonus)?; state.serialize_field("one_time_referee_bonus", &self.one_time_referee_bonus)?; + state.serialize_field("referal_bonus", &self.referal_bonus)?; state.serialize_field("stripe_deposits", &self.stripe_deposits)?; + state.serialize_field("total_cache_misses", &self.total_cache_misses)?; + state.serialize_field("total_frontend_requests", &self.total_frontend_requests)?; state.serialize_field("total_spent", &self.total_spent)?; state.serialize_field("total_spent_paid_credits", &self.total_spent_paid_credits)?; state.serialize_field("user_id", &self.user_id)?; @@ -124,30 +132,45 @@ impl Balance { .web3_context("fetching stripe deposits")? .unwrap_or_default(); - let (total_spent_paid_credits, total_spent) = rpc_accounting_v2::Entity::find() - .select_only() - .column_as( - SimpleExpr::from(Func::coalesce([ - rpc_accounting_v2::Column::SumCreditsUsed.sum(), - 0.into(), - ])), - "total_spent_paid_credits", - ) - .column_as( - SimpleExpr::from(Func::coalesce([ - rpc_accounting_v2::Column::SumInclFreeCreditsUsed.sum(), - 0.into(), - ])), - "total_spent", - ) - .inner_join(rpc_key::Entity) - // .filter(rpc_key::Column::Id.eq(rpc_accounting_v2::Column::RpcKeyId)) // TODO: i think the inner_join function handles this - .filter(rpc_key::Column::UserId.eq(user_id)) - .into_tuple() - .one(db_conn) - .await - .web3_context("fetching total_spent_paid_credits and total_spent")? - .unwrap_or_default(); + let (total_cache_misses, total_frontend_requests, total_spent_paid_credits, total_spent) = + rpc_accounting_v2::Entity::find() + .select_only() + .column_as( + SimpleExpr::from(Func::coalesce([ + rpc_accounting_v2::Column::CacheMisses.sum(), + 0.into(), + ])), + "total_cache_misses", + ) + .column_as( + SimpleExpr::from(Func::coalesce([ + rpc_accounting_v2::Column::FrontendRequests.sum(), + 0.into(), + ])), + "total_frontend_requests", + ) + .column_as( + SimpleExpr::from(Func::coalesce([ + rpc_accounting_v2::Column::SumCreditsUsed.sum(), + 0.into(), + ])), + "total_spent_paid_credits", + ) + .column_as( + SimpleExpr::from(Func::coalesce([ + rpc_accounting_v2::Column::SumInclFreeCreditsUsed.sum(), + 0.into(), + ])), + "total_spent", + ) + .inner_join(rpc_key::Entity) + // .filter(rpc_key::Column::Id.eq(rpc_accounting_v2::Column::RpcKeyId)) // TODO: i think the inner_join function handles this + .filter(rpc_key::Column::UserId.eq(user_id)) + .into_tuple::<(Decimal, Decimal, Decimal, Decimal)>() + .one(db_conn) + .await + .web3_context("fetching total_spent_paid_credits and total_spent")? + .unwrap_or_default(); let one_time_referee_bonus = referee::Entity::find() .select_only() @@ -179,17 +202,25 @@ impl Balance { .web3_context("fetching referal bonus")? .unwrap_or_default(); + let total_cache_misses: u64 = total_cache_misses.try_into()?; + let total_frontend_requests: u64 = total_frontend_requests.try_into()?; + let balance = Self { admin_deposits, chain_deposits, referal_bonus, one_time_referee_bonus, stripe_deposits, + total_cache_misses, + total_frontend_requests, total_spent, total_spent_paid_credits, user_id, }; + // TODO: lower log level + info!("balance: {:#}", json!(&balance)); + // Return None if there is no entry Ok(Some(balance)) } diff --git a/web3_proxy/src/caches.rs b/web3_proxy/src/caches.rs index ca67b7a6..ebc1e291 100644 --- a/web3_proxy/src/caches.rs +++ b/web3_proxy/src/caches.rs @@ -1,15 +1,19 @@ use crate::balance::Balance; +use crate::errors::Web3ProxyResult; use crate::frontend::authorization::{AuthorizationChecks, RpcSecretKey}; -use moka::future::Cache; +use derive_more::From; +use entities::rpc_key; +use migration::sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter}; +use moka::future::{Cache, ConcurrentCacheExt}; use std::fmt; use std::net::IpAddr; use std::sync::Arc; use tokio::sync::RwLock as AsyncRwLock; +use tracing::trace; /// Cache data from the database about rpc keys +/// TODO: try Ulid/u128 instead of RpcSecretKey in case my hash method is broken pub type RpcSecretKeyCache = Cache; -/// Cache data from the database about user balances -pub type UserBalanceCache = Cache>>; #[derive(Clone, Copy, Hash, Eq, PartialEq)] pub struct RegisteredUserRateLimitKey(pub u64, pub IpAddr); @@ -19,3 +23,65 @@ impl std::fmt::Display for RegisteredUserRateLimitKey { write!(f, "{}-{}", self.0, self.1) } } + +/// Cache data from the database about user balances +#[derive(Clone, From)] +pub struct UserBalanceCache(pub Cache>>); + +impl UserBalanceCache { + pub async fn get_or_insert( + &self, + db_conn: &DatabaseConnection, + user_id: u64, + ) -> Web3ProxyResult>> { + if user_id == 0 { + return Ok(Arc::new(AsyncRwLock::new(Balance::default()))); + } + + let x = self + .0 + .try_get_with(user_id, async move { + let x = match Balance::try_from_db(db_conn, user_id).await? { + None => Balance { + user_id, + ..Default::default() + }, + Some(x) => x, + }; + trace!(?x, "from database"); + + Ok(Arc::new(AsyncRwLock::new(x))) + }) + .await?; + + Ok(x) + } + + pub async fn invalidate( + &self, + user_id: &u64, + db_conn: &DatabaseConnection, + rpc_secret_key_cache: &RpcSecretKeyCache, + ) -> Web3ProxyResult<()> { + self.0.invalidate(user_id).await; + + trace!(%user_id, "invalidating"); + + // Remove all RPC-keys owned by this user from the cache, s.t. rate limits are re-calculated + let rpc_keys = rpc_key::Entity::find() + .filter(rpc_key::Column::UserId.eq(*user_id)) + .all(db_conn) + .await?; + + for rpc_key_entity in rpc_keys { + let rpc_key_id = rpc_key_entity.id; + let secret_key = rpc_key_entity.secret_key.into(); + + trace!(%user_id, %rpc_key_id, ?secret_key, "invalidating"); + + rpc_secret_key_cache.invalidate(&secret_key).await; + } + + Ok(()) + } +} diff --git a/web3_proxy/src/compute_units.rs b/web3_proxy/src/compute_units.rs index 9644e663..b925043f 100644 --- a/web3_proxy/src/compute_units.rs +++ b/web3_proxy/src/compute_units.rs @@ -8,12 +8,23 @@ use migration::sea_orm::prelude::Decimal; use std::str::FromStr; -use tracing::warn; +use tracing::{instrument, trace, warn}; +pub fn default_usd_per_cu(chain_id: u64) -> Decimal { + match chain_id { + // TODO: only include if `cfg(test)`? + 999_001_999 => Decimal::from_str("0.10").unwrap(), + 137 => Decimal::from_str("0.000000533333333333333").unwrap(), + _ => Decimal::from_str("0.000000400000000000000").unwrap(), + } +} + +#[derive(Debug)] pub struct ComputeUnit(Decimal); impl ComputeUnit { /// costs can vary widely depending on method and chain + #[instrument(level = "trace")] pub fn new(method: &str, chain_id: u64, response_bytes: u64) -> Self { // TODO: this works, but this is fragile. think of a better way to check the method is a subscription if method.ends_with(')') { @@ -123,11 +134,14 @@ impl ComputeUnit { let cu = Decimal::from(cu); + trace!(%cu); + Self(cu) } /// notifications and subscription responses cost per-byte - pub fn subscription_response>(num_bytes: D) -> Self { + #[instrument(level = "trace")] + pub fn subscription_response + std::fmt::Debug>(num_bytes: D) -> Self { let cu = num_bytes.into() * Decimal::new(4, 2); Self(cu) @@ -141,28 +155,40 @@ impl ComputeUnit { /// Compute cost per request /// All methods cost the same /// The number of bytes are based on input, and output bytes + #[instrument(level = "trace")] pub fn cost( &self, archive_request: bool, cache_hit: bool, error_response: bool, - usd_per_cu: Decimal, + usd_per_cu: &Decimal, ) -> Decimal { if error_response { + trace!("error responses are free"); return 0.into(); } let mut cost = self.0 * usd_per_cu; + trace!(%cost, "base"); + if archive_request { + // TODO: get from config cost *= Decimal::from_str("2.5").unwrap(); + + trace!(%cost, "archive_request"); } - // cache hits get a 25% discount if cache_hit { - cost *= Decimal::from_str("0.75").unwrap() + // cache hits get a 25% discount + // TODO: get from config + cost *= Decimal::from_str("0.75").unwrap(); + + trace!(%cost, "cache_hit"); } + trace!(%cost, "final"); + cost } } diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index a5ae2d71..56bdc17f 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -5,6 +5,7 @@ use argh::FromArgs; use ethers::prelude::{Address, TxHash}; use ethers::types::{U256, U64}; use hashbrown::HashMap; +use migration::sea_orm::prelude::Decimal; use migration::sea_orm::DatabaseConnection; use sentry::types::Dsn; use serde::Deserialize; @@ -170,6 +171,8 @@ pub struct AppConfig { /// Stripe api key for checking validity of webhooks pub stripe_whsec_key: Option, + pub usd_per_cu: Option, + /// Track rate limits in a redis (or compatible backend) /// It is okay if this data is lost. pub volatile_redis_url: Option, diff --git a/web3_proxy/src/frontend/admin.rs b/web3_proxy/src/frontend/admin.rs index b75e6162..286fb63f 100644 --- a/web3_proxy/src/frontend/admin.rs +++ b/web3_proxy/src/frontend/admin.rs @@ -17,8 +17,7 @@ use axum_client_ip::InsecureClientIp; use axum_macros::debug_handler; use chrono::{TimeZone, Utc}; use entities::{ - admin, admin_increase_balance_receipt, admin_trail, login, pending_login, rpc_key, - user, + admin, admin_increase_balance_receipt, admin_trail, login, pending_login, rpc_key, user, }; use ethers::{prelude::Address, types::Bytes}; use hashbrown::HashMap; @@ -85,7 +84,13 @@ pub async fn admin_increase_balance( txn.commit().await?; // Invalidate the user_balance_cache for this user: - app.user_balance_cache.invalidate(&user_entry.id).await; + if let Err(err) = app + .user_balance_cache + .invalidate(&user_entry.id, app.db_conn()?, &app.rpc_secret_key_cache) + .await + { + warn!(?err, "unable to invalidate caches"); + }; let out = json!({ "user": payload.user_address, @@ -327,7 +332,7 @@ pub async fn admin_imitate_login_post( let db_replica = app.db_replica()?; let user_pending_login = pending_login::Entity::find() - .filter(pending_login::Column::Nonce.eq(Uuid::from(login_nonce.clone()))) + .filter(pending_login::Column::Nonce.eq(Uuid::from(login_nonce))) .one(db_replica.as_ref()) .await .web3_context("database error while finding pending_login")? diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index 12315077..9e96c2e6 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -8,7 +8,7 @@ use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult}; use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest}; use crate::rpcs::blockchain::Web3ProxyBlock; use crate::rpcs::one::Web3Rpc; -use crate::stats::{AppStat, BackendRequests, RpcQueryStats}; +use crate::stats::{AppStat, BackendRequests}; use crate::user_token::UserBearerToken; use anyhow::Context; use axum::headers::authorization::Bearer; @@ -27,6 +27,7 @@ use http::HeaderValue; use ipnet::IpNet; use migration::sea_orm::prelude::Decimal; use migration::sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter}; +use parking_lot::Mutex; use rdkafka::message::{Header as KafkaHeader, OwnedHeaders as KafkaOwnedHeaders, OwnedMessage}; use rdkafka::producer::{FutureProducer, FutureRecord}; use rdkafka::util::Timeout as KafkaTimeout; @@ -34,6 +35,7 @@ use redis_rate_limiter::redis::AsyncCommands; use redis_rate_limiter::RedisRateLimitResult; use serde::{Deserialize, Serialize}; use std::borrow::Cow; +use std::fmt::Debug; use std::fmt::Display; use std::hash::{Hash, Hasher}; use std::mem; @@ -45,18 +47,52 @@ use tokio::sync::RwLock as AsyncRwLock; use tokio::sync::{mpsc, OwnedSemaphorePermit, Semaphore}; use tokio::task::JoinHandle; use tokio::time::Instant; -use tracing::{error, trace, warn}; +use tracing::{error, info, trace, warn}; use ulid::Ulid; use uuid::Uuid; /// This lets us use UUID and ULID while we transition to only ULIDs /// TODO: custom deserialize that can also go from String to Ulid -#[derive(Copy, Clone, Debug, Deserialize, Eq, PartialEq)] +#[derive(Copy, Clone, Deserialize)] pub enum RpcSecretKey { Ulid(Ulid), Uuid(Uuid), } +impl RpcSecretKey { + pub fn new() -> Self { + Ulid::new().into() + } + + fn as_128(&self) -> u128 { + match self { + Self::Ulid(x) => x.0, + Self::Uuid(x) => x.as_u128(), + } + } +} + +impl PartialEq for RpcSecretKey { + fn eq(&self, other: &Self) -> bool { + self.as_128() == other.as_128() + } +} + +impl Eq for RpcSecretKey {} + +impl Debug for RpcSecretKey { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Ulid(x) => Debug::fmt(x, f), + Self::Uuid(x) => { + let x = Ulid::from(x.as_u128()); + + Debug::fmt(&x, f) + } + } + } +} + /// always serialize as a ULID. impl Serialize for RpcSecretKey { fn serialize(&self, serializer: S) -> Result @@ -127,6 +163,10 @@ pub struct AuthorizationChecks { /// IMPORTANT! Once confirmed by a miner, they will be public on the blockchain! pub private_txs: bool, pub proxy_mode: ProxyMode, + /// if the account had premium when this request metadata was created + /// they might spend slightly more than they've paid, but we are okay with that + /// TODO: we could price the request now and if its too high, downgrade. but thats more complex than we need + pub paid_credits_used: bool, } /// TODO: include the authorization checks in this? @@ -153,10 +193,7 @@ pub struct KafkaDebugLogger { /// Ulids and Uuids matching the same bits hash the same impl Hash for RpcSecretKey { fn hash(&self, state: &mut H) { - let x = match self { - Self::Ulid(x) => x.0, - Self::Uuid(x) => x.as_u128(), - }; + let x = self.as_128(); x.hash(state); } @@ -308,6 +345,8 @@ pub struct RequestMetadata { pub chain_id: u64, + pub usd_per_cu: Decimal, + pub request_ulid: Ulid, /// Size of the JSON request. Does not include headers or things like that. @@ -362,17 +401,6 @@ impl RequestMetadata { .map(|x| x.checks.proxy_mode) .unwrap_or_default() } - - /// this may drift slightly if multiple servers are handling the same users, but should be close - pub async fn latest_balance(&self) -> Option { - if let Some(x) = self.authorization.as_ref() { - let x = x.checks.latest_balance.read().await.remaining(); - - Some(x) - } else { - None - } - } } #[derive(From)] @@ -482,11 +510,13 @@ impl RequestMetadata { } } + let chain_id = app.config.chain_id; + let x = Self { archive_request: false.into(), authorization: Some(authorization), backend_requests: Default::default(), - chain_id: app.config.chain_id, + chain_id, error_response: false.into(), kafka_debug_logger, method, @@ -499,6 +529,7 @@ impl RequestMetadata { response_timestamp: 0.into(), start_instant: Instant::now(), stat_sender: app.stat_sender.clone(), + usd_per_cu: app.config.usd_per_cu.unwrap_or_default(), user_error_response: false.into(), }; @@ -509,13 +540,11 @@ impl RequestMetadata { self.backend_requests.lock().clone() } - pub fn try_send_stat(mut self) -> Web3ProxyResult> { + pub fn try_send_stat(mut self) -> Web3ProxyResult<()> { if let Some(stat_sender) = self.stat_sender.take() { trace!(?self, "sending stat"); - let stat: RpcQueryStats = self.try_into()?; - - let stat: AppStat = stat.into(); + let stat: AppStat = self.into(); if let Err(err) = stat_sender.send(stat) { error!(?err, "failed sending stat"); @@ -524,11 +553,9 @@ impl RequestMetadata { }; trace!("stat sent successfully"); - - Ok(None) - } else { - Ok(Some(self)) } + + Ok(()) } pub fn add_response<'a, R: Into>>(&'a self, response: R) { @@ -556,18 +583,12 @@ impl RequestMetadata { } } - pub fn try_send_arc_stat(self: Arc) -> anyhow::Result>> { - match Arc::try_unwrap(self) { - Ok(x) => { - let not_sent = x.try_send_stat()?.map(Arc::new); - Ok(not_sent) - } - Err(not_sent) => { - trace!( - "could not send stat while {} arcs are active", - Arc::strong_count(¬_sent) - ); - Ok(Some(not_sent)) + pub fn try_send_arc_stat(self: Arc) -> Web3ProxyResult<()> { + match Arc::into_inner(self) { + Some(x) => x.try_send_stat(), + None => { + trace!("could not send stat while other arcs are active"); + Ok(()) } } } @@ -582,18 +603,12 @@ impl Drop for RequestMetadata { // turn `&mut self` into `self` let x = mem::take(self); - trace!(?self, "request metadata dropped without stat send"); + trace!(?x, "request metadata dropped without stat send"); let _ = x.try_send_stat(); } } } -impl RpcSecretKey { - pub fn new() -> Self { - Ulid::new().into() - } -} - impl Default for RpcSecretKey { fn default() -> Self { Self::new() @@ -605,7 +620,7 @@ impl Display for RpcSecretKey { // TODO: do this without dereferencing let ulid: Ulid = (*self).into(); - ulid.fmt(f) + Display::fmt(&ulid, f) } } @@ -1126,37 +1141,25 @@ impl Web3ProxyApp { } } - /// Get the balance for the user. - /// - /// If a subuser calls this function, the subuser needs to have first attained the user_id that the rpc key belongs to. - /// This function should be called anywhere where balance is required (i.e. only rpc calls, I believe ... non-rpc calls don't really require balance) - pub(crate) async fn balance_checks( - &self, - user_id: u64, - ) -> Web3ProxyResult>> { - self.user_balance_cache - .try_get_with(user_id, async move { - let db_replica = self.db_replica()?; - let x = match Balance::try_from_db(db_replica.as_ref(), user_id).await? { - None => Err(Web3ProxyError::InvalidUserKey), - Some(x) => Ok(x), - }?; - trace!("Balance for cache retrieved from database is {:?}", x); - Ok(Arc::new(AsyncRwLock::new(x))) - }) - .await - .map_err(Into::into) - } - // check the local cache for user data, or query the database pub(crate) async fn authorization_checks( &self, proxy_mode: ProxyMode, rpc_secret_key: &RpcSecretKey, ) -> Web3ProxyResult { - self.rpc_secret_key_cache + // TODO: move onto a helper function + + let fresh = Arc::new(Mutex::new(false)); + + let fresh_clone = fresh.clone(); + + let x = self + .rpc_secret_key_cache .try_get_with_by_ref(rpc_secret_key, async move { - // trace!(?rpc_secret_key, "user cache miss"); + { + let mut f = fresh.lock_arc(); + *f = true; + } let db_replica = self.db_replica()?; @@ -1248,16 +1251,24 @@ impl Web3ProxyApp { "related user tier not found, but every user should have a tier", )?; - let latest_balance = self.balance_checks(rpc_key_model.user_id).await?; + let latest_balance = self + .user_balance_cache + .get_or_insert(db_replica.as_ref(), rpc_key_model.user_id) + .await?; - // TODO: Do the logic here, as to how to treat the user, based on balance and initial check - // Clear the cache (not the login!) in the stats if a tier-change happens (clear, but don't modify roles) + let paid_credits_used: bool; if let Some(downgrade_user_tier) = user_tier_model.downgrade_tier_id { + trace!("user belongs to a premium tier. checking balance"); + let active_premium = latest_balance.read().await.active_premium(); // only consider the user premium if they have paid at least $10 and have a balance > $.01 // otherwise, set user_tier_model to the downograded tier - if !active_premium { + if active_premium { + paid_credits_used = true; + } else { + paid_credits_used = false; + // TODO: include boolean to mark that the user is downgraded user_tier_model = user_tier::Entity::find_by_id(downgrade_user_tier) @@ -1268,6 +1279,8 @@ impl Web3ProxyApp { downgrade_user_tier ))?; } + } else { + paid_credits_used = false; } let rpc_key_id = @@ -1289,13 +1302,21 @@ impl Web3ProxyApp { rpc_secret_key: Some(*rpc_secret_key), rpc_secret_key_id: rpc_key_id, user_id: rpc_key_model.user_id, + paid_credits_used, }) } None => Ok(AuthorizationChecks::default()), } }) - .await - .map_err(Into::into) + .await?; + + if *fresh_clone.lock() { + info!(?rpc_secret_key, "authorization_checks miss"); + } else { + info!(?rpc_secret_key, "authorization_checks hit"); + } + + Ok(x) } /// Authorized the ip/origin/referer/useragent and rate limit and concurrency diff --git a/web3_proxy/src/frontend/rpc_proxy_http.rs b/web3_proxy/src/frontend/rpc_proxy_http.rs index e9598ec3..5118f507 100644 --- a/web3_proxy/src/frontend/rpc_proxy_http.rs +++ b/web3_proxy/src/frontend/rpc_proxy_http.rs @@ -76,7 +76,7 @@ async fn _proxy_web3_rpc( let mut response = (status_code, Json(response)).into_response(); - // TODO: DRY this up. same for public and private queries + // TODO: DRY this up. it is the same code for public and private queries let response_headers = response.headers_mut(); // TODO: this might be slow. think about this more diff --git a/web3_proxy/src/frontend/status.rs b/web3_proxy/src/frontend/status.rs index 06aa4c53..6ef5d8c5 100644 --- a/web3_proxy/src/frontend/status.rs +++ b/web3_proxy/src/frontend/status.rs @@ -199,7 +199,7 @@ async fn _status(app: Arc) -> (StatusCode, &'static str, Bytes) { MokaCacheSerializer(&app.ip_semaphores), MokaCacheSerializer(&app.jsonrpc_response_cache), MokaCacheSerializer(&app.rpc_secret_key_cache), - MokaCacheSerializer(&app.user_balance_cache), + MokaCacheSerializer(&app.user_balance_cache.0), MokaCacheSerializer(&app.user_semaphores), ], "chain_id": app.config.chain_id, diff --git a/web3_proxy/src/frontend/users/authentication.rs b/web3_proxy/src/frontend/users/authentication.rs index 58dc2a57..27c42b83 100644 --- a/web3_proxy/src/frontend/users/authentication.rs +++ b/web3_proxy/src/frontend/users/authentication.rs @@ -22,8 +22,8 @@ use migration::sea_orm::{ QueryFilter, TransactionTrait, }; use serde::{Deserialize, Serialize}; -use serde_json::{json, Value}; use siwe::{Message, VerificationOpts}; +use std::collections::BTreeMap; use std::ops::Add; use std::str::FromStr; use std::sync::Arc; @@ -49,14 +49,11 @@ pub struct PostLogin { pub referral_code: Option, } -/// TODO: use this type in the frontend -#[derive(Debug, Deserialize)] +#[derive(Debug, Deserialize, Serialize)] pub struct LoginPostResponse { - pub bearer_token: Ulid, - pub rpc_keys: Value, - /// unknown data gets put here - #[serde(flatten, default = "HashMap::default")] - pub extra: HashMap, + pub bearer_token: UserBearerToken, + pub rpc_keys: BTreeMap, + pub user: user::Model, } /// `GET /user/login/:user_address` or `GET /user/login/:user_address/:message_eip` -- Start the "Sign In with Ethereum" (siwe) login flow. @@ -267,7 +264,7 @@ pub async fn user_login_post( let db_replica = app.db_replica()?; let user_pending_login = pending_login::Entity::find() - .filter(pending_login::Column::Nonce.eq(Uuid::from(login_nonce.clone()))) + .filter(pending_login::Column::Nonce.eq(Uuid::from(login_nonce))) .one(db_replica.as_ref()) .await .web3_context("database error while finding pending_login")? @@ -325,7 +322,7 @@ pub async fn user_login_post( trace!(?payload.referral_code); if let Some(referral_code) = payload.referral_code.as_ref() { // If it is not inside, also check in the database - trace!("Using register referral code: {:?}", referral_code); + trace!("Using register referral code: {:?}", referral_code); let user_referrer = referrer::Entity::find() .filter(referrer::Column::ReferralCode.eq(referral_code)) .one(&txn) @@ -394,19 +391,6 @@ pub async fn user_login_post( // create a bearer token for the user. let user_bearer_token = UserBearerToken::default(); - // json response with everything in it - // we could return just the bearer token, but I think they will always request api keys and the user profile - let response_json = json!({ - "rpc_keys": user_rpc_keys - .into_iter() - .map(|user_rpc_key| (user_rpc_key.id, user_rpc_key)) - .collect::>(), - "bearer_token": user_bearer_token, - "user": caller, - }); - - let response = (status_code, Json(response_json)).into_response(); - // add bearer to the database // expire in 4 weeks @@ -431,6 +415,19 @@ pub async fn user_login_post( error!("Failed to delete nonce:{}: {}", login_nonce, err); } + // json response with everything in it + // we could return just the bearer token, but I think they will always request api keys and the user profile + let response_json = LoginPostResponse { + rpc_keys: user_rpc_keys + .into_iter() + .map(|user_rpc_key| (user_rpc_key.id, user_rpc_key)) + .collect(), + bearer_token: user_bearer_token, + user: caller, + }; + + let response = (status_code, Json(response_json)).into_response(); + Ok(response) } diff --git a/web3_proxy/src/frontend/users/payment.rs b/web3_proxy/src/frontend/users/payment.rs index 0e3e7c77..97173489 100644 --- a/web3_proxy/src/frontend/users/payment.rs +++ b/web3_proxy/src/frontend/users/payment.rs @@ -15,7 +15,7 @@ use axum::{ use axum_client_ip::InsecureClientIp; use axum_macros::debug_handler; use entities::{ - admin_increase_balance_receipt, increase_on_chain_balance_receipt, rpc_key, + admin_increase_balance_receipt, increase_on_chain_balance_receipt, stripe_increase_balance_receipt, user, }; use ethers::abi::AbiEncode; @@ -30,7 +30,7 @@ use payment_contracts::ierc20::IERC20; use payment_contracts::payment_factory::{self, PaymentFactory}; use serde_json::json; use std::sync::Arc; -use tracing::{debug, info, trace}; +use tracing::{debug, info, trace, warn}; /// Implements any logic related to payments /// Removed this mainly from "user" as this was getting clogged @@ -304,6 +304,7 @@ pub async fn user_balance_post( // the transaction might contain multiple relevant logs. collect them all let mut response_data = vec![]; + let mut user_ids_to_invalidate = HashSet::new(); for log in transaction_receipt.logs { if let Some(true) = log.removed { // TODO: do we need to make sure this row is deleted? it should be handled by `handle_uncle_block` @@ -390,19 +391,7 @@ pub async fn user_balance_post( receipt.save(&txn).await?; - // Remove all RPC-keys owned by this user from the cache, s.t. rate limits are re-calculated - let rpc_keys = rpc_key::Entity::find() - .filter(rpc_key::Column::UserId.eq(recipient.id)) - .all(&txn) - .await?; - - app.user_balance_cache.invalidate(&recipient.id).await; - - for rpc_key_entity in rpc_keys { - app.rpc_secret_key_cache - .invalidate(&rpc_key_entity.secret_key.into()) - .await; - } + user_ids_to_invalidate.insert(recipient.id); let x = json!({ "amount": payment_token_amount, @@ -421,6 +410,17 @@ pub async fn user_balance_post( txn.commit().await?; + for user_id in user_ids_to_invalidate.into_iter() { + // Finally invalidate the cache as well + if let Err(err) = app + .user_balance_cache + .invalidate(&user_id, db_conn, &app.rpc_secret_key_cache) + .await + { + warn!(?err, "unable to invalidate caches"); + }; + } + let response = (StatusCode::CREATED, Json(json!(response_data))).into_response(); Ok(response) diff --git a/web3_proxy/src/frontend/users/payment_stripe.rs b/web3_proxy/src/frontend/users/payment_stripe.rs index f7f9bc9c..9a4dcd28 100644 --- a/web3_proxy/src/frontend/users/payment_stripe.rs +++ b/web3_proxy/src/frontend/users/payment_stripe.rs @@ -7,7 +7,7 @@ use axum::{ Extension, Json, TypedHeader, }; use axum_macros::debug_handler; -use entities::{rpc_key, stripe_increase_balance_receipt, user}; +use entities::{stripe_increase_balance_receipt, user}; use ethers::types::Address; use http::HeaderMap; use migration::sea_orm::prelude::Decimal; @@ -17,7 +17,7 @@ use migration::sea_orm::{ use serde_json::json; use std::sync::Arc; use stripe::Webhook; -use tracing::{debug, error}; +use tracing::{debug, error, warn}; /// `GET /user/balance/stripe` -- Use a bearer token to get the user's balance and spend. /// @@ -163,20 +163,16 @@ pub async fn user_balance_stripe_post( Some(recipient) => { let _ = insert_receipt_model.save(&txn).await; - let recipient_rpc_keys = rpc_key::Entity::find() - .filter(rpc_key::Column::UserId.eq(recipient.id)) - .all(&txn) - .await?; - txn.commit().await?; // Finally invalidate the cache as well - app.user_balance_cache.invalidate(&recipient.id).await; - for rpc_key_entity in recipient_rpc_keys { - app.rpc_secret_key_cache - .invalidate(&rpc_key_entity.secret_key.into()) - .await; - } + if let Err(err) = app + .user_balance_cache + .invalidate(&recipient.id, db_conn, &app.rpc_secret_key_cache) + .await + { + warn!(?err, "unable to invalidate caches"); + }; } None => { return Err(Web3ProxyError::BadResponse( diff --git a/web3_proxy/src/lib.rs b/web3_proxy/src/lib.rs index 8d00b691..d4744dea 100644 --- a/web3_proxy/src/lib.rs +++ b/web3_proxy/src/lib.rs @@ -1,5 +1,6 @@ #![feature(let_chains)] #![feature(trait_alias)] +#![forbid(unsafe_code)] pub mod admin_queries; pub mod app; diff --git a/web3_proxy/src/response_cache.rs b/web3_proxy/src/response_cache.rs index aa0d11be..01459188 100644 --- a/web3_proxy/src/response_cache.rs +++ b/web3_proxy/src/response_cache.rs @@ -1,10 +1,8 @@ -use crate::{ - block_number::BlockNumAndHash, errors::Web3ProxyError, jsonrpc::JsonRpcErrorData, -}; +use crate::{block_number::BlockNumAndHash, errors::Web3ProxyError, jsonrpc::JsonRpcErrorData}; use derive_more::From; use ethers::{ providers::{HttpClientError, JsonRpcError, ProviderError, WsClientError}, - types::{U64}, + types::U64, }; use hashbrown::hash_map::DefaultHashBuilder; use moka::future::Cache; diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index 775fcdba..c165028a 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -14,14 +14,13 @@ use itertools::{Itertools, MinMaxResult}; use moka::future::Cache; use serde::Serialize; use std::cmp::{Ordering, Reverse}; -use std::fmt; use std::sync::{atomic, Arc}; use std::time::Duration; use tokio::sync::broadcast; use tokio::time::Instant; use tracing::{debug, enabled, info, trace, warn, Level}; -#[derive(Clone, Serialize)] +#[derive(Clone, Debug, Serialize)] struct ConsensusRpcData { head_block_num: U64, // TODO: this is too simple. erigon has 4 prune levels (hrct) @@ -98,7 +97,7 @@ pub enum ShouldWaitForBlock { /// TODO: remove head_block/head_rpcs/tier and replace with one RankedRpcMap /// TODO: add `best_rpc(method_data_kind, min_block_needed, max_block_needed, include_backups)` /// TODO: make serializing work. the key needs to be a string. I think we need `serialize_with` -#[derive(Clone, Serialize)] +#[derive(Clone, Debug, Serialize)] pub struct RankedRpcs { pub head_block: Web3ProxyBlock, pub num_synced: usize, @@ -212,9 +211,9 @@ impl RankedRpcs { self.inner.is_empty() } + /// TODO! we should also keep the number on the head block saved #[inline] - pub fn num_consensus_rpcs(&self) -> usize { - // TODO! wrong! this is now the total num of rpcs. we should keep the number on the head block saved + pub fn num_active_rpcs(&self) -> usize { self.inner.len() } @@ -340,13 +339,6 @@ impl RankedRpcs { // TODO: sum_hard_limit? } -impl fmt::Debug for RankedRpcs { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - // TODO: the default formatter takes forever to write. we need to print something though - f.debug_struct("RankedRpcs").finish_non_exhaustive() - } -} - // TODO: refs for all of these. borrow on a Sender is cheap enough impl Web3Rpcs { pub fn head_block(&self) -> Option { @@ -435,7 +427,7 @@ impl ConsensusFinder { rpc: Option<&Arc>, new_block: Option, ) -> Web3ProxyResult { - let new_consensus_rpcs = match self + let new_ranked_rpcs = match self .find_consensus_connections(authorization, web3_rpcs) .await .web3_context("error while finding consensus head block!")? @@ -444,23 +436,23 @@ impl ConsensusFinder { Some(x) => x, }; - trace!(?new_consensus_rpcs); + trace!(?new_ranked_rpcs); let watch_consensus_head_sender = web3_rpcs.watch_head_block.as_ref().unwrap(); // TODO: think more about the default for tiers let best_tier = self.best_tier().unwrap_or_default(); let worst_tier = self.worst_tier().unwrap_or_default(); - let backups_needed = new_consensus_rpcs.backups_needed; - let consensus_head_block = new_consensus_rpcs.head_block.clone(); - let num_consensus_rpcs = new_consensus_rpcs.num_consensus_rpcs(); + let backups_needed = new_ranked_rpcs.backups_needed; + let consensus_head_block = new_ranked_rpcs.head_block.clone(); + let num_consensus_rpcs = new_ranked_rpcs.num_active_rpcs(); let num_active_rpcs = self.len(); let total_rpcs = web3_rpcs.len(); - let new_consensus_rpcs = Arc::new(new_consensus_rpcs); + let new_ranked_rpcs = Arc::new(new_ranked_rpcs); - let old_consensus_head_connections = web3_rpcs + let old_ranked_rpcs = web3_rpcs .watch_ranked_rpcs - .send_replace(Some(new_consensus_rpcs.clone())); + .send_replace(Some(new_ranked_rpcs.clone())); let backups_voted_str = if backups_needed { "B " } else { "" }; @@ -476,7 +468,7 @@ impl ConsensusFinder { "None".to_string() }; - match old_consensus_head_connections.as_ref() { + match old_ranked_rpcs.as_ref() { None => { info!( "first {}/{} {}{}/{}/{} block={}, rpc={}", diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 675b33cb..a582864e 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -1267,6 +1267,22 @@ impl fmt::Debug for Web3Rpc { f.field("blocks", &block_data_limit); } + f.field("backup", &self.backup); + + f.field("tier", &self.tier.load(atomic::Ordering::Relaxed)); + + f.field("weighted_ms", &self.weighted_peak_latency().as_millis()); + + if let Some(head_block_watch) = self.head_block.as_ref() { + if let Some(head_block) = head_block_watch.borrow().as_ref() { + f.field("head_num", head_block.number()); + f.field("head_hash", head_block.hash()); + } else { + f.field("head_num", &None::<()>); + f.field("head_hash", &None::<()>); + } + } + f.finish_non_exhaustive() } } diff --git a/web3_proxy/src/stats/influxdb_queries.rs b/web3_proxy/src/stats/influxdb_queries.rs index be6e3cf4..3f2a3c7a 100644 --- a/web3_proxy/src/stats/influxdb_queries.rs +++ b/web3_proxy/src/stats/influxdb_queries.rs @@ -1,5 +1,4 @@ use super::StatType; -use crate::balance::Balance; use crate::errors::Web3ProxyErrorContext; use crate::{ app::Web3ProxyApp, @@ -16,7 +15,7 @@ use axum::{ Json, TypedHeader, }; use entities::sea_orm_active_enums::Role; -use entities::{rpc_key, secondary_user, user, user_tier}; +use entities::{rpc_key, secondary_user}; use fstrings::{f, format_args_f}; use hashbrown::HashMap; use influxdb2::api::query::FluxRecord; @@ -59,40 +58,6 @@ pub async fn query_user_stats<'a>( // Only allow stats if the user has an active premium role if let Some(caller_user) = &caller_user { - // get the balance of the user whose stats we are going to fetch (might be self, but might be another user) - let balance = match Balance::try_from_db(db_replica.as_ref(), user_id).await? { - None => { - return Err(Web3ProxyError::AccessDenied( - "User Stats Response requires you to authorize with a bearer token".into(), - )); - } - Some(x) => x, - }; - - // get the user tier so we can see if it is a tier that has downgrades - let relevant_balance_user_tier_id = if user_id == caller_user.id { - // use the caller's tier - caller_user.user_tier_id - } else { - // use the tier of the primary user from a secondary user - let user = user::Entity::find_by_id(user_id) - .one(db_replica.as_ref()) - .await? - .web3_context("user_id not found")?; - - user.user_tier_id - }; - - let user_tier = user_tier::Entity::find_by_id(relevant_balance_user_tier_id) - .one(db_replica.as_ref()) - .await? - .web3_context("user_tier not found")?; - - if user_tier.downgrade_tier_id.is_some() && !balance.active_premium() { - trace!(%user_id, "User does not have enough balance to qualify for premium"); - return Err(Web3ProxyError::PaymentRequired); - } - if user_id != caller_user.id { // check that there is at least on rpc-keys owned by the requested user and related to the caller user let user_rpc_key_ids: Vec = rpc_key::Entity::find() diff --git a/web3_proxy/src/stats/mod.rs b/web3_proxy/src/stats/mod.rs index dda9ce7c..301fa453 100644 --- a/web3_proxy/src/stats/mod.rs +++ b/web3_proxy/src/stats/mod.rs @@ -15,27 +15,23 @@ use anyhow::{anyhow, Context}; use axum::headers::Origin; use chrono::{DateTime, Months, TimeZone, Utc}; use derive_more::From; -use entities::{referee, referrer, rpc_accounting_v2, rpc_key}; +use entities::{referee, referrer, rpc_accounting_v2}; use influxdb2::models::DataPoint; use migration::sea_orm::prelude::Decimal; use migration::sea_orm::{ - self, ActiveModelTrait, ColumnTrait, DatabaseConnection, DbConn, EntityTrait, IntoActiveModel, - QueryFilter, TransactionTrait, + self, ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, IntoActiveModel, + QueryFilter, QuerySelect, TransactionTrait, }; -use migration::{Expr, OnConflict}; +use migration::{Expr, LockType, OnConflict}; use num_traits::ToPrimitive; use parking_lot::Mutex; use std::borrow::Cow; -use std::default::Default; use std::mem; use std::num::NonZeroU64; -use std::str::FromStr; use std::sync::atomic::Ordering; use std::sync::Arc; -use tokio::sync::RwLock as AsyncRwLock; -use tracing::{error, trace}; +use tracing::{error, instrument, trace, warn}; -use crate::balance::Balance; pub use stat_buffer::{SpawnedStatBuffer, StatBuffer}; #[derive(Debug, PartialEq, Eq)] @@ -46,6 +42,12 @@ pub enum StatType { pub type BackendRequests = Mutex>>; +#[derive(Copy, Clone, Debug)] +pub struct FlushedStats { + pub relational: usize, + pub timeseries: usize, +} + /// TODO: better name? RpcQueryStatBuilder? #[derive(Clone, Debug)] pub struct RpcQueryStats { @@ -66,6 +68,8 @@ pub struct RpcQueryStats { pub compute_unit_cost: Decimal, /// If the request is invalid or received a jsonrpc error response (excluding reverts) pub user_error_response: bool, + /// If premium was active at the start of the request + pub paid_credits_used: bool, } #[derive(Clone, Debug, From, Hash, PartialEq, Eq)] @@ -103,6 +107,7 @@ impl RpcQueryStats { fn accounting_key(&self, period_seconds: i64) -> RpcQueryKey { let response_timestamp = round_timestamp(self.response_timestamp, period_seconds); + // TODO: change this to use 0 for anonymous queries let rpc_secret_key_id = self.authorization.checks.rpc_secret_key_id; let method = self.method.clone(); @@ -151,6 +156,10 @@ impl RpcQueryStats { /// stats for a single key fn owned_timeseries_key(&self) -> Option { + if !self.paid_credits_used { + return None; + } + // we don't store origin in the timeseries db. its only optionaly used for accounting let origin = None; @@ -184,12 +193,13 @@ impl RpcQueryStats { /// For now there is just one, but I think there might be others later #[derive(Debug, From)] pub enum AppStat { - RpcQuery(RpcQueryStats), + RpcQuery(RequestMetadata), } // TODO: move to stat_buffer.rs? impl BufferedRpcQueryStats { - async fn add(&mut self, stat: RpcQueryStats) { + #[instrument(level = "trace")] + async fn add(&mut self, stat: RpcQueryStats, approximate_balance_remaining: Decimal) { // a stat always come from just 1 frontend request self.frontend_requests += 1; @@ -212,8 +222,13 @@ impl BufferedRpcQueryStats { self.sum_response_millis += stat.response_millis; self.sum_credits_used += stat.compute_unit_cost; - let latest_balance = stat.authorization.checks.latest_balance.read().await; - self.approximate_latest_balance_for_influx = latest_balance.clone(); + if stat.authorization.checks.paid_credits_used { + self.paid_credits_used += stat.compute_unit_cost; + } + + self.approximate_balance_remaining = approximate_balance_remaining; + + trace!("added"); } async fn _save_db_stats( @@ -221,20 +236,9 @@ impl BufferedRpcQueryStats { chain_id: u64, db_conn: &DatabaseConnection, key: &RpcQueryKey, - active_premium: bool, - ) -> Web3ProxyResult { + ) -> Web3ProxyResult<()> { let period_datetime = Utc.timestamp_opt(key.response_timestamp, 0).unwrap(); - // // Because reading the balance and updating the stats here is not atomically locked, this may lead to a negative balance - // // This negative balance shouldn't be large tough - // // TODO: I'm not so sure about this. @david can you explain more? if someone spends over their balance, they **should** go slightly negative. after all, they would have received the premium limits for these queries - // // sum_credits_used is definitely correct. the balance can be slightly off. so it seems like we should trust sum_credits_used over balance - let paid_credits_used = if active_premium { - self.sum_credits_used - } else { - 0.into() - }; - // =============================== // // UPDATE STATISTICS // // =============================== // @@ -254,7 +258,7 @@ impl BufferedRpcQueryStats { sum_request_bytes: sea_orm::Set(self.sum_request_bytes), sum_response_millis: sea_orm::Set(self.sum_response_millis), sum_response_bytes: sea_orm::Set(self.sum_response_bytes), - sum_credits_used: sea_orm::Set(paid_credits_used), + sum_credits_used: sea_orm::Set(self.paid_credits_used), sum_incl_free_credits_used: sea_orm::Set(self.sum_credits_used), }; @@ -313,7 +317,7 @@ impl BufferedRpcQueryStats { ( rpc_accounting_v2::Column::SumCreditsUsed, Expr::col(rpc_accounting_v2::Column::SumCreditsUsed) - .add(paid_credits_used), + .add(self.paid_credits_used), ), ]) .to_owned(), @@ -321,34 +325,7 @@ impl BufferedRpcQueryStats { .exec(db_conn) .await?; - Ok(self.sum_credits_used) - } - - // TODO: This is basically a duplicate with the balance_checks, except the database - // TODO: Please refactor this. Also there are small differences, like the Error is 0 - async fn _get_user_balance( - &self, - user_id: u64, - user_balance_cache: &UserBalanceCache, - db_conn: &DbConn, - ) -> Web3ProxyResult>> { - if user_id == 0 { - return Ok(Arc::new(AsyncRwLock::new(Balance::default()))); - } - - trace!("Will get it from the balance cache"); - - let x = user_balance_cache - .try_get_with(user_id, async { - let x = match Balance::try_from_db(db_conn, user_id).await? { - Some(x) => x, - None => return Err(Web3ProxyError::InvalidUserKey), - }; - Ok(Arc::new(AsyncRwLock::new(x))) - }) - .await?; - - Ok(x) + Ok(()) } // TODO: take a db transaction instead so that we can batch? @@ -357,8 +334,8 @@ impl BufferedRpcQueryStats { chain_id: u64, db_conn: &DatabaseConnection, key: RpcQueryKey, - rpc_secret_key_cache: &RpcSecretKeyCache, user_balance_cache: &UserBalanceCache, + rpc_secret_key_cache: &RpcSecretKeyCache, ) -> Web3ProxyResult<()> { // Sanity check, if we need to save stats if key.response_timestamp == 0 { @@ -372,52 +349,16 @@ impl BufferedRpcQueryStats { // TODO: rename to owner_id? let sender_user_id = key.rpc_key_user_id.map_or(0, |x| x.get()); - // Gathering cache and database rows - let user_balance = self - ._get_user_balance(sender_user_id, user_balance_cache, db_conn) - .await?; + // save the statistics to the database: + self._save_db_stats(chain_id, db_conn, &key).await?; - let mut user_balance = user_balance.write().await; - - let premium_before = user_balance.active_premium(); - - // First of all, save the statistics to the database: - let paid_credits_used = self - ._save_db_stats(chain_id, db_conn, &key, premium_before) - .await?; - - // No need to continue if no credits were used - if self.sum_credits_used == 0.into() { - // write-lock is released - return Ok(()); - } - - // Update and possible invalidate rpc caches if necessary (if there was a downgrade) - { - user_balance.total_spent_paid_credits += paid_credits_used; - - // Invalidate caches if remaining is getting close to $0 - // It will be re-fetched again if needed - if premium_before && user_balance.remaining() < Decimal::from(1) { - let rpc_keys = rpc_key::Entity::find() - .filter(rpc_key::Column::UserId.eq(sender_user_id)) - .all(db_conn) - .await?; - - // clear all keys owned by this user from the cache - for rpc_key_entity in rpc_keys { - rpc_secret_key_cache - .invalidate(&rpc_key_entity.secret_key.into()) - .await; - } - } - } - - if premium_before { + // Apply all the referral logic; let's keep it simple and flat for now + if self.paid_credits_used > 0.into() { // Start a transaction let txn = db_conn.begin().await?; - // Apply all the referral logic; let's keep it simple and flat for now + let mut invalidate_caches = false; + // Calculate if we are above the usage threshold, and apply a bonus // Optimally we would read this from the balance, but if we do it like this, we only have to lock a single table (much safer w.r.t. deadlocks) // referral_entity.credits_applied_for_referrer * (Decimal::from(10) checks (atomically using this table only), whether the user has brought in >$100 to the referer @@ -425,6 +366,7 @@ impl BufferedRpcQueryStats { // Apply a 10$ bonus onto the user, if the user has spent 100$ // TODO: i think we do want a LockType::Update on this match referee::Entity::find() + .lock(LockType::Update) .filter(referee::Column::UserId.eq(sender_user_id)) .find_also_related(referrer::Entity) .one(&txn) @@ -432,8 +374,8 @@ impl BufferedRpcQueryStats { { Some((referral_entity, Some(referrer))) => { // Get the balance for the referrer, see if they're premium or not - let referrer_balance = self - ._get_user_balance(referrer.user_id, user_balance_cache, db_conn) + let referrer_balance = user_balance_cache + .get_or_insert(db_conn, referrer.user_id) .await?; // Just to keep locking simple, read and clone. if the value is slightly delayed, that is okay @@ -467,9 +409,9 @@ impl BufferedRpcQueryStats { referral_entity.one_time_bonus_applied_for_referee = sea_orm::Set(bonus_for_user); - // Update the cache - // TODO: race condition here? - user_balance.one_time_referee_bonus += bonus_for_user; + // writing here with `+= 10` has a race unless we lock outside of the mysql query (and thats just too slow) + // so instead we just invalidate the cache (after writing to mysql) + invalidate_caches = true; } let now = Utc::now(); @@ -481,7 +423,10 @@ impl BufferedRpcQueryStats { // TODO: Perhaps let's not worry about the referral cache here, to avoid deadlocks (hence only reading) if now <= valid_until { - let referrer_bonus = self.sum_credits_used / Decimal::from(10); + // TODO: make this configurable (and change all the other hard coded places for 10%) + let referrer_bonus = self.paid_credits_used / Decimal::from(10); + + // there is a LockType::Update on this that should keep any raises incrementing this referral_entity.credits_applied_for_referrer = sea_orm::Set( referral_entity.credits_applied_for_referrer.as_ref() + referrer_bonus, @@ -505,10 +450,19 @@ impl BufferedRpcQueryStats { _ => {} }; - // Finally commit the transaction in the database + // Finally, commit the transaction in the database txn.commit() .await .context("Failed to update referral and balance updates")?; + + if invalidate_caches { + if let Err(err) = user_balance_cache + .invalidate(&sender_user_id, db_conn, rpc_secret_key_cache) + .await + { + warn!(?err, "unable to invalidate caches"); + }; + } } Ok(()) @@ -530,10 +484,6 @@ impl BufferedRpcQueryStats { builder = builder.tag("method", key.method); - // Read the latest balance ... - let remaining = self.approximate_latest_balance_for_influx.remaining(); - trace!("Remaining balance for influx is {:?}", remaining); - builder = builder .tag("archive_needed", key.archive_needed.to_string()) .tag("error_response", key.error_response.to_string()) @@ -554,7 +504,7 @@ impl BufferedRpcQueryStats { ) .field( "balance", - remaining + self.approximate_balance_remaining .to_f64() .context("balance is really (too) large")?, ); @@ -569,10 +519,11 @@ impl BufferedRpcQueryStats { } } -impl TryFrom for RpcQueryStats { - type Error = Web3ProxyError; - - fn try_from(mut metadata: RequestMetadata) -> Result { +/// this is **intentionally** not a TryFrom> +/// We want this to run when there is **one and only one** copy of this RequestMetadata left +/// There are often multiple copies if a request is being sent to multiple servers in parallel +impl RpcQueryStats { + fn try_from_metadata(mut metadata: RequestMetadata) -> Web3ProxyResult { let mut authorization = metadata.authorization.take(); if authorization.is_none() { @@ -620,18 +571,19 @@ impl TryFrom for RpcQueryStats { let cu = ComputeUnit::new(&metadata.method, metadata.chain_id, response_bytes); - // TODO: get from config? a helper function? how should we pick this? - let usd_per_cu = match metadata.chain_id { - 137 => Decimal::from_str("0.000000533333333333333"), - _ => Decimal::from_str("0.000000400000000000000"), - }?; + let cache_hit = backend_rpcs_used.is_empty(); - let cache_hit = !backend_rpcs_used.is_empty(); - - let compute_unit_cost = cu.cost(archive_request, cache_hit, error_response, usd_per_cu); + let compute_unit_cost = cu.cost( + archive_request, + cache_hit, + error_response, + &metadata.usd_per_cu, + ); let method = mem::take(&mut metadata.method); + let paid_credits_used = authorization.checks.paid_credits_used; + let x = Self { archive_request, authorization, @@ -640,6 +592,7 @@ impl TryFrom for RpcQueryStats { compute_unit_cost, error_response, method, + paid_credits_used, request_bytes, response_bytes, response_millis, diff --git a/web3_proxy/src/stats/stat_buffer.rs b/web3_proxy/src/stats/stat_buffer.rs index b833da2b..7ef44446 100644 --- a/web3_proxy/src/stats/stat_buffer.rs +++ b/web3_proxy/src/stats/stat_buffer.rs @@ -1,8 +1,8 @@ -use super::{AppStat, RpcQueryKey}; +use super::{AppStat, FlushedStats, RpcQueryKey}; use crate::app::Web3ProxyJoinHandle; -use crate::balance::Balance; use crate::caches::{RpcSecretKeyCache, UserBalanceCache}; use crate::errors::Web3ProxyResult; +use crate::stats::RpcQueryStats; use derive_more::From; use futures::stream; use hashbrown::HashMap; @@ -12,7 +12,7 @@ use migration::sea_orm::DatabaseConnection; use std::time::Duration; use tokio::sync::{broadcast, mpsc, oneshot}; use tokio::time::{interval, sleep}; -use tracing::{error, info, trace}; +use tracing::{error, info, trace, warn}; #[derive(Debug, Default)] pub struct BufferedRpcQueryStats { @@ -27,8 +27,11 @@ pub struct BufferedRpcQueryStats { pub sum_response_millis: u64, pub sum_credits_used: Decimal, pub sum_cu_used: Decimal, - /// The user's balance at this point in time. Multiple queries might be modifying it at once. - pub approximate_latest_balance_for_influx: Balance, + pub paid_credits_used: Decimal, + /// The user's balance at this point in time. + /// Multiple queries might be modifying it at once, so this is a copy of it when received + /// None if this is an unauthenticated request + pub approximate_balance_remaining: Decimal, } #[derive(From)] @@ -53,7 +56,7 @@ pub struct StatBuffer { tsdb_save_interval_seconds: u32, user_balance_cache: UserBalanceCache, - _flush_sender: mpsc::Sender>, + _flush_sender: mpsc::Sender>, } impl StatBuffer { @@ -69,8 +72,8 @@ impl StatBuffer { user_balance_cache: Option, shutdown_receiver: broadcast::Receiver<()>, tsdb_save_interval_seconds: u32, - flush_sender: mpsc::Sender>, - flush_receiver: mpsc::Receiver>, + flush_sender: mpsc::Sender>, + flush_receiver: mpsc::Receiver>, ) -> anyhow::Result> { if influxdb_bucket.is_none() { influxdb_client = None; @@ -115,7 +118,7 @@ impl StatBuffer { &mut self, mut stat_receiver: mpsc::UnboundedReceiver, mut shutdown_receiver: broadcast::Receiver<()>, - mut flush_receiver: mpsc::Receiver>, + mut flush_receiver: mpsc::Receiver>, ) -> Web3ProxyResult<()> { let mut tsdb_save_interval = interval(Duration::from_secs(self.tsdb_save_interval_seconds as u64)); @@ -127,22 +130,71 @@ impl StatBuffer { stat = stat_receiver.recv() => { // trace!("Received stat"); // save the stat to a buffer + + // TODO: tokio spawn this! match stat { - Some(AppStat::RpcQuery(stat)) => { + Some(AppStat::RpcQuery(request_metadata)) => { + // we convert on this side of the channel so that we don't slow down the request + let stat = RpcQueryStats::try_from_metadata(request_metadata)?; + + // update the latest balance + // do this BEFORE emitting any stats + let mut approximate_balance_remaining = 0.into(); + if let Some(db_conn) = self.db_conn.as_ref() { + let user_id = stat.authorization.checks.user_id; + + // update the user's balance + if user_id != 0 { + // update the user's cached balance + let mut user_balance = stat.authorization.checks.latest_balance.write().await; + + // TODO: move this to a helper function + user_balance.total_frontend_requests += 1; + user_balance.total_spent += stat.compute_unit_cost; + + if !stat.backend_rpcs_used.is_empty() { + user_balance.total_cache_misses += 1; + } + + // if paid_credits_used is true, then they were premium at the start of the request + if stat.authorization.checks.paid_credits_used { + // TODO: this lets them get a negative remaining balance. we should clear if close to 0 + user_balance.total_spent_paid_credits += stat.compute_unit_cost; + + // check if they still have premium + if user_balance.active_premium() { + // TODO: referall credits here? i think in the save_db section still makes sense for those + } else if let Err(err) = self.user_balance_cache.invalidate(&user_balance.user_id, db_conn, &self.rpc_secret_key_cache).await { + // was premium, but isn't anymore due to paying for this query. clear the cache + // TODO: stop at <$0.000001 instead of negative? + warn!(?err, "unable to clear caches"); + } + } else if user_balance.active_premium() { + // paid credits were not used, but now we have active premium. invalidate the caches + // TODO: this seems unliekly. should we warn if this happens so we can investigate? + if let Err(err) = self.user_balance_cache.invalidate(&user_balance.user_id, db_conn, &self.rpc_secret_key_cache).await { + // was premium, but isn't anymore due to paying for this query. clear the cache + // TODO: stop at <$0.000001 instead of negative? + warn!(?err, "unable to clear caches"); + } + } + + approximate_balance_remaining = user_balance.remaining(); + } + + self.accounting_db_buffer.entry(stat.accounting_key(self.billing_period_seconds)).or_default().add(stat.clone(), approximate_balance_remaining).await; + } + if self.influxdb_client.is_some() { // TODO: round the timestamp at all? + if let Some(opt_in_timeseries_key) = stat.owned_timeseries_key() { + self.opt_in_timeseries_buffer.entry(opt_in_timeseries_key).or_default().add(stat.clone(), approximate_balance_remaining).await; + } + let global_timeseries_key = stat.global_timeseries_key(); - self.global_timeseries_buffer.entry(global_timeseries_key).or_default().add(stat.clone()).await; - - if let Some(opt_in_timeseries_key) = stat.owned_timeseries_key() { - self.opt_in_timeseries_buffer.entry(opt_in_timeseries_key).or_default().add(stat.clone()).await; - } - } - - if self.db_conn.is_some() { - self.accounting_db_buffer.entry(stat.accounting_key(self.billing_period_seconds)).or_default().add(stat).await; + self.global_timeseries_buffer.entry(global_timeseries_key).or_default().add(stat, approximate_balance_remaining).await; } } None => { @@ -152,6 +204,7 @@ impl StatBuffer { } } _ = db_save_interval.tick() => { + // TODO: tokio spawn this! (but with a semaphore on db_save_interval) trace!("DB save internal tick"); let count = self.save_relational_stats().await; if count > 0 { @@ -171,17 +224,15 @@ impl StatBuffer { trace!("flush"); let tsdb_count = self.save_tsdb_stats().await; - if tsdb_count > 0 { - trace!("Flushed {} stats to the tsdb", tsdb_count); - } let relational_count = self.save_relational_stats().await; - if relational_count > 0 { - trace!("Flushed {} stats to the relational db", relational_count); - } - if let Err(err) = x.send((tsdb_count, relational_count)) { - error!(%tsdb_count, %relational_count, ?err, "unable to notify about flushed stats"); + let flushed_stats = FlushedStats{ timeseries: tsdb_count, relational: relational_count}; + + trace!(?flushed_stats); + + if let Err(err) = x.send(flushed_stats) { + error!(?flushed_stats, ?err, "unable to notify about flushed stats"); } } None => { @@ -244,8 +295,8 @@ impl StatBuffer { self.chain_id, db_conn, key, - &self.rpc_secret_key_cache, &self.user_balance_cache, + &self.rpc_secret_key_cache, ) .await { diff --git a/web3_proxy/src/sub_commands/migrate_stats_to_v2.rs b/web3_proxy/src/sub_commands/migrate_stats_to_v2.rs index e88c7202..976f052f 100644 --- a/web3_proxy/src/sub_commands/migrate_stats_to_v2.rs +++ b/web3_proxy/src/sub_commands/migrate_stats_to_v2.rs @@ -3,7 +3,7 @@ use crate::config::TopConfig; use crate::frontend::authorization::{Authorization, RequestMetadata, RpcSecretKey}; use crate::rpcs::one::Web3Rpc; use crate::stats::StatBuffer; -use anyhow::{anyhow, Context}; +use anyhow::Context; use argh::FromArgs; use entities::{rpc_accounting, rpc_key}; use futures::stream::FuturesUnordered; @@ -176,12 +176,14 @@ impl MigrateStatsToV2SubCommand { let request_ulid = Ulid::new(); + let chain_id = x.chain_id; + // Create RequestMetadata let request_metadata = RequestMetadata { archive_request: x.archive_request.into(), authorization: Some(authorization.clone()), backend_requests: Mutex::new(backend_rpcs), - chain_id: x.chain_id, + chain_id, error_response: x.error_response.into(), // debug data is in kafka, not mysql or influx kafka_debug_logger: None, @@ -204,11 +206,10 @@ impl MigrateStatsToV2SubCommand { stat_sender: Some(stat_sender.clone()), request_ulid, user_error_response: false.into(), + usd_per_cu: top_config.app.usd_per_cu.unwrap_or_default(), }; - if let Some(x) = request_metadata.try_send_stat()? { - return Err(anyhow!("failed saving stat! {:?}", x)); - } + request_metadata.try_send_stat()?; } } diff --git a/web3_proxy/src/sub_commands/popularity_contest.rs b/web3_proxy/src/sub_commands/popularity_contest.rs index b1dc161e..b94a09b7 100644 --- a/web3_proxy/src/sub_commands/popularity_contest.rs +++ b/web3_proxy/src/sub_commands/popularity_contest.rs @@ -95,7 +95,11 @@ impl PopularityContestSubCommand { highest_block = highest_block.max(head_block); - let head_delay_ms = conn.get("head_delay_ms").unwrap().as_f64().unwrap(); + // TODO: this was moved to an async lock and so serialize can't fetch it + let head_delay_ms = conn + .get("head_delay_ms") + .and_then(|x| x.as_f64()) + .unwrap_or_default(); let peak_latency_ms = conn .get("peak_latency_ms") diff --git a/web3_proxy/src/sub_commands/proxyd.rs b/web3_proxy/src/sub_commands/proxyd.rs index f36a7a87..80ad8396 100644 --- a/web3_proxy/src/sub_commands/proxyd.rs +++ b/web3_proxy/src/sub_commands/proxyd.rs @@ -1,7 +1,7 @@ -#![forbid(unsafe_code)] - use crate::app::{flatten_handle, flatten_handles, Web3ProxyApp}; +use crate::compute_units::default_usd_per_cu; use crate::config::TopConfig; +use crate::stats::FlushedStats; use crate::{frontend, prometheus}; use argh::FromArgs; use futures::StreamExt; @@ -60,15 +60,20 @@ impl ProxydSubCommand { /// this shouldn't really be pub except it makes test fixtures easier #[allow(clippy::too_many_arguments)] pub async fn _main( - top_config: TopConfig, + mut top_config: TopConfig, top_config_path: Option, frontend_port: Arc, prometheus_port: Arc, num_workers: usize, frontend_shutdown_sender: broadcast::Sender<()>, - flush_stat_buffer_sender: mpsc::Sender>, - flush_stat_buffer_receiver: mpsc::Receiver>, + flush_stat_buffer_sender: mpsc::Sender>, + flush_stat_buffer_receiver: mpsc::Receiver>, ) -> anyhow::Result<()> { + // TODO: this is gross but it works. i'd rather it be called by serde, but it needs to know the chain id + if top_config.app.usd_per_cu.is_none() { + top_config.app.usd_per_cu = Some(default_usd_per_cu(top_config.app.chain_id)); + } + // tokio has code for catching ctrl+c so we use that to shut down in most cases // frontend_shutdown_sender is currently only used in tests, but we might make a /shutdown endpoint or something // we do not need this receiver. new receivers are made by `shutdown_sender.subscribe()` @@ -104,7 +109,12 @@ impl ProxydSubCommand { thread::spawn(move || loop { match fs::read_to_string(&top_config_path) { Ok(new_top_config) => match toml::from_str::(&new_top_config) { - Ok(new_top_config) => { + Ok(mut new_top_config) => { + if new_top_config.app.usd_per_cu.is_none() { + new_top_config.app.usd_per_cu = + Some(default_usd_per_cu(new_top_config.app.chain_id)); + } + if new_top_config != current_config { trace!("current_config: {:#?}", current_config); trace!("new_top_config: {:#?}", new_top_config); diff --git a/web3_proxy/src/sub_commands/user_import.rs b/web3_proxy/src/sub_commands/user_import.rs index 9b7c7815..f04f3a63 100644 --- a/web3_proxy/src/sub_commands/user_import.rs +++ b/web3_proxy/src/sub_commands/user_import.rs @@ -10,7 +10,7 @@ use migration::sea_orm::{ }; use std::path::{Path, PathBuf}; use std::{fs::File, io::BufReader}; -use tracing::{info}; +use tracing::info; #[derive(FromArgs, PartialEq, Eq, Debug)] /// Import users from another database. diff --git a/web3_proxy/src/user_token.rs b/web3_proxy/src/user_token.rs index c5f14066..33d59d8c 100644 --- a/web3_proxy/src/user_token.rs +++ b/web3_proxy/src/user_token.rs @@ -1,12 +1,12 @@ use axum::headers::authorization::Bearer; use migration::sea_orm::prelude::Uuid; -use serde::Serialize; +use serde::{Deserialize, Serialize}; use std::fmt; use std::str::FromStr; use ulid::Ulid; /// Key used for caching the user's login -#[derive(Clone, Hash, PartialEq, Eq, Serialize)] +#[derive(Copy, Clone, Debug, Deserialize, Hash, PartialEq, Eq, Serialize)] #[serde(transparent)] pub struct UserBearerToken(pub Ulid); diff --git a/web3_proxy/tests/common/get_admin_deposits.rs b/web3_proxy/tests/common/admin_deposits.rs similarity index 100% rename from web3_proxy/tests/common/get_admin_deposits.rs rename to web3_proxy/tests/common/admin_deposits.rs diff --git a/web3_proxy/tests/common/admin_increases_balance.rs b/web3_proxy/tests/common/admin_increases_balance.rs index 9f3e525c..a7a63fe3 100644 --- a/web3_proxy/tests/common/admin_increases_balance.rs +++ b/web3_proxy/tests/common/admin_increases_balance.rs @@ -36,6 +36,7 @@ pub async fn admin_increase_balance( .unwrap(); info!(?increase_balance_response, "http response"); + // TODO: use a struct here let increase_balance_response = increase_balance_response .json::() .await diff --git a/web3_proxy/tests/common/app.rs b/web3_proxy/tests/common/app.rs index c2b74f20..3720fca0 100644 --- a/web3_proxy/tests/common/app.rs +++ b/web3_proxy/tests/common/app.rs @@ -32,6 +32,7 @@ use tracing::{info, trace, warn}; use web3_proxy::{ config::{AppConfig, TopConfig, Web3RpcConfig}, relational_db::get_migrated_db, + stats::FlushedStats, sub_commands::ProxydSubCommand, }; @@ -59,23 +60,26 @@ pub struct TestApp { pub proxy_provider: Provider, /// tell the app to flush stats to the database - flush_stat_buffer_sender: mpsc::Sender>, + flush_stat_buffer_sender: mpsc::Sender>, /// tell the app to shut down (use `self.stop()`). shutdown_sender: broadcast::Sender<()>, } impl TestApp { - pub async fn spawn(setup_db: bool) -> Self { + pub async fn spawn(chain_id: u64, setup_db: bool) -> Self { + info!(?chain_id); + let num_workers = 2; // TODO: move basic setup into a test fixture let path = env::var("PATH").unwrap(); - info!("path: {}", path); + info!(%path); // TODO: configurable rpc and block let anvil = Anvil::new() + .chain_id(chain_id) // .fork("https://polygon.llamarpc.com@44300000") .spawn(); @@ -244,7 +248,7 @@ impl TestApp { // TODO: test influx // TODO: test redis let app_config: AppConfig = serde_json::from_value(json!({ - "chain_id": 31337, + "chain_id": chain_id, "db_url": db_url, "default_user_max_requests_per_period": Some(6_000_000), "deposit_factory_contract": Address::from_str( @@ -328,7 +332,7 @@ impl TestApp { } #[allow(unused)] - pub async fn flush_stats(&self) -> anyhow::Result<(usize, usize)> { + pub async fn flush_stats(&self) -> anyhow::Result { let (tx, rx) = oneshot::channel(); self.flush_stat_buffer_sender.send(tx).await?; diff --git a/web3_proxy/tests/common/create_admin.rs b/web3_proxy/tests/common/create_admin.rs index e40771cf..11d87f81 100644 --- a/web3_proxy/tests/common/create_admin.rs +++ b/web3_proxy/tests/common/create_admin.rs @@ -38,15 +38,20 @@ pub async fn create_user_as_admin( }; info!(?admin_post_login_data); - let admin_login_response = r + let admin_post_login_data = r .post(&login_post_url) .json(&admin_post_login_data) .send() .await .unwrap() - .json::() + .text() .await .unwrap(); + + info!("admin_post_login_data: {:#}", admin_post_login_data); + + let admin_login_response: LoginPostResponse = + serde_json::from_str(&admin_post_login_data).unwrap(); info!(?admin_login_response); // Upgrade the account to admin diff --git a/web3_proxy/tests/common/create_user.rs b/web3_proxy/tests/common/create_user.rs index e9a183ff..ea2579a9 100644 --- a/web3_proxy/tests/common/create_user.rs +++ b/web3_proxy/tests/common/create_user.rs @@ -1,7 +1,12 @@ use crate::TestApp; +use entities::{user, user_tier}; use ethers::prelude::{LocalWallet, Signer}; use ethers::types::Signature; +use migration::sea_orm::{ + self, ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter, +}; use tracing::info; +use web3_proxy::errors::Web3ProxyResult; use web3_proxy::frontend::users::authentication::{LoginPostResponse, PostLogin}; /// Helper function to create an "ordinary" user @@ -32,7 +37,7 @@ pub async fn create_user( }; info!(?user_post_login_data); - let user_login_response = r + let mut user_login_response = r .post(&login_post_url) .json(&user_post_login_data) .send() @@ -45,3 +50,27 @@ pub async fn create_user( user_login_response } + +/// TODO: use an admin user to do this instead +#[allow(unused)] +pub async fn set_user_tier( + x: &TestApp, + user: user::Model, + tier_name: &str, +) -> Web3ProxyResult { + let db_conn = x.db_conn(); + + let ut = user_tier::Entity::find() + .filter(user_tier::Column::Title.like(tier_name)) + .one(db_conn) + .await? + .unwrap(); + + let mut user = user.into_active_model(); + + user.user_tier_id = sea_orm::Set(ut.id); + + user.save(db_conn).await?; + + Ok(ut) +} diff --git a/web3_proxy/tests/common/mod.rs b/web3_proxy/tests/common/mod.rs index 8b80aa0e..06c82600 100644 --- a/web3_proxy/tests/common/mod.rs +++ b/web3_proxy/tests/common/mod.rs @@ -1,10 +1,10 @@ +pub mod admin_deposits; pub mod admin_increases_balance; pub mod app; pub mod create_admin; pub mod create_user; -pub mod get_admin_deposits; -pub mod get_rpc_key; -pub mod get_user_balance; pub mod referral; +pub mod rpc_key; +pub mod user_balance; pub use self::app::TestApp; diff --git a/web3_proxy/tests/common/get_rpc_key.rs b/web3_proxy/tests/common/rpc_key.rs similarity index 66% rename from web3_proxy/tests/common/get_rpc_key.rs rename to web3_proxy/tests/common/rpc_key.rs index 6bfbf702..d068f1fc 100644 --- a/web3_proxy/tests/common/get_rpc_key.rs +++ b/web3_proxy/tests/common/rpc_key.rs @@ -1,8 +1,13 @@ +use std::time::Duration; + use crate::TestApp; use serde::Deserialize; use tracing::info; use ulid::Ulid; -use web3_proxy::frontend::users::authentication::LoginPostResponse; +use web3_proxy::{ + frontend::users::authentication::LoginPostResponse, + rpcs::provider::{connect_http, EthersHttpProvider}, +}; #[derive(Debug, Deserialize)] pub struct RpcKeyResponse { @@ -33,6 +38,7 @@ pub async fn user_get_first_rpc_key( r: &reqwest::Client, login_response: &LoginPostResponse, ) -> RpcKey { + // TODO: refactor to use login_response? or compare? let get_keys = format!("{}user/keys", x.proxy_provider.url()); info!("Get balance"); @@ -47,9 +53,30 @@ pub async fn user_get_first_rpc_key( let rpc_key_response = rpc_key_response.json::().await.unwrap(); info!(?rpc_key_response); - info!("Rpc Key"); + info!("Parsing rpc key as json"); let rpc_key: RpcKeyResponse = serde_json::from_value(rpc_key_response).unwrap(); info!(?rpc_key); rpc_key.user_rpc_keys.into_iter().next().unwrap().1 } + +#[allow(unused)] +pub async fn user_get_provider( + x: &TestApp, + r: &reqwest::Client, + login_response: &LoginPostResponse, +) -> anyhow::Result { + let first_key = login_response.rpc_keys.iter().next().unwrap().1; + + let rpc_url = format!( + "{}rpc/{}", + x.proxy_provider.url(), + Ulid::from(first_key.secret_key) + ); + + connect_http( + rpc_url.parse().unwrap(), + Some(r.clone()), + Duration::from_secs(1), + ) +} diff --git a/web3_proxy/tests/common/get_user_balance.rs b/web3_proxy/tests/common/user_balance.rs similarity index 60% rename from web3_proxy/tests/common/get_user_balance.rs rename to web3_proxy/tests/common/user_balance.rs index 88d5d97e..fd428c30 100644 --- a/web3_proxy/tests/common/get_user_balance.rs +++ b/web3_proxy/tests/common/user_balance.rs @@ -1,5 +1,7 @@ use crate::TestApp; -use tracing::info; +use serde_json::json; +use tracing::{info, trace}; +use web3_proxy::balance::Balance; use web3_proxy::frontend::users::authentication::LoginPostResponse; /// Helper function to get the user's balance @@ -8,19 +10,24 @@ pub async fn user_get_balance( x: &TestApp, r: &reqwest::Client, login_response: &LoginPostResponse, -) -> (serde_json::Value) { +) -> Balance { let get_user_balance = format!("{}user/balance", x.proxy_provider.url()); - info!("Get balance"); + let balance_response = r .get(get_user_balance) .bearer_auth(login_response.bearer_token) .send() .await .unwrap(); - info!(?balance_response); + trace!( + ?balance_response, + "get balance for user #{}", + login_response.user.id + ); - let balance_response = balance_response.json::().await.unwrap(); - info!(?balance_response); + let balance = balance_response.json().await.unwrap(); - balance_response + info!("balance: {:#}", json!(&balance)); + + balance } diff --git a/web3_proxy/tests/test_admins.rs b/web3_proxy/tests/test_admins.rs index 56ca1b9a..a215bad5 100644 --- a/web3_proxy/tests/test_admins.rs +++ b/web3_proxy/tests/test_admins.rs @@ -5,8 +5,8 @@ use std::time::Duration; use crate::common::admin_increases_balance::admin_increase_balance; use crate::common::create_admin::create_user_as_admin; -use crate::common::create_user::create_user; -use crate::common::get_user_balance::user_get_balance; +use crate::common::create_user::{create_user, set_user_tier}; +use crate::common::user_balance::user_get_balance; use crate::common::TestApp; use migration::sea_orm::prelude::Decimal; use tracing::info; @@ -15,7 +15,7 @@ use tracing::info; #[ignore = "under construction"] #[test_log::test(tokio::test)] async fn test_admin_imitate_user() { - let x = TestApp::spawn(true).await; + let x = TestApp::spawn(31337, true).await; todo!(); } @@ -24,7 +24,7 @@ async fn test_admin_imitate_user() { #[test_log::test(tokio::test)] async fn test_admin_grant_credits() { info!("Starting admin grant credits test"); - let x = TestApp::spawn(true).await; + let x = TestApp::spawn(31337, true).await; let r = reqwest::Client::builder() .timeout(Duration::from_secs(3)) .build() @@ -39,6 +39,8 @@ async fn test_admin_grant_credits() { let admin_login_response = create_user_as_admin(&x, &r, &admin_wallet).await; info!(?admin_login_response); + set_user_tier(&x, user_login_response.user.clone(), "Premium").await.unwrap(); + let increase_balance_response = admin_increase_balance( &x, &r, @@ -53,11 +55,8 @@ async fn test_admin_grant_credits() { Decimal::from(100) ); - let user_balance_response = user_get_balance(&x, &r, &user_login_response).await; - assert_eq!( - Decimal::from_str(user_balance_response["balance"].as_str().unwrap()).unwrap(), - Decimal::from(100) - ); + let user_balance = user_get_balance(&x, &r, &user_login_response).await; + assert_eq!(user_balance.remaining(), Decimal::from(100)); x.wait().await; } @@ -66,6 +65,6 @@ async fn test_admin_grant_credits() { #[ignore = "under construction"] #[test_log::test(tokio::test)] async fn test_admin_change_user_tier() { - let x = TestApp::spawn(true).await; + let x = TestApp::spawn(31337, true).await; todo!(); } diff --git a/web3_proxy/tests/test_proxy.rs b/web3_proxy/tests/test_proxy.rs index 83106df2..d9cdec7e 100644 --- a/web3_proxy/tests/test_proxy.rs +++ b/web3_proxy/tests/test_proxy.rs @@ -13,7 +13,7 @@ use web3_proxy::rpcs::blockchain::ArcBlock; #[cfg_attr(not(feature = "tests-needing-docker"), ignore)] #[test_log::test(tokio::test)] async fn it_migrates_the_db() { - let x = TestApp::spawn(true).await; + let x = TestApp::spawn(31337, true).await; // we call flush stats more to be sure it works than because we expect it to save any stats x.flush_stats().await.unwrap(); @@ -21,7 +21,7 @@ async fn it_migrates_the_db() { #[test_log::test(tokio::test)] async fn it_starts_and_stops() { - let x = TestApp::spawn(false).await; + let x = TestApp::spawn(31337, false).await; let anvil_provider = &x.anvil_provider; let proxy_provider = &x.proxy_provider; diff --git a/web3_proxy/tests/test_sum_credits_used.rs b/web3_proxy/tests/test_sum_credits_used.rs new file mode 100644 index 00000000..25e531d1 --- /dev/null +++ b/web3_proxy/tests/test_sum_credits_used.rs @@ -0,0 +1,205 @@ +mod common; + +use crate::common::{ + admin_increases_balance::admin_increase_balance, + create_admin::create_user_as_admin, + create_user::{create_user, set_user_tier}, + rpc_key::user_get_provider, + user_balance::user_get_balance, + TestApp, +}; +use ethers::prelude::U64; +use migration::sea_orm::prelude::Decimal; +use std::time::Duration; +use tracing::info; +use web3_proxy::balance::Balance; + +// TODO: #[cfg_attr(not(feature = "tests-needing-docker"), ignore)] +#[test_log::test(tokio::test)] +async fn test_sum_credits_used() { + // chain_id 999_001_999 costs $.10/CU + let x = TestApp::spawn(999_001_999, true).await; + + let r = reqwest::Client::builder() + .timeout(Duration::from_secs(3)) + .build() + .unwrap(); + + // create wallets for users + let user_wallet = x.wallet(0); + let admin_wallet = x.wallet(1); + + // log in to create users + let admin_login_response = create_user_as_admin(&x, &r, &admin_wallet).await; + let user_login_response = create_user(&x, &r, &user_wallet, None).await; + + set_user_tier(&x, user_login_response.user.clone(), "Premium") + .await + .unwrap(); + + // TODO: set the user's user_id to the "Premium" tier + + info!("starting balance"); + let balance: Balance = user_get_balance(&x, &r, &user_login_response).await; + assert_eq!( + balance.total_frontend_requests, 0, + "total_frontend_requests" + ); + assert_eq!(balance.total_cache_misses, 0, "total_cache_misses"); + assert_eq!( + balance.total_spent_paid_credits, + 0.into(), + "total_spent_paid_credits" + ); + assert_eq!(balance.total_spent, 0.into(), "total_spent"); + assert_eq!(balance.remaining(), 0.into(), "remaining"); + assert!(!balance.active_premium(), "active_premium"); + assert!(!balance.was_ever_premium(), "was_ever_premium"); + + info!("make one free request against the public RPC of 16 CU"); + x.proxy_provider + .request::<_, Option>("eth_blockNumber", ()) + .await + .unwrap(); + + // connect to the user's rpc url + let user_proxy_provider = user_get_provider(&x, &r, &user_login_response) + .await + .unwrap(); + + info!("make one cached authenticated (but out of funds) rpc request of 16 CU"); + user_proxy_provider + .request::<_, Option>("eth_blockNumber", ()) + .await + .unwrap(); + + let query_cost: Decimal = "1.00".parse().unwrap(); + + // let archive_multiplier: Decimal = "2.5".parse().unwrap(); + + let cache_multipler: Decimal = "0.75".parse().unwrap(); + + let cached_query_cost: Decimal = query_cost * cache_multipler; + + // flush stats + let flushed = x.flush_stats().await.unwrap(); + assert_eq!(flushed.relational, 2, "relational"); + assert_eq!(flushed.timeseries, 0, "timeseries"); + + // Give user wallet $1000 + admin_increase_balance(&x, &r, &admin_login_response, &user_wallet, 1000.into()).await; + + // check balance + let balance: Balance = user_get_balance(&x, &r, &user_login_response).await; + assert_eq!( + balance.total_frontend_requests, 1, + "total_frontend_requests" + ); + assert_eq!(balance.total_cache_misses, 0, "total_cache_misses"); + assert_eq!( + balance.total_spent_paid_credits, + 0.into(), + "total_spent_paid_credits" + ); + assert_eq!(balance.total_spent, cached_query_cost, "total_spent"); // TODO: not sure what this should be + assert_eq!(balance.remaining(), 1000.into(), "remaining"); + assert!(balance.active_premium(), "active_premium"); + assert!(balance.was_ever_premium(), "was_ever_premium"); + + info!("make one cached authenticated rpc request of 16 CU"); + user_proxy_provider + .request::<_, Option>("eth_blockNumber", ()) + .await + .unwrap(); + + // flush stats + let flushed = x.flush_stats().await.unwrap(); + assert_eq!(flushed.relational, 1); + + // check balance + let balance: Balance = user_get_balance(&x, &r, &user_login_response).await; + assert_eq!( + balance.total_frontend_requests, 2, + "total_frontend_requests" + ); + assert_eq!(balance.total_cache_misses, 0, "total_cache_misses"); + assert_eq!( + balance.total_spent, + cached_query_cost * Decimal::from(2), + "total_spent" + ); + assert_eq!( + balance.total_spent_paid_credits, cached_query_cost, + "total_spent_paid_credits" + ); + assert_eq!( + balance.remaining(), + Decimal::from(1000) - cached_query_cost, + "remaining" + ); + assert!(balance.active_premium(), "active_premium"); + assert!(balance.was_ever_premium(), "was_ever_premium"); + + info!("make ten cached authenticated requests of 16 CU"); + for _ in 0..10 { + user_proxy_provider + .request::<_, Option>("eth_blockNumber", ()) + .await + .unwrap(); + } + + // flush stats + let flushed = x.flush_stats().await.unwrap(); + assert_eq!(flushed.relational, 1); + + // check balance + info!("checking the final balance"); + let balance: Balance = user_get_balance(&x, &r, &user_login_response).await; + + // the first of our 12 total requests request was on the free tier, so paid_credits should only count 11 + let expected_total_spent_paid_credits = Decimal::from(11) * cached_query_cost; + + assert_eq!( + balance.total_frontend_requests, 12, + "total_frontend_requests" + ); + assert_eq!(balance.total_cache_misses, 0, "total_cache_misses"); + assert_eq!( + balance.total_spent_paid_credits, expected_total_spent_paid_credits, + "total_spent_paid_credits" + ); + assert_eq!( + balance.total_spent, + expected_total_spent_paid_credits + cached_query_cost, + "total_spent" + ); + assert_eq!( + balance.remaining(), + Decimal::from(1000) - expected_total_spent_paid_credits + ); + assert!(balance.active_premium(), "active_premium"); + assert!(balance.was_ever_premium(), "was_ever_premium"); + + // TODO: make enough queries to push the user balance negative + + // check admin's balance to make sure nothing is leaking + info!("checking the admin"); + let admin_balance: Balance = user_get_balance(&x, &r, &admin_login_response).await; + + assert!(!admin_balance.active_premium(), "active_premium"); + assert!(!admin_balance.was_ever_premium(), "was_ever_premium"); + assert_eq!( + admin_balance.total_frontend_requests, 0, + "total_frontend_requests" + ); + assert_eq!(admin_balance.total_cache_misses, 0, "total_cache_misses"); + assert_eq!( + admin_balance.total_spent_paid_credits, + 0.into(), + "total_spent_paid_credits" + ); + assert_eq!(admin_balance.total_spent, 0.into(), "total_spent"); + assert_eq!(admin_balance.remaining(), 0.into(), "remaining"); + + // TODO: query "user 0" to get the public counts +} diff --git a/web3_proxy/tests/test_users.rs b/web3_proxy/tests/test_users.rs index 740fd22d..d25f47f4 100644 --- a/web3_proxy/tests/test_users.rs +++ b/web3_proxy/tests/test_users.rs @@ -1,15 +1,15 @@ mod common; +use crate::common::admin_deposits::get_admin_deposits; use crate::common::admin_increases_balance::admin_increase_balance; use crate::common::create_admin::create_user_as_admin; -use crate::common::create_user::create_user; -use crate::common::get_admin_deposits::get_admin_deposits; -use crate::common::get_rpc_key::{user_get_first_rpc_key, RpcKey}; -use crate::common::get_user_balance::user_get_balance; +use crate::common::create_user::{create_user, set_user_tier}; use crate::common::referral::{ get_referral_code, get_shared_referral_codes, get_used_referral_codes, UserSharedReferralInfo, UserUsedReferralInfo, }; +use crate::common::rpc_key::{user_get_first_rpc_key, RpcKey}; +use crate::common::user_balance::user_get_balance; use crate::common::TestApp; use ethers::prelude::{Http, Provider}; use ethers::{signers::Signer, types::Signature}; @@ -36,7 +36,7 @@ struct LoginPostResponse { #[cfg_attr(not(feature = "tests-needing-docker"), ignore)] #[test_log::test(tokio::test)] async fn test_log_in_and_out() { - let x = TestApp::spawn(true).await; + let x = TestApp::spawn(31337, true).await; let r = reqwest::Client::new(); @@ -92,7 +92,7 @@ async fn test_log_in_and_out() { #[test_log::test(tokio::test)] async fn test_admin_balance_increase() { info!("Starting admin can increase balance"); - let x = TestApp::spawn(true).await; + let x = TestApp::spawn(31337, true).await; let r = reqwest::Client::builder() .timeout(Duration::from_secs(20)) .build() @@ -105,6 +105,10 @@ async fn test_admin_balance_increase() { let admin_login_response = create_user_as_admin(&x, &r, &admin_wallet).await; let user_login_response = create_user(&x, &r, &user_wallet, None).await; + set_user_tier(&x, user_login_response.user.clone(), "Premium") + .await + .unwrap(); + // Bump both user's wallet to $20 admin_increase_balance( &x, @@ -139,7 +143,7 @@ async fn test_admin_balance_increase() { #[test_log::test(tokio::test)] async fn test_user_balance_decreases() { info!("Starting balance decreases with usage test"); - let x = TestApp::spawn(true).await; + let x = TestApp::spawn(31337, true).await; let r = reqwest::Client::builder() .timeout(Duration::from_secs(20)) .build() @@ -152,13 +156,17 @@ async fn test_user_balance_decreases() { let admin_login_response = create_user_as_admin(&x, &r, &admin_wallet).await; let user_login_response = create_user(&x, &r, &user_wallet, None).await; + set_user_tier(&x, user_login_response.user.clone(), "Premium") + .await + .unwrap(); + // Get the rpc keys for this user let rpc_keys: RpcKey = user_get_first_rpc_key(&x, &r, &user_login_response).await; let proxy_endpoint = format!("{}rpc/{}", x.proxy_provider.url(), rpc_keys.secret_key); let proxy_provider = Provider::::try_from(proxy_endpoint).unwrap(); // Make some requests while in the free tier, so we can test bookkeeping here - for _ in 1..10_000 { + for _ in 1..=10_000 { let _ = proxy_provider .request::<_, Option>("eth_getBlockByNumber", ("latest", false)) .await @@ -167,35 +175,20 @@ async fn test_user_balance_decreases() { } // Flush all stats here - let (influx_count, mysql_count) = x.flush_stats().await.unwrap(); - assert_eq!(influx_count, 0); - assert!(mysql_count > 0); + let flush_count = x.flush_stats().await.unwrap(); + assert_eq!(flush_count.timeseries, 0); + assert!(flush_count.relational > 0); // Check the balance, it should not have decreased; there should have been accounted free credits, however - let user_balance_response = user_get_balance(&x, &r, &user_login_response).await; + let user_balance = user_get_balance(&x, &r, &user_login_response).await; // Check that the balance is 0 - assert_eq!( - Decimal::from_str(user_balance_response["balance"].as_str().unwrap()).unwrap(), - Decimal::from(0) - ); + assert_eq!(user_balance.remaining(), Decimal::from(0)); // Check that paid credits is 0 (because balance is 0) - assert_eq!( - Decimal::from_str( - user_balance_response["total_spent_paid_credits"] - .as_str() - .unwrap() - ) - .unwrap(), - Decimal::from(0) - ); + assert_eq!(user_balance.total_spent_paid_credits, Decimal::from(0)); // Check that paid credits is 0 (because balance is 0) - assert_eq!( - Decimal::from_str(user_balance_response["total_deposits"].as_str().unwrap()).unwrap(), - Decimal::from(0) - ); + assert_eq!(user_balance.total_deposits(), Decimal::from(0)); // Check that total credits incl free used is larger than 0 - let previously_free_spent = - Decimal::from_str(user_balance_response["total_spent"].as_str().unwrap()).unwrap(); + let previously_free_spent = user_balance.total_spent; assert!(previously_free_spent > Decimal::from(0)); // Bump both user's wallet to $20 @@ -208,11 +201,10 @@ async fn test_user_balance_decreases() { ) .await; let user_balance_response = user_get_balance(&x, &r, &user_login_response).await; - let user_balance_pre = - Decimal::from_str(user_balance_response["balance"].as_str().unwrap()).unwrap(); + let user_balance_pre = user_balance_response.remaining(); assert_eq!(user_balance_pre, Decimal::from(20)); - for _ in 1..10_000 { + for _ in 1..=10_000 { let _ = proxy_provider .request::<_, Option>("eth_getBlockByNumber", ("latest", false)) .await @@ -221,54 +213,43 @@ async fn test_user_balance_decreases() { } // Flush all stats here - let (influx_count, mysql_count) = x.flush_stats().await.unwrap(); - assert_eq!(influx_count, 0); - assert!(mysql_count > 0); + let flush_count = x.flush_stats().await.unwrap(); + assert_eq!(flush_count.timeseries, 0); + assert!(flush_count.relational == 1); // Deposits should not be affected, and should be equal to what was initially provided - let user_balance_response = user_get_balance(&x, &r, &user_login_response).await; - let total_deposits = - Decimal::from_str(user_balance_response["total_deposits"].as_str().unwrap()).unwrap(); + let user_balance = user_get_balance(&x, &r, &user_login_response).await; + + let total_deposits = user_balance.total_deposits(); assert_eq!(total_deposits, Decimal::from(20)); // Check that total_spent_paid credits is equal to total_spent, because we are all still inside premium assert_eq!( - Decimal::from_str( - user_balance_response["total_spent_paid_credits"] - .as_str() - .unwrap() - ) - .unwrap() - + previously_free_spent, - Decimal::from_str(user_balance_response["total_spent"].as_str().unwrap()).unwrap() + user_balance.total_spent_paid_credits + previously_free_spent, + user_balance.total_spent, ); // Get the full balance endpoint - let user_balance_post = - Decimal::from_str(user_balance_response["balance"].as_str().unwrap()).unwrap(); + let user_balance_post = user_balance.remaining(); assert!(user_balance_post < user_balance_pre); + // 10k while free, 10k while premium + assert_eq!(user_balance.total_frontend_requests, 20_000); + // Balance should be total deposits - usage while in the paid tier - let total_spent_in_paid_credits = Decimal::from_str( - user_balance_response["total_spent_paid_credits"] - .as_str() - .unwrap(), - ) - .unwrap(); + let total_spent_in_paid_credits = user_balance.total_spent_paid_credits; assert_eq!( total_deposits - total_spent_in_paid_credits, user_balance_post ); // This should never be negative - let user_balance_total_spent = - Decimal::from_str(user_balance_response["total_spent"].as_str().unwrap()).unwrap(); - assert!(user_balance_total_spent > Decimal::from(0)); + assert!(user_balance.total_spent > Decimal::from(0)); } #[cfg_attr(not(feature = "tests-needing-docker"), ignore)] #[test_log::test(tokio::test)] async fn test_referral_bonus_non_concurrent() { info!("Starting referral bonus test"); - let x = TestApp::spawn(true).await; + let x = TestApp::spawn(31337, true).await; let r = reqwest::Client::builder() .timeout(Duration::from_secs(20)) .build() @@ -286,6 +267,13 @@ async fn test_referral_bonus_non_concurrent() { let user_login_response = create_user(&x, &r, &user_wallet, Some(referral_link.clone())).await; + set_user_tier(&x, referrer_login_response.user.clone(), "Premium") + .await + .unwrap(); + set_user_tier(&x, user_login_response.user.clone(), "Premium") + .await + .unwrap(); + // Bump both user's wallet to $20 admin_increase_balance( &x, @@ -306,11 +294,9 @@ async fn test_referral_bonus_non_concurrent() { // Get balance before for both users let user_balance_response = user_get_balance(&x, &r, &user_login_response).await; - let user_balance_pre = - Decimal::from_str(user_balance_response["balance"].as_str().unwrap()).unwrap(); + let user_balance_pre = user_balance_response.remaining(); let referrer_balance_response = user_get_balance(&x, &r, &referrer_login_response).await; - let referrer_balance_pre = - Decimal::from_str(referrer_balance_response["balance"].as_str().unwrap()).unwrap(); + let referrer_balance_pre = referrer_balance_response.remaining(); // Make sure they both have balance now assert_eq!(user_balance_pre, Decimal::from(20)); @@ -354,7 +340,7 @@ async fn test_referral_bonus_non_concurrent() { let proxy_endpoint = format!("{}rpc/{}", x.proxy_provider.url(), rpc_keys.secret_key); let proxy_provider = Provider::::try_from(proxy_endpoint).unwrap(); - for _ in 1..20_000 { + for _ in 1..=20_000 { let _proxy_result = proxy_provider .request::<_, Option>("eth_getBlockByNumber", ("latest", false)) .await @@ -363,24 +349,25 @@ async fn test_referral_bonus_non_concurrent() { } // Flush all stats here - let (influx_count, mysql_count) = x.flush_stats().await.unwrap(); - assert_eq!(influx_count, 0); - assert!(mysql_count > 0); + let flush_count = x.flush_stats().await.unwrap(); + assert_eq!(flush_count.timeseries, 0); + assert!(flush_count.relational > 0); // Check that at least something was earned: let shared_referral_code: UserSharedReferralInfo = get_shared_referral_codes(&x, &r, &referrer_login_response).await; - info!("Referral code"); - info!("{:?}", shared_referral_code.referrals.get(0).unwrap()); + info!(referrals=?shared_referral_code.referrals.get(0).unwrap(), "Referral code"); + + let user_balance = user_get_balance(&x, &r, &user_login_response).await; + + // first, make sure that 20k requests were saved to the db + assert_eq!(user_balance.total_frontend_requests, 20_000); // We make sure that the referrer has $10 + 10% of the used balance // The admin provides credits for both - let user_balance_response = user_get_balance(&x, &r, &user_login_response).await; - let user_balance_post = - Decimal::from_str(user_balance_response["balance"].as_str().unwrap()).unwrap(); - let referrer_balance_response = user_get_balance(&x, &r, &referrer_login_response).await; - let referrer_balance_post = - Decimal::from_str(referrer_balance_response["balance"].as_str().unwrap()).unwrap(); + let user_balance_post = user_balance.remaining(); + let referrer_balance = user_get_balance(&x, &r, &referrer_login_response).await; + let referrer_balance_post = referrer_balance.remaining(); info!( "Balances before and after are (user): {:?} {:?}", @@ -412,7 +399,7 @@ async fn test_referral_bonus_non_concurrent() { #[test_log::test(tokio::test)] async fn test_referral_bonus_concurrent_referrer_only() { info!("Starting referral bonus test"); - let x = TestApp::spawn(true).await; + let x = TestApp::spawn(31337, true).await; let r = reqwest::Client::builder() .timeout(Duration::from_secs(20)) .build() @@ -430,6 +417,13 @@ async fn test_referral_bonus_concurrent_referrer_only() { let user_login_response = create_user(&x, &r, &user_wallet, Some(referral_link.clone())).await; + set_user_tier(&x, referrer_login_response.user.clone(), "Premium") + .await + .unwrap(); + set_user_tier(&x, user_login_response.user.clone(), "Premium") + .await + .unwrap(); + // Bump both user's wallet to $20 admin_increase_balance( &x, @@ -450,11 +444,9 @@ async fn test_referral_bonus_concurrent_referrer_only() { // Get balance before for both users let user_balance_response = user_get_balance(&x, &r, &user_login_response).await; - let user_balance_pre = - Decimal::from_str(user_balance_response["balance"].as_str().unwrap()).unwrap(); + let user_balance_pre = user_balance_response.remaining(); let referrer_balance_response = user_get_balance(&x, &r, &referrer_login_response).await; - let referrer_balance_pre = - Decimal::from_str(referrer_balance_response["balance"].as_str().unwrap()).unwrap(); + let referrer_balance_pre = referrer_balance_response.remaining(); // Make sure they both have balance now assert_eq!(user_balance_pre, Decimal::from(20)); @@ -521,9 +513,9 @@ async fn test_referral_bonus_concurrent_referrer_only() { } // Flush all stats here - let (influx_count, mysql_count) = x.flush_stats().await.unwrap(); - assert_eq!(influx_count, 0); - assert!(mysql_count > 0); + let flush_count = x.flush_stats().await.unwrap(); + assert_eq!(flush_count.timeseries, 0); + assert!(flush_count.relational > 0); // Check that at least something was earned: let shared_referral_code: UserSharedReferralInfo = @@ -534,11 +526,9 @@ async fn test_referral_bonus_concurrent_referrer_only() { // We make sure that the referrer has $10 + 10% of the used balance // The admin provides credits for both let user_balance_response = user_get_balance(&x, &r, &user_login_response).await; - let user_balance_post = - Decimal::from_str(user_balance_response["balance"].as_str().unwrap()).unwrap(); + let user_balance_post = user_balance_response.remaining(); let referrer_balance_response = user_get_balance(&x, &r, &referrer_login_response).await; - let referrer_balance_post = - Decimal::from_str(referrer_balance_response["balance"].as_str().unwrap()).unwrap(); + let referrer_balance_post = referrer_balance_response.remaining(); info!( "Balances before and after are (user): {:?} {:?}", @@ -570,7 +560,7 @@ async fn test_referral_bonus_concurrent_referrer_only() { #[test_log::test(tokio::test)] async fn test_referral_bonus_concurrent_referrer_and_user() { info!("Starting referral bonus test"); - let x = TestApp::spawn(true).await; + let x = TestApp::spawn(31337, true).await; let r = reqwest::Client::builder() .timeout(Duration::from_secs(20)) .build() @@ -588,6 +578,13 @@ async fn test_referral_bonus_concurrent_referrer_and_user() { let user_login_response = create_user(&x, &r, &user_wallet, Some(referral_link.clone())).await; + set_user_tier(&x, referrer_login_response.user.clone(), "Premium") + .await + .unwrap(); + set_user_tier(&x, user_login_response.user.clone(), "Premium") + .await + .unwrap(); + // Bump both user's wallet to $20 admin_increase_balance( &x, @@ -608,11 +605,9 @@ async fn test_referral_bonus_concurrent_referrer_and_user() { // Get balance before for both users let user_balance_response = user_get_balance(&x, &r, &user_login_response).await; - let user_balance_pre = - Decimal::from_str(user_balance_response["balance"].as_str().unwrap()).unwrap(); + let user_balance_pre = user_balance_response.remaining(); let referrer_balance_response = user_get_balance(&x, &r, &referrer_login_response).await; - let referrer_balance_pre = - Decimal::from_str(referrer_balance_response["balance"].as_str().unwrap()).unwrap(); + let referrer_balance_pre = referrer_balance_response.remaining(); // Make sure they both have balance now assert_eq!(user_balance_pre, Decimal::from(20)); @@ -700,9 +695,9 @@ async fn test_referral_bonus_concurrent_referrer_and_user() { } // Flush all stats here - let (influx_count, mysql_count) = x.flush_stats().await.unwrap(); - assert_eq!(influx_count, 0); - assert!(mysql_count > 0); + let flush_count = x.flush_stats().await.unwrap(); + assert_eq!(flush_count.timeseries, 0); + assert!(flush_count.relational > 0); // Check that at least something was earned: let shared_referral_code: UserSharedReferralInfo = @@ -713,11 +708,9 @@ async fn test_referral_bonus_concurrent_referrer_and_user() { // We make sure that the referrer has $10 + 10% of the used balance // The admin provides credits for both let user_balance_response = user_get_balance(&x, &r, &user_login_response).await; - let user_balance_post = - Decimal::from_str(user_balance_response["balance"].as_str().unwrap()).unwrap(); + let user_balance_post = user_balance_response.remaining(); let referrer_balance_response = user_get_balance(&x, &r, &referrer_login_response).await; - let referrer_balance_post = - Decimal::from_str(referrer_balance_response["balance"].as_str().unwrap()).unwrap(); + let referrer_balance_post = referrer_balance_response.remaining(); info!( "Balances before and after are (user): {:?} {:?}",