From 5d9365449f8a2bb9490521ae3259e1cc357c6ce7 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Fri, 16 Jun 2023 15:49:10 -0700 Subject: [PATCH] simplify max item weight by having the weigher send MAX for big things --- Cargo.lock | 34 +++++++----------- entities/Cargo.toml | 5 +-- payment-contracts/Cargo.toml | 5 +-- web3_proxy/Cargo.toml | 2 +- web3_proxy/src/app/mod.rs | 57 +++++++++++------------------ web3_proxy/src/errors.rs | 2 +- web3_proxy/src/jsonrpc.rs | 5 ++- web3_proxy/src/lib.rs | 1 + web3_proxy/src/response_cache.rs | 61 +++++++++++++++++++------------- 9 files changed, 81 insertions(+), 91 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d1e4849c..9d85d32e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1813,8 +1813,7 @@ dependencies = [ [[package]] name = "ethers" version = "2.0.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a58ce802c65cf3d0756dee5a61094a92cde53c1583b246e9ee5b37226c7fc15" +source = "git+https://github.com/llamanodes/ethers-rs/?rev=eb68f5d60850008cd302762bd3a5a4bdcfecc713#eb68f5d60850008cd302762bd3a5a4bdcfecc713" dependencies = [ "ethers-addressbook", "ethers-contract", @@ -1829,8 +1828,7 @@ dependencies = [ [[package]] name = "ethers-addressbook" version = "2.0.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b856b7b8ff5c961093cb8efe151fbcce724b451941ce20781de11a531ccd578" +source = "git+https://github.com/llamanodes/ethers-rs/?rev=eb68f5d60850008cd302762bd3a5a4bdcfecc713#eb68f5d60850008cd302762bd3a5a4bdcfecc713" dependencies = [ "ethers-core", "once_cell", @@ -1841,13 +1839,13 @@ dependencies = [ [[package]] name = "ethers-contract" version = "2.0.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e066a0d9cfc70c454672bf16bb433b0243427420076dc5b2f49c448fb5a10628" +source = "git+https://github.com/llamanodes/ethers-rs/?rev=eb68f5d60850008cd302762bd3a5a4bdcfecc713#eb68f5d60850008cd302762bd3a5a4bdcfecc713" dependencies = [ "ethers-contract-abigen", "ethers-contract-derive", "ethers-core", "ethers-providers", + "ethers-signers", "futures-util", "hex", "once_cell", @@ -1860,8 +1858,7 @@ dependencies = [ [[package]] name = "ethers-contract-abigen" version = "2.0.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c113e3e86b6bc16d98484b2c3bb2d01d6fed9f489fe2e592e5cc87c3024d616b" +source = "git+https://github.com/llamanodes/ethers-rs/?rev=eb68f5d60850008cd302762bd3a5a4bdcfecc713#eb68f5d60850008cd302762bd3a5a4bdcfecc713" dependencies = [ "Inflector", "dunce", @@ -1884,8 +1881,7 @@ dependencies = [ [[package]] name = "ethers-contract-derive" version = "2.0.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c3fb5adee25701c79ec58fcf2c63594cd8829bc9ad6037ff862d5a111101ed2" +source = "git+https://github.com/llamanodes/ethers-rs/?rev=eb68f5d60850008cd302762bd3a5a4bdcfecc713#eb68f5d60850008cd302762bd3a5a4bdcfecc713" dependencies = [ "Inflector", "ethers-contract-abigen", @@ -1900,8 +1896,7 @@ dependencies = [ [[package]] name = "ethers-core" version = "2.0.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6da5fa198af0d3be20c19192df2bd9590b92ce09a8421e793bec8851270f1b05" +source = "git+https://github.com/llamanodes/ethers-rs/?rev=eb68f5d60850008cd302762bd3a5a4bdcfecc713#eb68f5d60850008cd302762bd3a5a4bdcfecc713" dependencies = [ "arrayvec", "bytes", @@ -1930,8 +1925,7 @@ dependencies = [ [[package]] name = "ethers-etherscan" version = "2.0.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84ebb401ba97c6f5af278c2c9936c4546cad75dec464b439ae6df249906f4caa" +source = "git+https://github.com/llamanodes/ethers-rs/?rev=eb68f5d60850008cd302762bd3a5a4bdcfecc713#eb68f5d60850008cd302762bd3a5a4bdcfecc713" dependencies = [ "ethers-core", "reqwest", @@ -1945,8 +1939,7 @@ dependencies = [ [[package]] name = "ethers-middleware" version = "2.0.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "740f4a773c19dd6d6a68c8c2e0996c096488d38997d524e21dc612c55da3bd24" +source = "git+https://github.com/llamanodes/ethers-rs/?rev=eb68f5d60850008cd302762bd3a5a4bdcfecc713#eb68f5d60850008cd302762bd3a5a4bdcfecc713" dependencies = [ "async-trait", "auto_impl", @@ -1972,8 +1965,7 @@ dependencies = [ [[package]] name = "ethers-providers" version = "2.0.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56b498fd2a6c019d023e43e83488cd1fb0721f299055975aa6bac8dbf1e95f2c" +source = "git+https://github.com/llamanodes/ethers-rs/?rev=eb68f5d60850008cd302762bd3a5a4bdcfecc713#eb68f5d60850008cd302762bd3a5a4bdcfecc713" dependencies = [ "async-trait", "auto_impl", @@ -2009,8 +2001,7 @@ dependencies = [ [[package]] name = "ethers-signers" version = "2.0.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02c4b7e15f212fa7cc2e1251868320221d4ff77a3d48068e69f47ce1c491df2d" +source = "git+https://github.com/llamanodes/ethers-rs/?rev=eb68f5d60850008cd302762bd3a5a4bdcfecc713#eb68f5d60850008cd302762bd3a5a4bdcfecc713" dependencies = [ "async-trait", "coins-bip32", @@ -2028,8 +2019,7 @@ dependencies = [ [[package]] name = "ethers-solc" version = "2.0.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a81c89f121595cf8959e746045bb8b25a6a38d72588561e1a3b7992fc213f674" +source = "git+https://github.com/llamanodes/ethers-rs/?rev=eb68f5d60850008cd302762bd3a5a4bdcfecc713#eb68f5d60850008cd302762bd3a5a4bdcfecc713" dependencies = [ "cfg-if", "dunce", diff --git a/entities/Cargo.toml b/entities/Cargo.toml index 7209d85e..cfd6ba9d 100644 --- a/entities/Cargo.toml +++ b/entities/Cargo.toml @@ -10,8 +10,9 @@ path = "src/mod.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +ethers = { git = "https://github.com/llamanodes/ethers-rs/", rev = "eb68f5d60850008cd302762bd3a5a4bdcfecc713", default-features = false } + sea-orm = "0.11.3" serde = "1.0.164" -uuid = "1.3.4" -ethers = "2.0.7" ulid = "1.0.0" +uuid = "1.3.4" diff --git a/payment-contracts/Cargo.toml b/payment-contracts/Cargo.toml index 475947c5..f8c233b4 100644 --- a/payment-contracts/Cargo.toml +++ b/payment-contracts/Cargo.toml @@ -6,8 +6,9 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [build-dependencies] -ethers = "2.0.7" +ethers = { git = "https://github.com/llamanodes/ethers-rs/", rev = "eb68f5d60850008cd302762bd3a5a4bdcfecc713", default-features = false } + glob = "0.3.1" [dependencies] -ethers = "2.0.7" +ethers = { git = "https://github.com/llamanodes/ethers-rs/", rev = "eb68f5d60850008cd302762bd3a5a4bdcfecc713", default-features = false } diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index fa73815f..3695d8d7 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -22,6 +22,7 @@ migration = { path = "../migration" } payment-contracts = { path = "../payment-contracts" } redis-rate-limiter = { path = "../redis-rate-limiter" } +ethers = { git = "https://github.com/llamanodes/ethers-rs/", rev = "eb68f5d60850008cd302762bd3a5a4bdcfecc713", default-features = false, features = ["rustls", "ws"] } influxdb2 = { git = "https://github.com/llamanodes/influxdb2", features = ["rustls"], rev = "1e5577e14150797584f5ed0ea7aba0bd68f0a678" } influxdb2-structmap = { git = "https://github.com/llamanodes/influxdb2/", rev = "1e5577e14150797584f5ed0ea7aba0bd68f0a678"} @@ -50,7 +51,6 @@ counter = "0.5.7" derive_more = "0.99.17" env_logger = "0.10.0" ethbloom = "0.13.0" -ethers = { version = "2.0.7", default-features = false, features = ["rustls", "ws"] } ewma = "0.1.1" fdlimit = "0.2.1" flume = "0.10.14" diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index 13244fd9..c8a6d3d8 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -13,7 +13,7 @@ use crate::jsonrpc::{ }; use crate::relational_db::{get_db, get_migrated_db, DatabaseConnection, DatabaseReplica}; use crate::response_cache::{ - json_rpc_response_weigher, JsonRpcQueryCacheKey, JsonRpcResponseCache, JsonRpcResponseEnum, + JsonRpcQueryCacheKey, JsonRpcResponseCache, JsonRpcResponseEnum, JsonRpcResponseWeigher, }; use crate::rpcs::blockchain::Web3ProxyBlock; use crate::rpcs::consensus::ConsensusWeb3Rpcs; @@ -43,7 +43,6 @@ use log::{error, info, trace, warn, Level}; use migration::sea_orm::prelude::Decimal; use migration::sea_orm::{DatabaseTransaction, EntityTrait, PaginatorTrait, TransactionTrait}; use moka::future::{Cache, CacheBuilder}; -use parking_lot::Mutex; use redis_rate_limiter::redis::AsyncCommands; use redis_rate_limiter::{redis, DeadpoolRuntime, RedisConfig, RedisPool, RedisRateLimiter}; use serde::Serialize; @@ -516,13 +515,15 @@ impl Web3ProxyApp { // responses can be very different in sizes, so this is a cache with a max capacity and a weigher // TODO: we should emit stats to calculate a more accurate expected cache size // TODO: do we actually want a TTL on this? - // TODO: configurable max item weight - // TODO: resize the cache automatically + // TODO: configurable max item weight insted of hard coding to .1% of the cache? + let jsonrpc_weigher = + JsonRpcResponseWeigher((top_config.app.response_cache_max_bytes / 1000) as u32); + let jsonrpc_response_cache: JsonRpcResponseCache = CacheBuilder::new(top_config.app.response_cache_max_bytes) .name("jsonrpc_response_cache") .time_to_idle(Duration::from_secs(3600)) - .weigher(json_rpc_response_weigher) + .weigher(move |k, v| jsonrpc_weigher.weigh(k, v)) .build(); // TODO: how should we handle hitting this max? @@ -1703,17 +1704,13 @@ impl Web3ProxyApp { if let Some(cache_key) = cache_key { let from_block_num = cache_key.from_block_num(); let to_block_num = cache_key.to_block_num(); - let cache_errors = cache_key.cache_errors(); + let cache_jsonrpc_errors = cache_key.cache_errors(); - // moka makes us do annoying things with arcs - enum CacheError { - NotCached(JsonRpcResponseEnum>), - Error(Arc), - } + // TODO: try to fetch out of s3 - let x = self + self .jsonrpc_response_cache - .try_get_with::<_, Mutex>(cache_key.hash(), async { + .try_get_with::<_, Web3ProxyError>(cache_key.hash(), async { let response_data = timeout( duration, self.balanced_rpcs @@ -1725,34 +1722,20 @@ impl Web3ProxyApp { to_block_num.as_ref(), ) ) - .await - .map_err(|x| Mutex::new(CacheError::Error(Arc::new(Web3ProxyError::from(x)))))?; + .await?; - // TODO: i think response data should be Arc>>, but that's more work - let response_data: JsonRpcResponseEnum> = response_data.try_into() - .map_err(|x| Mutex::new(CacheError::Error(Arc::new(x))))?; - - // TODO: read max size from the config - if response_data.num_bytes() as u64 > self.config.response_cache_max_bytes / 1000 { - Err(Mutex::new(CacheError::NotCached(response_data))) - } else if matches!(response_data, JsonRpcResponseEnum::Result { .. }) || cache_errors { - Ok(response_data) + if !cache_jsonrpc_errors && let Err(err) = response_data { + // if we are not supposed to cache jsonrpc errors, + // then we must not convert Provider errors into a JsonRpcResponseEnum + // return all the errors now. moka will not cache Err results + Err(err) } else { - Err(Mutex::new(CacheError::NotCached(response_data))) - } - }).await; + let response_data: JsonRpcResponseEnum> = response_data.try_into()?; - match x { - Ok(x) => x, - Err(arc_err) => { - let locked = arc_err.lock(); - - match &*locked { - CacheError::Error(err) => return Err(Web3ProxyError::Arc(err.clone())), - CacheError::NotCached(x) => x.clone(), + // TODO: response data should maybe be Arc>>, but that's more work + Ok(response_data) } - } - } + }).await? } else { let x = timeout( duration, diff --git a/web3_proxy/src/errors.rs b/web3_proxy/src/errors.rs index e0be3c1a..96a074c3 100644 --- a/web3_proxy/src/errors.rs +++ b/web3_proxy/src/errors.rs @@ -214,7 +214,7 @@ impl Web3ProxyError { } Self::BadResponse(err) => { // TODO: think about this one more. ankr gives us this because ethers fails to parse responses without an id - debug!("BAD_RESPONSE: {}", err); + debug!("BAD_RESPONSE: {:?}", err); ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcErrorData { diff --git a/web3_proxy/src/jsonrpc.rs b/web3_proxy/src/jsonrpc.rs index fd6ef7f1..1d57d197 100644 --- a/web3_proxy/src/jsonrpc.rs +++ b/web3_proxy/src/jsonrpc.rs @@ -314,7 +314,10 @@ impl JsonRpcForwardedResponse { data = err.data.clone(); } else if let Some(err) = err.as_serde_error() { // this is not an rpc error. keep it as an error - return Err(Web3ProxyError::BadResponse(err.to_string().into())); + // TODO: ankr gives us "rate limited" but ethers fails to parse because it tries to require id even though its optional + return Err(Web3ProxyError::BadResponse( + format!("parse error: {}", err).into(), + )); } else { return Err(anyhow::anyhow!("unexpected ethers error! {:?}", err).into()); } diff --git a/web3_proxy/src/lib.rs b/web3_proxy/src/lib.rs index e345c7ed..4038dcc7 100644 --- a/web3_proxy/src/lib.rs +++ b/web3_proxy/src/lib.rs @@ -1,3 +1,4 @@ +#![feature(let_chains)] #![feature(trait_alias)] pub mod admin_queries; diff --git a/web3_proxy/src/response_cache.rs b/web3_proxy/src/response_cache.rs index 3e68267c..8bca377e 100644 --- a/web3_proxy/src/response_cache.rs +++ b/web3_proxy/src/response_cache.rs @@ -226,60 +226,65 @@ impl TryFrom for JsonRpcErrorData { } } -pub fn json_rpc_response_weigher(_key: &K, value: &JsonRpcResponseEnum) -> u32 { - value.num_bytes() +/// The inner u32 is the maximum weight per item +#[derive(Copy, Clone)] +pub struct JsonRpcResponseWeigher(pub u32); + +impl JsonRpcResponseWeigher { + pub fn weigh(&self, _key: &K, value: &JsonRpcResponseEnum) -> u32 { + let x = value.num_bytes(); + + if x > self.0 { + // return max. the item may start to be inserted into the cache, but it will be immediatly removed + u32::MAX + } else { + x + } + } } #[cfg(test)] mod tests { use super::JsonRpcResponseEnum; - use crate::response_cache::json_rpc_response_weigher; + use crate::response_cache::JsonRpcResponseWeigher; + use moka::future::{Cache, CacheBuilder, ConcurrentCacheExt}; use serde_json::value::RawValue; - use std::sync::Arc; + use std::{sync::Arc, time::Duration}; #[tokio::test(start_paused = true)] async fn test_json_rpc_query_weigher() { let max_item_weight = 200; let weight_capacity = 1_000; - // let test_cache: Cache>> = - // CacheBuilder::new(weight_capacity) - // .weigher(json_rpc_response_weigher) - // .time_to_live(Duration::from_secs(2)) - // .build(); + let weigher = JsonRpcResponseWeigher(max_item_weight); let small_data: JsonRpcResponseEnum> = JsonRpcResponseEnum::Result { value: Box::::default().into(), num_bytes: max_item_weight / 2, }; - assert_eq!( - json_rpc_response_weigher(&(), &small_data), - max_item_weight / 2 - ); + assert_eq!(weigher.weigh(&(), &small_data), max_item_weight / 2); let max_sized_data: JsonRpcResponseEnum> = JsonRpcResponseEnum::Result { value: Box::::default().into(), num_bytes: max_item_weight, }; - assert_eq!( - json_rpc_response_weigher(&(), &max_sized_data), - max_item_weight - ); + assert_eq!(weigher.weigh(&(), &max_sized_data), max_item_weight); let oversized_data: JsonRpcResponseEnum> = JsonRpcResponseEnum::Result { value: Box::::default().into(), num_bytes: max_item_weight * 2, }; - assert_eq!( - json_rpc_response_weigher(&(), &oversized_data), - max_item_weight * 2 - ); + assert_eq!(weigher.weigh(&(), &oversized_data), u32::MAX); + + let test_cache: Cache>> = + CacheBuilder::new(weight_capacity) + .weigher(move |k, v| weigher.weigh(k, v)) + .time_to_live(Duration::from_secs(2)) + .build(); - // TODO: helper for inserts that does size checking - /* test_cache.insert(0, small_data).await; test_cache.get(&0).unwrap(); @@ -289,12 +294,18 @@ mod tests { test_cache.get(&0).unwrap(); test_cache.get(&1).unwrap(); - // TODO: this will currently work! need to wrap moka cache in a checked insert test_cache.insert(2, oversized_data).await; test_cache.get(&0).unwrap(); test_cache.get(&1).unwrap(); + + // oversized data will be in the cache temporarily (it should just be an arc though, so that should be fine) + test_cache.get(&2).unwrap(); + + // sync should do necessary cleanup + test_cache.sync(); + + // now it should be empty assert!(test_cache.get(&2).is_none()); - */ } }