Use sized caches

This commit is contained in:
Bryan Stitt 2022-09-05 05:53:58 +00:00
parent 62d747c1b3
commit 6e12edd555
25 changed files with 148 additions and 1117 deletions

235
Cargo.lock generated

@ -659,18 +659,6 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "771fe0050b883fcc3ea2359b1a96bcfbc090b7116eae7c3c512c7a083fdf23d3"
[[package]]
name = "bstr"
version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba3569f383e8f1598449f1a423e72e99569137b47740b1da11ef19af3d5c3223"
dependencies = [
"lazy_static",
"memchr",
"regex-automata",
"serde",
]
[[package]]
name = "bumpalo"
version = "3.9.1"
@ -781,21 +769,6 @@ dependencies = [
"serde_json",
]
[[package]]
name = "cast"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c24dab4283a142afa2fdca129b80ad2c6284e073930f964c3a1293c225ee39a"
dependencies = [
"rustc_version",
]
[[package]]
name = "cast"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
[[package]]
name = "cc"
version = "1.0.73"
@ -836,17 +809,6 @@ dependencies = [
"generic-array 0.14.5",
]
[[package]]
name = "clap"
version = "2.34.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a0610544180c38b88101fecf2dd634b174a62eef6946f84dfc6a7127512b381c"
dependencies = [
"bitflags",
"textwrap 0.11.0",
"unicode-width",
]
[[package]]
name = "clap"
version = "3.2.15"
@ -861,7 +823,7 @@ dependencies = [
"once_cell",
"strsim",
"termcolor",
"textwrap 0.15.0",
"textwrap",
]
[[package]]
@ -1092,42 +1054,6 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "criterion"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b01d6de93b2b6c65e17c634a26653a29d107b3c98c607c765bf38d041531cd8f"
dependencies = [
"atty",
"cast 0.3.0",
"clap 2.34.0",
"criterion-plot",
"csv",
"itertools",
"lazy_static",
"num-traits",
"oorandom",
"plotters",
"rayon",
"regex",
"serde",
"serde_cbor",
"serde_derive",
"serde_json",
"tinytemplate",
"walkdir",
]
[[package]]
name = "criterion-plot"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d00996de9f2f7559f7f4dc286073197f83e92256a59ed395f9aac01fe717da57"
dependencies = [
"cast 0.2.7",
"itertools",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.6"
@ -1221,28 +1147,6 @@ dependencies = [
"typenum",
]
[[package]]
name = "csv"
version = "1.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22813a6dc45b335f9bade10bf7271dc477e81113e89eb251a0bc2a8a81c536e1"
dependencies = [
"bstr",
"csv-core",
"itoa 0.4.8",
"ryu",
"serde",
]
[[package]]
name = "csv-core"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b2466559f260f48ad25fe6317b3c8dac77b5bdb5763ac7d9d6103530663bc90"
dependencies = [
"memchr",
]
[[package]]
name = "ctor"
version = "0.1.22"
@ -1858,13 +1762,6 @@ dependencies = [
"subtle",
]
[[package]]
name = "fifomap"
version = "0.1.0"
dependencies = [
"linkedhashmap",
]
[[package]]
name = "filetime"
version = "0.2.17"
@ -2203,12 +2100,6 @@ dependencies = [
"tracing",
]
[[package]]
name = "half"
version = "1.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7"
[[package]]
name = "handlebars"
version = "4.3.3"
@ -2681,23 +2572,6 @@ version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33a33a362ce288760ec6a508b94caaec573ae7d3bbbd91b87aa0bad4456839db"
[[package]]
name = "linked-hash-map"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f"
[[package]]
name = "linkedhashmap"
version = "0.2.0"
dependencies = [
"criterion",
"hashbrown",
"hashlink",
"linked-hash-map",
"slab",
]
[[package]]
name = "lock_api"
version = "0.4.8"
@ -2718,15 +2592,6 @@ dependencies = [
"value-bag",
]
[[package]]
name = "mach"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b823e83b2affd8f40a9ee8c29dbc56404c1e34cd2710921f2801e2cf29527afa"
dependencies = [
"libc",
]
[[package]]
name = "matchers"
version = "0.1.0"
@ -2819,13 +2684,15 @@ version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0a89c33e91526792a0260425073c3db0b472cdca2cc6fcaa666dd6e65450462a"
dependencies = [
"async-io",
"async-lock",
"crossbeam-channel",
"crossbeam-epoch",
"crossbeam-utils",
"futures-util",
"num_cpus",
"once_cell",
"parking_lot 0.12.1",
"quanta",
"scheduled-thread-pool",
"skeptic",
"smallvec",
@ -3018,12 +2885,6 @@ version = "1.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "074864da206b4973b84eb91683020dbefd6a8c3f0f38e054d93954e891935e4e"
[[package]]
name = "oorandom"
version = "11.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575"
[[package]]
name = "opaque-debug"
version = "0.2.3"
@ -3406,34 +3267,6 @@ version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1df8c4ec4b0627e53bdf214615ad287367e482558cf84b109250b37464dc03ae"
[[package]]
name = "plotters"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32a3fd9ec30b9749ce28cd91f255d569591cdf937fe280c312143e3c4bad6f2a"
dependencies = [
"num-traits",
"plotters-backend",
"plotters-svg",
"wasm-bindgen",
"web-sys",
]
[[package]]
name = "plotters-backend"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d88417318da0eaf0fdcdb51a0ee6c3bed624333bff8f946733049380be67ac1c"
[[package]]
name = "plotters-svg"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "521fa9638fa597e1dc53e9412a4f9cefb01187ee1f7413076f9e6749e2885ba9"
dependencies = [
"plotters-backend",
]
[[package]]
name = "polling"
version = "2.2.0"
@ -3600,22 +3433,6 @@ dependencies = [
"unicase",
]
[[package]]
name = "quanta"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b7e31331286705f455e56cca62e0e717158474ff02b7936c1fa596d983f4ae27"
dependencies = [
"crossbeam-utils",
"libc",
"mach",
"once_cell",
"raw-cpuid",
"wasi 0.10.2+wasi-snapshot-preview1",
"web-sys",
"winapi",
]
[[package]]
name = "quote"
version = "1.0.18"
@ -3774,15 +3591,6 @@ dependencies = [
"rand_core 0.3.1",
]
[[package]]
name = "raw-cpuid"
version = "10.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6aa2540135b6a94f74c7bc90ad4b794f822026a894f3d7bcd185c100d13d4ad6"
dependencies = [
"bitflags",
]
[[package]]
name = "rayon"
version = "1.5.3"
@ -4176,7 +3984,7 @@ checksum = "8cefd2d8878bd7e8b7313f036725fa3d08585d101fb1bf3adca7fc13f553f906"
dependencies = [
"async-std",
"chrono",
"clap 3.2.15",
"clap",
"dotenv",
"regex",
"sea-schema",
@ -4205,7 +4013,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70a28587780fbae5c414a62bf0b32405f9da2e000d94f426abf214b2b2e68631"
dependencies = [
"async-trait",
"clap 3.2.15",
"clap",
"dotenv",
"sea-orm",
"sea-orm-cli",
@ -4347,16 +4155,6 @@ dependencies = [
"serde_json",
]
[[package]]
name = "serde_cbor"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2bef2ebfde456fb76bbcf9f59315333decc4fda0b2b44b420243c11e0f5ec1f5"
dependencies = [
"half",
"serde",
]
[[package]]
name = "serde_derive"
version = "1.0.144"
@ -4792,7 +4590,7 @@ checksum = "a55e00b6f95abd889ce398bd7eab2a9c62cd27281cf1bfba70847340557434cf"
dependencies = [
"anyhow",
"cfg-if",
"clap 3.2.15",
"clap",
"console 0.14.1",
"dialoguer",
"fs2",
@ -4888,15 +4686,6 @@ dependencies = [
"winapi",
]
[[package]]
name = "textwrap"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060"
dependencies = [
"unicode-width",
]
[[package]]
name = "textwrap"
version = "0.15.0"
@ -4980,16 +4769,6 @@ dependencies = [
"crunchy",
]
[[package]]
name = "tinytemplate"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be4d6b5f19ff7664e8c98d03e2139cb510db9b0a60b55f8e8709b689d939b6bc"
dependencies = [
"serde",
"serde_json",
]
[[package]]
name = "tinyvec"
version = "1.6.0"

@ -1,8 +1,6 @@
[workspace]
members = [
"entities",
"fifomap",
"linkedhashmap",
"migration",
"redis-rate-limit",
"web3_proxy",

@ -137,11 +137,13 @@
- [ ] when using a bunch of slow public servers, i see "no servers in sync" even when things should be right
- [ ] i think checking the parents of the heaviest chain works most of the time, but not always
- maybe iterate connection heads by total weight? i still think we need to include parent hashes
- [ ] some of the DashMaps grow unbounded! Make a "SizedDashMap" that cleans up old rows with some garbage collection task
- [x] some of the DashMaps grow unbounded! Make/find a "SizedDashMap" that cleans up old rows with some garbage collection task
- moka is exactly what we need
- [ ] add size limits to all the Caches
## V1
- [ ] benchmarks of the different Cache implementations (futures vs dash)
- [ ] benchmarks from https://github.com/llamafolio/llamafolio-api/
- [ ] benchmarks from ethspam and versus
- [ ] benchmarks from other things

@ -9,7 +9,7 @@ redis_url = "redis://dev-redis:6379/"
# TODO: how do we find the optimal redis_max_connections? too high actually ends up being slower
redis_max_connections = 99
redirect_public_url = "https://llamanodes.com/free-rpc-stats"
redirect_user_url = "https://llamanodes.com/user-rpc-stats/{{user_id}}"
redirect_user_url = "https://llamanodes.com/user-rpc-stats/{{user_address}}"
public_rate_limit_per_minute = 0
# 1GB of cache
response_cache_max_bytes = 10000000000

@ -1,9 +0,0 @@
[package]
name = "fifomap"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
linkedhashmap = { path = "../linkedhashmap", features = ["inline-more"] }

@ -1,58 +0,0 @@
use linkedhashmap::LinkedHashMap;
use std::{
borrow::Borrow,
collections::hash_map::RandomState,
hash::{BuildHasher, Hash},
};
/// A map holding at most a fixed number of entries.
/// When the limit is reached, the oldest (first-inserted) entry is
/// evicted — FIFO order, not LRU.
pub struct FifoCountMap<K, V, S = RandomState>
where
    K: Hash + Eq + Clone,
    S: BuildHasher + Default,
{
    /// maximum number of entries allowed in the map
    max_count: usize,
    /// insertion-ordered storage; the front is the oldest entry
    map: LinkedHashMap<K, V, S>,
}
impl<K, V, S> FifoCountMap<K, V, S>
where
K: Hash + Eq + Clone,
S: BuildHasher + Default,
{
pub fn new(max_count: usize) -> Self {
Self {
max_count,
map: Default::default(),
}
}
}
impl<K, V, S> FifoCountMap<K, V, S>
where
    K: Hash + Eq + Clone,
    S: BuildHasher + Default,
{
    /// insert `value` at `key`, evicting the oldest entries (first in, first out)
    /// until the map holds no more than `self.max_count` items.
    pub fn insert(&mut self, key: K, value: V) {
        // drop the oldest items until the cache has room for the new entry.
        // the previous code popped at most once (`if len > max_count`), which
        // let the map settle at max_count + 1 entries; `while` enforces the
        // limit exactly. the is_empty() guard avoids spinning when max_count
        // is 0 (pop_front on an empty map is a no-op).
        // TODO: this probably has wildly variable timings
        while !self.map.is_empty() && self.map.len() >= self.max_count {
            // TODO: this isn't an LRU. it's a "least recently created". does that have a fancy name? should we make it an lru? these caches only live for one block
            self.map.pop_front();
        }

        self.map.insert(key, value);
    }

    /// get an item from the cache, or None
    pub fn get<Q>(&self, key: &Q) -> Option<&V>
    where
        K: Borrow<Q>,
        Q: Hash + Eq,
    {
        self.map.get(key)
    }
}

@ -1,92 +0,0 @@
use linkedhashmap::LinkedHashMap;
use std::{
borrow::Borrow,
collections::hash_map::RandomState,
hash::{BuildHasher, Hash},
mem::size_of_val,
};
// TODO: if values have wildly different sizes, this is good. but if they are all about the same, this could be simpler
/// A FIFO map that bounds its total size in bytes rather than its entry
/// count. Items larger than `max_item_bytes` are rejected outright.
pub struct FifoSizedMap<K, V, S = RandomState>
where
    K: Hash + Eq + Clone,
    S: BuildHasher + Default,
{
    /// size limit in bytes for the map
    max_size_bytes: usize,
    /// size limit in bytes for a single item in the map
    max_item_bytes: usize,
    /// insertion-ordered storage; the front is the oldest entry (FIFO)
    map: LinkedHashMap<K, V, S>,
}
impl<K, V, S> FifoSizedMap<K, V, S>
where
    K: Hash + Eq + Clone,
    S: BuildHasher + Default,
{
    /// Create a cache capped at `max_size_bytes` total, where no single
    /// entry may use more than `max_size_bytes / max_share` bytes.
    pub fn new(max_size_bytes: usize, max_share: usize) -> Self {
        Self {
            max_size_bytes,
            max_item_bytes: max_size_bytes / max_share,
            map: Default::default(),
        }
    }
}
impl<K, V, S> Default for FifoSizedMap<K, V, S>
where
    K: Hash + Eq + Clone,
    S: BuildHasher + Default,
{
    /// A 100 MB cache where a single item may use at most 1% of the space.
    fn default() -> Self {
        // 100 MB default cache
        let max_size_bytes = 100_000_000;
        // items cannot take more than 1% of the cache
        let max_share = 100;

        Self::new(max_size_bytes, max_share)
    }
}
impl<K, V, S> FifoSizedMap<K, V, S>
where
    K: Hash + Eq + Clone,
    S: BuildHasher + Default,
{
    /// if the size is larger than `self.max_size_bytes`, drop items (first in, first out)
    /// no item is allowed to take more than `1/max_share` of the cache
    /// returns false (and stores nothing) when the item alone exceeds `max_item_bytes`.
    pub fn insert(&mut self, key: K, value: V) -> bool {
        // TODO: this might be too naive. not sure how much overhead the object has
        // NOTE(review): size_of_val measures only the stack size of `key` and
        // `value`; heap-owned data (String/Vec contents) is not counted — confirm
        // this approximation is intended.
        let new_size = size_of_val(&key) + size_of_val(&value);

        // no item is allowed to take more than 1% of the cache
        // TODO: get this from config?
        // TODO: trace logging
        if new_size > self.max_item_bytes {
            return false;
        }

        // drop items until the cache has enough room for the new data
        // TODO: this probably has wildly variable timings
        // NOTE(review): size_of_val(&self.map) is the constant size of the map
        // struct itself, not of its entries, so this condition is effectively
        // never true and the map can still grow unbounded. Fixing it needs an
        // explicit running byte total — verify before relying on this limit.
        while size_of_val(&self.map) + new_size > self.max_size_bytes {
            // TODO: this isn't an LRU. it's a "least recently created". does that have a fancy name? should we make it an lru? these caches only live for one block
            self.map.pop_front();
        }

        self.map.insert(key, value);

        true
    }

    /// get an item from the cache, or None
    pub fn get<Q>(&self, key: &Q) -> Option<&V>
    where
        K: Borrow<Q>,
        Q: Hash + Eq,
    {
        self.map.get(key)
    }
}

@ -1,5 +0,0 @@
mod fifo_count_map;
mod fifo_sized_map;
pub use fifo_count_map::FifoCountMap;
pub use fifo_sized_map::FifoSizedMap;

@ -1,21 +0,0 @@
[package]
name = "linkedhashmap"
version = "0.2.0"
authors = ["quininer <quininer@live.com>"]
edition = "2021"
[features]
inline-more = [ "hashbrown" ]
[dependencies]
slab = "0.4.7"
hashbrown = { version = "0.12.3", optional = true }
[dev-dependencies]
criterion = "0.3.6"
hashlink = "0.8.0"
linked-hash-map = "0.5.6"
[[bench]]
name = "lru"
harness = false

@ -1 +0,0 @@
https://github.com/quininer/linkedhashmap

@ -1,33 +0,0 @@
# Linked HashMap
but with zero `unsafe` code. :)
<https://github.com/quininer/linkedhashmap>
# benchmarks
## default
```
linkedhashmap time: [88.299 ns 89.096 ns 89.886 ns]
change: [-4.3828% -2.6982% -1.0684%] (p = 0.00 < 0.05)
hashlink time: [59.497 ns 60.937 ns 62.916 ns]
change: [-3.4227% -0.9224% +1.7368%] (p = 0.51 > 0.05)
linked-hash-map time: [94.379 ns 95.305 ns 96.309 ns]
change: [-0.7721% +0.6709% +2.0113%] (p = 0.37 > 0.05)
```
## inline-more feature
```
linkedhashmap time: [59.607 ns 60.291 ns 61.013 ns]
change: [+1.4918% +3.2842% +4.9448%] (p = 0.00 < 0.05)
hashlink time: [60.300 ns 60.895 ns 61.492 ns]
change: [+2.7329% +4.4155% +6.0299%] (p = 0.00 < 0.05)
linked-hash-map time: [96.841 ns 99.359 ns 102.60 ns]
change: [+2.1387% +4.0285% +6.2305%] (p = 0.00 < 0.05)
```

@ -1,68 +0,0 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use std::collections::hash_map;
/// Dummy payload so each insert moves a realistic amount of data.
struct Bar([u64; 4]);

/// Cache capacity used by every benchmark so results are comparable.
const CAP: usize = 128;

/// Benchmark this crate's LinkedHashMap used as a fixed-size FIFO cache.
/// (renamed from the typo `bench_linkedhsahmap`)
fn bench_linkedhashmap(c: &mut Criterion) {
    use linkedhashmap::LinkedHashMap;

    c.bench_function("linkedhashmap", |b| {
        let mut map = LinkedHashMap::with_capacity_and_hasher(CAP, hash_map::RandomState::new());
        let mut count = 0;

        b.iter(|| {
            count += 1;
            let bar = black_box(Bar([0x42; 4]));
            map.insert(count, bar);

            if map.len() >= CAP {
                map.pop_front();
            }
        });
    });
}

/// Benchmark the `hashlink` crate with the same insert/evict workload.
fn bench_hashlink(c: &mut Criterion) {
    use hashlink::LinkedHashMap;

    c.bench_function("hashlink", |b| {
        let mut map = LinkedHashMap::with_capacity_and_hasher(CAP, hash_map::RandomState::new());
        let mut count = 0;

        b.iter(|| {
            count += 1;
            let bar = black_box(Bar([0x42; 4]));
            map.insert(count, bar);

            if map.len() >= CAP {
                map.pop_front();
            }
        });
    });
}

/// Benchmark the `linked-hash-map` crate with the same insert/evict workload.
fn bench_linked_hash_map(c: &mut Criterion) {
    use linked_hash_map::LinkedHashMap;

    c.bench_function("linked-hash-map", |b| {
        let mut map = LinkedHashMap::with_capacity_and_hasher(CAP, hash_map::RandomState::new());
        let mut count = 0;

        b.iter(|| {
            count += 1;
            let bar = black_box(Bar([0x42; 4]));
            map.insert(count, bar);

            if map.len() >= CAP {
                map.pop_front();
            }
        });
    });
}

criterion_group!(
    lru,
    bench_linkedhashmap,
    bench_hashlink,
    bench_linked_hash_map
);
criterion_main!(lru);

@ -1,196 +0,0 @@
pub mod linkedlist;
use linkedlist::{LinkedList, NodeSlab};
use std::borrow::Borrow;
use std::collections::hash_map::RandomState;
use std::hash::{BuildHasher, Hash};
#[cfg(feature = "hashbrown")]
use hashbrown::HashMap;
#[cfg(not(feature = "hashbrown"))]
use std::collections::HashMap;
/// An insertion-ordered hash map: values live in a slab of linked-list
/// nodes, and a HashMap translates keys to stable slab indices.
pub struct LinkedHashMap<K, V, S = RandomState> {
    /// owns the (key, value) nodes; indices are stable handles
    slab: NodeSlab<(K, V)>,
    /// tracks insertion order (front = oldest, end = newest)
    list: LinkedList,
    /// key -> slab index lookup
    map: HashMap<K, usize, S>,
}
impl<K, V> LinkedHashMap<K, V>
where
    K: Hash + Eq + Clone,
{
    /// Create an empty map with the default hasher.
    #[inline]
    pub fn new() -> LinkedHashMap<K, V> {
        Self::with_capacity(0)
    }

    /// Create an empty map pre-sized for `cap` entries.
    #[inline]
    pub fn with_capacity(cap: usize) -> LinkedHashMap<K, V> {
        let slab = NodeSlab::with_capacity(cap);
        let map = HashMap::with_capacity_and_hasher(cap, RandomState::default());

        LinkedHashMap {
            slab,
            list: LinkedList::new(),
            map,
        }
    }
}
impl<K, V, S> Default for LinkedHashMap<K, V, S>
where
    K: Hash + Eq + Clone,
    S: BuildHasher + Default,
{
    /// An empty map built from the hasher's `Default` value.
    #[inline]
    fn default() -> Self {
        Self::with_hasher(Default::default())
    }
}
impl<K, V, S> LinkedHashMap<K, V, S>
where
    K: Hash + Eq + Clone,
    S: BuildHasher,
{
    /// Create an empty map that hashes with `hash_builder`.
    pub fn with_hasher(hash_builder: S) -> Self {
        LinkedHashMap {
            slab: NodeSlab::new(),
            list: LinkedList::new(),
            map: HashMap::with_hasher(hash_builder),
        }
    }

    /// Create an empty map pre-sized for `capacity` entries, hashing with `hash_builder`.
    pub fn with_capacity_and_hasher(capacity: usize, hash_builder: S) -> Self {
        LinkedHashMap {
            slab: NodeSlab::with_capacity(capacity),
            list: LinkedList::new(),
            map: HashMap::with_capacity_and_hasher(capacity, hash_builder),
        }
    }

    /// Number of entries currently stored.
    #[inline]
    pub fn len(&self) -> usize {
        self.map.len()
    }

    /// True when the map holds no entries.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.map.is_empty()
    }

    /// Reserve room for at least `additional` more entries.
    #[inline]
    pub fn reserve(&mut self, additional: usize) {
        self.slab.reserve(additional);
        self.map.reserve(additional);
    }

    /// Shrink backing storage to fit the current length.
    #[inline]
    pub fn shrink_to_fit(&mut self) {
        self.slab.shrink_to_fit();
        self.map.shrink_to_fit();
    }

    /// Remove every entry. The list is reset rather than walked because the
    /// slab and map clears already drop all nodes.
    #[inline]
    pub fn clear(&mut self) {
        self.slab.clear();
        self.list = LinkedList::new();
        self.map.clear();
    }

    /// Insert `value` at `key`, placing it at the back (newest position).
    /// Returns the previous value if the key was already present.
    /// Note the ordering: the new node is pushed first; for a fresh key the
    /// `?` on `map.insert` early-returns `None` with the new node already in
    /// place, and for a duplicate key the old node is then unlinked.
    #[cfg_attr(feature = "inline-more", inline(always))]
    pub fn insert(&mut self, key: K, value: V) -> Option<V> {
        let index = self.list.push(&mut self.slab, (key.clone(), value));
        let index = self.map.insert(key, index)?;
        let (_, value) = self.list.remove(&mut self.slab, index)?;
        Some(value)
    }

    /// Borrow the value stored at `key`, without changing its position.
    #[cfg_attr(feature = "inline-more", inline(always))]
    pub fn get<Q>(&self, key: &Q) -> Option<&V>
    where
        K: Borrow<Q>,
        Q: Hash + Eq,
    {
        let index = *self.map.get(key)?;
        let (_, value) = self.slab.get(index)?;
        Some(value)
    }

    /// Mutably borrow the value stored at `key`, without changing its position.
    #[cfg_attr(feature = "inline-more", inline(always))]
    pub fn get_mut<Q>(&mut self, key: &Q) -> Option<&mut V>
    where
        K: Borrow<Q>,
        Q: Hash + Eq,
    {
        let index = *self.map.get(key)?;
        let (_, value) = self.slab.get_mut(index)?;
        Some(value)
    }

    /// Mutably borrow the value at `key` AND move it to the back (newest
    /// position) — the building block for LRU-style usage.
    #[cfg_attr(feature = "inline-more", inline(always))]
    pub fn touch<Q>(&mut self, key: &Q) -> Option<&mut V>
    where
        K: Borrow<Q>,
        Q: Hash + Eq,
    {
        let index = *self.map.get(key)?;
        let (_, value) = self.list.touch(&mut self.slab, index)?;
        Some(value)
    }

    /// Remove the entry at `key`, returning its value if present.
    #[cfg_attr(feature = "inline-more", inline(always))]
    pub fn remove<Q>(&mut self, key: &Q) -> Option<V>
    where
        K: Borrow<Q>,
        Q: Hash + Eq,
    {
        let index = self.map.remove(key)?;
        let (_, value) = self.list.remove(&mut self.slab, index)?;
        Some(value)
    }

    /// Remove and return the oldest (front) entry, or None when empty.
    #[cfg_attr(feature = "inline-more", inline(always))]
    pub fn pop_front(&mut self) -> Option<(K, V)> {
        let (k, v) = self.list.pop_front(&mut self.slab)?;
        self.map.remove(&k)?;
        Some((k, v))
    }

    /// Remove and return the newest (back) entry, or None when empty.
    #[cfg_attr(feature = "inline-more", inline(always))]
    pub fn pop_last(&mut self) -> Option<(K, V)> {
        let (k, v) = self.list.pop_last(&mut self.slab)?;
        self.map.remove(&k)?;
        Some((k, v))
    }
}
// Exercises insertion order, touch (move-to-back), pops from both ends, and clear.
#[test]
fn test_linkedhashmap() {
    #[derive(PartialEq, Eq, Debug)]
    struct Bar(u64);

    let mut map = LinkedHashMap::new();

    // insertion order: 0, 3, 2, 1 (front to back)
    map.insert(0, Bar(0));
    map.insert(3, Bar(3));
    map.insert(2, Bar(2));
    map.insert(1, Bar(1));

    assert_eq!(4, map.len());
    assert_eq!(Some(&Bar(2)), map.get(&2));
    // touch moves key 3 to the back, so order is now 0, 2, 1, 3
    assert_eq!(Some(&mut Bar(3)), map.touch(&3));
    assert_eq!(Some((0, Bar(0))), map.pop_front());
    assert_eq!(Some((3, Bar(3))), map.pop_last());
    assert_eq!(Some((1, Bar(1))), map.pop_last());
    assert_eq!(1, map.len());
    assert_eq!(Some(&mut Bar(2)), map.get_mut(&2));
    map.clear();
    assert_eq!(0, map.len());
}

@ -1,249 +0,0 @@
use slab::Slab;
/// Slab storage for list nodes; slab indices act as stable node handles.
#[derive(Default)]
pub struct NodeSlab<T>(Slab<Node<T>>);

/// A doubly-linked list expressed as slab indices instead of pointers.
#[derive(Default, Debug)]
pub struct LinkedList {
    /// index of the oldest (front) node, if any
    start: Option<usize>,
    /// index of the newest (back) node, if any
    end: Option<usize>,
}

/// One list node: the value plus its neighbors' slab indices.
pub struct Node<T> {
    value: T,
    /// previous (older) node, None at the front
    prev: Option<usize>,
    /// next (newer) node, None at the back
    next: Option<usize>,
}
impl LinkedList {
    /// An empty list (no front, no back).
    pub const fn new() -> LinkedList {
        LinkedList {
            start: None,
            end: None,
        }
    }

    /// Append `value` at the back; returns the new node's slab index.
    #[cfg_attr(feature = "inline-more", inline(always))]
    pub fn push<T>(&mut self, NodeSlab(slab): &mut NodeSlab<T>, value: T) -> usize {
        let index = slab.insert(Node {
            value,
            // NOTE(review): when `end` is None, `start` should also be None
            // (empty-list invariant), which would make `.or(self.start)`
            // redundant — confirm, but it is harmless either way.
            prev: self.end.or(self.start),
            next: None,
        });

        // link the old back node forward, or record the first node as start
        if let Some(old_end) = self.end.replace(index) {
            slab[old_end].next = Some(index);
        } else {
            self.start = Some(index);
        }

        index
    }

    /// Remove and return the front (oldest) value, or None when empty.
    #[cfg_attr(feature = "inline-more", inline(always))]
    pub fn pop_front<T>(&mut self, NodeSlab(slab): &mut NodeSlab<T>) -> Option<T> {
        let index = self.start?;
        let node = slab.remove(index);
        debug_assert!(node.prev.is_none());

        // advance start and clear the new front's back-pointer
        self.start = node.next;
        if let Some(index) = self.start {
            slab[index].prev.take();
        }

        // popping the only node also empties `end`
        if Some(index) == self.end {
            self.end.take();
        }

        Some(node.value)
    }

    /// Remove and return the back (newest) value, or None when empty.
    #[cfg_attr(feature = "inline-more", inline(always))]
    pub fn pop_last<T>(&mut self, NodeSlab(slab): &mut NodeSlab<T>) -> Option<T> {
        let index = self.end?;
        let node = slab.remove(index);
        debug_assert!(node.next.is_none());

        // retreat end and clear the new back's forward-pointer
        self.end = node.prev;
        if let Some(index) = self.end {
            slab[index].next.take();
        }

        // popping the only node also empties `start`
        if Some(index) == self.start {
            self.start.take();
        }

        Some(node.value)
    }

    /// Move the node at `index` to the back (newest position) and return a
    /// mutable borrow of its value. Returns None when `index` is vacant.
    #[cfg_attr(feature = "inline-more", inline(always))]
    pub fn touch<'a, T>(
        &mut self,
        NodeSlab(slab): &'a mut NodeSlab<T>,
        index: usize,
    ) -> Option<&'a mut T> {
        let (node_prev, node_next) = {
            let node = slab.get(index)?;
            (node.prev, node.next)
        };

        // unlink from the successor; if there is none, the node is already
        // at the back and nothing needs to move
        if let Some(next) = node_next {
            slab[next].prev = node_prev;
        } else {
            debug_assert_eq!(Some(index), self.end);
            return Some(&mut slab[index].value);
        }

        // unlink from the predecessor, or advance `start` if it was the front
        if let Some(prev) = node_prev {
            slab[prev].next = node_next;
        } else {
            self.start = node_next;
        }

        // relink at the back: old end -> this node -> None
        let end = self.end.replace(index)?;
        slab[end].next = Some(index);

        let node = &mut slab[index];
        node.prev = Some(end);
        node.next.take();

        Some(&mut node.value)
    }

    /// Unlink and return the value at `index`, or None when vacant.
    #[cfg_attr(feature = "inline-more", inline(always))]
    pub fn remove<T>(&mut self, NodeSlab(slab): &mut NodeSlab<T>, index: usize) -> Option<T> {
        let node = if slab.contains(index) {
            // why not return Option :(
            slab.remove(index)
        } else {
            return None;
        };

        // bridge predecessor -> successor, updating start/end at the edges
        if let Some(prev) = node.prev {
            slab[prev].next = node.next;
        } else {
            self.start = node.next;
        }

        if let Some(next) = node.next {
            slab[next].prev = node.prev;
        } else {
            self.end = node.prev;
        }

        Some(node.value)
    }
}
impl<T> NodeSlab<T> {
    /// An empty slab.
    #[inline]
    pub fn new() -> NodeSlab<T> {
        NodeSlab(Slab::new())
    }

    /// A slab pre-sized for `cap` nodes.
    #[inline]
    pub fn with_capacity(cap: usize) -> NodeSlab<T> {
        NodeSlab(Slab::with_capacity(cap))
    }

    /// Borrow the value stored at `index`, if that slot is occupied.
    #[inline]
    pub fn get(&self, index: usize) -> Option<&T> {
        self.0.get(index).map(|node| &node.value)
    }

    /// Mutably borrow the value stored at `index`, if that slot is occupied.
    #[inline]
    pub fn get_mut(&mut self, index: usize) -> Option<&mut T> {
        self.0.get_mut(index).map(|node| &mut node.value)
    }

    /// Reserve room for at least `additional` more nodes.
    #[inline]
    pub fn reserve(&mut self, additional: usize) {
        self.0.reserve(additional);
    }

    /// Shrink the backing storage to fit the current length.
    #[inline]
    pub fn shrink_to_fit(&mut self) {
        self.0.shrink_to_fit();
    }

    /// Drop every stored node.
    #[inline]
    pub(crate) fn clear(&mut self) {
        self.0.clear();
    }
}
// Exercises every LinkedList operation, including single-element edge cases,
// mid-list removal, touch reordering, and bulk FIFO/LIFO draining.
#[test]
fn test_linkedlist() {
    let mut slab = NodeSlab::new();
    let mut list = LinkedList::new();

    // single element: pop_front drains it, then the list is empty
    list.push(&mut slab, 0);
    assert_eq!(Some(0), list.pop_front(&mut slab));
    assert_eq!(None, list.pop_front(&mut slab));

    // single element: pop_last drains it too
    list.push(&mut slab, 1);
    assert_eq!(Some(1), list.pop_last(&mut slab));
    assert_eq!(None, list.pop_last(&mut slab));

    // two elements, popped from opposite ends
    list.push(&mut slab, 2);
    list.push(&mut slab, 3);
    assert_eq!(Some(2), list.pop_front(&mut slab));
    assert_eq!(Some(3), list.pop_last(&mut slab));
    eprintln!("{:?}", list);
    assert_eq!(None, list.pop_front(&mut slab));
    eprintln!("{:?}", list);
    assert_eq!(None, list.pop_last(&mut slab));

    list.push(&mut slab, 4);
    list.push(&mut slab, 5);
    assert_eq!(Some(5), list.pop_last(&mut slab));
    assert_eq!(Some(4), list.pop_front(&mut slab));
    assert_eq!(None, list.pop_last(&mut slab));
    assert_eq!(None, list.pop_front(&mut slab));

    // removing a middle node bridges its neighbors; a second remove is a no-op
    let index6 = list.push(&mut slab, 6);
    let index7 = list.push(&mut slab, 7);
    let index8 = list.push(&mut slab, 8);
    assert_eq!(Some(7), list.remove(&mut slab, index7));
    assert_eq!(None, list.remove(&mut slab, index7));
    assert_eq!(Some(&6), slab.get(index6));
    assert_eq!(Some(&8), slab.get(index8));
    assert_eq!(Some(6), list.pop_front(&mut slab));
    assert_eq!(Some(8), list.pop_front(&mut slab));

    // touch moves a node to the back, changing pop order
    let index9 = list.push(&mut slab, 9);
    list.push(&mut slab, 10);
    assert_eq!(Some(&mut 9), list.touch(&mut slab, index9));
    assert_eq!(Some(10), list.pop_front(&mut slab));
    assert_eq!(Some(9), list.pop_front(&mut slab));

    // two touches in a row: the most recently touched node ends up last
    let index11 = list.push(&mut slab, 11);
    let index12 = list.push(&mut slab, 12);
    list.push(&mut slab, 13);
    assert_eq!(Some(&mut 12), list.touch(&mut slab, index12));
    assert_eq!(Some(&mut 11), list.touch(&mut slab, index11));
    assert_eq!(Some(13), list.pop_front(&mut slab));
    assert_eq!(Some(12), list.pop_front(&mut slab));
    assert_eq!(Some(11), list.pop_front(&mut slab));

    // bulk FIFO drain preserves insertion order
    for i in 0..32 {
        list.push(&mut slab, i);
    }
    for i in 0..32 {
        assert_eq!(Some(i), list.pop_front(&mut slab));
    }
    assert_eq!(None, list.pop_front(&mut slab));

    // bulk LIFO drain yields reverse insertion order
    for i in 0..32 {
        list.push(&mut slab, i);
    }
    for i in (0..32).rev() {
        assert_eq!(Some(i), list.pop_last(&mut slab));
    }
    assert_eq!(None, list.pop_last(&mut slab));
}

@ -34,7 +34,7 @@ flume = "0.10.14"
futures = { version = "0.3.24", features = ["thread-pool"] }
hashbrown = { version = "0.12.3", features = ["serde"] }
http = "0.2.8"
moka = "0.9.3"
moka = { version = "0.9.3", default-features = false, features = ["future"] }
notify = "5.0.0"
num = "0.4.0"
parking_lot = { version = "0.12.1", features = ["arc_lock"] }

@ -12,7 +12,6 @@ use crate::rpcs::transactions::TxStatus;
use crate::stats::AppStats;
use anyhow::Context;
use axum::extract::ws::Message;
use dashmap::DashMap;
use derive_more::From;
use ethers::core::utils::keccak256;
use ethers::prelude::{Address, Block, Bytes, TxHash, H256, U64};
@ -22,6 +21,7 @@ use futures::stream::FuturesUnordered;
use futures::stream::StreamExt;
use futures::Future;
use migration::{Migrator, MigratorTrait};
use moka::future::Cache;
use redis_rate_limit::bb8::PooledConnection;
use redis_rate_limit::{
bb8::{self, ErrorSink},
@ -39,7 +39,7 @@ use tokio::sync::{broadcast, watch};
use tokio::task::JoinHandle;
use tokio::time::{timeout, Instant};
use tokio_stream::wrappers::{BroadcastStream, WatchStream};
use tracing::{debug, info, info_span, instrument, trace, warn, Instrument};
use tracing::{debug, error, info, info_span, instrument, trace, warn, Instrument};
use uuid::Uuid;
// TODO: make this customizable?
@ -54,8 +54,7 @@ static APP_USER_AGENT: &str = concat!(
// TODO: better name
type ResponseCacheKey = (H256, String, Option<String>);
// TODO!! a RWLock on this made us super slow. But a DashMap makes this grow unbounded!
type ResponseCache = DashMap<ResponseCacheKey, JsonRpcForwardedResponse>;
type ResponseCache = Cache<ResponseCacheKey, JsonRpcForwardedResponse>;
pub type AnyhowJoinHandle<T> = JoinHandle<anyhow::Result<T>>;
@ -79,15 +78,16 @@ pub struct Web3ProxyApp {
// TODO: broadcast channel instead?
head_block_receiver: watch::Receiver<ArcBlock>,
pending_tx_sender: broadcast::Sender<TxStatus>,
/// TODO: this doesn't ever get incremented!
pub active_requests: AtomicUsize,
pub config: AppConfig,
pub db_conn: Option<sea_orm::DatabaseConnection>,
pub pending_transactions: Arc<DashMap<TxHash, TxStatus>>,
/// store pending transactions that we've seen so that we don't send duplicates to subscribers
pub pending_transactions: Cache<TxHash, TxStatus>,
pub rate_limiter: Option<RedisRateLimit>,
pub redis_pool: Option<RedisPool>,
pub stats: AppStats,
// TODO: this grows unbounded! Make a "SizedDashMap" that cleans up old rows with some garbage collection task
pub user_cache: DashMap<Uuid, UserCacheValue>,
pub user_cache: Cache<Uuid, UserCacheValue>,
}
/// flatten a JoinError into an anyhow error
@ -151,9 +151,13 @@ impl Web3ProxyApp {
Pin<Box<dyn Future<Output = anyhow::Result<()>>>>,
)> {
// safety checks on the config
debug!("redirect_user_url: {}", top_config.app.redirect_user_url);
assert!(
top_config.app.redirect_user_url.contains("{{user_id}}"),
"redirect user url must contain \"{{user_id}}\""
top_config
.app
.redirect_user_url
.contains("{{user_address}}"),
"redirect user url must contain \"{{user_address}}\""
);
// first, we connect to mysql and make sure the latest migrations have run
@ -235,15 +239,17 @@ impl Web3ProxyApp {
// TODO: use this? it could listen for confirmed transactions and then clear pending_transactions, but the head_block_sender is doing that
drop(pending_tx_receiver);
// TODO: this will grow unbounded!! add some expiration to this. and probably move to redis
let pending_transactions = Arc::new(DashMap::new());
// TODO: sized and timed expiration!
// TODO: put some in Redis, too?
let pending_transactions = Cache::new(10000);
// TODO: don't drop the pending_tx_receiver. instead, read it to mark transactions as "seen". once seen, we won't re-send them
// TODO: once a transaction is "Confirmed" we remove it from the map. this should prevent major memory leaks.
// TODO: we should still have some sort of expiration or maximum size limit for the map
// this block map is shared between balanced_rpcs and private_rpcs.
let block_map = BlockHashesMap::default();
// TODO: what limits should we have for expiration?
let block_map = BlockHashesMap::new(10_000);
let (balanced_rpcs, balanced_handle) = Web3Connections::spawn(
top_config.app.chain_id,
@ -300,12 +306,16 @@ impl Web3ProxyApp {
)
});
// TODO: change this to a sized cache
let response_cache = Cache::new(1_000);
let user_cache = Cache::new(10_000);
let app = Self {
config: top_config.app,
balanced_rpcs,
private_rpcs,
active_requests: Default::default(),
response_cache: Default::default(),
response_cache,
head_block_receiver,
pending_tx_sender,
pending_transactions,
@ -313,9 +323,7 @@ impl Web3ProxyApp {
db_conn,
redis_pool,
stats: app_stats,
// TODO: make the size configurable
// TODO: better type for this?
user_cache: Default::default(),
user_cache,
};
let app = Arc::new(app);
@ -613,7 +621,7 @@ impl Web3ProxyApp {
trace!(?request.method, "cache hit!");
// TODO: can we make references work? maybe put them in an Arc?
return Ok(Ok(response.to_owned()));
return Ok(Ok(response));
}
// TODO: another lock here so that we don't send the same request to a backend more than once
@ -638,7 +646,8 @@ impl Web3ProxyApp {
let span = info_span!("rpc_request");
// let _enter = span.enter(); // DO NOT ENTER! we can't use enter across awaits! (clippy lint soon)
let partial_response: serde_json::Value = match request.method.as_ref() {
// TODO: don't clone
let partial_response: serde_json::Value = match request.method.clone().as_ref() {
// lots of commands are blocked
"admin_addPeer"
| "admin_datadir"
@ -819,47 +828,45 @@ impl Web3ProxyApp {
trace!(?min_block_needed, ?method);
let cached_response_result =
self.cached_response(min_block_needed, &request).await?;
// TODO: emit a stat on error. maybe with .map_err?
let cache_key = match cached_response_result {
let cache_key = match self.cached_response(min_block_needed, &request).await? {
Ok(mut cache_result) => {
// we got a cache hit! no need to do any backend requests.
// put our request id on the cached response
// TODO: maybe only cache the inner result?
// TODO: maybe only cache the inner result? then have a JsonRpcForwardedResponse::from_cache
cache_result.id = request.id;
// emit a stat
return Ok(cache_result);
}
Err(cache_key) => cache_key,
};
let response = match method {
"temporarily disabled" => {
// "eth_getTransactionByHash" | "eth_getTransactionReceipt" => {
// TODO: try_send_all serially with retries instead of parallel
self.private_rpcs
.try_send_all_upstream_servers(request, min_block_needed)
.await?
}
_ => {
// TODO: retries?
self.balanced_rpcs
.try_send_best_upstream_server(request, min_block_needed)
.await?
}
};
// TODO: move this caching outside this match and cache some of the other responses?
// TODO: cache the warp::reply to save us serializing every time?
if self
let response = self
.response_cache
.insert(cache_key.clone(), response.clone())
.is_some()
{
// TODO: we had another lock to prevent this, but its not worth the extra locking
debug!("already cached")
}
.try_get_with(cache_key, async move {
match method {
"temporarily disabled" => {
// "eth_getTransactionByHash" | "eth_getTransactionReceipt" => {
// TODO: try_send_all serially with retries instead of parallel
self.private_rpcs
.try_send_all_upstream_servers(request, min_block_needed)
.await
}
_ => {
// TODO: retries?
self.balanced_rpcs
.try_send_best_upstream_server(request, min_block_needed)
.await
}
}
})
.await
.unwrap();
return Ok(response);
}

@ -215,10 +215,10 @@ mod tests {
default_requests_per_minute: 6_000_000,
min_sum_soft_limit: 1,
min_synced_rpcs: 1,
public_rate_limit_per_minute: 0,
public_rate_limit_per_minute: 6_000_000,
response_cache_max_bytes: 10_usize.pow(7),
redirect_public_url: "example.com/".to_string(),
redirect_user_url: "example.com/users/{user_address}".to_string(),
redirect_user_url: "example.com/{{user_address}}".to_string(),
..Default::default()
},
balanced_rpcs: HashMap::from([

@ -119,7 +119,7 @@ pub fn block_needed(
}
if let Some(x) = obj.get("blockHash") {
// TODO: check a linkedhashmap of recent hashes
// TODO: check a Cache of recent hashes
// TODO: error if fromBlock or toBlock were set
todo!("handle blockHash {}", x);
}
@ -133,7 +133,7 @@ pub fn block_needed(
return None;
}
"eth_getTransactionByBlockHashAndIndex" => {
// TODO: check a linkedhashmap of recent hashes
// TODO: check a Cache of recent hashes
// try full nodes first. retry will use archive
return None;
}
@ -145,13 +145,13 @@ pub fn block_needed(
return None;
}
"eth_getUncleByBlockHashAndIndex" => {
// TODO: check a linkedhashmap of recent hashes
// TODO: check a Cache of recent hashes
// try full nodes first. retry will use archive
return None;
}
"eth_getUncleByBlockNumberAndIndex" => 0,
"eth_getUncleCountByBlockHash" => {
// TODO: check a linkedhashmap of recent hashes
// TODO: check a Cache of recent hashes
// try full nodes first. retry will use archive
return None;
}

@ -71,7 +71,7 @@ pub struct AppConfig {
pub response_cache_max_bytes: usize,
/// the stats page url for an anonymous user.
pub redirect_public_url: String,
/// the stats page url for a logged in user. it must contain "{user_id}"
/// the stats page url for a logged in user. it must contain "{user_address}"
pub redirect_user_url: String,
}

@ -20,7 +20,9 @@ pub async fn status(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoRe
"balanced_rpcs": app.balanced_rpcs,
"private_rpcs": app.private_rpcs,
"num_active_requests": app.active_requests.load(Ordering::Acquire),
"num_pending_transactions": app.pending_transactions.len(),
// TODO: include number of items?
"pending_transactions_count": app.pending_transactions.entry_count(),
"pending_transactions_size": app.pending_transactions.weighted_size(),
});
(StatusCode::OK, Json(body))

@ -195,7 +195,7 @@ impl Web3ProxyApp {
};
// save for the next run
self.user_cache.insert(user_key, user_data);
self.user_cache.insert(user_key, user_data).await;
Ok(user_data)
}
@ -209,7 +209,7 @@ impl Web3ProxyApp {
None
} else {
// this key was active in the database recently
Some(*cached_user)
Some(cached_user)
}
} else {
// cache miss

@ -72,7 +72,7 @@ pub async fn user_websocket_handler(
// TODO: store this on the app and use register_template?
let reg = Handlebars::new();
// TODO: show the user's address, not their id (remember to update the checks for {{user_id}} in app.rs)
    // TODO: show the user's address, not their id (remember to update the checks for {{user_address}} in app.rs)
// TODO: query to get the user's address. expose that instead of user_id
let user_url = reg
.render_template(

@ -6,22 +6,20 @@ use crate::{
config::BlockAndRpc, jsonrpc::JsonRpcRequest, rpcs::synced_connections::SyncedConnections,
};
use anyhow::Context;
use dashmap::{
mapref::{entry::Entry, one::Ref},
DashMap,
};
use derive_more::From;
use ethers::prelude::{Block, TxHash, H256, U64};
use hashbrown::{HashMap, HashSet};
use moka::future::Cache;
use serde::Serialize;
use serde_json::json;
use std::{cmp::Ordering, fmt::Display, sync::Arc};
use tokio::sync::{broadcast, watch};
use tracing::{debug, info, trace, warn};
// TODO: type for Hydrated Blocks with their full transactions?
pub type ArcBlock = Arc<Block<TxHash>>;
pub type BlockHashesMap = Arc<DashMap<H256, ArcBlock>>;
pub type BlockHashesMap = Cache<H256, ArcBlock>;
/// A block's hash and number.
#[derive(Clone, Debug, Default, From, Serialize)]
@ -38,43 +36,37 @@ impl Display for BlockId {
impl Web3Connections {
/// add a block to our map and it's hash to our graphmap of the blockchain
pub fn save_block(&self, block: &ArcBlock, heaviest_chain: Option<bool>) -> anyhow::Result<()> {
pub async fn save_block(
&self,
block: &ArcBlock,
heaviest_chain: Option<bool>,
) -> anyhow::Result<()> {
// TODO: i think we can rearrange this function to make it faster on the hot path
let block_hash = block.hash.as_ref().context("no block hash")?;
let block_num = block.number.as_ref().context("no block num")?;
let _block_td = block
.total_difficulty
.as_ref()
.context("no block total difficulty")?;
.expect("no block total difficulty");
// if self.block_hashes.contains_key(block_hash) {
// // this block is already included. no need to continue
// return Ok(());
// }
let mut blockchain = self.blockchain_graphmap.write();
let mut blockchain = self.blockchain_graphmap.write().await;
// think more about heaviest_chain
// TODO: think more about heaviest_chain
if heaviest_chain.unwrap_or(true) {
match self.block_numbers.entry(*block_num) {
Entry::Occupied(mut x) => {
let old_hash = x.insert(*block_hash);
if block_hash == &old_hash {
// this block has already been saved
return Ok(());
}
// TODO: what should we do?
// TODO: if old_hash's number is > block_num, we need to remove more entries
warn!(
"do something with the old hash ({}) for {}? we may need to update a bunch more block numbers", old_hash, block_num
)
}
Entry::Vacant(x) => {
x.insert(*block_hash);
            // this is the only place that writes to block_numbers, and it's inside a write lock on blockchain_graphmap, so i think there is no race
if let Some(old_hash) = self.block_numbers.get(block_num) {
if block_hash == &old_hash {
// this block has already been saved
return Ok(());
}
}
// i think a race here isn't that big of a problem. just 2 inserts
self.block_numbers.insert(*block_num, *block_hash).await;
}
// if blockchain.contains_node(*block_hash) {
@ -84,12 +76,7 @@ impl Web3Connections {
// }
// TODO: theres a small race between contains_key and insert
if let Some(_overwritten) = self.block_hashes.insert(*block_hash, block.clone()) {
// there was a race and another thread wrote this block
// i don't think this will happen. the blockchain.conains_node above should be enough
// no need to continue because that other thread would have written (or soon will) write the
return Ok(());
}
self.block_hashes.insert(*block_hash, block.clone()).await;
// TODO: prettier log? or probably move the log somewhere else
trace!(%block_hash, "new block");
@ -117,7 +104,7 @@ impl Web3Connections {
// first, try to get the hash from our cache
// the cache is set last, so if its here, its everywhere
if let Some(block) = self.block_hashes.get(hash) {
return Ok(block.clone());
return Ok(block);
}
// block not in cache. we need to ask an rpc for it
@ -147,7 +134,7 @@ impl Web3Connections {
let block = Arc::new(block);
// the block was fetched using eth_getBlockByHash, so it should have all fields
self.save_block(&block, None)?;
self.save_block(&block, None).await?;
Ok(block)
}
@ -164,7 +151,7 @@ impl Web3Connections {
/// Get the heaviest chain's block from cache or backend rpc
pub async fn cannonical_block(&self, num: &U64) -> anyhow::Result<ArcBlock> {
// we only have blocks by hash now
// maybe save them during save_block in a blocks_by_number DashMap<U64, Vec<ArcBlock>>
// maybe save them during save_block in a blocks_by_number Cache<U64, Vec<ArcBlock>>
// if theres multiple, use petgraph to find the one on the main chain (and remove the others if they have enough confirmations)
// be sure the requested block num exists
@ -183,7 +170,7 @@ impl Web3Connections {
// try to get the hash from our cache
// deref to not keep the lock open
if let Some(block_hash) = self.block_numbers.get(num).map(|x| *x) {
if let Some(block_hash) = self.block_numbers.get(num) {
// TODO: sometimes this needs to fetch the block. why? i thought block_numbers would only be set if the block hash was set
return self.block(&block_hash, None).await;
}
@ -205,7 +192,7 @@ impl Web3Connections {
let block = Arc::new(block);
// the block was fetched using eth_getBlockByNumber, so it should have all fields and be on the heaviest chain
self.save_block(&block, Some(true))?;
self.save_block(&block, Some(true)).await?;
Ok(block)
}
@ -213,7 +200,8 @@ impl Web3Connections {
pub(super) async fn process_incoming_blocks(
&self,
block_receiver: flume::Receiver<BlockAndRpc>,
// TODO: head_block_sender should be a broadcast_sender like pending_tx_sender
// TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed
// Geth's subscriptions have the same potential for skipping blocks.
head_block_sender: watch::Sender<ArcBlock>,
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
) -> anyhow::Result<()> {
@ -262,7 +250,7 @@ impl Web3Connections {
connection_heads.insert(rpc.name.to_owned(), rpc_head_hash);
// we don't know if its on the heaviest chain yet
self.save_block(&rpc_head_block, Some(false))?;
self.save_block(&rpc_head_block, Some(false)).await?;
Some(BlockId {
hash: rpc_head_hash,
@ -283,7 +271,7 @@ impl Web3Connections {
// iterate the known heads to find the highest_work_block
let mut checked_heads = HashSet::new();
let mut highest_work_block: Option<Ref<H256, ArcBlock>> = None;
let mut highest_work_block: Option<ArcBlock> = None;
for conn_head_hash in connection_heads.values() {
if checked_heads.contains(conn_head_hash) {
// we already checked this head from another rpc
@ -324,7 +312,7 @@ impl Web3Connections {
}
// clone to release the read lock on self.block_hashes
if let Some(mut maybe_head_block) = highest_work_block.map(|x| x.clone()) {
if let Some(mut maybe_head_block) = highest_work_block {
// track rpcs on this heaviest chain so we can build a new SyncedConnections
let mut heavy_rpcs: Vec<&Arc<Web3Connection>> = vec![];
// a running total of the soft limits covered by the heavy rpcs
@ -413,7 +401,7 @@ impl Web3Connections {
None => {
debug!(block=%heavy_block_id, %rpc, "first consensus head");
self.save_block(&rpc_head_block, Some(true))?;
self.save_block(&rpc_head_block, Some(true)).await?;
head_block_sender.send(heavy_block)?;
}
@ -429,7 +417,7 @@ impl Web3Connections {
info!(heavy=%heavy_block_id, old=%old_block_id, %rpc, "unc block");
// todo!("handle equal by updating the cannonical chain");
self.save_block(&rpc_head_block, Some(true))?;
self.save_block(&rpc_head_block, Some(true)).await?;
head_block_sender.send(heavy_block)?;
}
@ -439,7 +427,7 @@ impl Web3Connections {
// TODO: better log
warn!(head=%heavy_block_id, %rpc, "chain rolled back");
self.save_block(&rpc_head_block, Some(true))?;
self.save_block(&rpc_head_block, Some(true)).await?;
// todo!("handle less by removing higher blocks from the cannonical chain");
head_block_sender.send(heavy_block)?;
@ -449,7 +437,7 @@ impl Web3Connections {
// todo!("handle greater by adding this block to and any missing parents to the cannonical chain");
self.save_block(&rpc_head_block, Some(true))?;
self.save_block(&rpc_head_block, Some(true)).await?;
head_block_sender.send(heavy_block)?;
}

@ -5,7 +5,6 @@ use super::request::{OpenRequestHandle, OpenRequestResult};
use crate::app::{flatten_handle, AnyhowJoinHandle};
use crate::config::BlockAndRpc;
use anyhow::Context;
use dashmap::mapref::entry::Entry;
use ethers::prelude::{Block, Bytes, Middleware, ProviderError, TxHash, H256, U64};
use futures::future::try_join_all;
use futures::StreamExt;
@ -36,6 +35,7 @@ pub struct Web3Connection {
hard_limit: Option<RedisRateLimit>,
/// used for load balancing to the least loaded server
pub(super) soft_limit: u32,
/// TODO: have an enum for this so that "no limit" prints pretty
block_data_limit: AtomicU64,
/// Lower weight are higher priority when sending requests
pub(super) weight: u32,
@ -280,58 +280,42 @@ impl Web3Connection {
// TODO: is unwrap_or_default ok? we might have an empty block
let new_hash = new_head_block.hash.unwrap_or_default();
let mut td_is_needed = new_head_block.total_difficulty.is_none();
// if we already have this block saved, we don't need to store this copy
// be careful with the entry api! awaits during this are a very bad idea.
new_head_block = match block_map.entry(new_hash) {
Entry::Vacant(x) => {
// only save the block if it has a total difficulty!
if !td_is_needed {
x.insert(new_head_block).clone()
} else {
new_head_block
}
}
Entry::Occupied(x) => {
let existing_block = x.get().clone();
// TODO: small race here
new_head_block = if let Some(existing_block) = block_map.get(&new_hash) {
// we only save blocks with a total difficulty
debug_assert!(existing_block.total_difficulty.is_some());
existing_block
} else if new_head_block.total_difficulty.is_some() {
// this block has a total difficulty, it is safe to use
block_map.insert(new_hash, new_head_block).await;
// we only save blocks with a total difficulty
debug_assert!(existing_block.total_difficulty.is_some());
td_is_needed = false;
existing_block
}
};
if td_is_needed {
// we get instead of return new_head_block just in case there was a race
// TODO: but how bad is this race? it might be fine
block_map.get(&new_hash).expect("we just inserted")
} else {
// Cache miss and NO TOTAL DIFFICULTY!
// self got the head block first. unfortunately its missing a necessary field
// keep this even after https://github.com/ledgerwatch/erigon/issues/5190 is closed.
// there are other clients and we might have to use a third party without the td fix.
trace!(rpc=?self, ?new_hash, "total_difficulty missing");
// todo: this can wait forever
// todo: this can wait forever!
let complete_head_block: Block<TxHash> = self
.wait_for_request_handle()
.await?
.request("eth_getBlockByHash", (new_hash, false))
.await?;
new_head_block = match block_map.entry(new_hash) {
Entry::Vacant(x) => {
// still vacant! self is still the leader
// now we definitely have total difficulty, so save
x.insert(Arc::new(complete_head_block)).clone()
}
Entry::Occupied(x) => {
let existing_block = x.get().clone();
debug_assert!(complete_head_block.total_difficulty.is_some());
// we only save blocks with a total difficulty
debug_assert!(existing_block.total_difficulty.is_some());
block_map
.insert(new_hash, Arc::new(complete_head_block))
.await;
existing_block
}
};
}
            // we `get` from the map instead of returning new_head_block directly, just in case there was a race
// TODO: but how bad is this race? it might be fine
block_map.get(&new_hash).expect("we just inserted")
};
let new_num = new_head_block.number.unwrap_or_default();

@ -9,14 +9,13 @@ use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest};
use crate::rpcs::transactions::TxStatus;
use arc_swap::ArcSwap;
use counter::Counter;
use dashmap::DashMap;
use derive_more::From;
use ethers::prelude::{Block, ProviderError, TxHash, H256, U64};
use futures::future::{join_all, try_join_all};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use hashbrown::HashMap;
use parking_lot::RwLock;
use moka::future::Cache;
use petgraph::graphmap::DiGraphMap;
use serde::ser::{SerializeStruct, Serializer};
use serde::Serialize;
@ -25,10 +24,10 @@ use std::cmp;
use std::cmp::Reverse;
use std::fmt;
use std::sync::Arc;
use tokio::sync::RwLock as AsyncRwLock;
use tokio::sync::{broadcast, watch};
use tokio::task;
use tokio::time::{interval, sleep, sleep_until, MissedTickBehavior};
use tokio::time::{Duration, Instant};
use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior};
use tracing::{error, info, instrument, trace, warn};
/// A collection of web3 connections. Sends requests either the current best server or all servers.
@ -37,15 +36,15 @@ pub struct Web3Connections {
pub(super) conns: HashMap<String, Arc<Web3Connection>>,
/// any requests will be forwarded to one (or more) of these connections
pub(super) synced_connections: ArcSwap<SyncedConnections>,
pub(super) pending_transactions: Arc<DashMap<TxHash, TxStatus>>,
pub(super) pending_transactions: Cache<TxHash, TxStatus>,
/// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis?
/// all blocks, including orphans
pub(super) block_hashes: BlockHashesMap,
/// blocks on the heaviest chain
pub(super) block_numbers: DashMap<U64, H256>,
pub(super) block_numbers: Cache<U64, H256>,
/// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis?
/// TODO: what should we use for edges?
pub(super) blockchain_graphmap: RwLock<DiGraphMap<H256, u32>>,
pub(super) blockchain_graphmap: AsyncRwLock<DiGraphMap<H256, u32>>,
pub(super) min_synced_rpcs: usize,
pub(super) min_sum_soft_limit: u32,
}
@ -63,7 +62,7 @@ impl Web3Connections {
min_sum_soft_limit: u32,
min_synced_rpcs: usize,
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
pending_transactions: Arc<DashMap<TxHash, TxStatus>>,
pending_transactions: Cache<TxHash, TxStatus>,
) -> anyhow::Result<(Arc<Self>, AnyhowJoinHandle<()>)> {
let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded();
let (block_sender, block_receiver) =
@ -168,12 +167,16 @@ impl Web3Connections {
let synced_connections = SyncedConnections::default();
// TODO: sizing and expiration on these caches!
let block_hashes = Cache::new(10000);
let block_numbers = Cache::new(10000);
let connections = Arc::new(Self {
conns: connections,
synced_connections: ArcSwap::new(Arc::new(synced_connections)),
pending_transactions,
block_hashes: Default::default(),
block_numbers: Default::default(),
block_hashes,
block_numbers,
blockchain_graphmap: Default::default(),
min_sum_soft_limit,
min_synced_rpcs,