From 05af0551c9b5eb1fa1cdae20e12da1e16e896350 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Sat, 17 Sep 2022 02:17:20 +0000 Subject: [PATCH] use ahash this might actually be slower. do real benchmarks --- Cargo.lock | 1 + deferred-rate-limiter/src/lib.rs | 6 ++---- web3_proxy/Cargo.toml | 1 + web3_proxy/src/app.rs | 30 ++++++++++++++++++++---------- web3_proxy/src/config.rs | 4 ++-- web3_proxy/src/rpcs/blockchain.rs | 2 +- web3_proxy/src/rpcs/connection.rs | 10 +++++----- web3_proxy/src/rpcs/connections.rs | 22 ++++++++++++++-------- 8 files changed, 46 insertions(+), 30 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bb2fda05..0459bae4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5544,6 +5544,7 @@ dependencies = [ name = "web3_proxy" version = "0.1.0" dependencies = [ + "ahash 0.8.0", "anyhow", "arc-swap", "argh", diff --git a/deferred-rate-limiter/src/lib.rs b/deferred-rate-limiter/src/lib.rs index ed2bcd55..573382ec 100644 --- a/deferred-rate-limiter/src/lib.rs +++ b/deferred-rate-limiter/src/lib.rs @@ -34,14 +34,12 @@ where pub fn new(cache_size: u64, prefix: &str, rrl: RedisRateLimiter) -> Self { let ttl = rrl.period as u64; - let hasher = ahash::RandomState::new(); - - // TODO: think more about this ttl. if + // TODO: time to live is not right. we want this ttl counter to start only after redis is down. this works for now let local_cache = Cache::builder() .time_to_live(Duration::from_secs(ttl)) .max_capacity(cache_size) .name(prefix) - .build_with_hasher(hasher); + .build_with_hasher(ahash::RandomState::new()); Self { local_cache, diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index a62913d0..a4247268 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -19,6 +19,7 @@ entities = { path = "../entities" } migration = { path = "../migration" } redis-rate-limiter = { path = "../redis-rate-limiter" } +ahash = "0.8.0" anyhow = { version = "1.0.65", features = ["backtrace"] } arc-swap = "1.5.1" argh = "0.1.8" diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs index d17e1085..37c16674 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app.rs @@ -6,7 +6,7 @@ use crate::jsonrpc::JsonRpcForwardedResponse; use crate::jsonrpc::JsonRpcForwardedResponseEnum; use crate::jsonrpc::JsonRpcRequest; use crate::jsonrpc::JsonRpcRequestEnum; -use crate::rpcs::blockchain::{ArcBlock, BlockHashesMap, BlockId}; +use crate::rpcs::blockchain::{ArcBlock, BlockId}; use crate::rpcs::connections::Web3Connections; use crate::rpcs::request::OpenRequestHandleMetrics; use crate::rpcs::transactions::TxStatus; @@ -54,7 +54,7 @@ static APP_USER_AGENT: &str = concat!( /// block hash, method, params // TODO: better name type ResponseCacheKey = (H256, String, Option); -type ResponseCache = Cache; +type ResponseCache = Cache; pub type AnyhowJoinHandle = JoinHandle>; @@ -84,11 +84,11 @@ pub struct Web3ProxyApp { app_metrics: Arc, open_request_handle_metrics: Arc, /// store pending transactions that we've seen so that we don't send duplicates to subscribers - pub pending_transactions: Cache, + pub pending_transactions: Cache, pub frontend_ip_rate_limiter: Option>, pub frontend_key_rate_limiter: Option>, pub redis_pool: Option, - pub user_cache: Cache, + pub user_cache: Cache, } /// flatten a JoinError into an anyhow error @@ -232,7 +232,10 @@ impl Web3ProxyApp { // test the pool if let Err(err) = redis_pool.get().await { - error!("failed to connect to redis. some features will be disabled"); + error!( + ?err, + "failed to connect to redis. some features will be disabled" + ); }; Some(redis_pool) @@ -252,8 +255,9 @@ impl Web3ProxyApp { drop(pending_tx_receiver); // TODO: sized and timed expiration! - // TODO: put some in Redis, too? - let pending_transactions = Cache::new(10000); + let pending_transactions = Cache::builder() + .max_capacity(10_000) + .build_with_hasher(ahash::RandomState::new()); // TODO: don't drop the pending_tx_receiver. instead, read it to mark transactions as "seen". once seen, we won't re-send them // TODO: once a transaction is "Confirmed" we remove it from the map. this should prevent major memory leaks. @@ -261,7 +265,9 @@ impl Web3ProxyApp { // this block map is shared between balanced_rpcs and private_rpcs. // TODO: what limits should we have for expiration? - let block_map = BlockHashesMap::new(10_000); + let block_map = Cache::builder() + .max_capacity(10_000) + .build_with_hasher(ahash::RandomState::new()); let (balanced_rpcs, balanced_handle) = Web3Connections::spawn( top_config.app.chain_id, @@ -332,9 +338,13 @@ impl Web3ProxyApp { } // TODO: change this to a sized cache. theres some potentially giant responses that will break things - let response_cache = Cache::new(10_000); + let response_cache = Cache::builder() + .max_capacity(10_000) + .build_with_hasher(ahash::RandomState::new()); - let user_cache = Cache::new(10_000); + let user_cache = Cache::builder() + .max_capacity(10_000) + .build_with_hasher(ahash::RandomState::new()); let app = Self { config: top_config.app, diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index 0eab0cae..2e71ec70 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -1,4 +1,4 @@ -use crate::rpcs::blockchain::BlockHashesMap; +use crate::rpcs::blockchain::BlockHashesCache; use crate::rpcs::connection::Web3Connection; use crate::rpcs::request::OpenRequestHandleMetrics; use crate::{app::AnyhowJoinHandle, rpcs::blockchain::ArcBlock}; @@ -120,7 +120,7 @@ impl Web3ConnectionConfig { chain_id: u64, http_client: Option, http_interval_sender: Option>>, - block_map: BlockHashesMap, + block_map: BlockHashesCache, block_sender: Option>, tx_id_sender: Option>, open_request_handle_metrics: Arc, diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index 6d31476e..e6d1ea43 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -19,7 +19,7 @@ use tracing::{debug, trace, warn}; // TODO: type for Hydrated Blocks with their full transactions? pub type ArcBlock = Arc>; -pub type BlockHashesMap = Cache; +pub type BlockHashesCache = Cache; /// A block's hash and number. #[derive(Clone, Debug, Default, From, Serialize)] diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs index f186996f..ccb8c328 100644 --- a/web3_proxy/src/rpcs/connection.rs +++ b/web3_proxy/src/rpcs/connection.rs @@ -1,5 +1,5 @@ ///! Rate-limited communication with a web3 provider. -use super::blockchain::{ArcBlock, BlockHashesMap, BlockId}; +use super::blockchain::{ArcBlock, BlockHashesCache, BlockId}; use super::provider::Web3Provider; use super::request::{OpenRequestHandle, OpenRequestHandleMetrics, OpenRequestResult}; use crate::app::{flatten_handle, AnyhowJoinHandle}; @@ -69,7 +69,7 @@ impl Web3Connection { hard_limit: Option<(u64, RedisPool)>, // TODO: think more about this type soft_limit: u32, - block_map: BlockHashesMap, + block_map: BlockHashesCache, block_sender: Option>, tx_id_sender: Option)>>, reconnect: bool, @@ -366,7 +366,7 @@ impl Web3Connection { self: &Arc, new_head_block: Result, ProviderError>, block_sender: &flume::Sender, - block_map: BlockHashesMap, + block_map: BlockHashesCache, ) -> anyhow::Result<()> { match new_head_block { Ok(None) => { @@ -428,7 +428,7 @@ impl Web3Connection { async fn subscribe( self: Arc, http_interval_sender: Option>>, - block_map: BlockHashesMap, + block_map: BlockHashesCache, block_sender: Option>, tx_id_sender: Option)>>, reconnect: bool, @@ -516,7 +516,7 @@ impl Web3Connection { self: Arc, http_interval_receiver: Option>, block_sender: flume::Sender, - block_map: BlockHashesMap, + block_map: BlockHashesCache, ) -> anyhow::Result<()> { info!(%self, "watching new heads"); diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs index dea9bed1..17b49ce8 100644 --- a/web3_proxy/src/rpcs/connections.rs +++ b/web3_proxy/src/rpcs/connections.rs @@ -1,5 +1,5 @@ ///! Load balanced communication with a group of web3 providers -use super::blockchain::{ArcBlock, BlockHashesMap}; +use super::blockchain::{ArcBlock, BlockHashesCache}; use super::connection::Web3Connection; use super::request::{OpenRequestHandle, OpenRequestHandleMetrics, OpenRequestResult}; use super::synced_connections::SyncedConnections; @@ -36,12 +36,12 @@ pub struct Web3Connections { pub(super) conns: HashMap>, /// any requests will be forwarded to one (or more) of these connections pub(super) synced_connections: ArcSwap, - pub(super) pending_transactions: Cache, + pub(super) pending_transactions: Cache, /// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis? /// all blocks, including orphans - pub(super) block_hashes: BlockHashesMap, + pub(super) block_hashes: BlockHashesCache, /// blocks on the heaviest chain - pub(super) block_numbers: Cache, + pub(super) block_numbers: Cache, /// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis? /// TODO: what should we use for edges? pub(super) blockchain_graphmap: AsyncRwLock>, @@ -57,12 +57,12 @@ impl Web3Connections { server_configs: HashMap, http_client: Option, redis_pool: Option, - block_map: BlockHashesMap, + block_map: BlockHashesCache, head_block_sender: Option>, min_sum_soft_limit: u32, min_synced_rpcs: usize, pending_tx_sender: Option>, - pending_transactions: Cache, + pending_transactions: Cache, open_request_handle_metrics: Arc, ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded(); @@ -176,8 +176,14 @@ impl Web3Connections { let synced_connections = SyncedConnections::default(); // TODO: sizing and expiration on these caches! - let block_hashes = Cache::new(10000); - let block_numbers = Cache::new(10000); + let block_hashes = Cache::builder() + .time_to_idle(Duration::from_secs(600)) + .max_capacity(10_000) + .build_with_hasher(ahash::RandomState::new()); + let block_numbers = Cache::builder() + .time_to_idle(Duration::from_secs(600)) + .max_capacity(10_000) + .build_with_hasher(ahash::RandomState::new()); let connections = Arc::new(Self { conns: connections,