replace all moka with quick_cache_ttl

Bryan Stitt 2023-05-18 13:34:22 -07:00
parent 24439c5143
commit b61675e928
17 changed files with 184 additions and 338 deletions

Cargo.lock (generated, 193 lines changed)

@@ -164,35 +164,6 @@ dependencies = [
"term",
]
[[package]]
name = "async-io"
version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fc5b45d93ef0529756f812ca52e44c221b35341892d3dcc34132ac02f3dd2af"
dependencies = [
"async-lock",
"autocfg",
"cfg-if",
"concurrent-queue",
"futures-lite",
"log",
"parking",
"polling",
"rustix",
"slab",
"socket2",
"waker-fn",
]
[[package]]
name = "async-lock"
version = "2.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa24f727524730b077666307f2734b4a1a1c57acb79193127dcc8914d5242dd7"
dependencies = [
"event-listener",
]
[[package]]
name = "async-stream"
version = "0.3.5"
@@ -593,12 +564,6 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "bytecount"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c676a478f63e9fa2dd5368a42f28bba0d6c560b775f38583c8bbaa7fcd67c9c"
[[package]]
name = "byteorder"
version = "1.4.3"
@@ -653,19 +618,6 @@ dependencies = [
"serde",
]
[[package]]
name = "cargo_metadata"
version = "0.14.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4acbb09d9ee8e23699b9634375c72795d095bf268439da88562cf9b501f181fa"
dependencies = [
"camino",
"cargo-platform",
"semver",
"serde",
"serde_json",
]
[[package]]
name = "cargo_metadata"
version = "0.15.4"
@@ -901,15 +853,6 @@ dependencies = [
"tokio-util",
]
[[package]]
name = "concurrent-queue"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62ec6771ecfa0762d24683ee5a32ad78487a3d3afdc0fb8cae19d2c5deb50b7c"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "console"
version = "0.14.1"
@@ -1626,15 +1569,6 @@ dependencies = [
"libc",
]
[[package]]
name = "error-chain"
version = "0.12.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d2f06b9cac1506ece98fe3231e3cc9c4410ec3d5b1f24ae1c8946f0742cdefc"
dependencies = [
"version_check",
]
[[package]]
name = "eth-keystore"
version = "0.5.0"
@@ -1803,7 +1737,7 @@ checksum = "198ea9efa8480fa69f73d31d41b1601dace13d053c6fe4be6f5878d9dfcf0108"
dependencies = [
"arrayvec",
"bytes",
"cargo_metadata 0.15.4",
"cargo_metadata",
"chrono",
"elliptic-curve 0.13.4",
"ethabi",
@@ -2204,21 +2138,6 @@ version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964"
[[package]]
name = "futures-lite"
version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce"
dependencies = [
"fastrand",
"futures-core",
"futures-io",
"memchr",
"parking",
"pin-project-lite",
"waker-fn",
]
[[package]]
name = "futures-locks"
version = "0.7.1"
@@ -3250,31 +3169,6 @@ dependencies = [
"winapi",
]
[[package]]
name = "moka"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "934030d03f6191edbb4ba16835ccdb80d560788ac686570a8e2986a0fb59ded8"
dependencies = [
"async-io",
"async-lock",
"crossbeam-channel",
"crossbeam-epoch",
"crossbeam-utils",
"futures-util",
"num_cpus",
"once_cell",
"parking_lot 0.12.1",
"rustc_version",
"scheduled-thread-pool",
"skeptic",
"smallvec",
"tagptr",
"thiserror",
"triomphe",
"uuid 1.3.2",
]
[[package]]
name = "nanorand"
version = "0.7.0"
@@ -3704,12 +3598,6 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "parking"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14f2252c834a40ed9bb5422029649578e63aa341ac401f74e719dd1afda8394e"
[[package]]
name = "parking_lot"
version = "0.11.2"
@@ -4016,22 +3904,6 @@ version = "0.3.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964"
[[package]]
name = "polling"
version = "2.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4b2d323e8ca7996b3e23126511a523f7e62924d93ecd5ae73b333815b0eb3dce"
dependencies = [
"autocfg",
"bitflags",
"cfg-if",
"concurrent-queue",
"libc",
"log",
"pin-project-lite",
"windows-sys 0.48.0",
]
[[package]]
name = "ppv-lite86"
version = "0.2.17"
@@ -4203,17 +4075,6 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "pulldown-cmark"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d9cc634bc78768157b5cbfe988ffcd1dcba95cd2b2f03a88316c08c6d00ed63"
dependencies = [
"bitflags",
"memchr",
"unicase",
]
[[package]]
name = "quick_cache"
version = "0.3.0"
@@ -4777,15 +4638,6 @@ dependencies = [
"windows-sys 0.42.0",
]
[[package]]
name = "scheduled-thread-pool"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19"
dependencies = [
"parking_lot 0.12.1",
]
[[package]]
name = "scoped-tls"
version = "1.0.1"
@@ -5400,21 +5252,6 @@ dependencies = [
"time 0.3.21",
]
[[package]]
name = "skeptic"
version = "0.13.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "16d23b015676c90a0f01c197bfdc786c20342c73a0afdda9025adb0bc42940a8"
dependencies = [
"bytecount",
"cargo_metadata 0.14.2",
"error-chain",
"glob",
"pulldown-cmark",
"tempfile",
"walkdir",
]
[[package]]
name = "slab"
version = "0.4.8"
@@ -5754,12 +5591,6 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160"
[[package]]
name = "tagptr"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417"
[[package]]
name = "tap"
version = "1.0.1"
@@ -6310,12 +6141,6 @@ dependencies = [
"tracing-log",
]
[[package]]
name = "triomphe"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1ee9bd9239c339d714d657fac840c6d2a4f9c45f4f9ec7b0975113458be78db"
[[package]]
name = "try-lock"
version = "0.2.4"
@@ -6400,15 +6225,6 @@ dependencies = [
"libc",
]
[[package]]
name = "unicase"
version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6"
dependencies = [
"version_check",
]
[[package]]
name = "unicode-bidi"
version = "0.3.13"
@@ -6531,12 +6347,6 @@ version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
[[package]]
name = "waker-fn"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca"
[[package]]
name = "walkdir"
version = "2.3.3"
@@ -6699,7 +6509,6 @@ dependencies = [
"log",
"migration",
"mimalloc",
"moka",
"num",
"num-traits",
"once_cell",

@@ -128,7 +128,7 @@ These are roughly in order of completion
- this was intentional so that recently confirmed transactions go to a server that is more likely to have the tx.
- but under heavy load, we hit their rate limits. need a "retry_until_success" function that goes to balanced_rpcs. or maybe store in redis the txids that we broadcast privately and use that to route.
- [x] some of the DashMaps grow unbounded! Make/find a "SizedDashMap" that cleans up old rows with some garbage collection task
- moka is exactly what we need
- moka has all the features that we need and more
- [x] if block data limit is 0, say Unknown in Debug output
- [x] basic request method stats (using the user_id and other fields that are in the tracing frame)
- [x] refactor from_anyhow_error to have consistent error codes and http codes. maybe implement the Error trait
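The bounded-cache item above is what this commit finishes. A minimal sketch of the pattern, using only the `CacheWithTTL` constructor and accessors that appear in the quick_cache_ttl diff below (`new_with_capacity`, `insert`, `get`); the tokio harness is just for illustration:

```rust
use std::time::Duration;

use quick_cache_ttl::CacheWithTTL;

#[tokio::main]
async fn main() {
    // bounded to ~10k entries, each expiring 60s after insert, so the map
    // can no longer grow without limit the way a plain DashMap did
    let cache: CacheWithTTL<u64, String> =
        CacheWithTTL::new_with_capacity(10_000, Duration::from_secs(60)).await;

    cache.insert(1, "hello".to_string());
    assert_eq!(cache.get(&1), Some("hello".to_string()));
}
```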

@@ -1,6 +1,6 @@
//#![warn(missing_docs)]
use log::error;
use quick_cache_ttl::{CacheWithTTL, UnitWeighter};
use quick_cache_ttl::CacheWithTTL;
use redis_rate_limiter::{RedisRateLimitResult, RedisRateLimiter};
use std::cmp::Eq;
use std::fmt::{Debug, Display};
@@ -16,8 +16,7 @@ pub struct DeferredRateLimiter<K>
where
K: Send + Sync,
{
local_cache:
CacheWithTTL<K, Arc<AtomicU64>, UnitWeighter, hashbrown::hash_map::DefaultHashBuilder>,
local_cache: CacheWithTTL<K, Arc<AtomicU64>>,
prefix: String,
rrl: RedisRateLimiter,
/// if None, defers to the max on rrl
@@ -46,18 +45,8 @@ where
// TODO: time to live is not exactly right. we want this ttl counter to start only after redis is down. this works for now
// TODO: what do these weigh?
// TODO: allow skipping max_capacity
let local_cache = CacheWithTTL::new(
cache_size,
cache_size as u64,
UnitWeighter,
hashbrown::hash_map::DefaultHashBuilder::default(),
Duration::from_secs(ttl),
)
.await;
// .time_to_live(Duration::from_secs(ttl))
// .max_capacity(cache_size)
// .name(prefix)
// .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
let local_cache =
CacheWithTTL::new_with_capacity(cache_size, Duration::from_secs(ttl)).await;
Self {
local_cache,

@@ -2,26 +2,61 @@ use quick_cache::{DefaultHashBuilder, UnitWeighter, Weighter};
use std::{
future::Future,
hash::{BuildHasher, Hash},
sync::Arc,
time::Duration,
};
use crate::{KQCacheWithTTL, PlaceholderGuardWithTTL};
pub struct CacheWithTTL<Key, Val, We, B>(KQCacheWithTTL<Key, (), Val, We, B>);
pub struct CacheWithTTL<Key, Val, We = UnitWeighter, B = DefaultHashBuilder>(
KQCacheWithTTL<Key, (), Val, We, B>,
);
impl<Key: Eq + Hash + Clone + Send + Sync + 'static, Val: Clone + Send + Sync + 'static>
CacheWithTTL<Key, Val, UnitWeighter, DefaultHashBuilder>
{
pub async fn new_with_unit_weights(estimated_items_capacity: usize, ttl: Duration) -> Self {
pub async fn new_with_capacity(capacity: usize, ttl: Duration) -> Self {
Self::new(
estimated_items_capacity,
estimated_items_capacity as u64,
capacity,
capacity as u64,
UnitWeighter,
DefaultHashBuilder::default(),
ttl,
)
.await
}
pub async fn arc_with_capacity(capacity: usize, ttl: Duration) -> Arc<Self> {
let x = Self::new_with_capacity(capacity, ttl).await;
Arc::new(x)
}
}
impl<
Key: Eq + Hash + Clone + Send + Sync + 'static,
Val: Clone + Send + Sync + 'static,
We: Weighter<Key, (), Val> + Clone + Send + Sync + 'static,
B: BuildHasher + Clone + Default + Send + Sync + 'static,
> CacheWithTTL<Key, Val, We, B>
{
pub async fn new_with_weights(
estimated_items_capacity: usize,
weight_capacity: u64,
weighter: We,
ttl: Duration,
) -> Self {
let inner = KQCacheWithTTL::new(
estimated_items_capacity,
weight_capacity,
weighter,
B::default(),
ttl,
)
.await;
Self(inner)
}
}
impl<
@@ -50,6 +85,11 @@ impl<
Self(inner)
}
#[inline]
pub fn get(&self, key: &Key) -> Option<Val> {
self.0.get(key, &())
}
#[inline]
pub async fn get_or_insert_async<E, Fut>(&self, key: &Key, f: Fut) -> Result<Val, E>
where
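Taken together, the hunks above give `CacheWithTTL` three entry points: `new_with_capacity` for the common unit-weight case, `arc_with_capacity` for the shared form most call sites below use, and `new_with_weights` when values differ meaningfully in size. A usage sketch, assuming only the signatures visible in this diff (the tokio harness is illustrative):

```rust
use std::{convert::Infallible, sync::Arc, time::Duration};

use quick_cache_ttl::CacheWithTTL;

#[tokio::main]
async fn main() {
    // unit weights: the item capacity doubles as the weight capacity
    let shared: Arc<CacheWithTTL<String, u64>> =
        CacheWithTTL::arc_with_capacity(1_000, Duration::from_secs(30)).await;

    // get_or_insert_async only runs the future on a miss; Infallible in the
    // error slot records that this initializer cannot fail
    let count = shared
        .get_or_insert_async::<Infallible, _>(&"alice".to_string(), async { Ok(0u64) })
        .await
        .unwrap();
    assert_eq!(count, 0);
}
```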

@@ -8,10 +8,10 @@ use tokio::task::JoinHandle;
use tokio::time::{sleep_until, Instant};
pub struct KQCacheWithTTL<Key, Qey, Val, We, B> {
pub(crate) cache: Arc<KQCache<Key, Qey, Val, We, B>>,
cache: Arc<KQCache<Key, Qey, Val, We, B>>,
pub task_handle: JoinHandle<()>,
ttl: Duration,
pub(crate) tx: flume::Sender<(Instant, Key, Qey)>,
tx: flume::Sender<(Instant, Key, Qey)>,
}
struct KQCacheWithTTLTask<Key, Qey, Val, We, B> {
@@ -68,6 +68,11 @@ impl<
}
}
#[inline]
pub fn get(&self, key: &Key, qey: &Qey) -> Option<Val> {
self.cache.get(key, qey)
}
#[inline]
pub async fn get_or_insert_async<E, Fut>(&self, key: &Key, qey: &Qey, f: Fut) -> Result<Val, E>
where
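The fields above outline the expiry design: an insert pushes an `(expiry Instant, key, qey)` triple onto the flume channel, and the spawned `KQCacheWithTTLTask` (whose handle is kept in `task_handle`) drains it, sleeping until each deadline before evicting. Because every entry shares the one `ttl`, deadlines arrive already sorted. A hypothetical reconstruction of that loop — the task body itself is not part of this diff, so the names and signatures here are illustrative only:

```rust
use std::sync::Arc;

use quick_cache_ttl::{DefaultHashBuilder, KQCache, UnitWeighter};
use tokio::time::{sleep_until, Instant};

// hypothetical sketch of the expiry task implied by the struct fields above
async fn expire_entries(
    cache: Arc<KQCache<u64, (), String, UnitWeighter, DefaultHashBuilder>>,
    rx: flume::Receiver<(Instant, u64, ())>,
) {
    while let Ok((expire_at, key, qey)) = rx.recv_async().await {
        // a uniform ttl means the channel is already ordered by deadline
        sleep_until(expire_at).await;
        cache.remove(&key, &qey);
    }
}
```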

@@ -3,4 +3,5 @@ mod kq_cache;
pub use cache::CacheWithTTL;
pub use kq_cache::{KQCacheWithTTL, PlaceholderGuardWithTTL};
pub use quick_cache::sync::{Cache, KQCache};
pub use quick_cache::{DefaultHashBuilder, UnitWeighter, Weighter};

@@ -62,7 +62,6 @@ itertools = "0.10.5"
listenfd = "1.0.1"
log = "0.4.17"
mimalloc = { version = "0.1.37", optional = true}
moka = { version = "0.11.0", default-features = false, features = ["future"] }
num = "0.4.0"
num-traits = "0.2.15"
once_cell = { version = "1.17.1" }

@@ -46,7 +46,7 @@ use migration::sea_orm::{
};
use migration::sea_query::table::ColumnDef;
use migration::{Alias, DbErr, Migrator, MigratorTrait, Table};
use moka::future::Cache;
use quick_cache_ttl::{Cache, CacheWithTTL};
use redis_rate_limiter::redis::AsyncCommands;
use redis_rate_limiter::{redis, DeadpoolRuntime, RedisConfig, RedisPool, RedisRateLimiter};
use serde::Serialize;
@@ -61,7 +61,6 @@ use std::time::Duration;
use tokio::sync::{broadcast, watch, Semaphore};
use tokio::task::JoinHandle;
use tokio::time::{sleep, timeout};
use ulid::Ulid;
// TODO: make this customizable?
// TODO: include GIT_REF in here. i had trouble getting https://docs.rs/vergen/latest/vergen/ to work with a workspace. also .git is in .dockerignore
@@ -127,9 +126,8 @@ impl DatabaseReplica {
}
}
// TODO: this should be the secret key id, not the key itself!
pub type RpcSecretKeyCache =
Cache<Ulid, AuthorizationChecks, hashbrown::hash_map::DefaultHashBuilder>;
/// Cache data from the database about rpc keys
pub type RpcSecretKeyCache = Arc<CacheWithTTL<RpcSecretKey, AuthorizationChecks>>;
/// The application
// TODO: i'm sure this is more arcs than necessary, but spawning futures makes references hard
@@ -161,7 +159,7 @@ pub struct Web3ProxyApp {
pub hostname: Option<String>,
/// store pending transactions that we've seen so that we don't send duplicates to subscribers
/// TODO: think about this more. might be worth storing if we sent the transaction or not and using this for automatic retries
pub pending_transactions: Cache<TxHash, TxStatus, hashbrown::hash_map::DefaultHashBuilder>,
pub pending_transactions: Arc<CacheWithTTL<TxHash, TxStatus>>,
/// rate limit anonymous users
pub frontend_ip_rate_limiter: Option<DeferredRateLimiter<IpAddr>>,
/// rate limit authenticated users
@@ -178,13 +176,11 @@ pub struct Web3ProxyApp {
// TODO: should the key be our RpcSecretKey class instead of Ulid?
pub rpc_secret_key_cache: RpcSecretKeyCache,
/// concurrent/parallel RPC request limits for authenticated users
pub registered_user_semaphores:
Cache<NonZeroU64, Arc<Semaphore>, hashbrown::hash_map::DefaultHashBuilder>,
pub rpc_key_semaphores: Cache<NonZeroU64, Arc<Semaphore>>,
/// concurrent/parallel request limits for anonymous users
pub ip_semaphores: Cache<IpAddr, Arc<Semaphore>, hashbrown::hash_map::DefaultHashBuilder>,
pub ip_semaphores: Cache<IpAddr, Arc<Semaphore>>,
/// concurrent/parallel application request limits for authenticated users
pub bearer_token_semaphores:
Cache<UserBearerToken, Arc<Semaphore>, hashbrown::hash_map::DefaultHashBuilder>,
pub bearer_token_semaphores: Cache<UserBearerToken, Arc<Semaphore>>,
pub kafka_producer: Option<rdkafka::producer::FutureProducer>,
/// channel for sending stats in a background task
pub stat_sender: Option<flume::Sender<AppStat>>,
@@ -510,10 +506,8 @@ impl Web3ProxyApp {
// if there is no database of users, there will be no keys and so this will be empty
// TODO: max_capacity from config
// TODO: ttl from config
let rpc_secret_key_cache = Cache::builder()
.max_capacity(10_000)
.time_to_live(Duration::from_secs(600))
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
let rpc_secret_key_cache =
CacheWithTTL::arc_with_capacity(10_000, Duration::from_secs(600)).await;
// create a channel for receiving stats
// we do this in a channel so we don't slow down our response to the users
@@ -603,13 +597,11 @@ impl Web3ProxyApp {
// TODO: capacity from configs
// all these are the same size, so no need for a weigher
// TODO: this used to have a time_to_idle
let pending_transactions = Cache::builder()
.max_capacity(10_000)
// TODO: different chains might handle this differently
// TODO: what should we set? 5 minutes is arbitrary. the nodes themselves hold onto transactions for much longer
// TODO: this used to be time_to_update, but
.time_to_live(Duration::from_secs(300))
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
// TODO: different chains might handle this differently
// TODO: what should we set? 5 minutes is arbitrary. the nodes themselves hold onto transactions for much longer
// TODO: this used to be time_to_update, but
let pending_transactions =
CacheWithTTL::arc_with_capacity(10_000, Duration::from_secs(300)).await;
// responses can be very different in sizes, so this is a cache with a max capacity and a weigher
// TODO: don't allow any response to be bigger than X% of the cache
@@ -624,17 +616,15 @@
)
.await;
// TODO: how should we handle hitting this max?
let max_users = 20_000;
// create semaphores for concurrent connection limits
// TODO: how can we implement time til idle?
// TODO: what should tti be for semaphores?
let bearer_token_semaphores = Cache::builder()
.time_to_idle(Duration::from_secs(120))
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
let ip_semaphores = Cache::builder()
.time_to_idle(Duration::from_secs(120))
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
let registered_user_semaphores = Cache::builder()
.time_to_idle(Duration::from_secs(120))
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
let bearer_token_semaphores = Cache::new(max_users);
let ip_semaphores = Cache::new(max_users);
let registered_user_semaphores = Cache::new(max_users);
let (balanced_rpcs, balanced_handle, consensus_connections_watcher) = Web3Rpcs::spawn(
top_config.app.chain_id,
@@ -745,7 +735,7 @@ impl Web3ProxyApp {
rpc_secret_key_cache,
bearer_token_semaphores,
ip_semaphores,
registered_user_semaphores,
rpc_key_semaphores: registered_user_semaphores,
stat_sender,
};

@@ -28,7 +28,9 @@ use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::util::Timeout as KafkaTimeout;
use redis_rate_limiter::redis::AsyncCommands;
use redis_rate_limiter::RedisRateLimitResult;
use std::convert::Infallible;
use std::fmt::Display;
use std::hash::Hash;
use std::mem;
use std::sync::atomic::{self, AtomicBool, AtomicI64, AtomicU64, AtomicUsize};
use std::time::Duration;
@@ -47,6 +49,15 @@ pub enum RpcSecretKey {
Uuid(Uuid),
}
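// hash the raw 128-bit id directly so the Ulid and Uuid forms of the same
// key hash alike; a derived impl would also mix in the enum discriminant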
impl Hash for RpcSecretKey {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
match self {
Self::Ulid(x) => state.write_u128(x.0),
Self::Uuid(x) => state.write_u128(x.as_u128()),
}
}
}
/// TODO: should this have IpAddr and Origin or AuthorizationChecks?
#[derive(Debug)]
pub enum RateLimitResult {
@@ -872,12 +883,12 @@ impl Web3ProxyApp {
if let Some(max_concurrent_requests) = self.config.public_max_concurrent_requests {
let semaphore = self
.ip_semaphores
.get_with_by_ref(ip, async move {
.get_or_insert_async::<Web3ProxyError>(ip, async move {
// TODO: set max_concurrent_requests dynamically based on load?
let s = Semaphore::new(max_concurrent_requests);
Arc::new(s)
Ok(Arc::new(s))
})
.await;
.await?;
// if semaphore.available_permits() == 0 {
// // TODO: concurrent limit hit! emit a stat? less important for anon users
@@ -901,17 +912,17 @@
let user_id = authorization_checks
.user_id
.try_into()
.or(Err(Web3ProxyError::UserIdZero))
.web3_context("user ids should always be non-zero")?;
.or(Err(Web3ProxyError::UserIdZero))?;
let semaphore = self
.registered_user_semaphores
.get_with(user_id, async move {
.rpc_key_semaphores
.get_or_insert_async(&user_id, async move {
let s = Semaphore::new(max_concurrent_requests as usize);
// trace!("new semaphore for user_id {}", user_id);
Arc::new(s)
Ok::<_, Infallible>(Arc::new(s))
})
.await;
.await
.unwrap();
// if semaphore.available_permits() == 0 {
// // TODO: concurrent limit hit! emit a stat? this has a race condition though.
@@ -939,11 +950,12 @@
// limit concurrent requests
let semaphore = self
.bearer_token_semaphores
.get_with_by_ref(&user_bearer_token, async move {
.get_or_insert_async::<Infallible>(&user_bearer_token, async move {
let s = Semaphore::new(self.config.bearer_token_max_concurrent_requests as usize);
Arc::new(s)
Ok(Arc::new(s))
})
.await;
.await
.unwrap();
let semaphore_permit = semaphore.acquire_owned().await?;
@@ -1086,9 +1098,9 @@
proxy_mode: ProxyMode,
rpc_secret_key: RpcSecretKey,
) -> Web3ProxyResult<AuthorizationChecks> {
let authorization_checks: Result<_, Arc<Web3ProxyError>> = self
let authorization_checks: Result<_, Web3ProxyError> = self
.rpc_secret_key_cache
.try_get_with(rpc_secret_key.into(), async move {
.get_or_insert_async(&rpc_secret_key, async move {
// trace!(?rpc_secret_key, "user cache miss");
let db_replica = self
@@ -1107,6 +1119,7 @@
Some(rpc_key_model) => {
// TODO: move these splits into helper functions
// TODO: can we have sea orm handle this for us?
// TODO: don't expect. return an application error
let user_model = user::Entity::find_by_id(rpc_key_model.user_id)
.one(db_replica.conn())
.await?
@@ -1209,7 +1222,7 @@
})
.await;
authorization_checks.map_err(Web3ProxyError::Arc)
authorization_checks
}
/// Authorized the ip/origin/referer/useragent and rate limit and concurrency
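Each of the three semaphore caches touched above follows the same per-key concurrency-limit shape: look up (or lazily create) an `Arc<Semaphore>` for the key, then hold an owned permit for the life of the request. A standalone sketch of that shape, written against the `CacheWithTTL` wrapper from this commit rather than the re-exported quick_cache `Cache` the app actually uses; capacity and limits are illustrative:

```rust
use std::{convert::Infallible, net::IpAddr, sync::Arc, time::Duration};

use quick_cache_ttl::CacheWithTTL;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let semaphores: CacheWithTTL<IpAddr, Arc<Semaphore>> =
        CacheWithTTL::new_with_capacity(20_000, Duration::from_secs(120)).await;

    let ip: IpAddr = "127.0.0.1".parse().unwrap();

    // the initializer cannot fail, so E = Infallible and unwrap cannot panic
    let semaphore = semaphores
        .get_or_insert_async::<Infallible, _>(&ip, async move {
            Ok(Arc::new(Semaphore::new(10)))
        })
        .await
        .unwrap();

    // holding the owned permit enforces at most 10 concurrent requests per ip
    let _permit = semaphore.acquire_owned().await.unwrap();
}
```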

@@ -5,7 +5,6 @@ use crate::jsonrpc::{JsonRpcErrorData, JsonRpcForwardedResponse};
use crate::response_cache::JsonRpcResponseData;
use std::error::Error;
use std::sync::Arc;
use std::{borrow::Cow, net::IpAddr};
use axum::{
@@ -33,13 +32,11 @@ impl From<Web3ProxyError> for Web3ProxyResult<()> {
}
}
// TODO:
#[derive(Debug, Display, Error, From)]
pub enum Web3ProxyError {
AccessDenied,
#[error(ignore)]
Anyhow(anyhow::Error),
Arc(Arc<Web3ProxyError>),
#[error(ignore)]
#[from(ignore)]
BadRequest(String),
@@ -685,13 +682,6 @@ impl Web3ProxyError {
},
)
}
Self::Arc(err) => {
return match Arc::try_unwrap(err) {
Ok(err) => err,
Err(err) => Self::Anyhow(anyhow::anyhow!("{}", err)),
}
.into_response_parts();
}
Self::SemaphoreAcquireError(err) => {
warn!("semaphore acquire err={:?}", err);
(

@@ -56,7 +56,7 @@ pub async fn serve(
let response_cache_size = ResponseCacheKey::COUNT;
let response_cache =
ResponseCache::new_with_unit_weights(response_cache_size, Duration::from_secs(1)).await;
ResponseCache::new_with_capacity(response_cache_size, Duration::from_secs(1)).await;
// TODO: read config for if fastest/versus should be available publicly. default off

@@ -10,10 +10,11 @@ use crate::{config::BlockAndRpc, jsonrpc::JsonRpcRequest};
use derive_more::From;
use ethers::prelude::{Block, TxHash, H256, U64};
use log::{debug, trace, warn, Level};
use moka::future::Cache;
use quick_cache_ttl::CacheWithTTL;
use serde::ser::SerializeStruct;
use serde::Serialize;
use serde_json::json;
use std::convert::Infallible;
use std::hash::Hash;
use std::{cmp::Ordering, fmt::Display, sync::Arc};
use tokio::sync::broadcast;
@@ -22,7 +23,8 @@ use tokio::time::Duration;
// TODO: type for Hydrated Blocks with their full transactions?
pub type ArcBlock = Arc<Block<TxHash>>;
pub type BlocksByHashCache = Cache<H256, Web3ProxyBlock, hashbrown::hash_map::DefaultHashBuilder>;
pub type BlocksByHashCache = Arc<CacheWithTTL<H256, Web3ProxyBlock>>;
pub type BlocksByNumberCache = Arc<CacheWithTTL<U64, H256>>;
/// A block and its age.
#[derive(Clone, Debug, Default, From)]
@@ -168,9 +170,7 @@ impl Web3Rpcs {
heaviest_chain: bool,
) -> Web3ProxyResult<Web3ProxyBlock> {
// TODO: i think we can rearrange this function to make it faster on the hot path
let block_hash = block.hash();
if block_hash.is_zero() {
if block.hash().is_zero() {
debug!("Skipping block without hash!");
return Ok(block);
}
@@ -182,15 +182,18 @@
// this is the only place that writes to block_numbers
// multiple inserts should be okay though
// TODO: info that there was a fork?
self.blocks_by_number.insert(*block_num, *block_hash).await;
self.blocks_by_number.insert(*block_num, *block.hash());
}
// this block is very likely already in block_hashes
// TODO: use their get_with
let block_hash = *block.hash();
let block = self
.blocks_by_hash
.get_with(*block_hash, async move { block })
.await;
.get_or_insert_async::<Infallible, _>(&block_hash, async move { Ok(block) })
.await
.unwrap();
Ok(block)
}
@@ -423,7 +426,7 @@
return Ok(());
}
let new_synced_connections = match consensus_finder
let new_consensus_rpcs = match consensus_finder
.find_consensus_connections(authorization, self)
.await
{
@@ -436,21 +439,21 @@
Ok(Some(x)) => x,
};
trace!("new_synced_connections: {:#?}", new_synced_connections);
trace!("new_synced_connections: {:#?}", new_consensus_rpcs);
let watch_consensus_head_sender = self.watch_consensus_head_sender.as_ref().unwrap();
let consensus_tier = new_synced_connections.tier;
let consensus_tier = new_consensus_rpcs.tier;
// TODO: think more about this unwrap
let total_tiers = consensus_finder.worst_tier().unwrap_or(10);
let backups_needed = new_synced_connections.backups_needed;
let consensus_head_block = new_synced_connections.head_block.clone();
let num_consensus_rpcs = new_synced_connections.num_consensus_rpcs();
let backups_needed = new_consensus_rpcs.backups_needed;
let consensus_head_block = new_consensus_rpcs.head_block.clone();
let num_consensus_rpcs = new_consensus_rpcs.num_consensus_rpcs();
let num_active_rpcs = consensus_finder.len();
let total_rpcs = self.by_name.load().len();
let old_consensus_head_connections = self
.watch_consensus_rpcs_sender
.send_replace(Some(Arc::new(new_synced_connections)));
.send_replace(Some(Arc::new(new_consensus_rpcs)));
let backups_voted_str = if backups_needed { "B " } else { "" };

@@ -8,10 +8,11 @@ use ethers::prelude::{H256, U64};
use hashbrown::{HashMap, HashSet};
use itertools::{Itertools, MinMaxResult};
use log::{debug, trace, warn};
use moka::future::Cache;
use quick_cache_ttl::Cache;
use serde::Serialize;
use std::cmp::{Ordering, Reverse};
use std::collections::BTreeMap;
use std::convert::Infallible;
use std::fmt;
use std::sync::Arc;
use tokio::time::Instant;
@@ -321,7 +322,7 @@
}
}
type FirstSeenCache = Cache<H256, Instant, hashbrown::hash_map::DefaultHashBuilder>;
type FirstSeenCache = Cache<H256, Instant>;
/// A ConsensusConnections builder that tracks all connection heads across multiple groups of servers
pub struct ConsensusFinder {
@@ -342,9 +343,7 @@ impl ConsensusFinder {
pub fn new(max_block_age: Option<u64>, max_block_lag: Option<U64>) -> Self {
// TODO: what's a good capacity for this? it shouldn't need to be very large
// TODO: if we change Web3ProxyBlock to store the instance, i think we could use the block_by_hash cache
let first_seen = Cache::builder()
.max_capacity(16)
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
let first_seen = Cache::new(16);
// TODO: hard coding 0-9 isn't great, but it's easier than refactoring this to be smart about config reloading
let rpc_heads = HashMap::new();
@@ -372,8 +371,9 @@
async fn insert(&mut self, rpc: Arc<Web3Rpc>, block: Web3ProxyBlock) -> Option<Web3ProxyBlock> {
let first_seen = self
.first_seen
.get_with_by_ref(block.hash(), async { Instant::now() })
.await;
.get_or_insert_async::<Infallible>(block.hash(), async { Ok(Instant::now()) })
.await
.unwrap();
// calculate elapsed time before trying to lock
let latency = first_seen.elapsed();

@@ -1,5 +1,5 @@
///! Load balanced communication with a group of web3 rpc providers
use super::blockchain::{BlocksByHashCache, Web3ProxyBlock};
use super::blockchain::{BlocksByHashCache, BlocksByNumberCache, Web3ProxyBlock};
use super::consensus::{ConsensusWeb3Rpcs, ShouldWaitForBlock};
use super::one::Web3Rpc;
use super::request::{OpenRequestHandle, OpenRequestResult, RequestErrorHandler};
@@ -16,7 +16,7 @@ use anyhow::Context;
use arc_swap::ArcSwap;
use counter::Counter;
use derive_more::From;
use ethers::prelude::{ProviderError, TxHash, H256, U64};
use ethers::prelude::{ProviderError, TxHash, U64};
use futures::future::try_join_all;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
@@ -24,8 +24,8 @@ use hashbrown::{HashMap, HashSet};
use itertools::Itertools;
use log::{debug, error, info, trace, warn, Level};
use migration::sea_orm::DatabaseConnection;
use moka::future::Cache;
use ordered_float::OrderedFloat;
use quick_cache_ttl::CacheWithTTL;
use serde::ser::{SerializeStruct, Serializer};
use serde::Serialize;
use serde_json::json;
@@ -58,15 +58,14 @@ pub struct Web3Rpcs {
/// this head receiver makes it easy to wait until there is a new block
pub(super) watch_consensus_head_sender: Option<watch::Sender<Option<Web3ProxyBlock>>>,
/// keep track of transactions that we have sent through subscriptions
pub(super) pending_transaction_cache:
Cache<TxHash, TxStatus, hashbrown::hash_map::DefaultHashBuilder>,
pub(super) pending_transaction_cache: Arc<CacheWithTTL<TxHash, TxStatus>>,
pub(super) pending_tx_id_receiver: flume::Receiver<TxHashAndRpc>,
pub(super) pending_tx_id_sender: flume::Sender<TxHashAndRpc>,
/// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis?
/// all blocks, including orphans
pub(super) blocks_by_hash: BlocksByHashCache,
/// blocks on the heaviest chain
pub(super) blocks_by_number: Cache<U64, H256, hashbrown::hash_map::DefaultHashBuilder>,
pub(super) blocks_by_number: BlocksByNumberCache,
/// the number of rpcs required to agree on consensus for the head block (thundering herd protection)
pub(super) min_head_rpcs: usize,
/// the soft limit required to agree on consensus for the head block. (thundering herd protection)
@@ -89,7 +88,7 @@ impl Web3Rpcs {
min_head_rpcs: usize,
min_sum_soft_limit: u32,
name: String,
pending_transaction_cache: Cache<TxHash, TxStatus, hashbrown::hash_map::DefaultHashBuilder>,
pending_transaction_cache: Arc<CacheWithTTL<TxHash, TxStatus>>,
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
watch_consensus_head_sender: Option<watch::Sender<Option<Web3ProxyBlock>>>,
) -> anyhow::Result<(
@@ -159,24 +158,23 @@ impl Web3Rpcs {
};
// these blocks don't have full transactions, but they do have rather variable amounts of transaction hashes
// TODO: how can we do the weigher better? need to know actual allocated size
// TODO: how can we do the weigher for this? need to know actual allocated size
// TODO: time_to_idle instead?
// TODO: limits from config
let blocks_by_hash: BlocksByHashCache = Cache::builder()
.max_capacity(1024 * 1024 * 1024)
.weigher(|_k, v: &Web3ProxyBlock| {
1 + v.block.transactions.len().try_into().unwrap_or(u32::MAX)
})
.time_to_live(Duration::from_secs(30 * 60))
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
let blocks_by_hash: BlocksByHashCache =
Arc::new(CacheWithTTL::new_with_capacity(10_000, Duration::from_secs(30 * 60)).await);
// .max_capacity(1024 * 1024 * 1024)
// .weigher(|_k, v: &Web3ProxyBlock| {
// 1 + v.block.transactions.len().try_into().unwrap_or(u32::MAX)
// })
// .time_to_live(Duration::from_secs(30 * 60))
// .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
// all block numbers are the same size, so no need for weigher
// TODO: limits from config
// TODO: time_to_idle instead?
let blocks_by_number = Cache::builder()
.time_to_live(Duration::from_secs(30 * 60))
.max_capacity(10_000)
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default());
let blocks_by_number =
Arc::new(CacheWithTTL::new_with_capacity(10_000, Duration::from_secs(30 * 60)).await);
let (watch_consensus_rpcs_sender, consensus_connections_watcher) =
watch::channel(Default::default());
@@ -264,7 +262,7 @@ impl Web3Rpcs {
};
let pending_tx_id_sender = Some(self.pending_tx_id_sender.clone());
let blocks_by_hash = self.blocks_by_hash.clone();
let blocks_by_hash_cache = self.blocks_by_hash.clone();
let http_interval_sender = self.http_interval_sender.clone();
let chain_id = app.config.chain_id;
@@ -277,7 +275,7 @@
chain_id,
http_client,
http_interval_sender,
blocks_by_hash,
blocks_by_hash_cache,
block_sender,
pending_tx_id_sender,
true,
@@ -1249,6 +1247,7 @@ mod tests {
use crate::rpcs::consensus::ConsensusFinder;
use crate::rpcs::{blockchain::Web3ProxyBlock, provider::Web3Provider};
use arc_swap::ArcSwap;
use ethers::types::H256;
use ethers::types::{Block, U256};
use latency::PeakEwmaLatency;
use log::{trace, LevelFilter};
@@ -1436,14 +1435,15 @@
name: "test".to_string(),
watch_consensus_head_sender: Some(watch_consensus_head_sender),
watch_consensus_rpcs_sender,
pending_transaction_cache: Cache::builder()
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()),
pending_transaction_cache: CacheWithTTL::arc_with_capacity(
100,
Duration::from_secs(60),
)
.await,
pending_tx_id_receiver,
pending_tx_id_sender,
blocks_by_hash: Cache::builder()
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()),
blocks_by_number: Cache::builder()
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()),
blocks_by_hash: CacheWithTTL::arc_with_capacity(100, Duration::from_secs(60)).await,
blocks_by_number: CacheWithTTL::arc_with_capacity(100, Duration::from_secs(60)).await,
// TODO: test max_block_age?
max_block_age: None,
// TODO: test max_block_lag?
@@ -1688,14 +1688,15 @@
name: "test".to_string(),
watch_consensus_head_sender: Some(watch_consensus_head_sender),
watch_consensus_rpcs_sender,
pending_transaction_cache: Cache::builder()
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()),
pending_transaction_cache: CacheWithTTL::arc_with_capacity(
100,
Duration::from_secs(120),
)
.await,
pending_tx_id_receiver,
pending_tx_id_sender,
blocks_by_hash: Cache::builder()
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()),
blocks_by_number: Cache::builder()
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()),
blocks_by_hash: CacheWithTTL::arc_with_capacity(100, Duration::from_secs(120)).await,
blocks_by_number: CacheWithTTL::arc_with_capacity(100, Duration::from_secs(120)).await,
min_head_rpcs: 1,
min_sum_soft_limit: 4_000,
max_block_age: None,
@@ -1853,14 +1854,16 @@
name: "test".to_string(),
watch_consensus_head_sender: Some(watch_consensus_head_sender),
watch_consensus_rpcs_sender,
pending_transaction_cache: Cache::builder()
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()),
pending_transaction_cache: CacheWithTTL::arc_with_capacity(
10_000,
Duration::from_secs(120),
)
.await,
pending_tx_id_receiver,
pending_tx_id_sender,
blocks_by_hash: Cache::builder()
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()),
blocks_by_number: Cache::builder()
.build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()),
blocks_by_hash: CacheWithTTL::arc_with_capacity(10_000, Duration::from_secs(120)).await,
blocks_by_number: CacheWithTTL::arc_with_capacity(10_000, Duration::from_secs(120))
.await,
min_head_rpcs: 1,
min_sum_soft_limit: 1_000,
max_block_age: None,

@@ -23,6 +23,7 @@ use serde::Serialize;
use serde_json::json;
use std::borrow::Cow;
use std::cmp::min;
use std::convert::Infallible;
use std::fmt;
use std::hash::{Hash, Hasher};
use std::sync::atomic::{self, AtomicBool, AtomicU64, AtomicUsize};
@@ -622,8 +623,11 @@ impl Web3Rpc {
// if we already have this block saved, set new_head_block to that arc. otherwise store this copy
let new_head_block = block_map
.get_with(new_hash, async move { new_head_block })
.await;
.get_or_insert_async::<Infallible, _>(
&new_hash,
async move { Ok(new_head_block) },
)
.await?;
// save the block so we don't send the same one multiple times
// also save so that archive checks can know how far back to query

@@ -82,7 +82,7 @@ impl Web3Rpcs {
}
// trace!(?pending_tx_id, "checking pending_transactions on {}", rpc);
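// the CacheWithTTL wrapper exposes get (added earlier in this commit), so
// presence is checked with get(..).is_some() rather than moka's contains_key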
if self.pending_transaction_cache.contains_key(&pending_tx_id) {
if self.pending_transaction_cache.get(&pending_tx_id).is_some() {
// this transaction has already been processed
return Ok(());
}

@@ -4,7 +4,7 @@ use derive_more::From;
use futures::stream;
use hashbrown::HashMap;
use influxdb2::api::write::TimestampPrecision;
use log::{debug, error, info, trace};
use log::{error, info, trace};
use migration::sea_orm::prelude::Decimal;
use migration::sea_orm::DatabaseConnection;
use std::time::Duration;