From b61675e9282c0790a499d2ace40355c80d0d0dc8 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Thu, 18 May 2023 13:34:22 -0700 Subject: [PATCH] replace all moka with quick_cache_ttl --- Cargo.lock | 193 +---------------------- TODO.md | 2 +- deferred-rate-limiter/src/lib.rs | 19 +-- quick_cache_ttl/src/cache.rs | 48 +++++- quick_cache_ttl/src/kq_cache.rs | 9 +- quick_cache_ttl/src/lib.rs | 1 + web3_proxy/Cargo.toml | 1 - web3_proxy/src/app/mod.rs | 54 +++---- web3_proxy/src/frontend/authorization.rs | 43 +++-- web3_proxy/src/frontend/errors.rs | 10 -- web3_proxy/src/frontend/mod.rs | 2 +- web3_proxy/src/rpcs/blockchain.rs | 33 ++-- web3_proxy/src/rpcs/consensus.rs | 14 +- web3_proxy/src/rpcs/many.rs | 81 +++++----- web3_proxy/src/rpcs/one.rs | 8 +- web3_proxy/src/rpcs/transactions.rs | 2 +- web3_proxy/src/stats/stat_buffer.rs | 2 +- 17 files changed, 184 insertions(+), 338 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 43ccb657..410724bc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -164,35 +164,6 @@ dependencies = [ "term", ] -[[package]] -name = "async-io" -version = "1.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fc5b45d93ef0529756f812ca52e44c221b35341892d3dcc34132ac02f3dd2af" -dependencies = [ - "async-lock", - "autocfg", - "cfg-if", - "concurrent-queue", - "futures-lite", - "log", - "parking", - "polling", - "rustix", - "slab", - "socket2", - "waker-fn", -] - -[[package]] -name = "async-lock" -version = "2.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa24f727524730b077666307f2734b4a1a1c57acb79193127dcc8914d5242dd7" -dependencies = [ - "event-listener", -] - [[package]] name = "async-stream" version = "0.3.5" @@ -593,12 +564,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "bytecount" -version = "0.6.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c676a478f63e9fa2dd5368a42f28bba0d6c560b775f38583c8bbaa7fcd67c9c" - [[package]] name = "byteorder" version = "1.4.3" @@ -653,19 +618,6 @@ dependencies = [ "serde", ] -[[package]] -name = "cargo_metadata" -version = "0.14.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4acbb09d9ee8e23699b9634375c72795d095bf268439da88562cf9b501f181fa" -dependencies = [ - "camino", - "cargo-platform", - "semver", - "serde", - "serde_json", -] - [[package]] name = "cargo_metadata" version = "0.15.4" @@ -901,15 +853,6 @@ dependencies = [ "tokio-util", ] -[[package]] -name = "concurrent-queue" -version = "2.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62ec6771ecfa0762d24683ee5a32ad78487a3d3afdc0fb8cae19d2c5deb50b7c" -dependencies = [ - "crossbeam-utils", -] - [[package]] name = "console" version = "0.14.1" @@ -1626,15 +1569,6 @@ dependencies = [ "libc", ] -[[package]] -name = "error-chain" -version = "0.12.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d2f06b9cac1506ece98fe3231e3cc9c4410ec3d5b1f24ae1c8946f0742cdefc" -dependencies = [ - "version_check", -] - [[package]] name = "eth-keystore" version = "0.5.0" @@ -1803,7 +1737,7 @@ checksum = "198ea9efa8480fa69f73d31d41b1601dace13d053c6fe4be6f5878d9dfcf0108" dependencies = [ "arrayvec", "bytes", - "cargo_metadata 0.15.4", + "cargo_metadata", "chrono", "elliptic-curve 0.13.4", "ethabi", @@ -2204,21 +2138,6 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" -[[package]] -name = "futures-lite" -version = "1.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" -dependencies = [ - "fastrand", - "futures-core", - "futures-io", - "memchr", - "parking", - "pin-project-lite", - "waker-fn", -] - [[package]] name = "futures-locks" version = "0.7.1" @@ -3250,31 +3169,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "moka" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "934030d03f6191edbb4ba16835ccdb80d560788ac686570a8e2986a0fb59ded8" -dependencies = [ - "async-io", - "async-lock", - "crossbeam-channel", - "crossbeam-epoch", - "crossbeam-utils", - "futures-util", - "num_cpus", - "once_cell", - "parking_lot 0.12.1", - "rustc_version", - "scheduled-thread-pool", - "skeptic", - "smallvec", - "tagptr", - "thiserror", - "triomphe", - "uuid 1.3.2", -] - [[package]] name = "nanorand" version = "0.7.0" @@ -3704,12 +3598,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "parking" -version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14f2252c834a40ed9bb5422029649578e63aa341ac401f74e719dd1afda8394e" - [[package]] name = "parking_lot" version = "0.11.2" @@ -4016,22 +3904,6 @@ version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964" -[[package]] -name = "polling" -version = "2.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b2d323e8ca7996b3e23126511a523f7e62924d93ecd5ae73b333815b0eb3dce" -dependencies = [ - "autocfg", - "bitflags", - "cfg-if", - "concurrent-queue", - "libc", - "log", - "pin-project-lite", - "windows-sys 0.48.0", -] - [[package]] name = "ppv-lite86" version = "0.2.17" @@ -4203,17 +4075,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "pulldown-cmark" -version = "0.9.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d9cc634bc78768157b5cbfe988ffcd1dcba95cd2b2f03a88316c08c6d00ed63" -dependencies = [ - "bitflags", - "memchr", - "unicase", -] - [[package]] name = "quick_cache" version = "0.3.0" @@ -4777,15 +4638,6 @@ dependencies = [ "windows-sys 0.42.0", ] -[[package]] -name = "scheduled-thread-pool" -version = "0.2.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19" -dependencies = [ - "parking_lot 0.12.1", -] - [[package]] name = "scoped-tls" version = "1.0.1" @@ -5400,21 +5252,6 @@ dependencies = [ "time 0.3.21", ] -[[package]] -name = "skeptic" -version = "0.13.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16d23b015676c90a0f01c197bfdc786c20342c73a0afdda9025adb0bc42940a8" -dependencies = [ - "bytecount", - "cargo_metadata 0.14.2", - "error-chain", - "glob", - "pulldown-cmark", - "tempfile", - "walkdir", -] - [[package]] name = "slab" version = "0.4.8" @@ -5754,12 +5591,6 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" -[[package]] -name = "tagptr" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" - [[package]] name = "tap" version = "1.0.1" @@ -6310,12 +6141,6 @@ dependencies = [ "tracing-log", ] -[[package]] -name = "triomphe" -version = "0.1.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1ee9bd9239c339d714d657fac840c6d2a4f9c45f4f9ec7b0975113458be78db" - [[package]] name = "try-lock" version = "0.2.4" @@ -6400,15 +6225,6 @@ dependencies = [ "libc", ] -[[package]] -name = "unicase" -version = "2.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6" -dependencies = [ - "version_check", -] - [[package]] name = "unicode-bidi" version = "0.3.13" @@ -6531,12 +6347,6 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" -[[package]] -name = "waker-fn" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" - [[package]] name = "walkdir" version = "2.3.3" @@ -6699,7 +6509,6 @@ dependencies = [ "log", "migration", "mimalloc", - "moka", "num", "num-traits", "once_cell", diff --git a/TODO.md b/TODO.md index 195c293d..5bab04ae 100644 --- a/TODO.md +++ b/TODO.md @@ -128,7 +128,7 @@ These are roughly in order of completition - this was intentional so that recently confirmed transactions go to a server that is more likely to have the tx. - but under heavy load, we hit their rate limits. need a "retry_until_success" function that goes to balanced_rpcs. or maybe store in redis the txids that we broadcast privately and use that to route. - [x] some of the DashMaps grow unbounded! Make/find a "SizedDashMap" that cleans up old rows with some garbage collection task - - moka is exactly what we need + - moka has all the features that we need and more - [x] if block data limit is 0, say Unknown in Debug output - [x] basic request method stats (using the user_id and other fields that are in the tracing frame) - [x] refactor from_anyhow_error to have consistent error codes and http codes. maybe implement the Error trait diff --git a/deferred-rate-limiter/src/lib.rs b/deferred-rate-limiter/src/lib.rs index 65f320a3..99cf5a2e 100644 --- a/deferred-rate-limiter/src/lib.rs +++ b/deferred-rate-limiter/src/lib.rs @@ -1,6 +1,6 @@ //#![warn(missing_docs)] use log::error; -use quick_cache_ttl::{CacheWithTTL, UnitWeighter}; +use quick_cache_ttl::CacheWithTTL; use redis_rate_limiter::{RedisRateLimitResult, RedisRateLimiter}; use std::cmp::Eq; use std::fmt::{Debug, Display}; @@ -16,8 +16,7 @@ pub struct DeferredRateLimiter where K: Send + Sync, { - local_cache: - CacheWithTTL, UnitWeighter, hashbrown::hash_map::DefaultHashBuilder>, + local_cache: CacheWithTTL>, prefix: String, rrl: RedisRateLimiter, /// if None, defers to the max on rrl @@ -46,18 +45,8 @@ where // TODO: time to live is not exactly right. we want this ttl counter to start only after redis is down. this works for now // TODO: what do these weigh? // TODO: allow skipping max_capacity - let local_cache = CacheWithTTL::new( - cache_size, - cache_size as u64, - UnitWeighter, - hashbrown::hash_map::DefaultHashBuilder::default(), - Duration::from_secs(ttl), - ) - .await; - // .time_to_live(Duration::from_secs(ttl)) - // .max_capacity(cache_size) - // .name(prefix) - // .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); + let local_cache = + CacheWithTTL::new_with_capacity(cache_size, Duration::from_secs(ttl)).await; Self { local_cache, diff --git a/quick_cache_ttl/src/cache.rs b/quick_cache_ttl/src/cache.rs index 6afceb23..6f2ac650 100644 --- a/quick_cache_ttl/src/cache.rs +++ b/quick_cache_ttl/src/cache.rs @@ -2,26 +2,61 @@ use quick_cache::{DefaultHashBuilder, UnitWeighter, Weighter}; use std::{ future::Future, hash::{BuildHasher, Hash}, + sync::Arc, time::Duration, }; use crate::{KQCacheWithTTL, PlaceholderGuardWithTTL}; -pub struct CacheWithTTL(KQCacheWithTTL); +pub struct CacheWithTTL( + KQCacheWithTTL, +); impl CacheWithTTL { - pub async fn new_with_unit_weights(estimated_items_capacity: usize, ttl: Duration) -> Self { + pub async fn new_with_capacity(capacity: usize, ttl: Duration) -> Self { Self::new( - estimated_items_capacity, - estimated_items_capacity as u64, + capacity, + capacity as u64, UnitWeighter, DefaultHashBuilder::default(), ttl, ) .await } + + pub async fn arc_with_capacity(capacity: usize, ttl: Duration) -> Arc { + let x = Self::new_with_capacity(capacity, ttl).await; + + Arc::new(x) + } +} + +impl< + Key: Eq + Hash + Clone + Send + Sync + 'static, + Val: Clone + Send + Sync + 'static, + We: Weighter + Clone + Send + Sync + 'static, + B: BuildHasher + Clone + Default + Send + Sync + 'static, + > CacheWithTTL +{ + pub async fn new_with_weights( + estimated_items_capacity: usize, + weight_capacity: u64, + weighter: We, + ttl: Duration, + ) -> Self { + let inner = KQCacheWithTTL::new( + estimated_items_capacity, + weight_capacity, + weighter, + B::default(), + ttl, + ) + .await; + + Self(inner) + } } impl< @@ -50,6 +85,11 @@ impl< Self(inner) } + #[inline] + pub fn get(&self, key: &Key) -> Option { + self.0.get(key, &()) + } + #[inline] pub async fn get_or_insert_async(&self, key: &Key, f: Fut) -> Result where diff --git a/quick_cache_ttl/src/kq_cache.rs b/quick_cache_ttl/src/kq_cache.rs index 5859cb8d..e566c127 100644 --- a/quick_cache_ttl/src/kq_cache.rs +++ b/quick_cache_ttl/src/kq_cache.rs @@ -8,10 +8,10 @@ use tokio::task::JoinHandle; use tokio::time::{sleep_until, Instant}; pub struct KQCacheWithTTL { - pub(crate) cache: Arc>, + cache: Arc>, pub task_handle: JoinHandle<()>, ttl: Duration, - pub(crate) tx: flume::Sender<(Instant, Key, Qey)>, + tx: flume::Sender<(Instant, Key, Qey)>, } struct KQCacheWithTTLTask { @@ -68,6 +68,11 @@ impl< } } + #[inline] + pub fn get(&self, key: &Key, qey: &Qey) -> Option { + self.cache.get(key, qey) + } + #[inline] pub async fn get_or_insert_async(&self, key: &Key, qey: &Qey, f: Fut) -> Result where diff --git a/quick_cache_ttl/src/lib.rs b/quick_cache_ttl/src/lib.rs index b2a05332..ada0c5b0 100644 --- a/quick_cache_ttl/src/lib.rs +++ b/quick_cache_ttl/src/lib.rs @@ -3,4 +3,5 @@ mod kq_cache; pub use cache::CacheWithTTL; pub use kq_cache::{KQCacheWithTTL, PlaceholderGuardWithTTL}; +pub use quick_cache::sync::{Cache, KQCache}; pub use quick_cache::{DefaultHashBuilder, UnitWeighter, Weighter}; diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index bca468c6..1c6b6003 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -62,7 +62,6 @@ itertools = "0.10.5" listenfd = "1.0.1" log = "0.4.17" mimalloc = { version = "0.1.37", optional = true} -moka = { version = "0.11.0", default-features = false, features = ["future"] } num = "0.4.0" num-traits = "0.2.15" once_cell = { version = "1.17.1" } diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index ed77cfe8..bdc70c90 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -46,7 +46,7 @@ use migration::sea_orm::{ }; use migration::sea_query::table::ColumnDef; use migration::{Alias, DbErr, Migrator, MigratorTrait, Table}; -use moka::future::Cache; +use quick_cache_ttl::{Cache, CacheWithTTL}; use redis_rate_limiter::redis::AsyncCommands; use redis_rate_limiter::{redis, DeadpoolRuntime, RedisConfig, RedisPool, RedisRateLimiter}; use serde::Serialize; @@ -61,7 +61,6 @@ use std::time::Duration; use tokio::sync::{broadcast, watch, Semaphore}; use tokio::task::JoinHandle; use tokio::time::{sleep, timeout}; -use ulid::Ulid; // TODO: make this customizable? // TODO: include GIT_REF in here. i had trouble getting https://docs.rs/vergen/latest/vergen/ to work with a workspace. also .git is in .dockerignore @@ -127,9 +126,8 @@ impl DatabaseReplica { } } -// TODO: this should be a the secret key id, not the key itself! -pub type RpcSecretKeyCache = - Cache; +/// Cache data from the database about rpc keys +pub type RpcSecretKeyCache = Arc>; /// The application // TODO: i'm sure this is more arcs than necessary, but spawning futures makes references hard @@ -161,7 +159,7 @@ pub struct Web3ProxyApp { pub hostname: Option, /// store pending transactions that we've seen so that we don't send duplicates to subscribers /// TODO: think about this more. might be worth storing if we sent the transaction or not and using this for automatic retries - pub pending_transactions: Cache, + pub pending_transactions: Arc>, /// rate limit anonymous users pub frontend_ip_rate_limiter: Option>, /// rate limit authenticated users @@ -178,13 +176,11 @@ pub struct Web3ProxyApp { // TODO: should the key be our RpcSecretKey class instead of Ulid? pub rpc_secret_key_cache: RpcSecretKeyCache, /// concurrent/parallel RPC request limits for authenticated users - pub registered_user_semaphores: - Cache, hashbrown::hash_map::DefaultHashBuilder>, + pub rpc_key_semaphores: Cache>, /// concurrent/parallel request limits for anonymous users - pub ip_semaphores: Cache, hashbrown::hash_map::DefaultHashBuilder>, + pub ip_semaphores: Cache>, /// concurrent/parallel application request limits for authenticated users - pub bearer_token_semaphores: - Cache, hashbrown::hash_map::DefaultHashBuilder>, + pub bearer_token_semaphores: Cache>, pub kafka_producer: Option, /// channel for sending stats in a background task pub stat_sender: Option>, @@ -510,10 +506,8 @@ impl Web3ProxyApp { // if there is no database of users, there will be no keys and so this will be empty // TODO: max_capacity from config // TODO: ttl from config - let rpc_secret_key_cache = Cache::builder() - .max_capacity(10_000) - .time_to_live(Duration::from_secs(600)) - .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); + let rpc_secret_key_cache = + CacheWithTTL::arc_with_capacity(10_000, Duration::from_secs(600)).await; // create a channel for receiving stats // we do this in a channel so we don't slow down our response to the users @@ -603,13 +597,11 @@ impl Web3ProxyApp { // TODO: capacity from configs // all these are the same size, so no need for a weigher // TODO: this used to have a time_to_idle - let pending_transactions = Cache::builder() - .max_capacity(10_000) - // TODO: different chains might handle this differently - // TODO: what should we set? 5 minutes is arbitrary. the nodes themselves hold onto transactions for much longer - // TODO: this used to be time_to_update, but - .time_to_live(Duration::from_secs(300)) - .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); + // TODO: different chains might handle this differently + // TODO: what should we set? 5 minutes is arbitrary. the nodes themselves hold onto transactions for much longer + // TODO: this used to be time_to_update, but + let pending_transactions = + CacheWithTTL::arc_with_capacity(10_000, Duration::from_secs(300)).await; // responses can be very different in sizes, so this is a cache with a max capacity and a weigher // TODO: don't allow any response to be bigger than X% of the cache @@ -624,17 +616,15 @@ impl Web3ProxyApp { ) .await; + // TODO: how should we handle hitting this max? + let max_users = 20_000; + // create semaphores for concurrent connection limits + // TODO: how can we implement time til idle? // TODO: what should tti be for semaphores? - let bearer_token_semaphores = Cache::builder() - .time_to_idle(Duration::from_secs(120)) - .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); - let ip_semaphores = Cache::builder() - .time_to_idle(Duration::from_secs(120)) - .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); - let registered_user_semaphores = Cache::builder() - .time_to_idle(Duration::from_secs(120)) - .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); + let bearer_token_semaphores = Cache::new(max_users); + let ip_semaphores = Cache::new(max_users); + let registered_user_semaphores = Cache::new(max_users); let (balanced_rpcs, balanced_handle, consensus_connections_watcher) = Web3Rpcs::spawn( top_config.app.chain_id, @@ -745,7 +735,7 @@ impl Web3ProxyApp { rpc_secret_key_cache, bearer_token_semaphores, ip_semaphores, - registered_user_semaphores, + rpc_key_semaphores: registered_user_semaphores, stat_sender, }; diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index 13403ac7..51ad4834 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -28,7 +28,9 @@ use rdkafka::producer::{FutureProducer, FutureRecord}; use rdkafka::util::Timeout as KafkaTimeout; use redis_rate_limiter::redis::AsyncCommands; use redis_rate_limiter::RedisRateLimitResult; +use std::convert::Infallible; use std::fmt::Display; +use std::hash::Hash; use std::mem; use std::sync::atomic::{self, AtomicBool, AtomicI64, AtomicU64, AtomicUsize}; use std::time::Duration; @@ -47,6 +49,15 @@ pub enum RpcSecretKey { Uuid(Uuid), } +impl Hash for RpcSecretKey { + fn hash(&self, state: &mut H) { + match self { + Self::Ulid(x) => state.write_u128(x.0), + Self::Uuid(x) => state.write_u128(x.as_u128()), + } + } +} + /// TODO: should this have IpAddr and Origin or AuthorizationChecks? #[derive(Debug)] pub enum RateLimitResult { @@ -872,12 +883,12 @@ impl Web3ProxyApp { if let Some(max_concurrent_requests) = self.config.public_max_concurrent_requests { let semaphore = self .ip_semaphores - .get_with_by_ref(ip, async move { + .get_or_insert_async::(ip, async move { // TODO: set max_concurrent_requests dynamically based on load? let s = Semaphore::new(max_concurrent_requests); - Arc::new(s) + Ok(Arc::new(s)) }) - .await; + .await?; // if semaphore.available_permits() == 0 { // // TODO: concurrent limit hit! emit a stat? less important for anon users @@ -901,17 +912,17 @@ impl Web3ProxyApp { let user_id = authorization_checks .user_id .try_into() - .or(Err(Web3ProxyError::UserIdZero)) - .web3_context("user ids should always be non-zero")?; + .or(Err(Web3ProxyError::UserIdZero))?; let semaphore = self - .registered_user_semaphores - .get_with(user_id, async move { + .rpc_key_semaphores + .get_or_insert_async(&user_id, async move { let s = Semaphore::new(max_concurrent_requests as usize); // trace!("new semaphore for user_id {}", user_id); - Arc::new(s) + Ok::<_, Infallible>(Arc::new(s)) }) - .await; + .await + .unwrap(); // if semaphore.available_permits() == 0 { // // TODO: concurrent limit hit! emit a stat? this has a race condition though. @@ -939,11 +950,12 @@ impl Web3ProxyApp { // limit concurrent requests let semaphore = self .bearer_token_semaphores - .get_with_by_ref(&user_bearer_token, async move { + .get_or_insert_async::(&user_bearer_token, async move { let s = Semaphore::new(self.config.bearer_token_max_concurrent_requests as usize); - Arc::new(s) + Ok(Arc::new(s)) }) - .await; + .await + .unwrap(); let semaphore_permit = semaphore.acquire_owned().await?; @@ -1086,9 +1098,9 @@ impl Web3ProxyApp { proxy_mode: ProxyMode, rpc_secret_key: RpcSecretKey, ) -> Web3ProxyResult { - let authorization_checks: Result<_, Arc> = self + let authorization_checks: Result<_, Web3ProxyError> = self .rpc_secret_key_cache - .try_get_with(rpc_secret_key.into(), async move { + .get_or_insert_async(&rpc_secret_key, async move { // trace!(?rpc_secret_key, "user cache miss"); let db_replica = self @@ -1107,6 +1119,7 @@ impl Web3ProxyApp { Some(rpc_key_model) => { // TODO: move these splits into helper functions // TODO: can we have sea orm handle this for us? + // TODO: don't expect. return an application error let user_model = user::Entity::find_by_id(rpc_key_model.user_id) .one(db_replica.conn()) .await? @@ -1209,7 +1222,7 @@ impl Web3ProxyApp { }) .await; - authorization_checks.map_err(Web3ProxyError::Arc) + authorization_checks } /// Authorized the ip/origin/referer/useragent and rate limit and concurrency diff --git a/web3_proxy/src/frontend/errors.rs b/web3_proxy/src/frontend/errors.rs index e16b674e..9126526e 100644 --- a/web3_proxy/src/frontend/errors.rs +++ b/web3_proxy/src/frontend/errors.rs @@ -5,7 +5,6 @@ use crate::jsonrpc::{JsonRpcErrorData, JsonRpcForwardedResponse}; use crate::response_cache::JsonRpcResponseData; use std::error::Error; -use std::sync::Arc; use std::{borrow::Cow, net::IpAddr}; use axum::{ @@ -33,13 +32,11 @@ impl From for Web3ProxyResult<()> { } } -// TODO: #[derive(Debug, Display, Error, From)] pub enum Web3ProxyError { AccessDenied, #[error(ignore)] Anyhow(anyhow::Error), - Arc(Arc), #[error(ignore)] #[from(ignore)] BadRequest(String), @@ -685,13 +682,6 @@ impl Web3ProxyError { }, ) } - Self::Arc(err) => { - return match Arc::try_unwrap(err) { - Ok(err) => err, - Err(err) => Self::Anyhow(anyhow::anyhow!("{}", err)), - } - .into_response_parts(); - } Self::SemaphoreAcquireError(err) => { warn!("semaphore acquire err={:?}", err); ( diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs index 9eb2f6d9..e1496960 100644 --- a/web3_proxy/src/frontend/mod.rs +++ b/web3_proxy/src/frontend/mod.rs @@ -56,7 +56,7 @@ pub async fn serve( let response_cache_size = ResponseCacheKey::COUNT; let response_cache = - ResponseCache::new_with_unit_weights(response_cache_size, Duration::from_secs(1)).await; + ResponseCache::new_with_capacity(response_cache_size, Duration::from_secs(1)).await; // TODO: read config for if fastest/versus should be available publicly. default off diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index 828eacb3..ff2c08c4 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -10,10 +10,11 @@ use crate::{config::BlockAndRpc, jsonrpc::JsonRpcRequest}; use derive_more::From; use ethers::prelude::{Block, TxHash, H256, U64}; use log::{debug, trace, warn, Level}; -use moka::future::Cache; +use quick_cache_ttl::CacheWithTTL; use serde::ser::SerializeStruct; use serde::Serialize; use serde_json::json; +use std::convert::Infallible; use std::hash::Hash; use std::{cmp::Ordering, fmt::Display, sync::Arc}; use tokio::sync::broadcast; @@ -22,7 +23,8 @@ use tokio::time::Duration; // TODO: type for Hydrated Blocks with their full transactions? pub type ArcBlock = Arc>; -pub type BlocksByHashCache = Cache; +pub type BlocksByHashCache = Arc>; +pub type BlocksByNumberCache = Arc>; /// A block and its age. #[derive(Clone, Debug, Default, From)] @@ -168,9 +170,7 @@ impl Web3Rpcs { heaviest_chain: bool, ) -> Web3ProxyResult { // TODO: i think we can rearrange this function to make it faster on the hot path - let block_hash = block.hash(); - - if block_hash.is_zero() { + if block.hash().is_zero() { debug!("Skipping block without hash!"); return Ok(block); } @@ -182,15 +182,18 @@ impl Web3Rpcs { // this is the only place that writes to block_numbers // multiple inserts should be okay though // TODO: info that there was a fork? - self.blocks_by_number.insert(*block_num, *block_hash).await; + self.blocks_by_number.insert(*block_num, *block.hash()); } // this block is very likely already in block_hashes // TODO: use their get_with + let block_hash = *block.hash(); + let block = self .blocks_by_hash - .get_with(*block_hash, async move { block }) - .await; + .get_or_insert_async::(&block_hash, async move { Ok(block) }) + .await + .unwrap(); Ok(block) } @@ -423,7 +426,7 @@ impl Web3Rpcs { return Ok(()); } - let new_synced_connections = match consensus_finder + let new_consensus_rpcs = match consensus_finder .find_consensus_connections(authorization, self) .await { @@ -436,21 +439,21 @@ impl Web3Rpcs { Ok(Some(x)) => x, }; - trace!("new_synced_connections: {:#?}", new_synced_connections); + trace!("new_synced_connections: {:#?}", new_consensus_rpcs); let watch_consensus_head_sender = self.watch_consensus_head_sender.as_ref().unwrap(); - let consensus_tier = new_synced_connections.tier; + let consensus_tier = new_consensus_rpcs.tier; // TODO: think more about this unwrap let total_tiers = consensus_finder.worst_tier().unwrap_or(10); - let backups_needed = new_synced_connections.backups_needed; - let consensus_head_block = new_synced_connections.head_block.clone(); - let num_consensus_rpcs = new_synced_connections.num_consensus_rpcs(); + let backups_needed = new_consensus_rpcs.backups_needed; + let consensus_head_block = new_consensus_rpcs.head_block.clone(); + let num_consensus_rpcs = new_consensus_rpcs.num_consensus_rpcs(); let num_active_rpcs = consensus_finder.len(); let total_rpcs = self.by_name.load().len(); let old_consensus_head_connections = self .watch_consensus_rpcs_sender - .send_replace(Some(Arc::new(new_synced_connections))); + .send_replace(Some(Arc::new(new_consensus_rpcs))); let backups_voted_str = if backups_needed { "B " } else { "" }; diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index 90385901..806dc448 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -8,10 +8,11 @@ use ethers::prelude::{H256, U64}; use hashbrown::{HashMap, HashSet}; use itertools::{Itertools, MinMaxResult}; use log::{debug, trace, warn}; -use moka::future::Cache; +use quick_cache_ttl::Cache; use serde::Serialize; use std::cmp::{Ordering, Reverse}; use std::collections::BTreeMap; +use std::convert::Infallible; use std::fmt; use std::sync::Arc; use tokio::time::Instant; @@ -321,7 +322,7 @@ impl Web3Rpcs { } } -type FirstSeenCache = Cache; +type FirstSeenCache = Cache; /// A ConsensusConnections builder that tracks all connection heads across multiple groups of servers pub struct ConsensusFinder { @@ -342,9 +343,7 @@ impl ConsensusFinder { pub fn new(max_block_age: Option, max_block_lag: Option) -> Self { // TODO: what's a good capacity for this? it shouldn't need to be very large // TODO: if we change Web3ProxyBlock to store the instance, i think we could use the block_by_hash cache - let first_seen = Cache::builder() - .max_capacity(16) - .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); + let first_seen = Cache::new(16); // TODO: hard coding 0-9 isn't great, but its easier than refactoring this to be smart about config reloading let rpc_heads = HashMap::new(); @@ -372,8 +371,9 @@ impl ConsensusFinder { async fn insert(&mut self, rpc: Arc, block: Web3ProxyBlock) -> Option { let first_seen = self .first_seen - .get_with_by_ref(block.hash(), async { Instant::now() }) - .await; + .get_or_insert_async::(block.hash(), async { Ok(Instant::now()) }) + .await + .unwrap(); // calculate elapsed time before trying to lock let latency = first_seen.elapsed(); diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 6030a7d4..6311b5b7 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -1,5 +1,5 @@ ///! Load balanced communication with a group of web3 rpc providers -use super::blockchain::{BlocksByHashCache, Web3ProxyBlock}; +use super::blockchain::{BlocksByHashCache, BlocksByNumberCache, Web3ProxyBlock}; use super::consensus::{ConsensusWeb3Rpcs, ShouldWaitForBlock}; use super::one::Web3Rpc; use super::request::{OpenRequestHandle, OpenRequestResult, RequestErrorHandler}; @@ -16,7 +16,7 @@ use anyhow::Context; use arc_swap::ArcSwap; use counter::Counter; use derive_more::From; -use ethers::prelude::{ProviderError, TxHash, H256, U64}; +use ethers::prelude::{ProviderError, TxHash, U64}; use futures::future::try_join_all; use futures::stream::FuturesUnordered; use futures::StreamExt; @@ -24,8 +24,8 @@ use hashbrown::{HashMap, HashSet}; use itertools::Itertools; use log::{debug, error, info, trace, warn, Level}; use migration::sea_orm::DatabaseConnection; -use moka::future::Cache; use ordered_float::OrderedFloat; +use quick_cache_ttl::CacheWithTTL; use serde::ser::{SerializeStruct, Serializer}; use serde::Serialize; use serde_json::json; @@ -58,15 +58,14 @@ pub struct Web3Rpcs { /// this head receiver makes it easy to wait until there is a new block pub(super) watch_consensus_head_sender: Option>>, /// keep track of transactions that we have sent through subscriptions - pub(super) pending_transaction_cache: - Cache, + pub(super) pending_transaction_cache: Arc>, pub(super) pending_tx_id_receiver: flume::Receiver, pub(super) pending_tx_id_sender: flume::Sender, /// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis? /// all blocks, including orphans pub(super) blocks_by_hash: BlocksByHashCache, /// blocks on the heaviest chain - pub(super) blocks_by_number: Cache, + pub(super) blocks_by_number: BlocksByNumberCache, /// the number of rpcs required to agree on consensus for the head block (thundering herd protection) pub(super) min_head_rpcs: usize, /// the soft limit required to agree on consensus for the head block. (thundering herd protection) @@ -89,7 +88,7 @@ impl Web3Rpcs { min_head_rpcs: usize, min_sum_soft_limit: u32, name: String, - pending_transaction_cache: Cache, + pending_transaction_cache: Arc>, pending_tx_sender: Option>, watch_consensus_head_sender: Option>>, ) -> anyhow::Result<( @@ -159,24 +158,23 @@ impl Web3Rpcs { }; // these blocks don't have full transactions, but they do have rather variable amounts of transaction hashes - // TODO: how can we do the weigher better? need to know actual allocated size + // TODO: how can we do the weigher this? need to know actual allocated size // TODO: time_to_idle instead? // TODO: limits from config - let blocks_by_hash: BlocksByHashCache = Cache::builder() - .max_capacity(1024 * 1024 * 1024) - .weigher(|_k, v: &Web3ProxyBlock| { - 1 + v.block.transactions.len().try_into().unwrap_or(u32::MAX) - }) - .time_to_live(Duration::from_secs(30 * 60)) - .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); + let blocks_by_hash: BlocksByHashCache = + Arc::new(CacheWithTTL::new_with_capacity(10_000, Duration::from_secs(30 * 60)).await); + // .max_capacity(1024 * 1024 * 1024) + // .weigher(|_k, v: &Web3ProxyBlock| { + // 1 + v.block.transactions.len().try_into().unwrap_or(u32::MAX) + // }) + // .time_to_live(Duration::from_secs(30 * 60)) + // .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); // all block numbers are the same size, so no need for weigher // TODO: limits from config // TODO: time_to_idle instead? - let blocks_by_number = Cache::builder() - .time_to_live(Duration::from_secs(30 * 60)) - .max_capacity(10_000) - .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); + let blocks_by_number = + Arc::new(CacheWithTTL::new_with_capacity(10_000, Duration::from_secs(30 * 60)).await); let (watch_consensus_rpcs_sender, consensus_connections_watcher) = watch::channel(Default::default()); @@ -264,7 +262,7 @@ impl Web3Rpcs { }; let pending_tx_id_sender = Some(self.pending_tx_id_sender.clone()); - let blocks_by_hash = self.blocks_by_hash.clone(); + let blocks_by_hash_cache = self.blocks_by_hash.clone(); let http_interval_sender = self.http_interval_sender.clone(); let chain_id = app.config.chain_id; @@ -277,7 +275,7 @@ impl Web3Rpcs { chain_id, http_client, http_interval_sender, - blocks_by_hash, + blocks_by_hash_cache, block_sender, pending_tx_id_sender, true, @@ -1249,6 +1247,7 @@ mod tests { use crate::rpcs::consensus::ConsensusFinder; use crate::rpcs::{blockchain::Web3ProxyBlock, provider::Web3Provider}; use arc_swap::ArcSwap; + use ethers::types::H256; use ethers::types::{Block, U256}; use latency::PeakEwmaLatency; use log::{trace, LevelFilter}; @@ -1436,14 +1435,15 @@ mod tests { name: "test".to_string(), watch_consensus_head_sender: Some(watch_consensus_head_sender), watch_consensus_rpcs_sender, - pending_transaction_cache: Cache::builder() - .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()), + pending_transaction_cache: CacheWithTTL::arc_with_capacity( + 100, + Duration::from_secs(60), + ) + .await, pending_tx_id_receiver, pending_tx_id_sender, - blocks_by_hash: Cache::builder() - .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()), - blocks_by_number: Cache::builder() - .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()), + blocks_by_hash: CacheWithTTL::arc_with_capacity(100, Duration::from_secs(60)).await, + blocks_by_number: CacheWithTTL::arc_with_capacity(100, Duration::from_secs(60)).await, // TODO: test max_block_age? max_block_age: None, // TODO: test max_block_lag? @@ -1688,14 +1688,15 @@ mod tests { name: "test".to_string(), watch_consensus_head_sender: Some(watch_consensus_head_sender), watch_consensus_rpcs_sender, - pending_transaction_cache: Cache::builder() - .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()), + pending_transaction_cache: CacheWithTTL::arc_with_capacity( + 100, + Duration::from_secs(120), + ) + .await, pending_tx_id_receiver, pending_tx_id_sender, - blocks_by_hash: Cache::builder() - .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()), - blocks_by_number: Cache::builder() - .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()), + blocks_by_hash: CacheWithTTL::arc_with_capacity(100, Duration::from_secs(120)).await, + blocks_by_number: CacheWithTTL::arc_with_capacity(100, Duration::from_secs(120)).await, min_head_rpcs: 1, min_sum_soft_limit: 4_000, max_block_age: None, @@ -1853,14 +1854,16 @@ mod tests { name: "test".to_string(), watch_consensus_head_sender: Some(watch_consensus_head_sender), watch_consensus_rpcs_sender, - pending_transaction_cache: Cache::builder() - .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()), + pending_transaction_cache: CacheWithTTL::arc_with_capacity( + 10_000, + Duration::from_secs(120), + ) + .await, pending_tx_id_receiver, pending_tx_id_sender, - blocks_by_hash: Cache::builder() - .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()), - blocks_by_number: Cache::builder() - .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()), + blocks_by_hash: CacheWithTTL::arc_with_capacity(10_000, Duration::from_secs(120)).await, + blocks_by_number: CacheWithTTL::arc_with_capacity(10_000, Duration::from_secs(120)) + .await, min_head_rpcs: 1, min_sum_soft_limit: 1_000, max_block_age: None, diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index b37a4425..6095c872 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -23,6 +23,7 @@ use serde::Serialize; use serde_json::json; use std::borrow::Cow; use std::cmp::min; +use std::convert::Infallible; use std::fmt; use std::hash::{Hash, Hasher}; use std::sync::atomic::{self, AtomicBool, AtomicU64, AtomicUsize}; @@ -622,8 +623,11 @@ impl Web3Rpc { // if we already have this block saved, set new_head_block to that arc. otherwise store this copy let new_head_block = block_map - .get_with(new_hash, async move { new_head_block }) - .await; + .get_or_insert_async::( + &new_hash, + async move { Ok(new_head_block) }, + ) + .await?; // save the block so we don't send the same one multiple times // also save so that archive checks can know how far back to query diff --git a/web3_proxy/src/rpcs/transactions.rs b/web3_proxy/src/rpcs/transactions.rs index 687c5224..d8c007ee 100644 --- a/web3_proxy/src/rpcs/transactions.rs +++ b/web3_proxy/src/rpcs/transactions.rs @@ -82,7 +82,7 @@ impl Web3Rpcs { } // trace!(?pending_tx_id, "checking pending_transactions on {}", rpc); - if self.pending_transaction_cache.contains_key(&pending_tx_id) { + if self.pending_transaction_cache.get(&pending_tx_id).is_some() { // this transaction has already been processed return Ok(()); } diff --git a/web3_proxy/src/stats/stat_buffer.rs b/web3_proxy/src/stats/stat_buffer.rs index d7a5dea1..f8fd2db8 100644 --- a/web3_proxy/src/stats/stat_buffer.rs +++ b/web3_proxy/src/stats/stat_buffer.rs @@ -4,7 +4,7 @@ use derive_more::From; use futures::stream; use hashbrown::HashMap; use influxdb2::api::write::TimestampPrecision; -use log::{debug, error, info, trace}; +use log::{error, info, trace}; use migration::sea_orm::prelude::Decimal; use migration::sea_orm::DatabaseConnection; use std::time::Duration;