# Conflicts:
#	scripts/101-balance-referral-stats.sh
#	web3_proxy/src/frontend/users/payment.rs
yenicelik 2023-06-08 22:22:43 +02:00
commit 591d34e090
25 changed files with 684 additions and 399 deletions

Cargo.lock (generated, 258 changed lines)

@ -68,7 +68,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c99f64d1e06488f620f932677e24bc6e2897582980441ae90a671415bd7ec2f"
dependencies = [
"cfg-if",
"getrandom",
"once_cell",
"version_check",
]
@ -189,6 +188,35 @@ dependencies = [
"tokio",
]
[[package]]
name = "async-io"
version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fc5b45d93ef0529756f812ca52e44c221b35341892d3dcc34132ac02f3dd2af"
dependencies = [
"async-lock",
"autocfg",
"cfg-if",
"concurrent-queue",
"futures-lite",
"log",
"parking",
"polling",
"rustix",
"slab",
"socket2",
"waker-fn",
]
[[package]]
name = "async-lock"
version = "2.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa24f727524730b077666307f2734b4a1a1c57acb79193127dcc8914d5242dd7"
dependencies = [
"event-listener",
]
[[package]]
name = "async-stream"
version = "0.3.5"
@ -589,6 +617,12 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "bytecount"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c676a478f63e9fa2dd5368a42f28bba0d6c560b775f38583c8bbaa7fcd67c9c"
[[package]]
name = "byteorder"
version = "1.4.3"
@ -643,6 +677,19 @@ dependencies = [
"serde",
]
[[package]]
name = "cargo_metadata"
version = "0.14.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4acbb09d9ee8e23699b9634375c72795d095bf268439da88562cf9b501f181fa"
dependencies = [
"camino",
"cargo-platform",
"semver",
"serde",
"serde_json",
]
[[package]]
name = "cargo_metadata"
version = "0.15.4"
@ -868,6 +915,15 @@ dependencies = [
"tokio-util",
]
[[package]]
name = "concurrent-queue"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62ec6771ecfa0762d24683ee5a32ad78487a3d3afdc0fb8cae19d2c5deb50b7c"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "console"
version = "0.14.1"
@ -1230,7 +1286,7 @@ dependencies = [
"anyhow",
"hashbrown 0.14.0",
"log",
"quick_cache_ttl",
"moka",
"redis-rate-limiter",
"tokio",
]
@ -1546,6 +1602,15 @@ dependencies = [
"libc",
]
[[package]]
name = "error-chain"
version = "0.12.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d2f06b9cac1506ece98fe3231e3cc9c4410ec3d5b1f24ae1c8946f0742cdefc"
dependencies = [
"version_check",
]
[[package]]
name = "eth-keystore"
version = "0.5.0"
@ -1711,7 +1776,7 @@ checksum = "6da5fa198af0d3be20c19192df2bd9590b92ce09a8421e793bec8851270f1b05"
dependencies = [
"arrayvec",
"bytes",
"cargo_metadata",
"cargo_metadata 0.15.4",
"chrono",
"elliptic-curve 0.13.5",
"ethabi",
@ -2107,6 +2172,21 @@ version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964"
[[package]]
name = "futures-lite"
version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce"
dependencies = [
"fastrand",
"futures-core",
"futures-io",
"memchr",
"parking",
"pin-project-lite",
"waker-fn",
]
[[package]]
name = "futures-locks"
version = "0.7.1"
@ -2992,6 +3072,15 @@ version = "0.4.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "518ef76f2f87365916b142844c16d8fefd85039bc5699050210a7778ee1cd1de"
[[package]]
name = "mach2"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d0d1830bcd151a6fc4aea1369af235b36c1528fe976b8ff678683c9995eade8"
dependencies = [
"libc",
]
[[package]]
name = "match_cfg"
version = "0.1.0"
@ -3126,6 +3215,32 @@ dependencies = [
"winapi",
]
[[package]]
name = "moka"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "36506f2f935238463605f3bb13b362f1949daafc3b347d05d60ae08836db2bd2"
dependencies = [
"async-io",
"async-lock",
"crossbeam-channel",
"crossbeam-epoch",
"crossbeam-utils",
"futures-util",
"num_cpus",
"once_cell",
"parking_lot 0.12.1",
"quanta",
"rustc_version",
"scheduled-thread-pool",
"skeptic",
"smallvec",
"tagptr",
"thiserror",
"triomphe",
"uuid 1.3.3",
]
[[package]]
name = "nanorand"
version = "0.7.0"
@ -3525,7 +3640,7 @@ checksum = "bd10bab2b6df910bbe6c4987d76aa4221235103d9a9c000cfabcee6a6abc8f7a"
dependencies = [
"reqwest",
"serde",
"time 0.3.21",
"time 0.3.22",
"url",
]
@ -3555,6 +3670,12 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "parking"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14f2252c834a40ed9bb5422029649578e63aa341ac401f74e719dd1afda8394e"
[[package]]
name = "parking_lot"
version = "0.11.2"
@ -3869,6 +3990,22 @@ version = "0.3.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964"
[[package]]
name = "polling"
version = "2.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4b2d323e8ca7996b3e23126511a523f7e62924d93ecd5ae73b333815b0eb3dce"
dependencies = [
"autocfg",
"bitflags",
"cfg-if",
"concurrent-queue",
"libc",
"log",
"pin-project-lite",
"windows-sys 0.48.0",
]
[[package]]
name = "ppv-lite86"
version = "0.2.17"
@ -4041,25 +4178,30 @@ dependencies = [
]
[[package]]
name = "quick_cache"
version = "0.3.0"
name = "pulldown-cmark"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5253a3a0d56548d5b0be25414171dc780cc6870727746d05bd2bde352eee96c5"
checksum = "77a1a2f1f0a7ecff9c31abbe177637be0e97a0aef46cf8738ece09327985d998"
dependencies = [
"ahash 0.8.3",
"hashbrown 0.13.2",
"parking_lot 0.12.1",
"bitflags",
"memchr",
"unicase",
]
[[package]]
name = "quick_cache_ttl"
version = "0.1.0"
name = "quanta"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a17e662a7a8291a865152364c20c7abc5e60486ab2001e8ec10b24862de0b9ab"
dependencies = [
"flume",
"log",
"quick_cache",
"serde",
"tokio",
"crossbeam-utils",
"libc",
"mach2",
"once_cell",
"raw-cpuid",
"wasi 0.11.0+wasi-snapshot-preview1",
"web-sys",
"winapi",
]
[[package]]
@ -4129,6 +4271,15 @@ dependencies = [
"tokio",
]
[[package]]
name = "raw-cpuid"
version = "10.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c297679cb867470fa8c9f67dbba74a78d78e3e98d7cf2b08d6d71540f797332"
dependencies = [
"bitflags",
]
[[package]]
name = "rayon"
version = "1.7.0"
@ -4627,6 +4778,15 @@ dependencies = [
"windows-sys 0.42.0",
]
[[package]]
name = "scheduled-thread-pool"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19"
dependencies = [
"parking_lot 0.12.1",
]
[[package]]
name = "scoped-tls"
version = "1.0.1"
@ -4683,7 +4843,7 @@ dependencies = [
"serde_json",
"sqlx",
"thiserror",
"time 0.3.21",
"time 0.3.22",
"tracing",
"url",
"uuid 1.3.3",
@ -4746,7 +4906,7 @@ dependencies = [
"rust_decimal",
"sea-query-derive",
"serde_json",
"time 0.3.21",
"time 0.3.22",
"uuid 1.3.3",
]
@ -4762,7 +4922,7 @@ dependencies = [
"sea-query",
"serde_json",
"sqlx",
"time 0.3.21",
"time 0.3.22",
"uuid 1.3.3",
]
@ -5017,7 +5177,7 @@ dependencies = [
"serde",
"serde_json",
"thiserror",
"time 0.3.21",
"time 0.3.22",
"url",
"uuid 1.3.3",
]
@ -5245,7 +5405,22 @@ dependencies = [
"rand",
"sha3",
"thiserror",
"time 0.3.21",
"time 0.3.22",
]
[[package]]
name = "skeptic"
version = "0.13.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "16d23b015676c90a0f01c197bfdc786c20342c73a0afdda9025adb0bc42940a8"
dependencies = [
"bytecount",
"cargo_metadata 0.14.2",
"error-chain",
"glob",
"pulldown-cmark",
"tempfile",
"walkdir",
]
[[package]]
@ -5422,7 +5597,7 @@ dependencies = [
"sqlx-rt",
"stringprep",
"thiserror",
"time 0.3.21",
"time 0.3.22",
"tokio-stream",
"url",
"uuid 1.3.3",
@ -5587,6 +5762,12 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160"
[[package]]
name = "tagptr"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417"
[[package]]
name = "tap"
version = "1.0.1"
@ -5704,9 +5885,9 @@ dependencies = [
[[package]]
name = "time"
version = "0.3.21"
version = "0.3.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f3403384eaacbca9923fa06940178ac13e4edb725486d70e8e15881d0c836cc"
checksum = "ea9e1b3cf1243ae005d9e74085d4d542f3125458f3a81af210d901dcd7411efd"
dependencies = [
"itoa",
"serde",
@ -6158,6 +6339,12 @@ dependencies = [
"tracing-log",
]
[[package]]
name = "triomphe"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1ee9bd9239c339d714d657fac840c6d2a4f9c45f4f9ec7b0975113458be78db"
[[package]]
name = "try-lock"
version = "0.2.4"
@ -6261,6 +6448,15 @@ dependencies = [
"libc",
]
[[package]]
name = "unicase"
version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6"
dependencies = [
"version_check",
]
[[package]]
name = "unicode-bidi"
version = "0.3.13"
@ -6385,6 +6581,12 @@ version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
[[package]]
name = "waker-fn"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca"
[[package]]
name = "walkdir"
version = "2.3.3"
@ -6547,6 +6749,7 @@ dependencies = [
"log",
"migration",
"mimalloc",
"moka",
"num",
"num-traits",
"once_cell",
@ -6556,7 +6759,6 @@ dependencies = [
"payment-contracts",
"prettytable",
"proctitle",
"quick_cache_ttl",
"rdkafka",
"redis-rate-limiter",
"regex",
@ -6570,7 +6772,7 @@ dependencies = [
"siwe",
"strum",
"thread-fast-rng",
"time 0.3.21",
"time 0.3.22",
"tokio",
"tokio-console",
"tokio-stream",
@ -6895,7 +7097,7 @@ dependencies = [
"hmac",
"pbkdf2 0.11.0",
"sha1",
"time 0.3.21",
"time 0.3.22",
"zstd",
]

@ -5,7 +5,6 @@ members = [
"latency",
"migration",
"payment-contracts",
"quick_cache_ttl",
"rate-counter",
"redis-rate-limiter",
"thread-fast-rng",

@ -171,6 +171,6 @@ Note: Testing with `getLatestBlockByNumber.lua` is not great because the latest
Run [ethspam](https://github.com/shazow/ethspam) and [versus](https://github.com/INFURA/versus) for a more realistic load test:
ethspam --rpc http://127.0.0.1:8544 | versus --concurrency=100 --stop-after=10000 http://127.0.0.1:8544
ethspam --rpc http://127.0.0.1:8544 | versus --concurrency=10 --stop-after=1000 http://127.0.0.1:8544
ethspam --rpc http://127.0.0.1:8544/u/$API_KEY | versus --concurrency=100 --stop-after=10000 http://127.0.0.1:8544/u/$API_KEY

@ -5,10 +5,10 @@ authors = ["Bryan Stitt <bryan@stitthappens.com>"]
edition = "2021"
[dependencies]
quick_cache_ttl = { path = "../quick_cache_ttl" }
redis-rate-limiter = { path = "../redis-rate-limiter" }
anyhow = "1.0.71"
hashbrown = "0.14.0"
log = "0.4.18"
moka = { version = "0.11.1", features = ["future"] }
tokio = "1.28.2"

@ -1,6 +1,6 @@
//#![warn(missing_docs)]
use log::error;
use quick_cache_ttl::CacheWithTTL;
use moka::future::{Cache, CacheBuilder};
use redis_rate_limiter::{RedisRateLimitResult, RedisRateLimiter};
use std::cmp::Eq;
use std::fmt::{Debug, Display};
@ -16,7 +16,7 @@ pub struct DeferredRateLimiter<K>
where
K: Send + Sync,
{
local_cache: CacheWithTTL<K, Arc<AtomicU64>>,
local_cache: Cache<K, Arc<AtomicU64>>,
prefix: String,
rrl: RedisRateLimiter,
/// if None, defers to the max on rrl
@ -46,12 +46,10 @@ where
// TODO: what do these weigh?
// TODO: allow skipping max_capacity
// TODO: prefix instead of a static str
let local_cache = CacheWithTTL::new(
"deferred rate limiter",
cache_size,
Duration::from_secs(ttl),
)
.await;
let local_cache = CacheBuilder::new(cache_size.try_into().unwrap())
.time_to_live(Duration::from_secs(ttl))
.name(&format!("DeferredRateLimiter-{}", prefix))
.build();
Self {
local_cache,
@ -91,7 +89,7 @@ where
// set arc_deferred_rate_limit_result and return the count
self.local_cache
.try_get_or_insert_async::<anyhow::Error, _>(&key, async move {
.try_get_with_by_ref::<_, anyhow::Error, _>(&key, async move {
// we do not use the try operator here because we want to be okay with redis errors
let redis_count = match rrl
.throttle_label(&redis_key, Some(max_requests_per_period), count)
@ -130,7 +128,8 @@ where
Ok(Arc::new(AtomicU64::new(redis_count)))
})
.await?
.await
.map_err(|x| anyhow::anyhow!("cache error! {}", x))?
};
let mut locked = deferred_rate_limit_result.lock().await;
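The deferred rate limiter now relies on moka's `try_get_with_by_ref`: the init future runs once per missing key, concurrent callers share the result, and errors come back wrapped in an `Arc`. A minimal standalone sketch of that pattern (hypothetical string key and counter value, assuming moka 0.11, tokio, and anyhow):

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;

use moka::future::{Cache, CacheBuilder};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // same builder shape as the new DeferredRateLimiter: bounded size plus a TTL
    let local_cache: Cache<String, Arc<AtomicU64>> = CacheBuilder::new(10_000)
        .time_to_live(Duration::from_secs(60))
        .name("deferred-rate-limiter-sketch")
        .build();

    let key = "ip:127.0.0.1".to_string();

    // the init future runs once per missing key; concurrent callers share the result.
    // moka hands errors back as Arc<E>, hence the map_err back into anyhow.
    let counter = local_cache
        .try_get_with_by_ref::<_, anyhow::Error, _>(&key, async move {
            // stand-in for the redis throttle call in the real code
            Ok(Arc::new(AtomicU64::new(0)))
        })
        .await
        .map_err(|err| anyhow::anyhow!("cache error! {}", err))?;

    counter.fetch_add(1, Ordering::Relaxed);
    Ok(())
}
```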

@ -56,18 +56,9 @@ impl MigrationTrait for Migration {
}
}
/// Learn more at https://docs.rs/sea-query#iden
#[derive(Iden)]
enum User {
Table,
Id,
}
#[derive(Iden)]
enum Balance {
Table,
Id,
UserId,
TotalSpentIncludingFreeTier,
TotalSpentOutsideFreeTier,
TotalDeposits,

@ -94,6 +94,15 @@ curl \
# Referred user makes some requests
for i in {1..10000}
do
curl \
-X POST "127.0.0.1:8544/rpc/01H2D5CAP1KF2NKRS30SGATDSD" \
-H "Content-Type: application/json" \
--data '{"method":"eth_blockNumber","params":[],"id":1,"jsonrpc":"2.0"}'
done
# Let's also make simultaneous requests
for i in {1..10000}
do
curl \
-X POST "127.0.0.1:8544/rpc/01H2D5DN4D423VR2KFWBZE46TR" \
@ -101,11 +110,21 @@ do
--data '{"method":"eth_blockNumber","params":[],"id":1,"jsonrpc":"2.0"}'
done
# Let's simultaneously also let the referrer make some requests, to make sure deadlocks do not occur
for i in {1..10000}
do
curl \
-X POST "127.0.0.1:8544/rpc/01H2D5CAP1KF2NKRS30SGATDSD" \
-H "Content-Type: application/json" \
--data '{"method":"eth_blockNumber","params":[],"id":1,"jsonrpc":"2.0"}'
done
# Get some data on the referral items
curl \
-H "Authorization: Bearer 01H2D5DN564M4Q2T6PETEZY83Q" \
-X GET "127.0.0.1:8544/user/referral/stats/used-codes"
curl \
-H "Authorization: Bearer 01H2D5CAQJF7P80222P4ZAFQ26" \
-X GET "127.0.0.1:8544/user/referral/stats/used-codes"
curl \
-H "Authorization: Bearer 01H2D5DN564M4Q2T6PETEZY83Q" \
-X GET "127.0.0.1:8544/user/referral/stats/shared-codes"
curl \
-H "Authorization: Bearer 01H2D5CAQJF7P80222P4ZAFQ26" \
-X GET "127.0.0.1:8544/user/referral/stats/shared-codes"

@ -20,7 +20,6 @@ entities = { path = "../entities" }
latency = { path = "../latency" }
migration = { path = "../migration" }
payment-contracts = { path = "../payment-contracts" }
quick_cache_ttl = { path = "../quick_cache_ttl" }
redis-rate-limiter = { path = "../redis-rate-limiter" }
thread-fast-rng = { path = "../thread-fast-rng" }
@ -68,6 +67,7 @@ itertools = "0.10.5"
listenfd = "1.0.1"
log = "0.4.18"
mimalloc = { version = "0.1.37", optional = true}
moka = { version = "0.11.1", features = ["future"] }
num = "0.4.0"
num-traits = "0.2.15"
once_cell = { version = "1.18.0" }
@ -87,7 +87,7 @@ serde_json = { version = "1.0.96", default-features = false, features = ["alloc"
serde_prometheus = "0.2.2"
siwe = "0.5.2"
strum = { version = "0.24.1", features = ["derive"] }
time = "0.3.21"
time = "0.3.22"
tokio = { version = "1.28.2", features = ["full"] }
tokio-console = { version = "0.1.8", optional = true }
tokio-stream = { version = "0.1.14", features = ["sync"] }

@ -13,7 +13,7 @@ use crate::jsonrpc::{
};
use crate::relational_db::{get_db, get_migrated_db, DatabaseConnection, DatabaseReplica};
use crate::response_cache::{
JsonRpcQueryCacheKey, JsonRpcResponseCache, JsonRpcResponseEnum, JsonRpcResponseWeigher,
json_rpc_response_weigher, JsonRpcQueryCacheKey, JsonRpcResponseCache, JsonRpcResponseEnum,
};
use crate::rpcs::blockchain::Web3ProxyBlock;
use crate::rpcs::consensus::ConsensusWeb3Rpcs;
@ -42,7 +42,8 @@ use ipnet::IpNet;
use log::{error, info, trace, warn, Level};
use migration::sea_orm::prelude::Decimal;
use migration::sea_orm::{EntityTrait, PaginatorTrait};
use quick_cache_ttl::{Cache, CacheWithTTL};
use moka::future::{Cache, CacheBuilder};
use parking_lot::Mutex;
use redis_rate_limiter::redis::AsyncCommands;
use redis_rate_limiter::{redis, DeadpoolRuntime, RedisConfig, RedisPool, RedisRateLimiter};
use serde::Serialize;
@ -51,7 +52,7 @@ use serde_json::value::RawValue;
use std::borrow::Cow;
use std::fmt;
use std::net::IpAddr;
use std::num::{NonZeroU32, NonZeroU64};
use std::num::NonZeroU64;
use std::str::FromStr;
use std::sync::{atomic, Arc};
use std::time::Duration;
@ -110,8 +111,8 @@ pub struct AuthorizationChecks {
}
/// Cache data from the database about rpc keys
pub type RpcSecretKeyCache = Arc<CacheWithTTL<RpcSecretKey, AuthorizationChecks>>;
pub type UserBalanceCache = Arc<CacheWithTTL<NonZeroU64, Arc<RwLock<Decimal>>>>; // Could also be an AtomicDecimal
pub type RpcSecretKeyCache = Cache<RpcSecretKey, AuthorizationChecks>;
pub type UserBalanceCache = Cache<NonZeroU64, Arc<RwLock<Decimal>>>;
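The old aliases wrapped `CacheWithTTL` in an `Arc`; a moka `Cache` is already a cheap clonable handle over shared storage, so the wrapper goes away. A minimal sketch of that behavior (hypothetical key/value types, assuming moka 0.11 and tokio):

```rust
use moka::future::Cache;

#[tokio::main]
async fn main() {
    let cache: Cache<u64, String> = Cache::new(100);

    // cloning a moka Cache clones a handle, not the contents
    let cache_for_task = cache.clone();
    tokio::spawn(async move {
        cache_for_task.insert(1, "hello".to_string()).await;
    })
    .await
    .unwrap();

    // the original handle observes the insert made through the clone
    assert_eq!(cache.get(&1), Some("hello".to_string()));
}
```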
/// The application
// TODO: i'm sure this is more arcs than necessary, but spawning futures makes references hard
@ -144,7 +145,7 @@ pub struct Web3ProxyApp {
pub internal_provider: Arc<EthersHttpProvider>,
/// store pending transactions that we've seen so that we don't send duplicates to subscribers
/// TODO: think about this more. might be worth storing if we sent the transaction or not and using this for automatic retries
pub pending_transactions: Arc<CacheWithTTL<TxHash, TxStatus>>,
pub pending_transactions: Cache<TxHash, TxStatus>,
/// rate limit anonymous users
pub frontend_ip_rate_limiter: Option<DeferredRateLimiter<IpAddr>>,
/// rate limit authenticated users
@ -398,17 +399,16 @@ impl Web3ProxyApp {
// if there is no database of users, there will be no keys and so this will be empty
// TODO: max_capacity from config
// TODO: ttl from config
let rpc_secret_key_cache = CacheWithTTL::arc_with_capacity(
"rpc_secret_key_cache",
10_000,
Duration::from_secs(600),
)
.await;
let rpc_secret_key_cache = CacheBuilder::new(10_000)
.name("rpc_secret_key")
.time_to_live(Duration::from_secs(600))
.build();
// TODO: TTL left low, this could also be a solution instead of modifying the cache, that may be disgusting across threads / slow anyways
let user_balance_cache =
CacheWithTTL::arc_with_capacity("user_balance_cache", 10_000, Duration::from_secs(600))
.await;
let user_balance_cache = CacheBuilder::new(10_000)
.name("user_balance")
.time_to_live(Duration::from_secs(600))
.build();
// create a channel for receiving stats
// we do this in a channel so we don't slow down our response to the users
@ -502,27 +502,27 @@ impl Web3ProxyApp {
// TODO: different chains might handle this differently
// TODO: what should we set? 5 minutes is arbitrary. the nodes themselves hold onto transactions for much longer
// TODO: this used to be time_to_update, but
let pending_transactions = CacheWithTTL::arc_with_capacity(
"pending_transactions",
10_000,
Duration::from_secs(300),
)
.await;
let pending_transactions = CacheBuilder::new(10_000)
.name("pending_transactions")
.time_to_live(Duration::from_secs(300))
.build();
// responses can be very different in sizes, so this is a cache with a max capacity and a weigher
// TODO: we should emit stats to calculate a more accurate expected cache size
// TODO: do we actually want a TTL on this?
// TODO: configurable max item weight instead of using ~0.1%
// TODO: resize the cache automatically
let response_cache = JsonRpcResponseCache::new_with_weights(
"response_cache",
(top_config.app.response_cache_max_bytes / 16_384) as usize,
NonZeroU32::try_from((top_config.app.response_cache_max_bytes / 1024) as u32).unwrap(),
top_config.app.response_cache_max_bytes,
JsonRpcResponseWeigher,
Duration::from_secs(3600),
)
.await;
let response_cache: JsonRpcResponseCache =
CacheBuilder::new(top_config.app.response_cache_max_bytes)
.weigher(json_rpc_response_weigher)
.build();
// (top_config.app.response_cache_max_bytes / 16_384) as usize,
// NonZeroU32::try_from((top_config.app.response_cache_max_bytes / 1024) as u32).unwrap(),
// top_config.app.response_cache_max_bytes,
// JsonRpcResponseWeigher,
// Duration::from_secs(3600),
// )
// .await;
// TODO: how should we handle hitting this max?
let max_users = 20_000;
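The response cache is rebuilt around moka's weigher support: with a weigher installed, `max_capacity` becomes a total weight budget (bytes here) rather than an entry count. A hedged sketch of that builder shape, using a plain `String` payload and a made-up byte budget in place of `JsonRpcResponseEnum` and `response_cache_max_bytes`:

```rust
use moka::future::{Cache, CacheBuilder};

fn byte_weigher(_key: &u64, value: &String) -> u32 {
    // clamp so a single huge value cannot overflow the u32 weight
    value.len().min(u32::MAX as usize) as u32
}

#[tokio::main]
async fn main() {
    // with a weigher installed, max_capacity is a total weight (here: bytes)
    let response_cache: Cache<u64, String> = CacheBuilder::new(64 * 1024)
        .name("response_cache_sketch")
        .weigher(byte_weigher)
        .build();

    response_cache
        .insert(1, "a small cached response".to_string())
        .await;
    assert!(response_cache.get(&1).is_some());
}
```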
@ -1160,7 +1160,7 @@ impl Web3ProxyApp {
.await
{
Ok(response_data) => (StatusCode::OK, response_data),
Err(err) => err.into_response_parts(),
Err(err) => err.as_response_parts(),
};
let response = JsonRpcForwardedResponse::from_response_data(response_data, response_id);
@ -1699,12 +1699,15 @@ impl Web3ProxyApp {
let to_block_num = cache_key.to_block_num();
let cache_errors = cache_key.cache_errors();
match self
// moka makes us do annoying things with arcs
enum CacheError {
NotCached(JsonRpcResponseEnum<Arc<RawValue>>),
Error(Arc<Web3ProxyError>),
}
let x = self
.jsonrpc_response_cache
.get_value_or_guard_async(cache_key.hash()).await
{
Ok(x) => x,
Err(x) => {
.try_get_with::<_, Mutex<CacheError>>(cache_key.hash(), async {
let response_data = timeout(
duration,
self.balanced_rpcs
@ -1716,16 +1719,32 @@ impl Web3ProxyApp {
to_block_num.as_ref(),
)
)
.await?;
.await
.map_err(|x| Mutex::new(CacheError::Error(Arc::new(Web3ProxyError::from(x)))))?;
let response_data: JsonRpcResponseEnum<Arc<RawValue>> = response_data.try_into()?;
// TODO: i think response data should be Arc<JsonRpcResponseEnum<Box<RawValue>>>, but that's more work
let response_data: JsonRpcResponseEnum<Arc<RawValue>> = response_data.try_into()
.map_err(|x| Mutex::new(CacheError::Error(Arc::new(x))))?;
if matches!(response_data, JsonRpcResponseEnum::Result { .. }) || cache_errors {
// TODO: convert the Box<RawValue> to an Arc<RawValue>?
x.insert(response_data.clone());
// TODO: read max size from the config
if response_data.num_bytes() as u64 > self.config.response_cache_max_bytes / 1000 {
Err(Mutex::new(CacheError::NotCached(response_data)))
} else if matches!(response_data, JsonRpcResponseEnum::Result { .. }) || cache_errors {
Ok(response_data)
} else {
Err(Mutex::new(CacheError::NotCached(response_data)))
}
}).await;
response_data
match x {
Ok(x) => x,
Err(arc_err) => {
let locked = arc_err.lock();
match &*locked {
CacheError::Error(err) => return Err(Web3ProxyError::Arc(err.clone())),
CacheError::NotCached(x) => x.clone(),
}
}
}
} else {
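moka's `try_get_with` shares a single `Arc<E>` with every caller that was waiting on a failed init, which is why the code above routes both real errors and not-to-be-cached responses through the `CacheError` wrapper. A simplified sketch of the same pattern with a toy error enum (placeholder names, not the real `Web3ProxyError`):

```rust
use moka::future::Cache;

#[derive(Debug)]
enum FetchError {
    // stand-in for "this response is fine but should not be cached"
    NotCached(String),
    Upstream(String),
}

async fn cached_fetch(cache: &Cache<u64, String>, key: u64) -> Result<String, String> {
    let attempt = cache
        .try_get_with(key, async move {
            // pretend this is the upstream RPC call
            if key % 2 == 0 {
                Ok(format!("response for {key}"))
            } else {
                // errors are not stored; every waiter sees the same Arc<FetchError>
                Err(FetchError::NotCached(format!("uncached response for {key}")))
            }
        })
        .await;

    match attempt {
        Ok(value) => Ok(value),
        Err(arc_err) => match &*arc_err {
            FetchError::NotCached(body) => Ok(body.clone()),
            FetchError::Upstream(msg) => Err(msg.clone()),
        },
    }
}

#[tokio::main]
async fn main() {
    let cache: Cache<u64, String> = Cache::new(100);
    assert!(cached_fetch(&cache, 2).await.is_ok()); // cached
    assert!(cached_fetch(&cache, 3).await.is_ok()); // computed, not cached
}
```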

@ -22,6 +22,7 @@ use rust_decimal::Error as DecimalError;
use serde::Serialize;
use serde_json::value::RawValue;
use std::error::Error;
use std::sync::Arc;
use std::{borrow::Cow, net::IpAddr};
use tokio::{sync::AcquireError, task::JoinError, time::Instant};
@ -42,6 +43,7 @@ pub enum Web3ProxyError {
AccessDenied,
#[error(ignore)]
Anyhow(anyhow::Error),
Arc(Arc<Self>),
#[error(ignore)]
#[from(ignore)]
BadRequest(Cow<'static, str>),
@ -154,7 +156,7 @@ pub enum Web3ProxyError {
}
impl Web3ProxyError {
pub fn into_response_parts<R: Serialize>(self) -> (StatusCode, JsonRpcResponseEnum<R>) {
pub fn as_response_parts<R: Serialize>(&self) -> (StatusCode, JsonRpcResponseEnum<R>) {
// TODO: include a unique request id in the data
let (code, err): (StatusCode, JsonRpcErrorData) = match self {
Self::Abi(err) => {
@ -192,6 +194,10 @@ impl Web3ProxyError {
},
)
}
Self::Arc(err) => {
// recurse
return err.as_response_parts::<R>();
}
Self::BadRequest(err) => {
debug!("BAD_REQUEST: {}", err);
(
@ -533,7 +539,10 @@ impl Web3ProxyError {
},
)
}
Self::JsonRpcErrorData(jsonrpc_error_data) => (StatusCode::OK, jsonrpc_error_data),
Self::JsonRpcErrorData(jsonrpc_error_data) => {
// TODO: do this without clone? the Arc needed it though
(StatusCode::OK, jsonrpc_error_data.clone())
}
Self::MsgPackEncode(err) => {
warn!("MsgPackEncode Error: {}", err);
(
@ -764,7 +773,7 @@ impl Web3ProxyError {
)
}
Self::RefererRequired => {
warn!("referer required");
debug!("referer required");
(
StatusCode::BAD_REQUEST,
JsonRpcErrorData {
@ -775,7 +784,7 @@ impl Web3ProxyError {
)
}
Self::RefererNotAllowed(referer) => {
warn!("referer not allowed referer={:?}", referer);
debug!("referer not allowed referer={:?}", referer);
(
StatusCode::FORBIDDEN,
JsonRpcErrorData {
@ -829,9 +838,9 @@ impl Web3ProxyError {
}
(
status_code,
*status_code,
JsonRpcErrorData {
message: Cow::Owned(err_msg),
message: err_msg.to_owned().into(),
code: code.into(),
data: None,
},
@ -840,7 +849,7 @@ impl Web3ProxyError {
Self::Timeout(x) => (
StatusCode::REQUEST_TIMEOUT,
JsonRpcErrorData {
message: Cow::Owned(format!("request timed out: {:?}", x)),
message: format!("request timed out: {:?}", x).into(),
code: StatusCode::REQUEST_TIMEOUT.as_u16().into(),
// TODO: include the actual id!
data: None,
@ -851,7 +860,7 @@ impl Web3ProxyError {
(
StatusCode::BAD_REQUEST,
JsonRpcErrorData {
message: Cow::Owned(err.to_string()),
message: err.to_string().into(),
code: StatusCode::BAD_REQUEST.as_u16().into(),
data: None,
},
@ -862,7 +871,7 @@ impl Web3ProxyError {
(
StatusCode::BAD_REQUEST,
JsonRpcErrorData {
message: Cow::Owned(format!("{}", err)),
message: format!("{}", err).into(),
code: StatusCode::BAD_REQUEST.as_u16().into(),
data: None,
},
@ -873,7 +882,7 @@ impl Web3ProxyError {
(
StatusCode::BAD_GATEWAY,
JsonRpcErrorData {
message: Cow::Borrowed("no servers synced. unknown eth_blockNumber"),
message: "no servers synced. unknown eth_blockNumber".into(),
code: StatusCode::BAD_GATEWAY.as_u16().into(),
data: None,
},
@ -889,7 +898,7 @@ impl Web3ProxyError {
},
),
Self::UserAgentRequired => {
warn!("UserAgentRequired");
debug!("UserAgentRequired");
(
StatusCode::BAD_REQUEST,
JsonRpcErrorData {
@ -900,7 +909,7 @@ impl Web3ProxyError {
)
}
Self::UserAgentNotAllowed(ua) => {
warn!("UserAgentNotAllowed ua={}", ua);
debug!("UserAgentNotAllowed ua={}", ua);
(
StatusCode::FORBIDDEN,
JsonRpcErrorData {
@ -960,9 +969,7 @@ impl Web3ProxyError {
(
StatusCode::BAD_REQUEST,
JsonRpcErrorData {
message: Cow::Borrowed(
"redirect_public_url not set. only websockets work here",
),
message: "redirect_public_url not set. only websockets work here".into(),
code: StatusCode::BAD_REQUEST.as_u16().into(),
data: None,
},
@ -971,14 +978,14 @@ impl Web3ProxyError {
Self::WithContext(err, msg) => match err {
Some(err) => {
warn!("{:#?} w/ context {}", err, msg);
return err.into_response_parts();
return err.as_response_parts();
}
None => {
warn!("error w/ context {}", msg);
(
StatusCode::INTERNAL_SERVER_ERROR,
JsonRpcErrorData {
message: Cow::Owned(msg),
message: msg.to_owned().into(),
code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(),
data: None,
},
@ -991,7 +998,7 @@ impl Web3ProxyError {
}
pub fn into_response_with_id(self, id: Box<RawValue>) -> Response {
let (status_code, response_data) = self.into_response_parts();
let (status_code, response_data) = self.as_response_parts();
let response = JsonRpcForwardedResponse::from_response_data(response_data, id);

@ -30,7 +30,6 @@ use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::util::Timeout as KafkaTimeout;
use redis_rate_limiter::redis::AsyncCommands;
use redis_rate_limiter::RedisRateLimitResult;
use std::convert::Infallible;
use std::fmt::Display;
use std::hash::{Hash, Hasher};
use std::mem;
@ -921,13 +920,12 @@ impl Web3ProxyApp {
if let Some(max_concurrent_requests) = self.config.public_max_concurrent_requests {
let semaphore = self
.ip_semaphores
.get_or_insert_async::<Infallible>(ip, async move {
.get_with_by_ref(ip, async {
// TODO: set max_concurrent_requests dynamically based on load?
let s = Semaphore::new(max_concurrent_requests);
Ok(Arc::new(s))
Arc::new(s)
})
.await
.expect("infallible");
.await;
let semaphore_permit = semaphore.acquire_owned().await?;
@ -951,12 +949,11 @@ impl Web3ProxyApp {
let semaphore = self
.user_semaphores
.get_or_insert_async::<Infallible>(&user_id, async move {
.get_with_by_ref(&user_id, async move {
let s = Semaphore::new(max_concurrent_requests as usize);
Ok(Arc::new(s))
Arc::new(s)
})
.await
.expect("infallible");
.await;
let semaphore_permit = semaphore.acquire_owned().await?;
@ -980,12 +977,11 @@ impl Web3ProxyApp {
// limit concurrent requests
let semaphore = self
.bearer_token_semaphores
.get_or_insert_async::<Infallible>(&user_bearer_token, async move {
.get_with_by_ref(&user_bearer_token, async move {
let s = Semaphore::new(self.config.bearer_token_max_concurrent_requests as usize);
Ok(Arc::new(s))
Arc::new(s)
})
.await
.expect("infallible");
.await;
let semaphore_permit = semaphore.acquire_owned().await?;
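Because creating a semaphore can never fail, the `get_or_insert_async::<Infallible>` calls collapse into moka's plain `get_with_by_ref`, which returns the value directly. A small sketch of that shape with a hypothetical per-IP semaphore cache (assuming moka 0.11 and tokio):

```rust
use std::net::IpAddr;
use std::sync::Arc;

use moka::future::Cache;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};

async fn ip_permit(
    semaphores: &Cache<IpAddr, Arc<Semaphore>>,
    ip: IpAddr,
    max_concurrent: usize,
) -> OwnedSemaphorePermit {
    // infallible init: no Result, no .expect("infallible") needed anymore
    let semaphore = semaphores
        .get_with_by_ref(&ip, async move { Arc::new(Semaphore::new(max_concurrent)) })
        .await;

    semaphore
        .acquire_owned()
        .await
        .expect("semaphore is never closed")
}

#[tokio::main]
async fn main() {
    let semaphores: Cache<IpAddr, Arc<Semaphore>> = Cache::new(10_000);
    let ip: IpAddr = "127.0.0.1".parse().unwrap();

    let _permit = ip_permit(&semaphores, ip, 10).await;
}
```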
@ -1139,25 +1135,25 @@ impl Web3ProxyApp {
) -> Web3ProxyResult<Arc<RwLock<Decimal>>> {
match NonZeroU64::try_from(user_id) {
Err(_) => Ok(Arc::new(RwLock::new(Decimal::default()))),
Ok(x) => {
self.user_balance_cache
.try_get_or_insert_async(&x, async move {
let db_replica = self
.db_replica()
.web3_context("Getting database connection")?;
Ok(x) => self
.user_balance_cache
.try_get_with_by_ref(&x, async move {
let db_replica = self
.db_replica()
.web3_context("Getting database connection")?;
let balance: Decimal = match balance::Entity::find()
.filter(balance::Column::UserId.eq(user_id))
.one(db_replica.as_ref())
.await?
{
Some(x) => x.total_deposits - x.total_spent_outside_free_tier,
None => Decimal::default(),
};
Ok(Arc::new(RwLock::new(balance)))
})
.await
}
let balance: Decimal = match balance::Entity::find()
.filter(balance::Column::UserId.eq(user_id))
.one(db_replica.as_ref())
.await?
{
Some(x) => x.total_deposits - x.total_spent_outside_free_tier,
None => Decimal::default(),
};
Ok(Arc::new(RwLock::new(balance)))
})
.await
.map_err(Into::into),
}
}
@ -1168,7 +1164,7 @@ impl Web3ProxyApp {
rpc_secret_key: RpcSecretKey,
) -> Web3ProxyResult<AuthorizationChecks> {
self.rpc_secret_key_cache
.try_get_or_insert_async(&rpc_secret_key, async move {
.try_get_with_by_ref(&rpc_secret_key, async move {
// trace!(?rpc_secret_key, "user cache miss");
let db_replica = self
@ -1303,6 +1299,7 @@ impl Web3ProxyApp {
}
})
.await
.map_err(Into::into)
}
/// Authorized the ip/origin/referer/useragent and rate limit and concurrency

@ -18,7 +18,7 @@ use axum::{
use http::{header::AUTHORIZATION, StatusCode};
use listenfd::ListenFd;
use log::{debug, info};
use quick_cache_ttl::UnitWeighter;
use moka::future::{Cache, CacheBuilder};
use std::net::SocketAddr;
use std::sync::Arc;
use std::{iter::once, time::Duration};
@ -37,12 +37,7 @@ pub enum ResponseCacheKey {
Status,
}
pub type ResponseCache = quick_cache_ttl::CacheWithTTL<
ResponseCacheKey,
(StatusCode, &'static str, axum::body::Bytes),
UnitWeighter,
quick_cache_ttl::DefaultHashBuilder,
>;
pub type ResponseCache = Cache<ResponseCacheKey, (StatusCode, &'static str, axum::body::Bytes)>;
/// Start the frontend server.
pub async fn serve(
@ -58,12 +53,10 @@ pub async fn serve(
debug!("response_cache size: {}", response_cache_size);
let response_cache = ResponseCache::new(
"response_cache",
response_cache_size,
Duration::from_secs(1),
)
.await;
let response_cache: ResponseCache = CacheBuilder::new(response_cache_size as u64)
.name("frontend_response")
.time_to_live(Duration::from_secs(1))
.build();
// TODO: read config for if fastest/versus should be available publicly. default off

@ -330,7 +330,7 @@ async fn handle_socket_payload(
let (authorization, semaphore) = match authorization.check_again(&app).await {
Ok((a, s)) => (a, s),
Err(err) => {
let (_, err) = err.into_response_parts();
let (_, err) = err.as_response_parts();
let err = JsonRpcForwardedResponse::from_response_data(err, Default::default());
@ -437,7 +437,7 @@ async fn handle_socket_payload(
let response_str = match response {
Ok(x) => serde_json::to_string(&x).expect("to_string should always work here"),
Err(err) => {
let (_, response_data) = err.into_response_parts();
let (_, response_data) = err.as_response_parts();
let response = JsonRpcForwardedResponse::from_response_data(response_data, response_id);

@ -13,7 +13,9 @@ use axum::{
};
use axum_macros::debug_handler;
use log::trace;
use moka::future::Cache;
use once_cell::sync::Lazy;
use serde::{ser::SerializeStruct, Serialize};
use serde_json::json;
use std::sync::Arc;
@ -33,7 +35,7 @@ pub async fn health(
Extension(cache): Extension<Arc<ResponseCache>>,
) -> impl IntoResponse {
let (code, content_type, body) = cache
.get_or_insert_async(&ResponseCacheKey::Health, async move { _health(app).await })
.get_with(ResponseCacheKey::Health, async move { _health(app).await })
.await;
Response::builder()
@ -66,7 +68,7 @@ pub async fn backups_needed(
Extension(cache): Extension<Arc<ResponseCache>>,
) -> impl IntoResponse {
let (code, content_type, body) = cache
.get_or_insert_async(&ResponseCacheKey::BackupsNeeded, async move {
.get_with(ResponseCacheKey::BackupsNeeded, async move {
_backups_needed(app).await
})
.await;
@ -117,7 +119,7 @@ pub async fn status(
Extension(cache): Extension<Arc<ResponseCache>>,
) -> impl IntoResponse {
let (code, content_type, body) = cache
.get_or_insert_async(&ResponseCacheKey::Status, async move { _status(app).await })
.get_with(ResponseCacheKey::Status, async move { _status(app).await })
.await;
Response::builder()
@ -139,10 +141,10 @@ async fn _status(app: Arc<Web3ProxyApp>) -> (StatusCode, &'static str, Bytes) {
"bundler_4337_rpcs": app.bundler_4337_rpcs,
"chain_id": app.config.chain_id,
"hostname": app.hostname,
"jsonrpc_response_cache": app.jsonrpc_response_cache,
"jsonrpc_response_cache": MokaCacheSerializer(&app.jsonrpc_response_cache),
"private_rpcs": app.private_rpcs,
"rpc_secret_key_cache": app.rpc_secret_key_cache,
"user_balance_cache": app.user_balance_cache,
"rpc_secret_key_cache": MokaCacheSerializer(&app.rpc_secret_key_cache),
"user_balance_cache": MokaCacheSerializer(&app.user_balance_cache),
"version": APP_USER_AGENT,
});
@ -158,3 +160,20 @@ async fn _status(app: Arc<Web3ProxyApp>) -> (StatusCode, &'static str, Bytes) {
(code, CONTENT_TYPE_JSON, body)
}
pub struct MokaCacheSerializer<'a, K, V>(pub &'a Cache<K, V>);
impl<'a, K, V> Serialize for MokaCacheSerializer<'a, K, V> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mut state = serializer.serialize_struct("MokaCache", 3)?;
state.serialize_field("entry_count", &self.0.entry_count())?;
state.serialize_field("name", &self.0.name())?;
state.serialize_field("weighted_size", &self.0.weighted_size())?;
state.end()
}
}
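moka's `Cache` does not implement `Serialize`, so the new `MokaCacheSerializer` wrapper is what gets embedded in the status JSON. A usage sketch, assuming the wrapper defined above is in scope:

```rust
use moka::future::CacheBuilder;
use serde_json::json;

// assumes the MokaCacheSerializer wrapper from the diff above is in scope
fn status_json(cache: &moka::future::Cache<u64, String>) -> serde_json::Value {
    json!({
        "example_cache": MokaCacheSerializer(cache),
    })
}

#[tokio::main]
async fn main() {
    let cache = CacheBuilder::new(100).name("example").build();
    cache.insert(1, "cached".to_string()).await;

    // prints entry_count, name, and weighted_size for the cache
    println!("{}", status_json(&cache));
}
```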

@ -18,7 +18,7 @@ use entities::{balance, login, pending_login, referee, referrer, rpc_key, user};
use ethers::{prelude::Address, types::Bytes};
use hashbrown::HashMap;
use http::StatusCode;
use log::{debug, warn};
use log::{debug, warn, trace};
use migration::sea_orm::prelude::{Decimal, Uuid};
use migration::sea_orm::{
self, ActiveModelTrait, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter,
@ -314,7 +314,7 @@ pub async fn user_login_post(
debug!("Refferal code is: {:?}", payload.referral_code);
if let Some(referral_code) = payload.referral_code.as_ref() {
// If it is not inside, also check in the database
warn!("Using register referral code: {:?}", referral_code);
trace!("Using register referral code: {:?}", referral_code);
let user_referrer = referrer::Entity::find()
.filter(referrer::Column::ReferralCode.eq(referral_code))
.one(db_replica.as_ref())
@ -345,7 +345,7 @@ pub async fn user_login_post(
// First, optionally catch a referral code from the parameters if there is any
if let Some(referral_code) = payload.referral_code.as_ref() {
// If it is not inside, also check in the database
warn!("Using referral code: {:?}", referral_code);
trace!("Using referral code: {:?}", referral_code);
let user_referrer = referrer::Entity::find()
.filter(referrer::Column::ReferralCode.eq(referral_code))
.one(db_replica.as_ref())

@ -181,7 +181,7 @@ pub async fn user_balance_post(
// .topics[0]
// {
// debug!("Bloom input bytes is: {:?}", x);
// debug!("Bloom input bytes is: {:?}", x..as_fixed_bytes());
// debug!("Bloom input bytes is: {:?}", x.as_fixed_bytes());
// debug!("Bloom input as hex is: {:?}", hex!(x));
// let bloom_input = BloomInput::Raw(hex!(x));
// debug!(
@ -312,13 +312,14 @@ pub async fn user_balance_post(
match NonZeroU64::try_from(recipient.id) {
Err(_) => {}
Ok(x) => {
app.user_balance_cache.remove(&x);
app.user_balance_cache.invalidate(&x).await;
}
};
for rpc_key_entity in rpc_keys {
app.rpc_secret_key_cache
.remove(&rpc_key_entity.secret_key.into());
.invalidate(&rpc_key_entity.secret_key.into())
.await;
}
let x = json!({

@ -15,9 +15,8 @@ use entities::{balance, rpc_key, secondary_user, user};
use ethers::types::Address;
use hashbrown::HashMap;
use http::StatusCode;
use log::{trace, warn};
use log::trace;
use migration::sea_orm;
use migration::sea_orm::prelude::Decimal;
use migration::sea_orm::ActiveModelTrait;
use migration::sea_orm::ColumnTrait;
use migration::sea_orm::EntityTrait;
@ -146,7 +145,7 @@ pub async fn get_subusers(
.all(db_replica.as_ref())
.await?;
warn!("Subusers are: {:?}", subusers);
trace!("Subusers are: {}", json!(subusers));
// Now return the list
let response_json = json!({

@ -2,12 +2,11 @@ use crate::{errors::Web3ProxyError, jsonrpc::JsonRpcErrorData, rpcs::blockchain:
use derive_more::From;
use ethers::{providers::ProviderError, types::U64};
use hashbrown::hash_map::DefaultHashBuilder;
use quick_cache_ttl::{CacheWithTTL, Weighter};
use moka::future::Cache;
use serde_json::value::RawValue;
use std::{
borrow::Cow,
hash::{BuildHasher, Hash, Hasher},
num::NonZeroU32,
sync::Arc,
};
@ -82,28 +81,24 @@ impl JsonRpcQueryCacheKey {
}
}
pub type JsonRpcResponseCache =
CacheWithTTL<u64, JsonRpcResponseEnum<Arc<RawValue>>, JsonRpcResponseWeigher>;
#[derive(Clone)]
pub struct JsonRpcResponseWeigher;
pub type JsonRpcResponseCache = Cache<u64, JsonRpcResponseEnum<Arc<RawValue>>>;
/// TODO: we might need one that holds RawValue and one that holds serde_json::Value
#[derive(Clone, Debug)]
pub enum JsonRpcResponseEnum<R> {
Result {
value: R,
num_bytes: NonZeroU32,
num_bytes: u32,
},
RpcError {
error_data: JsonRpcErrorData,
num_bytes: NonZeroU32,
num_bytes: u32,
},
}
// TODO: impl for other inner result types?
impl<R> JsonRpcResponseEnum<R> {
pub fn num_bytes(&self) -> NonZeroU32 {
pub fn num_bytes(&self) -> u32 {
match self {
Self::Result { num_bytes, .. } => *num_bytes,
Self::RpcError { num_bytes, .. } => *num_bytes,
@ -123,7 +118,7 @@ impl From<Arc<RawValue>> for JsonRpcResponseEnum<Arc<RawValue>> {
fn from(value: Arc<RawValue>) -> Self {
let num_bytes = value.get().len();
let num_bytes = NonZeroU32::try_from(num_bytes as u32).unwrap();
let num_bytes = num_bytes as u32;
Self::Result { value, num_bytes }
}
@ -133,7 +128,7 @@ impl From<Box<RawValue>> for JsonRpcResponseEnum<Arc<RawValue>> {
fn from(value: Box<RawValue>) -> Self {
let num_bytes = value.get().len();
let num_bytes = NonZeroU32::try_from(num_bytes as u32).unwrap();
let num_bytes = num_bytes as u32;
let value = value.into();
@ -190,7 +185,7 @@ impl<R> From<JsonRpcErrorData> for JsonRpcResponseEnum<R> {
// TODO: wrap the error in a complete response?
let num_bytes = serde_json::to_string(&value).unwrap().len();
let num_bytes = NonZeroU32::try_from(num_bytes as u32).unwrap();
let num_bytes = num_bytes as u32;
Self::RpcError {
error_data: value,
@ -235,67 +230,75 @@ impl TryFrom<ProviderError> for JsonRpcErrorData {
}
}
// TODO: instead of Arc<RawValue>, be generic
impl<K, Q> Weighter<K, Q, JsonRpcResponseEnum<Arc<RawValue>>> for JsonRpcResponseWeigher {
fn weight(&self, _key: &K, _qey: &Q, value: &JsonRpcResponseEnum<Arc<RawValue>>) -> NonZeroU32 {
value.num_bytes()
}
pub fn json_rpc_response_weigher<K, R>(_key: &K, value: &JsonRpcResponseEnum<R>) -> u32 {
value.num_bytes()
}
#[cfg(test)]
mod tests {
use super::{JsonRpcResponseEnum, JsonRpcResponseWeigher};
use quick_cache_ttl::CacheWithTTL;
use super::JsonRpcResponseEnum;
use crate::response_cache::json_rpc_response_weigher;
use serde_json::value::RawValue;
use std::{num::NonZeroU32, sync::Arc, time::Duration};
use std::sync::Arc;
#[tokio::test(start_paused = true)]
async fn test_json_rpc_query_weigher() {
let max_item_weight = 200;
let weight_capacity = 1_000;
let test_cache: CacheWithTTL<
u32,
JsonRpcResponseEnum<Arc<RawValue>>,
JsonRpcResponseWeigher,
> = CacheWithTTL::new_with_weights(
"test",
5,
max_item_weight.try_into().unwrap(),
weight_capacity,
JsonRpcResponseWeigher,
Duration::from_secs(2),
)
.await;
// let test_cache: Cache<u32, JsonRpcResponseEnum<Arc<RawValue>>> =
// CacheBuilder::new(weight_capacity)
// .weigher(json_rpc_response_weigher)
// .time_to_live(Duration::from_secs(2))
// .build();
let small_data = JsonRpcResponseEnum::Result {
let small_data: JsonRpcResponseEnum<Arc<RawValue>> = JsonRpcResponseEnum::Result {
value: Box::<RawValue>::default().into(),
num_bytes: NonZeroU32::try_from(max_item_weight / 2).unwrap(),
num_bytes: max_item_weight / 2,
};
let max_sized_data = JsonRpcResponseEnum::Result {
assert_eq!(
json_rpc_response_weigher(&(), &small_data),
max_item_weight / 2
);
let max_sized_data: JsonRpcResponseEnum<Arc<RawValue>> = JsonRpcResponseEnum::Result {
value: Box::<RawValue>::default().into(),
num_bytes: NonZeroU32::try_from(max_item_weight).unwrap(),
num_bytes: max_item_weight,
};
let oversized_data = JsonRpcResponseEnum::Result {
assert_eq!(
json_rpc_response_weigher(&(), &max_sized_data),
max_item_weight
);
let oversized_data: JsonRpcResponseEnum<Arc<RawValue>> = JsonRpcResponseEnum::Result {
value: Box::<RawValue>::default().into(),
num_bytes: NonZeroU32::try_from(max_item_weight * 2).unwrap(),
num_bytes: max_item_weight * 2,
};
test_cache.try_insert(0, small_data).unwrap();
assert_eq!(
json_rpc_response_weigher(&(), &oversized_data),
max_item_weight * 2
);
// TODO: helper for inserts that does size checking
/*
test_cache.insert(0, small_data).await;
test_cache.get(&0).unwrap();
test_cache.try_insert(1, max_sized_data).unwrap();
test_cache.insert(1, max_sized_data).await;
test_cache.get(&0).unwrap();
test_cache.get(&1).unwrap();
test_cache.try_insert(2, oversized_data).unwrap_err();
// TODO: this will currently work! need to wrap moka cache in a checked insert
test_cache.insert(2, oversized_data).await;
test_cache.get(&0).unwrap();
test_cache.get(&1).unwrap();
assert!(test_cache.get(&2).is_none());
*/
}
}
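The commented-out assertions above note that moka will currently accept an entry heavier than the cache's budget, so oversized responses have to be rejected before `insert` (the app code does this by returning `NotCached`). A hedged sketch of the checked-insert helper that the TODO asks for, with a hypothetical per-item byte limit:

```rust
use moka::future::Cache;

/// refuse to cache values heavier than `max_item_bytes`; a sketch of the
/// "checked insert" wrapper the TODO in the test above asks for
async fn checked_insert(
    cache: &Cache<u64, String>,
    key: u64,
    value: String,
    max_item_bytes: usize,
) -> Result<(), String> {
    if value.len() > max_item_bytes {
        // hand the value back to the caller instead of caching it
        return Err(value);
    }
    cache.insert(key, value).await;
    Ok(())
}

#[tokio::main]
async fn main() {
    let cache: Cache<u64, String> = Cache::new(1_000);

    assert!(checked_insert(&cache, 1, "small".to_string(), 200).await.is_ok());
    assert!(checked_insert(&cache, 2, "x".repeat(500), 200).await.is_err());

    assert!(cache.get(&1).is_some());
    assert!(cache.get(&2).is_none());
}
```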

@ -9,7 +9,7 @@ use crate::frontend::authorization::Authorization;
use derive_more::From;
use ethers::prelude::{Block, TxHash, H256, U64};
use log::{debug, trace, warn};
use quick_cache_ttl::CacheWithTTL;
use moka::future::Cache;
use serde::ser::SerializeStruct;
use serde::Serialize;
use serde_json::json;
@ -20,8 +20,8 @@ use tokio::sync::broadcast;
// TODO: type for Hydrated Blocks with their full transactions?
pub type ArcBlock = Arc<Block<TxHash>>;
pub type BlocksByHashCache = Arc<CacheWithTTL<H256, Web3ProxyBlock>>;
pub type BlocksByNumberCache = Arc<CacheWithTTL<U64, H256>>;
pub type BlocksByHashCache = Cache<H256, Web3ProxyBlock>;
pub type BlocksByNumberCache = Cache<U64, H256>;
/// A block and its age.
#[derive(Clone, Debug, Default, From)]
@ -172,8 +172,6 @@ impl Web3Rpcs {
return Ok(block);
}
let block_num = block.number();
// this block is very likely already in block_hashes
// TODO: use their get_with
let block_hash = *block.hash();
@ -182,15 +180,17 @@ impl Web3Rpcs {
if heaviest_chain {
// this is the only place that writes to block_numbers
// multiple inserts should be okay though
// TODO: info that there was a fork?
// TODO: info if there was a fork?
let block_num = block.number();
self.blocks_by_number
.get_or_insert_async(block_num, async move { block_hash })
.get_with_by_ref(block_num, async move { block_hash })
.await;
}
let block = self
.blocks_by_hash
.get_or_insert_async(&block_hash, async move { block })
.get_with_by_ref(&block_hash, async move { block })
.await;
Ok(block)
@ -468,7 +468,7 @@ impl Web3Rpcs {
// hash changed
debug!(
"unc {}/{} {}{}/{}/{} con_head={} old={} rpc={}@{}",
"unc {}/{} {}{}/{}/{} con={} old={} rpc={}@{}",
consensus_tier,
total_tiers,
backups_voted_str,

@ -8,11 +8,10 @@ use ethers::prelude::{H256, U64};
use hashbrown::{HashMap, HashSet};
use itertools::{Itertools, MinMaxResult};
use log::{trace, warn};
use quick_cache_ttl::Cache;
use moka::future::Cache;
use serde::Serialize;
use std::cmp::{Ordering, Reverse};
use std::collections::BTreeMap;
use std::convert::Infallible;
use std::fmt;
use std::sync::Arc;
use tokio::time::Instant;
@ -378,9 +377,8 @@ impl ConsensusFinder {
async fn insert(&mut self, rpc: Arc<Web3Rpc>, block: Web3ProxyBlock) -> Option<Web3ProxyBlock> {
let first_seen = self
.first_seen
.get_or_insert_async::<Infallible>(block.hash(), async { Ok(Instant::now()) })
.await
.expect("this cache get is infallible");
.get_with_by_ref(block.hash(), async { Instant::now() })
.await;
// calculate elapsed time before trying to lock
let latency = first_seen.elapsed();

@ -8,6 +8,7 @@ use crate::config::{BlockAndRpc, TxHashAndRpc, Web3RpcConfig};
use crate::errors::{Web3ProxyError, Web3ProxyResult};
use crate::frontend::authorization::{Authorization, RequestMetadata};
use crate::frontend::rpc_proxy_ws::ProxyMode;
use crate::frontend::status::MokaCacheSerializer;
use crate::jsonrpc::{JsonRpcErrorData, JsonRpcParams, JsonRpcResultData};
use crate::rpcs::transactions::TxStatus;
use arc_swap::ArcSwap;
@ -21,8 +22,8 @@ use hashbrown::{HashMap, HashSet};
use itertools::Itertools;
use log::{debug, error, info, trace, warn};
use migration::sea_orm::DatabaseConnection;
use moka::future::{Cache, CacheBuilder};
use ordered_float::OrderedFloat;
use quick_cache_ttl::CacheWithTTL;
use serde::ser::{SerializeStruct, Serializer};
use serde::Serialize;
use serde_json::json;
@ -53,7 +54,7 @@ pub struct Web3Rpcs {
/// this head receiver makes it easy to wait until there is a new block
pub(super) watch_consensus_head_sender: Option<watch::Sender<Option<Web3ProxyBlock>>>,
/// keep track of transactions that we have sent through subscriptions
pub(super) pending_transaction_cache: Arc<CacheWithTTL<TxHash, TxStatus>>,
pub(super) pending_transaction_cache: Cache<TxHash, TxStatus>,
pub(super) pending_tx_id_receiver: flume::Receiver<TxHashAndRpc>,
pub(super) pending_tx_id_sender: flume::Sender<TxHashAndRpc>,
/// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis?
@ -81,7 +82,7 @@ impl Web3Rpcs {
min_head_rpcs: usize,
min_sum_soft_limit: u32,
name: String,
pending_transaction_cache: Arc<CacheWithTTL<TxHash, TxStatus>>,
pending_transaction_cache: Cache<TxHash, TxStatus>,
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
watch_consensus_head_sender: Option<watch::Sender<Option<Web3ProxyBlock>>>,
) -> anyhow::Result<(
@ -96,16 +97,18 @@ impl Web3Rpcs {
// these blocks don't have full transactions, but they do have rather variable amounts of transaction hashes
// TODO: actual weighter on this
// TODO: time_to_idle instead?
let blocks_by_hash: BlocksByHashCache = Arc::new(
CacheWithTTL::new("blocks_by_hash", 1_000, Duration::from_secs(30 * 60)).await,
);
let blocks_by_hash: BlocksByHashCache = CacheBuilder::new(1_000)
.name("blocks_by_hash")
.time_to_idle(Duration::from_secs(30 * 60))
.build();
// all block numbers are the same size, so no need for weigher
// TODO: limits from config
// TODO: time_to_idle instead?
let blocks_by_number = Arc::new(
CacheWithTTL::new("blocks_by_number", 1_000, Duration::from_secs(30 * 60)).await,
);
let blocks_by_number = CacheBuilder::new(1_000)
.name("blocks_by_number")
.time_to_idle(Duration::from_secs(30 * 60))
.build();
let (watch_consensus_rpcs_sender, consensus_connections_watcher) =
watch::channel(Default::default());
@ -1254,9 +1257,15 @@ impl Serialize for Web3Rpcs {
}
}
state.serialize_field("blocks_by_hash", &self.blocks_by_hash)?;
state.serialize_field("blocks_by_number", &self.blocks_by_number)?;
state.serialize_field("pending_transaction_cache", &self.pending_transaction_cache)?;
state.serialize_field("blocks_by_hash", &MokaCacheSerializer(&self.blocks_by_hash))?;
state.serialize_field(
"blocks_by_number",
&MokaCacheSerializer(&self.blocks_by_number),
)?;
state.serialize_field(
"pending_transaction_cache",
&MokaCacheSerializer(&self.pending_transaction_cache),
)?;
state.serialize_field("block_sender_len", &self.block_sender.len())?;
@ -1308,6 +1317,7 @@ mod tests {
use ethers::types::{Block, U256};
use latency::PeakEwmaLatency;
use log::{trace, LevelFilter};
use moka::future::CacheBuilder;
use parking_lot::RwLock;
use tokio::sync::RwLock as AsyncRwLock;
@ -1488,26 +1498,17 @@ mod tests {
name: "test".to_string(),
watch_consensus_head_sender: Some(watch_consensus_head_sender),
watch_consensus_rpcs_sender,
pending_transaction_cache: CacheWithTTL::arc_with_capacity(
"pending_transaction_cache",
100,
Duration::from_secs(60),
)
.await,
pending_transaction_cache: CacheBuilder::new(100)
.time_to_live(Duration::from_secs(60))
.build(),
pending_tx_id_receiver,
pending_tx_id_sender,
blocks_by_hash: CacheWithTTL::arc_with_capacity(
"blocks_by_hash",
100,
Duration::from_secs(60),
)
.await,
blocks_by_number: CacheWithTTL::arc_with_capacity(
"blocks_by_number",
100,
Duration::from_secs(60),
)
.await,
blocks_by_hash: CacheBuilder::new(100)
.time_to_live(Duration::from_secs(60))
.build(),
blocks_by_number: CacheBuilder::new(100)
.time_to_live(Duration::from_secs(60))
.build(),
// TODO: test max_block_age?
max_block_age: None,
// TODO: test max_block_lag?
@ -1777,26 +1778,17 @@ mod tests {
name: "test".to_string(),
watch_consensus_head_sender: Some(watch_consensus_head_sender),
watch_consensus_rpcs_sender,
pending_transaction_cache: CacheWithTTL::arc_with_capacity(
"pending_transaction_cache",
100,
Duration::from_secs(120),
)
.await,
pending_transaction_cache: CacheBuilder::new(100)
.time_to_live(Duration::from_secs(120))
.build(),
pending_tx_id_receiver,
pending_tx_id_sender,
blocks_by_hash: CacheWithTTL::arc_with_capacity(
"blocks_by_hash",
100,
Duration::from_secs(120),
)
.await,
blocks_by_number: CacheWithTTL::arc_with_capacity(
"blocks_by_number",
100,
Duration::from_secs(120),
)
.await,
blocks_by_hash: CacheBuilder::new(100)
.time_to_live(Duration::from_secs(120))
.build(),
blocks_by_number: CacheBuilder::new(100)
.time_to_live(Duration::from_secs(120))
.build(),
min_head_rpcs: 1,
min_sum_soft_limit: 4_000,
max_block_age: None,
@ -1972,26 +1964,11 @@ mod tests {
name: "test".to_string(),
watch_consensus_head_sender: Some(watch_consensus_head_sender),
watch_consensus_rpcs_sender,
pending_transaction_cache: CacheWithTTL::arc_with_capacity(
"pending_transaction_cache",
10_000,
Duration::from_secs(120),
)
.await,
pending_transaction_cache: Cache::new(10_000),
pending_tx_id_receiver,
pending_tx_id_sender,
blocks_by_hash: CacheWithTTL::arc_with_capacity(
"blocks_by_hash",
10_000,
Duration::from_secs(120),
)
.await,
blocks_by_number: CacheWithTTL::arc_with_capacity(
"blocks_by_number",
10_000,
Duration::from_secs(120),
)
.await,
blocks_by_hash: Cache::new(10_000),
blocks_by_number: Cache::new(10_000),
min_head_rpcs: 1,
min_sum_soft_limit: 1_000,
max_block_age: None,

@ -416,7 +416,7 @@ impl Web3Rpc {
// if we already have this block saved, set new_head_block to that arc. otherwise store this copy
let new_head_block = block_map
.get_or_insert_async(&new_hash, async move { new_head_block })
.get_with_by_ref(&new_hash, async move { new_head_block })
.await;
// save the block so we don't send the same one multiple times

@ -20,10 +20,8 @@ use fstrings::{f, format_args_f};
use hashbrown::HashMap;
use influxdb2::api::query::FluxRecord;
use influxdb2::models::Query;
use log::{error, info, warn};
use migration::sea_orm::ColumnTrait;
use migration::sea_orm::EntityTrait;
use migration::sea_orm::QueryFilter;
use log::{debug, error, trace, warn};
use migration::sea_orm::{ColumnTrait, EntityTrait, QueryFilter};
use serde_json::json;
use ulid::Ulid;
@ -146,7 +144,7 @@ pub async fn query_user_stats<'a>(
.clone()
.context("No influxdb bucket was provided")?; // "web3_proxy";
info!("Bucket is {:?}", bucket);
trace!("Bucket is {:?}", bucket);
let mut filter_chain_id = "".to_string();
if chain_id != 0 {
filter_chain_id = f!(r#"|> filter(fn: (r) => r["chain_id"] == "{chain_id}")"#);
@ -154,14 +152,15 @@ pub async fn query_user_stats<'a>(
// Fetch and request for balance
info!(
trace!(
"Query start and stop are: {:?} {:?}",
query_start, query_stop
query_start,
query_stop
);
// info!("Query column parameters are: {:?}", stats_column);
info!("Query measurement is: {:?}", measurement);
info!("Filters are: {:?}", filter_chain_id); // filter_field
info!("window seconds are: {:?}", query_window_seconds);
trace!("Query measurement is: {:?}", measurement);
trace!("Filters are: {:?}", filter_chain_id); // filter_field
trace!("window seconds are: {:?}", query_window_seconds);
let drop_method = match stat_response_type {
StatType::Aggregated => f!(r#"|> drop(columns: ["method"])"#),
@ -190,9 +189,9 @@ pub async fn query_user_stats<'a>(
|> sort(columns: ["_time", "_measurement", "archive_needed", "chain_id", "error_response", "method", "rpc_secret_key_id"], desc: true)
"#);
info!("Raw query to db is: {:?}", query);
debug!("Raw query to db is: {:#?}", query);
let query = Query::new(query.to_string());
info!("Query to db is: {:?}", query);
trace!("Query to db is: {:?}", query);
// Make the query and collect all data
let raw_influx_responses: Vec<FluxRecord> = influxdb_client

@ -17,7 +17,7 @@ use derive_more::From;
use entities::sea_orm_active_enums::TrackingLevel;
use entities::{balance, referee, referrer, rpc_accounting_v2, rpc_key};
use influxdb2::models::DataPoint;
use log::trace;
use log::{debug, trace};
use migration::sea_orm::prelude::Decimal;
use migration::sea_orm::{
self, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, TransactionTrait,
@ -184,12 +184,14 @@ impl RpcQueryStats {
}
}
#[derive(Debug, Default)]
struct Deltas {
balance_used_outside_free_tier: Decimal,
balance_used_including_free_tier: Decimal,
sender_bonus_applied: bool,
referrer_deposit_delta: Decimal,
sender_bonus_balance_deposited: Decimal,
balance_spent_including_free_credits: Decimal,
balance_spent_excluding_free_credits: Decimal,
apply_usage_bonus_to_request_sender: bool,
usage_bonus_to_request_sender_through_referral: Decimal,
bonus_to_referrer: Decimal,
}
/// A stat that we aggregate and then store in a database.
@ -375,29 +377,42 @@ impl BufferedRpcQueryStats {
referral_objects: Option<(referee::Model, referrer::Model)>,
) -> Web3ProxyResult<(Deltas, Option<(referee::Model, referrer::Model)>)> {
// Calculate Balance Only
let mut deltas = Deltas {
balance_used_outside_free_tier: Default::default(),
balance_used_including_free_tier: Default::default(),
sender_bonus_applied: false,
referrer_deposit_delta: Default::default(),
sender_bonus_balance_deposited: Default::default(),
};
let mut deltas = Deltas::default();
// Calculate a bunch using referrals as well
if let Some((referral_entity, referrer_code_entity)) = referral_objects {
deltas.sender_bonus_applied = referral_entity.credits_applied_for_referee;
deltas.apply_usage_bonus_to_request_sender =
referral_entity.credits_applied_for_referee;
// Calculate if we are above the usage threshold, and apply a bonus
// Optimally we would read this from the balance, but if we do it like this, we only have to lock a single table (much safer w.r.t. deadlocks)
// referral_entity.credits_applied_for_referrer * Decimal::from(10) checks (atomically, using only this table) whether the referee has brought in >$100 of spend for the referrer
// In that case, the sender receives a $10 bonus / gift
// i.e. apply a $10 bonus to the user once they have spent $100
debug!(
"Were credits applied so far? {:?} {:?}",
referral_entity.credits_applied_for_referee,
!referral_entity.credits_applied_for_referee
);
debug!(
"Credits applied for referrer so far? {:?}",
referral_entity.credits_applied_for_referrer
);
debug!("Sum credits used? {:?}", self.sum_credits_used);
debug!(
"Hello: {:?}",
(referral_entity.credits_applied_for_referrer * (Decimal::from(10))
+ self.sum_credits_used)
>= Decimal::from(100)
);
if !referral_entity.credits_applied_for_referee
&& (referral_entity.credits_applied_for_referrer * (Decimal::from(10))
+ self.sum_credits_used)
>= Decimal::from(100)
{
deltas.sender_bonus_balance_deposited += Decimal::from(10);
deltas.sender_bonus_applied = true;
debug!("Adding sender bonus balance");
deltas.usage_bonus_to_request_sender_through_referral = Decimal::from(10);
deltas.apply_usage_bonus_to_request_sender = true;
}
// Calculate how much the referrer should get, limited to the last 12 months
@ -407,24 +422,40 @@ impl BufferedRpcQueryStats {
+ Months::new(12);
if now <= valid_until {
deltas.referrer_deposit_delta += self.sum_credits_used / Decimal::new(10, 0);
deltas.bonus_to_referrer += self.sum_credits_used / Decimal::new(10, 0);
}
return Ok((deltas, Some((referral_entity, referrer_code_entity))));
}
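The threshold reads a little indirectly: `credits_applied_for_referrer * 10` reconstructs the referee's total spend so far (the referrer earns 10%), and the one-time $10 bonus fires once that reconstructed spend plus the current batch reaches $100. A worked check with made-up numbers:

```rust
use rust_decimal::Decimal;

fn main() {
    // hypothetical numbers, not from the diff:
    // the referrer has been credited $9.50 so far (10% of $95 of referee spend)
    let credits_applied_for_referrer = Decimal::new(950, 2); // 9.50
    // this aggregation batch used $6 of credits
    let sum_credits_used = Decimal::from(6);
    let credits_applied_for_referee = false; // bonus not granted yet

    // same shape as the condition in the accounting code above:
    // 9.50 * 10 + 6 = 101 >= 100, so the $10 referee bonus applies
    let bonus_applies = !credits_applied_for_referee
        && (credits_applied_for_referrer * Decimal::from(10) + sum_credits_used)
            >= Decimal::from(100);
    assert!(bonus_applies);

    // and the referrer keeps earning 10% of this batch: $0.60 here
    let bonus_to_referrer = sum_credits_used / Decimal::new(10, 0);
    assert_eq!(bonus_to_referrer, Decimal::new(6, 1)); // 0.6
}
```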
// Duplicate code, I should fix this later ...
let user_balance = sender_balance.total_deposits
- sender_balance.total_spent_outside_free_tier
+ deltas.usage_bonus_to_request_sender_through_referral;
let user_balance = (sender_balance.total_deposits + deltas.sender_bonus_balance_deposited
- sender_balance.total_spent_outside_free_tier);
// Split the spend into how much of the paid component was used and how much of the free component was used (anything beyond the available balance counts as free tier)
if user_balance >= Decimal::from(0) {
deltas.balance_used_outside_free_tier = self.sum_credits_used;
// Split the spend into how much of the paid component was used and how much of the free component was used (anything beyond the available balance counts as free tier)
if user_balance - self.sum_credits_used >= Decimal::from(0) {
deltas.balance_spent_including_free_credits = self.sum_credits_used;
deltas.balance_spent_excluding_free_credits = self.sum_credits_used;
} else {
deltas.balance_spent_including_free_credits = user_balance;
deltas.balance_spent_excluding_free_credits = self.sum_credits_used;
}
Ok((deltas, Some((referral_entity, referrer_code_entity))))
} else {
deltas.balance_used_outside_free_tier =
user_balance + deltas.sender_bonus_balance_deposited;
deltas.balance_used_including_free_tier = self.sum_credits_used;
}
let user_balance = sender_balance.total_deposits
- sender_balance.total_spent_outside_free_tier
+ deltas.usage_bonus_to_request_sender_through_referral;
Ok((deltas, None))
// Split the spend into how much of the paid component was used and how much of the free component was used (anything beyond the available balance counts as free tier)
if user_balance - self.sum_credits_used >= Decimal::from(0) {
deltas.balance_spent_including_free_credits = self.sum_credits_used;
deltas.balance_spent_excluding_free_credits = self.sum_credits_used;
} else {
deltas.balance_spent_including_free_credits = user_balance;
deltas.balance_spent_excluding_free_credits = self.sum_credits_used;
}
Ok((deltas, None))
}
}
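// A sketch (not part of this commit) of one way to remove the duplicated split logic
// above: a small helper that mirrors the branch assignments exactly and can be called
// from both the referral and non-referral paths.
fn split_credits_used(user_balance: Decimal, sum_credits_used: Decimal) -> (Decimal, Decimal) {
    // returns (balance_spent_including_free_credits, balance_spent_excluding_free_credits)
    if user_balance - sum_credits_used >= Decimal::from(0) {
        // the balance covers the whole spend, so both views are the full amount
        (sum_credits_used, sum_credits_used)
    } else {
        // balance exhausted: mirror the over-balance assignments above
        (user_balance, sum_credits_used)
    }
}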
/// Save all referral-based objects in the database
@ -435,33 +466,42 @@ impl BufferedRpcQueryStats {
sender_rpc_entity: &rpc_key::Model,
referral_objects: &Option<(referee::Model, referrer::Model)>,
) -> Web3ProxyResult<()> {
// Do the user updates
// Do the sender balance updates
let user_balance = balance::ActiveModel {
id: sea_orm::NotSet,
total_deposits: sea_orm::Set(deltas.sender_bonus_balance_deposited),
total_spent_including_free_tier: sea_orm::Set(deltas.balance_used_including_free_tier),
total_spent_outside_free_tier: sea_orm::Set(deltas.balance_used_outside_free_tier),
total_deposits: sea_orm::Set(deltas.usage_bonus_to_request_sender_through_referral),
total_spent_including_free_tier: sea_orm::Set(
deltas.balance_spent_including_free_credits,
),
total_spent_outside_free_tier: sea_orm::Set(
deltas.balance_spent_excluding_free_credits,
),
user_id: sea_orm::Set(sender_rpc_entity.user_id),
};
// In any case, add to the balance
debug!(
"Delta is: {:?} from credits used {:?}",
deltas, self.sum_credits_used
);
let _ = balance::Entity::insert(user_balance)
.on_conflict(
OnConflict::new()
.values([
(
balance::Column::TotalDeposits,
Expr::col(balance::Column::TotalDeposits)
.add(deltas.sender_bonus_balance_deposited),
),
(
balance::Column::TotalSpentIncludingFreeTier,
Expr::col(balance::Column::TotalSpentIncludingFreeTier)
.add(deltas.balance_used_including_free_tier),
.add(deltas.balance_spent_including_free_credits),
),
(
balance::Column::TotalSpentOutsideFreeTier,
Expr::col(balance::Column::TotalSpentOutsideFreeTier)
.add(deltas.balance_used_outside_free_tier),
.add(deltas.balance_spent_excluding_free_credits),
),
(
balance::Column::TotalDeposits,
Expr::col(balance::Column::TotalDeposits)
.add(deltas.usage_bonus_to_request_sender_through_referral),
),
])
.to_owned(),
@ -471,57 +511,78 @@ impl BufferedRpcQueryStats {
// Do the referrer_entry updates
if let Some((referral_entity, referrer_code_entity)) = referral_objects {
if deltas.referrer_deposit_delta > Decimal::from(0) {
let referee_entry = referee::ActiveModel {
id: sea_orm::Unchanged(referral_entity.id),
referral_start_date: sea_orm::Unchanged(referral_entity.referral_start_date),
used_referral_code: sea_orm::Unchanged(referral_entity.used_referral_code),
user_id: sea_orm::Unchanged(referral_entity.user_id),
debug!("Positive referrer deposit delta");
let referee_entry = referee::ActiveModel {
id: sea_orm::Unchanged(referral_entity.id),
credits_applied_for_referee: sea_orm::Set(
deltas.apply_usage_bonus_to_request_sender,
),
credits_applied_for_referrer: sea_orm::Set(deltas.bonus_to_referrer),
credits_applied_for_referee: sea_orm::Set(deltas.sender_bonus_applied),
credits_applied_for_referrer: sea_orm::Set(deltas.referrer_deposit_delta),
};
referee::Entity::insert(referee_entry)
referral_start_date: sea_orm::Unchanged(referral_entity.referral_start_date),
used_referral_code: sea_orm::Unchanged(referral_entity.used_referral_code),
user_id: sea_orm::Unchanged(referral_entity.user_id),
};
// If there was a referral, first check whether credits should be applied to the sender itself (once they have spent $100)
// If the stored flag and the newly computed flag differ, the bonus has not been applied yet,
// so apply it now.
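// e.g. stored flag = false, computed flag = true  -> apply the bonus and upsert the row;
//      stored flag = true,  computed flag = true  -> already applied, nothing to change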
if referral_entity.credits_applied_for_referee
!= deltas.apply_usage_bonus_to_request_sender
{
referee::Entity::insert(referee_entry.clone())
.on_conflict(
OnConflict::new()
.values([
(
referee::Column::CreditsAppliedForReferee,
// Make it a "Set"
Expr::col(referee::Column::CreditsAppliedForReferee)
.eq(deltas.sender_bonus_applied),
),
(
referee::Column::CreditsAppliedForReferrer,
Expr::col(referee::Column::CreditsAppliedForReferrer)
.add(deltas.referrer_deposit_delta),
),
])
.values([(
// TODO Make it a "Set", add is hacky (but works ..)
referee::Column::CreditsAppliedForReferee,
Expr::col(referee::Column::CreditsAppliedForReferee)
.add(deltas.apply_usage_bonus_to_request_sender),
)])
.to_owned(),
)
.exec(txn)
.await?;
let user_balance = balance::ActiveModel {
id: sea_orm::NotSet,
total_deposits: sea_orm::Set(deltas.referrer_deposit_delta),
user_id: sea_orm::Set(referral_entity.user_id),
..Default::default()
};
// Also add a bonus to the sender (but this should already have been handled by the code above)
}
let _ = balance::Entity::insert(user_balance)
// If the bonus to the referrer is non-zero, also apply that
if deltas.bonus_to_referrer > Decimal::from(0) {
referee::Entity::insert(referee_entry)
.on_conflict(
OnConflict::new()
.values([(
balance::Column::TotalDeposits,
Expr::col(balance::Column::TotalDeposits)
.add(deltas.referrer_deposit_delta),
// TODO Make it a "Set", add is hacky (but works ..)
referee::Column::CreditsAppliedForReferrer,
Expr::col(referee::Column::CreditsAppliedForReferrer)
.add(deltas.bonus_to_referrer),
)])
.to_owned(),
)
.exec(txn)
.await?;
}
// Finally, add to the balance of the referrer
let user_balance = balance::ActiveModel {
id: sea_orm::NotSet,
total_deposits: sea_orm::Set(deltas.bonus_to_referrer),
user_id: sea_orm::Set(referrer_code_entity.user_id),
..Default::default()
};
let _ = balance::Entity::insert(user_balance)
.on_conflict(
OnConflict::new()
.values([(
balance::Column::TotalDeposits,
Expr::col(balance::Column::TotalDeposits).add(deltas.bonus_to_referrer),
)])
.to_owned(),
)
.exec(txn)
.await?;
};
Ok(())
}
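// Note on the upsert pattern used throughout this function: the balance rows hold running
// totals, and each flush adds the aggregated deltas atomically. Assuming a MySQL backend,
// the `OnConflict::new().values(...)` builder renders roughly as
//     INSERT INTO balance (...) VALUES (...)
//     ON DUPLICATE KEY UPDATE total_deposits = total_deposits + ?;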
@ -560,8 +621,8 @@ impl BufferedRpcQueryStats {
let balance_before = *latest_balance;
// Now modify the balance
// TODO: Double check this (perhaps while testing...)
*latest_balance = *latest_balance - deltas.balance_used_outside_free_tier
+ deltas.sender_bonus_balance_deposited;
*latest_balance = *latest_balance - deltas.balance_spent_including_free_credits
+ deltas.usage_bonus_to_request_sender_through_referral;
if *latest_balance < Decimal::from(0) {
*latest_balance = Decimal::from(0);
}
@ -576,11 +637,13 @@ impl BufferedRpcQueryStats {
for rpc_key_entity in rpc_keys {
// TODO: Not sure which one was inserted, just delete both ...
rpc_secret_key_cache.remove(&rpc_key_entity.secret_key.into());
rpc_secret_key_cache
.invalidate(&rpc_key_entity.secret_key.into())
.await;
}
if let Ok(non_zero_user_id) = NonZeroU64::try_from(sender_rpc_entity.user_id) {
user_balance_cache.remove(&non_zero_user_id);
user_balance_cache.invalidate(&non_zero_user_id).await;
}
}
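// Minimal sketch of the async invalidation pattern used above, assuming the caches are
// moka::future::Cache instances (the `invalidate(&key).await` signature matches that API);
// the key and value types here are placeholders, not the project's real ones.
async fn invalidate_demo() {
    use moka::future::Cache;

    let cache: Cache<u64, String> = Cache::builder().max_capacity(10_000).build();
    cache.insert(1, "premium".to_string()).await;

    // invalidation is async, which is why the calls above gained an `.await`
    cache.invalidate(&1).await;
}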
@ -599,7 +662,7 @@ impl BufferedRpcQueryStats {
// // In principle, do not remove the cache for the referrer; the next reload will trigger premium
// // We don't touch the RPC keys at this stage for the referrer; a payment is required to reset those (we want to keep things simple here)
// // Anyway, the RPC keys will be updated in 5 min (600 seconds)
// user_balance_cache.remove(&referrer_user_id);
// user_balance_cache.invalidate(&referrer_user_id).await;
// }
// };
@ -821,7 +884,7 @@ impl RpcQueryStats {
method: Option<&str>,
) -> Decimal {
// for now, always return 0 for cost
0.into()
Decimal::new(0, 1)
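// note: with rust_decimal (re-exported as `Decimal` via sea_orm's prelude),
// `Decimal::new(0, 1)` is zero with a scale of 1, i.e. 0.0, so the cost is still zero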
/*
// some methods should be free. there might be cases where method isn't set (though they should be uncommon)

@ -136,14 +136,14 @@ impl StatBuffer {
}
}
_ = db_save_interval.tick() => {
// info!("DB save internal tick");
trace!("DB save internal tick");
let count = self.save_relational_stats().await;
if count > 0 {
trace!("Saved {} stats to the relational db", count);
}
}
_ = tsdb_save_interval.tick() => {
// info!("TSDB save internal tick");
trace!("TSDB save internal tick");
let count = self.save_tsdb_stats(&bucket).await;
if count > 0 {
trace!("Saved {} stats to the tsdb", count);