diff --git a/Cargo.lock b/Cargo.lock index 8753b543..9b4c1ba9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -68,6 +68,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2c99f64d1e06488f620f932677e24bc6e2897582980441ae90a671415bd7ec2f" dependencies = [ "cfg-if", + "getrandom", "once_cell", "version_check", ] @@ -792,6 +793,16 @@ dependencies = [ "cc", ] +[[package]] +name = "codespan-reporting" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3538270d33cc669650c4b093848450d380def10c331d38c768e34cac80576e6e" +dependencies = [ + "termcolor", + "unicode-width", +] + [[package]] name = "coins-bip32" version = "0.8.3" @@ -1194,6 +1205,50 @@ dependencies = [ "cipher 0.4.4", ] +[[package]] +name = "cxx" +version = "1.0.94" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f61f1b6389c3fe1c316bf8a4dccc90a38208354b330925bce1f74a6c4756eb93" +dependencies = [ + "cc", + "cxxbridge-flags", + "cxxbridge-macro", + "link-cplusplus", +] + +[[package]] +name = "cxx-build" +version = "1.0.94" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12cee708e8962df2aeb38f594aae5d827c022b6460ac71a7a3e2c3c2aae5a07b" +dependencies = [ + "cc", + "codespan-reporting", + "once_cell", + "proc-macro2", + "quote", + "scratch", + "syn 2.0.15", +] + +[[package]] +name = "cxxbridge-flags" +version = "1.0.94" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7944172ae7e4068c533afbb984114a56c46e9ccddda550499caa222902c7f7bb" + +[[package]] +name = "cxxbridge-macro" +version = "1.0.94" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2345488264226bf682893e25de0769f3360aac9957980ec49361b083ddaa5bc5" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.15", +] + [[package]] name = "dashmap" version = "4.0.2" @@ -1998,6 +2053,19 @@ dependencies = [ "miniz_oxide 0.7.1", ] +[[package]] +name = "flume" +version = "0.10.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1657b4441c3403d9f7b3409e47575237dac27b1b5726df654a6ecbf92f0f7577" +dependencies = [ + "futures-core", + "futures-sink", + "nanorand", + "pin-project", + "spin 0.9.8", +] + [[package]] name = "fnv" version = "1.0.7" @@ -2309,9 +2377,9 @@ dependencies = [ [[package]] name = "h2" -version = "0.3.19" +version = "0.3.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d357c7ae988e7d2182f7d7871d0b963962420b0678b0997ce7de72001aeab782" +checksum = "17f8a914c2987b688368b5138aa05321db91f4090cf26118185672ad588bce21" dependencies = [ "bytes", "fnv", @@ -2645,11 +2713,12 @@ dependencies = [ [[package]] name = "iana-time-zone-haiku" -version = "0.1.2" +version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +checksum = "0703ae284fc167426161c2e3f1da3ea71d94b21bedbcc9494e92b28e334e3dca" dependencies = [ - "cc", + "cxx", + "cxx-build", ] [[package]] @@ -2908,16 +2977,6 @@ dependencies = [ "signature 2.1.0", ] -[[package]] -name = "kanal" -version = "0.1.0-pre8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b05d55519627edaf7fd0f29981f6dc03fb52df3f5b257130eb8d0bf2801ea1d7" -dependencies = [ - "futures-core", - "lock_api", -] - [[package]] name = "keccak" version = "0.1.4" @@ -2963,7 +3022,6 @@ name = "latency" version = "0.1.0" dependencies = [ "ewma", - "kanal", "log", "serde", "tokio", @@ -3012,6 
+3070,15 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "link-cplusplus" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ecd207c9c713c34f95a097a5b029ac2ce6010530c7b49d7fea24d977dede04f5" +dependencies = [ + "cc", +] + [[package]] name = "linux-raw-sys" version = "0.3.7" @@ -3208,6 +3275,15 @@ dependencies = [ "uuid 1.3.2", ] +[[package]] +name = "nanorand" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" +dependencies = [ + "getrandom", +] + [[package]] name = "native-tls" version = "0.2.11" @@ -4138,6 +4214,17 @@ dependencies = [ "unicase", ] +[[package]] +name = "quick_cache" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5253a3a0d56548d5b0be25414171dc780cc6870727746d05bd2bde352eee96c5" +dependencies = [ + "ahash 0.8.3", + "hashbrown 0.13.2", + "parking_lot 0.12.1", +] + [[package]] name = "quote" version = "1.0.27" @@ -4202,7 +4289,7 @@ dependencies = [ name = "rate-counter" version = "0.1.0" dependencies = [ - "kanal", + "flume", "tokio", ] @@ -4248,9 +4335,9 @@ dependencies = [ [[package]] name = "rdkafka-sys" -version = "4.4.0+1.9.2" +version = "4.3.0+1.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87ac9d87c3aba1748e3112318459f2ac8bff80bfff7359e338e0463549590249" +checksum = "d222a401698c7f2010e3967353eae566d9934dcda49c29910da922414ab4e3f4" dependencies = [ "cmake", "libc", @@ -4702,6 +4789,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" +[[package]] +name = "scratch" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1792db035ce95be60c3f8853017b3999209281c24e2ba5bc8e59bf97a0c590c1" + [[package]] name = "scrypt" version = "0.10.0" @@ -4923,9 +5016,9 @@ dependencies = [ [[package]] name = "security-framework" -version = "2.9.0" +version = "2.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca2855b3715770894e67cbfa3df957790aa0c9edc3bf06efa1a84d77fa0839d1" +checksum = "a332be01508d814fed64bf28f798a146d73792121129962fdf335bb3c49a4254" dependencies = [ "bitflags", "core-foundation", @@ -4936,9 +5029,9 @@ dependencies = [ [[package]] name = "security-framework-sys" -version = "2.9.0" +version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f51d0c0d83bec45f16480d0ce0058397a69e48fcdc52d1dc8855fb68acbd31a7" +checksum = "31c9bb296072e961fcbd8853511dd39c2d8be2deb1e17c6860b1d30732b323b4" dependencies = [ "core-foundation-sys", "libc", @@ -6577,6 +6670,7 @@ dependencies = [ "ethers", "ewma", "fdlimit", + "flume", "fstrings", "futures", "gethostname", @@ -6591,7 +6685,6 @@ dependencies = [ "influxdb2-structmap", "ipnet", "itertools", - "kanal", "latency", "listenfd", "log", @@ -6606,6 +6699,7 @@ dependencies = [ "parking_lot 0.12.1", "prettytable", "proctitle", + "quick_cache", "rdkafka", "redis-rate-limiter", "regex", @@ -6616,6 +6710,7 @@ dependencies = [ "serde_json", "serde_prometheus", "siwe", + "strum", "thread-fast-rng", "time 0.3.21", "tokio", diff --git a/README.md b/README.md index 4461169e..5c78789f 100644 --- a/README.md +++ b/README.md @@ -60,13 +60,13 @@ Check that the websocket is working: ``` $ websocat ws://127.0.0.1:8544 -{"id": 1, "method": "eth_subscribe", "params": ["newHeads"]} 
+{"jsonrpc": "2.0", "id": 1, "method": "eth_subscribe", "params": ["newHeads"]} -{"id": 2, "method": "eth_subscribe", "params": ["newPendingTransactions"]} +{"jsonrpc": "2.0", "id": 2, "method": "eth_subscribe", "params": ["newPendingTransactions"]} -{"id": 3, "method": "eth_subscribe", "params": ["newPendingFullTransactions"]} +{"jsonrpc": "2.0", "id": 3, "method": "eth_subscribe", "params": ["newPendingFullTransactions"]} -{"id": 4, "method": "eth_subscribe", "params": ["newPendingRawTransactions"]} +{"jsonrpc": "2.0", "id": 4, "method": "eth_subscribe", "params": ["newPendingRawTransactions"]} ``` You can copy `config/example.toml` to `config/production-$CHAINNAME.toml` and then run `docker-compose up --build -d` to start proxies for many chains. @@ -149,6 +149,8 @@ TODO: also enable debug symbols in the release build by modifying the root Cargo Test the proxy: + wrk -t12 -c400 -d30s --latency http://127.0.0.1:8544/health + wrk -t12 -c400 -d30s --latency http://127.0.0.1:8544/status wrk -s ./wrk/getBlockNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8544/u/$API_KEY wrk -s ./wrk/getLatestBlockByNumber.lua -t12 -c400 -d30s --latency http://127.0.0.1:8544/u/$API_KEY diff --git a/TODO.md b/TODO.md index 690007b4..b405b251 100644 --- a/TODO.md +++ b/TODO.md @@ -189,7 +189,7 @@ These are roughly in order of completion - [x] graceful shutdown. stop taking new requests and don't stop until all outstanding queries are handled - https://github.com/tokio-rs/mini-redis/blob/master/src/shutdown.rs - we need this because we need to be sure all the queries are saved in the db. maybe put stuff in Drop - - need an kanal::watch on unflushed stats that we can subscribe to. wait for it to flip to true + - need a flume::watch on unflushed stats that we can subscribe to. wait for it to flip to true - [x] don't use unix timestamps for response_millis since leap seconds will confuse it - [x] config to allow origins even on the anonymous endpoints - [x] send logs to sentry diff --git a/latency/Cargo.toml b/latency/Cargo.toml index 8583e9ab..eb51eba9 100644 --- a/latency/Cargo.toml +++ b/latency/Cargo.toml @@ -7,7 +7,6 @@ edition = "2021" [dependencies] ewma = "0.1.1" -kanal = "0.1.0-pre8" log = "0.4.17" serde = { version = "1.0.163", features = [] } tokio = { version = "1.28.1", features = ["full"] } diff --git a/latency/src/peak_ewma/mod.rs b/latency/src/peak_ewma/mod.rs index 640318c9..e3b27cdb 100644 --- a/latency/src/peak_ewma/mod.rs +++ b/latency/src/peak_ewma/mod.rs @@ -2,8 +2,9 @@ mod rtt_estimate; use std::sync::Arc; -use kanal::SendError; -use log::{error, info, trace}; +use log::{error, info}; +use tokio::sync::mpsc; +use tokio::sync::mpsc::error::TrySendError; use tokio::task::JoinHandle; use tokio::time::{Duration, Instant}; @@ -19,7 +20,7 @@ pub struct PeakEwmaLatency { /// Join handle for the latency calculation task pub join_handle: JoinHandle<()>, /// Send to update with each request duration - request_tx: kanal::AsyncSender<Duration>, + request_tx: mpsc::Sender<Duration>, /// Latency average and last update time rtt_estimate: Arc<AtomicRttEstimate>, /// Decay time @@ -33,7 +34,7 @@ impl PeakEwmaLatency { /// average latency.
pub fn spawn(decay_ns: f64, buf_size: usize, start_latency: Duration) -> Self { debug_assert!(decay_ns > 0.0, "decay_ns must be positive"); - let (request_tx, request_rx) = kanal::bounded_async(buf_size); + let (request_tx, request_rx) = mpsc::channel(buf_size); let rtt_estimate = Arc::new(AtomicRttEstimate::new(start_latency)); let task = PeakEwmaLatencyTask { request_rx, @@ -55,10 +56,10 @@ impl PeakEwmaLatency { let mut estimate = self.rtt_estimate.load(); let now = Instant::now(); - debug_assert!( + assert!( estimate.update_at <= now, - "update_at={:?} in the future", - estimate.update_at, + "update_at is {}ns in the future", + estimate.update_at.duration_since(now).as_nanos(), ); // Update the RTT estimate to account for decay since the last update. @@ -67,26 +68,20 @@ impl PeakEwmaLatency { /// Report latency from a single request /// - /// Should only be called from the Web3Rpc that owns it. + /// Should only be called with a duration from the Web3Rpc that owns it. pub fn report(&self, duration: Duration) { match self.request_tx.try_send(duration) { - Ok(true) => { - trace!("success"); - } - Ok(false) => { + Ok(()) => {} + Err(TrySendError::Full(_)) => { // We don't want to block if the channel is full, just // report the error error!("Latency report channel full"); // TODO: could we spawn a new tokio task to report this later? } - Err(SendError::Closed) => { + Err(TrySendError::Closed(_)) => { unreachable!("Owner should keep channel open"); } - Err(SendError::ReceiveClosed) => { - unreachable!("Receiver should keep channel open"); - } }; - //.expect("Owner should keep channel open"); } } @@ -94,7 +89,7 @@ impl PeakEwmaLatency { #[derive(Debug)] struct PeakEwmaLatencyTask { /// Receive new request timings for update - request_rx: kanal::AsyncReceiver<Duration>, + request_rx: mpsc::Receiver<Duration>, /// Current estimate and update time rtt_estimate: Arc<AtomicRttEstimate>, /// Last update time, used for decay calculation @@ -106,27 +101,25 @@ impl PeakEwmaLatencyTask { /// Run the loop for updating latency async fn run(mut self) { - while let Ok(rtt) = self.request_rx.recv().await { + while let Some(rtt) = self.request_rx.recv().await { self.update(rtt); } + info!("latency loop exited"); } /// Update the estimate object atomically.
- fn update(&mut self, rtt: Duration) { + fn update(&self, rtt: Duration) { let rtt = nanos(rtt); let now = Instant::now(); - debug_assert!( + assert!( self.update_at <= now, - "update_at={:?} in the future", - self.update_at, + "update_at is {}ns in the future", + self.update_at.duration_since(now).as_nanos(), ); - let x = self - .rtt_estimate + self.rtt_estimate .fetch_update(|mut rtt_estimate| rtt_estimate.update(rtt, self.decay_ns, now)); - - info!("x: {:?}", x); } } diff --git a/migration/src/lib.rs b/migration/src/lib.rs index 182f5a1e..ae9adaf7 100644 --- a/migration/src/lib.rs +++ b/migration/src/lib.rs @@ -17,12 +17,12 @@ mod m20230119_204135_better_free_tier; mod m20230125_204810_stats_v2; mod m20230130_124740_read_only_login_logic; mod m20230130_165144_prepare_admin_imitation_pre_login; -mod m20230215_152254_admin_trail; -mod m20230307_002623_migrate_rpc_accounting_to_rpc_accounting_v2; mod m20230205_130035_create_balance; mod m20230205_133755_create_referrals; mod m20230214_134254_increase_balance_transactions; +mod m20230215_152254_admin_trail; mod m20230221_230953_track_spend; +mod m20230307_002623_migrate_rpc_accounting_to_rpc_accounting_v2; mod m20230412_171916_modify_secondary_user_add_primary_user; mod m20230422_172555_premium_downgrade_logic; mod m20230511_161214_remove_columns_statsv2_origin_and_method; diff --git a/rate-counter/Cargo.toml b/rate-counter/Cargo.toml index 6e25777a..7bf027e4 100644 --- a/rate-counter/Cargo.toml +++ b/rate-counter/Cargo.toml @@ -5,5 +5,5 @@ authors = ["Bryan Stitt "] edition = "2021" [dependencies] -kanal = "0.1.0-pre8" +flume = "0.10.14" tokio = { version = "1.28.1", features = ["time"] } diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index 4d4a926c..65a1cb87 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -43,6 +43,7 @@ env_logger = "0.10.0" ethers = { version = "2.0.4", default-features = false, features = ["rustls", "ws"] } ewma = "0.1.1" fdlimit = "0.2.1" +flume = "0.10.14" fstrings = "0.2" futures = { version = "0.3.28", features = ["thread-pool"] } gethostname = "0.4.2" @@ -57,7 +58,6 @@ influxdb2 = { git = "https://github.com/llamanodes/influxdb2", features = ["rust influxdb2-structmap = { git = "https://github.com/llamanodes/influxdb2/"} ipnet = "2.7.2" itertools = "0.10.5" -kanal = "0.1.0-pre8" listenfd = "1.0.1" log = "0.4.17" mimalloc = { version = "0.1.37", optional = true} @@ -70,6 +70,7 @@ pagerduty-rs = { version = "0.1.6", default-features = false, features = ["async parking_lot = { version = "0.12.1", features = ["arc_lock"] } prettytable = "*" proctitle = "0.1.1" +quick_cache = "0.3.0" rdkafka = { version = "0.29.0" } regex = "1.8.1" reqwest = { version = "0.11.17", default-features = false, features = ["json", "tokio-rustls"] } @@ -79,6 +80,7 @@ serde = { version = "1.0.163", features = [] } serde_json = { version = "1.0.96", default-features = false, features = ["alloc", "raw_value"] } serde_prometheus = "0.2.2" siwe = "0.5.0" +strum = { version = "0.24.1", features = ["derive"] } time = "0.3.21" tokio = { version = "1.28.1", features = ["full"] } tokio-console = { version = "*", optional = true } diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index ce1ac9ba..a8e8ad01 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -4,12 +4,16 @@ mod ws; use crate::block_number::{block_needed, BlockNeeded}; use crate::config::{AppConfig, TopConfig}; use crate::frontend::authorization::{ - Authorization, RequestMetadata, RequestOrMethod, RpcSecretKey, + 
Authorization, RequestMetadata, RequestOrMethod, ResponseOrBytes, RpcSecretKey, }; use crate::frontend::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult}; use crate::frontend::rpc_proxy_ws::ProxyMode; use crate::jsonrpc::{ - JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest, JsonRpcRequestEnum, + JsonRpcErrorData, JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest, + JsonRpcRequestEnum, +}; +use crate::response_cache::{ + JsonRpcQueryCache, JsonRpcQueryCacheKey, JsonRpcQueryWeigher, JsonRpcResponseData, }; use crate::rpcs::blockchain::Web3ProxyBlock; use crate::rpcs::consensus::ConsensusWeb3Rpcs; @@ -46,15 +50,14 @@ use redis_rate_limiter::redis::AsyncCommands; use redis_rate_limiter::{redis, DeadpoolRuntime, RedisConfig, RedisPool, RedisRateLimiter}; use serde::Serialize; use serde_json::json; -use serde_json::value::to_raw_value; +use std::borrow::Cow; use std::fmt; -use std::hash::{Hash, Hasher}; use std::net::IpAddr; use std::num::NonZeroU64; use std::str::FromStr; use std::sync::{atomic, Arc}; use std::time::Duration; -use tokio::sync::{broadcast, mpsc, watch, Semaphore}; +use tokio::sync::{broadcast, watch, Semaphore}; use tokio::task::JoinHandle; use tokio::time::{sleep, timeout}; use ulid::Ulid; @@ -71,88 +74,6 @@ pub static APP_USER_AGENT: &str = concat!( // aggregate across 1 week pub const BILLING_PERIOD_SECONDS: i64 = 60 * 60 * 24 * 7; -#[derive(Debug, From)] -struct ResponseCacheKey { - // if none, this is cached until evicted - from_block: Option, - // to_block is only set when ranges of blocks are requested (like with eth_getLogs) - to_block: Option, - method: String, - params: Option, - cache_errors: bool, -} - -impl ResponseCacheKey { - fn weight(&self) -> usize { - let mut w = self.method.len(); - - if let Some(ref p) = self.params { - w += p.to_string().len(); - } - - w - } -} - -impl PartialEq for ResponseCacheKey { - fn eq(&self, other: &Self) -> bool { - if self.cache_errors != other.cache_errors { - return false; - } - - match (self.from_block.as_ref(), other.from_block.as_ref()) { - (None, None) => {} - (None, Some(_)) => { - return false; - } - (Some(_), None) => { - return false; - } - (Some(s), Some(o)) => { - if s != o { - return false; - } - } - } - - match (self.to_block.as_ref(), other.to_block.as_ref()) { - (None, None) => {} - (None, Some(_)) => { - return false; - } - (Some(_), None) => { - return false; - } - (Some(s), Some(o)) => { - if s != o { - return false; - } - } - } - - if self.method != other.method { - return false; - } - - self.params == other.params - } -} - -impl Eq for ResponseCacheKey {} - -impl Hash for ResponseCacheKey { - fn hash(&self, state: &mut H) { - self.from_block.as_ref().map(|x| x.hash()).hash(state); - self.to_block.as_ref().map(|x| x.hash()).hash(state); - self.method.hash(state); - self.params.as_ref().map(|x| x.to_string()).hash(state); - self.cache_errors.hash(state) - } -} - -type ResponseCache = - Cache; - pub type AnyhowJoinHandle = JoinHandle>; /// TODO: move this @@ -224,7 +145,7 @@ pub struct Web3ProxyApp { /// TODO: include another type so that we can use private miner relays that do not use JSONRPC requests pub private_rpcs: Option>, /// track JSONRPC responses - response_cache: ResponseCache, + pub jsonrpc_query_cache: JsonRpcQueryCache, /// rpc clients that subscribe to newHeads use this channel /// don't drop this or the sender will stop working /// TODO: broadcast channel instead? 
@@ -265,7 +186,7 @@ pub struct Web3ProxyApp { Cache, hashbrown::hash_map::DefaultHashBuilder>, pub kafka_producer: Option, /// channel for sending stats in a background task - pub stat_sender: Option>, + pub stat_sender: Option>, } /// flatten a JoinError into an anyhow error @@ -695,23 +616,12 @@ impl Web3ProxyApp { // responses can be very different in sizes, so this is a cache with a max capacity and a weigher // TODO: don't allow any response to be bigger than X% of the cache - let response_cache = Cache::builder() - .max_capacity(top_config.app.response_cache_max_bytes) - .weigher(|k: &ResponseCacheKey, v| { - // TODO: is this good enough? - if let Ok(v) = serde_json::to_string(v) { - let weight = k.weight() + v.len(); - - // the or in unwrap_or is probably never called - weight.try_into().unwrap_or(u32::MAX) - } else { - // this seems impossible - u32::MAX - } - }) - // TODO: what should we set? 10 minutes is arbitrary. the nodes themselves hold onto transactions for much longer - .time_to_live(Duration::from_secs(600)) - .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); + // TODO: we should emit stats to calculate a more accurate expected cache size + let response_cache = JsonRpcQueryCache::with_weighter( + (top_config.app.response_cache_max_bytes / 2048) as usize, + top_config.app.response_cache_max_bytes, + JsonRpcQueryWeigher, + ); // create semaphores for concurrent connection limits // TODO: what should tti be for semaphores? @@ -816,7 +726,7 @@ impl Web3ProxyApp { http_client, kafka_producer, private_rpcs, - response_cache, + jsonrpc_query_cache: response_cache, watch_consensus_head_receiver, pending_tx_sender, pending_transactions, @@ -1229,8 +1139,7 @@ impl Web3ProxyApp { authorization: &Arc, request: &JsonRpcRequest, request_metadata: Arc, - num_public_rpcs: Option, - ) -> Web3ProxyResult { + ) -> Web3ProxyResult { if let Some(protected_rpcs) = self.private_rpcs.as_ref() { if !protected_rpcs.is_empty() { let protected_response = protected_rpcs @@ -1250,6 +1159,17 @@ impl Web3ProxyApp { } } + let num_public_rpcs = match authorization.checks.proxy_mode { + // TODO: how many balanced rpcs should we send to? configurable? percentage of total? + ProxyMode::Best | ProxyMode::Debug => Some(4), + ProxyMode::Fastest(0) => None, + // TODO: how many balanced rpcs should we send to? configurable? percentage of total? + // TODO: what if we do 2 per tier? we want to blast the third party rpcs + // TODO: maybe having the third party rpcs in their own Web3Rpcs would be good for this + ProxyMode::Fastest(x) => Some(x * 4), + ProxyMode::Versus => None, + }; + // no private rpcs to send to. send to a few public rpcs // try_send_all_upstream_servers puts the request id into the response. no need to do that ourselves here. self.balanced_rpcs @@ -1277,6 +1197,11 @@ impl Web3ProxyApp { // TODO: move this code to another module so that its easy to turn this trace logging on in dev trace!("Received request: {:?}", request); + // save the id so we can attach it to the response + // TODO: we don't always need to clone this. if we come from the cache, we can just take from the request + // TODO: store on the request_metadata? 
+ let response_id = request.id.clone(); + let request_metadata = RequestMetadata::new( self, authorization.clone(), @@ -1285,7 +1210,7 @@ impl Web3ProxyApp { ) .await; - let (status_code, response) = match self + let (status_code, response_data): (_, JsonRpcResponseData) = match self ._proxy_cached_request(authorization, request, head_block_num, &request_metadata) .await { @@ -1293,7 +1218,10 @@ impl Web3ProxyApp { Err(err) => err.into_response_parts(), }; - request_metadata.add_response(&response); + let response = JsonRpcForwardedResponse::from_response_data(response_data, response_id); + + // TODO: this serializes twice :/ + request_metadata.add_response(ResponseOrBytes::Response(&response)); // TODO: with parallel request sending, I think there could be a race on this let rpcs = request_metadata.backend_rpcs_used(); @@ -1308,14 +1236,12 @@ impl Web3ProxyApp { request: &mut JsonRpcRequest, head_block_num: Option, request_metadata: &Arc, - ) -> Web3ProxyResult { - // save the id so we can attach it to the response - let request_id = request.id.clone(); - // TODO: don't clone + ) -> Web3ProxyResult { + // TODO: don't clone? let request_method = request.method.clone(); // TODO: serve net_version without querying the backend - let response: JsonRpcForwardedResponse = match request_method.as_ref() { + let response_data: JsonRpcResponseData = match request_method.as_ref() { // lots of commands are blocked method @ ("db_getHex" | "db_getString" @@ -1324,6 +1250,9 @@ impl Web3ProxyApp { | "debug_accountRange" | "debug_backtraceAt" | "debug_blockProfile" + | "debug_bundler_clearState" + | "debug_bundler_dumpMempool" + | "debug_bundler_sendBundleNow" | "debug_chaindbCompact" | "debug_chaindbProperty" | "debug_cpuProfile" @@ -1337,8 +1266,8 @@ impl Web3ProxyApp { | "debug_setGCPercent" | "debug_setHead" | "debug_setMutexProfileFraction" - | "debug_standardTraceBlockToFile" | "debug_standardTraceBadBlockToFile" + | "debug_standardTraceBlockToFile" | "debug_startCPUProfile" | "debug_startGoTrace" | "debug_stopCPUProfile" @@ -1346,6 +1275,7 @@ impl Web3ProxyApp { | "debug_writeBlockProfile" | "debug_writeMemProfile" | "debug_writeMutexProfile" + | "erigon_cacheCheck" | "eth_compileLLL" | "eth_compileSerpent" | "eth_compileSolidity" @@ -1355,24 +1285,23 @@ impl Web3ProxyApp { | "eth_signTransaction" | "eth_submitHashrate" | "eth_submitWork" - | "erigon_cacheCheck" | "les_addBalance" | "les_setClientParams" | "les_setDefaultParams" + | "miner_setEtherbase" | "miner_setExtra" + | "miner_setGasLimit" | "miner_setGasPrice" | "miner_start" | "miner_stop" - | "miner_setEtherbase" - | "miner_setGasLimit" + | "personal_ecRecover" | "personal_importRawKey" | "personal_listAccounts" | "personal_lockAccount" | "personal_newAccount" - | "personal_unlockAccount" | "personal_sendTransaction" | "personal_sign" - | "personal_ecRecover" + | "personal_unlockAccount" | "shh_addToGroup" | "shh_getFilterChanges" | "shh_getMessages" @@ -1383,13 +1312,12 @@ impl Web3ProxyApp { | "shh_post" | "shh_uninstallFilter" | "shh_version") => { - // i don't think we will ever support these methods + // i don't think we will ever support these methods. maybe do Forbidden? // TODO: what error code? 
- JsonRpcForwardedResponse::from_string( - format!("method unsupported: {}", method), - None, - Some(request_id), - ) + JsonRpcErrorData::from(format!( + "the method {} does not exist/is not available", + method + )).into() } // TODO: implement these commands method @ ("eth_getFilterChanges" @@ -1401,21 +1329,11 @@ impl Web3ProxyApp { | "eth_uninstallFilter") => { // TODO: unsupported command stat. use the count to prioritize new features // TODO: what error code? - JsonRpcForwardedResponse::from_string( - format!("not yet implemented: {}", method), - None, - Some(request_id), - ) - } - method @ ("debug_bundler_sendBundleNow" - | "debug_bundler_clearState" - | "debug_bundler_dumpMempool") => { - JsonRpcForwardedResponse::from_string( - // TODO: we should probably have some escaping on this. but maybe serde will protect us enough - format!("method unsupported: {}", method), - None, - Some(request_id), - ) + JsonRpcErrorData::from(format!( + "the method {} is not yet implemented. contact us if you need this", + method + )) + .into() } _method @ ("eth_sendUserOperation" | "eth_estimateUserOperationGas" @@ -1435,18 +1353,14 @@ impl Web3ProxyApp { } None => { // TODO: stats even when we error! - // TODO: use Web3ProxyError? dedicated error for no 4337 bundlers - return Err(anyhow::anyhow!("no bundler_4337_rpcs available").into()); + // TODO: dedicated error for no 4337 bundlers + return Err(Web3ProxyError::NoServersSynced); } }, - "eth_accounts" => { - JsonRpcForwardedResponse::from_value(serde_json::Value::Array(vec![]), request_id) - } + "eth_accounts" => JsonRpcResponseData::from(serde_json::Value::Array(vec![])), "eth_blockNumber" => { match head_block_num.or(self.balanced_rpcs.head_block_num()) { - Some(head_block_num) => { - JsonRpcForwardedResponse::from_value(json!(head_block_num), request_id) - } + Some(head_block_num) => JsonRpcResponseData::from(json!(head_block_num)), None => { // TODO: what does geth do if this happens? // TODO: standard not synced error @@ -1454,19 +1368,16 @@ impl Web3ProxyApp { } } } - "eth_chainId" => JsonRpcForwardedResponse::from_value( - json!(U64::from(self.config.chain_id)), - request_id, - ), + "eth_chainId" => JsonRpcResponseData::from(json!(U64::from(self.config.chain_id))), // TODO: eth_callBundle (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_callbundle) // TODO: eth_cancelPrivateTransaction (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_cancelprivatetransaction, but maybe just reject) // TODO: eth_sendPrivateTransaction (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_sendprivatetransaction) "eth_coinbase" => { // no need for serving coinbase - JsonRpcForwardedResponse::from_value(json!(Address::zero()), request_id) + JsonRpcResponseData::from(json!(Address::zero())) } "eth_estimateGas" => { - let mut response = self + let response_data = self .balanced_rpcs .try_proxy_connection( authorization, @@ -1477,8 +1388,8 @@ impl Web3ProxyApp { ) .await?; - if let Some(gas_estimate) = response.result.take() { - let mut gas_estimate: U256 = serde_json::from_str(gas_estimate.get()) + if let JsonRpcResponseData::Result { value, .. 
} = response_data { + let mut gas_estimate: U256 = serde_json::from_str(value.get()) .or(Err(Web3ProxyError::GasEstimateNotU256))?; let gas_increase = if let Some(gas_increase_percent) = @@ -1495,14 +1406,14 @@ impl Web3ProxyApp { gas_estimate += gas_increase; - JsonRpcForwardedResponse::from_value(json!(gas_estimate), request_id) + JsonRpcResponseData::from(json!(gas_estimate)) } else { - response + response_data } } "eth_getTransactionReceipt" | "eth_getTransactionByHash" => { // try to get the transaction without specifying a min_block_height - let mut response = self + let mut response_data = self .balanced_rpcs .try_proxy_connection( authorization, @@ -1514,13 +1425,13 @@ impl Web3ProxyApp { .await?; // if we got "null", it is probably because the tx is old. retry on nodes with old block data - if let Some(ref result) = response.result { - if result.get() == "null" { + if let JsonRpcResponseData::Result { value, .. } = &response_data { + if value.get() == "null" { request_metadata .archive_request .store(true, atomic::Ordering::Release); - response = self + response_data = self .balanced_rpcs .try_proxy_connection( authorization, @@ -1533,44 +1444,39 @@ impl Web3ProxyApp { } } - response + response_data } // TODO: eth_gasPrice that does awesome magic to predict the future - "eth_hashrate" => JsonRpcForwardedResponse::from_value(json!(U64::zero()), request_id), - "eth_mining" => { - JsonRpcForwardedResponse::from_value(serde_json::Value::Bool(false), request_id) - } + "eth_hashrate" => JsonRpcResponseData::from(json!(U64::zero())), + "eth_mining" => JsonRpcResponseData::from(serde_json::Value::Bool(false)), // TODO: eth_sendBundle (flashbots/eden command) // broadcast transactions to all private rpcs at once "eth_sendRawTransaction" => { - let num_public_rpcs = match authorization.checks.proxy_mode { - // TODO: how many balanced rpcs should we send to? configurable? percentage of total? - ProxyMode::Best | ProxyMode::Debug => Some(4), - ProxyMode::Fastest(0) => None, - // TODO: how many balanced rpcs should we send to? configurable? percentage of total? - // TODO: what if we do 2 per tier? we want to blast the third party rpcs - // TODO: maybe having the third party rpcs in their own Web3Rpcs would be good for this - ProxyMode::Fastest(x) => Some(x * 4), - ProxyMode::Versus => None, - }; + // TODO: decode the transaction - let mut response = self + // TODO: error if the chain_id is incorrect + + // TODO: check the cache to see if we have sent this transaction recently + // TODO: if so, use a cached response. + // TODO: if not, + // TODO: - cache successes for up to 1 minute + // TODO: - cache failures for 1 block (we see transactions skipped because out of funds. but that can change block to block) + + let mut response_data = self .try_send_protected( authorization, request, request_metadata.clone(), - num_public_rpcs, ) .await?; // sometimes we get an error that the transaction is already known by our nodes, // that's not really an error. Return the hash like a successful response would. // TODO: move this to a helper function - if let Some(ref response_error) = response.error { - if response_error.code == -32000 - && (response_error.message == "ALREADY_EXISTS: already known" - || response_error.message - == "INTERNAL_ERROR: existing tx with same hash") + if let JsonRpcResponseData::Error { value, .. 
} = &response_data { + if value.code == -32000 + && (value.message == "ALREADY_EXISTS: already known" + || value.message == "INTERNAL_ERROR: existing tx with same hash") { let params = request .params @@ -1608,21 +1514,19 @@ impl Web3ProxyApp { trace!("tx_hash: {:#?}", tx_hash); - let tx_hash = to_raw_value(&tx_hash).unwrap(); - - response.error = None; - response.result = Some(tx_hash); + response_data = JsonRpcResponseData::from(tx_hash); } } } // emit transaction count stats + // TODO: use this cache to avoid sending duplicate transactions? if let Some(ref salt) = self.config.public_recent_ips_salt { - if let Some(ref tx_hash) = response.result { + if let JsonRpcResponseData::Result { value, .. } = &response_data { let now = Utc::now().timestamp(); let app = self.clone(); - let salted_tx_hash = format!("{}:{}", salt, tx_hash); + let salted_tx_hash = format!("{}:{}", salt, value.get()); let f = async move { match app.redis_conn().await { @@ -1653,35 +1557,38 @@ impl Web3ProxyApp { } } - response + response_data } "eth_syncing" => { // no stats on this. its cheap // TODO: return a real response if all backends are syncing or if no servers in sync - JsonRpcForwardedResponse::from_value(serde_json::Value::Bool(false), request_id) + // TODO: const + JsonRpcResponseData::from(serde_json::Value::Bool(false)) } - "eth_subscribe" => JsonRpcForwardedResponse::from_str( - "notifications not supported. eth_subscribe is only available over a websocket", - Some(-32601), - Some(request_id), - ), - "eth_unsubscribe" => JsonRpcForwardedResponse::from_str( - "notifications not supported. eth_unsubscribe is only available over a websocket", - Some(-32601), - Some(request_id), - ), + "eth_subscribe" => JsonRpcErrorData { + message: Cow::Borrowed( + "notifications not supported. eth_subscribe is only available over a websocket", + ), + code: -32601, + data: None, + } + .into(), + "eth_unsubscribe" => JsonRpcErrorData { + message: Cow::Borrowed("notifications not supported. eth_unsubscribe is only available over a websocket"), + code: -32601, + data: None, + }.into(), "net_listening" => { // TODO: only true if there are some backends on balanced_rpcs? - JsonRpcForwardedResponse::from_value(serde_json::Value::Bool(true), request_id) + // TODO: const + JsonRpcResponseData::from(serde_json::Value::Bool(true)) } - "net_peerCount" => JsonRpcForwardedResponse::from_value( - json!(U64::from(self.balanced_rpcs.num_synced_rpcs())), - request_id, - ), - "web3_clientVersion" => JsonRpcForwardedResponse::from_value( - serde_json::Value::String(APP_USER_AGENT.to_string()), - request_id, - ), + "net_peerCount" => + JsonRpcResponseData::from(json!(U64::from(self.balanced_rpcs.num_synced_rpcs()))) + , + "web3_clientVersion" => + JsonRpcResponseData::from(serde_json::Value::String(APP_USER_AGENT.to_string())) + , "web3_sha3" => { // returns Keccak-256 (not the standardized SHA3-256) of the given data. match &request.params { @@ -1692,11 +1599,11 @@ impl Web3ProxyApp { { // TODO: what error code? 
// TODO: use Web3ProxyError::BadRequest - JsonRpcForwardedResponse::from_str( - "Invalid request", - Some(-32600), - Some(request_id), - ) + JsonRpcErrorData { + message: Cow::Borrowed("Invalid request"), + code: -32600, + data: None + }.into() } else { // TODO: BadRequest instead of web3_context let param = Bytes::from_str( @@ -1714,25 +1621,25 @@ impl Web3ProxyApp { let hash = H256::from(keccak256(param)); - JsonRpcForwardedResponse::from_value(json!(hash), request_id) + JsonRpcResponseData::from(json!(hash)) } } _ => { // TODO: this needs the correct error code in the response // TODO: Web3ProxyError::BadRequest instead? - JsonRpcForwardedResponse::from_str( - "invalid request", - Some(StatusCode::BAD_REQUEST.as_u16().into()), - Some(request_id), - ) + JsonRpcErrorData { + message: Cow::Borrowed("invalid request"), + code: StatusCode::BAD_REQUEST.as_u16().into(), + data: None, + }.into() } } } - "test" => JsonRpcForwardedResponse::from_str( - "The method test does not exist/is not available.", - Some(-32601), - Some(request_id), - ), + "test" => JsonRpcErrorData { + message: Cow::Borrowed("The method test does not exist/is not available."), + code: -32601, + data: None, + }.into(), // anything else gets sent to backend rpcs and cached method => { if method.starts_with("admin_") { @@ -1746,12 +1653,12 @@ impl Web3ProxyApp { .ok_or(Web3ProxyError::NoServersSynced)?; // TODO: don't clone. this happens way too much. maybe &mut? - let mut request = request.clone(); + // let mut request = request.clone(); // we do this check before checking caches because it might modify the request params // TODO: add a stat for archive vs full since they should probably cost different // TODO: this cache key can be rather large. is that okay? - let cache_key: Option = match block_needed( + let cache_key: Option = match block_needed( authorization, method, request.params.as_mut(), @@ -1760,7 +1667,7 @@ impl Web3ProxyApp { ) .await? { - BlockNeeded::CacheSuccessForever => Some(ResponseCacheKey { + BlockNeeded::CacheSuccessForever => Some(JsonRpcQueryCacheKey { from_block: None, to_block: None, method: method.to_string(), @@ -1786,9 +1693,10 @@ impl Web3ProxyApp { let request_block = self .balanced_rpcs .block(authorization, &request_block_hash, None) - .await?; + .await? + .block; - Some(ResponseCacheKey { + Some(JsonRpcQueryCacheKey { from_block: Some(request_block), to_block: None, method: method.to_string(), @@ -1816,7 +1724,8 @@ impl Web3ProxyApp { let from_block = self .balanced_rpcs .block(authorization, &from_block_hash, None) - .await?; + .await? + .block; let (to_block_hash, _) = self .balanced_rpcs @@ -1826,75 +1735,62 @@ impl Web3ProxyApp { let to_block = self .balanced_rpcs .block(authorization, &to_block_hash, None) - .await?; + .await? + .block; - Some(ResponseCacheKey { + Some(JsonRpcQueryCacheKey { from_block: Some(from_block), to_block: Some(to_block), method: method.to_string(), - // TODO: hash here? 
params: request.params.clone(), cache_errors, }) } }; - trace!("cache_key: {:#?}", cache_key); - let mut response = { - let request_metadata = request_metadata.clone(); + let authorization = authorization.clone(); - let authorization = authorization.clone(); + if let Some(cache_key) = cache_key { + let from_block_num = cache_key.from_block.as_ref().map(|x| x.number.unwrap()); + let to_block_num = cache_key.to_block.as_ref().map(|x| x.number.unwrap()); - if let Some(cache_key) = cache_key { - let from_block_num = cache_key.from_block.as_ref().map(|x| *x.number()); - let to_block_num = cache_key.to_block.as_ref().map(|x| *x.number()); + match self + .jsonrpc_query_cache + .get_value_or_guard_async(&cache_key).await + { + Ok(x) => x, + Err(x) => { + let response_data = self.balanced_rpcs + .try_proxy_connection( + &authorization, + request, + Some(request_metadata), + from_block_num.as_ref(), + to_block_num.as_ref(), + ) + .await?; - self.response_cache - .try_get_with(cache_key, async move { - // TODO: put the hash here instead of the block number? its in the request already. - let mut response = self - .balanced_rpcs - .try_proxy_connection( - &authorization, - &request, - Some(&request_metadata), - from_block_num.as_ref(), - to_block_num.as_ref(), - ) - .await?; + // TODO: convert the Box to an Arc + x.insert(response_data.clone()); - // discard their id by replacing it with an empty - response.id = Default::default(); - - // TODO: only cache the inner response - // TODO: how are we going to stream this? - // TODO: check response size. if its very large, return it in a custom Error type that bypasses caching? or will moka do that for us? - Ok::<_, Web3ProxyError>(response) - }) - // TODO: add context (error while caching and forwarding response {}) - .await? - } else { - self.balanced_rpcs - .try_proxy_connection( - &authorization, - &request, - Some(&request_metadata), - None, - None, - ) - .await? + response_data + } } - }; - - // since this data likely came out of a cache, the response.id is not going to match the request.id - // replace the id with our request's id. - response.id = request_id; - - response + } else { + self.balanced_rpcs + .try_proxy_connection( + &authorization, + request, + Some(request_metadata), + None, + None, + ) + .await? + } } }; - Ok(response) + Ok(response_data) } } diff --git a/web3_proxy/src/app/ws.rs b/web3_proxy/src/app/ws.rs index 6e8e61f1..07ba9b59 100644 --- a/web3_proxy/src/app/ws.rs +++ b/web3_proxy/src/app/ws.rs @@ -5,9 +5,10 @@ use crate::frontend::authorization::{Authorization, RequestMetadata, RequestOrMe use crate::frontend::errors::{Web3ProxyError, Web3ProxyResult}; use crate::jsonrpc::JsonRpcForwardedResponse; use crate::jsonrpc::JsonRpcRequest; +use crate::response_cache::JsonRpcResponseData; use crate::rpcs::transactions::TxStatus; use axum::extract::ws::Message; -use ethers::prelude::U64; +use ethers::types::U64; use futures::future::AbortHandle; use futures::future::Abortable; use futures::stream::StreamExt; @@ -24,7 +25,7 @@ impl Web3ProxyApp { jsonrpc_request: JsonRpcRequest, subscription_count: &'a AtomicUsize, // TODO: taking a sender for Message instead of the exact json we are planning to send feels wrong, but its easier for now - response_sender: kanal::AsyncSender, + response_sender: flume::Sender, ) -> Web3ProxyResult<(AbortHandle, JsonRpcForwardedResponse)> { let request_metadata = RequestMetadata::new( self, @@ -39,7 +40,7 @@ impl Web3ProxyApp { // TODO: this only needs to be unique per connection. 
we don't need it globally unique // TODO: have a max number of subscriptions per key/ip. have a global max number of subscriptions? how should this be calculated? let subscription_id = subscription_count.fetch_add(1, atomic::Ordering::SeqCst); - let subscription_id = U64::from(subscription_id); + let subscription_id = U64::from(subscription_id as u64); // save the id so we can use it in the response let id = jsonrpc_request.id.clone(); @@ -94,7 +95,7 @@ impl Web3ProxyApp { // TODO: can we check a content type header? let response_msg = Message::Text(response_str); - if response_sender.send(response_msg).await.is_err() { + if response_sender.send_async(response_msg).await.is_err() { // TODO: increment error_response? i don't think so. i think this will happen once every time a client disconnects. // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? break; @@ -158,7 +159,7 @@ impl Web3ProxyApp { // TODO: do clients support binary messages? let response_msg = Message::Text(response_str); - if response_sender.send(response_msg).await.is_err() { + if response_sender.send_async(response_msg).await.is_err() { // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? break; }; @@ -221,7 +222,7 @@ impl Web3ProxyApp { // TODO: do clients support binary messages? let response_msg = Message::Text(response_str); - if response_sender.send(response_msg).await.is_err() { + if response_sender.send_async(response_msg).await.is_err() { // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? break; }; @@ -285,7 +286,7 @@ impl Web3ProxyApp { // TODO: do clients support binary messages? let response_msg = Message::Text(response_str); - if response_sender.send(response_msg).await.is_err() { + if response_sender.send_async(response_msg).await.is_err() { // TODO: cancel this subscription earlier? select on head_block_receiver.next() and an abort handle? break; }; @@ -304,8 +305,11 @@ impl Web3ProxyApp { // TODO: do something with subscription_join_handle? - let response = JsonRpcForwardedResponse::from_value(json!(subscription_id), id); + let response_data = JsonRpcResponseData::from(json!(subscription_id)); + let response = JsonRpcForwardedResponse::from_response_data(response_data, id); + + // TODO: this serializes twice request_metadata.add_response(&response); // TODO: make a `SubscriptionHandle(AbortHandle, JoinHandle)` struct?
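The ws.rs hunks above replace kanal's `AsyncSender` with a `flume::Sender` and switch every subscription loop to `send_async`. A minimal standalone sketch of that pattern (not code from this PR; the payload is simplified to `String` and the channel size is arbitrary):

```rust
// One bounded flume channel per websocket connection: subscription tasks
// push with `send_async`, the connection task drains with `recv_async`.
// Unlike tokio's mpsc, the same flume Sender also works from sync code.
#[tokio::main]
async fn main() {
    let (response_sender, response_receiver) = flume::bounded::<String>(128);

    tokio::spawn(async move {
        for i in 0..3 {
            // mirrors `response_sender.send_async(response_msg).await.is_err()`:
            // Err means the receiver side (the client connection) is gone
            if response_sender.send_async(format!("msg {i}")).await.is_err() {
                break;
            }
        }
    });

    // exits with Err(RecvError::Disconnected) once all senders drop
    while let Ok(msg) = response_receiver.recv_async().await {
        println!("would write to the websocket: {msg}");
    }
}
```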
diff --git a/web3_proxy/src/bin/web3_proxy_cli/sentryd/mod.rs b/web3_proxy/src/bin/web3_proxy_cli/sentryd/mod.rs index a1d9f50d..5e0af642 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/sentryd/mod.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/sentryd/mod.rs @@ -11,6 +11,7 @@ use log::{error, info}; use pagerduty_rs::{eventsv2async::EventsV2 as PagerdutyAsyncEventsV2, types::Event}; use serde_json::json; use std::time::Duration; +use tokio::sync::mpsc; use tokio::time::{interval, MissedTickBehavior}; use web3_proxy::{config::TopConfig, pagerduty::pagerduty_alert}; @@ -115,7 +116,7 @@ impl SentrydSubCommand { let mut handles = FuturesUnordered::new(); // channels and a task for sending errors to logs/pagerduty - let (error_sender, error_receiver) = kanal::bounded_async::<SentrydError>(10); + let (error_sender, mut error_receiver) = mpsc::channel::<SentrydError>(10); { let error_handler_f = async move { @@ -123,7 +124,7 @@ impl SentrydSubCommand { info!("set PAGERDUTY_INTEGRATION_KEY to create alerts for errors"); } - while let Ok(err) = error_receiver.recv().await { + while let Some(err) = error_receiver.recv().await { log::log!(err.level, "check failed: {:#?}", err); if matches!(err.level, log::Level::Error) { @@ -257,7 +258,7 @@ async fn a_loop( class: &str, seconds: u64, error_level: log::Level, - error_sender: kanal::AsyncSender<SentrydError>, + error_sender: mpsc::Sender<SentrydError>, f: impl Fn(SentrydErrorBuilder) -> T, ) -> anyhow::Result<()> where diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index 51fd4099..05a947d5 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -285,8 +285,8 @@ impl Web3RpcConfig { http_client: Option, http_interval_sender: Option>>, blocks_by_hash_cache: BlocksByHashCache, - block_sender: Option>, - tx_id_sender: Option>, + block_sender: Option>, + tx_id_sender: Option>, reconnect: bool, ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { if !self.extra.is_empty() { diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index 6ba772f0..34a1b398 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -33,7 +33,7 @@ use std::mem; use std::sync::atomic::{self, AtomicBool, AtomicI64, AtomicU64, AtomicUsize}; use std::time::Duration; use std::{net::IpAddr, str::FromStr, sync::Arc}; -use tokio::sync::{mpsc, OwnedSemaphorePermit, Semaphore}; +use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use tokio::task::JoinHandle; use tokio::time::Instant; use ulid::Ulid; @@ -266,8 +266,8 @@ pub struct RequestMetadata { /// TODO: maybe this shouldn't be determined by ProxyMode. A request param should probably enable this pub kafka_debug_logger: Option>, - /// Cancel-safe channel to send stats to - pub stat_sender: Option>, + /// Cancel-safe channel for sending stats to the buffer + pub stat_sender: Option>, } impl Default for RequestMetadata { @@ -355,7 +355,9 @@ impl ResponseOrBytes<'_> { Self::Json(x) => serde_json::to_string(x) .expect("this should always serialize") .len(), - Self::Response(x) => x.num_bytes(), + Self::Response(x) => serde_json::to_string(x) .expect("this should always serialize") .len(), Self::Bytes(num_bytes) => *num_bytes, } } diff --git a/web3_proxy/src/frontend/errors.rs b/web3_proxy/src/frontend/errors.rs index 1ae29e23..e16b674e 100644 --- a/web3_proxy/src/frontend/errors.rs +++ b/web3_proxy/src/frontend/errors.rs @@ -1,10 +1,12 @@ //! Utilities for logging errors for admins and displaying errors to users.
use super::authorization::Authorization; -use crate::jsonrpc::JsonRpcForwardedResponse; +use crate::jsonrpc::{JsonRpcErrorData, JsonRpcForwardedResponse}; +use crate::response_cache::JsonRpcResponseData; -use std::net::IpAddr; +use std::error::Error; use std::sync::Arc; +use std::{borrow::Cow, net::IpAddr}; use axum::{ headers, @@ -51,6 +53,7 @@ pub enum Web3ProxyError { EthersHttpClientError(ethers::prelude::HttpClientError), EthersProviderError(ethers::prelude::ProviderError), EthersWsClientError(ethers::prelude::WsClientError), + FlumeRecvError(flume::RecvError), GasEstimateNotU256, Headers(headers::Error), HeaderToString(ToStrError), @@ -64,7 +67,7 @@ pub enum Web3ProxyError { InvalidHeaderValue(InvalidHeaderValue), InvalidEip, InvalidInviteCode, - InvalidReferralCode, + UnknownReferralCode, InvalidReferer, InvalidSignatureLength, InvalidUserAgent, @@ -76,11 +79,6 @@ pub enum Web3ProxyError { JoinError(JoinError), #[display(fmt = "{:?}", _0)] #[error(ignore)] - JsonRpcForwardedError(JsonRpcForwardedResponse), - KanalReceiveError(kanal::ReceiveError), - KanalSendError(kanal::SendError), - #[display(fmt = "{:?}", _0)] - #[error(ignore)] MsgPackEncode(rmp_serde::encode::Error), NoBlockNumberOrHash, NoBlocksKnown, @@ -113,6 +111,7 @@ pub enum Web3ProxyError { #[from(ignore)] RefererNotAllowed(headers::Referer), SemaphoreAcquireError(AcquireError), + SendAppStatError(flume::SendError), SerdeJson(serde_json::Error), /// simple way to return an error message to the user and an anyhow to our logs #[display(fmt = "{}, {}, {:?}", _0, _1, _2)] @@ -139,42 +138,41 @@ pub enum Web3ProxyError { } impl Web3ProxyError { - pub fn into_response_parts(self) -> (StatusCode, JsonRpcForwardedResponse) { - match self { + pub fn into_response_parts(self) -> (StatusCode, JsonRpcResponseData) { + let (code, err): (StatusCode, JsonRpcErrorData) = match self { Self::AccessDenied => { // TODO: attach something to this trace. probably don't include much in the message though. don't want to leak creds by accident trace!("access denied"); ( StatusCode::FORBIDDEN, - JsonRpcForwardedResponse::from_string( - // TODO: is it safe to expose all of our anyhow strings? - "FORBIDDEN".to_string(), - Some(StatusCode::FORBIDDEN.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Borrowed("FORBIDDEN"), + code: StatusCode::FORBIDDEN.as_u16().into(), + data: None, + }, ) } Self::Anyhow(err) => { warn!("anyhow. err={:?}", err); ( StatusCode::INTERNAL_SERVER_ERROR, - JsonRpcForwardedResponse::from_string( + JsonRpcErrorData { // TODO: is it safe to expose all of our anyhow strings? 
- err.to_string(), - Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), - None, - ), + message: Cow::Owned(err.to_string()), + code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), + data: None, + }, ) } Self::BadRequest(err) => { debug!("BAD_REQUEST: {}", err); ( StatusCode::BAD_REQUEST, - JsonRpcForwardedResponse::from_str( - &format!("bad request: {}", err), - Some(StatusCode::BAD_REQUEST.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Owned(format!("bad request: {}", err)), + code: StatusCode::BAD_REQUEST.as_u16().into(), + data: None, + }, ) } Self::BadResponse(err) => { @@ -182,33 +180,33 @@ impl Web3ProxyError { debug!("BAD_RESPONSE: {}", err); ( StatusCode::INTERNAL_SERVER_ERROR, - JsonRpcForwardedResponse::from_str( - &format!("bad response: {}", err), - Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Owned(format!("bad response: {}", err)), + code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), + data: None, + }, ) } Self::BadRouting => { error!("BadRouting"); ( StatusCode::INTERNAL_SERVER_ERROR, - JsonRpcForwardedResponse::from_str( - "bad routing", - Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Borrowed("bad routing"), + code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), + data: None, + }, ) } Self::Database(err) => { error!("database err={:?}", err); ( StatusCode::INTERNAL_SERVER_ERROR, - JsonRpcForwardedResponse::from_str( - "database error!", - Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Borrowed("database error!"), + code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), + data: None, + }, ) } Self::EipVerificationFailed(err_1, err_191) => { @@ -218,81 +216,81 @@ impl Web3ProxyError { ); ( StatusCode::UNAUTHORIZED, - JsonRpcForwardedResponse::from_string( - format!( + JsonRpcErrorData { + message: Cow::Owned(format!( "both the primary and eip191 verification failed: {:#?}; {:#?}", err_1, err_191 - ), - Some(StatusCode::UNAUTHORIZED.as_u16().into()), - None, - ), + )), + code: StatusCode::UNAUTHORIZED.as_u16().into(), + data: None, + }, ) } Self::EthersHttpClientError(err) => { warn!("EthersHttpClientError err={:?}", err); ( StatusCode::INTERNAL_SERVER_ERROR, - JsonRpcForwardedResponse::from_str( - "ether http client error", - Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Borrowed("ether http client error"), + code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), + data: None, + }, ) } Self::EthersProviderError(err) => { warn!("EthersProviderError err={:?}", err); ( StatusCode::INTERNAL_SERVER_ERROR, - JsonRpcForwardedResponse::from_str( - "ether provider error", - Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Borrowed("ether provider error"), + code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), + data: None, + }, ) } Self::EthersWsClientError(err) => { warn!("EthersWsClientError err={:?}", err); ( StatusCode::INTERNAL_SERVER_ERROR, - JsonRpcForwardedResponse::from_str( - "ether ws client error", - Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Borrowed("ether ws client error"), + code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), + data: None, + }, ) } - Self::KanalReceiveError(err) => { - warn!("KanalRecvError err={:#?}", err); + Self::FlumeRecvError(err) => 
{ + warn!("FlumeRecvError err={:#?}", err); ( StatusCode::INTERNAL_SERVER_ERROR, - JsonRpcForwardedResponse::from_str( - "flume recv error!", - Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Borrowed("flume recv error!"), + code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), + data: None, + }, ) } - Self::JsonRpcForwardedError(x) => (StatusCode::OK, x), + // Self::JsonRpcForwardedError(x) => (StatusCode::OK, x), Self::GasEstimateNotU256 => { warn!("GasEstimateNotU256"); ( StatusCode::INTERNAL_SERVER_ERROR, - JsonRpcForwardedResponse::from_str( - "gas estimate result is not an U256", - Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Borrowed("gas estimate result is not an U256"), + code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), + data: None, + }, ) } Self::Headers(err) => { warn!("HeadersError {:?}", err); ( StatusCode::BAD_REQUEST, - JsonRpcForwardedResponse::from_str( - &format!("{}", err), - Some(StatusCode::BAD_REQUEST.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Owned(format!("{}", err)), + code: StatusCode::BAD_REQUEST.as_u16().into(), + data: None, + }, ) } Self::InfluxDb2RequestError(err) => { @@ -300,135 +298,135 @@ impl Web3ProxyError { error!("influxdb2 err={:?}", err); ( StatusCode::INTERNAL_SERVER_ERROR, - JsonRpcForwardedResponse::from_str( - "influxdb2 error!", - Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Borrowed("influxdb2 error!"), + code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), + data: None, + }, ) } Self::InvalidBlockBounds { min, max } => { debug!("InvalidBlockBounds min={} max={}", min, max); ( StatusCode::BAD_REQUEST, - JsonRpcForwardedResponse::from_string( - format!( + JsonRpcErrorData { + message: Cow::Owned(format!( "Invalid blocks bounds requested. 
min ({}) > max ({})", min, max - ), - Some(StatusCode::BAD_REQUEST.as_u16().into()), - None, - ), + )), + code: StatusCode::BAD_REQUEST.as_u16().into(), + data: None, + }, ) } Self::IpAddrParse(err) => { - warn!("IpAddrParse err={:?}", err); + debug!("IpAddrParse err={:?}", err); ( StatusCode::BAD_REQUEST, - JsonRpcForwardedResponse::from_str( - &format!("{}", err), - Some(StatusCode::BAD_REQUEST.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Owned(err.to_string()), + code: StatusCode::BAD_REQUEST.as_u16().into(), + data: None, + }, ) } Self::IpNotAllowed(ip) => { - warn!("IpNotAllowed ip={})", ip); + debug!("IpNotAllowed ip={})", ip); ( StatusCode::FORBIDDEN, - JsonRpcForwardedResponse::from_string( - format!("IP ({}) is not allowed!", ip), - Some(StatusCode::FORBIDDEN.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Owned(format!("IP ({}) is not allowed!", ip)), + code: StatusCode::FORBIDDEN.as_u16().into(), + data: None, + }, ) } Self::InvalidHeaderValue(err) => { - warn!("InvalidHeaderValue err={:?}", err); + debug!("InvalidHeaderValue err={:?}", err); ( StatusCode::BAD_REQUEST, - JsonRpcForwardedResponse::from_str( - &format!("{}", err), - Some(StatusCode::BAD_REQUEST.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Owned(format!("{}", err)), + code: StatusCode::BAD_REQUEST.as_u16().into(), + data: None, + }, ) } Self::InvalidEip => { - warn!("InvalidEip"); + debug!("InvalidEip"); ( - StatusCode::UNAUTHORIZED, - JsonRpcForwardedResponse::from_str( - "invalid message eip given", - Some(StatusCode::UNAUTHORIZED.as_u16().into()), - None, - ), + StatusCode::BAD_REQUEST, + JsonRpcErrorData { + message: Cow::Borrowed("invalid message eip given"), + code: StatusCode::BAD_REQUEST.as_u16().into(), + data: None, + }, ) } Self::InvalidInviteCode => { - warn!("InvalidInviteCode"); + debug!("InvalidInviteCode"); ( StatusCode::UNAUTHORIZED, - JsonRpcForwardedResponse::from_str( - "invalid invite code", - Some(StatusCode::UNAUTHORIZED.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Borrowed("invalid invite code"), + code: StatusCode::UNAUTHORIZED.as_u16().into(), + data: None, + }, ) } - Self::InvalidReferralCode => { - warn!("InvalidReferralCode"); + Self::UnknownReferralCode => { + debug!("UnknownReferralCode"); ( StatusCode::UNAUTHORIZED, - JsonRpcForwardedResponse::from_str( - "invalid referral code", - Some(StatusCode::UNAUTHORIZED.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Borrowed("invalid referral code"), + code: StatusCode::UNAUTHORIZED.as_u16().into(), + data: None, + }, ) } Self::InvalidReferer => { - warn!("InvalidReferer"); + debug!("InvalidReferer"); ( StatusCode::BAD_REQUEST, - JsonRpcForwardedResponse::from_str( - "invalid referer!", - Some(StatusCode::BAD_REQUEST.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Borrowed("invalid referer!"), + code: StatusCode::BAD_REQUEST.as_u16().into(), + data: None, + }, ) } Self::InvalidSignatureLength => { - warn!("InvalidSignatureLength"); + debug!("InvalidSignatureLength"); ( StatusCode::BAD_REQUEST, - JsonRpcForwardedResponse::from_str( - "invalid signature length", - Some(StatusCode::BAD_REQUEST.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Borrowed("invalid signature length"), + code: StatusCode::BAD_REQUEST.as_u16().into(), + data: None, + }, ) } Self::InvalidUserAgent => { - warn!("InvalidUserAgent"); + debug!("InvalidUserAgent"); ( - StatusCode::BAD_REQUEST, - 
JsonRpcForwardedResponse::from_str( - "invalid user agent!", - Some(StatusCode::BAD_REQUEST.as_u16().into()), - None, - ), + StatusCode::FORBIDDEN, + JsonRpcErrorData { + message: Cow::Borrowed("invalid user agent!"), + code: StatusCode::FORBIDDEN.as_u16().into(), + data: None, + }, ) } Self::InvalidUserKey => { warn!("InvalidUserKey"); ( StatusCode::BAD_REQUEST, - JsonRpcForwardedResponse::from_str( - "UserKey was not a ULID or UUID", - Some(StatusCode::BAD_REQUEST.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Borrowed("UserKey was not a ULID or UUID"), + code: StatusCode::BAD_REQUEST.as_u16().into(), + data: None, + }, ) } Self::JoinError(err) => { @@ -442,78 +440,78 @@ impl Web3ProxyError { ( code, - JsonRpcForwardedResponse::from_str( - // TODO: different messages, too? - "Unable to complete request", - Some(code.as_u16().into()), - None, - ), + JsonRpcErrorData { + // TODO: different messages of cancelled or not? + message: Cow::Borrowed("Unable to complete request"), + code: code.as_u16().into(), + data: None, + }, ) } Self::MsgPackEncode(err) => { warn!("MsgPackEncode Error: {}", err); ( StatusCode::INTERNAL_SERVER_ERROR, - JsonRpcForwardedResponse::from_str( - &format!("msgpack encode error: {}", err), - Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Owned(format!("msgpack encode error: {}", err)), + code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), + data: None, + }, ) } Self::NoBlockNumberOrHash => { warn!("NoBlockNumberOrHash"); ( StatusCode::INTERNAL_SERVER_ERROR, - JsonRpcForwardedResponse::from_str( - "Blocks here must have a number or hash", - Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Borrowed("Blocks here must have a number or hash"), + code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), + data: None, + }, ) } Self::NoBlocksKnown => { error!("NoBlocksKnown"); ( StatusCode::BAD_GATEWAY, - JsonRpcForwardedResponse::from_str( - "no blocks known", - Some(StatusCode::BAD_GATEWAY.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Borrowed("no blocks known"), + code: StatusCode::BAD_GATEWAY.as_u16().into(), + data: None, + }, ) } Self::NoConsensusHeadBlock => { error!("NoConsensusHeadBlock"); ( StatusCode::BAD_GATEWAY, - JsonRpcForwardedResponse::from_str( - "no consensus head block", - Some(StatusCode::BAD_GATEWAY.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Borrowed("no consensus head block"), + code: StatusCode::BAD_GATEWAY.as_u16().into(), + data: None, + }, ) } Self::NoHandleReady => { error!("NoHandleReady"); ( StatusCode::BAD_GATEWAY, - JsonRpcForwardedResponse::from_str( - "unable to retry for request handle", - Some(StatusCode::BAD_GATEWAY.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Borrowed("unable to retry for request handle"), + code: StatusCode::BAD_GATEWAY.as_u16().into(), + data: None, + }, ) } Self::NoServersSynced => { warn!("NoServersSynced"); ( StatusCode::BAD_GATEWAY, - JsonRpcForwardedResponse::from_str( - "no servers synced", - Some(StatusCode::BAD_GATEWAY.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Borrowed("no servers synced"), + code: StatusCode::BAD_GATEWAY.as_u16().into(), + data: None, + }, ) } Self::NotEnoughRpcs { @@ -523,11 +521,14 @@ impl Web3ProxyError { error!("NotEnoughRpcs {}/{}", num_known, min_head_rpcs); ( StatusCode::BAD_GATEWAY, - JsonRpcForwardedResponse::from_string( - format!("not enough rpcs 
connected {}/{}", num_known, min_head_rpcs), - Some(StatusCode::BAD_GATEWAY.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Owned(format!( + "not enough rpcs connected {}/{}", + num_known, min_head_rpcs + )), + code: StatusCode::BAD_GATEWAY.as_u16().into(), + data: None, + }, ) } Self::NotFound => { @@ -535,88 +536,88 @@ impl Web3ProxyError { // TODO: instead of an error, show a normal html page for 404? ( StatusCode::NOT_FOUND, - JsonRpcForwardedResponse::from_str( - "not found!", - Some(StatusCode::NOT_FOUND.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Borrowed("not found!"), + code: StatusCode::NOT_FOUND.as_u16().into(), + data: None, + }, ) } Self::NotImplemented => { trace!("NotImplemented"); ( StatusCode::NOT_IMPLEMENTED, - JsonRpcForwardedResponse::from_str( - "work in progress", - Some(StatusCode::NOT_IMPLEMENTED.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Borrowed("work in progress"), + code: StatusCode::NOT_IMPLEMENTED.as_u16().into(), + data: None, + }, ) } Self::OriginRequired => { trace!("OriginRequired"); ( StatusCode::BAD_REQUEST, - JsonRpcForwardedResponse::from_str( - "Origin required", - Some(StatusCode::BAD_REQUEST.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Borrowed("Origin required"), + code: StatusCode::BAD_REQUEST.as_u16().into(), + data: None, + }, ) } Self::OriginNotAllowed(origin) => { trace!("OriginNotAllowed origin={}", origin); ( StatusCode::FORBIDDEN, - JsonRpcForwardedResponse::from_string( - format!("Origin ({}) is not allowed!", origin), - Some(StatusCode::FORBIDDEN.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Owned(format!("Origin ({}) is not allowed!", origin)), + code: StatusCode::FORBIDDEN.as_u16().into(), + data: None, + }, ) } Self::ParseBytesError(err) => { trace!("ParseBytesError err={:?}", err); ( StatusCode::BAD_REQUEST, - JsonRpcForwardedResponse::from_str( - "parse bytes error!", - Some(StatusCode::BAD_REQUEST.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Borrowed("parse bytes error!"), + code: StatusCode::BAD_REQUEST.as_u16().into(), + data: None, + }, ) } Self::ParseMsgError(err) => { trace!("ParseMsgError err={:?}", err); ( StatusCode::BAD_REQUEST, - JsonRpcForwardedResponse::from_str( - "parse message error!", - Some(StatusCode::BAD_REQUEST.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Borrowed("parse message error!"), + code: StatusCode::BAD_REQUEST.as_u16().into(), + data: None, + }, ) } Self::ParseAddressError => { trace!("ParseAddressError"); ( StatusCode::BAD_REQUEST, - JsonRpcForwardedResponse::from_str( - "unable to parse address", - Some(StatusCode::BAD_REQUEST.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Borrowed("unable to parse address"), + code: StatusCode::BAD_REQUEST.as_u16().into(), + data: None, + }, ) } Self::PaymentRequired => { trace!("PaymentRequiredError"); ( StatusCode::PAYMENT_REQUIRED, - JsonRpcForwardedResponse::from_str( - "Payment is required and user is not premium.", - Some(StatusCode::PAYMENT_REQUIRED.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Borrowed("Payment is required and user is not premium"), + code: StatusCode::PAYMENT_REQUIRED.as_u16().into(), + data: None, + }, ) } // TODO: this should actually by the id of the key. 
multiple users might control one key @@ -638,89 +639,91 @@ impl Web3ProxyError { format!( "too many requests from rpc key #{}.{}", authorization.checks.rpc_secret_key_id.unwrap(), - retry_msg + retry_msg, ) }; ( StatusCode::TOO_MANY_REQUESTS, - JsonRpcForwardedResponse::from_string( - msg, - Some(StatusCode::TOO_MANY_REQUESTS.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Owned(msg), + code: StatusCode::TOO_MANY_REQUESTS.as_u16().into(), + data: None, + }, ) } Self::Redis(err) => { warn!("redis err={:?}", err); ( StatusCode::INTERNAL_SERVER_ERROR, - JsonRpcForwardedResponse::from_str( - "redis error!", - Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Borrowed("redis error!"), + code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), + data: None, + }, ) } Self::RefererRequired => { warn!("referer required"); ( StatusCode::BAD_REQUEST, - JsonRpcForwardedResponse::from_str( - "Referer required", - Some(StatusCode::BAD_REQUEST.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Borrowed("Referer required"), + code: StatusCode::BAD_REQUEST.as_u16().into(), + data: None, + }, ) } Self::RefererNotAllowed(referer) => { warn!("referer not allowed referer={:?}", referer); ( StatusCode::FORBIDDEN, - JsonRpcForwardedResponse::from_string( - format!("Referer ({:?}) is not allowed", referer), - Some(StatusCode::FORBIDDEN.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Owned(format!("Referer ({:?}) is not allowed", referer)), + code: StatusCode::FORBIDDEN.as_u16().into(), + data: None, + }, ) } - Self::Arc(err) => match Arc::try_unwrap(err) { - Ok(err) => err, - Err(err) => Self::Anyhow(anyhow::anyhow!("{}", err)), + Self::Arc(err) => { + return match Arc::try_unwrap(err) { + Ok(err) => err, + Err(err) => Self::Anyhow(anyhow::anyhow!("{}", err)), + } + .into_response_parts(); } - .into_response_parts(), Self::SemaphoreAcquireError(err) => { warn!("semaphore acquire err={:?}", err); ( StatusCode::INTERNAL_SERVER_ERROR, - JsonRpcForwardedResponse::from_string( + JsonRpcErrorData { // TODO: is it safe to expose all of our anyhow strings? - "semaphore acquire error".to_string(), - Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), - None, - ), + message: Cow::Borrowed("semaphore acquire error"), + code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), + data: None, + }, ) } - Self::KanalSendError(err) => { + Self::SendAppStatError(err) => { error!("SendAppStatError err={:?}", err); ( StatusCode::INTERNAL_SERVER_ERROR, - JsonRpcForwardedResponse::from_str( - "error stat_sender sending response_stat", - Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Borrowed("error stat_sender sending response_stat"), + code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), + data: None, + }, ) } Self::SerdeJson(err) => { - warn!("serde json err={:?}", err); + warn!("serde json err={:?} source={:?}", err, err.source()); ( StatusCode::BAD_REQUEST, - JsonRpcForwardedResponse::from_str( - "de/serialization error!", - Some(StatusCode::BAD_REQUEST.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Owned(format!("de/serialization error! 
{}", err)), + code: StatusCode::BAD_REQUEST.as_u16().into(), + data: None, + }, ) } Self::StatusCode(status_code, err_msg, err) => { @@ -734,80 +737,84 @@ impl Web3ProxyError { ( status_code, - JsonRpcForwardedResponse::from_str(&err_msg, Some(code.into()), None), + JsonRpcErrorData { + message: Cow::Owned(err_msg), + code: code.into(), + data: None, + }, ) } Self::Timeout(x) => ( StatusCode::REQUEST_TIMEOUT, - JsonRpcForwardedResponse::from_str( - &format!("request timed out: {:?}", x), - Some(StatusCode::REQUEST_TIMEOUT.as_u16().into()), + JsonRpcErrorData { + message: Cow::Owned(format!("request timed out: {:?}", x)), + code: StatusCode::REQUEST_TIMEOUT.as_u16().into(), // TODO: include the actual id! - None, - ), + data: None, + }, ), Self::HeaderToString(err) => { // trace!(?err, "HeaderToString"); ( StatusCode::BAD_REQUEST, - JsonRpcForwardedResponse::from_str( - &format!("{}", err), - Some(StatusCode::BAD_REQUEST.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Owned(err.to_string()), + code: StatusCode::BAD_REQUEST.as_u16().into(), + data: None, + }, ) } Self::UlidDecode(err) => { // trace!(?err, "UlidDecodeError"); ( StatusCode::BAD_REQUEST, - JsonRpcForwardedResponse::from_str( - &format!("{}", err), - Some(StatusCode::BAD_REQUEST.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Owned(format!("{}", err)), + code: StatusCode::BAD_REQUEST.as_u16().into(), + data: None, + }, ) } Self::UnknownBlockNumber => { error!("UnknownBlockNumber"); ( StatusCode::BAD_GATEWAY, - JsonRpcForwardedResponse::from_str( - "no servers synced. unknown eth_blockNumber", - Some(StatusCode::BAD_GATEWAY.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Borrowed("no servers synced. unknown eth_blockNumber"), + code: StatusCode::BAD_GATEWAY.as_u16().into(), + data: None, + }, ) } // TODO: stat? 
Self::UnknownKey => ( StatusCode::UNAUTHORIZED, - JsonRpcForwardedResponse::from_str( - "unknown api key!", - Some(StatusCode::UNAUTHORIZED.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Borrowed("unknown api key!"), + code: StatusCode::UNAUTHORIZED.as_u16().into(), + data: None, + }, ), Self::UserAgentRequired => { warn!("UserAgentRequired"); ( StatusCode::BAD_REQUEST, - JsonRpcForwardedResponse::from_str( - "User agent required", - Some(StatusCode::BAD_REQUEST.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Borrowed("User agent required"), + code: StatusCode::BAD_REQUEST.as_u16().into(), + data: None, + }, ) } Self::UserAgentNotAllowed(ua) => { warn!("UserAgentNotAllowed ua={}", ua); ( StatusCode::FORBIDDEN, - JsonRpcForwardedResponse::from_string( - format!("User agent ({}) is not allowed!", ua), - Some(StatusCode::FORBIDDEN.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Owned(format!("User agent ({}) is not allowed!", ua)), + code: StatusCode::FORBIDDEN.as_u16().into(), + data: None, + }, ) } Self::UserIdZero => { @@ -815,75 +822,79 @@ impl Web3ProxyError { // TODO: this might actually be an application error and not a BAD_REQUEST ( StatusCode::BAD_REQUEST, - JsonRpcForwardedResponse::from_str( - "user ids should always be non-zero", - Some(StatusCode::BAD_REQUEST.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Borrowed("user ids should always be non-zero"), + code: StatusCode::BAD_REQUEST.as_u16().into(), + data: None, + }, ) } Self::VerificationError(err) => { trace!("VerificationError err={:?}", err); ( StatusCode::BAD_REQUEST, - JsonRpcForwardedResponse::from_str( - "verification error!", - Some(StatusCode::BAD_REQUEST.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Borrowed("verification error!"), + code: StatusCode::BAD_REQUEST.as_u16().into(), + data: None, + }, ) } Self::WatchRecvError(err) => { error!("WatchRecvError err={:?}", err); ( StatusCode::INTERNAL_SERVER_ERROR, - JsonRpcForwardedResponse::from_str( - "watch recv error!", - Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Borrowed("watch recv error!"), + code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), + data: None, + }, ) } Self::WatchSendError => { error!("WatchSendError"); ( StatusCode::INTERNAL_SERVER_ERROR, - JsonRpcForwardedResponse::from_str( - "watch send error!", - Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Borrowed("watch send error!"), + code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), + data: None, + }, ) } Self::WebsocketOnly => { trace!("WebsocketOnly"); ( StatusCode::BAD_REQUEST, - JsonRpcForwardedResponse::from_str( - "redirect_public_url not set. only websockets work here", - Some(StatusCode::BAD_REQUEST.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Borrowed( + "redirect_public_url not set. 
only websockets work here", + ), + code: StatusCode::BAD_REQUEST.as_u16().into(), + data: None, + }, ) } Self::WithContext(err, msg) => match err { Some(err) => { warn!("{:#?} w/ context {}", err, msg); - err.into_response_parts() + return err.into_response_parts(); } None => { warn!("error w/ context {}", msg); ( StatusCode::INTERNAL_SERVER_ERROR, - JsonRpcForwardedResponse::from_string( - msg, - Some(StatusCode::INTERNAL_SERVER_ERROR.as_u16().into()), - None, - ), + JsonRpcErrorData { + message: Cow::Owned(msg), + code: StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(), + data: None, + }, ) } }, - } + }; + + (code, JsonRpcResponseData::from(err)) } } @@ -903,7 +914,12 @@ impl IntoResponse for Web3ProxyError { fn into_response(self) -> Response { // TODO: include the request id in these so that users can give us something that will point to logs // TODO: status code is in the jsonrpc response and is also the first item in the tuple. DRY - let (status_code, response) = self.into_response_parts(); + let (status_code, response_data) = self.into_response_parts(); + + // this will be missing the jsonrpc id! + // its better to get request id and call from_response_data with it then to use this IntoResponse helper. + let response = + JsonRpcForwardedResponse::from_response_data(response_data, Default::default()); (status_code, Json(response)).into_response() } diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs index 728978da..0aca1b02 100644 --- a/web3_proxy/src/frontend/mod.rs +++ b/web3_proxy/src/frontend/mod.rs @@ -16,26 +16,27 @@ use axum::{ routing::{get, post, put}, Extension, Router, }; -use http::header::AUTHORIZATION; +use http::{header::AUTHORIZATION, StatusCode}; use listenfd::ListenFd; use log::info; -use moka::future::Cache; +use std::iter::once; use std::net::SocketAddr; use std::sync::Arc; -use std::{iter::once, time::Duration}; -use tokio::sync::broadcast; +use strum::{EnumCount, EnumIter}; +use tokio::{sync::broadcast, time::Instant}; use tower_http::cors::CorsLayer; use tower_http::sensitive_headers::SetSensitiveRequestHeadersLayer; /// simple keys for caching responses -#[derive(Clone, Hash, PartialEq, Eq)] -pub enum FrontendResponseCaches { +#[derive(Copy, Clone, Hash, PartialEq, Eq, EnumCount, EnumIter)] +pub enum FrontendResponseCacheKey { + BackupsNeeded, + Health, Status, } pub type FrontendJsonResponseCache = - Cache, hashbrown::hash_map::DefaultHashBuilder>; -pub type FrontendHealthCache = Cache<(), bool, hashbrown::hash_map::DefaultHashBuilder>; + quick_cache::sync::Cache; /// Start the frontend server. pub async fn serve( @@ -47,14 +48,9 @@ pub async fn serve( // setup caches for whatever the frontend needs // no need for max items since it is limited by the enum key // TODO: latest moka allows for different ttls for different - let json_response_cache: FrontendJsonResponseCache = Cache::builder() - .time_to_live(Duration::from_secs(2)) - .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); + let response_cache_size = FrontendResponseCacheKey::COUNT; - // /health gets a cache with a shorter lifetime - let health_cache: FrontendHealthCache = Cache::builder() - .time_to_live(Duration::from_millis(100)) - .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); + let json_response_cache = FrontendJsonResponseCache::new(response_cache_size); // TODO: read config for if fastest/versus should be available publicly. 
default off @@ -220,8 +216,7 @@ pub async fn serve( // application state .layer(Extension(proxy_app)) // frontend caches - .layer(Extension(json_response_cache)) - .layer(Extension(health_cache)) + .layer(Extension(Arc::new(json_response_cache))) // 404 for any unknown routes .fallback(errors::handler_404); diff --git a/web3_proxy/src/frontend/rpc_proxy_http.rs b/web3_proxy/src/frontend/rpc_proxy_http.rs index 06b55603..1b39678c 100644 --- a/web3_proxy/src/frontend/rpc_proxy_http.rs +++ b/web3_proxy/src/frontend/rpc_proxy_http.rs @@ -63,6 +63,8 @@ async fn _proxy_web3_rpc( let authorization = Arc::new(authorization); + // TODO: calculate payload bytes here (before turning into serde_json::Value). that will save serializing later + let (status_code, response, rpcs, _semaphore) = app .proxy_web3_rpc(authorization, payload) .await diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs index e562f297..03aa9843 100644 --- a/web3_proxy/src/frontend/rpc_proxy_ws.rs +++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs @@ -20,7 +20,7 @@ use axum::{ }; use axum_client_ip::InsecureClientIp; use axum_macros::debug_handler; -use ethers::types::Bytes; +use ethers::types::U64; use fstrings::{f, format_args_f}; use futures::SinkExt; use futures::{ @@ -311,7 +311,7 @@ async fn proxy_web3_socket( let (ws_tx, ws_rx) = socket.split(); // create a channel so our reader and writer can communicate. todo: benchmark different channels - let (response_sender, response_receiver) = kanal::unbounded_async::<Message>(); + let (response_sender, response_receiver) = flume::unbounded::<Message>(); tokio::spawn(write_web3_socket(response_receiver, ws_tx)); tokio::spawn(read_web3_socket(app, authorization, ws_rx, response_sender)); @@ -323,25 +323,28 @@ async fn handle_socket_payload( app: Arc<Web3ProxyApp>, authorization: &Arc<Authorization>, payload: &str, - response_sender: &kanal::AsyncSender<Message>, + response_sender: &flume::Sender<Message>, subscription_count: &AtomicUsize, - subscriptions: Arc<RwLock<HashMap<Bytes, AbortHandle>>>, + subscriptions: Arc<RwLock<HashMap<U64, AbortHandle>>>, ) -> Web3ProxyResult<(Message, Option<OwnedSemaphorePermit>)> { let (authorization, semaphore) = match authorization.check_again(&app).await { Ok((a, s)) => (a, s), Err(err) => { let (_, err) = err.into_response_parts(); - let err = serde_json::to_string(&err).expect("to_string should always work here"); + let err = JsonRpcForwardedResponse::from_response_data(err, Default::default()); + + let err = serde_json::to_string(&err)?; return Ok((Message::Text(err), None)); } }; // TODO: do any clients send batches over websockets?
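The kanal-to-flume swap above changes call sites from `send`/`recv` on async handles to `send_async`/`recv_async` on flume's plain `Sender`/`Receiver`. A runnable sketch of the new wiring (assumes `flume = "0.10"` and tokio with the `rt` and `macros` features):

```rust
// flume's Sender and Receiver are plain cloneable handles that also work
// from async code via send_async/recv_async.
#[tokio::main(flavor = "current_thread")]
async fn main() {
    let (response_sender, response_receiver) = flume::unbounded::<String>();

    // the writer task drains the channel, like write_web3_socket above
    let writer = tokio::spawn(async move {
        while let Ok(msg) = response_receiver.recv_async().await {
            println!("sending to websocket: {}", msg);
        }
    });

    response_sender
        .send_async("hello".to_string())
        .await
        .expect("receiver still alive");

    drop(response_sender); // closing the last sender ends the writer loop
    writer.await.unwrap();
}
```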
- let (id, response) = match serde_json::from_str::<JsonRpcRequest>(payload) { + // TODO: change response into response_data + let (response_id, response) = match serde_json::from_str::<JsonRpcRequest>(payload) { Ok(json_request) => { - let id = json_request.id.clone(); + let response_id = json_request.id.clone(); // TODO: move this to a separate function so we can use the try operator let response: Web3ProxyResult<JsonRpcForwardedResponse> = @@ -366,9 +369,9 @@ async fn handle_socket_payload( .as_ref() .context("there should be a result here")?; - // TODO: there must be a better way to do this - let k: Bytes = serde_json::from_str(result.get()) - .context("subscription ids must be bytes")?; + // TODO: there must be a better way to turn a RawValue into a U64 + let k: U64 = serde_json::from_str(result.get()) + .context("subscription ids must be U64s")?; x.insert(k, handle); }; @@ -384,7 +387,7 @@ async fn handle_socket_payload( .await; #[derive(serde::Deserialize)] - struct EthUnsubscribeParams([Bytes; 1]); + struct EthUnsubscribeParams([U64; 1]); if let Some(params) = json_request.params { match serde_json::from_value(params) { @@ -403,9 +406,10 @@ async fn handle_socket_payload( } }; + // TODO: don't create the response here. use a JsonRpcResponseData instead let response = JsonRpcForwardedResponse::from_value( json!(partial_response), - id.clone(), + response_id.clone(), ); request_metadata.add_response(&response); @@ -428,7 +432,7 @@ async fn handle_socket_payload( .map(|(status_code, response, _)| response), }; - (id, response) + (response_id, response) } Err(err) => { let id = JsonRpcId::None.to_raw_value(); @@ -439,8 +443,10 @@ async fn handle_socket_payload( let response_str = match response { Ok(x) => serde_json::to_string(&x).expect("to_string should always work here"), Err(err) => { - let (_, mut response) = err.into_response_parts(); - response.id = id; + let (_, response_data) = err.into_response_parts(); + + let response = JsonRpcForwardedResponse::from_response_data(response_data, response_id); + serde_json::to_string(&response).expect("to_string should always work here") } }; @@ -452,7 +458,7 @@ async fn read_web3_socket( app: Arc<Web3ProxyApp>, authorization: Arc<Authorization>, mut ws_rx: SplitStream<WebSocket>, - response_sender: kanal::AsyncSender<Message>, + response_sender: flume::Sender<Message>, ) { // RwLock should be fine here. a user isn't going to be opening tons of subscriptions let subscriptions = Arc::new(RwLock::new(HashMap::new())); @@ -528,7 +534,7 @@ async fn read_web3_socket( } }; - if response_sender.send(response_msg).await.is_err() { + if response_sender.send_async(response_msg).await.is_err() { let _ = close_sender.send(true); return; }; @@ -549,13 +555,13 @@ async fn read_web3_socket( } async fn write_web3_socket( - response_rx: kanal::AsyncReceiver<Message>, + response_rx: flume::Receiver<Message>, mut ws_tx: SplitSink<WebSocket, Message>, ) { // TODO: increment counter for open websockets // TODO: is there any way to make this stream receive. - while let Ok(msg) = response_rx.recv().await { + while let Ok(msg) = response_rx.recv_async().await { // a response is ready // TODO: poke rate limits for this user? diff --git a/web3_proxy/src/frontend/status.rs b/web3_proxy/src/frontend/status.rs index 0e46c21a..4bc30aa3 100644 --- a/web3_proxy/src/frontend/status.rs +++ b/web3_proxy/src/frontend/status.rs @@ -3,32 +3,97 @@ //! For ease of development, users can currently access these endpoints. //! They will eventually move to another port.
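One detail from the `rpc_proxy_ws` hunk above: subscription ids are now read out of the JSON-RPC `result` (a `RawValue`) as an ethers `U64` instead of `Bytes`. A standalone sketch of that parse (assumes the `ethers` and `serde_json` crates; `U64` deserializes from a quoted "0x…" quantity):

```rust
// result.get() yields the raw JSON text, quotes included, and serde can
// deserialize that hex quantity straight into an ethers U64.
use ethers::types::U64;
use serde_json::value::RawValue;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // stand-in for response.result: the raw JSON of an eth_subscribe result
    let result: Box<RawValue> = serde_json::from_str(r#""0x2a""#)?;

    let k: U64 = serde_json::from_str(result.get())?;
    assert_eq!(k, U64::from(42u64));

    println!("subscription id: {}", k);
    Ok(())
}
```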
-use super::{FrontendHealthCache, FrontendJsonResponseCache, FrontendResponseCaches}; -use crate::app::{Web3ProxyApp, APP_USER_AGENT}; -use axum::{http::StatusCode, response::IntoResponse, Extension, Json}; +use super::{FrontendJsonResponseCache, FrontendResponseCacheKey}; +use crate::{ + app::{Web3ProxyApp, APP_USER_AGENT}, + frontend::errors::Web3ProxyError, +}; +use axum::{body::Bytes, http::StatusCode, response::IntoResponse, Extension}; use axum_macros::debug_handler; +use futures::Future; +use once_cell::sync::Lazy; use serde_json::json; -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; +use tokio::time::Instant; + +static HEALTH_OK: Lazy<Bytes> = Lazy::new(|| Bytes::from("OK\n")); +static HEALTH_NOT_OK: Lazy<Bytes> = Lazy::new(|| Bytes::from(":(\n")); + +static BACKUPS_NEEDED_TRUE: Lazy<Bytes> = Lazy::new(|| Bytes::from("true\n")); +static BACKUPS_NEEDED_FALSE: Lazy<Bytes> = Lazy::new(|| Bytes::from("false\n")); + +/// simple ttl cache for responses +// TODO: make this generic for any cache/key +async fn _quick_cache_ttl<Fut>( + app: Arc<Web3ProxyApp>, + cache: Arc<FrontendJsonResponseCache>, + key: FrontendResponseCacheKey, + f: impl Fn(Arc<Web3ProxyApp>) -> Fut, +) -> (StatusCode, Bytes) +where + Fut: Future<Output = (StatusCode, Bytes)>, +{ + let mut response; + let expire_at; + + (response, expire_at) = cache + .get_or_insert_async::<Web3ProxyError>(&key, async { + let expire_at = Instant::now() + Duration::from_millis(1000); + + let response = f(app.clone()).await; + + Ok((response, expire_at)) + }) + .await + .unwrap(); + + if Instant::now() >= expire_at { + // TODO: this expiration isn't perfect + // parallel requests could overwrite each other + // it's good enough for now + let expire_at = Instant::now() + Duration::from_millis(1000); + + response = f(app).await; + + cache.insert(key, (response.clone(), expire_at)); + } + + response +} /// Health check page for load balancers to use. #[debug_handler] pub async fn health( Extension(app): Extension<Arc<Web3ProxyApp>>, - Extension(health_cache): Extension<FrontendHealthCache>, + Extension(cache): Extension<Arc<FrontendJsonResponseCache>>, ) -> impl IntoResponse { - let synced = health_cache - .get_with((), async { app.balanced_rpcs.synced() }) - .await; + _quick_cache_ttl(app, cache, FrontendResponseCacheKey::Health, _health).await +} - if synced { - (StatusCode::OK, "OK") +async fn _health(app: Arc<Web3ProxyApp>) -> (StatusCode, Bytes) { + if app.balanced_rpcs.synced() { + (StatusCode::OK, HEALTH_OK.clone()) } else { - (StatusCode::SERVICE_UNAVAILABLE, ":(") + (StatusCode::SERVICE_UNAVAILABLE, HEALTH_NOT_OK.clone()) } } /// Easy alerting if backup servers are in use. -pub async fn backups_needed(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse { +#[debug_handler] +pub async fn backups_needed( + Extension(app): Extension<Arc<Web3ProxyApp>>, + Extension(cache): Extension<Arc<FrontendJsonResponseCache>>, +) -> impl IntoResponse { + _quick_cache_ttl( + app, + cache, + FrontendResponseCacheKey::BackupsNeeded, + _backups_needed, + ) + .await +} + +async fn _backups_needed(app: Arc<Web3ProxyApp>) -> (StatusCode, Bytes) { let code = { let consensus_rpcs = app .balanced_rpcs @@ -49,9 +114,9 @@ pub async fn backups_needed(Extension(app): Extension<Arc<Web3ProxyApp>>) -> imp }; if matches!(code, StatusCode::OK) { - (code, "no backups needed. :)") + (code, BACKUPS_NEEDED_FALSE.clone()) } else { - (code, "backups needed! :(") + (code, BACKUPS_NEEDED_TRUE.clone()) } } @@ -61,23 +126,33 @@ pub async fn backups_needed(Extension(app): Extension<Arc<Web3ProxyApp>>) -> imp #[debug_handler] pub async fn status( Extension(app): Extension<Arc<Web3ProxyApp>>, - Extension(response_cache): Extension<FrontendJsonResponseCache>, + Extension(cache): Extension<Arc<FrontendJsonResponseCache>>, ) -> impl IntoResponse { - let body = response_cache - .get_with(FrontendResponseCaches::Status, async { - // TODO: what else should we include?
uptime, cache hit rates, cpu load, memory used - // TODO: the hostname is probably not going to change. only get once at the start? - let body = json!({ - "version": APP_USER_AGENT, - "chain_id": app.config.chain_id, - "balanced_rpcs": app.balanced_rpcs, - "private_rpcs": app.private_rpcs, - "hostname": app.hostname, - }); - - Arc::new(body) - }) - .await; - - Json(body) + _quick_cache_ttl(app, cache, FrontendResponseCacheKey::Status, _status).await +} + +// TODO: this doesn't need to be async, but _quick_cache_ttl needs an async function +async fn _status(app: Arc) -> (StatusCode, Bytes) { + // TODO: what else should we include? uptime, cache hit rates, cpu load, memory used + // TODO: the hostname is probably not going to change. only get once at the start? + let body = json!({ + "version": APP_USER_AGENT, + "chain_id": app.config.chain_id, + "balanced_rpcs": app.balanced_rpcs, + "private_rpcs": app.private_rpcs, + "bundler_4337_rpcs": app.bundler_4337_rpcs, + "hostname": app.hostname, + }); + + let body = body.to_string().into_bytes(); + + let body = Bytes::from(body); + + let code = if app.balanced_rpcs.synced() { + StatusCode::OK + } else { + StatusCode::INTERNAL_SERVER_ERROR + }; + + (code, body) } diff --git a/web3_proxy/src/frontend/users/authentication.rs b/web3_proxy/src/frontend/users/authentication.rs index e681ea41..e4a70ca3 100644 --- a/web3_proxy/src/frontend/users/authentication.rs +++ b/web3_proxy/src/frontend/users/authentication.rs @@ -321,7 +321,7 @@ pub async fn user_login_post( .filter(referrer::Column::ReferralCode.eq(referral_code)) .one(db_replica.conn()) .await? - .ok_or(Web3ProxyError::InvalidReferralCode)?; + .ok_or(Web3ProxyError::UnknownReferralCode)?; // Create a new item in the database, // marking this guy as the referrer (and ignoring a duplicate insert, if there is any...) diff --git a/web3_proxy/src/jsonrpc.rs b/web3_proxy/src/jsonrpc.rs index eabd3d0b..677cc069 100644 --- a/web3_proxy/src/jsonrpc.rs +++ b/web3_proxy/src/jsonrpc.rs @@ -1,20 +1,17 @@ use crate::frontend::errors::{Web3ProxyError, Web3ProxyResult}; +use crate::response_cache::JsonRpcResponseData; use derive_more::From; use ethers::prelude::ProviderError; use serde::de::{self, Deserializer, MapAccess, SeqAccess, Visitor}; use serde::{Deserialize, Serialize}; use serde_json::json; use serde_json::value::{to_raw_value, RawValue}; +use std::borrow::Cow; use std::fmt; -fn default_jsonrpc() -> String { - "2.0".to_string() -} - +// TODO: &str here instead of String should save a lot of allocations #[derive(Clone, Deserialize, Serialize)] pub struct JsonRpcRequest { - // TODO: skip jsonrpc entirely? 
its against spec to drop it, but some servers bad - #[serde(default = "default_jsonrpc")] pub jsonrpc: String, /// id could be a stricter type, but many rpcs do things against the spec pub id: Box, @@ -51,7 +48,7 @@ impl JsonRpcRequest { params: Option, ) -> anyhow::Result { let x = Self { - jsonrpc: default_jsonrpc(), + jsonrpc: "2.0".to_string(), id: id.to_raw_value(), method, params, @@ -194,19 +191,38 @@ pub struct JsonRpcErrorData { /// The error code pub code: i64, /// The error message - pub message: String, + pub message: Cow<'static, str>, /// Additional data #[serde(skip_serializing_if = "Option::is_none")] pub data: Option, } +impl From<&'static str> for JsonRpcErrorData { + fn from(value: &'static str) -> Self { + Self { + code: -32000, + message: Cow::Borrowed(value), + data: None, + } + } +} + +impl From for JsonRpcErrorData { + fn from(value: String) -> Self { + Self { + code: -32000, + message: Cow::Owned(value), + data: None, + } + } +} + /// A complete response /// TODO: better Debug response #[derive(Clone, Debug, Deserialize, Serialize)] pub struct JsonRpcForwardedResponse { // TODO: jsonrpc a &str? - #[serde(default = "default_jsonrpc")] - pub jsonrpc: String, + pub jsonrpc: &'static str, pub id: Box, #[serde(skip_serializing_if = "Option::is_none")] pub result: Option>, @@ -242,40 +258,40 @@ impl JsonRpcForwardedResponse { // TODO: this is too verbose. plenty of errors are valid, like users giving an invalid address. no need to log that // TODO: can we somehow get the initial request here? if we put that into a tracing span, will things slow down a ton? JsonRpcForwardedResponse { - jsonrpc: "2.0".to_string(), - id: id.unwrap_or_else(|| JsonRpcId::None.to_raw_value()), + jsonrpc: "2.0", + id: id.unwrap_or_default(), result: None, error: Some(JsonRpcErrorData { code: code.unwrap_or(-32099), - message, + message: Cow::Owned(message), // TODO: accept data as an argument data: None, }), } } - pub fn from_response(partial_response: Box, id: Box) -> Self { + pub fn from_raw_response(result: Box, id: Box) -> Self { JsonRpcForwardedResponse { - jsonrpc: "2.0".to_string(), + jsonrpc: "2.0", id, // TODO: since we only use the result here, should that be all we return from try_send_request? - result: Some(partial_response), + result: Some(result), error: None, } } - pub fn from_value(partial_response: serde_json::Value, id: Box) -> Self { - let partial_response = - to_raw_value(&partial_response).expect("Value to RawValue should always work"); + pub fn from_value(result: serde_json::Value, id: Box) -> Self { + let partial_response = to_raw_value(&result).expect("Value to RawValue should always work"); JsonRpcForwardedResponse { - jsonrpc: "2.0".to_string(), + jsonrpc: "2.0", id, result: Some(partial_response), error: None, } } + // TODO: delete this. its on JsonRpcErrorData pub fn from_ethers_error(e: ProviderError, id: Box) -> Web3ProxyResult { // TODO: move turning ClientError into json to a helper function? 
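The two `From` impls added above exist purely for ergonomics: any message literal or `String` can become a `-32000` (generic server error) with `.into()`. A self-contained sketch with the type pared down to the two relevant fields:

```rust
// Sketch of the From impls above: .into() builds a -32000 error from either
// a 'static str (borrowed) or a String (owned), with no extra allocation.
use std::borrow::Cow;

struct JsonRpcErrorData {
    code: i64,
    message: Cow<'static, str>,
}

impl From<&'static str> for JsonRpcErrorData {
    fn from(value: &'static str) -> Self {
        Self { code: -32000, message: Cow::Borrowed(value) }
    }
}

impl From<String> for JsonRpcErrorData {
    fn from(value: String) -> Self {
        Self { code: -32000, message: Cow::Owned(value) }
    }
}

fn main() {
    let a: JsonRpcErrorData = "no servers synced".into();
    let b: JsonRpcErrorData = format!("bad block {}", 7).into();
    assert_eq!(a.code, -32000);
    assert_eq!(b.message, "bad block 7");
}
```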
let code; @@ -302,12 +318,12 @@ impl JsonRpcForwardedResponse { } Ok(Self { - jsonrpc: "2.0".to_string(), + jsonrpc: "2.0", id, result: None, error: Some(JsonRpcErrorData { code, - message, + message: Cow::Owned(message), data, }), }) @@ -318,16 +334,21 @@ impl JsonRpcForwardedResponse { id: Box, ) -> Web3ProxyResult { match result { - Ok(response) => Ok(Self::from_response(response, id)), + Ok(response) => Ok(Self::from_raw_response(response, id)), Err(e) => Self::from_ethers_error(e, id), } } - pub fn num_bytes(&self) -> usize { - // TODO: not sure how to do this without wasting a ton of allocations - serde_json::to_string(self) - .expect("this should always be valid json") - .len() + pub fn from_response_data(data: JsonRpcResponseData, id: Box) -> Self { + match data { + JsonRpcResponseData::Result { value, .. } => Self::from_raw_response(value, id), + JsonRpcResponseData::Error { value, .. } => JsonRpcForwardedResponse { + jsonrpc: "2.0", + id, + result: None, + error: Some(value), + }, + } } } diff --git a/web3_proxy/src/lib.rs b/web3_proxy/src/lib.rs index 1f29beb3..ce4ae400 100644 --- a/web3_proxy/src/lib.rs +++ b/web3_proxy/src/lib.rs @@ -8,6 +8,7 @@ pub mod jsonrpc; pub mod pagerduty; pub mod prometheus; pub mod referral_code; +pub mod response_cache; pub mod rpcs; pub mod stats; pub mod user_token; diff --git a/web3_proxy/src/response_cache.rs b/web3_proxy/src/response_cache.rs new file mode 100644 index 00000000..c7e710bd --- /dev/null +++ b/web3_proxy/src/response_cache.rs @@ -0,0 +1,137 @@ +use crate::{ + frontend::errors::Web3ProxyError, jsonrpc::JsonRpcErrorData, rpcs::blockchain::ArcBlock, +}; +use derive_more::From; +use ethers::providers::ProviderError; +use quick_cache::{sync::Cache as QuickCache, Weighter}; +use serde_json::value::RawValue; +use std::{ + borrow::Cow, + hash::{Hash, Hasher}, + num::NonZeroU32, +}; + +#[derive(Clone, Debug, From, PartialEq, Eq)] +pub struct JsonRpcQueryCacheKey { + pub from_block: Option, + pub to_block: Option, + pub method: String, + pub params: Option, + pub cache_errors: bool, +} + +impl Hash for JsonRpcQueryCacheKey { + fn hash(&self, state: &mut H) { + self.from_block.as_ref().map(|x| x.hash).hash(state); + self.to_block.as_ref().map(|x| x.hash).hash(state); + self.method.hash(state); + + // make sure preserve_order feature is OFF + self.params.as_ref().map(|x| x.to_string()).hash(state); + + self.cache_errors.hash(state) + } +} + +pub type JsonRpcQueryCache = + QuickCache; + +#[derive(Clone)] +pub struct JsonRpcQueryWeigher; + +#[derive(Clone)] +pub enum JsonRpcResponseData { + Result { + value: Box, + size: Option, + }, + Error { + value: JsonRpcErrorData, + size: Option, + }, +} + +impl JsonRpcResponseData { + pub fn num_bytes(&self) -> NonZeroU32 { + // TODO: dry this somehow + match self { + JsonRpcResponseData::Result { value, size } => size.unwrap_or_else(|| { + let size = value.get().len(); + + NonZeroU32::new(size.clamp(1, u32::MAX as usize) as u32).unwrap() + }), + JsonRpcResponseData::Error { value, size } => size.unwrap_or_else(|| { + let size = serde_json::to_string(value).unwrap().len(); + + NonZeroU32::new(size.clamp(1, u32::MAX as usize) as u32).unwrap() + }), + } + } +} + +impl From for JsonRpcResponseData { + fn from(value: serde_json::Value) -> Self { + let value = RawValue::from_string(value.to_string()).unwrap(); + + Self::Result { value, size: None } + } +} + +impl From> for JsonRpcResponseData { + fn from(value: Box) -> Self { + Self::Result { value, size: None } + } +} + +impl From for JsonRpcResponseData { + 
fn from(value: JsonRpcErrorData) -> Self { + Self::Error { value, size: None } + } +} + +impl TryFrom for JsonRpcErrorData { + type Error = Web3ProxyError; + + fn try_from(e: ProviderError) -> Result { + // TODO: move turning ClientError into json to a helper function? + let code; + let message: String; + let data; + + match e { + ProviderError::JsonRpcClientError(err) => { + if let Some(err) = err.as_error_response() { + code = err.code; + message = err.message.clone(); + data = err.data.clone(); + } else if let Some(err) = err.as_serde_error() { + // this is not an rpc error. keep it as an error + return Err(Web3ProxyError::BadResponse(format!( + "bad response: {}", + err + ))); + } else { + return Err(anyhow::anyhow!("unexpected ethers error! {:?}", err).into()); + } + } + e => return Err(e.into()), + } + + Ok(JsonRpcErrorData { + code, + message: Cow::Owned(message), + data, + }) + } +} + +impl Weighter for JsonRpcQueryWeigher { + fn weight( + &self, + _key: &JsonRpcQueryCacheKey, + _qey: &(), + value: &JsonRpcResponseData, + ) -> NonZeroU32 { + value.num_bytes() + } +} diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index f0c1bdbd..1ec3bc2c 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -5,6 +5,7 @@ use super::one::Web3Rpc; use super::transactions::TxStatus; use crate::frontend::authorization::Authorization; use crate::frontend::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult}; +use crate::response_cache::JsonRpcResponseData; use crate::{config::BlockAndRpc, jsonrpc::JsonRpcRequest}; use derive_more::From; use ethers::prelude::{Block, TxHash, H256, U64}; @@ -250,15 +251,14 @@ impl Web3Rpcs { ) .await?; - if response.error.is_some() { - return Err(response.into()); - } + let value = match response { + JsonRpcResponseData::Error { .. } => { + return Err(anyhow::anyhow!("failed fetching block").into()); + } + JsonRpcResponseData::Result { value, .. } => value, + }; - let block = response - .result - .web3_context("no error, but also no block")?; - - let block: Option = serde_json::from_str(block.get())?; + let block: Option = serde_json::from_str(value.get())?; let block: ArcBlock = block.web3_context("no block in the response")?; @@ -346,13 +346,14 @@ impl Web3Rpcs { .try_send_best_consensus_head_connection(authorization, &request, None, Some(num), None) .await?; - if response.error.is_some() { - return Err(response.into()); - } + let value = match response { + JsonRpcResponseData::Error { .. } => { + return Err(anyhow::anyhow!("failed fetching block").into()); + } + JsonRpcResponseData::Result { value, .. } => value, + }; - let raw_block = response.result.web3_context("no cannonical block result")?; - - let block: ArcBlock = serde_json::from_str(raw_block.get())?; + let block: ArcBlock = serde_json::from_str(value.get())?; let block = Web3ProxyBlock::try_from(block)?; @@ -365,7 +366,7 @@ impl Web3Rpcs { pub(super) async fn process_incoming_blocks( &self, authorization: &Arc, - block_receiver: kanal::AsyncReceiver, + block_receiver: flume::Receiver, // TODO: document that this is a watch sender and not a broadcast! if things get busy, blocks might get missed // Geth's subscriptions have the same potential for skipping blocks. 
pending_tx_sender: Option>, @@ -373,7 +374,7 @@ impl Web3Rpcs { let mut connection_heads = ConsensusFinder::new(self.max_block_age, self.max_block_lag); loop { - match block_receiver.recv().await { + match block_receiver.recv_async().await { Ok((new_block, rpc)) => { let rpc_name = rpc.name.clone(); diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index a5658522..fbe10fa6 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -373,9 +373,9 @@ impl ConsensusFinder { .0 .tier; - trace!("first_tier: {}", current_tier); + // trace!("first_tier: {}", current_tier); - trace!("rpc_heads_by_tier: {:#?}", rpc_heads_by_tier); + // trace!("rpc_heads_by_tier: {:#?}", rpc_heads_by_tier); // loop over all the rpc heads (grouped by tier) and their parents to find consensus // TODO: i'm sure theres a lot of shortcuts that could be taken, but this is simplest to implement diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 82924eb0..e9183d86 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -8,7 +8,8 @@ use crate::config::{BlockAndRpc, TxHashAndRpc, Web3RpcConfig}; use crate::frontend::authorization::{Authorization, RequestMetadata}; use crate::frontend::errors::{Web3ProxyError, Web3ProxyResult}; use crate::frontend::rpc_proxy_ws::ProxyMode; -use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest}; +use crate::jsonrpc::{JsonRpcErrorData, JsonRpcRequest}; +use crate::response_cache::JsonRpcResponseData; use crate::rpcs::consensus::{RankedRpcMap, RpcRanking}; use crate::rpcs::transactions::TxStatus; use anyhow::Context; @@ -23,12 +24,13 @@ use hashbrown::{HashMap, HashSet}; use itertools::Itertools; use log::{debug, error, info, trace, warn, Level}; use migration::sea_orm::DatabaseConnection; -use moka::future::{Cache, ConcurrentCacheExt}; +use moka::future::Cache; use ordered_float::OrderedFloat; use serde::ser::{SerializeStruct, Serializer}; use serde::Serialize; use serde_json::json; use serde_json::value::RawValue; +use std::borrow::Cow; use std::cmp::{min_by_key, Reverse}; use std::collections::BTreeMap; use std::fmt; @@ -43,7 +45,7 @@ use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBeh #[derive(From)] pub struct Web3Rpcs { /// if watch_consensus_head_sender is some, Web3Rpc inside self will send blocks here when they get them - pub(crate) block_sender: kanal::AsyncSender<(Option, Arc)>, + pub(crate) block_sender: flume::Sender<(Option, Arc)>, /// any requests will be forwarded to one (or more) of these connections pub(crate) by_name: ArcSwap>>, /// notify all http providers to check their blocks at the same time @@ -55,10 +57,11 @@ pub struct Web3Rpcs { pub(crate) watch_consensus_rpcs_sender: watch::Sender>>, /// this head receiver makes it easy to wait until there is a new block pub(super) watch_consensus_head_sender: Option>>, + /// keep track of transactions that we have sent through subscriptions pub(super) pending_transaction_cache: Cache, - pub(super) pending_tx_id_receiver: kanal::AsyncReceiver, - pub(super) pending_tx_id_sender: kanal::AsyncSender, + pub(super) pending_tx_id_receiver: flume::Receiver, + pub(super) pending_tx_id_sender: flume::Sender, /// TODO: this map is going to grow forever unless we do some sort of pruning. maybe store pruned in redis? 
/// all blocks, including orphans pub(super) blocks_by_hash: BlocksByHashCache, @@ -94,8 +97,8 @@ impl Web3Rpcs { watch::Receiver>>, // watch::Receiver>, )> { - let (pending_tx_id_sender, pending_tx_id_receiver) = kanal::unbounded_async(); - let (block_sender, block_receiver) = kanal::unbounded_async::(); + let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded(); + let (block_sender, block_receiver) = flume::unbounded::(); // TODO: query the rpc to get the actual expected block time, or get from config? maybe have this be part of a health check? let expected_block_time_ms = match chain_id { @@ -347,7 +350,7 @@ impl Web3Rpcs { async fn subscribe( self: Arc, authorization: Arc, - block_receiver: kanal::AsyncReceiver, + block_receiver: flume::Receiver, pending_tx_sender: Option>, ) -> anyhow::Result<()> { let mut futures = vec![]; @@ -362,7 +365,7 @@ impl Web3Rpcs { let pending_tx_id_receiver = self.pending_tx_id_receiver.clone(); let handle = tokio::task::spawn(async move { // TODO: set up this future the same as the block funnel - while let Ok((pending_tx_id, rpc)) = pending_tx_id_receiver.recv().await { + while let Ok((pending_tx_id, rpc)) = pending_tx_id_receiver.recv_async().await { let f = clone.clone().process_incoming_tx_id( authorization.clone(), rpc, @@ -423,12 +426,11 @@ impl Web3Rpcs { pub async fn try_send_parallel_requests( &self, active_request_handles: Vec, - id: Box, method: &str, params: Option<&serde_json::Value>, error_level: Level, // TODO: remove this box once i figure out how to do the options - ) -> Web3ProxyResult { + ) -> Web3ProxyResult { // TODO: if only 1 active_request_handles, do self.try_send_request? let responses = active_request_handles @@ -447,24 +449,16 @@ impl Web3Rpcs { let mut count_map: HashMap = HashMap::new(); let mut counts: Counter = Counter::new(); let mut any_ok_with_json_result = false; - let mut any_ok_but_maybe_json_error = false; for partial_response in responses { if partial_response.is_ok() { any_ok_with_json_result = true; } - let response = - JsonRpcForwardedResponse::try_from_response_result(partial_response, id.clone()); - - // TODO: better key? - let s = format!("{:?}", response); + // TODO: better key! + let s = format!("{:?}", partial_response); if count_map.get(&s).is_none() { - if response.is_ok() { - any_ok_but_maybe_json_error = true; - } - - count_map.insert(s.clone(), response); + count_map.insert(s.clone(), partial_response); } counts.update([s].into_iter()); @@ -477,19 +471,18 @@ impl Web3Rpcs { match most_common { Ok(x) => { - if any_ok_with_json_result && x.error.is_some() { - // this one may be an "Ok", but the json has an error inside it - continue; - } // return the most common success - return Ok(x); + return Ok(x.into()); } Err(err) => { - if any_ok_but_maybe_json_error { + if any_ok_with_json_result { // the most common is an error, but there is an Ok in here somewhere. 
loop to find it continue; } - return Err(err); + + let err: JsonRpcErrorData = err.try_into()?; + + return Ok(err.into()); } } } @@ -601,7 +594,18 @@ impl Web3Rpcs { return Ok(OpenRequestResult::Handle(handle)); } Ok(OpenRequestResult::RetryAt(retry_at)) => { - earliest_retry_at = earliest_retry_at.min(Some(retry_at)); + trace!( + "{:?} - retry on {} @ {}", + request_ulid, + best_rpc, + retry_at.duration_since(Instant::now()).as_secs_f32() + ); + + if earliest_retry_at.is_none() { + earliest_retry_at = Some(retry_at); + } else { + earliest_retry_at = earliest_retry_at.min(Some(retry_at)); + } } Ok(OpenRequestResult::NotReady) => { // TODO: log a warning? emit a stat? @@ -641,7 +645,16 @@ impl Web3Rpcs { Ok(OpenRequestResult::NotReady) } Some(earliest_retry_at) => { - warn!("no servers on {:?}! {:?}", self, earliest_retry_at); + // TODO: log the server that retry_at came from + // TODO: `self` doesn't log well. get a pretty name for this group of servers + warn!( + "{:?} - no servers on {:?}! retry in {:?}s", + request_ulid, + self, + earliest_retry_at + .duration_since(Instant::now()) + .as_secs_f32() + ); Ok(OpenRequestResult::RetryAt(earliest_retry_at)) } @@ -733,7 +746,7 @@ impl Web3Rpcs { match rpc.try_request_handle(authorization, None).await { Ok(OpenRequestResult::RetryAt(retry_at)) => { // this rpc is not available. skip it - warn!("{} is rate limited. skipping", rpc); + trace!("{} is rate limited. skipping", rpc); earliest_retry_at = earliest_retry_at.min(Some(retry_at)); } Ok(OpenRequestResult::Handle(handle)) => { @@ -767,7 +780,7 @@ impl Web3Rpcs { request_metadata: Option<&Arc>, min_block_needed: Option<&U64>, max_block_needed: Option<&U64>, - ) -> Web3ProxyResult { + ) -> Web3ProxyResult { let mut skip_rpcs = vec![]; let mut method_not_available_response = None; @@ -783,7 +796,7 @@ impl Web3Rpcs { .best_available_rpc( authorization, request_metadata, - &[], + &skip_rpcs, min_block_needed, max_block_needed, ) @@ -800,10 +813,11 @@ impl Web3Rpcs { let is_backup_response = rpc.backup; + // TODO: instead of entirely skipping, maybe demote a tier? skip_rpcs.push(rpc); // TODO: get the log percent from the user data - let response_result = active_request_handle + let response_result: Result, _> = active_request_handle .request( &request.method, &json!(request.params), @@ -812,10 +826,7 @@ impl Web3Rpcs { ) .await; - match JsonRpcForwardedResponse::try_from_response_result( - response_result, - request.id.clone(), - ) { + match response_result { Ok(response) => { // TODO: if there are multiple responses being aggregated, this will only use the last server's backup type if let Some(request_metadata) = request_metadata { @@ -824,97 +835,98 @@ impl Web3Rpcs { .store(is_backup_response, Ordering::Release); } - if let Some(error) = response.error.as_ref() { - // trace!(?response, "rpc error"); + return Ok(response.into()); + } + Err(error) => { + // trace!(?response, "rpc error"); - if let Some(request_metadata) = request_metadata { - request_metadata - .error_response - .store(true, Ordering::Release); + // TODO: separate jsonrpc error and web3 proxy error! + if let Some(request_metadata) = request_metadata { + request_metadata + .error_response + .store(true, Ordering::Release); + } + + let error: JsonRpcErrorData = error.try_into()?; + + // some errors should be retried on other nodes + let error_msg = error.message.as_ref(); + + // different providers do different codes. 
check all of them + // TODO: there's probably more strings to add here + let rate_limit_substrings = ["limit", "exceeded", "quota usage"]; + for rate_limit_substr in rate_limit_substrings { + if error_msg.contains(rate_limit_substr) { + warn!("rate limited by {}", skip_rpcs.last().unwrap()); + continue; } + } - // some errors should be retried on other nodes - let error_msg = error.message.as_str(); + match error.code { + -32000 => { + // TODO: regex? + let retry_prefixes = [ + "header not found", + "header for hash not found", + "missing trie node", + "node not started", + "RPC timeout", + ]; + for retry_prefix in retry_prefixes { + if error_msg.starts_with(retry_prefix) { + // TODO: too verbose + debug!("retrying on another server"); + continue; + } + } + } + -32601 => { + let error_msg = error.message.as_ref(); - // different providers do different codes. check all of them - // TODO: there's probably more strings to add here - let rate_limit_substrings = ["limit", "exceeded", "quota usage"]; - for rate_limit_substr in rate_limit_substrings { - if error_msg.contains(rate_limit_substr) { - warn!("rate limited by {}", skip_rpcs.last().unwrap()); + // sometimes a provider does not support all rpc methods + // we check other connections rather than returning the error + // but sometimes the method is something that is actually unsupported, + // so we save the response here to return it later + + // some providers look like this + if error_msg.starts_with("the method") + && error_msg.ends_with("is not available") + { + method_not_available_response = Some(error); + continue; + } + + // others look like this (this is the example in the official spec) + if error_msg == "Method not found" { + method_not_available_response = Some(error); continue; } } - - match error.code { - -32000 => { - // TODO: regex? - let retry_prefixes = [ - "header not found", - "header for hash not found", - "missing trie node", - "node not started", - "RPC timeout", - ]; - for retry_prefix in retry_prefixes { - if error_msg.starts_with(retry_prefix) { - continue; - } - } - } - -32601 => { - let error_msg = error.message.as_str(); - - // sometimes a provider does not support all rpc methods - // we check other connections rather than returning the error - // but sometimes the method is something that is actually unsupported, - // so we save the response here to return it later - - // some providers look like this - if error_msg.starts_with("the method") - && error_msg.ends_with("is not available") - { - method_not_available_response = Some(response); - continue; - } - - // others look like this (this is the example in the official spec) - if error_msg == "Method not found" { - method_not_available_response = Some(response); - continue; - } - } - _ => {} - } - } else { - // trace!(?response, "rpc success"); + _ => {} } - return Ok(response); - } - Err(err) => { - let rpc = skip_rpcs - .last() - .expect("there must have been a provider if we got an error"); + // let rpc = skip_rpcs + // .last() + // .expect("there must have been a provider if we got an error"); // TODO: emit a stat. if a server is getting skipped a lot, something is not right // TODO: if we get a TrySendError, reconnect. wait why do we see a trysenderror on a dual provider? shouldn't it be using reqwest - trace!( - "Backend server error on {}! Retrying {:?} on another. err={:?}", - rpc, - request, - err - ); + // TODO! WRONG! ONLY SET RETRY_AT IF THIS IS A SERVER/CONNECTION ERROR. JSONRPC "error" is FINE + // trace!( + // "Backend server error on {}! 
Retrying {:?} on another. err={:?}", + // rpc, + // request, + // error, + // ); + // if let Some(ref hard_limit_until) = rpc.hard_limit_until { + // let retry_at = Instant::now() + Duration::from_secs(1); - if let Some(ref hard_limit_until) = rpc.hard_limit_until { - let retry_at = Instant::now() + Duration::from_secs(1); + // hard_limit_until.send_replace(retry_at); + // } - hard_limit_until.send_replace(retry_at); - } - - continue; + return Ok(error.into()); } } } @@ -923,8 +935,8 @@ impl Web3Rpcs { // sleep (TODO: with a lock?) until our rate limits should be available // TODO: if a server catches up sync while we are waiting, we could stop waiting warn!( - "All rate limits exceeded. waiting for change in synced servers or {:?}", - retry_at + "All rate limits exceeded. waiting for change in synced servers or {:?}s", + retry_at.duration_since(Instant::now()).as_secs_f32() ); // TODO: have a separate column for rate limited? @@ -934,6 +946,7 @@ impl Web3Rpcs { tokio::select! { _ = sleep_until(retry_at) => { + trace!("slept!"); skip_rpcs.pop(); } _ = watch_consensus_connections.changed() => { @@ -948,6 +961,8 @@ impl Web3Rpcs { let waiting_for = min_block_needed.max(max_block_needed); + info!("waiting for {:?}", waiting_for); + if watch_for_block(waiting_for, &mut watch_consensus_connections).await? { // block found! continue so we can check for another rpc } else { @@ -966,10 +981,12 @@ impl Web3Rpcs { .store(true, Ordering::Release); } - if let Some(r) = method_not_available_response { + if let Some(err) = method_not_available_response { // TODO: this error response is likely the user's fault. do we actually want it marked as an error? maybe keep user and server error bools? // TODO: emit a stat for unsupported methods? it would be best to block them at the proxy instead of at the backend - return Ok(r); + // TODO: this is too verbose! + debug!("{}", serde_json::to_string(&err)?); + return Ok(err.into()); } let num_conns = self.by_name.load().len(); @@ -995,8 +1012,6 @@ impl Web3Rpcs { "No archive servers synced (min {:?}, max {:?}, head {:?}) ({} known)", min_block_needed, max_block_needed, head_block_num, num_conns ); - } else if num_skipped == 0 { - // TODO: what should we log? } else { error!( "Requested data is not available (min {:?}, max {:?}, head {:?}) ({} skipped, {} known)", @@ -1008,11 +1023,12 @@ impl Web3Rpcs { // TODO: what error code? 
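The wait loop in this hunk races two futures: a `sleep_until(retry_at)` for the soonest rate-limit expiry and a `changed()` notification on the consensus watch channel, retrying as soon as either resolves. A runnable sketch of that pattern (assumes tokio with the `time`, `sync`, `macros`, and `rt` features):

```rust
// Whichever branch of the select! fires first decides how we retry.
use tokio::sync::watch;
use tokio::time::{sleep_until, Duration, Instant};

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let (head_tx, mut head_rx) = watch::channel(0u64);
    let retry_at = Instant::now() + Duration::from_millis(50);

    // simulate a new consensus head arriving while we are rate limited
    tokio::spawn(async move {
        head_tx.send(1).ok();
    });

    tokio::select! {
        _ = sleep_until(retry_at) => {
            println!("rate limit expired; retry the skipped server");
        }
        _ = head_rx.changed() => {
            println!("synced servers changed; retry immediately");
        }
    }
}
```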
// cloudflare gives {"jsonrpc":"2.0","error":{"code":-32043,"message":"Requested data cannot be older than 128 blocks."},"id":1} - Ok(JsonRpcForwardedResponse::from_str( - "Requested data is not available", - Some(-32043), - Some(request.id.clone()), - )) + Ok(JsonRpcErrorData { + message: Cow::Borrowed("Requested data is not available"), + code: -32043, + data: None, + } + .into()) } /// be sure there is a timeout on this or it might loop forever @@ -1027,7 +1043,7 @@ impl Web3Rpcs { error_level: Level, max_count: Option, always_include_backups: bool, - ) -> Web3ProxyResult { + ) -> Web3ProxyResult { let mut watch_consensus_rpcs = self.watch_consensus_rpcs_sender.subscribe(); let start = Instant::now(); @@ -1071,7 +1087,6 @@ impl Web3Rpcs { return self .try_send_parallel_requests( active_request_handles, - request.id.clone(), request.method.as_ref(), request.params.as_ref(), error_level, @@ -1126,7 +1141,7 @@ impl Web3Rpcs { request_metadata: Option<&Arc>, min_block_needed: Option<&U64>, max_block_needed: Option<&U64>, - ) -> Web3ProxyResult { + ) -> Web3ProxyResult { match authorization.checks.proxy_mode { ProxyMode::Debug | ProxyMode::Best => { self.try_send_best_consensus_head_connection( @@ -1158,7 +1173,7 @@ impl Serialize for Web3Rpcs { where S: Serializer, { - let mut state = serializer.serialize_struct("Web3Rpcs", 6)?; + let mut state = serializer.serialize_struct("Web3Rpcs", 1)?; { let by_name = self.by_name.load(); @@ -1178,12 +1193,12 @@ impl Serialize for Web3Rpcs { } } - self.blocks_by_hash.sync(); - self.blocks_by_number.sync(); - state.serialize_field("block_hashes_count", &self.blocks_by_hash.entry_count())?; - state.serialize_field("block_hashes_size", &self.blocks_by_hash.weighted_size())?; - state.serialize_field("block_numbers_count", &self.blocks_by_number.entry_count())?; - state.serialize_field("block_numbers_size", &self.blocks_by_number.weighted_size())?; + // self.blocks_by_hash.sync(); + // self.blocks_by_number.sync(); + // state.serialize_field("block_hashes_count", &self.blocks_by_hash.entry_count())?; + // state.serialize_field("block_hashes_size", &self.blocks_by_hash.weighted_size())?; + // state.serialize_field("block_numbers_count", &self.blocks_by_number.entry_count())?; + // state.serialize_field("block_numbers_size", &self.blocks_by_number.weighted_size())?; state.end() } } @@ -1391,8 +1406,8 @@ mod tests { (lagged_rpc.name.clone(), lagged_rpc.clone()), ]); - let (block_sender, _block_receiver) = kanal::unbounded_async(); - let (pending_tx_id_sender, pending_tx_id_receiver) = kanal::unbounded_async(); + let (block_sender, _block_receiver) = flume::unbounded(); + let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded(); let (watch_consensus_rpcs_sender, _watch_consensus_rpcs_receiver) = watch::channel(None); let (watch_consensus_head_sender, _watch_consensus_head_receiver) = watch::channel(None); @@ -1643,8 +1658,8 @@ mod tests { (archive_rpc.name.clone(), archive_rpc.clone()), ]); - let (block_sender, _) = kanal::unbounded_async(); - let (pending_tx_id_sender, pending_tx_id_receiver) = kanal::unbounded_async(); + let (block_sender, _) = flume::unbounded(); + let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded(); let (watch_consensus_rpcs_sender, _) = watch::channel(None); let (watch_consensus_head_sender, _watch_consensus_head_receiver) = watch::channel(None); @@ -1807,8 +1822,8 @@ mod tests { ), ]); - let (block_sender, _) = kanal::unbounded_async(); - let (pending_tx_id_sender, pending_tx_id_receiver) = 
kanal::unbounded_async(); + let (block_sender, _) = flume::unbounded(); + let (pending_tx_id_sender, pending_tx_id_receiver) = flume::unbounded(); let (watch_consensus_rpcs_sender, _) = watch::channel(None); let (watch_consensus_head_sender, _watch_consensus_head_receiver) = watch::channel(None); diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index be0487e0..b9faea75 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -98,8 +98,8 @@ impl Web3Rpc { http_interval_sender: Option>>, redis_pool: Option, block_map: BlocksByHashCache, - block_sender: Option>, - tx_id_sender: Option)>>, + block_sender: Option>, + tx_id_sender: Option)>>, reconnect: bool, ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { let created_at = Instant::now(); @@ -389,7 +389,7 @@ impl Web3Rpc { /// TODO: maybe it would be better to use "Full Jitter". The "Full Jitter" approach uses less work, but slightly more time. pub async fn retrying_connect( self: &Arc, - block_sender: Option<&kanal::AsyncSender>, + block_sender: Option<&flume::Sender>, chain_id: u64, db_conn: Option<&DatabaseConnection>, delay_start: bool, @@ -452,7 +452,7 @@ impl Web3Rpc { /// connect to the web3 provider async fn connect( self: &Arc, - block_sender: Option<&kanal::AsyncSender>, + block_sender: Option<&flume::Sender>, chain_id: u64, db_conn: Option<&DatabaseConnection>, ) -> anyhow::Result<()> { @@ -474,7 +474,7 @@ impl Web3Rpc { // tell the block subscriber that this rpc doesn't have any blocks if let Some(block_sender) = block_sender { block_sender - .send((None, self.clone())) + .send_async((None, self.clone())) .await .context("block_sender during connect")?; } @@ -589,7 +589,7 @@ impl Web3Rpc { pub(crate) async fn send_head_block_result( self: &Arc, new_head_block: Result, ProviderError>, - block_sender: &kanal::AsyncSender, + block_sender: &flume::Sender, block_map: BlocksByHashCache, ) -> anyhow::Result<()> { let new_head_block = match new_head_block { @@ -652,7 +652,7 @@ impl Web3Rpc { // send an empty block to take this server out of rotation block_sender - .send((new_head_block, self.clone())) + .send_async((new_head_block, self.clone())) .await .context("block_sender")?; @@ -671,11 +671,11 @@ impl Web3Rpc { self: Arc, authorization: &Arc, block_map: BlocksByHashCache, - block_sender: Option>, + block_sender: Option>, chain_id: u64, disconnect_receiver: watch::Receiver, http_interval_sender: Option>>, - tx_id_sender: Option)>>, + tx_id_sender: Option)>>, ) -> anyhow::Result<()> { let error_handler = if self.backup { RequestErrorHandler::DebugLevel @@ -896,7 +896,7 @@ impl Web3Rpc { self: Arc, authorization: Arc, http_interval_receiver: Option>, - block_sender: kanal::AsyncSender, + block_sender: flume::Sender, block_map: BlocksByHashCache, ) -> anyhow::Result<()> { trace!("watching new heads on {}", self); @@ -1091,7 +1091,7 @@ impl Web3Rpc { async fn subscribe_pending_transactions( self: Arc, authorization: Arc, - tx_id_sender: kanal::AsyncSender<(TxHash, Arc)>, + tx_id_sender: flume::Sender<(TxHash, Arc)>, ) -> anyhow::Result<()> { // TODO: give this a separate client. don't use new_head_client for everything. 
especially a firehose this big // TODO: timeout @@ -1116,7 +1116,7 @@ impl Web3Rpc { while let Some(pending_tx_id) = stream.next().await { tx_id_sender - .send((pending_tx_id, self.clone())) + .send_async((pending_tx_id, self.clone())) .await .context("tx_id_sender")?; @@ -1397,6 +1397,16 @@ impl Serialize for Web3Rpc { state.serialize_field("head_block", &head_block)?; } + state.serialize_field( + "total_requests", + &self.total_requests.load(atomic::Ordering::Acquire), + )?; + + state.serialize_field( + "active_requests", + &self.active_requests.load(atomic::Ordering::Relaxed), + )?; + state.serialize_field("head_latency", &self.head_latency.read().value())?; state.serialize_field( @@ -1406,16 +1416,6 @@ impl Serialize for Web3Rpc { state.serialize_field("peak_ewma", self.peak_ewma().as_ref())?; - state.serialize_field( - "active_requests", - &self.active_requests.load(atomic::Ordering::Acquire), - )?; - - state.serialize_field( - "total_requests", - &self.total_requests.load(atomic::Ordering::Acquire), - )?; - state.end() } } diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index 18e4ca2e..c50dd202 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -7,7 +7,7 @@ use entities::revert_log; use entities::sea_orm_active_enums::Method; use ethers::providers::ProviderError; use ethers::types::{Address, Bytes}; -use log::{debug, error, info, trace, warn, Level}; +use log::{debug, error, trace, warn, Level}; use migration::sea_orm::{self, ActiveEnum, ActiveModelTrait}; use serde_json::json; use std::fmt; @@ -283,12 +283,19 @@ impl OpenRequestHandle { _ => err.as_error_response().map(|x| x.message.clone()), }; + trace!("error message: {:?}", msg); + if let Some(msg) = msg { if msg.starts_with("execution reverted") { trace!("revert from {}", self.rpc); ResponseTypes::Revert } else if msg.contains("limit") || msg.contains("request") { - trace!("rate limit from {}", self.rpc); + // TODO: too verbose + if self.rpc.backup { + trace!("rate limit from {}", self.rpc); + } else { + warn!("rate limit from {}", self.rpc); + } ResponseTypes::RateLimit } else { ResponseTypes::Error @@ -303,6 +310,15 @@ impl OpenRequestHandle { if matches!(response_type, ResponseTypes::RateLimit) { if let Some(hard_limit_until) = self.rpc.hard_limit_until.as_ref() { // TODO: how long should we actually wait? different providers have different times + // TODO: if rate_limit_period_seconds is set, use that + // TODO: check response headers for rate limits too + // TODO: warn if production, debug if backup + if self.rpc.backup { + debug!("unexpected rate limit on {}!", self.rpc); + } else { + warn!("unexpected rate limit on {}!", self.rpc); + } + let retry_at = Instant::now() + Duration::from_secs(1); trace!("retry {} at: {:?}", self.rpc, retry_at); @@ -374,8 +390,9 @@ impl OpenRequestHandle { } } } else if let Some(peak_latency) = &self.rpc.peak_latency { - trace!("updating peak_latency: {}", latency.as_secs_f64()); - peak_latency.report(latency); + // trace!("updating peak_latency: {}", latency.as_secs_f64()); + // peak_latency.report(latency); + trace!("peak latency disabled for now"); } else { unreachable!("peak_latency not initialized"); } diff --git a/web3_proxy/src/stats/mod.rs b/web3_proxy/src/stats/mod.rs index 14759611..930f9e04 100644 --- a/web3_proxy/src/stats/mod.rs +++ b/web3_proxy/src/stats/mod.rs @@ -672,6 +672,7 @@ impl RpcQueryStats { method: Option<&str>, ) -> Decimal { // some methods should be free. 
there might be cases where method isn't set (though they should be uncommon) + // TODO: get this list from config (and add more to it) if let Some(method) = method.as_ref() { if ["eth_chainId"].contains(method) { return 0.into(); diff --git a/web3_proxy/src/stats/stat_buffer.rs b/web3_proxy/src/stats/stat_buffer.rs index a9d14329..5de91e8b 100644 --- a/web3_proxy/src/stats/stat_buffer.rs +++ b/web3_proxy/src/stats/stat_buffer.rs @@ -8,7 +8,7 @@ use log::{error, info, trace}; use migration::sea_orm::prelude::Decimal; use migration::sea_orm::DatabaseConnection; use std::time::Duration; -use tokio::sync::{broadcast, mpsc}; +use tokio::sync::broadcast; use tokio::task::JoinHandle; use tokio::time::interval; @@ -30,7 +30,7 @@ pub struct BufferedRpcQueryStats { #[derive(From)] pub struct SpawnedStatBuffer { - pub stat_sender: mpsc::UnboundedSender, + pub stat_sender: flume::Sender, /// these handles are important and must be allowed to finish pub background_handle: JoinHandle>, } @@ -65,7 +65,7 @@ impl StatBuffer { return Ok(None); } - let (stat_sender, stat_receiver) = mpsc::unbounded_channel(); + let (stat_sender, stat_receiver) = flume::unbounded(); let timestamp_precision = TimestampPrecision::Seconds; let mut new = Self { @@ -94,7 +94,7 @@ impl StatBuffer { async fn aggregate_and_save_loop( &mut self, bucket: String, - mut stat_receiver: mpsc::UnboundedReceiver, + stat_receiver: flume::Receiver, mut shutdown_receiver: broadcast::Receiver<()>, ) -> anyhow::Result<()> { let mut tsdb_save_interval = @@ -107,11 +107,11 @@ impl StatBuffer { loop { tokio::select! { - stat = stat_receiver.recv() => { - // info!("Received stat"); + stat = stat_receiver.recv_async() => { + // trace!("Received stat"); // save the stat to a buffer match stat { - Some(AppStat::RpcQuery(stat)) => { + Ok(AppStat::RpcQuery(stat)) => { if self.influxdb_client.is_some() { // TODO: round the timestamp at all? @@ -128,8 +128,8 @@ impl StatBuffer { self.accounting_db_buffer.entry(stat.accounting_key(self.billing_period_seconds)).or_default().add(stat); } } - None => { - info!("done receiving stats"); + Err(err) => { + info!("error receiving stat: {}", err); break; } }
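Note on the kanal-to-flume swap seen throughout this diff: it is mostly mechanical. `flume::unbounded()` replaces `kanal::unbounded_async()`, the async calls become `send_async(..).await` and `recv_async().await`, and the receive side resolves to `Result<T, RecvError>` instead of `Option<T>`, which is why the shutdown arm above now matches `Err` rather than `None`. Below is a minimal, self-contained sketch of the same pattern as in the stat buffer loop, assuming `tokio` and `flume 0.10` as dependencies; the `Stat` type is a stand-in, not the proxy's actual `AppStat`.

use tokio::sync::broadcast;

#[derive(Debug)]
struct Stat(u64); // stand-in for the real AppStat

#[tokio::main]
async fn main() {
    // flume::unbounded() gives a Sender/Receiver pair usable from sync and async code
    let (stat_sender, stat_receiver) = flume::unbounded::<Stat>();
    let (shutdown_sender, mut shutdown_receiver) = broadcast::channel::<()>(1);

    let worker = tokio::spawn(async move {
        loop {
            tokio::select! {
                stat = stat_receiver.recv_async() => match stat {
                    Ok(stat) => println!("buffered {:?}", stat),
                    // Err means every Sender has been dropped; this replaces kanal's None
                    Err(err) => {
                        println!("done receiving stats: {}", err);
                        break;
                    }
                },
                _ = shutdown_receiver.recv() => break,
            }
        }
    });

    stat_sender
        .send_async(Stat(1))
        .await
        .expect("receiver is alive");

    drop(stat_sender); // closing the channel ends the worker loop

    worker.await.unwrap();
    drop(shutdown_sender);
}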
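Similarly, the `JsonRpcErrorData { .. }.into()` returns in the many.rs hunks lean on a `From` conversion into the new response type, so a call site can build just the error payload without the request id (the id is attached later, when the reply is serialized). The sketch below shows how such a conversion can be wired; these definitions are illustrative stand-ins, not web3_proxy's exact types.

use std::borrow::Cow;

use serde::Serialize;

// illustrative stand-ins for web3_proxy's types, not the real definitions
#[derive(Serialize)]
pub struct JsonRpcErrorData {
    pub message: Cow<'static, str>,
    pub code: i64,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub data: Option<serde_json::Value>,
}

// a response body is either a successful result or an error payload;
// the request id can be added when the full JSON-RPC reply is written out
#[allow(dead_code)]
pub enum JsonRpcResponseData {
    Result(serde_json::Value),
    Error(JsonRpcErrorData),
}

impl From<JsonRpcErrorData> for JsonRpcResponseData {
    fn from(err: JsonRpcErrorData) -> Self {
        Self::Error(err)
    }
}

fn requested_data_not_available() -> JsonRpcResponseData {
    // mirrors the call site in the diff: build only the error payload, then .into()
    JsonRpcErrorData {
        message: Cow::Borrowed("Requested data is not available"),
        code: -32043,
        data: None,
    }
    .into()
}

fn main() {
    if let JsonRpcResponseData::Error(err) = requested_data_not_available() {
        println!("{}", serde_json::to_string(&err).unwrap());
    }
}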