diff --git a/Cargo.lock b/Cargo.lock index 33ba79d3..c8c817a7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -68,7 +68,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2c99f64d1e06488f620f932677e24bc6e2897582980441ae90a671415bd7ec2f" dependencies = [ "cfg-if", - "getrandom", "once_cell", "version_check", ] @@ -189,6 +188,35 @@ dependencies = [ "tokio", ] +[[package]] +name = "async-io" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fc5b45d93ef0529756f812ca52e44c221b35341892d3dcc34132ac02f3dd2af" +dependencies = [ + "async-lock", + "autocfg", + "cfg-if", + "concurrent-queue", + "futures-lite", + "log", + "parking", + "polling", + "rustix", + "slab", + "socket2", + "waker-fn", +] + +[[package]] +name = "async-lock" +version = "2.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa24f727524730b077666307f2734b4a1a1c57acb79193127dcc8914d5242dd7" +dependencies = [ + "event-listener", +] + [[package]] name = "async-stream" version = "0.3.5" @@ -589,6 +617,12 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "bytecount" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c676a478f63e9fa2dd5368a42f28bba0d6c560b775f38583c8bbaa7fcd67c9c" + [[package]] name = "byteorder" version = "1.4.3" @@ -643,6 +677,19 @@ dependencies = [ "serde", ] +[[package]] +name = "cargo_metadata" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4acbb09d9ee8e23699b9634375c72795d095bf268439da88562cf9b501f181fa" +dependencies = [ + "camino", + "cargo-platform", + "semver", + "serde", + "serde_json", +] + [[package]] name = "cargo_metadata" version = "0.15.4" @@ -868,6 +915,15 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "concurrent-queue" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62ec6771ecfa0762d24683ee5a32ad78487a3d3afdc0fb8cae19d2c5deb50b7c" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "console" version = "0.14.1" @@ -1230,7 +1286,7 @@ dependencies = [ "anyhow", "hashbrown 0.14.0", "log", - "quick_cache_ttl", + "moka", "redis-rate-limiter", "tokio", ] @@ -1546,6 +1602,15 @@ dependencies = [ "libc", ] +[[package]] +name = "error-chain" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d2f06b9cac1506ece98fe3231e3cc9c4410ec3d5b1f24ae1c8946f0742cdefc" +dependencies = [ + "version_check", +] + [[package]] name = "eth-keystore" version = "0.5.0" @@ -1711,7 +1776,7 @@ checksum = "6da5fa198af0d3be20c19192df2bd9590b92ce09a8421e793bec8851270f1b05" dependencies = [ "arrayvec", "bytes", - "cargo_metadata", + "cargo_metadata 0.15.4", "chrono", "elliptic-curve 0.13.5", "ethabi", @@ -2107,6 +2172,21 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" +[[package]] +name = "futures-lite" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" +dependencies = [ + "fastrand", + "futures-core", + "futures-io", + "memchr", + "parking", + "pin-project-lite", + "waker-fn", +] + [[package]] name = "futures-locks" version = "0.7.1" @@ -2992,6 +3072,15 @@ version = "0.4.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "518ef76f2f87365916b142844c16d8fefd85039bc5699050210a7778ee1cd1de" +[[package]] +name = "mach2" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d0d1830bcd151a6fc4aea1369af235b36c1528fe976b8ff678683c9995eade8" +dependencies = [ + "libc", +] + [[package]] name = "match_cfg" version = "0.1.0" @@ -3126,6 +3215,32 @@ dependencies = [ "winapi", ] +[[package]] +name = "moka" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36506f2f935238463605f3bb13b362f1949daafc3b347d05d60ae08836db2bd2" +dependencies = [ + "async-io", + "async-lock", + "crossbeam-channel", + "crossbeam-epoch", + "crossbeam-utils", + "futures-util", + "num_cpus", + "once_cell", + "parking_lot 0.12.1", + "quanta", + "rustc_version", + "scheduled-thread-pool", + "skeptic", + "smallvec", + "tagptr", + "thiserror", + "triomphe", + "uuid 1.3.3", +] + [[package]] name = "nanorand" version = "0.7.0" @@ -3525,7 +3640,7 @@ checksum = "bd10bab2b6df910bbe6c4987d76aa4221235103d9a9c000cfabcee6a6abc8f7a" dependencies = [ "reqwest", "serde", - "time 0.3.21", + "time 0.3.22", "url", ] @@ -3555,6 +3670,12 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "parking" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14f2252c834a40ed9bb5422029649578e63aa341ac401f74e719dd1afda8394e" + [[package]] name = "parking_lot" version = "0.11.2" @@ -3869,6 +3990,22 @@ version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964" +[[package]] +name = "polling" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b2d323e8ca7996b3e23126511a523f7e62924d93ecd5ae73b333815b0eb3dce" +dependencies = [ + "autocfg", + "bitflags", + "cfg-if", + "concurrent-queue", + "libc", + "log", + "pin-project-lite", + "windows-sys 0.48.0", +] + [[package]] name = "ppv-lite86" version = "0.2.17" @@ -4041,25 +4178,30 @@ dependencies = [ ] [[package]] -name = "quick_cache" -version = "0.3.0" +name = "pulldown-cmark" +version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5253a3a0d56548d5b0be25414171dc780cc6870727746d05bd2bde352eee96c5" +checksum = "77a1a2f1f0a7ecff9c31abbe177637be0e97a0aef46cf8738ece09327985d998" dependencies = [ - "ahash 0.8.3", - "hashbrown 0.13.2", - "parking_lot 0.12.1", + "bitflags", + "memchr", + "unicase", ] [[package]] -name = "quick_cache_ttl" -version = "0.1.0" +name = "quanta" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a17e662a7a8291a865152364c20c7abc5e60486ab2001e8ec10b24862de0b9ab" dependencies = [ - "flume", - "log", - "quick_cache", - "serde", - "tokio", + "crossbeam-utils", + "libc", + "mach2", + "once_cell", + "raw-cpuid", + "wasi 0.11.0+wasi-snapshot-preview1", + "web-sys", + "winapi", ] [[package]] @@ -4129,6 +4271,15 @@ dependencies = [ "tokio", ] +[[package]] +name = "raw-cpuid" +version = "10.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c297679cb867470fa8c9f67dbba74a78d78e3e98d7cf2b08d6d71540f797332" +dependencies = [ + "bitflags", +] + [[package]] name = "rayon" version = "1.7.0" @@ -4627,6 +4778,15 @@ dependencies = [ "windows-sys 0.42.0", ] +[[package]] +name = "scheduled-thread-pool" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19" +dependencies = [ + "parking_lot 0.12.1", +] + [[package]] name = "scoped-tls" version = "1.0.1" @@ -4683,7 +4843,7 @@ dependencies = [ "serde_json", "sqlx", "thiserror", - "time 0.3.21", + "time 0.3.22", "tracing", "url", "uuid 1.3.3", @@ -4746,7 +4906,7 @@ dependencies = [ "rust_decimal", "sea-query-derive", "serde_json", - "time 0.3.21", + "time 0.3.22", "uuid 1.3.3", ] @@ -4762,7 +4922,7 @@ dependencies = [ "sea-query", "serde_json", "sqlx", - "time 0.3.21", + "time 0.3.22", "uuid 1.3.3", ] @@ -5017,7 +5177,7 @@ dependencies = [ "serde", "serde_json", "thiserror", - "time 0.3.21", + "time 0.3.22", "url", "uuid 1.3.3", ] @@ -5245,7 +5405,22 @@ dependencies = [ "rand", "sha3", "thiserror", - "time 0.3.21", + "time 0.3.22", +] + +[[package]] +name = "skeptic" +version = "0.13.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16d23b015676c90a0f01c197bfdc786c20342c73a0afdda9025adb0bc42940a8" +dependencies = [ + "bytecount", + "cargo_metadata 0.14.2", + "error-chain", + "glob", + "pulldown-cmark", + "tempfile", + "walkdir", ] [[package]] @@ -5422,7 +5597,7 @@ dependencies = [ "sqlx-rt", "stringprep", "thiserror", - "time 0.3.21", + "time 0.3.22", "tokio-stream", "url", "uuid 1.3.3", @@ -5587,6 +5762,12 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" +[[package]] +name = "tagptr" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" + [[package]] name = "tap" version = "1.0.1" @@ -5704,9 +5885,9 @@ dependencies = [ [[package]] name = "time" -version = "0.3.21" +version = "0.3.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f3403384eaacbca9923fa06940178ac13e4edb725486d70e8e15881d0c836cc" +checksum = "ea9e1b3cf1243ae005d9e74085d4d542f3125458f3a81af210d901dcd7411efd" dependencies = [ "itoa", "serde", @@ -6158,6 +6339,12 @@ dependencies = [ "tracing-log", ] +[[package]] +name = "triomphe" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1ee9bd9239c339d714d657fac840c6d2a4f9c45f4f9ec7b0975113458be78db" + [[package]] name = "try-lock" version = "0.2.4" @@ -6261,6 +6448,15 @@ dependencies = [ "libc", ] +[[package]] +name = "unicase" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6" +dependencies = [ + "version_check", +] + [[package]] name = "unicode-bidi" version = "0.3.13" @@ -6385,6 +6581,12 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "waker-fn" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" + [[package]] name = "walkdir" version = "2.3.3" @@ -6547,6 +6749,7 @@ dependencies = [ "log", "migration", "mimalloc", + "moka", "num", "num-traits", "once_cell", @@ -6556,7 +6759,6 @@ dependencies = [ "payment-contracts", "prettytable", "proctitle", - "quick_cache_ttl", "rdkafka", "redis-rate-limiter", "regex", @@ -6570,7 +6772,7 @@ dependencies = [ "siwe", "strum", "thread-fast-rng", - "time 0.3.21", + "time 0.3.22", "tokio", "tokio-console", "tokio-stream", @@ -6895,7 +7097,7 @@ dependencies = [ "hmac", "pbkdf2 0.11.0", "sha1", - "time 0.3.21", + "time 0.3.22", "zstd", ] diff --git a/Cargo.toml b/Cargo.toml index d777eb3f..a766de62 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,6 @@ members = [ "latency", "migration", "payment-contracts", - "quick_cache_ttl", "rate-counter", "redis-rate-limiter", "thread-fast-rng", diff --git a/README.md b/README.md index 8b55df42..0f8d4bc5 100644 --- a/README.md +++ b/README.md @@ -171,6 +171,6 @@ Note: Testing with `getLatestBlockByNumber.lua` is not great because the latest Run [ethspam](https://github.com/INFURA/versus) and [versus](https://github.com/shazow/ethspam) for a more realistic load test: - ethspam --rpc http://127.0.0.1:8544 | versus --concurrency=100 --stop-after=10000 http://127.0.0.1:8544 + ethspam --rpc http://127.0.0.1:8544 | versus --concurrency=10 --stop-after=1000 http://127.0.0.1:8544 ethspam --rpc http://127.0.0.1:8544/u/$API_KEY | versus --concurrency=100 --stop-after=10000 http://127.0.0.1:8544/u/$API_KEY diff --git a/deferred-rate-limiter/Cargo.toml b/deferred-rate-limiter/Cargo.toml index c99fea76..52b17b61 100644 --- a/deferred-rate-limiter/Cargo.toml +++ b/deferred-rate-limiter/Cargo.toml @@ -5,10 +5,10 @@ authors = ["Bryan Stitt "] edition = "2021" [dependencies] -quick_cache_ttl = { path = "../quick_cache_ttl" } redis-rate-limiter = { path = "../redis-rate-limiter" } anyhow = "1.0.71" hashbrown = "0.14.0" log = "0.4.18" +moka = { version = "0.11.1", features = ["future"] } tokio = "1.28.2" diff --git a/deferred-rate-limiter/src/lib.rs b/deferred-rate-limiter/src/lib.rs index b15920ea..e20511ae 100644 --- a/deferred-rate-limiter/src/lib.rs +++ b/deferred-rate-limiter/src/lib.rs @@ -1,6 +1,6 @@ //#![warn(missing_docs)] use log::error; -use quick_cache_ttl::CacheWithTTL; +use moka::future::{Cache, CacheBuilder}; use redis_rate_limiter::{RedisRateLimitResult, RedisRateLimiter}; use std::cmp::Eq; use std::fmt::{Debug, Display}; @@ -16,7 +16,7 @@ pub struct DeferredRateLimiter where K: Send + Sync, { - local_cache: CacheWithTTL>, + local_cache: Cache>, prefix: String, rrl: RedisRateLimiter, /// if None, defers to the max on rrl @@ -46,12 +46,10 @@ where // TODO: what do these weigh? // TODO: allow skipping max_capacity // TODO: prefix instead of a static str - let local_cache = CacheWithTTL::new( - "deferred rate limiter", - cache_size, - Duration::from_secs(ttl), - ) - .await; + let local_cache = CacheBuilder::new(cache_size.try_into().unwrap()) + .time_to_live(Duration::from_secs(ttl)) + .name(&format!("DeferredRateLimiter-{}", prefix)) + .build(); Self { local_cache, @@ -91,7 +89,7 @@ where // set arc_deferred_rate_limit_result and return the count self.local_cache - .try_get_or_insert_async::(&key, async move { + .try_get_with_by_ref::<_, anyhow::Error, _>(&key, async move { // we do not use the try operator here because we want to be okay with redis errors let redis_count = match rrl .throttle_label(&redis_key, Some(max_requests_per_period), count) @@ -130,7 +128,8 @@ where Ok(Arc::new(AtomicU64::new(redis_count))) }) - .await? + .await + .map_err(|x| anyhow::anyhow!("cache error! {}", x))? }; let mut locked = deferred_rate_limit_result.lock().await; diff --git a/migration/src/m20230607_221917_total_deposits.rs b/migration/src/m20230607_221917_total_deposits.rs index 3e544c90..382f78d3 100644 --- a/migration/src/m20230607_221917_total_deposits.rs +++ b/migration/src/m20230607_221917_total_deposits.rs @@ -56,18 +56,9 @@ impl MigrationTrait for Migration { } } -/// Learn more at https://docs.rs/sea-query#iden -#[derive(Iden)] -enum User { - Table, - Id, -} - #[derive(Iden)] enum Balance { Table, - Id, - UserId, TotalSpentIncludingFreeTier, TotalSpentOutsideFreeTier, TotalDeposits, diff --git a/scripts/101-balance-referral-stats.sh b/scripts/101-balance-referral-stats.sh index 33fee68b..942d99fa 100644 --- a/scripts/101-balance-referral-stats.sh +++ b/scripts/101-balance-referral-stats.sh @@ -94,6 +94,15 @@ curl \ # Referred user makes some requests for i in {1..10000} +do + curl \ + -X POST "127.0.0.1:8544/rpc/01H2D5CAP1KF2NKRS30SGATDSD" \ + -H "Content-Type: application/json" \ + --data '{"method":"eth_blockNumber","params":[],"id":1,"jsonrpc":"2.0"}' +done + +# Let's also make simultaneous requests +for i in {1..10000} do curl \ -X POST "127.0.0.1:8544/rpc/01H2D5DN4D423VR2KFWBZE46TR" \ @@ -101,11 +110,21 @@ do --data '{"method":"eth_blockNumber","params":[],"id":1,"jsonrpc":"2.0"}' done -# Let's simultaneously also let the referrer make some requests, to make sure deadlocks do not occur -for i in {1..10000} -do - curl \ - -X POST "127.0.0.1:8544/rpc/01H2D5CAP1KF2NKRS30SGATDSD" \ - -H "Content-Type: application/json" \ - --data '{"method":"eth_blockNumber","params":[],"id":1,"jsonrpc":"2.0"}' -done + +# Get some data on the referral items +curl \ +-H "Authorization: Bearer 01H2D5DN564M4Q2T6PETEZY83Q" \ +-X GET "127.0.0.1:8544/user/referral/stats/used-codes" + +curl \ +-H "Authorization: Bearer 01H2D5CAQJF7P80222P4ZAFQ26" \ +-X GET "127.0.0.1:8544/user/referral/stats/used-codes" + + +curl \ +-H "Authorization: Bearer 01H2D5DN564M4Q2T6PETEZY83Q" \ +-X GET "127.0.0.1:8544/user/referral/stats/shared-codes" + +curl \ +-H "Authorization: Bearer 01H2D5CAQJF7P80222P4ZAFQ26" \ +-X GET "127.0.0.1:8544/user/referral/stats/shared-codes" diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index 217ad5d5..21058798 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -20,7 +20,6 @@ entities = { path = "../entities" } latency = { path = "../latency" } migration = { path = "../migration" } payment-contracts = { path = "../payment-contracts" } -quick_cache_ttl = { path = "../quick_cache_ttl" } redis-rate-limiter = { path = "../redis-rate-limiter" } thread-fast-rng = { path = "../thread-fast-rng" } @@ -68,6 +67,7 @@ itertools = "0.10.5" listenfd = "1.0.1" log = "0.4.18" mimalloc = { version = "0.1.37", optional = true} +moka = { version = "0.11.1", features = ["future"] } num = "0.4.0" num-traits = "0.2.15" once_cell = { version = "1.18.0" } @@ -87,7 +87,7 @@ serde_json = { version = "1.0.96", default-features = false, features = ["alloc" serde_prometheus = "0.2.2" siwe = "0.5.2" strum = { version = "0.24.1", features = ["derive"] } -time = "0.3.21" +time = "0.3.22" tokio = { version = "1.28.2", features = ["full"] } tokio-console = { version = "0.1.8", optional = true } tokio-stream = { version = "0.1.14", features = ["sync"] } diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index d74300e3..664c0c26 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -13,7 +13,7 @@ use crate::jsonrpc::{ }; use crate::relational_db::{get_db, get_migrated_db, DatabaseConnection, DatabaseReplica}; use crate::response_cache::{ - JsonRpcQueryCacheKey, JsonRpcResponseCache, JsonRpcResponseEnum, JsonRpcResponseWeigher, + json_rpc_response_weigher, JsonRpcQueryCacheKey, JsonRpcResponseCache, JsonRpcResponseEnum, }; use crate::rpcs::blockchain::Web3ProxyBlock; use crate::rpcs::consensus::ConsensusWeb3Rpcs; @@ -42,7 +42,8 @@ use ipnet::IpNet; use log::{error, info, trace, warn, Level}; use migration::sea_orm::prelude::Decimal; use migration::sea_orm::{EntityTrait, PaginatorTrait}; -use quick_cache_ttl::{Cache, CacheWithTTL}; +use moka::future::{Cache, CacheBuilder}; +use parking_lot::Mutex; use redis_rate_limiter::redis::AsyncCommands; use redis_rate_limiter::{redis, DeadpoolRuntime, RedisConfig, RedisPool, RedisRateLimiter}; use serde::Serialize; @@ -51,7 +52,7 @@ use serde_json::value::RawValue; use std::borrow::Cow; use std::fmt; use std::net::IpAddr; -use std::num::{NonZeroU32, NonZeroU64}; +use std::num::NonZeroU64; use std::str::FromStr; use std::sync::{atomic, Arc}; use std::time::Duration; @@ -110,8 +111,8 @@ pub struct AuthorizationChecks { } /// Cache data from the database about rpc keys -pub type RpcSecretKeyCache = Arc>; -pub type UserBalanceCache = Arc>>>; // Could also be an AtomicDecimal +pub type RpcSecretKeyCache = Cache; +pub type UserBalanceCache = Cache>>; /// The application // TODO: i'm sure this is more arcs than necessary, but spawning futures makes references hard @@ -144,7 +145,7 @@ pub struct Web3ProxyApp { pub internal_provider: Arc, /// store pending transactions that we've seen so that we don't send duplicates to subscribers /// TODO: think about this more. might be worth storing if we sent the transaction or not and using this for automatic retries - pub pending_transactions: Arc>, + pub pending_transactions: Cache, /// rate limit anonymous users pub frontend_ip_rate_limiter: Option>, /// rate limit authenticated users @@ -398,17 +399,16 @@ impl Web3ProxyApp { // if there is no database of users, there will be no keys and so this will be empty // TODO: max_capacity from config // TODO: ttl from config - let rpc_secret_key_cache = CacheWithTTL::arc_with_capacity( - "rpc_secret_key_cache", - 10_000, - Duration::from_secs(600), - ) - .await; + let rpc_secret_key_cache = CacheBuilder::new(10_000) + .name("rpc_secret_key") + .time_to_live(Duration::from_secs(600)) + .build(); // TODO: TTL left low, this could also be a solution instead of modifiying the cache, that may be disgusting across threads / slow anyways - let user_balance_cache = - CacheWithTTL::arc_with_capacity("user_balance_cache", 10_000, Duration::from_secs(600)) - .await; + let user_balance_cache = CacheBuilder::new(10_000) + .name("user_balance") + .time_to_live(Duration::from_secs(600)) + .build(); // create a channel for receiving stats // we do this in a channel so we don't slow down our response to the users @@ -502,27 +502,27 @@ impl Web3ProxyApp { // TODO: different chains might handle this differently // TODO: what should we set? 5 minutes is arbitrary. the nodes themselves hold onto transactions for much longer // TODO: this used to be time_to_update, but - let pending_transactions = CacheWithTTL::arc_with_capacity( - "pending_transactions", - 10_000, - Duration::from_secs(300), - ) - .await; + let pending_transactions = CacheBuilder::new(10_000) + .name("pending_transactions") + .time_to_live(Duration::from_secs(300)) + .build(); // responses can be very different in sizes, so this is a cache with a max capacity and a weigher // TODO: we should emit stats to calculate a more accurate expected cache size // TODO: do we actually want a TTL on this? // TODO: configurable max item weight instead of using ~0.1% // TODO: resize the cache automatically - let response_cache = JsonRpcResponseCache::new_with_weights( - "response_cache", - (top_config.app.response_cache_max_bytes / 16_384) as usize, - NonZeroU32::try_from((top_config.app.response_cache_max_bytes / 1024) as u32).unwrap(), - top_config.app.response_cache_max_bytes, - JsonRpcResponseWeigher, - Duration::from_secs(3600), - ) - .await; + let response_cache: JsonRpcResponseCache = + CacheBuilder::new(top_config.app.response_cache_max_bytes) + .weigher(json_rpc_response_weigher) + .build(); + // (top_config.app.response_cache_max_bytes / 16_384) as usize, + // NonZeroU32::try_from((top_config.app.response_cache_max_bytes / 1024) as u32).unwrap(), + // top_config.app.response_cache_max_bytes, + // JsonRpcResponseWeigher, + // Duration::from_secs(3600), + // ) + // .await; // TODO: how should we handle hitting this max? let max_users = 20_000; @@ -1160,7 +1160,7 @@ impl Web3ProxyApp { .await { Ok(response_data) => (StatusCode::OK, response_data), - Err(err) => err.into_response_parts(), + Err(err) => err.as_response_parts(), }; let response = JsonRpcForwardedResponse::from_response_data(response_data, response_id); @@ -1699,12 +1699,15 @@ impl Web3ProxyApp { let to_block_num = cache_key.to_block_num(); let cache_errors = cache_key.cache_errors(); - match self + // moka makes us do annoying things with arcs + enum CacheError { + NotCached(JsonRpcResponseEnum>), + Error(Arc), + } + + let x = self .jsonrpc_response_cache - .get_value_or_guard_async(cache_key.hash()).await - { - Ok(x) => x, - Err(x) => { + .try_get_with::<_, Mutex>(cache_key.hash(), async { let response_data = timeout( duration, self.balanced_rpcs @@ -1716,16 +1719,32 @@ impl Web3ProxyApp { to_block_num.as_ref(), ) ) - .await?; + .await + .map_err(|x| Mutex::new(CacheError::Error(Arc::new(Web3ProxyError::from(x)))))?; - let response_data: JsonRpcResponseEnum> = response_data.try_into()?; + // TODO: i think response data should be Arc>>, but that's more work + let response_data: JsonRpcResponseEnum> = response_data.try_into() + .map_err(|x| Mutex::new(CacheError::Error(Arc::new(x))))?; - if matches!(response_data, JsonRpcResponseEnum::Result { .. }) || cache_errors { - // TODO: convert the Box to an Arc? - x.insert(response_data.clone()); + // TODO: read max size from the config + if response_data.num_bytes() as u64 > self.config.response_cache_max_bytes / 1000 { + Err(Mutex::new(CacheError::NotCached(response_data))) + } else if matches!(response_data, JsonRpcResponseEnum::Result { .. }) || cache_errors { + Ok(response_data) + } else { + Err(Mutex::new(CacheError::NotCached(response_data))) } + }).await; - response_data + match x { + Ok(x) => x, + Err(arc_err) => { + let locked = arc_err.lock(); + + match &*locked { + CacheError::Error(err) => return Err(Web3ProxyError::Arc(err.clone())), + CacheError::NotCached(x) => x.clone(), + } } } } else { diff --git a/web3_proxy/src/errors.rs b/web3_proxy/src/errors.rs index 3a17bbd0..7ac37da2 100644 --- a/web3_proxy/src/errors.rs +++ b/web3_proxy/src/errors.rs @@ -22,6 +22,7 @@ use rust_decimal::Error as DecimalError; use serde::Serialize; use serde_json::value::RawValue; use std::error::Error; +use std::sync::Arc; use std::{borrow::Cow, net::IpAddr}; use tokio::{sync::AcquireError, task::JoinError, time::Instant}; @@ -42,6 +43,7 @@ pub enum Web3ProxyError { AccessDenied, #[error(ignore)] Anyhow(anyhow::Error), + Arc(Arc), #[error(ignore)] #[from(ignore)] BadRequest(Cow<'static, str>), @@ -154,7 +156,7 @@ pub enum Web3ProxyError { } impl Web3ProxyError { - pub fn into_response_parts(self) -> (StatusCode, JsonRpcResponseEnum) { + pub fn as_response_parts(&self) -> (StatusCode, JsonRpcResponseEnum) { // TODO: include a unique request id in the data let (code, err): (StatusCode, JsonRpcErrorData) = match self { Self::Abi(err) => { @@ -192,6 +194,10 @@ impl Web3ProxyError { }, ) } + Self::Arc(err) => { + // recurse + return err.as_response_parts::(); + } Self::BadRequest(err) => { debug!("BAD_REQUEST: {}", err); ( @@ -533,7 +539,10 @@ impl Web3ProxyError { }, ) } - Self::JsonRpcErrorData(jsonrpc_error_data) => (StatusCode::OK, jsonrpc_error_data), + Self::JsonRpcErrorData(jsonrpc_error_data) => { + // TODO: do this without clone? the Arc needed it though + (StatusCode::OK, jsonrpc_error_data.clone()) + } Self::MsgPackEncode(err) => { warn!("MsgPackEncode Error: {}", err); ( @@ -764,7 +773,7 @@ impl Web3ProxyError { ) } Self::RefererRequired => { - warn!("referer required"); + debug!("referer required"); ( StatusCode::BAD_REQUEST, JsonRpcErrorData { @@ -775,7 +784,7 @@ impl Web3ProxyError { ) } Self::RefererNotAllowed(referer) => { - warn!("referer not allowed referer={:?}", referer); + debug!("referer not allowed referer={:?}", referer); ( StatusCode::FORBIDDEN, JsonRpcErrorData { @@ -829,9 +838,9 @@ impl Web3ProxyError { } ( - status_code, + *status_code, JsonRpcErrorData { - message: Cow::Owned(err_msg), + message: err_msg.to_owned().into(), code: code.into(), data: None, }, @@ -840,7 +849,7 @@ impl Web3ProxyError { Self::Timeout(x) => ( StatusCode::REQUEST_TIMEOUT, JsonRpcErrorData { - message: Cow::Owned(format!("request timed out: {:?}", x)), + message: format!("request timed out: {:?}", x).into(), code: StatusCode::REQUEST_TIMEOUT.as_u16().into(), // TODO: include the actual id! data: None, @@ -851,7 +860,7 @@ impl Web3ProxyError { ( StatusCode::BAD_REQUEST, JsonRpcErrorData { - message: Cow::Owned(err.to_string()), + message: err.to_string().into(), code: StatusCode::BAD_REQUEST.as_u16().into(), data: None, }, @@ -862,7 +871,7 @@ impl Web3ProxyError { ( StatusCode::BAD_REQUEST, JsonRpcErrorData { - message: Cow::Owned(format!("{}", err)), + message: format!("{}", err).into(), code: StatusCode::BAD_REQUEST.as_u16().into(), data: None, }, @@ -873,7 +882,7 @@ impl Web3ProxyError { ( StatusCode::BAD_GATEWAY, JsonRpcErrorData { - message: Cow::Borrowed("no servers synced. unknown eth_blockNumber"), + message: "no servers synced. unknown eth_blockNumber".into(), code: StatusCode::BAD_GATEWAY.as_u16().into(), data: None, }, @@ -889,7 +898,7 @@ impl Web3ProxyError { }, ), Self::UserAgentRequired => { - warn!("UserAgentRequired"); + debug!("UserAgentRequired"); ( StatusCode::BAD_REQUEST, JsonRpcErrorData { @@ -900,7 +909,7 @@ impl Web3ProxyError { ) } Self::UserAgentNotAllowed(ua) => { - warn!("UserAgentNotAllowed ua={}", ua); + debug!("UserAgentNotAllowed ua={}", ua); ( StatusCode::FORBIDDEN, JsonRpcErrorData { @@ -960,9 +969,7 @@ impl Web3ProxyError { ( StatusCode::BAD_REQUEST, JsonRpcErrorData { - message: Cow::Borrowed( - "redirect_public_url not set. only websockets work here", - ), + message: "redirect_public_url not set. only websockets work here".into(), code: StatusCode::BAD_REQUEST.as_u16().into(), data: None, }, @@ -971,14 +978,14 @@ impl Web3ProxyError { Self::WithContext(err, msg) => match err { Some(err) => { warn!("{:#?} w/ context {}", err, msg); - return err.into_response_parts(); + return err.as_response_parts(); } None => { warn!("error w/ context {}", msg); ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { - message: Cow::Owned(msg), + message: msg.to_owned().into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), data: None, }, @@ -991,7 +998,7 @@ impl Web3ProxyError { } pub fn into_response_with_id(self, id: Box) -> Response { - let (status_code, response_data) = self.into_response_parts(); + let (status_code, response_data) = self.as_response_parts(); let response = JsonRpcForwardedResponse::from_response_data(response_data, id); diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index 63798948..ecdf4bed 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -30,7 +30,6 @@ use rdkafka::producer::{FutureProducer, FutureRecord}; use rdkafka::util::Timeout as KafkaTimeout; use redis_rate_limiter::redis::AsyncCommands; use redis_rate_limiter::RedisRateLimitResult; -use std::convert::Infallible; use std::fmt::Display; use std::hash::{Hash, Hasher}; use std::mem; @@ -921,13 +920,12 @@ impl Web3ProxyApp { if let Some(max_concurrent_requests) = self.config.public_max_concurrent_requests { let semaphore = self .ip_semaphores - .get_or_insert_async::(ip, async move { + .get_with_by_ref(ip, async { // TODO: set max_concurrent_requests dynamically based on load? let s = Semaphore::new(max_concurrent_requests); - Ok(Arc::new(s)) + Arc::new(s) }) - .await - .expect("infallible"); + .await; let semaphore_permit = semaphore.acquire_owned().await?; @@ -951,12 +949,11 @@ impl Web3ProxyApp { let semaphore = self .user_semaphores - .get_or_insert_async::(&user_id, async move { + .get_with_by_ref(&user_id, async move { let s = Semaphore::new(max_concurrent_requests as usize); - Ok(Arc::new(s)) + Arc::new(s) }) - .await - .expect("infallible"); + .await; let semaphore_permit = semaphore.acquire_owned().await?; @@ -980,12 +977,11 @@ impl Web3ProxyApp { // limit concurrent requests let semaphore = self .bearer_token_semaphores - .get_or_insert_async::(&user_bearer_token, async move { + .get_with_by_ref(&user_bearer_token, async move { let s = Semaphore::new(self.config.bearer_token_max_concurrent_requests as usize); - Ok(Arc::new(s)) + Arc::new(s) }) - .await - .expect("infallible"); + .await; let semaphore_permit = semaphore.acquire_owned().await?; @@ -1139,25 +1135,25 @@ impl Web3ProxyApp { ) -> Web3ProxyResult>> { match NonZeroU64::try_from(user_id) { Err(_) => Ok(Arc::new(RwLock::new(Decimal::default()))), - Ok(x) => { - self.user_balance_cache - .try_get_or_insert_async(&x, async move { - let db_replica = self - .db_replica() - .web3_context("Getting database connection")?; + Ok(x) => self + .user_balance_cache + .try_get_with_by_ref(&x, async move { + let db_replica = self + .db_replica() + .web3_context("Getting database connection")?; - let balance: Decimal = match balance::Entity::find() - .filter(balance::Column::UserId.eq(user_id)) - .one(db_replica.as_ref()) - .await? - { - Some(x) => x.total_deposits - x.total_spent_outside_free_tier, - None => Decimal::default(), - }; - Ok(Arc::new(RwLock::new(balance))) - }) - .await - } + let balance: Decimal = match balance::Entity::find() + .filter(balance::Column::UserId.eq(user_id)) + .one(db_replica.as_ref()) + .await? + { + Some(x) => x.total_deposits - x.total_spent_outside_free_tier, + None => Decimal::default(), + }; + Ok(Arc::new(RwLock::new(balance))) + }) + .await + .map_err(Into::into), } } @@ -1168,7 +1164,7 @@ impl Web3ProxyApp { rpc_secret_key: RpcSecretKey, ) -> Web3ProxyResult { self.rpc_secret_key_cache - .try_get_or_insert_async(&rpc_secret_key, async move { + .try_get_with_by_ref(&rpc_secret_key, async move { // trace!(?rpc_secret_key, "user cache miss"); let db_replica = self @@ -1303,6 +1299,7 @@ impl Web3ProxyApp { } }) .await + .map_err(Into::into) } /// Authorized the ip/origin/referer/useragent and rate limit and concurrency diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs index 952dc11c..9528a189 100644 --- a/web3_proxy/src/frontend/mod.rs +++ b/web3_proxy/src/frontend/mod.rs @@ -18,7 +18,7 @@ use axum::{ use http::{header::AUTHORIZATION, StatusCode}; use listenfd::ListenFd; use log::{debug, info}; -use quick_cache_ttl::UnitWeighter; +use moka::future::{Cache, CacheBuilder}; use std::net::SocketAddr; use std::sync::Arc; use std::{iter::once, time::Duration}; @@ -37,12 +37,7 @@ pub enum ResponseCacheKey { Status, } -pub type ResponseCache = quick_cache_ttl::CacheWithTTL< - ResponseCacheKey, - (StatusCode, &'static str, axum::body::Bytes), - UnitWeighter, - quick_cache_ttl::DefaultHashBuilder, ->; +pub type ResponseCache = Cache; /// Start the frontend server. pub async fn serve( @@ -58,12 +53,10 @@ pub async fn serve( debug!("response_cache size: {}", response_cache_size); - let response_cache = ResponseCache::new( - "response_cache", - response_cache_size, - Duration::from_secs(1), - ) - .await; + let response_cache: ResponseCache = CacheBuilder::new(response_cache_size as u64) + .name("frontend_response") + .time_to_live(Duration::from_secs(1)) + .build(); // TODO: read config for if fastest/versus should be available publicly. default off diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs index f3f12869..e597a9ab 100644 --- a/web3_proxy/src/frontend/rpc_proxy_ws.rs +++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs @@ -330,7 +330,7 @@ async fn handle_socket_payload( let (authorization, semaphore) = match authorization.check_again(&app).await { Ok((a, s)) => (a, s), Err(err) => { - let (_, err) = err.into_response_parts(); + let (_, err) = err.as_response_parts(); let err = JsonRpcForwardedResponse::from_response_data(err, Default::default()); @@ -437,7 +437,7 @@ async fn handle_socket_payload( let response_str = match response { Ok(x) => serde_json::to_string(&x).expect("to_string should always work here"), Err(err) => { - let (_, response_data) = err.into_response_parts(); + let (_, response_data) = err.as_response_parts(); let response = JsonRpcForwardedResponse::from_response_data(response_data, response_id); diff --git a/web3_proxy/src/frontend/status.rs b/web3_proxy/src/frontend/status.rs index 46d4af15..f189301a 100644 --- a/web3_proxy/src/frontend/status.rs +++ b/web3_proxy/src/frontend/status.rs @@ -13,7 +13,9 @@ use axum::{ }; use axum_macros::debug_handler; use log::trace; +use moka::future::Cache; use once_cell::sync::Lazy; +use serde::{ser::SerializeStruct, Serialize}; use serde_json::json; use std::sync::Arc; @@ -33,7 +35,7 @@ pub async fn health( Extension(cache): Extension>, ) -> impl IntoResponse { let (code, content_type, body) = cache - .get_or_insert_async(&ResponseCacheKey::Health, async move { _health(app).await }) + .get_with(ResponseCacheKey::Health, async move { _health(app).await }) .await; Response::builder() @@ -66,7 +68,7 @@ pub async fn backups_needed( Extension(cache): Extension>, ) -> impl IntoResponse { let (code, content_type, body) = cache - .get_or_insert_async(&ResponseCacheKey::BackupsNeeded, async move { + .get_with(ResponseCacheKey::BackupsNeeded, async move { _backups_needed(app).await }) .await; @@ -117,7 +119,7 @@ pub async fn status( Extension(cache): Extension>, ) -> impl IntoResponse { let (code, content_type, body) = cache - .get_or_insert_async(&ResponseCacheKey::Status, async move { _status(app).await }) + .get_with(ResponseCacheKey::Status, async move { _status(app).await }) .await; Response::builder() @@ -139,10 +141,10 @@ async fn _status(app: Arc) -> (StatusCode, &'static str, Bytes) { "bundler_4337_rpcs": app.bundler_4337_rpcs, "chain_id": app.config.chain_id, "hostname": app.hostname, - "jsonrpc_response_cache": app.jsonrpc_response_cache, + "jsonrpc_response_cache": MokaCacheSerializer(&app.jsonrpc_response_cache), "private_rpcs": app.private_rpcs, - "rpc_secret_key_cache": app.rpc_secret_key_cache, - "user_balance_cache": app.user_balance_cache, + "rpc_secret_key_cache": MokaCacheSerializer(&app.rpc_secret_key_cache), + "user_balance_cache": MokaCacheSerializer(&app.user_balance_cache), "version": APP_USER_AGENT, }); @@ -158,3 +160,20 @@ async fn _status(app: Arc) -> (StatusCode, &'static str, Bytes) { (code, CONTENT_TYPE_JSON, body) } + +pub struct MokaCacheSerializer<'a, K, V>(pub &'a Cache); + +impl<'a, K, V> Serialize for MokaCacheSerializer<'a, K, V> { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut state = serializer.serialize_struct("MokaCache", 3)?; + + state.serialize_field("entry_count", &self.0.entry_count())?; + state.serialize_field("name", &self.0.name())?; + state.serialize_field("weighted_size", &self.0.weighted_size())?; + + state.end() + } +} diff --git a/web3_proxy/src/frontend/users/authentication.rs b/web3_proxy/src/frontend/users/authentication.rs index cb51fb67..44cfde21 100644 --- a/web3_proxy/src/frontend/users/authentication.rs +++ b/web3_proxy/src/frontend/users/authentication.rs @@ -18,7 +18,7 @@ use entities::{balance, login, pending_login, referee, referrer, rpc_key, user}; use ethers::{prelude::Address, types::Bytes}; use hashbrown::HashMap; use http::StatusCode; -use log::{debug, warn}; +use log::{debug, warn, trace}; use migration::sea_orm::prelude::{Decimal, Uuid}; use migration::sea_orm::{ self, ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter, @@ -314,7 +314,7 @@ pub async fn user_login_post( debug!("Refferal code is: {:?}", payload.referral_code); if let Some(referral_code) = payload.referral_code.as_ref() { // If it is not inside, also check in the database - warn!("Using register referral code: {:?}", referral_code); + trace!("Using register referral code: {:?}", referral_code); let user_referrer = referrer::Entity::find() .filter(referrer::Column::ReferralCode.eq(referral_code)) .one(db_replica.as_ref()) @@ -345,7 +345,7 @@ pub async fn user_login_post( // First, optionally catch a referral code from the parameters if there is any if let Some(referral_code) = payload.referral_code.as_ref() { // If it is not inside, also check in the database - warn!("Using referral code: {:?}", referral_code); + trace!("Using referral code: {:?}", referral_code); let user_referrer = referrer::Entity::find() .filter(referrer::Column::ReferralCode.eq(referral_code)) .one(db_replica.as_ref()) diff --git a/web3_proxy/src/frontend/users/payment.rs b/web3_proxy/src/frontend/users/payment.rs index 4ae27296..60bb7e2d 100644 --- a/web3_proxy/src/frontend/users/payment.rs +++ b/web3_proxy/src/frontend/users/payment.rs @@ -181,7 +181,7 @@ pub async fn user_balance_post( // .topics[0] // { // debug!("Bloom input bytes is: {:?}", x); - // debug!("Bloom input bytes is: {:?}", x..as_fixed_bytes()); + // debug!("Bloom input bytes is: {:?}", x.as_fixed_bytes()); // debug!("Bloom input as hex is: {:?}", hex!(x)); // let bloom_input = BloomInput::Raw(hex!(x)); // debug!( @@ -312,13 +312,14 @@ pub async fn user_balance_post( match NonZeroU64::try_from(recipient.id) { Err(_) => {} Ok(x) => { - app.user_balance_cache.remove(&x); + app.user_balance_cache.invalidate(&x).await; } }; for rpc_key_entity in rpc_keys { app.rpc_secret_key_cache - .remove(&rpc_key_entity.secret_key.into()); + .invalidate(&rpc_key_entity.secret_key.into()) + .await; } let x = json!({ diff --git a/web3_proxy/src/frontend/users/subuser.rs b/web3_proxy/src/frontend/users/subuser.rs index 2cea1621..35352518 100644 --- a/web3_proxy/src/frontend/users/subuser.rs +++ b/web3_proxy/src/frontend/users/subuser.rs @@ -15,9 +15,8 @@ use entities::{balance, rpc_key, secondary_user, user}; use ethers::types::Address; use hashbrown::HashMap; use http::StatusCode; -use log::{trace, warn}; +use log::trace; use migration::sea_orm; -use migration::sea_orm::prelude::Decimal; use migration::sea_orm::ActiveModelTrait; use migration::sea_orm::ColumnTrait; use migration::sea_orm::EntityTrait; @@ -146,7 +145,7 @@ pub async fn get_subusers( .all(db_replica.as_ref()) .await?; - warn!("Subusers are: {:?}", subusers); + trace!("Subusers are: {}", json!(subusers)); // Now return the list let response_json = json!({ diff --git a/web3_proxy/src/response_cache.rs b/web3_proxy/src/response_cache.rs index 562e7de4..f9541672 100644 --- a/web3_proxy/src/response_cache.rs +++ b/web3_proxy/src/response_cache.rs @@ -2,12 +2,11 @@ use crate::{errors::Web3ProxyError, jsonrpc::JsonRpcErrorData, rpcs::blockchain: use derive_more::From; use ethers::{providers::ProviderError, types::U64}; use hashbrown::hash_map::DefaultHashBuilder; -use quick_cache_ttl::{CacheWithTTL, Weighter}; +use moka::future::Cache; use serde_json::value::RawValue; use std::{ borrow::Cow, hash::{BuildHasher, Hash, Hasher}, - num::NonZeroU32, sync::Arc, }; @@ -82,28 +81,24 @@ impl JsonRpcQueryCacheKey { } } -pub type JsonRpcResponseCache = - CacheWithTTL>, JsonRpcResponseWeigher>; - -#[derive(Clone)] -pub struct JsonRpcResponseWeigher; +pub type JsonRpcResponseCache = Cache>>; /// TODO: we might need one that holds RawValue and one that holds serde_json::Value #[derive(Clone, Debug)] pub enum JsonRpcResponseEnum { Result { value: R, - num_bytes: NonZeroU32, + num_bytes: u32, }, RpcError { error_data: JsonRpcErrorData, - num_bytes: NonZeroU32, + num_bytes: u32, }, } // TODO: impl for other inner result types? impl JsonRpcResponseEnum { - pub fn num_bytes(&self) -> NonZeroU32 { + pub fn num_bytes(&self) -> u32 { match self { Self::Result { num_bytes, .. } => *num_bytes, Self::RpcError { num_bytes, .. } => *num_bytes, @@ -123,7 +118,7 @@ impl From> for JsonRpcResponseEnum> { fn from(value: Arc) -> Self { let num_bytes = value.get().len(); - let num_bytes = NonZeroU32::try_from(num_bytes as u32).unwrap(); + let num_bytes = num_bytes as u32; Self::Result { value, num_bytes } } @@ -133,7 +128,7 @@ impl From> for JsonRpcResponseEnum> { fn from(value: Box) -> Self { let num_bytes = value.get().len(); - let num_bytes = NonZeroU32::try_from(num_bytes as u32).unwrap(); + let num_bytes = num_bytes as u32; let value = value.into(); @@ -190,7 +185,7 @@ impl From for JsonRpcResponseEnum { // TODO: wrap the error in a complete response? let num_bytes = serde_json::to_string(&value).unwrap().len(); - let num_bytes = NonZeroU32::try_from(num_bytes as u32).unwrap(); + let num_bytes = num_bytes as u32; Self::RpcError { error_data: value, @@ -235,67 +230,75 @@ impl TryFrom for JsonRpcErrorData { } } -// TODO: instead of Arc, be generic -impl Weighter>> for JsonRpcResponseWeigher { - fn weight(&self, _key: &K, _qey: &Q, value: &JsonRpcResponseEnum>) -> NonZeroU32 { - value.num_bytes() - } +pub fn json_rpc_response_weigher(_key: &K, value: &JsonRpcResponseEnum) -> u32 { + value.num_bytes() } #[cfg(test)] mod tests { - use super::{JsonRpcResponseEnum, JsonRpcResponseWeigher}; - use quick_cache_ttl::CacheWithTTL; + use super::JsonRpcResponseEnum; + use crate::response_cache::json_rpc_response_weigher; use serde_json::value::RawValue; - use std::{num::NonZeroU32, sync::Arc, time::Duration}; + use std::sync::Arc; #[tokio::test(start_paused = true)] async fn test_json_rpc_query_weigher() { let max_item_weight = 200; let weight_capacity = 1_000; - let test_cache: CacheWithTTL< - u32, - JsonRpcResponseEnum>, - JsonRpcResponseWeigher, - > = CacheWithTTL::new_with_weights( - "test", - 5, - max_item_weight.try_into().unwrap(), - weight_capacity, - JsonRpcResponseWeigher, - Duration::from_secs(2), - ) - .await; + // let test_cache: Cache>> = + // CacheBuilder::new(weight_capacity) + // .weigher(json_rpc_response_weigher) + // .time_to_live(Duration::from_secs(2)) + // .build(); - let small_data = JsonRpcResponseEnum::Result { + let small_data: JsonRpcResponseEnum> = JsonRpcResponseEnum::Result { value: Box::::default().into(), - num_bytes: NonZeroU32::try_from(max_item_weight / 2).unwrap(), + num_bytes: max_item_weight / 2, }; - let max_sized_data = JsonRpcResponseEnum::Result { + assert_eq!( + json_rpc_response_weigher(&(), &small_data), + max_item_weight / 2 + ); + + let max_sized_data: JsonRpcResponseEnum> = JsonRpcResponseEnum::Result { value: Box::::default().into(), - num_bytes: NonZeroU32::try_from(max_item_weight).unwrap(), + num_bytes: max_item_weight, }; - let oversized_data = JsonRpcResponseEnum::Result { + assert_eq!( + json_rpc_response_weigher(&(), &max_sized_data), + max_item_weight + ); + + let oversized_data: JsonRpcResponseEnum> = JsonRpcResponseEnum::Result { value: Box::::default().into(), - num_bytes: NonZeroU32::try_from(max_item_weight * 2).unwrap(), + num_bytes: max_item_weight * 2, }; - test_cache.try_insert(0, small_data).unwrap(); + assert_eq!( + json_rpc_response_weigher(&(), &oversized_data), + max_item_weight * 2 + ); + + // TODO: helper for inserts that does size checking + /* + test_cache.insert(0, small_data).await; test_cache.get(&0).unwrap(); - test_cache.try_insert(1, max_sized_data).unwrap(); + test_cache.insert(1, max_sized_data).await; test_cache.get(&0).unwrap(); test_cache.get(&1).unwrap(); - test_cache.try_insert(2, oversized_data).unwrap_err(); + // TODO: this will currently work! need to wrap moka cache in a checked insert + test_cache.insert(2, oversized_data).await; test_cache.get(&0).unwrap(); test_cache.get(&1).unwrap(); assert!(test_cache.get(&2).is_none()); + */ } } diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index 4d4375db..f1fe2e80 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -9,7 +9,7 @@ use crate::frontend::authorization::Authorization; use derive_more::From; use ethers::prelude::{Block, TxHash, H256, U64}; use log::{debug, trace, warn}; -use quick_cache_ttl::CacheWithTTL; +use moka::future::Cache; use serde::ser::SerializeStruct; use serde::Serialize; use serde_json::json; @@ -20,8 +20,8 @@ use tokio::sync::broadcast; // TODO: type for Hydrated Blocks with their full transactions? pub type ArcBlock = Arc>; -pub type BlocksByHashCache = Arc>; -pub type BlocksByNumberCache = Arc>; +pub type BlocksByHashCache = Cache; +pub type BlocksByNumberCache = Cache; /// A block and its age. #[derive(Clone, Debug, Default, From)] @@ -172,8 +172,6 @@ impl Web3Rpcs { return Ok(block); } - let block_num = block.number(); - // this block is very likely already in block_hashes // TODO: use their get_with let block_hash = *block.hash(); @@ -182,15 +180,17 @@ impl Web3Rpcs { if heaviest_chain { // this is the only place that writes to block_numbers // multiple inserts should be okay though - // TODO: info that there was a fork? + // TODO: info if there was a fork? + let block_num = block.number(); + self.blocks_by_number - .get_or_insert_async(block_num, async move { block_hash }) + .get_with_by_ref(block_num, async move { block_hash }) .await; } let block = self .blocks_by_hash - .get_or_insert_async(&block_hash, async move { block }) + .get_with_by_ref(&block_hash, async move { block }) .await; Ok(block) @@ -468,7 +468,7 @@ impl Web3Rpcs { // hash changed debug!( - "unc {}/{} {}{}/{}/{} con_head={} old={} rpc={}@{}", + "unc {}/{} {}{}/{}/{} con={} old={} rpc={}@{}", consensus_tier, total_tiers, backups_voted_str, diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index ed7073d9..4d54613f 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -8,11 +8,10 @@ use ethers::prelude::{H256, U64}; use hashbrown::{HashMap, HashSet}; use itertools::{Itertools, MinMaxResult}; use log::{trace, warn}; -use quick_cache_ttl::Cache; +use moka::future::Cache; use serde::Serialize; use std::cmp::{Ordering, Reverse}; use std::collections::BTreeMap; -use std::convert::Infallible; use std::fmt; use std::sync::Arc; use tokio::time::Instant; @@ -378,9 +377,8 @@ impl ConsensusFinder { async fn insert(&mut self, rpc: Arc, block: Web3ProxyBlock) -> Option { let first_seen = self .first_seen - .get_or_insert_async::(block.hash(), async { Ok(Instant::now()) }) - .await - .expect("this cache get is infallible"); + .get_with_by_ref(block.hash(), async { Instant::now() }) + .await; // calculate elapsed time before trying to lock let latency = first_seen.elapsed(); diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index a2369173..d6e8c030 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -8,6 +8,7 @@ use crate::config::{BlockAndRpc, TxHashAndRpc, Web3RpcConfig}; use crate::errors::{Web3ProxyError, Web3ProxyResult}; use crate::frontend::authorization::{Authorization, RequestMetadata}; use crate::frontend::rpc_proxy_ws::ProxyMode; +use crate::frontend::status::MokaCacheSerializer; use crate::jsonrpc::{JsonRpcErrorData, JsonRpcParams, JsonRpcResultData}; use crate::rpcs::transactions::TxStatus; use arc_swap::ArcSwap; @@ -21,8 +22,8 @@ use hashbrown::{HashMap, HashSet}; use itertools::Itertools; use log::{debug, error, info, trace, warn}; use migration::sea_orm::DatabaseConnection; +use moka::future::{Cache, CacheBuilder}; use ordered_float::OrderedFloat; -use quick_cache_ttl::CacheWithTTL; use serde::ser::{SerializeStruct, Serializer}; use serde::Serialize; use serde_json::json; @@ -53,7 +54,7 @@ pub struct Web3Rpcs { /// this head receiver makes it easy to wait until there is a new block pub(super) watch_consensus_head_sender: Option>>, /// keep track of transactions that we have sent through subscriptions - pub(super) pending_transaction_cache: Arc>, + pub(super) pending_transaction_cache: Cache, pub(super) pending_tx_id_receiver: flume::Receiver, pub(super) pending_tx_id_sender: flume::Sender, /// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis? @@ -81,7 +82,7 @@ impl Web3Rpcs { min_head_rpcs: usize, min_sum_soft_limit: u32, name: String, - pending_transaction_cache: Arc>, + pending_transaction_cache: Cache, pending_tx_sender: Option>, watch_consensus_head_sender: Option>>, ) -> anyhow::Result<( @@ -96,16 +97,18 @@ impl Web3Rpcs { // these blocks don't have full transactions, but they do have rather variable amounts of transaction hashes // TODO: actual weighter on this // TODO: time_to_idle instead? - let blocks_by_hash: BlocksByHashCache = Arc::new( - CacheWithTTL::new("blocks_by_hash", 1_000, Duration::from_secs(30 * 60)).await, - ); + let blocks_by_hash: BlocksByHashCache = CacheBuilder::new(1_000) + .name("blocks_by_hash") + .time_to_idle(Duration::from_secs(30 * 60)) + .build(); // all block numbers are the same size, so no need for weigher // TODO: limits from config // TODO: time_to_idle instead? - let blocks_by_number = Arc::new( - CacheWithTTL::new("blocks_by_number", 1_000, Duration::from_secs(30 * 60)).await, - ); + let blocks_by_number = CacheBuilder::new(1_000) + .name("blocks_by_number") + .time_to_idle(Duration::from_secs(30 * 60)) + .build(); let (watch_consensus_rpcs_sender, consensus_connections_watcher) = watch::channel(Default::default()); @@ -1254,9 +1257,15 @@ impl Serialize for Web3Rpcs { } } - state.serialize_field("blocks_by_hash", &self.blocks_by_hash)?; - state.serialize_field("blocks_by_number", &self.blocks_by_number)?; - state.serialize_field("pending_transaction_cache", &self.pending_transaction_cache)?; + state.serialize_field("blocks_by_hash", &MokaCacheSerializer(&self.blocks_by_hash))?; + state.serialize_field( + "blocks_by_number", + &MokaCacheSerializer(&self.blocks_by_number), + )?; + state.serialize_field( + "pending_transaction_cache", + &MokaCacheSerializer(&self.pending_transaction_cache), + )?; state.serialize_field("block_sender_len", &self.block_sender.len())?; @@ -1308,6 +1317,7 @@ mod tests { use ethers::types::{Block, U256}; use latency::PeakEwmaLatency; use log::{trace, LevelFilter}; + use moka::future::CacheBuilder; use parking_lot::RwLock; use tokio::sync::RwLock as AsyncRwLock; @@ -1488,26 +1498,17 @@ mod tests { name: "test".to_string(), watch_consensus_head_sender: Some(watch_consensus_head_sender), watch_consensus_rpcs_sender, - pending_transaction_cache: CacheWithTTL::arc_with_capacity( - "pending_transaction_cache", - 100, - Duration::from_secs(60), - ) - .await, + pending_transaction_cache: CacheBuilder::new(100) + .time_to_live(Duration::from_secs(60)) + .build(), pending_tx_id_receiver, pending_tx_id_sender, - blocks_by_hash: CacheWithTTL::arc_with_capacity( - "blocks_by_hash", - 100, - Duration::from_secs(60), - ) - .await, - blocks_by_number: CacheWithTTL::arc_with_capacity( - "blocks_by_number", - 100, - Duration::from_secs(60), - ) - .await, + blocks_by_hash: CacheBuilder::new(100) + .time_to_live(Duration::from_secs(60)) + .build(), + blocks_by_number: CacheBuilder::new(100) + .time_to_live(Duration::from_secs(60)) + .build(), // TODO: test max_block_age? max_block_age: None, // TODO: test max_block_lag? @@ -1777,26 +1778,17 @@ mod tests { name: "test".to_string(), watch_consensus_head_sender: Some(watch_consensus_head_sender), watch_consensus_rpcs_sender, - pending_transaction_cache: CacheWithTTL::arc_with_capacity( - "pending_transaction_cache", - 100, - Duration::from_secs(120), - ) - .await, + pending_transaction_cache: CacheBuilder::new(100) + .time_to_live(Duration::from_secs(120)) + .build(), pending_tx_id_receiver, pending_tx_id_sender, - blocks_by_hash: CacheWithTTL::arc_with_capacity( - "blocks_by_hash", - 100, - Duration::from_secs(120), - ) - .await, - blocks_by_number: CacheWithTTL::arc_with_capacity( - "blocks_by_number", - 100, - Duration::from_secs(120), - ) - .await, + blocks_by_hash: CacheBuilder::new(100) + .time_to_live(Duration::from_secs(120)) + .build(), + blocks_by_number: CacheBuilder::new(100) + .time_to_live(Duration::from_secs(120)) + .build(), min_head_rpcs: 1, min_sum_soft_limit: 4_000, max_block_age: None, @@ -1972,26 +1964,11 @@ mod tests { name: "test".to_string(), watch_consensus_head_sender: Some(watch_consensus_head_sender), watch_consensus_rpcs_sender, - pending_transaction_cache: CacheWithTTL::arc_with_capacity( - "pending_transaction_cache", - 10_000, - Duration::from_secs(120), - ) - .await, + pending_transaction_cache: Cache::new(10_000), pending_tx_id_receiver, pending_tx_id_sender, - blocks_by_hash: CacheWithTTL::arc_with_capacity( - "blocks_by_hash", - 10_000, - Duration::from_secs(120), - ) - .await, - blocks_by_number: CacheWithTTL::arc_with_capacity( - "blocks_by_number", - 10_000, - Duration::from_secs(120), - ) - .await, + blocks_by_hash: Cache::new(10_000), + blocks_by_number: Cache::new(10_000), min_head_rpcs: 1, min_sum_soft_limit: 1_000, max_block_age: None, diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 909c35a4..fcd8cdab 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -416,7 +416,7 @@ impl Web3Rpc { // if we already have this block saved, set new_head_block to that arc. otherwise store this copy let new_head_block = block_map - .get_or_insert_async(&new_hash, async move { new_head_block }) + .get_with_by_ref(&new_hash, async move { new_head_block }) .await; // save the block so we don't send the same one multiple times diff --git a/web3_proxy/src/stats/influxdb_queries.rs b/web3_proxy/src/stats/influxdb_queries.rs index e9fda7d2..fe3d966b 100644 --- a/web3_proxy/src/stats/influxdb_queries.rs +++ b/web3_proxy/src/stats/influxdb_queries.rs @@ -20,10 +20,8 @@ use fstrings::{f, format_args_f}; use hashbrown::HashMap; use influxdb2::api::query::FluxRecord; use influxdb2::models::Query; -use log::{error, info, warn}; -use migration::sea_orm::ColumnTrait; -use migration::sea_orm::EntityTrait; -use migration::sea_orm::QueryFilter; +use log::{debug, error, trace, warn}; +use migration::sea_orm::{ColumnTrait, EntityTrait, QueryFilter}; use serde_json::json; use ulid::Ulid; @@ -146,7 +144,7 @@ pub async fn query_user_stats<'a>( .clone() .context("No influxdb bucket was provided")?; // "web3_proxy"; - info!("Bucket is {:?}", bucket); + trace!("Bucket is {:?}", bucket); let mut filter_chain_id = "".to_string(); if chain_id != 0 { filter_chain_id = f!(r#"|> filter(fn: (r) => r["chain_id"] == "{chain_id}")"#); @@ -154,14 +152,15 @@ pub async fn query_user_stats<'a>( // Fetch and request for balance - info!( + trace!( "Query start and stop are: {:?} {:?}", - query_start, query_stop + query_start, + query_stop ); // info!("Query column parameters are: {:?}", stats_column); - info!("Query measurement is: {:?}", measurement); - info!("Filters are: {:?}", filter_chain_id); // filter_field - info!("window seconds are: {:?}", query_window_seconds); + trace!("Query measurement is: {:?}", measurement); + trace!("Filters are: {:?}", filter_chain_id); // filter_field + trace!("window seconds are: {:?}", query_window_seconds); let drop_method = match stat_response_type { StatType::Aggregated => f!(r#"|> drop(columns: ["method"])"#), @@ -190,9 +189,9 @@ pub async fn query_user_stats<'a>( |> sort(columns: ["_time", "_measurement", "archive_needed", "chain_id", "error_response", "method", "rpc_secret_key_id"], desc: true) "#); - info!("Raw query to db is: {:?}", query); + debug!("Raw query to db is: {:#?}", query); let query = Query::new(query.to_string()); - info!("Query to db is: {:?}", query); + trace!("Query to db is: {:?}", query); // Make the query and collect all data let raw_influx_responses: Vec = influxdb_client diff --git a/web3_proxy/src/stats/mod.rs b/web3_proxy/src/stats/mod.rs index 5d3cbf63..6409aead 100644 --- a/web3_proxy/src/stats/mod.rs +++ b/web3_proxy/src/stats/mod.rs @@ -17,7 +17,7 @@ use derive_more::From; use entities::sea_orm_active_enums::TrackingLevel; use entities::{balance, referee, referrer, rpc_accounting_v2, rpc_key}; use influxdb2::models::DataPoint; -use log::trace; +use log::{debug, trace}; use migration::sea_orm::prelude::Decimal; use migration::sea_orm::{ self, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, TransactionTrait, @@ -184,12 +184,14 @@ impl RpcQueryStats { } } +#[derive(Debug, Default)] struct Deltas { - balance_used_outside_free_tier: Decimal, - balance_used_including_free_tier: Decimal, - sender_bonus_applied: bool, - referrer_deposit_delta: Decimal, - sender_bonus_balance_deposited: Decimal, + balance_spent_including_free_credits: Decimal, + balance_spent_excluding_free_credits: Decimal, + apply_usage_bonus_to_request_sender: bool, + usage_bonus_to_request_sender_through_referral: Decimal, + + bonus_to_referrer: Decimal, } /// A stat that we aggregate and then store in a database. @@ -375,29 +377,42 @@ impl BufferedRpcQueryStats { referral_objects: Option<(referee::Model, referrer::Model)>, ) -> Web3ProxyResult<(Deltas, Option<(referee::Model, referrer::Model)>)> { // Calculate Balance Only - let mut deltas = Deltas { - balance_used_outside_free_tier: Default::default(), - balance_used_including_free_tier: Default::default(), - sender_bonus_applied: false, - referrer_deposit_delta: Default::default(), - sender_bonus_balance_deposited: Default::default(), - }; + let mut deltas = Deltas::default(); // Calculate a bunch using referrals as well if let Some((referral_entity, referrer_code_entity)) = referral_objects { - deltas.sender_bonus_applied = referral_entity.credits_applied_for_referee; + deltas.apply_usage_bonus_to_request_sender = + referral_entity.credits_applied_for_referee; // Calculate if we are above the usage threshold, and apply a bonus // Optimally we would read this from the balance, but if we do it like this, we only have to lock a single table (much safer w.r.t. deadlocks) // referral_entity.credits_applied_for_referrer * (Decimal::from(10) checks (atomically using this table only), whether the user has brought in >$100 to the referer // In this case, the sender receives $100 as a bonus / gift + // Apply a 10$ bonus onto the user, if the user has spent 100$ + debug!( + "Were credits applied so far? {:?} {:?}", + referral_entity.credits_applied_for_referee, + !referral_entity.credits_applied_for_referee + ); + debug!( + "Credits applied for referrer so far? {:?}", + referral_entity.credits_applied_for_referrer + ); + debug!("Sum credits used? {:?}", self.sum_credits_used); + debug!( + "Hello: {:?}", + (referral_entity.credits_applied_for_referrer * (Decimal::from(10)) + + self.sum_credits_used) + >= Decimal::from(100) + ); if !referral_entity.credits_applied_for_referee && (referral_entity.credits_applied_for_referrer * (Decimal::from(10)) + self.sum_credits_used) >= Decimal::from(100) { - deltas.sender_bonus_balance_deposited += Decimal::from(10); - deltas.sender_bonus_applied = true; + debug!("Adding sender bonus balance"); + deltas.usage_bonus_to_request_sender_through_referral = Decimal::from(10); + deltas.apply_usage_bonus_to_request_sender = true; } // Calculate how much the referrer should get, limited to the last 12 months @@ -407,24 +422,40 @@ impl BufferedRpcQueryStats { + Months::new(12); if now <= valid_until { - deltas.referrer_deposit_delta += self.sum_credits_used / Decimal::new(10, 0); + deltas.bonus_to_referrer += self.sum_credits_used / Decimal::new(10, 0); } - return Ok((deltas, Some((referral_entity, referrer_code_entity)))); - } + // Duplicate code, I should fix this later ... + let user_balance = sender_balance.total_deposits + - sender_balance.total_spent_outside_free_tier + + deltas.usage_bonus_to_request_sender_through_referral; - let user_balance = (sender_balance.total_deposits + deltas.sender_bonus_balance_deposited - - sender_balance.total_spent_outside_free_tier); - // Split up the component of into how much of the paid component was used, and how much of the free component was used (anything after "balance") - if user_balance >= Decimal::from(0) { - deltas.balance_used_outside_free_tier = self.sum_credits_used; + // Split up the component of into how much of the paid component was used, and how much of the free component was used (anything after "balance") + if user_balance - self.sum_credits_used >= Decimal::from(0) { + deltas.balance_spent_including_free_credits = self.sum_credits_used; + deltas.balance_spent_excluding_free_credits = self.sum_credits_used; + } else { + deltas.balance_spent_including_free_credits = user_balance; + deltas.balance_spent_excluding_free_credits = self.sum_credits_used; + } + + Ok((deltas, Some((referral_entity, referrer_code_entity)))) } else { - deltas.balance_used_outside_free_tier = - user_balance + deltas.sender_bonus_balance_deposited; - deltas.balance_used_including_free_tier = self.sum_credits_used; - } + let user_balance = sender_balance.total_deposits + - sender_balance.total_spent_outside_free_tier + + deltas.usage_bonus_to_request_sender_through_referral; - Ok((deltas, None)) + // Split up the component of into how much of the paid component was used, and how much of the free component was used (anything after "balance") + if user_balance - self.sum_credits_used >= Decimal::from(0) { + deltas.balance_spent_including_free_credits = self.sum_credits_used; + deltas.balance_spent_excluding_free_credits = self.sum_credits_used; + } else { + deltas.balance_spent_including_free_credits = user_balance; + deltas.balance_spent_excluding_free_credits = self.sum_credits_used; + } + + Ok((deltas, None)) + } } /// Save all referral-based objects in the database @@ -435,33 +466,42 @@ impl BufferedRpcQueryStats { sender_rpc_entity: &rpc_key::Model, referral_objects: &Option<(referee::Model, referrer::Model)>, ) -> Web3ProxyResult<()> { - // Do the user updates + // Do the sender balance updates let user_balance = balance::ActiveModel { id: sea_orm::NotSet, - total_deposits: sea_orm::Set(deltas.sender_bonus_balance_deposited), - total_spent_including_free_tier: sea_orm::Set(deltas.balance_used_including_free_tier), - total_spent_outside_free_tier: sea_orm::Set(deltas.balance_used_outside_free_tier), + total_deposits: sea_orm::Set(deltas.usage_bonus_to_request_sender_through_referral), + total_spent_including_free_tier: sea_orm::Set( + deltas.balance_spent_including_free_credits, + ), + total_spent_outside_free_tier: sea_orm::Set( + deltas.balance_spent_excluding_free_credits, + ), user_id: sea_orm::Set(sender_rpc_entity.user_id), }; + // In any case, add to the balance + debug!( + "Delta is: {:?} from credits used {:?}", + deltas, self.sum_credits_used + ); let _ = balance::Entity::insert(user_balance) .on_conflict( OnConflict::new() .values([ - ( - balance::Column::TotalDeposits, - Expr::col(balance::Column::TotalDeposits) - .add(deltas.sender_bonus_balance_deposited), - ), ( balance::Column::TotalSpentIncludingFreeTier, Expr::col(balance::Column::TotalSpentIncludingFreeTier) - .add(deltas.balance_used_including_free_tier), + .add(deltas.balance_spent_including_free_credits), ), ( balance::Column::TotalSpentOutsideFreeTier, Expr::col(balance::Column::TotalSpentOutsideFreeTier) - .add(deltas.balance_used_outside_free_tier), + .add(deltas.balance_spent_excluding_free_credits), + ), + ( + balance::Column::TotalDeposits, + Expr::col(balance::Column::TotalDeposits) + .add(deltas.usage_bonus_to_request_sender_through_referral), ), ]) .to_owned(), @@ -471,57 +511,78 @@ impl BufferedRpcQueryStats { // Do the referrer_entry updates if let Some((referral_entity, referrer_code_entity)) = referral_objects { - if deltas.referrer_deposit_delta > Decimal::from(0) { - let referee_entry = referee::ActiveModel { - id: sea_orm::Unchanged(referral_entity.id), - referral_start_date: sea_orm::Unchanged(referral_entity.referral_start_date), - used_referral_code: sea_orm::Unchanged(referral_entity.used_referral_code), - user_id: sea_orm::Unchanged(referral_entity.user_id), + debug!("Positive referrer deposit delta"); + let referee_entry = referee::ActiveModel { + id: sea_orm::Unchanged(referral_entity.id), + credits_applied_for_referee: sea_orm::Set( + deltas.apply_usage_bonus_to_request_sender, + ), + credits_applied_for_referrer: sea_orm::Set(deltas.bonus_to_referrer), - credits_applied_for_referee: sea_orm::Set(deltas.sender_bonus_applied), - credits_applied_for_referrer: sea_orm::Set(deltas.referrer_deposit_delta), - }; - referee::Entity::insert(referee_entry) + referral_start_date: sea_orm::Unchanged(referral_entity.referral_start_date), + used_referral_code: sea_orm::Unchanged(referral_entity.used_referral_code), + user_id: sea_orm::Unchanged(referral_entity.user_id), + }; + + // If there was a referral, first of all check if credits should be applied to the sender itself (once he spent 100$) + // If these two values are not equal, that means that we have not applied the bonus just yet. + // In that case, we can apply the bonus just now. + if referral_entity.credits_applied_for_referee + != deltas.apply_usage_bonus_to_request_sender + { + referee::Entity::insert(referee_entry.clone()) .on_conflict( OnConflict::new() - .values([ - ( - referee::Column::CreditsAppliedForReferee, - // Make it a "Set" - Expr::col(referee::Column::CreditsAppliedForReferee) - .eq(deltas.sender_bonus_applied), - ), - ( - referee::Column::CreditsAppliedForReferrer, - Expr::col(referee::Column::CreditsAppliedForReferrer) - .add(deltas.referrer_deposit_delta), - ), - ]) + .values([( + // TODO Make it a "Set", add is hacky (but works ..) + referee::Column::CreditsAppliedForReferee, + Expr::col(referee::Column::CreditsAppliedForReferee) + .add(deltas.apply_usage_bonus_to_request_sender), + )]) .to_owned(), ) .exec(txn) .await?; - let user_balance = balance::ActiveModel { - id: sea_orm::NotSet, - total_deposits: sea_orm::Set(deltas.referrer_deposit_delta), - user_id: sea_orm::Set(referral_entity.user_id), - ..Default::default() - }; + // Also add a bonus to the sender (But this should already have been done with the above code!!) + } - let _ = balance::Entity::insert(user_balance) + // If the bonus to the referrer is non-empty, also apply that + if deltas.bonus_to_referrer > Decimal::from(0) { + referee::Entity::insert(referee_entry) .on_conflict( OnConflict::new() .values([( - balance::Column::TotalDeposits, - Expr::col(balance::Column::TotalDeposits) - .add(deltas.referrer_deposit_delta), + // TODO Make it a "Set", add is hacky (but works ..) + referee::Column::CreditsAppliedForReferrer, + Expr::col(referee::Column::CreditsAppliedForReferrer) + .add(deltas.bonus_to_referrer), )]) .to_owned(), ) .exec(txn) .await?; } + + // Finally, add to the balance of the referrer + let user_balance = balance::ActiveModel { + id: sea_orm::NotSet, + total_deposits: sea_orm::Set(deltas.bonus_to_referrer), + user_id: sea_orm::Set(referrer_code_entity.user_id), + ..Default::default() + }; + + let _ = balance::Entity::insert(user_balance) + .on_conflict( + OnConflict::new() + .values([( + balance::Column::TotalDeposits, + Expr::col(balance::Column::TotalDeposits).add(deltas.bonus_to_referrer), + )]) + .to_owned(), + ) + .exec(txn) + .await?; }; Ok(()) } @@ -560,8 +621,8 @@ impl BufferedRpcQueryStats { let balance_before = *latest_balance; // Now modify the balance // TODO: Double check this (perhaps while testing...) - *latest_balance = *latest_balance - deltas.balance_used_outside_free_tier - + deltas.sender_bonus_balance_deposited; + *latest_balance = *latest_balance - deltas.balance_spent_including_free_credits + + deltas.usage_bonus_to_request_sender_through_referral; if *latest_balance < Decimal::from(0) { *latest_balance = Decimal::from(0); } @@ -576,11 +637,13 @@ impl BufferedRpcQueryStats { for rpc_key_entity in rpc_keys { // TODO: Not sure which one was inserted, just delete both ... - rpc_secret_key_cache.remove(&rpc_key_entity.secret_key.into()); + rpc_secret_key_cache + .invalidate(&rpc_key_entity.secret_key.into()) + .await; } if let Ok(non_zero_user_id) = NonZeroU64::try_from(sender_rpc_entity.user_id) { - user_balance_cache.remove(&non_zero_user_id); + user_balance_cache.invalidate(&non_zero_user_id).await; } } @@ -599,7 +662,7 @@ impl BufferedRpcQueryStats { // // In principle, do not remove the cache for the referrer; the next reload will trigger premium // // We don't touch the RPC keys at this stage for the refferer, a payment must be paid to reset those (we want to keep things simple here) // // Anyways, the RPC keys will be updated in 5 min (600 seconds) - // user_balance_cache.remove(&referrer_user_id); + // user_balance_cache.invalidate(&referrer_user_id).await; // } // }; @@ -821,7 +884,7 @@ impl RpcQueryStats { method: Option<&str>, ) -> Decimal { // for now, always return 0 for cost - 0.into() + Decimal::new(0, 1) /* // some methods should be free. there might be cases where method isn't set (though they should be uncommon) diff --git a/web3_proxy/src/stats/stat_buffer.rs b/web3_proxy/src/stats/stat_buffer.rs index 5cd7d2f8..71412f49 100644 --- a/web3_proxy/src/stats/stat_buffer.rs +++ b/web3_proxy/src/stats/stat_buffer.rs @@ -136,14 +136,14 @@ impl StatBuffer { } } _ = db_save_interval.tick() => { - // info!("DB save internal tick"); + trace!("DB save internal tick"); let count = self.save_relational_stats().await; if count > 0 { trace!("Saved {} stats to the relational db", count); } } _ = tsdb_save_interval.tick() => { - // info!("TSDB save internal tick"); + trace!("TSDB save internal tick"); let count = self.save_tsdb_stats(&bucket).await; if count > 0 { trace!("Saved {} stats to the tsdb", count);