diff --git a/Cargo.lock b/Cargo.lock index 33ba79d3..152ac517 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -68,7 +68,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2c99f64d1e06488f620f932677e24bc6e2897582980441ae90a671415bd7ec2f" dependencies = [ "cfg-if", - "getrandom", "once_cell", "version_check", ] @@ -189,6 +188,35 @@ dependencies = [ "tokio", ] +[[package]] +name = "async-io" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fc5b45d93ef0529756f812ca52e44c221b35341892d3dcc34132ac02f3dd2af" +dependencies = [ + "async-lock", + "autocfg", + "cfg-if", + "concurrent-queue", + "futures-lite", + "log", + "parking", + "polling", + "rustix", + "slab", + "socket2", + "waker-fn", +] + +[[package]] +name = "async-lock" +version = "2.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa24f727524730b077666307f2734b4a1a1c57acb79193127dcc8914d5242dd7" +dependencies = [ + "event-listener", +] + [[package]] name = "async-stream" version = "0.3.5" @@ -589,6 +617,12 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "bytecount" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c676a478f63e9fa2dd5368a42f28bba0d6c560b775f38583c8bbaa7fcd67c9c" + [[package]] name = "byteorder" version = "1.4.3" @@ -643,6 +677,19 @@ dependencies = [ "serde", ] +[[package]] +name = "cargo_metadata" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4acbb09d9ee8e23699b9634375c72795d095bf268439da88562cf9b501f181fa" +dependencies = [ + "camino", + "cargo-platform", + "semver", + "serde", + "serde_json", +] + [[package]] name = "cargo_metadata" version = "0.15.4" @@ -868,6 +915,15 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "concurrent-queue" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62ec6771ecfa0762d24683ee5a32ad78487a3d3afdc0fb8cae19d2c5deb50b7c" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "console" version = "0.14.1" @@ -1230,7 +1286,7 @@ dependencies = [ "anyhow", "hashbrown 0.14.0", "log", - "quick_cache_ttl", + "moka", "redis-rate-limiter", "tokio", ] @@ -1546,6 +1602,15 @@ dependencies = [ "libc", ] +[[package]] +name = "error-chain" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d2f06b9cac1506ece98fe3231e3cc9c4410ec3d5b1f24ae1c8946f0742cdefc" +dependencies = [ + "version_check", +] + [[package]] name = "eth-keystore" version = "0.5.0" @@ -1711,7 +1776,7 @@ checksum = "6da5fa198af0d3be20c19192df2bd9590b92ce09a8421e793bec8851270f1b05" dependencies = [ "arrayvec", "bytes", - "cargo_metadata", + "cargo_metadata 0.15.4", "chrono", "elliptic-curve 0.13.5", "ethabi", @@ -2107,6 +2172,21 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" +[[package]] +name = "futures-lite" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" +dependencies = [ + "fastrand", + "futures-core", + "futures-io", + "memchr", + "parking", + "pin-project-lite", + "waker-fn", +] + [[package]] name = "futures-locks" version = "0.7.1" @@ -2992,6 +3072,15 @@ version = "0.4.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "518ef76f2f87365916b142844c16d8fefd85039bc5699050210a7778ee1cd1de" +[[package]] +name = "mach2" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d0d1830bcd151a6fc4aea1369af235b36c1528fe976b8ff678683c9995eade8" +dependencies = [ + "libc", +] + [[package]] name = "match_cfg" version = "0.1.0" @@ -3126,6 +3215,32 @@ dependencies = [ "winapi", ] +[[package]] +name = "moka" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36506f2f935238463605f3bb13b362f1949daafc3b347d05d60ae08836db2bd2" +dependencies = [ + "async-io", + "async-lock", + "crossbeam-channel", + "crossbeam-epoch", + "crossbeam-utils", + "futures-util", + "num_cpus", + "once_cell", + "parking_lot 0.12.1", + "quanta", + "rustc_version", + "scheduled-thread-pool", + "skeptic", + "smallvec", + "tagptr", + "thiserror", + "triomphe", + "uuid 1.3.3", +] + [[package]] name = "nanorand" version = "0.7.0" @@ -3555,6 +3670,12 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "parking" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14f2252c834a40ed9bb5422029649578e63aa341ac401f74e719dd1afda8394e" + [[package]] name = "parking_lot" version = "0.11.2" @@ -3869,6 +3990,22 @@ version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964" +[[package]] +name = "polling" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b2d323e8ca7996b3e23126511a523f7e62924d93ecd5ae73b333815b0eb3dce" +dependencies = [ + "autocfg", + "bitflags", + "cfg-if", + "concurrent-queue", + "libc", + "log", + "pin-project-lite", + "windows-sys 0.48.0", +] + [[package]] name = "ppv-lite86" version = "0.2.17" @@ -4041,25 +4178,30 @@ dependencies = [ ] [[package]] -name = "quick_cache" -version = "0.3.0" +name = "pulldown-cmark" +version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5253a3a0d56548d5b0be25414171dc780cc6870727746d05bd2bde352eee96c5" +checksum = "77a1a2f1f0a7ecff9c31abbe177637be0e97a0aef46cf8738ece09327985d998" dependencies = [ - "ahash 0.8.3", - "hashbrown 0.13.2", - "parking_lot 0.12.1", + "bitflags", + "memchr", + "unicase", ] [[package]] -name = "quick_cache_ttl" -version = "0.1.0" +name = "quanta" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a17e662a7a8291a865152364c20c7abc5e60486ab2001e8ec10b24862de0b9ab" dependencies = [ - "flume", - "log", - "quick_cache", - "serde", - "tokio", + "crossbeam-utils", + "libc", + "mach2", + "once_cell", + "raw-cpuid", + "wasi 0.11.0+wasi-snapshot-preview1", + "web-sys", + "winapi", ] [[package]] @@ -4129,6 +4271,15 @@ dependencies = [ "tokio", ] +[[package]] +name = "raw-cpuid" +version = "10.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c297679cb867470fa8c9f67dbba74a78d78e3e98d7cf2b08d6d71540f797332" +dependencies = [ + "bitflags", +] + [[package]] name = "rayon" version = "1.7.0" @@ -4627,6 +4778,15 @@ dependencies = [ "windows-sys 0.42.0", ] +[[package]] +name = "scheduled-thread-pool" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19" +dependencies = [ + "parking_lot 0.12.1", +] + [[package]] name = "scoped-tls" version = "1.0.1" @@ -5248,6 +5408,21 @@ dependencies = [ "time 0.3.21", ] +[[package]] +name = "skeptic" +version = "0.13.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16d23b015676c90a0f01c197bfdc786c20342c73a0afdda9025adb0bc42940a8" +dependencies = [ + "bytecount", + "cargo_metadata 0.14.2", + "error-chain", + "glob", + "pulldown-cmark", + "tempfile", + "walkdir", +] + [[package]] name = "slab" version = "0.4.8" @@ -5587,6 +5762,12 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" +[[package]] +name = "tagptr" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" + [[package]] name = "tap" version = "1.0.1" @@ -6158,6 +6339,12 @@ dependencies = [ "tracing-log", ] +[[package]] +name = "triomphe" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1ee9bd9239c339d714d657fac840c6d2a4f9c45f4f9ec7b0975113458be78db" + [[package]] name = "try-lock" version = "0.2.4" @@ -6261,6 +6448,15 @@ dependencies = [ "libc", ] +[[package]] +name = "unicase" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6" +dependencies = [ + "version_check", +] + [[package]] name = "unicode-bidi" version = "0.3.13" @@ -6385,6 +6581,12 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "waker-fn" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" + [[package]] name = "walkdir" version = "2.3.3" @@ -6547,6 +6749,7 @@ dependencies = [ "log", "migration", "mimalloc", + "moka", "num", "num-traits", "once_cell", @@ -6556,7 +6759,6 @@ dependencies = [ "payment-contracts", "prettytable", "proctitle", - "quick_cache_ttl", "rdkafka", "redis-rate-limiter", "regex", diff --git a/Cargo.toml b/Cargo.toml index d777eb3f..a766de62 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,6 @@ members = [ "latency", "migration", "payment-contracts", - "quick_cache_ttl", "rate-counter", "redis-rate-limiter", "thread-fast-rng", diff --git a/README.md b/README.md index 8b55df42..0f8d4bc5 100644 --- a/README.md +++ b/README.md @@ -171,6 +171,6 @@ Note: Testing with `getLatestBlockByNumber.lua` is not great because the latest Run [ethspam](https://github.com/INFURA/versus) and [versus](https://github.com/shazow/ethspam) for a more realistic load test: - ethspam --rpc http://127.0.0.1:8544 | versus --concurrency=100 --stop-after=10000 http://127.0.0.1:8544 + ethspam --rpc http://127.0.0.1:8544 | versus --concurrency=10 --stop-after=1000 http://127.0.0.1:8544 ethspam --rpc http://127.0.0.1:8544/u/$API_KEY | versus --concurrency=100 --stop-after=10000 http://127.0.0.1:8544/u/$API_KEY diff --git a/deferred-rate-limiter/Cargo.toml b/deferred-rate-limiter/Cargo.toml index c99fea76..52b17b61 100644 --- a/deferred-rate-limiter/Cargo.toml +++ b/deferred-rate-limiter/Cargo.toml @@ -5,10 +5,10 @@ authors = ["Bryan Stitt "] edition = "2021" [dependencies] -quick_cache_ttl = { path = "../quick_cache_ttl" } redis-rate-limiter = { path = "../redis-rate-limiter" } anyhow = "1.0.71" hashbrown = "0.14.0" log = "0.4.18" +moka = { version = "0.11.1", features = ["future"] } tokio = "1.28.2" diff --git a/deferred-rate-limiter/src/lib.rs b/deferred-rate-limiter/src/lib.rs index b15920ea..c3603c77 100644 --- a/deferred-rate-limiter/src/lib.rs +++ b/deferred-rate-limiter/src/lib.rs @@ -1,6 +1,6 @@ //#![warn(missing_docs)] use log::error; -use quick_cache_ttl::CacheWithTTL; +use moka::future::{Cache, CacheBuilder}; use redis_rate_limiter::{RedisRateLimitResult, RedisRateLimiter}; use std::cmp::Eq; use std::fmt::{Debug, Display}; @@ -16,7 +16,7 @@ pub struct DeferredRateLimiter where K: Send + Sync, { - local_cache: CacheWithTTL>, + local_cache: Cache>, prefix: String, rrl: RedisRateLimiter, /// if None, defers to the max on rrl @@ -46,12 +46,9 @@ where // TODO: what do these weigh? // TODO: allow skipping max_capacity // TODO: prefix instead of a static str - let local_cache = CacheWithTTL::new( - "deferred rate limiter", - cache_size, - Duration::from_secs(ttl), - ) - .await; + let local_cache = CacheBuilder::new(cache_size.try_into().unwrap()) + .time_to_live(Duration::from_secs(ttl)) + .build(); Self { local_cache, @@ -91,7 +88,7 @@ where // set arc_deferred_rate_limit_result and return the count self.local_cache - .try_get_or_insert_async::(&key, async move { + .try_get_with_by_ref::<_, anyhow::Error, _>(&key, async move { // we do not use the try operator here because we want to be okay with redis errors let redis_count = match rrl .throttle_label(&redis_key, Some(max_requests_per_period), count) @@ -130,7 +127,8 @@ where Ok(Arc::new(AtomicU64::new(redis_count))) }) - .await? + .await + .map_err(|x| anyhow::anyhow!("cache error! {}", x))? }; let mut locked = deferred_rate_limit_result.lock().await; diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index 217ad5d5..4596a5a1 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -20,7 +20,6 @@ entities = { path = "../entities" } latency = { path = "../latency" } migration = { path = "../migration" } payment-contracts = { path = "../payment-contracts" } -quick_cache_ttl = { path = "../quick_cache_ttl" } redis-rate-limiter = { path = "../redis-rate-limiter" } thread-fast-rng = { path = "../thread-fast-rng" } @@ -68,6 +67,7 @@ itertools = "0.10.5" listenfd = "1.0.1" log = "0.4.18" mimalloc = { version = "0.1.37", optional = true} +moka = { version = "0.11.1", features = ["future"] } num = "0.4.0" num-traits = "0.2.15" once_cell = { version = "1.18.0" } diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index d74300e3..691841bb 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -13,7 +13,7 @@ use crate::jsonrpc::{ }; use crate::relational_db::{get_db, get_migrated_db, DatabaseConnection, DatabaseReplica}; use crate::response_cache::{ - JsonRpcQueryCacheKey, JsonRpcResponseCache, JsonRpcResponseEnum, JsonRpcResponseWeigher, + json_rpc_response_weigher, JsonRpcQueryCacheKey, JsonRpcResponseCache, JsonRpcResponseEnum, }; use crate::rpcs::blockchain::Web3ProxyBlock; use crate::rpcs::consensus::ConsensusWeb3Rpcs; @@ -42,7 +42,8 @@ use ipnet::IpNet; use log::{error, info, trace, warn, Level}; use migration::sea_orm::prelude::Decimal; use migration::sea_orm::{EntityTrait, PaginatorTrait}; -use quick_cache_ttl::{Cache, CacheWithTTL}; +use moka::future::{Cache, CacheBuilder}; +use parking_lot::Mutex; use redis_rate_limiter::redis::AsyncCommands; use redis_rate_limiter::{redis, DeadpoolRuntime, RedisConfig, RedisPool, RedisRateLimiter}; use serde::Serialize; @@ -51,7 +52,7 @@ use serde_json::value::RawValue; use std::borrow::Cow; use std::fmt; use std::net::IpAddr; -use std::num::{NonZeroU32, NonZeroU64}; +use std::num::NonZeroU64; use std::str::FromStr; use std::sync::{atomic, Arc}; use std::time::Duration; @@ -110,8 +111,8 @@ pub struct AuthorizationChecks { } /// Cache data from the database about rpc keys -pub type RpcSecretKeyCache = Arc>; -pub type UserBalanceCache = Arc>>>; // Could also be an AtomicDecimal +pub type RpcSecretKeyCache = Cache; +pub type UserBalanceCache = Cache>>; /// The application // TODO: i'm sure this is more arcs than necessary, but spawning futures makes references hard @@ -144,7 +145,7 @@ pub struct Web3ProxyApp { pub internal_provider: Arc, /// store pending transactions that we've seen so that we don't send duplicates to subscribers /// TODO: think about this more. might be worth storing if we sent the transaction or not and using this for automatic retries - pub pending_transactions: Arc>, + pub pending_transactions: Cache, /// rate limit anonymous users pub frontend_ip_rate_limiter: Option>, /// rate limit authenticated users @@ -398,17 +399,14 @@ impl Web3ProxyApp { // if there is no database of users, there will be no keys and so this will be empty // TODO: max_capacity from config // TODO: ttl from config - let rpc_secret_key_cache = CacheWithTTL::arc_with_capacity( - "rpc_secret_key_cache", - 10_000, - Duration::from_secs(600), - ) - .await; + let rpc_secret_key_cache = CacheBuilder::new(10_000) + .time_to_live(Duration::from_secs(600)) + .build(); // TODO: TTL left low, this could also be a solution instead of modifiying the cache, that may be disgusting across threads / slow anyways - let user_balance_cache = - CacheWithTTL::arc_with_capacity("user_balance_cache", 10_000, Duration::from_secs(600)) - .await; + let user_balance_cache = CacheBuilder::new(10_000) + .time_to_live(Duration::from_secs(600)) + .build(); // create a channel for receiving stats // we do this in a channel so we don't slow down our response to the users @@ -502,27 +500,26 @@ impl Web3ProxyApp { // TODO: different chains might handle this differently // TODO: what should we set? 5 minutes is arbitrary. the nodes themselves hold onto transactions for much longer // TODO: this used to be time_to_update, but - let pending_transactions = CacheWithTTL::arc_with_capacity( - "pending_transactions", - 10_000, - Duration::from_secs(300), - ) - .await; + let pending_transactions = CacheBuilder::new(10_000) + .time_to_live(Duration::from_secs(300)) + .build(); // responses can be very different in sizes, so this is a cache with a max capacity and a weigher // TODO: we should emit stats to calculate a more accurate expected cache size // TODO: do we actually want a TTL on this? // TODO: configurable max item weight instead of using ~0.1% // TODO: resize the cache automatically - let response_cache = JsonRpcResponseCache::new_with_weights( - "response_cache", - (top_config.app.response_cache_max_bytes / 16_384) as usize, - NonZeroU32::try_from((top_config.app.response_cache_max_bytes / 1024) as u32).unwrap(), - top_config.app.response_cache_max_bytes, - JsonRpcResponseWeigher, - Duration::from_secs(3600), - ) - .await; + let response_cache: JsonRpcResponseCache = + CacheBuilder::new(top_config.app.response_cache_max_bytes) + .weigher(json_rpc_response_weigher) + .build(); + // (top_config.app.response_cache_max_bytes / 16_384) as usize, + // NonZeroU32::try_from((top_config.app.response_cache_max_bytes / 1024) as u32).unwrap(), + // top_config.app.response_cache_max_bytes, + // JsonRpcResponseWeigher, + // Duration::from_secs(3600), + // ) + // .await; // TODO: how should we handle hitting this max? let max_users = 20_000; @@ -1160,7 +1157,7 @@ impl Web3ProxyApp { .await { Ok(response_data) => (StatusCode::OK, response_data), - Err(err) => err.into_response_parts(), + Err(err) => err.as_response_parts(), }; let response = JsonRpcForwardedResponse::from_response_data(response_data, response_id); @@ -1699,12 +1696,15 @@ impl Web3ProxyApp { let to_block_num = cache_key.to_block_num(); let cache_errors = cache_key.cache_errors(); - match self + // moka makes us do annoying things with arcs + enum CacheError { + NotCached(JsonRpcResponseEnum>), + Error(Arc), + } + + let x = self .jsonrpc_response_cache - .get_value_or_guard_async(cache_key.hash()).await - { - Ok(x) => x, - Err(x) => { + .try_get_with::<_, Mutex>(cache_key.hash(), async { let response_data = timeout( duration, self.balanced_rpcs @@ -1716,16 +1716,32 @@ impl Web3ProxyApp { to_block_num.as_ref(), ) ) - .await?; + .await + .map_err(|x| Mutex::new(CacheError::Error(Arc::new(Web3ProxyError::from(x)))))?; - let response_data: JsonRpcResponseEnum> = response_data.try_into()?; + // TODO: i think response data should be Arc>>, but that's more work + let response_data: JsonRpcResponseEnum> = response_data.try_into() + .map_err(|x| Mutex::new(CacheError::Error(Arc::new(x))))?; - if matches!(response_data, JsonRpcResponseEnum::Result { .. }) || cache_errors { - // TODO: convert the Box to an Arc? - x.insert(response_data.clone()); + // TODO: read max size from the config + if response_data.num_bytes() as u64 > self.config.response_cache_max_bytes / 1000 { + Err(Mutex::new(CacheError::NotCached(response_data))) + } else if matches!(response_data, JsonRpcResponseEnum::Result { .. }) || cache_errors { + Ok(response_data) + } else { + Err(Mutex::new(CacheError::NotCached(response_data))) } + }).await; - response_data + match x { + Ok(x) => x, + Err(arc_err) => { + let locked = arc_err.lock(); + + match &*locked { + CacheError::Error(err) => return Err(Web3ProxyError::Arc(err.clone())), + CacheError::NotCached(x) => x.clone(), + } } } } else { diff --git a/web3_proxy/src/errors.rs b/web3_proxy/src/errors.rs index 3a17bbd0..84771670 100644 --- a/web3_proxy/src/errors.rs +++ b/web3_proxy/src/errors.rs @@ -22,6 +22,7 @@ use rust_decimal::Error as DecimalError; use serde::Serialize; use serde_json::value::RawValue; use std::error::Error; +use std::sync::Arc; use std::{borrow::Cow, net::IpAddr}; use tokio::{sync::AcquireError, task::JoinError, time::Instant}; @@ -42,6 +43,7 @@ pub enum Web3ProxyError { AccessDenied, #[error(ignore)] Anyhow(anyhow::Error), + Arc(Arc), #[error(ignore)] #[from(ignore)] BadRequest(Cow<'static, str>), @@ -154,7 +156,7 @@ pub enum Web3ProxyError { } impl Web3ProxyError { - pub fn into_response_parts(self) -> (StatusCode, JsonRpcResponseEnum) { + pub fn as_response_parts(&self) -> (StatusCode, JsonRpcResponseEnum) { // TODO: include a unique request id in the data let (code, err): (StatusCode, JsonRpcErrorData) = match self { Self::Abi(err) => { @@ -192,6 +194,10 @@ impl Web3ProxyError { }, ) } + Self::Arc(err) => { + // recurse + return err.as_response_parts::(); + } Self::BadRequest(err) => { debug!("BAD_REQUEST: {}", err); ( @@ -533,7 +539,10 @@ impl Web3ProxyError { }, ) } - Self::JsonRpcErrorData(jsonrpc_error_data) => (StatusCode::OK, jsonrpc_error_data), + Self::JsonRpcErrorData(jsonrpc_error_data) => { + // TODO: do this without clone? the Arc needed it though + (StatusCode::OK, jsonrpc_error_data.clone()) + } Self::MsgPackEncode(err) => { warn!("MsgPackEncode Error: {}", err); ( @@ -829,9 +838,9 @@ impl Web3ProxyError { } ( - status_code, + *status_code, JsonRpcErrorData { - message: Cow::Owned(err_msg), + message: err_msg.to_owned().into(), code: code.into(), data: None, }, @@ -840,7 +849,7 @@ impl Web3ProxyError { Self::Timeout(x) => ( StatusCode::REQUEST_TIMEOUT, JsonRpcErrorData { - message: Cow::Owned(format!("request timed out: {:?}", x)), + message: format!("request timed out: {:?}", x).into(), code: StatusCode::REQUEST_TIMEOUT.as_u16().into(), // TODO: include the actual id! data: None, @@ -851,7 +860,7 @@ impl Web3ProxyError { ( StatusCode::BAD_REQUEST, JsonRpcErrorData { - message: Cow::Owned(err.to_string()), + message: err.to_string().into(), code: StatusCode::BAD_REQUEST.as_u16().into(), data: None, }, @@ -862,7 +871,7 @@ impl Web3ProxyError { ( StatusCode::BAD_REQUEST, JsonRpcErrorData { - message: Cow::Owned(format!("{}", err)), + message: format!("{}", err).into(), code: StatusCode::BAD_REQUEST.as_u16().into(), data: None, }, @@ -873,7 +882,7 @@ impl Web3ProxyError { ( StatusCode::BAD_GATEWAY, JsonRpcErrorData { - message: Cow::Borrowed("no servers synced. unknown eth_blockNumber"), + message: "no servers synced. unknown eth_blockNumber".into(), code: StatusCode::BAD_GATEWAY.as_u16().into(), data: None, }, @@ -960,9 +969,7 @@ impl Web3ProxyError { ( StatusCode::BAD_REQUEST, JsonRpcErrorData { - message: Cow::Borrowed( - "redirect_public_url not set. only websockets work here", - ), + message: "redirect_public_url not set. only websockets work here".into(), code: StatusCode::BAD_REQUEST.as_u16().into(), data: None, }, @@ -971,14 +978,14 @@ impl Web3ProxyError { Self::WithContext(err, msg) => match err { Some(err) => { warn!("{:#?} w/ context {}", err, msg); - return err.into_response_parts(); + return err.as_response_parts(); } None => { warn!("error w/ context {}", msg); ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { - message: Cow::Owned(msg), + message: msg.to_owned().into(), code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), data: None, }, @@ -991,7 +998,7 @@ impl Web3ProxyError { } pub fn into_response_with_id(self, id: Box) -> Response { - let (status_code, response_data) = self.into_response_parts(); + let (status_code, response_data) = self.as_response_parts(); let response = JsonRpcForwardedResponse::from_response_data(response_data, id); diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index 63798948..ecdf4bed 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -30,7 +30,6 @@ use rdkafka::producer::{FutureProducer, FutureRecord}; use rdkafka::util::Timeout as KafkaTimeout; use redis_rate_limiter::redis::AsyncCommands; use redis_rate_limiter::RedisRateLimitResult; -use std::convert::Infallible; use std::fmt::Display; use std::hash::{Hash, Hasher}; use std::mem; @@ -921,13 +920,12 @@ impl Web3ProxyApp { if let Some(max_concurrent_requests) = self.config.public_max_concurrent_requests { let semaphore = self .ip_semaphores - .get_or_insert_async::(ip, async move { + .get_with_by_ref(ip, async { // TODO: set max_concurrent_requests dynamically based on load? let s = Semaphore::new(max_concurrent_requests); - Ok(Arc::new(s)) + Arc::new(s) }) - .await - .expect("infallible"); + .await; let semaphore_permit = semaphore.acquire_owned().await?; @@ -951,12 +949,11 @@ impl Web3ProxyApp { let semaphore = self .user_semaphores - .get_or_insert_async::(&user_id, async move { + .get_with_by_ref(&user_id, async move { let s = Semaphore::new(max_concurrent_requests as usize); - Ok(Arc::new(s)) + Arc::new(s) }) - .await - .expect("infallible"); + .await; let semaphore_permit = semaphore.acquire_owned().await?; @@ -980,12 +977,11 @@ impl Web3ProxyApp { // limit concurrent requests let semaphore = self .bearer_token_semaphores - .get_or_insert_async::(&user_bearer_token, async move { + .get_with_by_ref(&user_bearer_token, async move { let s = Semaphore::new(self.config.bearer_token_max_concurrent_requests as usize); - Ok(Arc::new(s)) + Arc::new(s) }) - .await - .expect("infallible"); + .await; let semaphore_permit = semaphore.acquire_owned().await?; @@ -1139,25 +1135,25 @@ impl Web3ProxyApp { ) -> Web3ProxyResult>> { match NonZeroU64::try_from(user_id) { Err(_) => Ok(Arc::new(RwLock::new(Decimal::default()))), - Ok(x) => { - self.user_balance_cache - .try_get_or_insert_async(&x, async move { - let db_replica = self - .db_replica() - .web3_context("Getting database connection")?; + Ok(x) => self + .user_balance_cache + .try_get_with_by_ref(&x, async move { + let db_replica = self + .db_replica() + .web3_context("Getting database connection")?; - let balance: Decimal = match balance::Entity::find() - .filter(balance::Column::UserId.eq(user_id)) - .one(db_replica.as_ref()) - .await? - { - Some(x) => x.total_deposits - x.total_spent_outside_free_tier, - None => Decimal::default(), - }; - Ok(Arc::new(RwLock::new(balance))) - }) - .await - } + let balance: Decimal = match balance::Entity::find() + .filter(balance::Column::UserId.eq(user_id)) + .one(db_replica.as_ref()) + .await? + { + Some(x) => x.total_deposits - x.total_spent_outside_free_tier, + None => Decimal::default(), + }; + Ok(Arc::new(RwLock::new(balance))) + }) + .await + .map_err(Into::into), } } @@ -1168,7 +1164,7 @@ impl Web3ProxyApp { rpc_secret_key: RpcSecretKey, ) -> Web3ProxyResult { self.rpc_secret_key_cache - .try_get_or_insert_async(&rpc_secret_key, async move { + .try_get_with_by_ref(&rpc_secret_key, async move { // trace!(?rpc_secret_key, "user cache miss"); let db_replica = self @@ -1303,6 +1299,7 @@ impl Web3ProxyApp { } }) .await + .map_err(Into::into) } /// Authorized the ip/origin/referer/useragent and rate limit and concurrency diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs index 952dc11c..112f8ef4 100644 --- a/web3_proxy/src/frontend/mod.rs +++ b/web3_proxy/src/frontend/mod.rs @@ -18,7 +18,7 @@ use axum::{ use http::{header::AUTHORIZATION, StatusCode}; use listenfd::ListenFd; use log::{debug, info}; -use quick_cache_ttl::UnitWeighter; +use moka::future::{Cache, CacheBuilder}; use std::net::SocketAddr; use std::sync::Arc; use std::{iter::once, time::Duration}; @@ -37,12 +37,7 @@ pub enum ResponseCacheKey { Status, } -pub type ResponseCache = quick_cache_ttl::CacheWithTTL< - ResponseCacheKey, - (StatusCode, &'static str, axum::body::Bytes), - UnitWeighter, - quick_cache_ttl::DefaultHashBuilder, ->; +pub type ResponseCache = Cache; /// Start the frontend server. pub async fn serve( @@ -58,12 +53,9 @@ pub async fn serve( debug!("response_cache size: {}", response_cache_size); - let response_cache = ResponseCache::new( - "response_cache", - response_cache_size, - Duration::from_secs(1), - ) - .await; + let response_cache: ResponseCache = CacheBuilder::new(response_cache_size as u64) + .time_to_live(Duration::from_secs(1)) + .build(); // TODO: read config for if fastest/versus should be available publicly. default off diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs index f3f12869..e597a9ab 100644 --- a/web3_proxy/src/frontend/rpc_proxy_ws.rs +++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs @@ -330,7 +330,7 @@ async fn handle_socket_payload( let (authorization, semaphore) = match authorization.check_again(&app).await { Ok((a, s)) => (a, s), Err(err) => { - let (_, err) = err.into_response_parts(); + let (_, err) = err.as_response_parts(); let err = JsonRpcForwardedResponse::from_response_data(err, Default::default()); @@ -437,7 +437,7 @@ async fn handle_socket_payload( let response_str = match response { Ok(x) => serde_json::to_string(&x).expect("to_string should always work here"), Err(err) => { - let (_, response_data) = err.into_response_parts(); + let (_, response_data) = err.as_response_parts(); let response = JsonRpcForwardedResponse::from_response_data(response_data, response_id); diff --git a/web3_proxy/src/frontend/status.rs b/web3_proxy/src/frontend/status.rs index 46d4af15..bb2e840b 100644 --- a/web3_proxy/src/frontend/status.rs +++ b/web3_proxy/src/frontend/status.rs @@ -13,7 +13,9 @@ use axum::{ }; use axum_macros::debug_handler; use log::trace; +use moka::future::Cache; use once_cell::sync::Lazy; +use serde::{ser::SerializeStruct, Serialize}; use serde_json::json; use std::sync::Arc; @@ -33,7 +35,7 @@ pub async fn health( Extension(cache): Extension>, ) -> impl IntoResponse { let (code, content_type, body) = cache - .get_or_insert_async(&ResponseCacheKey::Health, async move { _health(app).await }) + .get_with(ResponseCacheKey::Health, async move { _health(app).await }) .await; Response::builder() @@ -66,7 +68,7 @@ pub async fn backups_needed( Extension(cache): Extension>, ) -> impl IntoResponse { let (code, content_type, body) = cache - .get_or_insert_async(&ResponseCacheKey::BackupsNeeded, async move { + .get_with(ResponseCacheKey::BackupsNeeded, async move { _backups_needed(app).await }) .await; @@ -117,7 +119,7 @@ pub async fn status( Extension(cache): Extension>, ) -> impl IntoResponse { let (code, content_type, body) = cache - .get_or_insert_async(&ResponseCacheKey::Status, async move { _status(app).await }) + .get_with(ResponseCacheKey::Status, async move { _status(app).await }) .await; Response::builder() @@ -139,10 +141,10 @@ async fn _status(app: Arc) -> (StatusCode, &'static str, Bytes) { "bundler_4337_rpcs": app.bundler_4337_rpcs, "chain_id": app.config.chain_id, "hostname": app.hostname, - "jsonrpc_response_cache": app.jsonrpc_response_cache, + "jsonrpc_response_cache": MokaCacheSerializer(&app.jsonrpc_response_cache), "private_rpcs": app.private_rpcs, - "rpc_secret_key_cache": app.rpc_secret_key_cache, - "user_balance_cache": app.user_balance_cache, + "rpc_secret_key_cache": MokaCacheSerializer(&app.rpc_secret_key_cache), + "user_balance_cache": MokaCacheSerializer(&app.user_balance_cache), "version": APP_USER_AGENT, }); @@ -158,3 +160,22 @@ async fn _status(app: Arc) -> (StatusCode, &'static str, Bytes) { (code, CONTENT_TYPE_JSON, body) } + +pub struct MokaCacheSerializer<'a, K, V>(pub &'a Cache); + +impl<'a, K, V> Serialize for MokaCacheSerializer<'a, K, V> { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut state = serializer.serialize_struct("MokaCache", 2)?; + + self.0.weighted_size(); + + state.serialize_field("entry_count", &self.0.entry_count())?; + + todo!(); + + // state.end() + } +} diff --git a/web3_proxy/src/frontend/users/payment.rs b/web3_proxy/src/frontend/users/payment.rs index 6f3ee818..2b2b0bcb 100644 --- a/web3_proxy/src/frontend/users/payment.rs +++ b/web3_proxy/src/frontend/users/payment.rs @@ -295,13 +295,14 @@ pub async fn user_balance_post( match NonZeroU64::try_from(recipient.id) { Err(_) => {} Ok(x) => { - app.user_balance_cache.remove(&x); + app.user_balance_cache.invalidate(&x).await; } }; for rpc_key_entity in rpc_keys { app.rpc_secret_key_cache - .remove(&rpc_key_entity.secret_key.into()); + .invalidate(&rpc_key_entity.secret_key.into()) + .await; } let x = json!({ diff --git a/web3_proxy/src/response_cache.rs b/web3_proxy/src/response_cache.rs index 562e7de4..348016d3 100644 --- a/web3_proxy/src/response_cache.rs +++ b/web3_proxy/src/response_cache.rs @@ -2,12 +2,11 @@ use crate::{errors::Web3ProxyError, jsonrpc::JsonRpcErrorData, rpcs::blockchain: use derive_more::From; use ethers::{providers::ProviderError, types::U64}; use hashbrown::hash_map::DefaultHashBuilder; -use quick_cache_ttl::{CacheWithTTL, Weighter}; +use moka::future::Cache; use serde_json::value::RawValue; use std::{ borrow::Cow, hash::{BuildHasher, Hash, Hasher}, - num::NonZeroU32, sync::Arc, }; @@ -82,28 +81,24 @@ impl JsonRpcQueryCacheKey { } } -pub type JsonRpcResponseCache = - CacheWithTTL>, JsonRpcResponseWeigher>; - -#[derive(Clone)] -pub struct JsonRpcResponseWeigher; +pub type JsonRpcResponseCache = Cache>>; /// TODO: we might need one that holds RawValue and one that holds serde_json::Value #[derive(Clone, Debug)] pub enum JsonRpcResponseEnum { Result { value: R, - num_bytes: NonZeroU32, + num_bytes: u32, }, RpcError { error_data: JsonRpcErrorData, - num_bytes: NonZeroU32, + num_bytes: u32, }, } // TODO: impl for other inner result types? impl JsonRpcResponseEnum { - pub fn num_bytes(&self) -> NonZeroU32 { + pub fn num_bytes(&self) -> u32 { match self { Self::Result { num_bytes, .. } => *num_bytes, Self::RpcError { num_bytes, .. } => *num_bytes, @@ -123,7 +118,7 @@ impl From> for JsonRpcResponseEnum> { fn from(value: Arc) -> Self { let num_bytes = value.get().len(); - let num_bytes = NonZeroU32::try_from(num_bytes as u32).unwrap(); + let num_bytes = num_bytes as u32; Self::Result { value, num_bytes } } @@ -133,7 +128,7 @@ impl From> for JsonRpcResponseEnum> { fn from(value: Box) -> Self { let num_bytes = value.get().len(); - let num_bytes = NonZeroU32::try_from(num_bytes as u32).unwrap(); + let num_bytes = num_bytes as u32; let value = value.into(); @@ -190,7 +185,7 @@ impl From for JsonRpcResponseEnum { // TODO: wrap the error in a complete response? let num_bytes = serde_json::to_string(&value).unwrap().len(); - let num_bytes = NonZeroU32::try_from(num_bytes as u32).unwrap(); + let num_bytes = num_bytes as u32; Self::RpcError { error_data: value, @@ -235,64 +230,55 @@ impl TryFrom for JsonRpcErrorData { } } -// TODO: instead of Arc, be generic -impl Weighter>> for JsonRpcResponseWeigher { - fn weight(&self, _key: &K, _qey: &Q, value: &JsonRpcResponseEnum>) -> NonZeroU32 { - value.num_bytes() - } +pub fn json_rpc_response_weigher(_key: &K, value: &JsonRpcResponseEnum) -> u32 { + value.num_bytes() } #[cfg(test)] mod tests { - use super::{JsonRpcResponseEnum, JsonRpcResponseWeigher}; - use quick_cache_ttl::CacheWithTTL; + use super::JsonRpcResponseEnum; + use crate::response_cache::json_rpc_response_weigher; + use moka::future::{Cache, CacheBuilder}; use serde_json::value::RawValue; - use std::{num::NonZeroU32, sync::Arc, time::Duration}; + use std::{sync::Arc, time::Duration}; #[tokio::test(start_paused = true)] async fn test_json_rpc_query_weigher() { let max_item_weight = 200; let weight_capacity = 1_000; - let test_cache: CacheWithTTL< - u32, - JsonRpcResponseEnum>, - JsonRpcResponseWeigher, - > = CacheWithTTL::new_with_weights( - "test", - 5, - max_item_weight.try_into().unwrap(), - weight_capacity, - JsonRpcResponseWeigher, - Duration::from_secs(2), - ) - .await; + let test_cache: Cache>> = + CacheBuilder::new(weight_capacity) + .weigher(json_rpc_response_weigher) + .time_to_live(Duration::from_secs(2)) + .build(); let small_data = JsonRpcResponseEnum::Result { value: Box::::default().into(), - num_bytes: NonZeroU32::try_from(max_item_weight / 2).unwrap(), + num_bytes: max_item_weight, }; let max_sized_data = JsonRpcResponseEnum::Result { value: Box::::default().into(), - num_bytes: NonZeroU32::try_from(max_item_weight).unwrap(), + num_bytes: max_item_weight, }; let oversized_data = JsonRpcResponseEnum::Result { value: Box::::default().into(), - num_bytes: NonZeroU32::try_from(max_item_weight * 2).unwrap(), + num_bytes: max_item_weight * 2, }; - test_cache.try_insert(0, small_data).unwrap(); + test_cache.insert(0, small_data).await; test_cache.get(&0).unwrap(); - test_cache.try_insert(1, max_sized_data).unwrap(); + test_cache.insert(1, max_sized_data).await; test_cache.get(&0).unwrap(); test_cache.get(&1).unwrap(); - test_cache.try_insert(2, oversized_data).unwrap_err(); + // TODO: this will currently work! need to wrap moka cache in a checked insert + test_cache.insert(2, oversized_data).await; test_cache.get(&0).unwrap(); test_cache.get(&1).unwrap(); diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index 4d4375db..f1fe2e80 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -9,7 +9,7 @@ use crate::frontend::authorization::Authorization; use derive_more::From; use ethers::prelude::{Block, TxHash, H256, U64}; use log::{debug, trace, warn}; -use quick_cache_ttl::CacheWithTTL; +use moka::future::Cache; use serde::ser::SerializeStruct; use serde::Serialize; use serde_json::json; @@ -20,8 +20,8 @@ use tokio::sync::broadcast; // TODO: type for Hydrated Blocks with their full transactions? pub type ArcBlock = Arc>; -pub type BlocksByHashCache = Arc>; -pub type BlocksByNumberCache = Arc>; +pub type BlocksByHashCache = Cache; +pub type BlocksByNumberCache = Cache; /// A block and its age. #[derive(Clone, Debug, Default, From)] @@ -172,8 +172,6 @@ impl Web3Rpcs { return Ok(block); } - let block_num = block.number(); - // this block is very likely already in block_hashes // TODO: use their get_with let block_hash = *block.hash(); @@ -182,15 +180,17 @@ impl Web3Rpcs { if heaviest_chain { // this is the only place that writes to block_numbers // multiple inserts should be okay though - // TODO: info that there was a fork? + // TODO: info if there was a fork? + let block_num = block.number(); + self.blocks_by_number - .get_or_insert_async(block_num, async move { block_hash }) + .get_with_by_ref(block_num, async move { block_hash }) .await; } let block = self .blocks_by_hash - .get_or_insert_async(&block_hash, async move { block }) + .get_with_by_ref(&block_hash, async move { block }) .await; Ok(block) @@ -468,7 +468,7 @@ impl Web3Rpcs { // hash changed debug!( - "unc {}/{} {}{}/{}/{} con_head={} old={} rpc={}@{}", + "unc {}/{} {}{}/{}/{} con={} old={} rpc={}@{}", consensus_tier, total_tiers, backups_voted_str, diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index ed7073d9..4d54613f 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -8,11 +8,10 @@ use ethers::prelude::{H256, U64}; use hashbrown::{HashMap, HashSet}; use itertools::{Itertools, MinMaxResult}; use log::{trace, warn}; -use quick_cache_ttl::Cache; +use moka::future::Cache; use serde::Serialize; use std::cmp::{Ordering, Reverse}; use std::collections::BTreeMap; -use std::convert::Infallible; use std::fmt; use std::sync::Arc; use tokio::time::Instant; @@ -378,9 +377,8 @@ impl ConsensusFinder { async fn insert(&mut self, rpc: Arc, block: Web3ProxyBlock) -> Option { let first_seen = self .first_seen - .get_or_insert_async::(block.hash(), async { Ok(Instant::now()) }) - .await - .expect("this cache get is infallible"); + .get_with_by_ref(block.hash(), async { Instant::now() }) + .await; // calculate elapsed time before trying to lock let latency = first_seen.elapsed(); diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index a2369173..b5f60012 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -8,6 +8,7 @@ use crate::config::{BlockAndRpc, TxHashAndRpc, Web3RpcConfig}; use crate::errors::{Web3ProxyError, Web3ProxyResult}; use crate::frontend::authorization::{Authorization, RequestMetadata}; use crate::frontend::rpc_proxy_ws::ProxyMode; +use crate::frontend::status::MokaCacheSerializer; use crate::jsonrpc::{JsonRpcErrorData, JsonRpcParams, JsonRpcResultData}; use crate::rpcs::transactions::TxStatus; use arc_swap::ArcSwap; @@ -21,8 +22,8 @@ use hashbrown::{HashMap, HashSet}; use itertools::Itertools; use log::{debug, error, info, trace, warn}; use migration::sea_orm::DatabaseConnection; +use moka::future::{Cache, CacheBuilder}; use ordered_float::OrderedFloat; -use quick_cache_ttl::CacheWithTTL; use serde::ser::{SerializeStruct, Serializer}; use serde::Serialize; use serde_json::json; @@ -53,7 +54,7 @@ pub struct Web3Rpcs { /// this head receiver makes it easy to wait until there is a new block pub(super) watch_consensus_head_sender: Option>>, /// keep track of transactions that we have sent through subscriptions - pub(super) pending_transaction_cache: Arc>, + pub(super) pending_transaction_cache: Cache, pub(super) pending_tx_id_receiver: flume::Receiver, pub(super) pending_tx_id_sender: flume::Sender, /// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis? @@ -81,7 +82,7 @@ impl Web3Rpcs { min_head_rpcs: usize, min_sum_soft_limit: u32, name: String, - pending_transaction_cache: Arc>, + pending_transaction_cache: Cache, pending_tx_sender: Option>, watch_consensus_head_sender: Option>>, ) -> anyhow::Result<( @@ -96,16 +97,16 @@ impl Web3Rpcs { // these blocks don't have full transactions, but they do have rather variable amounts of transaction hashes // TODO: actual weighter on this // TODO: time_to_idle instead? - let blocks_by_hash: BlocksByHashCache = Arc::new( - CacheWithTTL::new("blocks_by_hash", 1_000, Duration::from_secs(30 * 60)).await, - ); + let blocks_by_hash: BlocksByHashCache = CacheBuilder::new(1_000) + .time_to_idle(Duration::from_secs(30 * 60)) + .build(); // all block numbers are the same size, so no need for weigher // TODO: limits from config // TODO: time_to_idle instead? - let blocks_by_number = Arc::new( - CacheWithTTL::new("blocks_by_number", 1_000, Duration::from_secs(30 * 60)).await, - ); + let blocks_by_number = CacheBuilder::new(1_000) + .time_to_idle(Duration::from_secs(30 * 60)) + .build(); let (watch_consensus_rpcs_sender, consensus_connections_watcher) = watch::channel(Default::default()); @@ -1254,9 +1255,15 @@ impl Serialize for Web3Rpcs { } } - state.serialize_field("blocks_by_hash", &self.blocks_by_hash)?; - state.serialize_field("blocks_by_number", &self.blocks_by_number)?; - state.serialize_field("pending_transaction_cache", &self.pending_transaction_cache)?; + state.serialize_field("blocks_by_hash", &MokaCacheSerializer(&self.blocks_by_hash))?; + state.serialize_field( + "blocks_by_number", + &MokaCacheSerializer(&self.blocks_by_number), + )?; + state.serialize_field( + "pending_transaction_cache", + &MokaCacheSerializer(&self.pending_transaction_cache), + )?; state.serialize_field("block_sender_len", &self.block_sender.len())?; @@ -1308,6 +1315,7 @@ mod tests { use ethers::types::{Block, U256}; use latency::PeakEwmaLatency; use log::{trace, LevelFilter}; + use moka::future::CacheBuilder; use parking_lot::RwLock; use tokio::sync::RwLock as AsyncRwLock; @@ -1488,26 +1496,17 @@ mod tests { name: "test".to_string(), watch_consensus_head_sender: Some(watch_consensus_head_sender), watch_consensus_rpcs_sender, - pending_transaction_cache: CacheWithTTL::arc_with_capacity( - "pending_transaction_cache", - 100, - Duration::from_secs(60), - ) - .await, + pending_transaction_cache: CacheBuilder::new(100) + .time_to_live(Duration::from_secs(60)) + .build(), pending_tx_id_receiver, pending_tx_id_sender, - blocks_by_hash: CacheWithTTL::arc_with_capacity( - "blocks_by_hash", - 100, - Duration::from_secs(60), - ) - .await, - blocks_by_number: CacheWithTTL::arc_with_capacity( - "blocks_by_number", - 100, - Duration::from_secs(60), - ) - .await, + blocks_by_hash: CacheBuilder::new(100) + .time_to_live(Duration::from_secs(60)) + .build(), + blocks_by_number: CacheBuilder::new(100) + .time_to_live(Duration::from_secs(60)) + .build(), // TODO: test max_block_age? max_block_age: None, // TODO: test max_block_lag? @@ -1777,26 +1776,17 @@ mod tests { name: "test".to_string(), watch_consensus_head_sender: Some(watch_consensus_head_sender), watch_consensus_rpcs_sender, - pending_transaction_cache: CacheWithTTL::arc_with_capacity( - "pending_transaction_cache", - 100, - Duration::from_secs(120), - ) - .await, + pending_transaction_cache: CacheBuilder::new(100) + .time_to_live(Duration::from_secs(120)) + .build(), pending_tx_id_receiver, pending_tx_id_sender, - blocks_by_hash: CacheWithTTL::arc_with_capacity( - "blocks_by_hash", - 100, - Duration::from_secs(120), - ) - .await, - blocks_by_number: CacheWithTTL::arc_with_capacity( - "blocks_by_number", - 100, - Duration::from_secs(120), - ) - .await, + blocks_by_hash: CacheBuilder::new(100) + .time_to_live(Duration::from_secs(120)) + .build(), + blocks_by_number: CacheBuilder::new(100) + .time_to_live(Duration::from_secs(120)) + .build(), min_head_rpcs: 1, min_sum_soft_limit: 4_000, max_block_age: None, @@ -1972,26 +1962,11 @@ mod tests { name: "test".to_string(), watch_consensus_head_sender: Some(watch_consensus_head_sender), watch_consensus_rpcs_sender, - pending_transaction_cache: CacheWithTTL::arc_with_capacity( - "pending_transaction_cache", - 10_000, - Duration::from_secs(120), - ) - .await, + pending_transaction_cache: Cache::new(10_000), pending_tx_id_receiver, pending_tx_id_sender, - blocks_by_hash: CacheWithTTL::arc_with_capacity( - "blocks_by_hash", - 10_000, - Duration::from_secs(120), - ) - .await, - blocks_by_number: CacheWithTTL::arc_with_capacity( - "blocks_by_number", - 10_000, - Duration::from_secs(120), - ) - .await, + blocks_by_hash: Cache::new(10_000), + blocks_by_number: Cache::new(10_000), min_head_rpcs: 1, min_sum_soft_limit: 1_000, max_block_age: None, diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 909c35a4..fcd8cdab 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -416,7 +416,7 @@ impl Web3Rpc { // if we already have this block saved, set new_head_block to that arc. otherwise store this copy let new_head_block = block_map - .get_or_insert_async(&new_hash, async move { new_head_block }) + .get_with_by_ref(&new_hash, async move { new_head_block }) .await; // save the block so we don't send the same one multiple times diff --git a/web3_proxy/src/stats/mod.rs b/web3_proxy/src/stats/mod.rs index 5d3cbf63..57249b09 100644 --- a/web3_proxy/src/stats/mod.rs +++ b/web3_proxy/src/stats/mod.rs @@ -576,11 +576,13 @@ impl BufferedRpcQueryStats { for rpc_key_entity in rpc_keys { // TODO: Not sure which one was inserted, just delete both ... - rpc_secret_key_cache.remove(&rpc_key_entity.secret_key.into()); + rpc_secret_key_cache + .invalidate(&rpc_key_entity.secret_key.into()) + .await; } if let Ok(non_zero_user_id) = NonZeroU64::try_from(sender_rpc_entity.user_id) { - user_balance_cache.remove(&non_zero_user_id); + user_balance_cache.invalidate(&non_zero_user_id).await; } } @@ -599,7 +601,7 @@ impl BufferedRpcQueryStats { // // In principle, do not remove the cache for the referrer; the next reload will trigger premium // // We don't touch the RPC keys at this stage for the refferer, a payment must be paid to reset those (we want to keep things simple here) // // Anyways, the RPC keys will be updated in 5 min (600 seconds) - // user_balance_cache.remove(&referrer_user_id); + // user_balance_cache.invalidate(&referrer_user_id).await; // } // };