simplify max item weight by having the weigher send MAX for big things

This commit is contained in:
Bryan Stitt 2023-06-16 15:49:10 -07:00
parent 1ec0f14144
commit 5d9365449f
9 changed files with 81 additions and 91 deletions

34
Cargo.lock generated

@ -1813,8 +1813,7 @@ dependencies = [
[[package]]
name = "ethers"
version = "2.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a58ce802c65cf3d0756dee5a61094a92cde53c1583b246e9ee5b37226c7fc15"
source = "git+https://github.com/llamanodes/ethers-rs/?rev=eb68f5d60850008cd302762bd3a5a4bdcfecc713#eb68f5d60850008cd302762bd3a5a4bdcfecc713"
dependencies = [
"ethers-addressbook",
"ethers-contract",
@ -1829,8 +1828,7 @@ dependencies = [
[[package]]
name = "ethers-addressbook"
version = "2.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b856b7b8ff5c961093cb8efe151fbcce724b451941ce20781de11a531ccd578"
source = "git+https://github.com/llamanodes/ethers-rs/?rev=eb68f5d60850008cd302762bd3a5a4bdcfecc713#eb68f5d60850008cd302762bd3a5a4bdcfecc713"
dependencies = [
"ethers-core",
"once_cell",
@ -1841,13 +1839,13 @@ dependencies = [
[[package]]
name = "ethers-contract"
version = "2.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e066a0d9cfc70c454672bf16bb433b0243427420076dc5b2f49c448fb5a10628"
source = "git+https://github.com/llamanodes/ethers-rs/?rev=eb68f5d60850008cd302762bd3a5a4bdcfecc713#eb68f5d60850008cd302762bd3a5a4bdcfecc713"
dependencies = [
"ethers-contract-abigen",
"ethers-contract-derive",
"ethers-core",
"ethers-providers",
"ethers-signers",
"futures-util",
"hex",
"once_cell",
@ -1860,8 +1858,7 @@ dependencies = [
[[package]]
name = "ethers-contract-abigen"
version = "2.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c113e3e86b6bc16d98484b2c3bb2d01d6fed9f489fe2e592e5cc87c3024d616b"
source = "git+https://github.com/llamanodes/ethers-rs/?rev=eb68f5d60850008cd302762bd3a5a4bdcfecc713#eb68f5d60850008cd302762bd3a5a4bdcfecc713"
dependencies = [
"Inflector",
"dunce",
@ -1884,8 +1881,7 @@ dependencies = [
[[package]]
name = "ethers-contract-derive"
version = "2.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c3fb5adee25701c79ec58fcf2c63594cd8829bc9ad6037ff862d5a111101ed2"
source = "git+https://github.com/llamanodes/ethers-rs/?rev=eb68f5d60850008cd302762bd3a5a4bdcfecc713#eb68f5d60850008cd302762bd3a5a4bdcfecc713"
dependencies = [
"Inflector",
"ethers-contract-abigen",
@ -1900,8 +1896,7 @@ dependencies = [
[[package]]
name = "ethers-core"
version = "2.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6da5fa198af0d3be20c19192df2bd9590b92ce09a8421e793bec8851270f1b05"
source = "git+https://github.com/llamanodes/ethers-rs/?rev=eb68f5d60850008cd302762bd3a5a4bdcfecc713#eb68f5d60850008cd302762bd3a5a4bdcfecc713"
dependencies = [
"arrayvec",
"bytes",
@ -1930,8 +1925,7 @@ dependencies = [
[[package]]
name = "ethers-etherscan"
version = "2.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "84ebb401ba97c6f5af278c2c9936c4546cad75dec464b439ae6df249906f4caa"
source = "git+https://github.com/llamanodes/ethers-rs/?rev=eb68f5d60850008cd302762bd3a5a4bdcfecc713#eb68f5d60850008cd302762bd3a5a4bdcfecc713"
dependencies = [
"ethers-core",
"reqwest",
@ -1945,8 +1939,7 @@ dependencies = [
[[package]]
name = "ethers-middleware"
version = "2.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "740f4a773c19dd6d6a68c8c2e0996c096488d38997d524e21dc612c55da3bd24"
source = "git+https://github.com/llamanodes/ethers-rs/?rev=eb68f5d60850008cd302762bd3a5a4bdcfecc713#eb68f5d60850008cd302762bd3a5a4bdcfecc713"
dependencies = [
"async-trait",
"auto_impl",
@ -1972,8 +1965,7 @@ dependencies = [
[[package]]
name = "ethers-providers"
version = "2.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56b498fd2a6c019d023e43e83488cd1fb0721f299055975aa6bac8dbf1e95f2c"
source = "git+https://github.com/llamanodes/ethers-rs/?rev=eb68f5d60850008cd302762bd3a5a4bdcfecc713#eb68f5d60850008cd302762bd3a5a4bdcfecc713"
dependencies = [
"async-trait",
"auto_impl",
@ -2009,8 +2001,7 @@ dependencies = [
[[package]]
name = "ethers-signers"
version = "2.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "02c4b7e15f212fa7cc2e1251868320221d4ff77a3d48068e69f47ce1c491df2d"
source = "git+https://github.com/llamanodes/ethers-rs/?rev=eb68f5d60850008cd302762bd3a5a4bdcfecc713#eb68f5d60850008cd302762bd3a5a4bdcfecc713"
dependencies = [
"async-trait",
"coins-bip32",
@ -2028,8 +2019,7 @@ dependencies = [
[[package]]
name = "ethers-solc"
version = "2.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a81c89f121595cf8959e746045bb8b25a6a38d72588561e1a3b7992fc213f674"
source = "git+https://github.com/llamanodes/ethers-rs/?rev=eb68f5d60850008cd302762bd3a5a4bdcfecc713#eb68f5d60850008cd302762bd3a5a4bdcfecc713"
dependencies = [
"cfg-if",
"dunce",

@ -10,8 +10,9 @@ path = "src/mod.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
ethers = { git = "https://github.com/llamanodes/ethers-rs/", rev = "eb68f5d60850008cd302762bd3a5a4bdcfecc713", default-features = false }
sea-orm = "0.11.3"
serde = "1.0.164"
uuid = "1.3.4"
ethers = "2.0.7"
ulid = "1.0.0"
uuid = "1.3.4"

@ -6,8 +6,9 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[build-dependencies]
ethers = "2.0.7"
ethers = { git = "https://github.com/llamanodes/ethers-rs/", rev = "eb68f5d60850008cd302762bd3a5a4bdcfecc713", default-features = false }
glob = "0.3.1"
[dependencies]
ethers = "2.0.7"
ethers = { git = "https://github.com/llamanodes/ethers-rs/", rev = "eb68f5d60850008cd302762bd3a5a4bdcfecc713", default-features = false }

@ -22,6 +22,7 @@ migration = { path = "../migration" }
payment-contracts = { path = "../payment-contracts" }
redis-rate-limiter = { path = "../redis-rate-limiter" }
ethers = { git = "https://github.com/llamanodes/ethers-rs/", rev = "eb68f5d60850008cd302762bd3a5a4bdcfecc713", default-features = false, features = ["rustls", "ws"] }
influxdb2 = { git = "https://github.com/llamanodes/influxdb2", features = ["rustls"], rev = "1e5577e14150797584f5ed0ea7aba0bd68f0a678" }
influxdb2-structmap = { git = "https://github.com/llamanodes/influxdb2/", rev = "1e5577e14150797584f5ed0ea7aba0bd68f0a678"}
@ -50,7 +51,6 @@ counter = "0.5.7"
derive_more = "0.99.17"
env_logger = "0.10.0"
ethbloom = "0.13.0"
ethers = { version = "2.0.7", default-features = false, features = ["rustls", "ws"] }
ewma = "0.1.1"
fdlimit = "0.2.1"
flume = "0.10.14"

@ -13,7 +13,7 @@ use crate::jsonrpc::{
};
use crate::relational_db::{get_db, get_migrated_db, DatabaseConnection, DatabaseReplica};
use crate::response_cache::{
json_rpc_response_weigher, JsonRpcQueryCacheKey, JsonRpcResponseCache, JsonRpcResponseEnum,
JsonRpcQueryCacheKey, JsonRpcResponseCache, JsonRpcResponseEnum, JsonRpcResponseWeigher,
};
use crate::rpcs::blockchain::Web3ProxyBlock;
use crate::rpcs::consensus::ConsensusWeb3Rpcs;
@ -43,7 +43,6 @@ use log::{error, info, trace, warn, Level};
use migration::sea_orm::prelude::Decimal;
use migration::sea_orm::{DatabaseTransaction, EntityTrait, PaginatorTrait, TransactionTrait};
use moka::future::{Cache, CacheBuilder};
use parking_lot::Mutex;
use redis_rate_limiter::redis::AsyncCommands;
use redis_rate_limiter::{redis, DeadpoolRuntime, RedisConfig, RedisPool, RedisRateLimiter};
use serde::Serialize;
@ -516,13 +515,15 @@ impl Web3ProxyApp {
// responses can be very different in sizes, so this is a cache with a max capacity and a weigher
// TODO: we should emit stats to calculate a more accurate expected cache size
// TODO: do we actually want a TTL on this?
// TODO: configurable max item weight
// TODO: resize the cache automatically
// TODO: configurable max item weight insted of hard coding to .1% of the cache?
let jsonrpc_weigher =
JsonRpcResponseWeigher((top_config.app.response_cache_max_bytes / 1000) as u32);
let jsonrpc_response_cache: JsonRpcResponseCache =
CacheBuilder::new(top_config.app.response_cache_max_bytes)
.name("jsonrpc_response_cache")
.time_to_idle(Duration::from_secs(3600))
.weigher(json_rpc_response_weigher)
.weigher(move |k, v| jsonrpc_weigher.weigh(k, v))
.build();
// TODO: how should we handle hitting this max?
@ -1703,17 +1704,13 @@ impl Web3ProxyApp {
if let Some(cache_key) = cache_key {
let from_block_num = cache_key.from_block_num();
let to_block_num = cache_key.to_block_num();
let cache_errors = cache_key.cache_errors();
let cache_jsonrpc_errors = cache_key.cache_errors();
// moka makes us do annoying things with arcs
enum CacheError {
NotCached(JsonRpcResponseEnum<Arc<RawValue>>),
Error(Arc<Web3ProxyError>),
}
// TODO: try to fetch out of s3
let x = self
self
.jsonrpc_response_cache
.try_get_with::<_, Mutex<CacheError>>(cache_key.hash(), async {
.try_get_with::<_, Web3ProxyError>(cache_key.hash(), async {
let response_data = timeout(
duration,
self.balanced_rpcs
@ -1725,34 +1722,20 @@ impl Web3ProxyApp {
to_block_num.as_ref(),
)
)
.await
.map_err(|x| Mutex::new(CacheError::Error(Arc::new(Web3ProxyError::from(x)))))?;
.await?;
// TODO: i think response data should be Arc<JsonRpcResponseEnum<Box<RawValue>>>, but that's more work
let response_data: JsonRpcResponseEnum<Arc<RawValue>> = response_data.try_into()
.map_err(|x| Mutex::new(CacheError::Error(Arc::new(x))))?;
// TODO: read max size from the config
if response_data.num_bytes() as u64 > self.config.response_cache_max_bytes / 1000 {
Err(Mutex::new(CacheError::NotCached(response_data)))
} else if matches!(response_data, JsonRpcResponseEnum::Result { .. }) || cache_errors {
Ok(response_data)
if !cache_jsonrpc_errors && let Err(err) = response_data {
// if we are not supposed to cache jsonrpc errors,
// then we must not convert Provider errors into a JsonRpcResponseEnum
// return all the errors now. moka will not cache Err results
Err(err)
} else {
Err(Mutex::new(CacheError::NotCached(response_data)))
}
}).await;
let response_data: JsonRpcResponseEnum<Arc<RawValue>> = response_data.try_into()?;
match x {
Ok(x) => x,
Err(arc_err) => {
let locked = arc_err.lock();
match &*locked {
CacheError::Error(err) => return Err(Web3ProxyError::Arc(err.clone())),
CacheError::NotCached(x) => x.clone(),
// TODO: response data should maybe be Arc<JsonRpcResponseEnum<Box<RawValue>>>, but that's more work
Ok(response_data)
}
}
}
}).await?
} else {
let x = timeout(
duration,

@ -214,7 +214,7 @@ impl Web3ProxyError {
}
Self::BadResponse(err) => {
// TODO: think about this one more. ankr gives us this because ethers fails to parse responses without an id
debug!("BAD_RESPONSE: {}", err);
debug!("BAD_RESPONSE: {:?}", err);
(
StatusCode::INTERNAL_SERVER_ERROR,
JsonRpcErrorData {

@ -314,7 +314,10 @@ impl JsonRpcForwardedResponse {
data = err.data.clone();
} else if let Some(err) = err.as_serde_error() {
// this is not an rpc error. keep it as an error
return Err(Web3ProxyError::BadResponse(err.to_string().into()));
// TODO: ankr gives us "rate limited" but ethers fails to parse because it tries to require id even though its optional
return Err(Web3ProxyError::BadResponse(
format!("parse error: {}", err).into(),
));
} else {
return Err(anyhow::anyhow!("unexpected ethers error! {:?}", err).into());
}

@ -1,3 +1,4 @@
#![feature(let_chains)]
#![feature(trait_alias)]
pub mod admin_queries;

@ -226,60 +226,65 @@ impl TryFrom<ProviderError> for JsonRpcErrorData {
}
}
pub fn json_rpc_response_weigher<K, R>(_key: &K, value: &JsonRpcResponseEnum<R>) -> u32 {
value.num_bytes()
/// The inner u32 is the maximum weight per item
#[derive(Copy, Clone)]
pub struct JsonRpcResponseWeigher(pub u32);
impl JsonRpcResponseWeigher {
pub fn weigh<K, R>(&self, _key: &K, value: &JsonRpcResponseEnum<R>) -> u32 {
let x = value.num_bytes();
if x > self.0 {
// return max. the item may start to be inserted into the cache, but it will be immediatly removed
u32::MAX
} else {
x
}
}
}
#[cfg(test)]
mod tests {
use super::JsonRpcResponseEnum;
use crate::response_cache::json_rpc_response_weigher;
use crate::response_cache::JsonRpcResponseWeigher;
use moka::future::{Cache, CacheBuilder, ConcurrentCacheExt};
use serde_json::value::RawValue;
use std::sync::Arc;
use std::{sync::Arc, time::Duration};
#[tokio::test(start_paused = true)]
async fn test_json_rpc_query_weigher() {
let max_item_weight = 200;
let weight_capacity = 1_000;
// let test_cache: Cache<u32, JsonRpcResponseEnum<Arc<RawValue>>> =
// CacheBuilder::new(weight_capacity)
// .weigher(json_rpc_response_weigher)
// .time_to_live(Duration::from_secs(2))
// .build();
let weigher = JsonRpcResponseWeigher(max_item_weight);
let small_data: JsonRpcResponseEnum<Arc<RawValue>> = JsonRpcResponseEnum::Result {
value: Box::<RawValue>::default().into(),
num_bytes: max_item_weight / 2,
};
assert_eq!(
json_rpc_response_weigher(&(), &small_data),
max_item_weight / 2
);
assert_eq!(weigher.weigh(&(), &small_data), max_item_weight / 2);
let max_sized_data: JsonRpcResponseEnum<Arc<RawValue>> = JsonRpcResponseEnum::Result {
value: Box::<RawValue>::default().into(),
num_bytes: max_item_weight,
};
assert_eq!(
json_rpc_response_weigher(&(), &max_sized_data),
max_item_weight
);
assert_eq!(weigher.weigh(&(), &max_sized_data), max_item_weight);
let oversized_data: JsonRpcResponseEnum<Arc<RawValue>> = JsonRpcResponseEnum::Result {
value: Box::<RawValue>::default().into(),
num_bytes: max_item_weight * 2,
};
assert_eq!(
json_rpc_response_weigher(&(), &oversized_data),
max_item_weight * 2
);
assert_eq!(weigher.weigh(&(), &oversized_data), u32::MAX);
let test_cache: Cache<u32, JsonRpcResponseEnum<Arc<RawValue>>> =
CacheBuilder::new(weight_capacity)
.weigher(move |k, v| weigher.weigh(k, v))
.time_to_live(Duration::from_secs(2))
.build();
// TODO: helper for inserts that does size checking
/*
test_cache.insert(0, small_data).await;
test_cache.get(&0).unwrap();
@ -289,12 +294,18 @@ mod tests {
test_cache.get(&0).unwrap();
test_cache.get(&1).unwrap();
// TODO: this will currently work! need to wrap moka cache in a checked insert
test_cache.insert(2, oversized_data).await;
test_cache.get(&0).unwrap();
test_cache.get(&1).unwrap();
// oversized data will be in the cache temporarily (it should just be an arc though, so that should be fine)
test_cache.get(&2).unwrap();
// sync should do necessary cleanup
test_cache.sync();
// now it should be empty
assert!(test_cache.get(&2).is_none());
*/
}
}