From f0a9f0307654d8cd6ded38f6f86abfe1fbbd4a48 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 7 Jun 2023 10:57:01 -0700 Subject: [PATCH 1/5] smaller caches --- web3_proxy/src/rpcs/many.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 4ee3aadc..9c8fd5c7 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -98,14 +98,14 @@ impl Web3Rpcs { // TODO: actual weighter on this // TODO: time_to_idle instead? let blocks_by_hash: BlocksByHashCache = Arc::new( - CacheWithTTL::new("blocks_by_hash", 10_000, Duration::from_secs(30 * 60)).await, + CacheWithTTL::new("blocks_by_hash", 1_000, Duration::from_secs(30 * 60)).await, ); // all block numbers are the same size, so no need for weigher // TODO: limits from config // TODO: time_to_idle instead? let blocks_by_number = Arc::new( - CacheWithTTL::new("blocks_by_number", 10_000, Duration::from_secs(30 * 60)).await, + CacheWithTTL::new("blocks_by_number", 1_000, Duration::from_secs(30 * 60)).await, ); let (watch_consensus_rpcs_sender, consensus_connections_watcher) = From aa57dd2fcf961a3fbfd3579800b12001d6debad0 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 7 Jun 2023 10:48:55 -0700 Subject: [PATCH 2/5] serialize the caches on /status --- Cargo.lock | 1 + quick_cache_ttl/Cargo.toml | 1 + quick_cache_ttl/src/cache.rs | 44 ++++++++++++++++++++++--------- quick_cache_ttl/src/kq_cache.rs | 38 +++++++++++++++++++------- web3_proxy/src/frontend/status.rs | 9 ++++--- web3_proxy/src/rpcs/many.rs | 2 +- 6 files changed, 69 insertions(+), 26 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9e67990b..29ef2fa2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4033,6 +4033,7 @@ dependencies = [ "flume", "log", "quick_cache", + "serde", "tokio", ] diff --git a/quick_cache_ttl/Cargo.toml b/quick_cache_ttl/Cargo.toml index 8df44b3e..6e0da4f4 100644 --- a/quick_cache_ttl/Cargo.toml +++ b/quick_cache_ttl/Cargo.toml @@ -9,6 +9,7 @@ edition = "2021" flume = "0.10.14" log = "0.4.18" quick_cache = "0.3.0" +serde = "1" tokio = { version = "1.28.2", features = ["full"] } [dev-dependencies] diff --git a/quick_cache_ttl/src/cache.rs b/quick_cache_ttl/src/cache.rs index 418d6082..fd14c03b 100644 --- a/quick_cache_ttl/src/cache.rs +++ b/quick_cache_ttl/src/cache.rs @@ -1,4 +1,6 @@ +use crate::{KQCacheWithTTL, PlaceholderGuardWithTTL}; use quick_cache::{DefaultHashBuilder, UnitWeighter, Weighter}; +use serde::{Serialize, Serializer}; use std::{ fmt::Debug, future::Future, @@ -8,8 +10,6 @@ use std::{ time::Duration, }; -use crate::{KQCacheWithTTL, PlaceholderGuardWithTTL}; - pub struct CacheWithTTL( KQCacheWithTTL, ); @@ -110,14 +110,6 @@ impl< self.0.get_or_insert_async(key, &(), f).await } - #[inline] - pub async fn try_get_or_insert_async(&self, key: &Key, f: Fut) -> Result - where - Fut: Future>, - { - self.0.try_get_or_insert_async(key, &(), f).await - } - #[inline] pub async fn get_value_or_guard_async( &self, @@ -126,6 +118,16 @@ impl< self.0.get_value_or_guard_async(key, ()).await } + #[inline] + pub fn peek(&self, key: &Key) -> Option { + self.0.peek(key, &()) + } + + #[inline] + pub fn remove(&self, key: &Key) -> bool { + self.0.remove(key, &()) + } + /// if the item was too large to insert, it is returned with the error /// IMPORTANT! Inserting the same key multiple times does NOT reset the TTL! #[inline] @@ -134,7 +136,25 @@ impl< } #[inline] - pub fn remove(&self, key: &Key) -> bool { - self.0.remove(key, &()) + pub async fn try_get_or_insert_async(&self, key: &Key, f: Fut) -> Result + where + Fut: Future>, + { + self.0.try_get_or_insert_async(key, &(), f).await + } +} + +impl< + Key: Clone + Debug + Eq + Hash + Send + Sync + 'static, + Val: Clone + Send + Sync + 'static, + We: Weighter + Clone + Send + Sync + 'static, + B: BuildHasher + Clone + Send + Sync + 'static, + > Serialize for CacheWithTTL +{ + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + self.0.serialize(serializer) } } diff --git a/quick_cache_ttl/src/kq_cache.rs b/quick_cache_ttl/src/kq_cache.rs index 14b68064..1ecb04dd 100644 --- a/quick_cache_ttl/src/kq_cache.rs +++ b/quick_cache_ttl/src/kq_cache.rs @@ -1,6 +1,8 @@ use log::{log_enabled, trace}; use quick_cache::sync::KQCache; use quick_cache::{PlaceholderGuard, Weighter}; +use serde::ser::SerializeStruct; +use serde::{Serialize, Serializer}; use std::convert::Infallible; use std::fmt::Debug; use std::future::Future; @@ -151,11 +153,6 @@ impl< } } - #[inline] - pub fn hits(&self) -> u64 { - self.cache.hits() - } - /// if the item was too large to insert, it is returned with the error /// IMPORTANT! Inserting the same key multiple times does NOT reset the TTL! #[inline] @@ -183,11 +180,6 @@ impl< } } - #[inline] - pub fn misses(&self) -> u64 { - self.cache.misses() - } - #[inline] pub fn peek(&self, key: &Key, qey: &Qey) -> Option { self.cache.peek(key, qey) @@ -268,3 +260,29 @@ impl< self.tx.send((expire_at, self.key, self.qey)).unwrap(); } } + +impl< + Key: Clone + Debug + Eq + Hash + Send + Sync + 'static, + Qey: Clone + Debug + Eq + Hash + Send + Sync + 'static, + Val: Clone + Send + Sync + 'static, + We: Weighter + Clone + Send + Sync + 'static, + B: BuildHasher + Clone + Send + Sync + 'static, + > Serialize for KQCacheWithTTL +{ + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let mut state = serializer.serialize_struct(self.name, 5)?; + + state.serialize_field("len", &self.cache.len())?; + state.serialize_field("weight", &self.cache.weight())?; + + state.serialize_field("capacity", &self.cache.capacity())?; + + state.serialize_field("hits", &self.cache.hits())?; + state.serialize_field("misses", &self.cache.misses())?; + + state.end() + } +} diff --git a/web3_proxy/src/frontend/status.rs b/web3_proxy/src/frontend/status.rs index fe3f0eb5..46d4af15 100644 --- a/web3_proxy/src/frontend/status.rs +++ b/web3_proxy/src/frontend/status.rs @@ -135,12 +135,15 @@ async fn _status(app: Arc) -> (StatusCode, &'static str, Bytes) { // TODO: what else should we include? uptime, cache hit rates, cpu load, memory used // TODO: the hostname is probably not going to change. only get once at the start? let body = json!({ - "version": APP_USER_AGENT, - "chain_id": app.config.chain_id, "balanced_rpcs": app.balanced_rpcs, - "private_rpcs": app.private_rpcs, "bundler_4337_rpcs": app.bundler_4337_rpcs, + "chain_id": app.config.chain_id, "hostname": app.hostname, + "jsonrpc_response_cache": app.jsonrpc_response_cache, + "private_rpcs": app.private_rpcs, + "rpc_secret_key_cache": app.rpc_secret_key_cache, + "user_balance_cache": app.user_balance_cache, + "version": APP_USER_AGENT, }); let body = body.to_string().into_bytes(); diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 9c8fd5c7..3abc8fb6 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -1202,7 +1202,7 @@ impl Serialize for Web3Rpcs { where S: Serializer, { - let mut state = serializer.serialize_struct("Web3Rpcs", 1)?; + let mut state = serializer.serialize_struct("Web3Rpcs", 2)?; { let by_name = self.by_name.load(); From 8249b0aefe4fc1553b2deb5a952437651ee4f7a6 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 7 Jun 2023 11:18:36 -0700 Subject: [PATCH 3/5] cache isnt on main yet --- web3_proxy/src/frontend/status.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/web3_proxy/src/frontend/status.rs b/web3_proxy/src/frontend/status.rs index 46d4af15..02804f8c 100644 --- a/web3_proxy/src/frontend/status.rs +++ b/web3_proxy/src/frontend/status.rs @@ -142,7 +142,6 @@ async fn _status(app: Arc) -> (StatusCode, &'static str, Bytes) { "jsonrpc_response_cache": app.jsonrpc_response_cache, "private_rpcs": app.private_rpcs, "rpc_secret_key_cache": app.rpc_secret_key_cache, - "user_balance_cache": app.user_balance_cache, "version": APP_USER_AGENT, }); From b50579ea3a9376d2502784f064f3bcf76d87d7f6 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 7 Jun 2023 12:04:39 -0700 Subject: [PATCH 4/5] add block caches to web3rpcs serializer --- web3_proxy/src/rpcs/many.rs | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 3abc8fb6..72f4c082 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -1202,7 +1202,7 @@ impl Serialize for Web3Rpcs { where S: Serializer, { - let mut state = serializer.serialize_struct("Web3Rpcs", 2)?; + let mut state = serializer.serialize_struct("Web3Rpcs", 4)?; { let by_name = self.by_name.load(); @@ -1222,12 +1222,9 @@ impl Serialize for Web3Rpcs { } } - // self.blocks_by_hash.sync(); - // self.blocks_by_number.sync(); - // state.serialize_field("block_hashes_count", &self.blocks_by_hash.entry_count())?; - // state.serialize_field("block_hashes_size", &self.blocks_by_hash.weighted_size())?; - // state.serialize_field("block_numbers_count", &self.blocks_by_number.entry_count())?; - // state.serialize_field("block_numbers_size", &self.blocks_by_number.weighted_size())?; + state.serialize_field("blocks_by_hash", &self.blocks_by_hash)?; + state.serialize_field("blocks_by_number", &self.blocks_by_number)?; + state.end() } } From 91d288315f9e9e0e13b6db830c22ef0afb5a81f5 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Wed, 7 Jun 2023 13:05:15 -0700 Subject: [PATCH 5/5] add queue lengths to status page --- web3_proxy/src/rpcs/many.rs | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 72f4c082..905906e7 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -1202,7 +1202,7 @@ impl Serialize for Web3Rpcs { where S: Serializer, { - let mut state = serializer.serialize_struct("Web3Rpcs", 4)?; + let mut state = serializer.serialize_struct("Web3Rpcs", 8)?; { let by_name = self.by_name.load(); @@ -1224,6 +1224,20 @@ impl Serialize for Web3Rpcs { state.serialize_field("blocks_by_hash", &self.blocks_by_hash)?; state.serialize_field("blocks_by_number", &self.blocks_by_number)?; + state.serialize_field("pending_transaction_cache", &self.pending_transaction_cache)?; + + state.serialize_field("block_sender_len", &self.block_sender.len())?; + + state.serialize_field( + "watch_consensus_rpcs_receivers", + &self.watch_consensus_rpcs_sender.receiver_count(), + )?; + + if let Some(ref x) = self.watch_consensus_head_sender { + state.serialize_field("watch_consensus_head_receivers", &x.receiver_count())?; + } else { + state.serialize_field("watch_consensus_head_receivers", &None::<()>)?; + } state.end() }