From 6e12edd5553a12a067c5043e3548706adc5db14f Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Mon, 5 Sep 2022 05:53:58 +0000 Subject: [PATCH] use sized Caches --- Cargo.lock | 235 +--------------------- Cargo.toml | 2 - TODO.md | 6 +- config/example.toml | 2 +- fifomap/Cargo.toml | 9 - fifomap/src/fifo_count_map.rs | 58 ------ fifomap/src/fifo_sized_map.rs | 92 --------- fifomap/src/lib.rs | 5 - linkedhashmap/Cargo.toml | 21 -- linkedhashmap/LICENSE | 1 - linkedhashmap/README.md | 33 ---- linkedhashmap/benches/lru.rs | 68 ------- linkedhashmap/src/lib.rs | 196 ------------------- linkedhashmap/src/linkedlist.rs | 249 ------------------------ web3_proxy/Cargo.toml | 2 +- web3_proxy/src/app.rs | 99 +++++----- web3_proxy/src/bin/web3_proxy.rs | 4 +- web3_proxy/src/block_number.rs | 8 +- web3_proxy/src/config.rs | 2 +- web3_proxy/src/frontend/http.rs | 4 +- web3_proxy/src/frontend/rate_limit.rs | 4 +- web3_proxy/src/frontend/rpc_proxy_ws.rs | 2 +- web3_proxy/src/rpcs/blockchain.rs | 78 ++++---- web3_proxy/src/rpcs/connection.rs | 62 +++--- web3_proxy/src/rpcs/connections.rs | 23 ++- 25 files changed, 148 insertions(+), 1117 deletions(-) delete mode 100644 fifomap/Cargo.toml delete mode 100644 fifomap/src/fifo_count_map.rs delete mode 100644 fifomap/src/fifo_sized_map.rs delete mode 100644 fifomap/src/lib.rs delete mode 100644 linkedhashmap/Cargo.toml delete mode 100644 linkedhashmap/LICENSE delete mode 100644 linkedhashmap/README.md delete mode 100644 linkedhashmap/benches/lru.rs delete mode 100644 linkedhashmap/src/lib.rs delete mode 100644 linkedhashmap/src/linkedlist.rs diff --git a/Cargo.lock b/Cargo.lock index 55fc133d..214ae79a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -659,18 +659,6 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "771fe0050b883fcc3ea2359b1a96bcfbc090b7116eae7c3c512c7a083fdf23d3" -[[package]] -name = "bstr" -version = "0.2.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba3569f383e8f1598449f1a423e72e99569137b47740b1da11ef19af3d5c3223" -dependencies = [ - "lazy_static", - "memchr", - "regex-automata", - "serde", -] - [[package]] name = "bumpalo" version = "3.9.1" @@ -781,21 +769,6 @@ dependencies = [ "serde_json", ] -[[package]] -name = "cast" -version = "0.2.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c24dab4283a142afa2fdca129b80ad2c6284e073930f964c3a1293c225ee39a" -dependencies = [ - "rustc_version", -] - -[[package]] -name = "cast" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" - [[package]] name = "cc" version = "1.0.73" @@ -836,17 +809,6 @@ dependencies = [ "generic-array 0.14.5", ] -[[package]] -name = "clap" -version = "2.34.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a0610544180c38b88101fecf2dd634b174a62eef6946f84dfc6a7127512b381c" -dependencies = [ - "bitflags", - "textwrap 0.11.0", - "unicode-width", -] - [[package]] name = "clap" version = "3.2.15" @@ -861,7 +823,7 @@ dependencies = [ "once_cell", "strsim", "termcolor", - "textwrap 0.15.0", + "textwrap", ] [[package]] @@ -1092,42 +1054,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "criterion" -version = "0.3.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b01d6de93b2b6c65e17c634a26653a29d107b3c98c607c765bf38d041531cd8f" -dependencies = [ - "atty", - "cast 0.3.0", - "clap 2.34.0", - 
"criterion-plot", - "csv", - "itertools", - "lazy_static", - "num-traits", - "oorandom", - "plotters", - "rayon", - "regex", - "serde", - "serde_cbor", - "serde_derive", - "serde_json", - "tinytemplate", - "walkdir", -] - -[[package]] -name = "criterion-plot" -version = "0.4.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d00996de9f2f7559f7f4dc286073197f83e92256a59ed395f9aac01fe717da57" -dependencies = [ - "cast 0.2.7", - "itertools", -] - [[package]] name = "crossbeam-channel" version = "0.5.6" @@ -1221,28 +1147,6 @@ dependencies = [ "typenum", ] -[[package]] -name = "csv" -version = "1.1.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22813a6dc45b335f9bade10bf7271dc477e81113e89eb251a0bc2a8a81c536e1" -dependencies = [ - "bstr", - "csv-core", - "itoa 0.4.8", - "ryu", - "serde", -] - -[[package]] -name = "csv-core" -version = "0.1.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b2466559f260f48ad25fe6317b3c8dac77b5bdb5763ac7d9d6103530663bc90" -dependencies = [ - "memchr", -] - [[package]] name = "ctor" version = "0.1.22" @@ -1858,13 +1762,6 @@ dependencies = [ "subtle", ] -[[package]] -name = "fifomap" -version = "0.1.0" -dependencies = [ - "linkedhashmap", -] - [[package]] name = "filetime" version = "0.2.17" @@ -2203,12 +2100,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "half" -version = "1.8.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7" - [[package]] name = "handlebars" version = "4.3.3" @@ -2681,23 +2572,6 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "33a33a362ce288760ec6a508b94caaec573ae7d3bbbd91b87aa0bad4456839db" -[[package]] -name = "linked-hash-map" -version = "0.5.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" - -[[package]] -name = "linkedhashmap" -version = "0.2.0" -dependencies = [ - "criterion", - "hashbrown", - "hashlink", - "linked-hash-map", - "slab", -] - [[package]] name = "lock_api" version = "0.4.8" @@ -2718,15 +2592,6 @@ dependencies = [ "value-bag", ] -[[package]] -name = "mach" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b823e83b2affd8f40a9ee8c29dbc56404c1e34cd2710921f2801e2cf29527afa" -dependencies = [ - "libc", -] - [[package]] name = "matchers" version = "0.1.0" @@ -2819,13 +2684,15 @@ version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0a89c33e91526792a0260425073c3db0b472cdca2cc6fcaa666dd6e65450462a" dependencies = [ + "async-io", + "async-lock", "crossbeam-channel", "crossbeam-epoch", "crossbeam-utils", + "futures-util", "num_cpus", "once_cell", "parking_lot 0.12.1", - "quanta", "scheduled-thread-pool", "skeptic", "smallvec", @@ -3018,12 +2885,6 @@ version = "1.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "074864da206b4973b84eb91683020dbefd6a8c3f0f38e054d93954e891935e4e" -[[package]] -name = "oorandom" -version = "11.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" - [[package]] name = "opaque-debug" version = "0.2.3" @@ -3406,34 +3267,6 @@ version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"1df8c4ec4b0627e53bdf214615ad287367e482558cf84b109250b37464dc03ae" -[[package]] -name = "plotters" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32a3fd9ec30b9749ce28cd91f255d569591cdf937fe280c312143e3c4bad6f2a" -dependencies = [ - "num-traits", - "plotters-backend", - "plotters-svg", - "wasm-bindgen", - "web-sys", -] - -[[package]] -name = "plotters-backend" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d88417318da0eaf0fdcdb51a0ee6c3bed624333bff8f946733049380be67ac1c" - -[[package]] -name = "plotters-svg" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "521fa9638fa597e1dc53e9412a4f9cefb01187ee1f7413076f9e6749e2885ba9" -dependencies = [ - "plotters-backend", -] - [[package]] name = "polling" version = "2.2.0" @@ -3600,22 +3433,6 @@ dependencies = [ "unicase", ] -[[package]] -name = "quanta" -version = "0.10.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7e31331286705f455e56cca62e0e717158474ff02b7936c1fa596d983f4ae27" -dependencies = [ - "crossbeam-utils", - "libc", - "mach", - "once_cell", - "raw-cpuid", - "wasi 0.10.2+wasi-snapshot-preview1", - "web-sys", - "winapi", -] - [[package]] name = "quote" version = "1.0.18" @@ -3774,15 +3591,6 @@ dependencies = [ "rand_core 0.3.1", ] -[[package]] -name = "raw-cpuid" -version = "10.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6aa2540135b6a94f74c7bc90ad4b794f822026a894f3d7bcd185c100d13d4ad6" -dependencies = [ - "bitflags", -] - [[package]] name = "rayon" version = "1.5.3" @@ -4176,7 +3984,7 @@ checksum = "8cefd2d8878bd7e8b7313f036725fa3d08585d101fb1bf3adca7fc13f553f906" dependencies = [ "async-std", "chrono", - "clap 3.2.15", + "clap", "dotenv", "regex", "sea-schema", @@ -4205,7 +4013,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "70a28587780fbae5c414a62bf0b32405f9da2e000d94f426abf214b2b2e68631" dependencies = [ "async-trait", - "clap 3.2.15", + "clap", "dotenv", "sea-orm", "sea-orm-cli", @@ -4347,16 +4155,6 @@ dependencies = [ "serde_json", ] -[[package]] -name = "serde_cbor" -version = "0.11.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2bef2ebfde456fb76bbcf9f59315333decc4fda0b2b44b420243c11e0f5ec1f5" -dependencies = [ - "half", - "serde", -] - [[package]] name = "serde_derive" version = "1.0.144" @@ -4792,7 +4590,7 @@ checksum = "a55e00b6f95abd889ce398bd7eab2a9c62cd27281cf1bfba70847340557434cf" dependencies = [ "anyhow", "cfg-if", - "clap 3.2.15", + "clap", "console 0.14.1", "dialoguer", "fs2", @@ -4888,15 +4686,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "textwrap" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060" -dependencies = [ - "unicode-width", -] - [[package]] name = "textwrap" version = "0.15.0" @@ -4980,16 +4769,6 @@ dependencies = [ "crunchy", ] -[[package]] -name = "tinytemplate" -version = "1.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be4d6b5f19ff7664e8c98d03e2139cb510db9b0a60b55f8e8709b689d939b6bc" -dependencies = [ - "serde", - "serde_json", -] - [[package]] name = "tinyvec" version = "1.6.0" diff --git a/Cargo.toml b/Cargo.toml index 6c860d9d..01a2095c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,8 +1,6 @@ [workspace] members = [ "entities", - "fifomap", - 
"linkedhashmap", "migration", "redis-rate-limit", "web3_proxy", diff --git a/TODO.md b/TODO.md index e514cc3c..1b6157ca 100644 --- a/TODO.md +++ b/TODO.md @@ -137,11 +137,13 @@ - [ ] when using a bunch of slow public servers, i see "no servers in sync" even when things should be right - [ ] i think checking the parents of the heaviest chain works most of the time, but not always - maybe iterate connection heads by total weight? i still think we need to include parent hashes -- [ ] some of the DashMaps grow unbounded! Make a "SizedDashMap" that cleans up old rows with some garbage collection task - +- [x] some of the DashMaps grow unbounded! Make/find a "SizedDashMap" that cleans up old rows with some garbage collection task + - moka is exactly what we need +- [ ] add size limits to all the Caches ## V1 +- [ ] benchmarks of the different Cache implementations (futures vs dash) - [ ] benchmarks from https://github.com/llamafolio/llamafolio-api/ - [ ] benchmarks from ethspam and versus - [ ] benchmarks from other things diff --git a/config/example.toml b/config/example.toml index 7de66214..b0c14752 100644 --- a/config/example.toml +++ b/config/example.toml @@ -9,7 +9,7 @@ redis_url = "redis://dev-redis:6379/" # TODO: how do we find the optimal redis_max_connections? too high actually ends up being slower redis_max_connections = 99 redirect_public_url = "https://llamanodes.com/free-rpc-stats" -redirect_user_url = "https://llamanodes.com/user-rpc-stats/{{user_id}}" +redirect_user_url = "https://llamanodes.com/user-rpc-stats/{{user_address}}" public_rate_limit_per_minute = 0 # 1GB of cache response_cache_max_bytes = 10000000000 diff --git a/fifomap/Cargo.toml b/fifomap/Cargo.toml deleted file mode 100644 index d351684f..00000000 --- a/fifomap/Cargo.toml +++ /dev/null @@ -1,9 +0,0 @@ -[package] -name = "fifomap" -version = "0.1.0" -edition = "2021" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -linkedhashmap = { path = "../linkedhashmap", features = ["inline-more"] } diff --git a/fifomap/src/fifo_count_map.rs b/fifomap/src/fifo_count_map.rs deleted file mode 100644 index 461d25f2..00000000 --- a/fifomap/src/fifo_count_map.rs +++ /dev/null @@ -1,58 +0,0 @@ -use linkedhashmap::LinkedHashMap; -use std::{ - borrow::Borrow, - collections::hash_map::RandomState, - hash::{BuildHasher, Hash}, -}; - -pub struct FifoCountMap -where - K: Hash + Eq + Clone, - S: BuildHasher + Default, -{ - /// size limit for the map - max_count: usize, - /// FIFO - map: LinkedHashMap, -} - -impl FifoCountMap -where - K: Hash + Eq + Clone, - S: BuildHasher + Default, -{ - pub fn new(max_count: usize) -> Self { - Self { - max_count, - map: Default::default(), - } - } -} - -impl FifoCountMap -where - K: Hash + Eq + Clone, - S: BuildHasher + Default, -{ - /// if the size is larger than `self.max_size_bytes`, drop items (first in, first out) - /// no item is allowed to take more than `1/max_share` of the cache - pub fn insert(&mut self, key: K, value: V) { - // drop items until the cache has enough room for the new data - // TODO: this probably has wildly variable timings - if self.map.len() > self.max_count { - // TODO: this isn't an LRU. it's a "least recently created". does that have a fancy name? should we make it an lru? 
these caches only live for one block - self.map.pop_front(); - } - - self.map.insert(key, value); - } - - /// get an item from the cache, or None - pub fn get<Q>(&self, key: &Q) -> Option<&V> - where - K: Borrow<Q>, - Q: Hash + Eq, - { - self.map.get(key) - } -} diff --git a/fifomap/src/fifo_sized_map.rs b/fifomap/src/fifo_sized_map.rs deleted file mode 100644 index 584e8602..00000000 --- a/fifomap/src/fifo_sized_map.rs +++ /dev/null @@ -1,92 +0,0 @@ -use linkedhashmap::LinkedHashMap; -use std::{ - borrow::Borrow, - collections::hash_map::RandomState, - hash::{BuildHasher, Hash}, - mem::size_of_val, -}; - -// TODO: if values have wildly different sizes, this is good. but if they are all about the same, this could be simpler -pub struct FifoSizedMap<K, V, S = RandomState> -where - K: Hash + Eq + Clone, - S: BuildHasher + Default, -{ - /// size limit in bytes for the map - max_size_bytes: usize, - /// size limit in bytes for a single item in the map - max_item_bytes: usize, - /// FIFO - map: LinkedHashMap<K, V, S>, -} - -impl<K, V, S> FifoSizedMap<K, V, S> -where - K: Hash + Eq + Clone, - S: BuildHasher + Default, -{ - pub fn new(max_size_bytes: usize, max_share: usize) -> Self { - let max_item_bytes = max_size_bytes / max_share; - - Self { - max_size_bytes, - max_item_bytes, - map: Default::default(), - } - } -} - -impl<K, V, S> Default for FifoSizedMap<K, V, S> -where - K: Hash + Eq + Clone, - S: BuildHasher + Default, -{ - fn default() -> Self { - Self::new( - // 100 MB default cache - 100_000_000, - // items cannot take more than 1% of the cache - 100, - ) - } -} - -impl<K, V, S> FifoSizedMap<K, V, S> -where - K: Hash + Eq + Clone, - S: BuildHasher + Default, -{ - /// if the size is larger than `self.max_size_bytes`, drop items (first in, first out) - /// no item is allowed to take more than `1/max_share` of the cache - pub fn insert(&mut self, key: K, value: V) -> bool { - // TODO: this might be too naive. not sure how much overhead the object has - let new_size = size_of_val(&key) + size_of_val(&value); - - // no item is allowed to take more than 1% of the cache - // TODO: get this from config? - // TODO: trace logging - if new_size > self.max_item_bytes { - return false; - } - - // drop items until the cache has enough room for the new data - // TODO: this probably has wildly variable timings - while size_of_val(&self.map) + new_size > self.max_size_bytes { - // TODO: this isn't an LRU. it's a "least recently created". does that have a fancy name? should we make it an lru? 
these caches only live for one block - self.map.pop_front(); - } - - self.map.insert(key, value); - - true - } - - /// get an item from the cache, or None - pub fn get<Q>(&self, key: &Q) -> Option<&V> - where - K: Borrow<Q>, - Q: Hash + Eq, - { - self.map.get(key) - } -} diff --git a/fifomap/src/lib.rs b/fifomap/src/lib.rs deleted file mode 100644 index 8b2655bc..00000000 --- a/fifomap/src/lib.rs +++ /dev/null @@ -1,5 +0,0 @@ -mod fifo_count_map; -mod fifo_sized_map; - -pub use fifo_count_map::FifoCountMap; -pub use fifo_sized_map::FifoSizedMap; diff --git a/linkedhashmap/Cargo.toml b/linkedhashmap/Cargo.toml deleted file mode 100644 index 009ed907..00000000 --- a/linkedhashmap/Cargo.toml +++ /dev/null @@ -1,21 +0,0 @@ -[package] -name = "linkedhashmap" -version = "0.2.0" -authors = ["quininer "] -edition = "2021" - -[features] -inline-more = [ "hashbrown" ] - -[dependencies] -slab = "0.4.7" -hashbrown = { version = "0.12.3", optional = true } - -[dev-dependencies] -criterion = "0.3.6" -hashlink = "0.8.0" -linked-hash-map = "0.5.6" - -[[bench]] -name = "lru" -harness = false diff --git a/linkedhashmap/LICENSE b/linkedhashmap/LICENSE deleted file mode 100644 index e0d985c2..00000000 --- a/linkedhashmap/LICENSE +++ /dev/null @@ -1 +0,0 @@ -https://github.com/quininer/linkedhashmap diff --git a/linkedhashmap/README.md b/linkedhashmap/README.md deleted file mode 100644 index 51b41acb..00000000 --- a/linkedhashmap/README.md +++ /dev/null @@ -1,33 +0,0 @@ -# Linked HashMap - -but 0-unsafe code. :) - - - -# benchmarks - -## default - -``` -linkedhashmap time: [88.299 ns 89.096 ns 89.886 ns] - change: [-4.3828% -2.6982% -1.0684%] (p = 0.00 < 0.05) - -hashlink time: [59.497 ns 60.937 ns 62.916 ns] - change: [-3.4227% -0.9224% +1.7368%] (p = 0.51 > 0.05) - -linked-hash-map time: [94.379 ns 95.305 ns 96.309 ns] - change: [-0.7721% +0.6709% +2.0113%] (p = 0.37 > 0.05) -``` - -## inline-more feature - -``` -linkedhashmap time: [59.607 ns 60.291 ns 61.013 ns] - change: [+1.4918% +3.2842% +4.9448%] (p = 0.00 < 0.05) - -hashlink time: [60.300 ns 60.895 ns 61.492 ns] - change: [+2.7329% +4.4155% +6.0299%] (p = 0.00 < 0.05) - -linked-hash-map time: [96.841 ns 99.359 ns 102.60 ns] - change: [+2.1387% +4.0285% +6.2305%] (p = 0.00 < 0.05) -``` diff --git a/linkedhashmap/benches/lru.rs b/linkedhashmap/benches/lru.rs deleted file mode 100644 index 2386ae00..00000000 --- a/linkedhashmap/benches/lru.rs +++ /dev/null @@ -1,68 +0,0 @@ -use criterion::{black_box, criterion_group, criterion_main, Criterion}; -use std::collections::hash_map; - -struct Bar([u64; 4]); - -const CAP: usize = 128; - -fn bench_linkedhsahmap(c: &mut Criterion) { - use linkedhashmap::LinkedHashMap; - - c.bench_function("linkedhashmap", |b| { - let mut map = LinkedHashMap::with_capacity_and_hasher(CAP, hash_map::RandomState::new()); - let mut count = 0; - - b.iter(|| { - count += 1; - let bar = black_box(Bar([0x42; 4])); - map.insert(count, bar); - if map.len() >= CAP { - map.pop_front(); - } - }); - }); -} - -fn bench_hashlink(c: &mut Criterion) { - use hashlink::LinkedHashMap; - - c.bench_function("hashlink", |b| { - let mut map = LinkedHashMap::with_capacity_and_hasher(CAP, hash_map::RandomState::new()); - let mut count = 0; - - b.iter(|| { - count += 1; - let bar = black_box(Bar([0x42; 4])); - map.insert(count, bar); - if map.len() >= CAP { - map.pop_front(); - } - }); - }); -} - -fn bench_linked_hash_map(c: &mut Criterion) { - use linked_hash_map::LinkedHashMap; - - c.bench_function("linked-hash-map", |b| { - let mut map = 
LinkedHashMap::with_capacity_and_hasher(CAP, hash_map::RandomState::new()); - let mut count = 0; - - b.iter(|| { - count += 1; - let bar = black_box(Bar([0x42; 4])); - map.insert(count, bar); - if map.len() >= CAP { - map.pop_front(); - } - }); - }); -} - -criterion_group!( - lru, - bench_linkedhsahmap, - bench_hashlink, - bench_linked_hash_map -); -criterion_main!(lru); diff --git a/linkedhashmap/src/lib.rs b/linkedhashmap/src/lib.rs deleted file mode 100644 index 83834259..00000000 --- a/linkedhashmap/src/lib.rs +++ /dev/null @@ -1,196 +0,0 @@ -pub mod linkedlist; - -use linkedlist::{LinkedList, NodeSlab}; -use std::borrow::Borrow; -use std::collections::hash_map::RandomState; -use std::hash::{BuildHasher, Hash}; - -#[cfg(feature = "hashbrown")] -use hashbrown::HashMap; - -#[cfg(not(feature = "hashbrown"))] -use std::collections::HashMap; - -pub struct LinkedHashMap<K, V, S = RandomState> { - slab: NodeSlab<(K, V)>, - list: LinkedList, - map: HashMap<K, usize, S>, -} - -impl<K, V> LinkedHashMap<K, V> -where - K: Hash + Eq + Clone, -{ - #[inline] - pub fn new() -> LinkedHashMap<K, V> { - LinkedHashMap { - slab: NodeSlab::new(), - list: LinkedList::new(), - map: HashMap::with_capacity_and_hasher(0, RandomState::default()), - } - } - - #[inline] - pub fn with_capacity(cap: usize) -> LinkedHashMap<K, V> { - LinkedHashMap { - slab: NodeSlab::with_capacity(cap), - list: LinkedList::new(), - map: HashMap::with_capacity_and_hasher(cap, RandomState::default()), - } - } -} - -impl<K, V, S> Default for LinkedHashMap<K, V, S> -where - K: Hash + Eq + Clone, - S: BuildHasher + Default, -{ - #[inline] - fn default() -> Self { - Self::with_hasher(S::default()) - } -} - -impl<K, V, S> LinkedHashMap<K, V, S> -where - K: Hash + Eq + Clone, - S: BuildHasher, -{ - pub fn with_hasher(hash_builder: S) -> Self { - LinkedHashMap { - slab: NodeSlab::new(), - list: LinkedList::new(), - map: HashMap::with_hasher(hash_builder), - } - } - - pub fn with_capacity_and_hasher(capacity: usize, hash_builder: S) -> Self { - LinkedHashMap { - slab: NodeSlab::with_capacity(capacity), - list: LinkedList::new(), - map: HashMap::with_capacity_and_hasher(capacity, hash_builder), - } - } - - #[inline] - pub fn len(&self) -> usize { - self.map.len() - } - - #[inline] - pub fn is_empty(&self) -> bool { - self.map.is_empty() - } - - #[inline] - pub fn reserve(&mut self, additional: usize) { - self.slab.reserve(additional); - self.map.reserve(additional); - } - - #[inline] - pub fn shrink_to_fit(&mut self) { - self.slab.shrink_to_fit(); - self.map.shrink_to_fit(); - } - - #[inline] - pub fn clear(&mut self) { - self.slab.clear(); - self.list = LinkedList::new(); - self.map.clear(); - } - - #[cfg_attr(feature = "inline-more", inline(always))] - pub fn insert(&mut self, key: K, value: V) -> Option<V> { - let index = self.list.push(&mut self.slab, (key.clone(), value)); - let index = self.map.insert(key, index)?; - let (_, value) = self.list.remove(&mut self.slab, index)?; - Some(value) - } - - #[cfg_attr(feature = "inline-more", inline(always))] - pub fn get<Q>(&self, key: &Q) -> Option<&V> - where - K: Borrow<Q>, - Q: Hash + Eq, - { - let index = *self.map.get(key)?; - let (_, value) = self.slab.get(index)?; - Some(value) - } - - #[cfg_attr(feature = "inline-more", inline(always))] - pub fn get_mut<Q>(&mut self, key: &Q) -> Option<&mut V> - where - K: Borrow<Q>, - Q: Hash + Eq, - { - let index = *self.map.get(key)?; - let (_, value) = self.slab.get_mut(index)?; - Some(value) - } - - #[cfg_attr(feature = "inline-more", inline(always))] - pub fn touch<Q>(&mut self, key: &Q) -> Option<&mut V> - where - K: Borrow<Q>, - Q: Hash + Eq, - { - let index = 
*self.map.get(key)?; - let (_, value) = self.list.touch(&mut self.slab, index)?; - Some(value) - } - - #[cfg_attr(feature = "inline-more", inline(always))] - pub fn remove<Q>(&mut self, key: &Q) -> Option<V> - where - K: Borrow<Q>, - Q: Hash + Eq, - { - let index = self.map.remove(key)?; - let (_, value) = self.list.remove(&mut self.slab, index)?; - Some(value) - } - - #[cfg_attr(feature = "inline-more", inline(always))] - pub fn pop_front(&mut self) -> Option<(K, V)> { - let (k, v) = self.list.pop_front(&mut self.slab)?; - self.map.remove(&k)?; - Some((k, v)) - } - - #[cfg_attr(feature = "inline-more", inline(always))] - pub fn pop_last(&mut self) -> Option<(K, V)> { - let (k, v) = self.list.pop_last(&mut self.slab)?; - self.map.remove(&k)?; - Some((k, v)) - } -} - -#[test] -fn test_linkedhashmap() { - #[derive(PartialEq, Eq, Debug)] - struct Bar(u64); - - let mut map = LinkedHashMap::new(); - - map.insert(0, Bar(0)); - map.insert(3, Bar(3)); - map.insert(2, Bar(2)); - map.insert(1, Bar(1)); - - assert_eq!(4, map.len()); - - assert_eq!(Some(&Bar(2)), map.get(&2)); - assert_eq!(Some(&mut Bar(3)), map.touch(&3)); - assert_eq!(Some((0, Bar(0))), map.pop_front()); - assert_eq!(Some((3, Bar(3))), map.pop_last()); - assert_eq!(Some((1, Bar(1))), map.pop_last()); - - assert_eq!(1, map.len()); - assert_eq!(Some(&mut Bar(2)), map.get_mut(&2)); - - map.clear(); - assert_eq!(0, map.len()); -} diff --git a/linkedhashmap/src/linkedlist.rs b/linkedhashmap/src/linkedlist.rs deleted file mode 100644 index bb8b3678..00000000 --- a/linkedhashmap/src/linkedlist.rs +++ /dev/null @@ -1,249 +0,0 @@ -use slab::Slab; - -#[derive(Default)] -pub struct NodeSlab<T>(Slab<Node<T>>); - -#[derive(Default, Debug)] -pub struct LinkedList { - start: Option<usize>, - end: Option<usize>, -} - -pub struct Node<T> { - value: T, - prev: Option<usize>, - next: Option<usize>, -} - -impl LinkedList { - pub const fn new() -> LinkedList { - LinkedList { - start: None, - end: None, - } - } - - #[cfg_attr(feature = "inline-more", inline(always))] - pub fn push<T>(&mut self, NodeSlab(slab): &mut NodeSlab<T>, value: T) -> usize { - let index = slab.insert(Node { - value, - prev: self.end.or(self.start), - next: None, - }); - - if let Some(old_end) = self.end.replace(index) { - slab[old_end].next = Some(index); - } else { - self.start = Some(index); - } - - index - } - - #[cfg_attr(feature = "inline-more", inline(always))] - pub fn pop_front<T>(&mut self, NodeSlab(slab): &mut NodeSlab<T>) -> Option<T> { - let index = self.start?; - let node = slab.remove(index); - - debug_assert!(node.prev.is_none()); - - self.start = node.next; - - if let Some(index) = self.start { - slab[index].prev.take(); - } - - if Some(index) == self.end { - self.end.take(); - } - - Some(node.value) - } - - #[cfg_attr(feature = "inline-more", inline(always))] - pub fn pop_last<T>(&mut self, NodeSlab(slab): &mut NodeSlab<T>) -> Option<T> { - let index = self.end?; - let node = slab.remove(index); - - debug_assert!(node.next.is_none()); - - self.end = node.prev; - - if let Some(index) = self.end { - slab[index].next.take(); - } - - if Some(index) == self.start { - self.start.take(); - } - - Some(node.value) - } - - #[cfg_attr(feature = "inline-more", inline(always))] - pub fn touch<'a, T>( - &mut self, - NodeSlab(slab): &'a mut NodeSlab<T>, - index: usize, - ) -> Option<&'a mut T> { - let (node_prev, node_next) = { - let node = slab.get(index)?; - (node.prev, node.next) - }; - - if let Some(next) = node_next { - slab[next].prev = node_prev; - } else { - debug_assert_eq!(Some(index), self.end); - - return Some(&mut slab[index].value); - } - - 
if let Some(prev) = node_prev { - slab[prev].next = node_next; - } else { - self.start = node_next; - } - - let end = self.end.replace(index)?; - slab[end].next = Some(index); - - let node = &mut slab[index]; - node.prev = Some(end); - node.next.take(); - - Some(&mut node.value) - } - - #[cfg_attr(feature = "inline-more", inline(always))] - pub fn remove<T>(&mut self, NodeSlab(slab): &mut NodeSlab<T>, index: usize) -> Option<T> { - let node = if slab.contains(index) { - // why not return Option<T> :( - slab.remove(index) - } else { - return None; - }; - - if let Some(prev) = node.prev { - slab[prev].next = node.next; - } else { - self.start = node.next; - } - - if let Some(next) = node.next { - slab[next].prev = node.prev; - } else { - self.end = node.prev; - } - - Some(node.value) - } -} - -impl<T> NodeSlab<T> { - #[inline] - pub fn new() -> NodeSlab<T> { - NodeSlab(Slab::new()) - } - - #[inline] - pub fn with_capacity(cap: usize) -> NodeSlab<T> { - NodeSlab(Slab::with_capacity(cap)) - } - - #[inline] - pub fn get(&self, index: usize) -> Option<&T> { - Some(&self.0.get(index)?.value) - } - - #[inline] - pub fn get_mut(&mut self, index: usize) -> Option<&mut T> { - Some(&mut self.0.get_mut(index)?.value) - } - - #[inline] - pub fn reserve(&mut self, additional: usize) { - self.0.reserve(additional); - } - - #[inline] - pub fn shrink_to_fit(&mut self) { - self.0.shrink_to_fit(); - } - - #[inline] - pub(crate) fn clear(&mut self) { - self.0.clear(); - } -} - -#[test] -fn test_linkedlist() { - let mut slab = NodeSlab::new(); - let mut list = LinkedList::new(); - - list.push(&mut slab, 0); - assert_eq!(Some(0), list.pop_front(&mut slab)); - assert_eq!(None, list.pop_front(&mut slab)); - - list.push(&mut slab, 1); - assert_eq!(Some(1), list.pop_last(&mut slab)); - assert_eq!(None, list.pop_last(&mut slab)); - - list.push(&mut slab, 2); - list.push(&mut slab, 3); - assert_eq!(Some(2), list.pop_front(&mut slab)); - assert_eq!(Some(3), list.pop_last(&mut slab)); - eprintln!("{:?}", list); - assert_eq!(None, list.pop_front(&mut slab)); - eprintln!("{:?}", list); - assert_eq!(None, list.pop_last(&mut slab)); - - list.push(&mut slab, 4); - list.push(&mut slab, 5); - assert_eq!(Some(5), list.pop_last(&mut slab)); - assert_eq!(Some(4), list.pop_front(&mut slab)); - assert_eq!(None, list.pop_last(&mut slab)); - assert_eq!(None, list.pop_front(&mut slab)); - - let index6 = list.push(&mut slab, 6); - let index7 = list.push(&mut slab, 7); - let index8 = list.push(&mut slab, 8); - assert_eq!(Some(7), list.remove(&mut slab, index7)); - assert_eq!(None, list.remove(&mut slab, index7)); - assert_eq!(Some(&6), slab.get(index6)); - assert_eq!(Some(&8), slab.get(index8)); - assert_eq!(Some(6), list.pop_front(&mut slab)); - assert_eq!(Some(8), list.pop_front(&mut slab)); - - let index9 = list.push(&mut slab, 9); - list.push(&mut slab, 10); - assert_eq!(Some(&mut 9), list.touch(&mut slab, index9)); - assert_eq!(Some(10), list.pop_front(&mut slab)); - assert_eq!(Some(9), list.pop_front(&mut slab)); - - let index11 = list.push(&mut slab, 11); - let index12 = list.push(&mut slab, 12); - list.push(&mut slab, 13); - assert_eq!(Some(&mut 12), list.touch(&mut slab, index12)); - assert_eq!(Some(&mut 11), list.touch(&mut slab, index11)); - assert_eq!(Some(13), list.pop_front(&mut slab)); - assert_eq!(Some(12), list.pop_front(&mut slab)); - assert_eq!(Some(11), list.pop_front(&mut slab)); - - for i in 0..32 { - list.push(&mut slab, i); - } - for i in 0..32 { - assert_eq!(Some(i), list.pop_front(&mut slab)); - } - assert_eq!(None, 
list.pop_front(&mut slab)); - - for i in 0..32 { - list.push(&mut slab, i); - } - for i in (0..32).rev() { - assert_eq!(Some(i), list.pop_last(&mut slab)); - } - assert_eq!(None, list.pop_last(&mut slab)); -} diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index 275cd290..794c2f1d 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -34,7 +34,7 @@ flume = "0.10.14" futures = { version = "0.3.24", features = ["thread-pool"] } hashbrown = { version = "0.12.3", features = ["serde"] } http = "0.2.8" -moka = "0.9.3" +moka = { version = "0.9.3", default-features = false, features = ["future"] } notify = "5.0.0" num = "0.4.0" parking_lot = { version = "0.12.1", features = ["arc_lock"] } diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs index 4dbbf971..bd6fd3da 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app.rs @@ -12,7 +12,6 @@ use crate::rpcs::transactions::TxStatus; use crate::stats::AppStats; use anyhow::Context; use axum::extract::ws::Message; -use dashmap::DashMap; use derive_more::From; use ethers::core::utils::keccak256; use ethers::prelude::{Address, Block, Bytes, TxHash, H256, U64}; @@ -22,6 +21,7 @@ use futures::stream::FuturesUnordered; use futures::stream::StreamExt; use futures::Future; use migration::{Migrator, MigratorTrait}; +use moka::future::Cache; use redis_rate_limit::bb8::PooledConnection; use redis_rate_limit::{ bb8::{self, ErrorSink}, @@ -39,7 +39,7 @@ use tokio::sync::{broadcast, watch}; use tokio::task::JoinHandle; use tokio::time::{timeout, Instant}; use tokio_stream::wrappers::{BroadcastStream, WatchStream}; -use tracing::{debug, info, info_span, instrument, trace, warn, Instrument}; +use tracing::{debug, error, info, info_span, instrument, trace, warn, Instrument}; use uuid::Uuid; // TODO: make this customizable? static APP_USER_AGENT: &str = concat!( "satoshiandkin/", env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"), ); // TODO: better name type ResponseCacheKey = (H256, String, Option); -// TODO!! a RWLock on this made us super slow. But a DashMap makes this grow unbounded! -type ResponseCache = DashMap<ResponseCacheKey, JsonRpcForwardedResponse>; +type ResponseCache = Cache<ResponseCacheKey, JsonRpcForwardedResponse>; pub type AnyhowJoinHandle<T> = JoinHandle<anyhow::Result<T>>; @@ -79,15 +78,16 @@ pub struct Web3ProxyApp { // TODO: broadcast channel instead? head_block_receiver: watch::Receiver<ArcBlock>, pending_tx_sender: broadcast::Sender<TxStatus>, + /// TODO: this doesn't ever get incremented! pub active_requests: AtomicUsize, pub config: AppConfig, pub db_conn: Option, - pub pending_transactions: Arc<DashMap<TxHash, TxStatus>>, + /// store pending transactions that we've seen so that we don't send duplicates to subscribers + pub pending_transactions: Cache<TxHash, TxStatus>, pub rate_limiter: Option, pub redis_pool: Option, pub stats: AppStats, - // TODO: this grows unbounded! Make a "SizedDashMap" that cleans up old rows with some garbage collection task - pub user_cache: DashMap, + pub user_cache: Cache, } /// flatten a JoinError into an anyhow error @@ -151,9 +151,13 @@ impl Web3ProxyApp { Pin<Box<dyn Future<Output = anyhow::Result<()>>>>, )> { // safety checks on the config + debug!("redirect_user_url: {}", top_config.app.redirect_user_url); assert!( - top_config.app.redirect_user_url.contains("{{user_id}}"), - "redirect user url must contain \"{{user_id}}\"" + top_config + .app + .redirect_user_url + .contains("{{user_address}}"), + "redirect user url must contain \"{{user_address}}\"" ); // first, we connect to mysql and make sure the latest migrations have run @@ -235,15 +239,17 @@ // TODO: use this? 
it could listen for confirmed transactions and then clear pending_transactions, but the head_block_sender is doing that drop(pending_tx_receiver); - // TODO: this will grow unbounded!! add some expiration to this. and probably move to redis - let pending_transactions = Arc::new(DashMap::new()); + // TODO: sized and timed expiration! + // TODO: put some in Redis, too? + let pending_transactions = Cache::new(10000); // TODO: don't drop the pending_tx_receiver. instead, read it to mark transactions as "seen". once seen, we won't re-send them // TODO: once a transaction is "Confirmed" we remove it from the map. this should prevent major memory leaks. // TODO: we should still have some sort of expiration or maximum size limit for the map // this block map is shared between balanced_rpcs and private_rpcs. - let block_map = BlockHashesMap::default(); + // TODO: what limits should we have for expiration? + let block_map = BlockHashesMap::new(10_000); let (balanced_rpcs, balanced_handle) = Web3Connections::spawn( top_config.app.chain_id, @@ -300,12 +306,16 @@ ) }); + // TODO: change this to a sized cache + let response_cache = Cache::new(1_000); + let user_cache = Cache::new(10_000); let app = Self { config: top_config.app, balanced_rpcs, private_rpcs, active_requests: Default::default(), - response_cache: Default::default(), + response_cache, head_block_receiver, pending_tx_sender, pending_transactions, @@ -313,9 +323,7 @@ db_conn, redis_pool, stats: app_stats, - // TODO: make the size configurable - // TODO: better type for this? - user_cache: Default::default(), + user_cache, }; let app = Arc::new(app); @@ -613,7 +621,7 @@ trace!(?request.method, "cache hit!"); // TODO: can we make references work? maybe put them in an Arc? - return Ok(Ok(response.to_owned())); + return Ok(Ok(response)); } // TODO: another lock here so that we don't send the same request to a backend more than once @@ -638,7 +646,8 @@ let span = info_span!("rpc_request"); // let _enter = span.enter(); // DO NOT ENTER! we can't use enter across awaits! (clippy lint soon) - let partial_response: serde_json::Value = match request.method.as_ref() { + // TODO: don't clone + let partial_response: serde_json::Value = match request.method.clone().as_ref() { // lots of commands are blocked "admin_addPeer" | "admin_datadir" @@ -819,47 +828,45 @@ trace!(?min_block_needed, ?method); - let cached_response_result = - self.cached_response(min_block_needed, &request).await?; - // TODO: emit a stat on error. maybe with .map_err? - let cache_key = match cached_response_result { + let cache_key = match self.cached_response(min_block_needed, &request).await? { Ok(mut cache_result) => { + // we got a cache hit! no need to do any backend requests. + // put our request id on the cached response - // TODO: maybe only cache the inner result? + // TODO: maybe only cache the inner result? then have a JsonRpcForwardedResponse::from_cache cache_result.id = request.id; + // emit a stat + return Ok(cache_result); } Err(cache_key) => cache_key, }; - let response = match method { - "temporarily disabled" => { - // "eth_getTransactionByHash" | "eth_getTransactionReceipt" => { - // TODO: try_send_all serially with retries instead of parallel - self.private_rpcs - .try_send_all_upstream_servers(request, min_block_needed) - .await? - } - _ => { - // TODO: retries? - self.balanced_rpcs - .try_send_best_upstream_server(request, min_block_needed) - .await? 
- } - }; - // TODO: move this caching outside this match and cache some of the other responses? // TODO: cache the warp::reply to save us serializing every time? - if self + let response = self .response_cache - .insert(cache_key.clone(), response.clone()) - .is_some() - { - // TODO: we had another lock to prevent this, but its not worth the extra locking - debug!("already cached") - } + .try_get_with(cache_key, async move { + match method { + "temporarily disabled" => { + // "eth_getTransactionByHash" | "eth_getTransactionReceipt" => { + // TODO: try_send_all serially with retries instead of parallel + self.private_rpcs + .try_send_all_upstream_servers(request, min_block_needed) + .await + } + _ => { + // TODO: retries? + self.balanced_rpcs + .try_send_best_upstream_server(request, min_block_needed) + .await + } + } + }) + .await + .unwrap(); return Ok(response); } diff --git a/web3_proxy/src/bin/web3_proxy.rs b/web3_proxy/src/bin/web3_proxy.rs index 7cdacfda..d13f6fa8 100644 --- a/web3_proxy/src/bin/web3_proxy.rs +++ b/web3_proxy/src/bin/web3_proxy.rs @@ -215,10 +215,10 @@ mod tests { default_requests_per_minute: 6_000_000, min_sum_soft_limit: 1, min_synced_rpcs: 1, - public_rate_limit_per_minute: 0, + public_rate_limit_per_minute: 6_000_000, response_cache_max_bytes: 10_usize.pow(7), redirect_public_url: "example.com/".to_string(), - redirect_user_url: "example.com/users/{user_address}".to_string(), + redirect_user_url: "example.com/{{user_address}}".to_string(), ..Default::default() }, balanced_rpcs: HashMap::from([ diff --git a/web3_proxy/src/block_number.rs b/web3_proxy/src/block_number.rs index 846514e6..f28c2cf7 100644 --- a/web3_proxy/src/block_number.rs +++ b/web3_proxy/src/block_number.rs @@ -119,7 +119,7 @@ pub fn block_needed( } if let Some(x) = obj.get("blockHash") { - // TODO: check a linkedhashmap of recent hashes + // TODO: check a Cache of recent hashes // TODO: error if fromBlock or toBlock were set todo!("handle blockHash {}", x); } @@ -133,7 +133,7 @@ pub fn block_needed( return None; } "eth_getTransactionByBlockHashAndIndex" => { - // TODO: check a linkedhashmap of recent hashes + // TODO: check a Cache of recent hashes // try full nodes first. retry will use archive return None; } @@ -145,13 +145,13 @@ pub fn block_needed( return None; } "eth_getUncleByBlockHashAndIndex" => { - // TODO: check a linkedhashmap of recent hashes + // TODO: check a Cache of recent hashes // try full nodes first. retry will use archive return None; } "eth_getUncleByBlockNumberAndIndex" => 0, "eth_getUncleCountByBlockHash" => { - // TODO: check a linkedhashmap of recent hashes + // TODO: check a Cache of recent hashes // try full nodes first. retry will use archive return None; } diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index 6b9ad45b..51526093 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -71,7 +71,7 @@ pub struct AppConfig { pub response_cache_max_bytes: usize, /// the stats page url for an anonymous user. pub redirect_public_url: String, - /// the stats page url for a logged in user. it must contain "{user_id}" + /// the stats page url for a logged in user. 
it must contain "{user_address}" pub redirect_user_url: String, } diff --git a/web3_proxy/src/frontend/http.rs b/web3_proxy/src/frontend/http.rs index d8777c31..20cfba0e 100644 --- a/web3_proxy/src/frontend/http.rs +++ b/web3_proxy/src/frontend/http.rs @@ -20,7 +20,9 @@ pub async fn status(Extension(app): Extension>) -> impl IntoRe "balanced_rpcs": app.balanced_rpcs, "private_rpcs": app.private_rpcs, "num_active_requests": app.active_requests.load(Ordering::Acquire), - "num_pending_transactions": app.pending_transactions.len(), + // TODO: include number of items? + "pending_transactions_count": app.pending_transactions.entry_count(), + "pending_transactions_size": app.pending_transactions.weighted_size(), }); (StatusCode::OK, Json(body)) diff --git a/web3_proxy/src/frontend/rate_limit.rs b/web3_proxy/src/frontend/rate_limit.rs index bc69dba9..c3d32492 100644 --- a/web3_proxy/src/frontend/rate_limit.rs +++ b/web3_proxy/src/frontend/rate_limit.rs @@ -195,7 +195,7 @@ impl Web3ProxyApp { }; // save for the next run - self.user_cache.insert(user_key, user_data); + self.user_cache.insert(user_key, user_data).await; Ok(user_data) } @@ -209,7 +209,7 @@ impl Web3ProxyApp { None } else { // this key was active in the database recently - Some(*cached_user) + Some(cached_user) } } else { // cache miss diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs index 7ec29d05..d3dc6375 100644 --- a/web3_proxy/src/frontend/rpc_proxy_ws.rs +++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs @@ -72,7 +72,7 @@ pub async fn user_websocket_handler( // TODO: store this on the app and use register_template? let reg = Handlebars::new(); - // TODO: show the user's address, not their id (remember to update the checks for {{user_id}} in app.rs) + // TODO: show the user's address, not their id (remember to update the checks for {{user_address}}} in app.rs) // TODO: query to get the user's address. expose that instead of user_id let user_url = reg .render_template( diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index 2ed74a09..70772ce7 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -6,22 +6,20 @@ use crate::{ config::BlockAndRpc, jsonrpc::JsonRpcRequest, rpcs::synced_connections::SyncedConnections, }; use anyhow::Context; -use dashmap::{ - mapref::{entry::Entry, one::Ref}, - DashMap, -}; use derive_more::From; use ethers::prelude::{Block, TxHash, H256, U64}; use hashbrown::{HashMap, HashSet}; +use moka::future::Cache; use serde::Serialize; use serde_json::json; use std::{cmp::Ordering, fmt::Display, sync::Arc}; use tokio::sync::{broadcast, watch}; use tracing::{debug, info, trace, warn}; +// TODO: type for Hydrated Blocks with their full transactions? pub type ArcBlock = Arc>; -pub type BlockHashesMap = Arc>; +pub type BlockHashesMap = Cache; /// A block's hash and number. 
#[derive(Clone, Debug, Default, From, Serialize)] pub struct BlockId { pub(super) hash: H256, pub(super) num: U64, } impl Display for BlockId { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{} ({})", self.num, self.hash) } } impl Web3Connections { /// add a block to our map and its hash to our graphmap of the blockchain - pub fn save_block(&self, block: &ArcBlock, heaviest_chain: Option<bool>) -> anyhow::Result<()> { + pub async fn save_block( + &self, + block: &ArcBlock, + heaviest_chain: Option<bool>, + ) -> anyhow::Result<()> { // TODO: i think we can rearrange this function to make it faster on the hot path let block_hash = block.hash.as_ref().context("no block hash")?; let block_num = block.number.as_ref().context("no block num")?; let _block_td = block .total_difficulty .as_ref() - .context("no block total difficulty")?; + .expect("no block total difficulty"); // if self.block_hashes.contains_key(block_hash) { // // this block is already included. no need to continue // return Ok(()); // } - let mut blockchain = self.blockchain_graphmap.write(); + let mut blockchain = self.blockchain_graphmap.write().await; - // think more about heaviest_chain + // TODO: think more about heaviest_chain if heaviest_chain.unwrap_or(true) { - match self.block_numbers.entry(*block_num) { - Entry::Occupied(mut x) => { - let old_hash = x.insert(*block_hash); - - if block_hash == &old_hash { - // this block has already been saved - return Ok(()); - } - - // TODO: what should we do? - // TODO: if old_hash's number is > block_num, we need to remove more entries - warn!( - "do something with the old hash ({}) for {}? we may need to update a bunch more block numbers", old_hash, block_num - ) - } - Entry::Vacant(x) => { - x.insert(*block_hash); + // this is the only place that writes to block_numbers, and its inside a write lock on blockchain_graphmap, so i think there is no race + if let Some(old_hash) = self.block_numbers.get(block_num) { + if block_hash == &old_hash { + // this block has already been saved + return Ok(()); } } + // i think a race here isn't that big of a problem. just 2 inserts + self.block_numbers.insert(*block_num, *block_hash).await; } // if blockchain.contains_node(*block_hash) { // // this hash is already included. we can skip the rest of this function // return Ok(()); // } // TODO: theres a small race between contains_key and insert - if let Some(_overwritten) = self.block_hashes.insert(*block_hash, block.clone()) { - // there was a race and another thread wrote this block - // i don't think this will happen. the blockchain.contains_node above should be enough - // no need to continue because that other thread would have written (or soon will write) the block - return Ok(()); - } + self.block_hashes.insert(*block_hash, block.clone()).await; // TODO: prettier log? or probably move the log somewhere else trace!(%block_hash, "new block"); @@ -117,7 +104,7 @@ // first, try to get the hash from our cache // the cache is set last, so if its here, its everywhere if let Some(block) = self.block_hashes.get(hash) { - return Ok(block.clone()); + return Ok(block); } // block not in cache. 
we need to ask an rpc for it @@ -147,7 +134,7 @@ let block = Arc::new(block); // the block was fetched using eth_getBlockByHash, so it should have all fields - self.save_block(&block, None)?; + self.save_block(&block, None).await?; Ok(block) } @@ -164,7 +151,7 @@ /// Get the heaviest chain's block from cache or backend rpc pub async fn cannonical_block(&self, num: &U64) -> anyhow::Result<ArcBlock> { // we only have blocks by hash now - // maybe save them during save_block in a blocks_by_number Cache> // if theres multiple, use petgraph to find the one on the main chain (and remove the others if they have enough confirmations) // be sure the requested block num exists @@ -183,7 +170,7 @@ // try to get the hash from our cache // deref to not keep the lock open - if let Some(block_hash) = self.block_numbers.get(num).map(|x| *x) { + if let Some(block_hash) = self.block_numbers.get(num) { // TODO: sometimes this needs to fetch the block. why? i thought block_numbers would only be set if the block hash was set return self.block(&block_hash, None).await; } @@ -205,7 +192,7 @@ let block = Arc::new(block); // the block was fetched using eth_getBlockByNumber, so it should have all fields and be on the heaviest chain - self.save_block(&block, Some(true))?; + self.save_block(&block, Some(true)).await?; Ok(block) } @@ -213,7 +200,8 @@ pub(super) async fn process_incoming_blocks( &self, block_receiver: flume::Receiver<BlockAndRpc>, - // TODO: head_block_sender should be a broadcast_sender like pending_tx_sender + // TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed + // Geth's subscriptions have the same potential for skipping blocks. 
head_block_sender: watch::Sender<ArcBlock>, pending_tx_sender: Option<broadcast::Sender<TxStatus>>, ) -> anyhow::Result<()> { @@ -262,7 +250,7 @@ connection_heads.insert(rpc.name.to_owned(), rpc_head_hash); // we don't know if its on the heaviest chain yet - self.save_block(&rpc_head_block, Some(false))?; + self.save_block(&rpc_head_block, Some(false)).await?; Some(BlockId { hash: rpc_head_hash, @@ -283,7 +271,7 @@ // iterate the known heads to find the highest_work_block let mut checked_heads = HashSet::new(); - let mut highest_work_block: Option<Ref<H256, ArcBlock>> = None; + let mut highest_work_block: Option<ArcBlock> = None; for conn_head_hash in connection_heads.values() { if checked_heads.contains(conn_head_hash) { // we already checked this head from another rpc @@ -324,7 +312,7 @@ } // clone to release the read lock on self.block_hashes - if let Some(mut maybe_head_block) = highest_work_block.map(|x| x.clone()) { + if let Some(mut maybe_head_block) = highest_work_block { // track rpcs on this heaviest chain so we can build a new SyncedConnections let mut heavy_rpcs: Vec<&Arc<Web3Connection>> = vec![]; // a running total of the soft limits covered by the heavy rpcs @@ -413,7 +401,7 @@ None => { debug!(block=%heavy_block_id, %rpc, "first consensus head"); - self.save_block(&rpc_head_block, Some(true))?; + self.save_block(&rpc_head_block, Some(true)).await?; head_block_sender.send(heavy_block)?; } @@ -429,7 +417,7 @@ info!(heavy=%heavy_block_id, old=%old_block_id, %rpc, "unc block"); // todo!("handle equal by updating the cannonical chain"); - self.save_block(&rpc_head_block, Some(true))?; + self.save_block(&rpc_head_block, Some(true)).await?; head_block_sender.send(heavy_block)?; } @@ -439,7 +427,7 @@ // TODO: better log warn!(head=%heavy_block_id, %rpc, "chain rolled back"); - self.save_block(&rpc_head_block, Some(true))?; + self.save_block(&rpc_head_block, Some(true)).await?; // todo!("handle less by removing higher blocks from the cannonical chain"); head_block_sender.send(heavy_block)?; @@ -449,7 +437,7 @@ // todo!("handle greater by adding this block to and any missing parents to the cannonical chain"); - self.save_block(&rpc_head_block, Some(true))?; + self.save_block(&rpc_head_block, Some(true)).await?; head_block_sender.send(heavy_block)?; } diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs index 7ad92ae2..9675dddc 100644 --- a/web3_proxy/src/rpcs/connection.rs +++ b/web3_proxy/src/rpcs/connection.rs @@ -5,7 +5,6 @@ use super::request::{OpenRequestHandle, OpenRequestResult}; use crate::app::{flatten_handle, AnyhowJoinHandle}; use crate::config::BlockAndRpc; use anyhow::Context; -use dashmap::mapref::entry::Entry; use ethers::prelude::{Block, Bytes, Middleware, ProviderError, TxHash, H256, U64}; use futures::future::try_join_all; use futures::StreamExt; @@ -36,6 +35,7 @@ pub struct Web3Connection { hard_limit: Option, /// used for load balancing to the least loaded server pub(super) soft_limit: u32, + /// TODO: have an enum for this so that "no limit" prints pretty block_data_limit: AtomicU64, /// Lower weight are higher priority when sending requests pub(super) weight: u32, @@ -280,58 +280,42 @@ impl Web3Connection { // TODO: is unwrap_or_default ok? 
we might have an empty block let new_hash = new_head_block.hash.unwrap_or_default(); - let mut td_is_needed = new_head_block.total_difficulty.is_none(); - // if we already have this block saved, we don't need to store this copy - be careful with the entry api! awaits during this are a very bad idea. - new_head_block = match block_map.entry(new_hash) { - Entry::Vacant(x) => { - // only save the block if it has a total difficulty! - if !td_is_needed { - x.insert(new_head_block).clone() - } else { - new_head_block - } - } - Entry::Occupied(x) => { - let existing_block = x.get().clone(); + // TODO: small race here + new_head_block = if let Some(existing_block) = block_map.get(&new_hash) { // we only save blocks with a total difficulty debug_assert!(existing_block.total_difficulty.is_some()); - td_is_needed = false; - - existing_block - } - }; - - if td_is_needed { + existing_block + } else if new_head_block.total_difficulty.is_some() { + // this block has a total difficulty, it is safe to use + block_map.insert(new_hash, new_head_block).await; + // we get instead of return new_head_block just in case there was a race + // TODO: but how bad is this race? it might be fine + block_map.get(&new_hash).expect("we just inserted") + } else { + // Cache miss and NO TOTAL DIFFICULTY! // self got the head block first. unfortunately its missing a necessary field // keep this even after https://github.com/ledgerwatch/erigon/issues/5190 is closed. // there are other clients and we might have to use a third party without the td fix. trace!(rpc=?self, ?new_hash, "total_difficulty missing"); - // todo: this can wait forever + // todo: this can wait forever! let complete_head_block: Block<TxHash> = self .wait_for_request_handle() .await? .request("eth_getBlockByHash", (new_hash, false)) .await?; - new_head_block = match block_map.entry(new_hash) { - Entry::Vacant(x) => { - // still vacant! self is still the leader - // now we definitely have total difficulty, so save - x.insert(Arc::new(complete_head_block)).clone() - } - Entry::Occupied(x) => { - let existing_block = x.get().clone(); + debug_assert!(complete_head_block.total_difficulty.is_some()); - // we only save blocks with a total difficulty - debug_assert!(existing_block.total_difficulty.is_some()); + block_map + .insert(new_hash, Arc::new(complete_head_block)) + .await; - existing_block - } - }; - } + // we get instead of return new_head_block just in case there was a race + // TODO: but how bad is this race? 
it might be fine + block_map.get(&new_hash).expect("we just inserted") + }; let new_num = new_head_block.number.unwrap_or_default(); diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs index 69b9a4a8..c747d1f8 100644 --- a/web3_proxy/src/rpcs/connections.rs +++ b/web3_proxy/src/rpcs/connections.rs @@ -9,14 +9,13 @@ use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest}; use crate::rpcs::transactions::TxStatus; use arc_swap::ArcSwap; use counter::Counter; -use dashmap::DashMap; use derive_more::From; use ethers::prelude::{Block, ProviderError, TxHash, H256, U64}; use futures::future::{join_all, try_join_all}; use futures::stream::FuturesUnordered; use futures::StreamExt; use hashbrown::HashMap; -use parking_lot::RwLock; +use moka::future::Cache; use petgraph::graphmap::DiGraphMap; use serde::ser::{SerializeStruct, Serializer}; use serde::Serialize; @@ -25,10 +24,10 @@ use std::cmp; use std::cmp::Reverse; use std::fmt; use std::sync::Arc; +use tokio::sync::RwLock as AsyncRwLock; use tokio::sync::{broadcast, watch}; use tokio::task; -use tokio::time::{interval, sleep, sleep_until, MissedTickBehavior}; -use tokio::time::{Duration, Instant}; +use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior}; use tracing::{error, info, instrument, trace, warn}; /// A collection of web3 connections. Sends requests either the current best server or all servers. #[derive(From)] pub struct Web3Connections { pub(super) conns: HashMap<String, Arc<Web3Connection>>, /// any requests will be forwarded to one (or more) of these connections pub(super) synced_connections: ArcSwap<SyncedConnections>, - pub(super) pending_transactions: Arc<DashMap<TxHash, TxStatus>>, + pub(super) pending_transactions: Cache<TxHash, TxStatus>, /// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis? /// all blocks, including orphans pub(super) block_hashes: BlockHashesMap, /// blocks on the heaviest chain - pub(super) block_numbers: DashMap<U64, H256>, + pub(super) block_numbers: Cache<U64, H256>, /// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis? /// TODO: what should we use for edges? - pub(super) blockchain_graphmap: RwLock>, + pub(super) blockchain_graphmap: AsyncRwLock>, pub(super) min_synced_rpcs: usize, pub(super) min_sum_soft_limit: u32, } @@ -63,7 +62,7 @@ impl Web3Connections { min_sum_soft_limit: u32, min_synced_rpcs: usize, pending_tx_sender: Option<broadcast::Sender<TxStatus>>, - pending_transactions: Arc<DashMap<TxHash, TxStatus>>, + pending_transactions: Cache<TxHash, TxStatus>, ) -> anyhow::Result<(Arc<Self>, AnyhowJoinHandle<()>)> { let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded(); let (block_sender, block_receiver) = @@ -168,12 +167,16 @@ let synced_connections = SyncedConnections::default(); + // TODO: sizing and expiration on these caches! + let block_hashes = Cache::new(10000); + let block_numbers = Cache::new(10000); let connections = Arc::new(Self { conns: connections, synced_connections: ArcSwap::new(Arc::new(synced_connections)), pending_transactions, - block_hashes: Default::default(), - block_numbers: Default::default(), + block_hashes, + block_numbers, blockchain_graphmap: Default::default(), min_sum_soft_limit, min_synced_rpcs,
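
---

The whole patch leans on a small set of `moka::future::Cache` methods: `Cache::new(max_capacity)` for a bounded cache, an async `insert`, a synchronous `get` that returns a clone of the value, and `try_get_with`, which is the pattern `response_cache` now uses to deduplicate concurrent loads of the same key. Below is a minimal standalone sketch of that API, assuming a tokio runtime and `moka = { version = "0.9", default-features = false, features = ["future"] }` as in `web3_proxy/Cargo.toml`; the `u64`/`String` key and value types and the `std::io::Error` error type are placeholder choices for illustration, not types from the proxy:

```rust
use moka::future::Cache;

#[tokio::main]
async fn main() {
    // Bounded: once max_capacity entries exist, moka's TinyLFU policy evicts
    // entries instead of letting the map grow without bound (the problem the
    // old DashMaps had).
    let cache: Cache<u64, String> = Cache::new(10_000);

    // insert is async and must be awaited; get is sync and clones the value.
    cache.insert(1, "one".to_string()).await;
    assert_eq!(cache.get(&1), Some("one".to_string()));

    // try_get_with runs the init future only for the first caller per key;
    // concurrent callers for the same key await that one result. Errors come
    // back wrapped in an Arc so every waiter can share them.
    let two = cache
        .try_get_with(2, async { Ok::<_, std::io::Error>("two".to_string()) })
        .await;
    assert_eq!(two.unwrap(), "two".to_string());

    // entry_count()/weighted_size() are the stats the /status page now reports.
    // note: these counters are eventually consistent, so they may lag inserts.
    println!("entries: {}", cache.entry_count());
}
```

The awaited `insert` is also why `save_block`, `user_cache.insert`, and the block-map writes became `async` in this commit: `moka::future::Cache::insert` returns a future, unlike `DashMap::insert`.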