From f0b6465069c21a40639d7f96557cfedc4f71f94c Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Tue, 26 Sep 2023 18:18:06 -0700 Subject: [PATCH] streaming responses 2 (#219) * streaming responses compile, still small TODOs & cleanup * refactor to allow short curcuits on uncacheable requests (#220) * refactor to allow short curcuits on uncacheable requests * handle error inside an Arc from moka * arc instead of box * lint * more lint * add bonus rate limits and keep all semaphores after rate limits (#221) * add bonus rate limits and keep all semaphores after rate limits * remove stale todos * better function names * cargo upgrade and update * make some panics warn. more todo * pass request_metadata through so streaming responses include their final size * remove stale TODO * remove more stale todos * add file i missed * make to_json_string work well enough * check config for free_subscriptions --------- Co-authored-by: Rory Neithinger --- Cargo.lock | 63 ++- migration/src/m20221031_211916_clean_up.rs | 2 +- .../src/m20230119_204135_better_free_tier.rs | 2 +- redis-rate-limiter/Cargo.toml | 2 +- web3_proxy/Cargo.toml | 2 + web3_proxy/src/app/mod.rs | 426 +++++++++++------- web3_proxy/src/app/ws.rs | 74 +-- web3_proxy/src/config.rs | 11 +- web3_proxy/src/errors.rs | 20 +- web3_proxy/src/frontend/authorization.rs | 374 +++++++++------ web3_proxy/src/frontend/rpc_proxy_http.rs | 4 +- web3_proxy/src/frontend/rpc_proxy_ws.rs | 20 +- web3_proxy/src/frontend/streaming.rs | 40 ++ web3_proxy/src/jsonrpc.rs | 396 +++++++++++++++- web3_proxy/src/response_cache.rs | 36 +- web3_proxy/src/rpcs/consensus.rs | 22 - web3_proxy/src/rpcs/many.rs | 230 +++++----- web3_proxy/src/rpcs/one.rs | 51 ++- web3_proxy/src/rpcs/request.rs | 54 ++- web3_proxy/src/stats/mod.rs | 9 +- web3_proxy_cli/src/bin/wait_for_sync.rs | 2 - .../src/sub_commands/migrate_stats_to_v2.rs | 2 +- 22 files changed, 1275 insertions(+), 567 deletions(-) create mode 100644 web3_proxy/src/frontend/streaming.rs diff --git a/Cargo.lock b/Cargo.lock index ac4976ac..a718d95e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -777,9 +777,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.4.4" +version = "4.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1d7b8d5ec32af0fadc644bf1fd509a688c2103b185644bb1e29d164e0703136" +checksum = "824956d0dca8334758a5b7f7e50518d66ea319330cbceedcf76905c2f6ab30e3" dependencies = [ "clap_builder", "clap_derive 4.4.2", @@ -787,9 +787,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.4.4" +version = "4.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5179bb514e4d7c2051749d8fcefa2ed6d06a9f4e6d69faf3805f5d80b8cf8d56" +checksum = "122ec64120a49b4563ccaedcbea7818d069ed8e9aa6d829b82d8a4128936b2ab" dependencies = [ "anstream", "anstyle", @@ -1240,23 +1240,22 @@ checksum = "c2e66c9d817f1720209181c316d28635c050fa304f9c79e47a520882661b7308" [[package]] name = "deadpool" -version = "0.9.5" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "421fe0f90f2ab22016f32a9881be5134fdd71c65298917084b0c7477cbc3856e" +checksum = "fb84100978c1c7b37f09ed3ce3e5f843af02c2a2c431bae5b19230dad2c1b490" dependencies = [ "async-trait", "deadpool-runtime", "num_cpus", - "retain_mut", "serde", "tokio", ] [[package]] name = "deadpool-redis" -version = "0.12.0" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f1760f60ffc6653b4afd924c5792098d8c00d9a3deb6b3d989eac17949dc422" 
+checksum = "84930e585871d35b8e06d3e03d03e3a8a4c5dc71afa4376c7cd5f9223e1da1ea" dependencies = [ "deadpool", "redis", @@ -1265,9 +1264,9 @@ dependencies = [ [[package]] name = "deadpool-runtime" -version = "0.1.2" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eaa37046cc0f6c3cc6090fbdbf73ef0b8ef4cfcc37f6befc0020f63e8cf121e1" +checksum = "63dfa964fe2a66f3fde91fc70b267fe193d822c7e603e2a675a49a7f46ad3f49" dependencies = [ "tokio", ] @@ -1952,9 +1951,9 @@ dependencies = [ [[package]] name = "fastrand" -version = "2.0.0" +version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6999dc1837253364c2ebb0704ba97994bd874e8f195d665c50b7548f6ea92764" +checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5" [[package]] name = "fdlimit" @@ -3553,9 +3552,9 @@ dependencies = [ [[package]] name = "parking" -version = "2.1.0" +version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14f2252c834a40ed9bb5422029649578e63aa341ac401f74e719dd1afda8394e" +checksum = "e52c774a4c39359c1d1c52e43f73dd91a75a614652c825408eec30c95a9b2067" [[package]] name = "parking_lot" @@ -3662,9 +3661,9 @@ checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94" [[package]] name = "pest" -version = "2.7.3" +version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7a4d085fd991ac8d5b05a147b437791b4260b76326baf0fc60cf7c9c27ecd33" +checksum = "c022f1e7b65d6a24c0dbbd5fb344c66881bc01f3e5ae74a1c8100f2f985d98a4" dependencies = [ "memchr", "thiserror", @@ -3673,9 +3672,9 @@ dependencies = [ [[package]] name = "pest_derive" -version = "2.7.3" +version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2bee7be22ce7918f641a33f08e3f43388c7656772244e2bbb2477f44cc9021a" +checksum = "35513f630d46400a977c4cb58f78e1bfbe01434316e60c37d27b9ad6139c66d8" dependencies = [ "pest", "pest_generator", @@ -3683,9 +3682,9 @@ dependencies = [ [[package]] name = "pest_generator" -version = "2.7.3" +version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d1511785c5e98d79a05e8a6bc34b4ac2168a0e3e92161862030ad84daa223141" +checksum = "bc9fc1b9e7057baba189b5c626e2d6f40681ae5b6eb064dc7c7834101ec8123a" dependencies = [ "pest", "pest_meta", @@ -3696,9 +3695,9 @@ dependencies = [ [[package]] name = "pest_meta" -version = "2.7.3" +version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b42f0394d3123e33353ca5e1e89092e533d2cc490389f2bd6131c43c634ebc5f" +checksum = "1df74e9e7ec4053ceb980e7c0c8bd3594e977fde1af91daba9c928e8e8c6708d" dependencies = [ "once_cell", "pest", @@ -4351,12 +4350,6 @@ dependencies = [ "winreg", ] -[[package]] -name = "retain_mut" -version = "0.1.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4389f1d5789befaf6029ebd9f7dac4af7f7e3d61b69d4f30e2ac02b57e7712b0" - [[package]] name = "rfc6979" version = "0.4.0" @@ -4736,7 +4729,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6bef60732e6016c5643350c87f43a697e8c074e41e4e2a9d961c056cb1310915" dependencies = [ "chrono", - "clap 4.4.4", + "clap 4.4.5", "dotenvy", "glob", "regex", @@ -4767,7 +4760,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e53b6ddaf6dbb84e5dfc3fb78634ed0a4d6d64e7479500ab2585db239747031" dependencies = [ "async-trait", - "clap 4.4.4", + "clap 4.4.5", "dotenvy", "futures", 
"sea-orm", @@ -5172,9 +5165,9 @@ dependencies = [ [[package]] name = "sha2" -version = "0.10.7" +version = "0.10.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "479fb9d862239e610720565ca91403019f2f00410f1864c5aa7479b950a76ed8" +checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8" dependencies = [ "cfg-if", "cpufeatures", @@ -5777,7 +5770,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cb94d2f3cc536af71caac6b6fcebf65860b347e7ce0cc9ebe8f70d3e521054ef" dependencies = [ "cfg-if", - "fastrand 2.0.0", + "fastrand 2.0.1", "redox_syscall 0.3.5", "rustix", "windows-sys", @@ -6632,6 +6625,7 @@ dependencies = [ "axum-client-ip", "axum-macros", "base64 0.21.4", + "bytes", "chrono", "console-subscriber", "counter", @@ -6646,6 +6640,7 @@ dependencies = [ "fdlimit", "fstrings", "futures", + "futures-util", "glob", "handlebars", "hashbrown 0.14.0", diff --git a/migration/src/m20221031_211916_clean_up.rs b/migration/src/m20221031_211916_clean_up.rs index 7cceebf9..8a485095 100644 --- a/migration/src/m20221031_211916_clean_up.rs +++ b/migration/src/m20221031_211916_clean_up.rs @@ -97,6 +97,6 @@ impl MigrationTrait for Migration { async fn down(&self, _manager: &SchemaManager) -> Result<(), DbErr> { // Replace the sample below with your own migration scripts - todo!(); + unimplemented!(); } } diff --git a/migration/src/m20230119_204135_better_free_tier.rs b/migration/src/m20230119_204135_better_free_tier.rs index aef9e5f8..93e3de3c 100644 --- a/migration/src/m20230119_204135_better_free_tier.rs +++ b/migration/src/m20230119_204135_better_free_tier.rs @@ -27,7 +27,7 @@ impl MigrationTrait for Migration { } async fn down(&self, _manager: &SchemaManager) -> Result<(), DbErr> { - todo!(); + unimplemented!(); } } diff --git a/redis-rate-limiter/Cargo.toml b/redis-rate-limiter/Cargo.toml index 52beb618..f948ce98 100644 --- a/redis-rate-limiter/Cargo.toml +++ b/redis-rate-limiter/Cargo.toml @@ -7,5 +7,5 @@ edition = "2021" [dependencies] anyhow = "1.0.75" chrono = "0.4.31" -deadpool-redis = { version = "0.12.0", features = ["rt_tokio_1", "serde"] } +deadpool-redis = { version = "0.13.0", features = ["rt_tokio_1", "serde"] } tokio = "1.32.0" diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index 63b236eb..a37f4daa 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -99,6 +99,8 @@ uuid = { version = "1.4.1", default-features = false, features = ["fast-rng", "v # TODO: why doesn't this work in dev-dependencies test-log = { version = "0.2.12", default-features = false, features = ["trace"] } +bytes = "1.5.0" +futures-util = "0.3.28" # # TODO: bring this back # check-if-email-exists = "0.9.0" diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index f83ca022..7b2f32bf 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -10,8 +10,8 @@ use crate::frontend::authorization::{ use crate::frontend::rpc_proxy_ws::ProxyMode; use crate::globals::{global_db_conn, DatabaseError, DB_CONN, DB_REPLICA}; use crate::jsonrpc::{ - JsonRpcErrorData, JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcId, - JsonRpcParams, JsonRpcRequest, JsonRpcRequestEnum, JsonRpcResultData, + self, JsonRpcErrorData, JsonRpcId, JsonRpcParams, JsonRpcRequest, JsonRpcRequestEnum, + JsonRpcResultData, SingleResponse, }; use crate::relational_db::{connect_db, migrate_db}; use crate::response_cache::{ @@ -84,6 +84,10 @@ pub struct Web3ProxyApp { pub http_client: Option, /// track JSONRPC responses 
pub jsonrpc_response_cache: JsonRpcResponseCache, + /// track JSONRPC cache keys that have failed caching + pub jsonrpc_response_failed_cache_keys: Cache, + /// de-dupe requests (but with easy timeouts) + pub jsonrpc_response_semaphores: Cache>, /// rpc clients that subscribe to newHeads use this channel /// don't drop this or the sender will stop working /// TODO: broadcast channel instead? @@ -93,10 +97,13 @@ pub struct Web3ProxyApp { pub hostname: Option, pub frontend_port: Arc, /// rate limit anonymous users - pub frontend_ip_rate_limiter: Option>, + pub frontend_public_rate_limiter: Option>, + /// bonus rate limit for anonymous users + pub bonus_frontend_public_rate_limiter: Option, /// rate limit authenticated users - pub frontend_registered_user_rate_limiter: - Option>, + pub frontend_premium_rate_limiter: Option>, + /// bonus rate limit for authenticated users + pub bonus_frontend_premium_rate_limiter: Option, /// concurrent/parallel request limits for anonymous users pub ip_semaphores: Cache>, /// give some bonus capacity to public users @@ -359,9 +366,11 @@ impl Web3ProxyApp { // create rate limiters // these are optional. they require redis - let mut frontend_ip_rate_limiter = None; - let mut frontend_registered_user_rate_limiter = None; + let mut frontend_public_rate_limiter = None; + let mut frontend_premium_rate_limiter = None; let mut login_rate_limiter = None; + let mut bonus_frontend_public_rate_limiter: Option = None; + let mut bonus_frontend_premium_rate_limiter: Option = None; if let Some(ref redis_pool) = vredis_pool { if let Some(public_requests_per_period) = top_config.app.public_requests_per_period { @@ -377,10 +386,29 @@ impl Web3ProxyApp { // these two rate limiters can share the base limiter // these are deferred rate limiters because we don't want redis network requests on the hot path // TODO: take cache_size from config - frontend_ip_rate_limiter = + frontend_public_rate_limiter = Some(DeferredRateLimiter::new(20_000, "ip", rpc_rrl.clone(), None).await); - frontend_registered_user_rate_limiter = + frontend_premium_rate_limiter = Some(DeferredRateLimiter::new(20_000, "key", rpc_rrl, None).await); + + if top_config.app.bonus_frontend_public_rate_limit > 0 { + bonus_frontend_public_rate_limiter = Some(RedisRateLimiter::new( + "web3_proxy", + "bonus_frontend_public", + top_config.app.bonus_frontend_public_rate_limit, + 60.0, + redis_pool.clone(), + )); + } + if top_config.app.bonus_frontend_premium_rate_limit > 0 { + bonus_frontend_premium_rate_limiter = Some(RedisRateLimiter::new( + "web3_proxy", + "bonus_frontend_premium", + top_config.app.bonus_frontend_premium_rate_limit, + 60.0, + redis_pool.clone(), + )); + } } // login rate limiter @@ -493,26 +521,39 @@ impl Web3ProxyApp { .ok() .and_then(|x| x.to_str().map(|x| x.to_string())); - // TODO: get the size out of the config - let bonus_ip_concurrency = Arc::new(Semaphore::new(top_config.app.bonus_ip_concurrency)); + let bonus_ip_concurrency = + Arc::new(Semaphore::new(top_config.app.bonus_public_concurrency)); let bonus_user_concurrency = - Arc::new(Semaphore::new(top_config.app.bonus_user_concurrency)); + Arc::new(Semaphore::new(top_config.app.bonus_premium_concurrency)); + + // TODO: what size? 
+ let jsonrpc_response_semaphores = CacheBuilder::new(10_000) + .name("jsonrpc_response_semaphores") + .build(); + + let jsonrpc_response_failed_cache_keys = CacheBuilder::new(100_000) + .name("jsonrpc_response_failed_cache_keys") + .build(); let app = Self { balanced_rpcs, + bonus_frontend_public_rate_limiter, + bonus_frontend_premium_rate_limiter, bonus_ip_concurrency, bonus_user_concurrency, bundler_4337_rpcs, config: top_config.app.clone(), - frontend_ip_rate_limiter, + frontend_public_rate_limiter, frontend_port: frontend_port.clone(), - frontend_registered_user_rate_limiter, + frontend_premium_rate_limiter, hostname, http_client, influxdb_client, internal_provider: Default::default(), ip_semaphores, jsonrpc_response_cache, + jsonrpc_response_failed_cache_keys, + jsonrpc_response_semaphores, kafka_producer, login_rate_limiter, pending_txid_firehose: deduped_txid_firehose, @@ -983,15 +1024,10 @@ impl Web3ProxyApp { let (_, response, _) = self.proxy_request(request, authorization, None).await; - if let Some(result) = response.result { - let result = serde_json::from_str(result.get())?; - - Ok(result) - } else if let Some(error_data) = response.error { - // TODO: this might lose the http error code - Err(Web3ProxyError::JsonRpcErrorData(error_data)) - } else { - unimplemented!(); + // TODO: error handling? + match response.parsed().await?.payload { + jsonrpc::Payload::Success { result } => Ok(serde_json::from_str(result.get())?), + jsonrpc::Payload::Error { error } => Err(Web3ProxyError::JsonRpcErrorData(error)), } } @@ -1000,7 +1036,7 @@ impl Web3ProxyApp { self: &Arc, authorization: Arc, request: JsonRpcRequestEnum, - ) -> Web3ProxyResult<(StatusCode, JsonRpcForwardedResponseEnum, Vec>)> { + ) -> Web3ProxyResult<(StatusCode, jsonrpc::Response, Vec>)> { // trace!(?request, "proxy_web3_rpc"); let response = match request { @@ -1009,11 +1045,7 @@ impl Web3ProxyApp { .proxy_request(request, authorization.clone(), None) .await; - ( - status_code, - JsonRpcForwardedResponseEnum::Single(response), - rpcs, - ) + (status_code, jsonrpc::Response::Single(response), rpcs) } JsonRpcRequestEnum::Batch(requests) => { let (responses, rpcs) = self @@ -1021,11 +1053,7 @@ impl Web3ProxyApp { .await?; // TODO: real status code. if an error happens, i don't think we are following the spec here - ( - StatusCode::OK, - JsonRpcForwardedResponseEnum::Batch(responses), - rpcs, - ) + (StatusCode::OK, jsonrpc::Response::Batch(responses), rpcs) } }; @@ -1038,7 +1066,7 @@ impl Web3ProxyApp { self: &Arc, authorization: &Arc, requests: Vec, - ) -> Web3ProxyResult<(Vec, Vec>)> { + ) -> Web3ProxyResult<(Vec, Vec>)> { // TODO: we should probably change ethers-rs to support this directly. they pushed this off to v2 though let num_requests = requests.len(); @@ -1065,14 +1093,15 @@ impl Web3ProxyApp { ) .await; - let mut collected: Vec = Vec::with_capacity(num_requests); + let mut collected: Vec = Vec::with_capacity(num_requests); let mut collected_rpc_names: HashSet = HashSet::new(); let mut collected_rpcs: Vec> = vec![]; for response in responses { // TODO: any way to attach the tried rpcs to the error? 
it is likely helpful let (_status_code, response, rpcs) = response; - collected.push(response); + // TODO: individual error handling + collected.push(response.parsed().await?); collected_rpcs.extend(rpcs.into_iter().filter(|x| { if collected_rpc_names.contains(&x.name) { false @@ -1107,14 +1136,14 @@ impl Web3ProxyApp { method: &str, params: &P, request_metadata: &Arc, - ) -> Web3ProxyResult> { + ) -> Web3ProxyResult> { if let Some(protected_rpcs) = self.private_rpcs.as_ref() { if !protected_rpcs.is_empty() { let protected_response = protected_rpcs .try_send_all_synced_connections( method, params, - Some(request_metadata), + request_metadata, None, None, Some(Duration::from_secs(10)), @@ -1144,7 +1173,7 @@ impl Web3ProxyApp { .try_send_all_synced_connections( method, params, - Some(request_metadata), + request_metadata, None, None, Some(Duration::from_secs(10)), @@ -1160,7 +1189,7 @@ impl Web3ProxyApp { mut request: JsonRpcRequest, authorization: Arc, head_block: Option<&Web3ProxyBlock>, - ) -> (StatusCode, JsonRpcForwardedResponse, Vec>) { + ) -> (StatusCode, jsonrpc::SingleResponse, Vec>) { let request_metadata = RequestMetadata::new( self, authorization, @@ -1184,8 +1213,10 @@ impl Web3ProxyApp { tries += 1; - let (code, response_data) = match self + let (code, response) = match self ._proxy_request_with_caching( + // TODO: avoid clone here + response_id.clone(), &request.method, &mut request.params, head_block, @@ -1211,7 +1242,7 @@ impl Web3ProxyApp { .user_error_response .store(false, Ordering::Relaxed); - err.as_response_parts() + err.as_json_response_parts(response_id) } Err(Web3ProxyError::JsonRpcResponse(response_data)) => { request_metadata @@ -1221,7 +1252,9 @@ impl Web3ProxyApp { .user_error_response .store(response_data.is_error(), Ordering::Relaxed); - (StatusCode::OK, response_data) + let response = + jsonrpc::ParsedResponse::from_response_data(response_data, response_id); + (StatusCode::OK, response.into()) } Err(err) => { if tries <= max_tries { @@ -1238,12 +1271,10 @@ impl Web3ProxyApp { .user_error_response .store(false, Ordering::Relaxed); - err.as_response_parts() + err.as_json_response_parts(response_id) } }; - let response = JsonRpcForwardedResponse::from_response_data(response_data, response_id); - // TODO: this serializes twice :/ request_metadata.add_response(ResponseOrBytes::Response(&response)); @@ -1260,14 +1291,15 @@ impl Web3ProxyApp { /// TODO: how can we make this generic? async fn _proxy_request_with_caching( self: &Arc, + id: Box, method: &str, params: &mut serde_json::Value, head_block: Option<&Web3ProxyBlock>, request_metadata: &Arc, - ) -> Web3ProxyResult>> { + ) -> Web3ProxyResult { // TODO: serve net_version without querying the backend // TODO: don't force RawValue - let response_data: JsonRpcResponseEnum> = match method { + let response: jsonrpc::SingleResponse = match method { // lots of commands are blocked method @ ("db_getHex" | "db_getString" @@ -1357,18 +1389,16 @@ impl Web3ProxyApp { | "eth_supportedEntryPoints" | "web3_bundlerVersion") => match self.bundler_4337_rpcs.as_ref() { Some(bundler_4337_rpcs) => { - let x = bundler_4337_rpcs - .try_proxy_connection::<_, Box>( + bundler_4337_rpcs + .try_proxy_connection::<_, Arc>( method, params, - Some(request_metadata), + request_metadata, Some(Duration::from_secs(30)), None, None, ) - .await?; - - x.into() + .await? } None => { // TODO: stats even when we error! 
@@ -1376,22 +1406,23 @@ impl Web3ProxyApp { return Err(Web3ProxyError::NoServersSynced); } }, - "eth_accounts" => JsonRpcResponseEnum::from(serde_json::Value::Array(vec![])), + // TODO: id + "eth_accounts" => jsonrpc::ParsedResponse::from_value(serde_json::Value::Array(vec![]), id).into(), "eth_blockNumber" => { match head_block.cloned().or(self.balanced_rpcs.head_block()) { - Some(head_block) => JsonRpcResponseEnum::from(json!(head_block.number())), + Some(head_block) => jsonrpc::ParsedResponse::from_value(json!(head_block.number()), id).into(), None => { return Err(Web3ProxyError::NoServersSynced); } } } - "eth_chainId" => JsonRpcResponseEnum::from(json!(U64::from(self.config.chain_id))), + "eth_chainId" => jsonrpc::ParsedResponse::from_value(json!(U64::from(self.config.chain_id)), id).into(), // TODO: eth_callBundle (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_callbundle) // TODO: eth_cancelPrivateTransaction (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_cancelprivatetransaction, but maybe just reject) // TODO: eth_sendPrivateTransaction (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_sendprivatetransaction) "eth_coinbase" => { // no need for serving coinbase - JsonRpcResponseEnum::from(json!(Address::zero())) + jsonrpc::ParsedResponse::from_value(json!(Address::zero()), id).into() } "eth_estimateGas" => { // TODO: timeout @@ -1400,12 +1431,15 @@ impl Web3ProxyApp { .try_proxy_connection::<_, U256>( method, params, - Some(request_metadata), + request_metadata, Some(Duration::from_secs(30)), None, None, ) - .await?; + .await? + .parsed() + .await? + .into_result()?; let gas_increase = if let Some(gas_increase_percent) = self.config.gas_increase_percent @@ -1422,59 +1456,62 @@ impl Web3ProxyApp { gas_estimate += gas_increase; // TODO: from_serializable? - JsonRpcResponseEnum::from(json!(gas_estimate)) + jsonrpc::ParsedResponse::from_value(json!(gas_estimate), id).into() } "eth_getTransactionReceipt" | "eth_getTransactionByHash" => { // try to get the transaction without specifying a min_block_height // TODO: timeout - let mut response_data = self + let parsed = match self .balanced_rpcs - .try_proxy_connection::<_, Box>( + .try_proxy_connection::<_, Arc>( method, params, - Some(request_metadata), + request_metadata, Some(Duration::from_secs(30)), None, None, ) - .await; + .await { + Ok(response) => response.parsed().await.map_err(Into::into), + Err(err) => Err(err), + }; // if we got "null", it is probably because the tx is old. retry on nodes with old block data - let try_archive = if let Ok(value) = &response_data { + let try_archive = if let Ok(Some(value)) = parsed.as_ref().map(|r| r.result()) { value.get() == "null" || value.get() == "" || value.get() == "\"\"" } else { true }; - if try_archive { - if let Some(head_block_num) = head_block.map(|x| x.number()) { - // TODO: only charge for archive if it gave a result - request_metadata - .archive_request - .store(true, atomic::Ordering::Relaxed); + if try_archive && let Some(head_block_num) = head_block.map(|x| x.number()) { + // TODO: only charge for archive if it gave a result + request_metadata + .archive_request + .store(true, atomic::Ordering::Relaxed); - response_data = self - .balanced_rpcs - .try_proxy_connection::<_, Box>( - method, - params, - Some(request_metadata), - Some(Duration::from_secs(30)), - // TODO: should this be block 0 instead? 
- Some(&U64::one()), - // TODO: is this a good way to allow lagged archive nodes a try - Some(&head_block_num.saturating_sub(5.into()).clamp(U64::one(), U64::MAX)), - ) - .await; - } + self + .balanced_rpcs + .try_proxy_connection::<_, Arc>( + method, + params, + request_metadata, + Some(Duration::from_secs(30)), + // TODO: should this be block 0 instead? + Some(&U64::one()), + // TODO: is this a good way to allow lagged archive nodes a try + Some(&head_block_num.saturating_sub(5.into()).clamp(U64::one(), U64::MAX)), + ) + .await? + } else { + parsed?.into() } - response_data.try_into()? + // TODO: if parsed is an error, return a null instead } // TODO: eth_gasPrice that does awesome magic to predict the future - "eth_hashrate" => JsonRpcResponseEnum::from(json!(U64::zero())), - "eth_mining" => JsonRpcResponseEnum::from(serde_json::Value::Bool(false)), + "eth_hashrate" => jsonrpc::ParsedResponse::from_value(json!(U64::zero()), id).into(), + "eth_mining" => jsonrpc::ParsedResponse::from_value(serde_json::Value::Bool(false), id).into(), // TODO: eth_sendBundle (flashbots/eden command) // broadcast transactions to all private rpcs at once "eth_sendRawTransaction" => { @@ -1574,35 +1611,34 @@ impl Web3ProxyApp { } } - response + jsonrpc::ParsedResponse::from_response_data(response, id).into() } "eth_syncing" => { // no stats on this. its cheap // TODO: return a real response if all backends are syncing or if no servers in sync // TODO: const - JsonRpcResponseEnum::from(serde_json::Value::Bool(false)) + jsonrpc::ParsedResponse::from_value(serde_json::Value::Bool(false), id).into() } - "eth_subscribe" => JsonRpcErrorData { + "eth_subscribe" => jsonrpc::ParsedResponse::from_error(JsonRpcErrorData { message: "notifications not supported. eth_subscribe is only available over a websocket".into(), code: -32601, data: None, - } - .into(), - "eth_unsubscribe" => JsonRpcErrorData { + }, id).into(), + "eth_unsubscribe" => jsonrpc::ParsedResponse::from_error(JsonRpcErrorData { message: "notifications not supported. eth_unsubscribe is only available over a websocket".into(), code: -32601, data: None, - }.into(), + }, id).into(), "net_listening" => { // TODO: only true if there are some backends on balanced_rpcs? // TODO: const - JsonRpcResponseEnum::from(serde_json::Value::Bool(true)) + jsonrpc::ParsedResponse::from_value(serde_json::Value::Bool(true), id).into() } - "net_peerCount" => - JsonRpcResponseEnum::from(json!(U64::from(self.balanced_rpcs.num_synced_rpcs()))) + "net_peerCount" => + jsonrpc::ParsedResponse::from_value(json!(U64::from(self.balanced_rpcs.num_synced_rpcs())), id).into() , - "web3_clientVersion" => - JsonRpcResponseEnum::from(serde_json::Value::String(APP_USER_AGENT.to_string())) + "web3_clientVersion" => + jsonrpc::ParsedResponse::from_value(serde_json::Value::String(APP_USER_AGENT.to_string()), id).into() , "web3_sha3" => { // returns Keccak-256 (not the standardized SHA3-256) of the given data. @@ -1615,11 +1651,11 @@ impl Web3ProxyApp { { // TODO: what error code? 
// TODO: use Web3ProxyError::BadRequest - JsonRpcErrorData { + jsonrpc::ParsedResponse::from_error(JsonRpcErrorData { message: "Invalid request".into(), code: -32600, data: None - }.into() + }, id).into() } else { // TODO: BadRequest instead of web3_context let param = Bytes::from_str( @@ -1637,25 +1673,25 @@ impl Web3ProxyApp { let hash = H256::from(keccak256(param)); - JsonRpcResponseEnum::from(json!(hash)) + jsonrpc::ParsedResponse::from_value(json!(hash), id).into() } } _ => { // TODO: this needs the correct error code in the response // TODO: Web3ProxyError::BadRequest instead? - JsonRpcErrorData { + jsonrpc::ParsedResponse::from_error(JsonRpcErrorData { message: "invalid request".into(), code: StatusCode::BAD_REQUEST.as_u16().into(), data: None, - }.into() + }, id).into() } } } - "test" => JsonRpcErrorData { + "test" => jsonrpc::ParsedResponse::from_error(JsonRpcErrorData { message: "The method test does not exist/is not available.".into(), code: -32601, data: None, - }.into(), + }, id).into(), // anything else gets sent to backend rpcs and cached method => { if method.starts_with("admin_") { @@ -1744,89 +1780,157 @@ impl Web3ProxyApp { // TODO: think more about this timeout. we should probably have a `request_expires_at` Duration on the request_metadata // TODO: different user tiers should have different timeouts // erigon's timeout is 300, so keep this a few seconds shorter - let max_wait = Some(Duration::from_secs(295)); + let max_wait = Some(Duration::from_secs(290)); if let Some(cache_key) = cache_key { let from_block_num = cache_key.from_block_num().copied(); let to_block_num = cache_key.to_block_num().copied(); let cache_jsonrpc_errors = cache_key.cache_errors(); + let cache_key_hash = cache_key.hash(); // don't cache anything larger than 16 MiB let max_response_cache_bytes = 16 * (1024 ^ 2); // self.config.max_response_cache_bytes; // TODO: try to fetch out of s3 - // TODO: clone less? - let app = self.clone(); - let method = method.to_string(); - let params = params.clone(); - let request_metadata = request_metadata.clone(); + let x: SingleResponse = if let Some(data) = self.jsonrpc_response_cache.get(&cache_key_hash).await { + // it was cached! easy! + // TODO: wait. this currently panics. why? + jsonrpc::ParsedResponse::from_response_data(data, id).into() + } else if self.jsonrpc_response_failed_cache_keys.contains_key(&cache_key_hash) { + // this is a cache_key that we know won't cache + // NOTICE! We do **NOT** use get which means the key's hotness is not updated. we don't use time-to-idler here so thats fine. but be careful if that changes + timeout( + Duration::from_secs(295), + self.balanced_rpcs + .try_proxy_connection::<_, Arc>( + method, + params, + request_metadata, + max_wait, + None, + None, + ) + ).await?? + } else { + // TODO: acquire a semaphore from a map with the cache key as the key + // TODO: try it, if that fails, then we are already running. wait until the semaphore completes and then run on. they will all go only one at a time though + // TODO: if we got the semaphore, do the try_get_with + // TODO: if the response is too big to cache mark the cache_key as not cacheable. maybe CacheMode can check that cache? 
- let f = async move { - app - .jsonrpc_response_cache - .try_get_with::<_, Web3ProxyError>(cache_key.hash(), async { - let response_data = timeout(Duration::from_secs(290), app.balanced_rpcs + let s = self.jsonrpc_response_semaphores.get_with(cache_key_hash, async move { + Arc::new(Semaphore::new(1)) + }).await; + + // TODO: don't always do 1 second. use the median request latency instead + match timeout(Duration::from_secs(1), s.acquire_owned()).await { + Err(_) => { + // TODO: should we try to cache this? whatever has the semaphore //should// handle that for us + timeout( + Duration::from_secs(295), + self.balanced_rpcs .try_proxy_connection::<_, Arc>( - &method, - ¶ms, - Some(&request_metadata), + method, + params, + request_metadata, max_wait, - from_block_num.as_ref(), - to_block_num.as_ref(), - )).await; + None, + None, + ) + ).await?? + } + Ok(_p) => { + // we got the permit! we are either first, or we were waiting a short time to get it in which case this response should be cached + // TODO: clone less? + let f = { + let app = self.clone(); + let method = method.to_string(); + let params = params.clone(); + let request_metadata = request_metadata.clone(); - match response_data { - Ok(response_data) => { - if !cache_jsonrpc_errors && let Err(err) = response_data { - // if we are not supposed to cache jsonrpc errors, - // then we must not convert Provider errors into a JsonRpcResponseEnum - // return all the errors now. moka will not cache Err results - Err(err) - } else { - // convert jsonrpc errors into JsonRpcResponseEnum, but leave the rest as errors - let response_data: JsonRpcResponseEnum> = response_data.try_into()?; + async move { + app + .jsonrpc_response_cache + .try_get_with::<_, Web3ProxyError>(cache_key.hash(), async { + let response_data = timeout(Duration::from_secs(290), app.balanced_rpcs + .try_proxy_connection::<_, Arc>( + &method, + ¶ms, + &request_metadata, + max_wait, + from_block_num.as_ref(), + to_block_num.as_ref(), + )).await; - if response_data.is_null() { - // don't ever cache "null" as a success. its too likely to be a problem - Err(Web3ProxyError::NullJsonRpcResult) - } else if response_data.num_bytes() > max_response_cache_bytes { - // don't cache really large requests - // TODO: emit a stat - Err(Web3ProxyError::JsonRpcResponse(response_data)) - } else { - // TODO: response data should maybe be Arc>>, but that's more work - Ok(response_data) - } - } + match response_data { + Ok(response_data) => { + if !cache_jsonrpc_errors && let Err(err) = response_data { + // if we are not supposed to cache jsonrpc errors, + // then we must not convert Provider errors into a JsonRpcResponseEnum + // return all the errors now. moka will not cache Err results + Err(err) + } else { + // convert jsonrpc errors into JsonRpcResponseEnum, but leave the rest as errors + let response_data: JsonRpcResponseEnum> = response_data.try_into()?; + + if response_data.is_null() { + // don't ever cache "null" as a success. 
its too likely to be a problem + Err(Web3ProxyError::NullJsonRpcResult) + } else if response_data.num_bytes() > max_response_cache_bytes { + // don't cache really large requests + // TODO: emit a stat + Err(Web3ProxyError::JsonRpcResponse(response_data)) + } else { + // TODO: response data should maybe be Arc>>, but that's more work + Ok(response_data) + } + } + } + Err(err) => Err(Web3ProxyError::from(err)), + } + }).await } - Err(err) => Err(Web3ProxyError::from(err)), - } - }).await - }; + }; - // this is spawned so that if the client disconnects, the app keeps polling the future with a lock inside the moka cache - tokio::spawn(f).await?? + // this is spawned so that if the client disconnects, the app keeps polling the future with a lock inside the moka cache + // TODO: is this expect actually safe!? could there be a background process that still has the arc? + match tokio::spawn(f).await? { + Ok(response_data) => Ok(jsonrpc::ParsedResponse::from_response_data(response_data, id).into()), + Err(err) => { + self.jsonrpc_response_failed_cache_keys.insert(cache_key_hash, ()).await; + + if let Web3ProxyError::StreamResponse(x) = err.as_ref() { + let x = x.lock().take().expect("stream processing should only happen once"); + + Ok(jsonrpc::SingleResponse::Stream(x)) + } else { + Err(err) + } + }, + }? + } + } + }; + + x } else { - let x = timeout( - Duration::from_secs(300), + timeout( + Duration::from_secs(295), self.balanced_rpcs .try_proxy_connection::<_, Arc>( method, params, - Some(request_metadata), + request_metadata, max_wait, None, None, ) - ).await??; - - x.into() + ).await?? } } }; - Ok(response_data) + Ok(response) } } diff --git a/web3_proxy/src/app/ws.rs b/web3_proxy/src/app/ws.rs index 0ac99493..b93e2416 100644 --- a/web3_proxy/src/app/ws.rs +++ b/web3_proxy/src/app/ws.rs @@ -3,8 +3,7 @@ use super::Web3ProxyApp; use crate::errors::{Web3ProxyError, Web3ProxyResult}; use crate::frontend::authorization::{Authorization, RequestMetadata, RequestOrMethod}; -use crate::jsonrpc::JsonRpcForwardedResponse; -use crate::jsonrpc::JsonRpcRequest; +use crate::jsonrpc::{self, JsonRpcRequest}; use crate::response_cache::JsonRpcResponseEnum; use axum::extract::ws::{CloseFrame, Message}; use deferred_rate_limiter::DeferredRateLimitResult; @@ -30,7 +29,7 @@ impl Web3ProxyApp { subscription_count: &'a AtomicU64, // TODO: taking a sender for Message instead of the exact json we are planning to send feels wrong, but its easier for now response_sender: mpsc::Sender, - ) -> Web3ProxyResult<(AbortHandle, JsonRpcForwardedResponse)> { + ) -> Web3ProxyResult<(AbortHandle, jsonrpc::ParsedResponse)> { let subscribe_to = jsonrpc_request .params .get(0) @@ -41,7 +40,10 @@ impl Web3ProxyApp { // anyone can subscribe to newHeads // only premium users are allowed to subscribe to the other things - if !(subscribe_to == "newHeads" || authorization.active_premium().await) { + if !(self.config.free_subscriptions + || subscribe_to == "newHeads" + || authorization.active_premium().await) + { return Err(Web3ProxyError::AccessDenied( "eth_subscribe for this event requires an active premium account".into(), )); @@ -214,10 +216,13 @@ impl Web3ProxyApp { let response_data = JsonRpcResponseEnum::from(json!(subscription_id)); - let response = JsonRpcForwardedResponse::from_response_data(response_data, id); + let response = jsonrpc::ParsedResponse::from_response_data(response_data, id); + // TODO: better way of passing in ParsedResponse + let response = jsonrpc::SingleResponse::Parsed(response); // TODO: this serializes twice 
request_metadata.add_response(&response); + let response = response.parsed().await.expect("Response already parsed"); // TODO: make a `SubscriptonHandle(AbortHandle, JoinHandle)` struct? Ok((subscription_abort_handle, response)) @@ -227,45 +232,44 @@ impl Web3ProxyApp { &self, request_metadata: &RequestMetadata, ) -> Option { - if let Some(authorization) = request_metadata.authorization.as_ref() { - if authorization.checks.rpc_secret_key_id.is_none() { - if let Some(rate_limiter) = &self.frontend_ip_rate_limiter { - match rate_limiter - .throttle( - authorization.ip, - authorization.checks.max_requests_per_period, - 1, - ) - .await - { - Ok(DeferredRateLimitResult::RetryNever) => { - let close_frame = CloseFrame { + let authorization = &request_metadata.authorization; + + if !authorization.active_premium().await { + if let Some(rate_limiter) = &self.frontend_public_rate_limiter { + match rate_limiter + .throttle( + authorization.ip, + authorization.checks.max_requests_per_period, + 1, + ) + .await + { + Ok(DeferredRateLimitResult::RetryNever) => { + let close_frame = CloseFrame { code: StatusCode::TOO_MANY_REQUESTS.as_u16(), reason: "rate limited. upgrade to premium for unlimited websocket messages" .into(), }; - return Some(Message::Close(Some(close_frame))); - } - Ok(DeferredRateLimitResult::RetryAt(retry_at)) => { - let retry_at = retry_at.duration_since(Instant::now()); + return Some(Message::Close(Some(close_frame))); + } + Ok(DeferredRateLimitResult::RetryAt(retry_at)) => { + let retry_at = retry_at.duration_since(Instant::now()); - let reason = format!("rate limited. upgrade to premium for unlimited websocket messages. retry in {}s", retry_at.as_secs_f32()); + let reason = format!("rate limited. upgrade to premium for unlimited websocket messages. retry in {}s", retry_at.as_secs_f32()); - let close_frame = CloseFrame { - code: StatusCode::TOO_MANY_REQUESTS.as_u16(), - reason: reason.into(), - }; + let close_frame = CloseFrame { + code: StatusCode::TOO_MANY_REQUESTS.as_u16(), + reason: reason.into(), + }; - return Some(Message::Close(Some(close_frame))); - } - Ok(_) => {} - Err(err) => { - // this an internal error of some kind, not the rate limit being hit - // TODO: i really want axum to do this for us in a single place. - error!("rate limiter is unhappy. allowing ip. err={:?}", err); - } + return Some(Message::Close(Some(close_frame))); + } + Ok(_) => {} + Err(err) => { + // this an internal error of some kind, not the rate limit being hit + error!(?err, "rate limiter is unhappy. allowing ip"); } } } diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index dface507..330fc5f2 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -85,10 +85,17 @@ pub struct AppConfig { /// pool of extra connections allowed for authenticated users #[serde_inline_default(0usize)] - pub bonus_user_concurrency: usize, + pub bonus_premium_concurrency: usize, /// pool of extra connections allowed for anonymous users #[serde_inline_default(0usize)] - pub bonus_ip_concurrency: usize, + pub bonus_public_concurrency: usize, + + /// pool of extra requests per second allowed for authenticaed users + #[serde_inline_default(0u64)] + pub bonus_frontend_public_rate_limit: u64, + + #[serde_inline_default(0u64)] + pub bonus_frontend_premium_rate_limit: u64, /// EVM chain id. 1 for ETH /// TODO: better type for chain_id? 
max of `u64::MAX / 2 - 36` diff --git a/web3_proxy/src/errors.rs b/web3_proxy/src/errors.rs index 2670f445..e7244cd9 100644 --- a/web3_proxy/src/errors.rs +++ b/web3_proxy/src/errors.rs @@ -1,7 +1,7 @@ //! Utlities for logging errors for admins and displaying errors to users. use crate::frontend::authorization::Authorization; -use crate::jsonrpc::{JsonRpcErrorData, JsonRpcForwardedResponse}; +use crate::jsonrpc::{self, JsonRpcErrorData, JsonRpcForwardedResponse}; use crate::response_cache::JsonRpcResponseEnum; use crate::rpcs::provider::EthersHttpProvider; use axum::extract::rejection::JsonRejection; @@ -19,6 +19,7 @@ use http::header::InvalidHeaderValue; use http::uri::InvalidUri; use ipnet::AddrParseError; use migration::sea_orm::DbErr; +use parking_lot::Mutex; use redis_rate_limiter::redis::RedisError; use redis_rate_limiter::RedisPoolError; use reqwest::header::ToStrError; @@ -127,6 +128,10 @@ pub enum Web3ProxyError { #[error(ignore)] #[display(fmt = "{:?}", _0)] JsonRpcResponse(JsonRpcResponseEnum>), + /// make it easy to skip caching streaming results + #[error(ignore)] + #[display(fmt = "{:?}", _0)] + StreamResponse(Mutex>), /// make it easy to skip caching null results NullJsonRpcResult, OriginRequired, @@ -184,6 +189,15 @@ pub enum Web3ProxyError { } impl Web3ProxyError { + pub fn as_json_response_parts( + &self, + id: Box, + ) -> (StatusCode, jsonrpc::SingleResponse) { + let (code, response_data) = self.as_response_parts(); + let response = jsonrpc::ParsedResponse::from_response_data(response_data, id); + (code, response.into()) + } + /// turn the error into an axum response. /// pub fn as_response_parts(&self) -> (StatusCode, JsonRpcResponseEnum>) { @@ -769,6 +783,10 @@ impl Web3ProxyError { // TODO: shame we have to clone, but its an Arc so its not terrible return (StatusCode::OK, response_enum.clone()); } + Self::StreamResponse(_resp) => { + // TODO: better way of doing this? 
+ unreachable!("stream is pulled out, not used here"); + } Self::NullJsonRpcResult => { return (StatusCode::OK, JsonRpcResponseEnum::NullResult); } diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index 0a346867..1d4d5e06 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -4,9 +4,10 @@ use super::rpc_proxy_ws::ProxyMode; use crate::app::{Web3ProxyApp, APP_USER_AGENT}; use crate::balance::Balance; use crate::caches::RegisteredUserRateLimitKey; +use crate::compute_units::default_usd_per_cu; use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult}; use crate::globals::global_db_replica_conn; -use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest}; +use crate::jsonrpc::{self, JsonRpcParams, JsonRpcRequest}; use crate::rpcs::blockchain::Web3ProxyBlock; use crate::rpcs::one::Web3Rpc; use crate::stats::{AppStat, BackendRequests}; @@ -16,7 +17,7 @@ use axum::headers::authorization::Bearer; use axum::headers::{Header, Origin, Referer, UserAgent}; use chrono::Utc; use core::fmt; -use deferred_rate_limiter::DeferredRateLimitResult; +use deferred_rate_limiter::{DeferredRateLimitResult, DeferredRateLimiter}; use derivative::Derivative; use derive_more::From; use entities::{login, rpc_key, user, user_tier}; @@ -32,8 +33,9 @@ use rdkafka::message::{Header as KafkaHeader, OwnedHeaders as KafkaOwnedHeaders, use rdkafka::producer::{FutureProducer, FutureRecord}; use rdkafka::util::Timeout as KafkaTimeout; use redis_rate_limiter::redis::AsyncCommands; -use redis_rate_limiter::RedisRateLimitResult; +use redis_rate_limiter::{RedisRateLimitResult, RedisRateLimiter}; use serde::{Deserialize, Serialize}; +use serde_json::json; use std::borrow::Cow; use std::fmt::Debug; use std::fmt::Display; @@ -339,7 +341,7 @@ pub struct RequestMetadata { /// TODO: this is more complex than "requires a block older than X height". different types of data can be pruned differently pub archive_request: AtomicBool, - pub authorization: Option>, + pub authorization: Arc, pub chain_id: u64, @@ -394,10 +396,7 @@ impl Default for Authorization { impl RequestMetadata { pub fn proxy_mode(&self) -> ProxyMode { - self.authorization - .as_ref() - .map(|x| x.checks.proxy_mode) - .unwrap_or_default() + self.authorization.checks.proxy_mode } } @@ -443,7 +442,7 @@ impl<'a> From<&'a str> for RequestOrMethod<'a> { #[derive(From)] pub enum ResponseOrBytes<'a> { Json(&'a serde_json::Value), - Response(&'a JsonRpcForwardedResponse), + Response(&'a jsonrpc::SingleResponse), Error(&'a Web3ProxyError), Bytes(usize), } @@ -460,9 +459,7 @@ impl ResponseOrBytes<'_> { Self::Json(x) => serde_json::to_string(x) .expect("this should always serialize") .len(), - Self::Response(x) => serde_json::to_string(x) - .expect("this should always serialize") - .len(), + Self::Response(x) => x.num_bytes(), Self::Bytes(num_bytes) => *num_bytes, Self::Error(x) => { let (_, x) = x.as_response_parts(); @@ -518,7 +515,7 @@ impl RequestMetadata { let x = Self { archive_request: false.into(), - authorization: Some(authorization), + authorization, backend_requests: Default::default(), chain_id, error_response: false.into(), @@ -540,6 +537,51 @@ impl RequestMetadata { Arc::new(x) } + pub fn new_internal(chain_id: u64, method: &str, params: &P) -> Arc { + let authorization = Arc::new(Authorization::internal().unwrap()); + let request_ulid = Ulid::new(); + let method = method.to_string().into(); + + // TODO: how can we get this? 
+ let stat_sender = None; + + // TODO: how can we do this efficiently? having to serialize sucks + let request_bytes = json!({ + "jsonrpc": "2.0", + "id": 1, + "method": method, + "params": params, + }) + .to_string() + .len(); + + // TODO: we should be getting this from config instead! + let usd_per_cu = default_usd_per_cu(chain_id); + + let x = Self { + archive_request: false.into(), + authorization, + backend_requests: Default::default(), + chain_id, + error_response: false.into(), + kafka_debug_logger: None, + method, + no_servers: 0.into(), + request_bytes, + request_ulid, + response_bytes: 0.into(), + response_from_backup_rpc: false.into(), + response_millis: 0.into(), + response_timestamp: 0.into(), + start_instant: Instant::now(), + stat_sender, + usd_per_cu, + user_error_response: false.into(), + }; + + Arc::new(x) + } + pub fn backend_rpcs_used(&self) -> Vec> { self.backend_requests.lock().clone() } @@ -584,7 +626,14 @@ impl RequestMetadata { if let Some(kafka_debug_logger) = self.kafka_debug_logger.as_ref() { if let ResponseOrBytes::Response(response) = response { - kafka_debug_logger.log_debug_response(response); + match response { + jsonrpc::SingleResponse::Parsed(response) => { + kafka_debug_logger.log_debug_response(response); + } + jsonrpc::SingleResponse::Stream(_) => { + warn!("need to handle streaming response debug logging"); + } + } } } } @@ -828,7 +877,7 @@ pub async fn ip_is_authorized( ) -> Web3ProxyResult<(Authorization, Option)> { // TODO: i think we could write an `impl From` for this // TODO: move this to an AuthorizedUser extrator - let (authorization, semaphore) = match app.rate_limit_by_ip(ip, origin, proxy_mode).await? { + let (authorization, semaphore) = match app.rate_limit_public(ip, origin, proxy_mode).await? { RateLimitResult::Allowed(authorization, semaphore) => (authorization, semaphore), RateLimitResult::RateLimited(authorization, retry_at) => { // TODO: in the background, emit a stat (maybe simplest to use a channel?) @@ -892,7 +941,7 @@ pub async fn key_is_authorized( // check the rate limits. error if over the limit // TODO: i think this should be in an "impl From" or "impl Into" let (authorization, semaphore) = match app - .rate_limit_by_rpc_key(ip, origin, proxy_mode, referer, rpc_key, user_agent) + .rate_limit_premium(ip, origin, proxy_mode, referer, rpc_key, user_agent) .await? { RateLimitResult::Allowed(authorization, semaphore) => (authorization, semaphore), @@ -944,7 +993,10 @@ pub async fn key_is_authorized( impl Web3ProxyApp { /// Limit the number of concurrent requests from the given ip address. 
- pub async fn ip_semaphore(&self, ip: &IpAddr) -> Web3ProxyResult> { + pub async fn permit_public_concurrency( + &self, + ip: &IpAddr, + ) -> Web3ProxyResult> { if let Some(max_concurrent_requests) = self.config.public_max_concurrent_requests { let semaphore = self .ip_semaphores @@ -974,11 +1026,13 @@ impl Web3ProxyApp { /// Limit the number of concurrent requests for a given user across all of their keys /// keep the semaphore alive until the user's request is entirely complete - pub async fn user_semaphore( + pub async fn permit_premium_concurrency( &self, - authorization_checks: &AuthorizationChecks, + authorization: &Authorization, ip: &IpAddr, ) -> Web3ProxyResult> { + let authorization_checks = &authorization.checks; + if let Some(max_concurrent_requests) = authorization_checks.max_concurrent_requests { let user_id = authorization_checks .user_id @@ -1042,7 +1096,7 @@ impl Web3ProxyApp { ip: IpAddr, proxy_mode: ProxyMode, ) -> Web3ProxyResult { - // TODO: dry this up with rate_limit_by_rpc_key? + // TODO: if ip is on the local network, always allow? // we don't care about user agent or origin or referer let authorization = Authorization::external( @@ -1054,46 +1108,20 @@ impl Web3ProxyApp { None, )?; - // no semaphore is needed here because login rate limits are low - // TODO: are we sure do not we want a semaphore here? - let semaphore = None; + let label = ip.to_string(); - // TODO: if ip is on the local network, always allow? - - if let Some(rate_limiter) = &self.login_rate_limiter { - match rate_limiter.throttle_label(&ip.to_string(), None, 1).await { - Ok(RedisRateLimitResult::Allowed(_)) => { - Ok(RateLimitResult::Allowed(authorization, semaphore)) - } - Ok(RedisRateLimitResult::RetryAt(retry_at, _)) => { - // TODO: set headers so they know when they can retry - // TODO: debug or trace? - // this is too verbose, but a stat might be good - // // trace!(?ip, "login rate limit exceeded until {:?}", retry_at); - - Ok(RateLimitResult::RateLimited(authorization, Some(retry_at))) - } - Ok(RedisRateLimitResult::RetryNever) => { - // TODO: i don't think we'll get here. maybe if we ban an IP forever? seems unlikely - // // trace!(?ip, "login rate limit is 0"); - Ok(RateLimitResult::RateLimited(authorization, None)) - } - Err(err) => { - // internal error, not rate limit being hit - // TODO: i really want axum to do this for us in a single place. - error!("login rate limiter is unhappy. allowing ip. err={:?}", err); - - Ok(RateLimitResult::Allowed(authorization, None)) - } - } - } else { - // TODO: if no redis, rate limit with a local cache? "warn!" probably isn't right - Ok(RateLimitResult::Allowed(authorization, None)) - } + redis_rate_limit( + &self.login_rate_limiter, + authorization, + None, + Some(&label), + None, + ) + .await } /// origin is included because it can override the default rate limits - pub async fn rate_limit_by_ip( + pub async fn rate_limit_public( &self, ip: &IpAddr, origin: Option<&Origin>, @@ -1119,44 +1147,38 @@ impl Web3ProxyApp { None, )?; - if let Some(rate_limiter) = &self.frontend_ip_rate_limiter { - match rate_limiter - .throttle(*ip, authorization.checks.max_requests_per_period, 1) - .await - { - Ok(DeferredRateLimitResult::Allowed) => { - // rate limit allowed us. 
check concurrent request limits - let semaphore = self.ip_semaphore(ip).await?; + if let Some(rate_limiter) = &self.frontend_public_rate_limiter { + let mut x = deferred_redis_rate_limit(authorization, *ip, None, rate_limiter).await?; - Ok(RateLimitResult::Allowed(authorization, semaphore)) - } - Ok(DeferredRateLimitResult::RetryAt(retry_at)) => { - // TODO: set headers so they know when they can retry - // // trace!(?ip, "rate limit exceeded until {:?}", retry_at); - Ok(RateLimitResult::RateLimited(authorization, Some(retry_at))) - } - Ok(DeferredRateLimitResult::RetryNever) => { - // TODO: i don't think we'll get here. maybe if we ban an IP forever? seems unlikely - // // trace!(?ip, "rate limit is 0"); - Ok(RateLimitResult::RateLimited(authorization, None)) - } - Err(err) => { - // this an internal error of some kind, not the rate limit being hit - // TODO: i really want axum to do this for us in a single place. - error!("rate limiter is unhappy. allowing ip. err={:?}", err); - - // at least we can still check the semaphore - let semaphore = self.ip_semaphore(ip).await?; - - Ok(RateLimitResult::Allowed(authorization, semaphore)) - } + if let RateLimitResult::RateLimited(authorization, retry_at) = x { + // we got rate limited, try bonus_frontend_public_rate_limiter + x = redis_rate_limit( + &self.bonus_frontend_public_rate_limiter, + authorization, + retry_at, + None, + None, + ) + .await?; } + + if let RateLimitResult::Allowed(a, b) = x { + debug_assert!(b.is_none()); + + let permit = self.permit_public_concurrency(ip).await?; + + x = RateLimitResult::Allowed(a, permit) + } + + debug_assert!(!matches!(x, RateLimitResult::UnknownKey)); + + Ok(x) } else { // no redis, but we can still check the ip semaphore - let semaphore = self.ip_semaphore(ip).await?; + let permit = self.permit_public_concurrency(ip).await?; // TODO: if no redis, rate limit with a local cache? "warn!" probably isn't right - Ok(RateLimitResult::Allowed(authorization, semaphore)) + Ok(RateLimitResult::Allowed(authorization, permit)) } } @@ -1323,8 +1345,8 @@ impl Web3ProxyApp { Ok(x) } - /// Authorized the ip/origin/referer/useragent and rate limit and concurrency - pub async fn rate_limit_by_rpc_key( + /// Authorize the key/ip/origin/referer/useragent and handle rate and concurrency limits + pub async fn rate_limit_premium( &self, ip: &IpAddr, origin: Option<&Origin>, @@ -1342,7 +1364,7 @@ impl Web3ProxyApp { // ?err, // "db is down. cannot check rpc key. fallback to ip rate limits" // ); - return self.rate_limit_by_ip(ip, origin, proxy_mode).await; + return self.rate_limit_public(ip, origin, proxy_mode).await; } return Err(err); @@ -1352,13 +1374,9 @@ impl Web3ProxyApp { // if no rpc_key_id matching the given rpc was found, then we can't rate limit by key if authorization_checks.rpc_secret_key_id.is_none() { trace!("unknown key. falling back to free limits"); - return self.rate_limit_by_ip(ip, origin, proxy_mode).await; + return self.rate_limit_public(ip, origin, proxy_mode).await; } - // only allow this rpc_key to run a limited amount of concurrent requests - // TODO: rate limit should be BEFORE the semaphore! - let semaphore = self.user_semaphore(&authorization_checks, ip).await?; - let authorization = Authorization::try_new( authorization_checks, ip, @@ -1370,47 +1388,61 @@ impl Web3ProxyApp { // user key is valid. 
now check rate limits if let Some(user_max_requests_per_period) = authorization.checks.max_requests_per_period { - if let Some(rate_limiter) = &self.frontend_registered_user_rate_limiter { - match rate_limiter - .throttle( - RegisteredUserRateLimitKey(authorization.checks.user_id, *ip), - Some(user_max_requests_per_period), - 1, - ) - .await - { - Ok(DeferredRateLimitResult::Allowed) => { - return Ok(RateLimitResult::Allowed(authorization, semaphore)) - } - Ok(DeferredRateLimitResult::RetryAt(retry_at)) => { - // TODO: set headers so they know when they can retry - // TODO: debug or trace? - // this is too verbose, but a stat might be good - // TODO: keys are secrets! use the id instead - // TODO: emit a stat - // trace!(?rpc_key, "rate limit exceeded until {:?}", retry_at); - return Ok(RateLimitResult::RateLimited(authorization, Some(retry_at))); - } - Ok(DeferredRateLimitResult::RetryNever) => { - // TODO: keys are secret. don't log them! - // trace!(?rpc_key, "rate limit is 0"); - // TODO: emit a stat - return Ok(RateLimitResult::RateLimited(authorization, None)); - } - Err(err) => { - // internal error, not rate limit being hit - // TODO: i really want axum to do this for us in a single place. - error!(?err, "rate limiter is unhappy. allowing rpc_key"); + if let Some(rate_limiter) = &self.frontend_premium_rate_limiter { + let key = RegisteredUserRateLimitKey(authorization.checks.user_id, *ip); - return Ok(RateLimitResult::Allowed(authorization, semaphore)); - } + let mut x = deferred_redis_rate_limit( + authorization, + key, + Some(user_max_requests_per_period), + rate_limiter, + ) + .await?; + + if let RateLimitResult::RateLimited(authorization, retry_at) = x { + // rate limited by the user's key+ip. check to see if there are any limits available in the bonus premium pool + x = redis_rate_limit( + &self.bonus_frontend_premium_rate_limiter, + authorization, + retry_at, + None, + None, + ) + .await?; } + + if let RateLimitResult::RateLimited(authorization, retry_at) = x { + // premium got rate limited too. check the bonus public pool + x = redis_rate_limit( + &self.bonus_frontend_public_rate_limiter, + authorization, + retry_at, + None, + None, + ) + .await?; + } + + if let RateLimitResult::Allowed(a, b) = x { + debug_assert!(b.is_none()); + + // only allow this rpc_key to run a limited amount of concurrent requests + let permit = self.permit_premium_concurrency(&a, ip).await?; + + x = RateLimitResult::Allowed(a, permit) + } + + debug_assert!(!matches!(x, RateLimitResult::UnknownKey)); + + return Ok(x); } else { // TODO: if no redis, rate limit with just a local cache? } } - Ok(RateLimitResult::Allowed(authorization, semaphore)) + let permit = self.permit_premium_concurrency(&authorization, ip).await?; + + Ok(RateLimitResult::Allowed(authorization, permit)) } } @@ -1440,3 +1472,85 @@ impl Authorization { Ok((a, s)) } } + +/// this fails open! +/// this never includes a semaphore! 
if you want one, add it after this call +/// if `max_requests_per_period` is none, the limit in the authorization is used +pub async fn deferred_redis_rate_limit( + authorization: Authorization, + key: K, + max_requests_per_period: Option, + rate_limiter: &DeferredRateLimiter, +) -> Web3ProxyResult +where + K: Send + Sync + Copy + Clone + Display + Hash + Eq + PartialEq + 'static, +{ + let max_requests_per_period = + max_requests_per_period.or(authorization.checks.max_requests_per_period); + + let x = match rate_limiter.throttle(key, max_requests_per_period, 1).await { + Ok(DeferredRateLimitResult::Allowed) => RateLimitResult::Allowed(authorization, None), + Ok(DeferredRateLimitResult::RetryAt(retry_at)) => { + // TODO: set headers so they know when they can retry + // TODO: debug or trace? + // this is too verbose, but a stat might be good + // TODO: emit a stat + // trace!(?rpc_key, "rate limit exceeded until {:?}", retry_at); + RateLimitResult::RateLimited(authorization, Some(retry_at)) + } + Ok(DeferredRateLimitResult::RetryNever) => { + // TODO: keys are secret. don't log them! + // trace!(?rpc_key, "rate limit is 0"); + // TODO: emit a stat + RateLimitResult::RateLimited(authorization, None) + } + Err(err) => { + // internal error, not rate limit being hit + error!(?err, %key, "rate limiter is unhappy. allowing key"); + + RateLimitResult::Allowed(authorization, None) + } + }; + + Ok(x) +} + +/// this never includes a semaphore! if you want one, add it after this call +/// if `max_requests_per_period` is none, the limit in the authorization is used +pub async fn redis_rate_limit( + rate_limiter: &Option, + authorization: Authorization, + mut retry_at: Option, + label: Option<&str>, + max_requests_per_period: Option, +) -> Web3ProxyResult { + let max_requests_per_period = + max_requests_per_period.or(authorization.checks.max_requests_per_period); + + let x = if let Some(rate_limiter) = rate_limiter { + match rate_limiter + .throttle_label(label.unwrap_or_default(), max_requests_per_period, 1) + .await + { + Ok(RedisRateLimitResult::Allowed(..)) => RateLimitResult::Allowed(authorization, None), + Ok(RedisRateLimitResult::RetryAt(new_retry_at, ..)) => { + retry_at = retry_at.min(Some(new_retry_at)); + + RateLimitResult::RateLimited(authorization, retry_at) + } + Ok(RedisRateLimitResult::RetryNever) => { + RateLimitResult::RateLimited(authorization, retry_at) + } + Err(err) => { + // this an internal error of some kind, not the rate limit being hit + error!("rate limiter is unhappy. allowing ip. err={:?}", err); + + RateLimitResult::Allowed(authorization, None) + } + } + } else { + RateLimitResult::Allowed(authorization, None) + }; + + Ok(x) +} diff --git a/web3_proxy/src/frontend/rpc_proxy_http.rs b/web3_proxy/src/frontend/rpc_proxy_http.rs index 7bd43894..b5eb40d4 100644 --- a/web3_proxy/src/frontend/rpc_proxy_http.rs +++ b/web3_proxy/src/frontend/rpc_proxy_http.rs @@ -85,7 +85,7 @@ async fn _proxy_web3_rpc( .await .map_err(|e| e.into_response_with_id(first_id))?; - let mut response = (status_code, Json(response)).into_response(); + let mut response = (status_code, response).into_response(); // TODO: DRY this up. 
it is the same code for public and private queries let response_headers = response.headers_mut(); @@ -280,7 +280,7 @@ async fn _proxy_web3_rpc_with_key( .await .map_err(|e| e.into_response_with_id(first_id))?; - let mut response = (status_code, Json(response)).into_response(); + let mut response = (status_code, response).into_response(); let headers = response.headers_mut(); diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs index 659b0e9e..7602b6c4 100644 --- a/web3_proxy/src/frontend/rpc_proxy_ws.rs +++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs @@ -4,11 +4,11 @@ use super::authorization::{ip_is_authorized, key_is_authorized, Authorization, RequestMetadata}; use crate::errors::{Web3ProxyError, Web3ProxyResponse}; -use crate::jsonrpc::JsonRpcId; +use crate::jsonrpc::{self, JsonRpcId}; use crate::{ app::Web3ProxyApp, errors::Web3ProxyResult, - jsonrpc::{JsonRpcForwardedResponse, JsonRpcForwardedResponseEnum, JsonRpcRequest}, + jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest}, }; use axum::headers::{Origin, Referer, UserAgent}; use axum::{ @@ -323,11 +323,11 @@ async fn websocket_proxy_web3_rpc( response_sender: &mpsc::Sender, subscription_count: &AtomicU64, subscriptions: &AsyncRwLock>, -) -> (Box, Web3ProxyResult) { +) -> (Box, Web3ProxyResult) { let response_id = json_request.id.clone(); // TODO: move this to a seperate function so we can use the try operator - let response: Web3ProxyResult = match &json_request.method[..] { + let response: Web3ProxyResult = match &json_request.method[..] { "eth_subscribe" => { // TODO: how can we subscribe with proxy_mode? match app @@ -340,7 +340,10 @@ async fn websocket_proxy_web3_rpc( .await { Ok((handle, response)) => { - if let Some(subscription_id) = response.result.clone() { + if let jsonrpc::Payload::Success { + result: ref subscription_id, + } = response.payload + { let mut x = subscriptions.write().await; let key: U64 = serde_json::from_str(subscription_id.get()).unwrap(); @@ -389,9 +392,12 @@ async fn websocket_proxy_web3_rpc( }; let response = - JsonRpcForwardedResponse::from_value(json!(partial_response), response_id.clone()); + jsonrpc::ParsedResponse::from_value(json!(partial_response), response_id.clone()); + // TODO: better way of passing in ParsedResponse + let response = jsonrpc::SingleResponse::Parsed(response); request_metadata.add_response(&response); + let response = response.parsed().await.expect("Response already parsed"); Ok(response.into()) } @@ -441,7 +447,7 @@ async fn handle_socket_payload( }; let response_str = match response { - Ok(x) => serde_json::to_string(&x).expect("to_string should always work here"), + Ok(x) => x.to_json_string().await?, Err(err) => { let (_, response_data) = err.as_response_parts(); diff --git a/web3_proxy/src/frontend/streaming.rs b/web3_proxy/src/frontend/streaming.rs new file mode 100644 index 00000000..f3eef9e5 --- /dev/null +++ b/web3_proxy/src/frontend/streaming.rs @@ -0,0 +1,40 @@ +use axum::{body::BoxBody, response::IntoResponse}; +use bytes::Bytes; +use futures::StreamExt; +use http::Response; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::stream::Stream; + +struct SizingBody { + inner: B, + request_metadata: RequestMetadata, +} + +impl SizingBody { + fn new(inner: B) -> Self { + Self { inner, size: 0 } + } +} + +impl Stream for SizingBody +where + B: Stream>> + Unpin, +{ + type Item = Result>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match Pin::new(&mut self.inner).poll_next(cx) { + 
Poll::Ready(Some(Ok(chunk))) => { + self.size += chunk.len(); + Poll::Ready(Some(Ok(chunk))) + } + Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))), + Poll::Ready(None) => { + println!("Final response size: {}", self.size); + Poll::Ready(None) + } + Poll::Pending => Poll::Pending, + } + } +} diff --git a/web3_proxy/src/jsonrpc.rs b/web3_proxy/src/jsonrpc.rs index 17f87987..dbc4510e 100644 --- a/web3_proxy/src/jsonrpc.rs +++ b/web3_proxy/src/jsonrpc.rs @@ -1,9 +1,11 @@ -use crate::app::Web3ProxyApp; -use crate::errors::Web3ProxyError; -use crate::frontend::authorization::{Authorization, RequestMetadata, RequestOrMethod}; -use crate::response_cache::JsonRpcResponseEnum; -use axum::response::Response; +use axum::body::StreamBody; +use axum::response::{IntoResponse, Response as AxumResponse}; +use axum::Json; +use bytes::{Bytes, BytesMut}; use derive_more::From; +use ethers::providers::ProviderError; +use futures_util::stream::{self, StreamExt}; +use futures_util::TryStreamExt; use serde::de::{self, Deserializer, MapAccess, SeqAccess, Visitor}; use serde::{Deserialize, Serialize}; use serde_inline_default::serde_inline_default; @@ -11,13 +13,367 @@ use serde_json::json; use serde_json::value::{to_raw_value, RawValue}; use std::borrow::Cow; use std::fmt; +use std::marker::PhantomData; use std::sync::{atomic, Arc}; use std::time::Duration; use tokio::time::sleep; +use crate::app::Web3ProxyApp; +use crate::errors::{Web3ProxyError, Web3ProxyResult}; +use crate::frontend::authorization::{Authorization, RequestMetadata, RequestOrMethod}; +use crate::response_cache::JsonRpcResponseEnum; + pub trait JsonRpcParams = fmt::Debug + serde::Serialize + Send + Sync + 'static; pub trait JsonRpcResultData = serde::Serialize + serde::de::DeserializeOwned + fmt::Debug + Send; +// TODO: borrow values to avoid allocs if possible +#[derive(Debug, Serialize)] +pub struct ParsedResponse> { + jsonrpc: String, + id: Option>, + #[serde(flatten)] + pub payload: Payload, +} + +impl ParsedResponse { + pub fn from_value(value: serde_json::Value, id: Box) -> Self { + let result = serde_json::value::to_raw_value(&value) + .expect("this should not fail") + .into(); + Self::from_result(result, Some(id)) + } +} + +impl ParsedResponse> { + pub fn from_response_data(data: JsonRpcResponseEnum>, id: Box) -> Self { + match data { + JsonRpcResponseEnum::NullResult => { + let x: Box = Default::default(); + Self::from_result(Arc::from(x), Some(id)) + } + JsonRpcResponseEnum::RpcError { error_data, .. } => Self::from_error(error_data, id), + JsonRpcResponseEnum::Result { value, .. } => Self::from_result(value, Some(id)), + } + } +} + +impl ParsedResponse { + pub fn from_result(result: T, id: Option>) -> Self { + Self { + jsonrpc: "2.0".to_string(), + id, + payload: Payload::Success { result }, + } + } + + pub fn from_error(error: JsonRpcErrorData, id: Box) -> Self { + Self { + jsonrpc: "2.0".to_string(), + id: Some(id), + payload: Payload::Error { error }, + } + } + + pub fn result(&self) -> Option<&T> { + match &self.payload { + Payload::Success { result } => Some(result), + Payload::Error { .. 
} => None, + } + } + + pub fn into_result(self) -> Web3ProxyResult { + match self.payload { + Payload::Success { result } => Ok(result), + Payload::Error { error } => Err(Web3ProxyError::JsonRpcErrorData(error)), + } + } +} + +impl<'de, T> Deserialize<'de> for ParsedResponse +where + T: de::DeserializeOwned, +{ + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + struct ResponseVisitor(PhantomData); + impl<'de, T> de::Visitor<'de> for ResponseVisitor + where + T: de::DeserializeOwned, + { + type Value = ParsedResponse; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("a valid jsonrpc 2.0 response object") + } + + fn visit_map(self, mut map: A) -> Result + where + A: de::MapAccess<'de>, + { + let mut jsonrpc = None; + + // response & error + let mut id = None; + // only response + let mut result = None; + // only error + let mut error = None; + + while let Some(key) = map.next_key()? { + match key { + "jsonrpc" => { + if jsonrpc.is_some() { + return Err(de::Error::duplicate_field("jsonrpc")); + } + + let value = map.next_value()?; + if value != "2.0" { + return Err(de::Error::invalid_value( + de::Unexpected::Str(value), + &"2.0", + )); + } + + jsonrpc = Some(value); + } + "id" => { + if id.is_some() { + return Err(de::Error::duplicate_field("id")); + } + + let value: Box = map.next_value()?; + id = Some(value); + } + "result" => { + if result.is_some() { + return Err(de::Error::duplicate_field("result")); + } + + let value: T = map.next_value()?; + result = Some(value); + } + "error" => { + if error.is_some() { + return Err(de::Error::duplicate_field("Error")); + } + + let value: JsonRpcErrorData = map.next_value()?; + error = Some(value); + } + key => { + return Err(de::Error::unknown_field( + key, + &["jsonrpc", "id", "result", "error"], + )); + } + } + } + + // jsonrpc version must be present in all responses + let jsonrpc = jsonrpc + .ok_or_else(|| de::Error::missing_field("jsonrpc"))? 
+ .to_string(); + + let payload = match (result, error) { + (Some(result), None) => Payload::Success { result }, + (None, Some(error)) => Payload::Error { error }, + _ => { + return Err(de::Error::custom( + "response must be either a success or error object", + )) + } + }; + + Ok(ParsedResponse { + jsonrpc, + id, + payload, + }) + } + } + + deserializer.deserialize_map(ResponseVisitor(PhantomData)) + } +} + +#[derive(Debug, Deserialize, Serialize)] +#[serde(untagged)] +pub enum Payload { + Success { result: T }, + Error { error: JsonRpcErrorData }, +} + +#[derive(Debug)] +pub struct StreamResponse { + buffer: Bytes, + response: reqwest::Response, + request_metadata: Arc, +} + +impl StreamResponse { + // TODO: error handing + pub async fn read(self) -> Result, ProviderError> + where + T: de::DeserializeOwned, + { + let mut buffer = BytesMut::with_capacity(self.buffer.len()); + buffer.extend_from_slice(&self.buffer); + buffer.extend_from_slice(&self.response.bytes().await?); + let parsed = serde_json::from_slice(&buffer)?; + Ok(parsed) + } +} + +impl IntoResponse for StreamResponse { + fn into_response(self) -> axum::response::Response { + let stream = stream::once(async { Ok::<_, reqwest::Error>(self.buffer) }) + .chain(self.response.bytes_stream()) + .map_ok(move |x| { + let len = x.len(); + + self.request_metadata.add_response(len); + + x + }); + let body = StreamBody::new(stream); + body.into_response() + } +} + +#[derive(Debug)] +pub enum SingleResponse> { + Parsed(ParsedResponse), + Stream(StreamResponse), +} + +impl SingleResponse +where + T: de::DeserializeOwned + Serialize, +{ + // TODO: threshold from configs + // TODO: error handling + pub async fn read_if_short( + mut response: reqwest::Response, + nbytes: u64, + request_metadata: Arc, + ) -> Result, ProviderError> { + match response.content_length() { + // short + Some(len) if len <= nbytes => Ok(Self::from_bytes(response.bytes().await?)?), + // long + Some(_) => Ok(Self::Stream(StreamResponse { + buffer: Bytes::new(), + response, + request_metadata, + })), + None => { + let mut buffer = BytesMut::new(); + while (buffer.len() as u64) < nbytes { + match response.chunk().await? 
{ + Some(chunk) => { + buffer.extend_from_slice(&chunk); + } + None => return Ok(Self::from_bytes(buffer.freeze())?), + } + } + let buffer = buffer.freeze(); + Ok(Self::Stream(StreamResponse { + buffer, + response, + request_metadata, + })) + } + } + } + + fn from_bytes(buf: Bytes) -> Result { + let val = serde_json::from_slice(&buf)?; + Ok(Self::Parsed(val)) + } + + // TODO: error handling + pub async fn parsed(self) -> Result, ProviderError> { + match self { + Self::Parsed(resp) => Ok(resp), + Self::Stream(resp) => resp.read().await, + } + } + + pub fn num_bytes(&self) -> usize { + match self { + Self::Parsed(response) => serde_json::to_string(response) + .expect("this should always serialize") + .len(), + Self::Stream(response) => match response.response.content_length() { + Some(len) => len as usize, + None => 0, + }, + } + } +} + +impl From> for SingleResponse { + fn from(response: ParsedResponse) -> Self { + Self::Parsed(response) + } +} + +impl IntoResponse for SingleResponse +where + T: Serialize, +{ + fn into_response(self) -> axum::response::Response { + match self { + Self::Parsed(resp) => Json(resp).into_response(), + Self::Stream(resp) => resp.into_response(), + } + } +} + +#[derive(Debug)] +pub enum Response> { + Single(SingleResponse), + Batch(Vec>), +} + +impl Response> { + pub async fn to_json_string(self) -> Result { + let x = match self { + Self::Single(resp) => { + // TODO: handle streaming differently? + let parsed = resp.parsed().await?; + + serde_json::to_string(&parsed) + } + Self::Batch(resps) => serde_json::to_string(&resps), + }; + + let x = x.expect("to_string should always work"); + + Ok(x) + } +} + +impl From> for Response { + fn from(response: ParsedResponse) -> Self { + Self::Single(SingleResponse::Parsed(response)) + } +} + +impl IntoResponse for Response +where + T: Serialize, +{ + fn into_response(self) -> axum::response::Response { + match self { + Self::Single(resp) => resp.into_response(), + Self::Batch(resps) => Json(resps).into_response(), + } + } +} + // TODO: &str here instead of String should save a lot of allocations // TODO: generic type for params? #[serde_inline_default] @@ -120,7 +476,7 @@ impl JsonRpcRequestEnum { app: &Web3ProxyApp, authorization: &Arc, duration: Duration, - ) -> Result<(), Response> { + ) -> Result<(), AxumResponse> { let err_id = match self.validate() { None => return Ok(()), Some(x) => x, @@ -269,6 +625,14 @@ pub struct JsonRpcErrorData { pub data: Option, } +impl JsonRpcErrorData { + pub fn num_bytes(&self) -> usize { + serde_json::to_string(self) + .expect("should always serialize") + .len() + } +} + impl From<&'static str> for JsonRpcErrorData { fn from(value: &'static str) -> Self { Self { @@ -397,6 +761,26 @@ pub enum JsonRpcForwardedResponseEnum { mod tests { use super::*; + #[test] + fn deserialize_response() { + let json = r#"{"jsonrpc":"2.0","id":null,"result":100}"#; + let obj: ParsedResponse = serde_json::from_str(json).unwrap(); + assert!(matches!(obj.payload, Payload::Success { .. 
})); + } + + #[test] + fn serialize_response() { + let obj = ParsedResponse { + jsonrpc: "2.0".to_string(), + id: None, + payload: Payload::Success { + result: serde_json::value::RawValue::from_string("100".to_string()).unwrap(), + }, + }; + let json = serde_json::to_string(&obj).unwrap(); + assert_eq!(json, r#"{"jsonrpc":"2.0","id":null,"result":100}"#); + } + #[test] fn this_deserialize_single() { let input = r#"{"jsonrpc":"2.0","method":"eth_blockNumber","params":[],"id":1}"#; diff --git a/web3_proxy/src/response_cache.rs b/web3_proxy/src/response_cache.rs index 2b445b4c..f033db4b 100644 --- a/web3_proxy/src/response_cache.rs +++ b/web3_proxy/src/response_cache.rs @@ -1,4 +1,8 @@ -use crate::{block_number::BlockNumAndHash, errors::Web3ProxyError, jsonrpc::JsonRpcErrorData}; +use crate::{ + block_number::BlockNumAndHash, + errors::{Web3ProxyError, Web3ProxyResult}, + jsonrpc::{self, JsonRpcErrorData}, +}; use derive_more::From; use ethers::{ providers::{HttpClientError, JsonRpcError, ProviderError, WsClientError}, @@ -6,6 +10,7 @@ use ethers::{ }; use hashbrown::hash_map::DefaultHashBuilder; use moka::future::Cache; +use parking_lot::Mutex; use serde_json::value::RawValue; use std::{ hash::{BuildHasher, Hash, Hasher}, @@ -135,6 +140,35 @@ impl JsonRpcResponseEnum> { } } +impl TryFrom> for JsonRpcResponseEnum> { + type Error = Web3ProxyError; + fn try_from(response: Web3ProxyResult) -> Result { + match response { + Ok(jsonrpc::SingleResponse::Parsed(parsed)) => match parsed.payload { + jsonrpc::Payload::Success { result } => { + let num_bytes = result.get().len() as u32; + Ok(JsonRpcResponseEnum::Result { + value: result, + num_bytes, + }) + } + jsonrpc::Payload::Error { error } => { + let num_bytes = error.num_bytes() as u32; + Ok(JsonRpcResponseEnum::RpcError { + error_data: error, + // TODO: this double serializes + num_bytes, + }) + } + }, + Ok(jsonrpc::SingleResponse::Stream(stream)) => { + Err(Web3ProxyError::StreamResponse(Mutex::new(Some(stream)))) + } + Err(err) => err.try_into(), + } + } +} + impl From for JsonRpcResponseEnum> { fn from(value: serde_json::Value) -> Self { let value = RawValue::from_string(value.to_string()).unwrap(); diff --git a/web3_proxy/src/rpcs/consensus.rs b/web3_proxy/src/rpcs/consensus.rs index 5597c50f..8d329d3f 100644 --- a/web3_proxy/src/rpcs/consensus.rs +++ b/web3_proxy/src/rpcs/consensus.rs @@ -108,20 +108,6 @@ pub struct RankedRpcs { } impl RankedRpcs { - // /// useful when Web3Rpcs does not track the head block - // pub fn from_all(rpcs: &Web3Rpcs) -> Self { - // let inner = vec![( - // RpcRanking::default(), - // rpcs.by_name - // .read() - // .values() - // .cloned() - // .collect::>>(), - // )]; - - // todo!() - // } - pub fn from_votes( min_synced_rpcs: usize, min_sum_soft_limit: u32, @@ -915,11 +901,3 @@ impl ConsensusFinder { .max() } } - -#[cfg(test)] -mod test { - // #[test] - // fn test_simplest_case_consensus_head_connections() { - // todo!(); - // } -} diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index 8178399c..c1746bc3 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -6,16 +6,16 @@ use super::request::{OpenRequestHandle, OpenRequestResult, RequestErrorHandler}; use crate::app::{flatten_handle, Web3ProxyApp, Web3ProxyJoinHandle}; use crate::config::{average_block_interval, BlockAndRpc, Web3RpcConfig}; use crate::errors::{Web3ProxyError, Web3ProxyResult}; -use crate::frontend::authorization::{Authorization, RequestMetadata}; +use crate::frontend::authorization::RequestMetadata; use 
crate::frontend::rpc_proxy_ws::ProxyMode; use crate::frontend::status::MokaCacheSerializer; -use crate::jsonrpc::{JsonRpcErrorData, JsonRpcParams, JsonRpcResultData}; +use crate::jsonrpc::{self, JsonRpcErrorData, JsonRpcParams, JsonRpcResultData}; use counter::Counter; use derive_more::From; use ethers::prelude::{TxHash, U64}; use futures::future::try_join_all; use futures::stream::FuturesUnordered; -use futures::{StreamExt, TryFutureExt}; +use futures::StreamExt; use hashbrown::HashMap; use itertools::Itertools; use moka::future::CacheBuilder; @@ -371,7 +371,7 @@ impl Web3Rpcs { method: &str, params: &P, max_wait: Option, - ) -> Result, Web3ProxyError> { + ) -> Result, Web3ProxyError> { // TODO: if only 1 active_request_handles, do self.try_send_request? let max_wait = max_wait.unwrap_or_else(|| Duration::from_secs(300)); @@ -380,20 +380,23 @@ impl Web3Rpcs { let responses = active_request_handles .into_iter() .map(|active_request_handle| async move { - let result: Result, Web3ProxyError>, Web3ProxyError> = - timeout( - max_wait, - active_request_handle - .request(method, &json!(¶ms)) - .map_err(Web3ProxyError::EthersProvider), - ) + let result: Result, Web3ProxyError>, Web3ProxyError> = + timeout(max_wait, async { + match active_request_handle.request(method, &json!(¶ms)).await { + Ok(response) => match response.parsed().await { + Ok(parsed) => parsed.into_result(), + Err(err) => Err(Web3ProxyError::EthersProvider(err)), + }, + Err(err) => Err(Web3ProxyError::EthersProvider(err)), + } + }) .await .map_err(Web3ProxyError::from); result.flatten() }) .collect::>() - .collect::, Web3ProxyError>>>() + .collect::, Web3ProxyError>>>() .await; // TODO: Strings are not great keys, but we can't use RawValue or ProviderError as keys because they don't implement Hash or Eq @@ -443,7 +446,7 @@ impl Web3Rpcs { async fn _best_available_rpc( &self, - authorization: &Arc, + request_metadata: &Arc, error_handler: Option, potential_rpcs: &[Arc], skip: &mut Vec>, @@ -463,7 +466,7 @@ impl Web3Rpcs { // just because it has lower latency doesn't mean we are sure to get a connection. there might be rate limits // TODO: what error_handler? 
match faster_rpc - .try_request_handle(authorization, error_handler) + .try_request_handle(request_metadata, error_handler) .await { Ok(OpenRequestResult::Handle(handle)) => { @@ -503,7 +506,7 @@ impl Web3Rpcs { #[instrument(level = "trace")] pub async fn wait_for_best_rpc( &self, - request_metadata: Option<&Arc>, + request_metadata: &Arc, skip_rpcs: &mut Vec>, min_block_needed: Option<&U64>, max_block_needed: Option<&U64>, @@ -514,18 +517,13 @@ impl Web3Rpcs { let mut earliest_retry_at: Option = None; - // TODO: pass db_conn to the "default" authorization for revert logging - let authorization = request_metadata - .and_then(|x| x.authorization.clone()) - .unwrap_or_default(); - if self.watch_head_block.is_none() { // if this group of servers is not watching the head block, we don't know what is "best" based on block height // TODO: do this without cloning here let potential_rpcs = self.by_name.read().values().cloned().collect::>(); let x = self - ._best_available_rpc(&authorization, error_handler, &potential_rpcs, skip_rpcs) + ._best_available_rpc(request_metadata, error_handler, &potential_rpcs, skip_rpcs) .await; return Ok(x); @@ -569,7 +567,7 @@ impl Web3Rpcs { match self ._best_available_rpc( - &authorization, + request_metadata, error_handler, &potential_rpcs, skip_rpcs, @@ -632,9 +630,7 @@ impl Web3Rpcs { potential_rpcs.clear(); } - if let Some(request_metadata) = request_metadata { - request_metadata.no_servers.fetch_add(1, Ordering::Relaxed); - } + request_metadata.no_servers.fetch_add(1, Ordering::Relaxed); if let Some(retry_at) = earliest_retry_at { // TODO: log the server that retry_at came from @@ -660,7 +656,7 @@ impl Web3Rpcs { // TODO: this is broken pub async fn all_connections( &self, - request_metadata: Option<&Arc>, + request_metadata: &Arc, min_block_needed: Option<&U64>, max_block_needed: Option<&U64>, max_count: Option, @@ -691,10 +687,6 @@ impl Web3Rpcs { trace!("all_rpcs: {:#?}", all_rpcs); - let authorization = request_metadata - .and_then(|x| x.authorization.clone()) - .unwrap_or_default(); - for rpc in all_rpcs { trace!("trying {}", rpc); @@ -714,7 +706,7 @@ impl Web3Rpcs { } // check rate limits and increment our connection counter - match rpc.try_request_handle(&authorization, error_level).await { + match rpc.try_request_handle(request_metadata, error_level).await { Ok(OpenRequestResult::RetryAt(retry_at)) => { // this rpc is not available. skip it trace!("{} is rate limited. skipping", rpc); @@ -752,9 +744,17 @@ impl Web3Rpcs { params: &P, max_wait: Option, ) -> Web3ProxyResult { - // TODO: no request_metadata means we won't have stats on this internal request. - self.request_with_metadata(method, params, None, max_wait, None, None) - .await + let request_metadata = RequestMetadata::new_internal(self.chain_id, method, params); + + let response = self + .request_with_metadata(method, params, &request_metadata, max_wait, None, None) + .await?; + let parsed = response.parsed().await?; + match parsed.payload { + jsonrpc::Payload::Success { result } => Ok(result), + // TODO: confirm this error type is correct + jsonrpc::Payload::Error { error } => Err(error.into()), + } } /// Make a request with stat tracking. 
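The `Web3Rpcs::request` hunk above shows the shape of the whole migration: callers that want a typed value now await `parsed()` on the (possibly streaming) response and branch on the new `jsonrpc::Payload` enum instead of reading a `result` field directly. Below is a minimal, self-contained sketch of that branching; `Payload`, `ParsedResponse`, and `ErrorData` here are simplified stand-ins for the types this patch adds in `web3_proxy/src/jsonrpc.rs` (no `RawValue`, no streaming, derived deserialization instead of the hand-written visitor), not the crate's actual definitions.

```rust
use serde::Deserialize;

// Simplified stand-ins for the patch's jsonrpc::Payload / jsonrpc::ParsedResponse.
#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum Payload<T> {
    Success { result: T },
    Error { error: ErrorData },
}

#[derive(Debug, Deserialize)]
#[allow(dead_code)]
struct ErrorData {
    code: i64,
    message: String,
}

#[derive(Debug, Deserialize)]
#[allow(dead_code)]
struct ParsedResponse<T> {
    jsonrpc: String,
    id: Option<serde_json::Value>,
    #[serde(flatten)]
    payload: Payload<T>,
}

// The same branching the patched Web3Rpcs::request does on parsed.payload.
fn into_result<T>(resp: ParsedResponse<T>) -> Result<T, ErrorData> {
    match resp.payload {
        Payload::Success { result } => Ok(result),
        Payload::Error { error } => Err(error),
    }
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let ok: ParsedResponse<u64> =
        serde_json::from_str(r#"{"jsonrpc":"2.0","id":1,"result":100}"#)?;
    assert_eq!(into_result(ok).unwrap(), 100);

    let err: ParsedResponse<u64> = serde_json::from_str(
        r#"{"jsonrpc":"2.0","id":1,"error":{"code":-32601,"message":"method not found"}}"#,
    )?;
    assert!(into_result(err).is_err());

    Ok(())
}
```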
@@ -762,11 +762,11 @@ impl Web3Rpcs { &self, method: &str, params: &P, - request_metadata: Option<&Arc>, + request_metadata: &Arc, max_wait: Option, min_block_needed: Option<&U64>, max_block_needed: Option<&U64>, - ) -> Web3ProxyResult { + ) -> Web3ProxyResult> { let mut skip_rpcs = vec![]; let mut method_not_available_response = None; @@ -804,28 +804,24 @@ impl Web3Rpcs { // TODO: look at backend_requests instead let rpc = active_request_handle.clone_connection(); - if let Some(request_metadata) = request_metadata { - request_metadata.backend_requests.lock().push(rpc.clone()); - } + request_metadata.backend_requests.lock().push(rpc.clone()); let is_backup_response = rpc.backup; match active_request_handle.request::(method, params).await { Ok(response) => { // TODO: if there are multiple responses being aggregated, this will only use the last server's backup type - if let Some(request_metadata) = request_metadata { - request_metadata - .response_from_backup_rpc - .store(is_backup_response, Ordering::Relaxed); + request_metadata + .response_from_backup_rpc + .store(is_backup_response, Ordering::Relaxed); - request_metadata - .user_error_response - .store(false, Ordering::Relaxed); + request_metadata + .user_error_response + .store(false, Ordering::Relaxed); - request_metadata - .error_response - .store(false, Ordering::Relaxed); - } + request_metadata + .error_response + .store(false, Ordering::Relaxed); return Ok(response); } @@ -833,25 +829,21 @@ impl Web3Rpcs { // TODO: if this is an error, do NOT return. continue to try on another server let error = match JsonRpcErrorData::try_from(&error) { Ok(x) => { - if let Some(request_metadata) = request_metadata { - request_metadata - .user_error_response - .store(true, Ordering::Relaxed); - } + request_metadata + .user_error_response + .store(true, Ordering::Relaxed); x } Err(err) => { warn!(?err, "error from {}", rpc); - if let Some(request_metadata) = request_metadata { - request_metadata - .error_response - .store(true, Ordering::Relaxed); + request_metadata + .error_response + .store(true, Ordering::Relaxed); - request_metadata - .user_error_response - .store(false, Ordering::Relaxed); - } + request_metadata + .user_error_response + .store(false, Ordering::Relaxed); last_provider_error = Some(error); @@ -980,9 +972,7 @@ impl Web3Rpcs { ); // TODO: have a separate column for rate limited? - if let Some(request_metadata) = request_metadata { - request_metadata.no_servers.fetch_add(1, Ordering::Relaxed); - } + request_metadata.no_servers.fetch_add(1, Ordering::Relaxed); select! { _ = sleep_until(retry_at) => { @@ -997,26 +987,22 @@ impl Web3Rpcs { } } OpenRequestResult::NotReady => { - if let Some(request_metadata) = request_metadata { - request_metadata - .error_response - .store(true, Ordering::Relaxed); - } + request_metadata + .error_response + .store(true, Ordering::Relaxed); break; } } } if let Some(err) = method_not_available_response { - if let Some(request_metadata) = request_metadata { - request_metadata - .error_response - .store(false, Ordering::Relaxed); + request_metadata + .error_response + .store(false, Ordering::Relaxed); - request_metadata - .user_error_response - .store(true, Ordering::Relaxed); - } + request_metadata + .user_error_response + .store(true, Ordering::Relaxed); // this error response is likely the user's fault // TODO: emit a stat for unsupported methods. 
then we can know what there is demand for or if we are missing a feature @@ -1092,13 +1078,13 @@ impl Web3Rpcs { self: &Arc, method: &str, params: &P, - request_metadata: Option<&Arc>, + request_metadata: &Arc, min_block_needed: Option<&U64>, max_block_needed: Option<&U64>, max_wait: Option, error_level: Option, max_sends: Option, - ) -> Web3ProxyResult> { + ) -> Web3ProxyResult> { let mut watch_consensus_rpcs = self.watch_ranked_rpcs.subscribe(); let start = Instant::now(); @@ -1121,26 +1107,24 @@ impl Web3Rpcs { .await { Ok(active_request_handles) => { - if let Some(request_metadata) = request_metadata { - let mut only_backups_used = true; + let mut only_backups_used = true; - request_metadata.backend_requests.lock().extend( - active_request_handles.iter().map(|x| { - let rpc = x.clone_connection(); + request_metadata.backend_requests.lock().extend( + active_request_handles.iter().map(|x| { + let rpc = x.clone_connection(); - if !rpc.backup { - // TODO: even if a backup is included, it is possible the response is still from a primary connection. think about this more - only_backups_used = false; - } + if !rpc.backup { + // TODO: even if a backup is included, it is possible the response is still from a primary connection. think about this more + only_backups_used = false; + } - rpc - }), - ); + rpc + }), + ); - request_metadata - .response_from_backup_rpc - .store(only_backups_used, Ordering::Relaxed); - } + request_metadata + .response_from_backup_rpc + .store(only_backups_used, Ordering::Relaxed); let x = self .try_send_parallel_requests( @@ -1161,10 +1145,8 @@ impl Web3Rpcs { "No servers in sync on! Retrying", ); - if let Some(request_metadata) = &request_metadata { - // TODO: if this times out, i think we drop this - request_metadata.no_servers.fetch_add(1, Ordering::Relaxed); - } + // TODO: if this times out, i think we drop this + request_metadata.no_servers.fetch_add(1, Ordering::Relaxed); let max_sleep = if let Some(max_wait) = max_wait { start + max_wait @@ -1188,9 +1170,7 @@ impl Web3Rpcs { } } Err(Some(retry_at)) => { - if let Some(request_metadata) = &request_metadata { - request_metadata.no_servers.fetch_add(1, Ordering::Relaxed); - } + request_metadata.no_servers.fetch_add(1, Ordering::Relaxed); if let Some(max_wait) = max_wait { if start.elapsed() > max_wait { @@ -1240,12 +1220,12 @@ impl Web3Rpcs { &self, method: &str, params: &P, - request_metadata: Option<&Arc>, + request_metadata: &Arc, max_wait: Option, min_block_needed: Option<&U64>, max_block_needed: Option<&U64>, - ) -> Web3ProxyResult { - let proxy_mode = request_metadata.map(|x| x.proxy_mode()).unwrap_or_default(); + ) -> Web3ProxyResult> { + let proxy_mode = request_metadata.proxy_mode(); match proxy_mode { ProxyMode::Debug | ProxyMode::Best => { @@ -1547,8 +1527,9 @@ mod tests { assert!(rpcs.head_block_hash().is_none()); // all_backend_connections gives all non-backup servers regardless of sync status + let m = Arc::new(RequestMetadata::default()); assert_eq!( - rpcs.all_connections(None, None, None, None, None) + rpcs.all_connections(&m, None, None, None, None) .await .unwrap() .len(), @@ -1556,9 +1537,10 @@ mod tests { ); // best_synced_backend_connection which servers to be synced with the head block should not find any nodes + let m = Arc::new(RequestMetadata::default()); let x = rpcs .wait_for_best_rpc( - None, + &m, &mut vec![], Some(head_block.number.as_ref().unwrap()), None, @@ -1651,9 +1633,10 @@ mod tests { assert!(!lagged_rpc.has_block_data(head_block.number.as_ref().unwrap())); // TODO: make sure the 
handle is for the expected rpc + let m = Arc::new(RequestMetadata::default()); assert!(matches!( rpcs.wait_for_best_rpc( - None, + &m, &mut vec![], None, None, @@ -1665,9 +1648,10 @@ mod tests { )); // TODO: make sure the handle is for the expected rpc + let m = Arc::new(RequestMetadata::default()); assert!(matches!( rpcs.wait_for_best_rpc( - None, + &m, &mut vec![], Some(&0.into()), None, @@ -1679,9 +1663,10 @@ mod tests { )); // TODO: make sure the handle is for the expected rpc + let m = Arc::new(RequestMetadata::default()); assert!(matches!( rpcs.wait_for_best_rpc( - None, + &m, &mut vec![], Some(&1.into()), None, @@ -1693,9 +1678,10 @@ mod tests { )); // future block should not get a handle + let m = Arc::new(RequestMetadata::default()); let future_rpc = rpcs .wait_for_best_rpc( - None, + &m, &mut vec![], Some(&2.into()), None, @@ -1801,9 +1787,10 @@ mod tests { // best_synced_backend_connection requires servers to be synced with the head block // TODO: test with and without passing the head_block.number? + let m = Arc::new(RequestMetadata::default()); let best_available_server = rpcs .wait_for_best_rpc( - None, + &m, &mut vec![], Some(head_block.number()), None, @@ -1819,9 +1806,10 @@ mod tests { OpenRequestResult::Handle(_) )); + let m = Arc::new(RequestMetadata::default()); let _best_available_server_from_none = rpcs .wait_for_best_rpc( - None, + &m, &mut vec![], None, None, @@ -1832,9 +1820,10 @@ mod tests { // assert_eq!(best_available_server, best_available_server_from_none); + let m = Arc::new(RequestMetadata::default()); let best_archive_server = rpcs .wait_for_best_rpc( - None, + &m, &mut vec![], Some(&1.into()), None, @@ -1961,8 +1950,9 @@ mod tests { // best_synced_backend_connection requires servers to be synced with the head block // TODO: test with and without passing the head_block.number? + let m = Arc::new(RequestMetadata::default()); let head_connections = rpcs - .all_connections(None, Some(block_2.number()), None, None, None) + .all_connections(&m, Some(block_2.number()), None, None, None) .await; debug!("head_connections: {:#?}", head_connections); @@ -1973,8 +1963,9 @@ mod tests { "wrong number of connections" ); + let m = Arc::new(RequestMetadata::default()); let all_connections = rpcs - .all_connections(None, Some(block_1.number()), None, None, None) + .all_connections(&m, Some(block_1.number()), None, None, None) .await; debug!("all_connections: {:#?}", all_connections); @@ -1985,7 +1976,8 @@ mod tests { "wrong number of connections" ); - let all_connections = rpcs.all_connections(None, None, None, None, None).await; + let m = Arc::new(RequestMetadata::default()); + let all_connections = rpcs.all_connections(&m, None, None, None, None).await; debug!("all_connections: {:#?}", all_connections); diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 115972e7..b57216da 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -1,12 +1,12 @@ //! Rate-limited communication with a web3 provider. 
use super::blockchain::{ArcBlock, BlocksByHashCache, Web3ProxyBlock}; -use super::provider::{connect_http, connect_ws, EthersHttpProvider, EthersWsProvider}; +use super::provider::{connect_ws, EthersWsProvider}; use super::request::{OpenRequestHandle, OpenRequestResult}; use crate::app::{flatten_handle, Web3ProxyJoinHandle}; use crate::config::{BlockAndRpc, Web3RpcConfig}; use crate::errors::{Web3ProxyError, Web3ProxyErrorContext, Web3ProxyResult}; -use crate::frontend::authorization::Authorization; -use crate::jsonrpc::{JsonRpcParams, JsonRpcResultData}; +use crate::frontend::authorization::RequestMetadata; +use crate::jsonrpc::{self, JsonRpcParams, JsonRpcResultData}; use crate::rpcs::request::RequestErrorHandler; use anyhow::{anyhow, Context}; use arc_swap::ArcSwapOption; @@ -39,7 +39,8 @@ pub struct Web3Rpc { pub db_conn: Option, pub subscribe_txs: bool, /// most all requests prefer use the http_provider - pub(super) http_provider: Option, + pub(super) http_client: Option, + pub(super) http_url: Option, /// the websocket url is only used for subscriptions pub(super) ws_url: Option, /// the websocket provider is only used for subscriptions @@ -164,14 +165,13 @@ impl Web3Rpc { let median_request_latency = RollingQuantileLatency::spawn_median(1_000).await; - let http_provider = if let Some(http_url) = config.http_url { + let (http_url, http_client) = if let Some(http_url) = config.http_url { let http_url = http_url.parse::()?; - - Some(connect_http(http_url, http_client, block_interval)?) - - // TODO: check the provider is on the right chain + // TODO: double-check not missing anything from connect_http() + let http_client = http_client.unwrap_or_default(); + (Some(http_url), Some(http_client)) } else { - None + (None, None) }; let ws_url = if let Some(ws_url) = config.ws_url { @@ -194,7 +194,8 @@ impl Web3Rpc { hard_limit, hard_limit_until: Some(hard_limit_until), head_block: Some(head_block), - http_provider, + http_url, + http_client, max_head_block_age, name, peak_latency: Some(peak_latency), @@ -916,7 +917,7 @@ impl Web3Rpc { self.send_head_block_result(Ok(Some(block)), &block_sender, &block_map) .await?; } - } else if self.http_provider.is_some() { + } else if self.http_client.is_some() { // there is a "watch_blocks" function, but a lot of public nodes (including llamanodes) do not support the necessary rpc endpoints // TODO: is 1/2 the block time okay? let mut i = interval(self.block_interval / 2); @@ -960,7 +961,7 @@ impl Web3Rpc { pub async fn wait_for_request_handle( self: &Arc, - authorization: &Arc, + request_metadata: &Arc, max_wait: Option, error_handler: Option, ) -> Web3ProxyResult { @@ -969,7 +970,10 @@ impl Web3Rpc { let max_wait_until = max_wait.map(|x| Instant::now() + x); loop { - match self.try_request_handle(authorization, error_handler).await { + match self + .try_request_handle(request_metadata, error_handler) + .await + { Ok(OpenRequestResult::Handle(handle)) => return Ok(handle), Ok(OpenRequestResult::RetryAt(retry_at)) => { // TODO: emit a stat? @@ -1011,7 +1015,7 @@ impl Web3Rpc { pub async fn try_request_handle( self: &Arc, - authorization: &Arc, + request_metadata: &Arc, error_handler: Option, ) -> Web3ProxyResult { // TODO: if websocket is reconnecting, return an error? 
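Replacing the ethers `http_provider` with a bare `http_url` plus a `reqwest` `http_client` is what makes the size-based short-circuit possible: the `OpenRequestHandle::request` hunk further down POSTs the JSON-RPC call itself and hands the raw `reqwest::Response` to `SingleResponse::read_if_short`, which parses small bodies in memory and leaves large ones to be streamed. A rough sketch of that decision follows, using the same reqwest calls the patch relies on (`content_length`, `bytes`, `chunk`); the 1024-byte threshold, the `rpc_post` helper, and the localhost URL in `main` are illustrative assumptions, and error handling is much more careful in the real code.

```rust
use bytes::BytesMut;
use serde_json::{json, Value};

/// Rough analogue of the patch's SingleResponse: either parsed up front or left to stream.
enum Body {
    Parsed(Value),
    Stream(reqwest::Response),
}

/// POST a JSON-RPC call and only parse the body if it is small.
/// The 1024-byte threshold is an assumption; the patch leaves the real value as a config TODO.
async fn rpc_post(
    client: &reqwest::Client,
    url: &str,
    method: &str,
) -> Result<Body, Box<dyn std::error::Error>> {
    const THRESHOLD: u64 = 1024;

    let request = json!({ "jsonrpc": "2.0", "id": 1, "method": method, "params": [] });
    let mut response = client.post(url).json(&request).send().await?;

    match response.content_length() {
        // short and sized: read and parse it now
        Some(len) if len <= THRESHOLD => Ok(Body::Parsed(serde_json::from_slice(
            &response.bytes().await?,
        )?)),
        // long and sized: let the caller forward the byte stream untouched
        Some(_) => Ok(Body::Stream(response)),
        // unsized (chunked): buffer up to the threshold before deciding
        None => {
            let mut buffer = BytesMut::new();
            while (buffer.len() as u64) < THRESHOLD {
                match response.chunk().await? {
                    Some(chunk) => buffer.extend_from_slice(&chunk),
                    // body ended while still short: parse what we buffered
                    None => return Ok(Body::Parsed(serde_json::from_slice(&buffer)?)),
                }
            }
            // too big: the real code keeps the buffered prefix and chains it back onto the stream
            Ok(Body::Stream(response))
        }
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // assumes a JSON-RPC node listening locally; adjust the URL for your setup
    let client = reqwest::Client::new();
    match rpc_post(&client, "http://localhost:8545", "eth_blockNumber").await? {
        Body::Parsed(v) => println!("parsed in memory: {v}"),
        Body::Stream(_) => println!("large response, would be streamed to the client"),
    }
    Ok(())
}
```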
@@ -1062,7 +1066,7 @@ impl Web3Rpc { }; let handle = - OpenRequestHandle::new(authorization.clone(), self.clone(), error_handler).await; + OpenRequestHandle::new(request_metadata.clone(), self.clone(), error_handler).await; Ok(handle.into()) } @@ -1084,17 +1088,20 @@ impl Web3Rpc { self: &Arc, method: &str, params: &P, - authorization: &Arc, + request_metadata: &Arc, error_handler: Option, max_wait: Option, ) -> Web3ProxyResult { let handle = self - .wait_for_request_handle(authorization, max_wait, error_handler) + .wait_for_request_handle(request_metadata, max_wait, error_handler) .await?; - let x = handle.request::(method, params).await?; - - Ok(x) + let response = handle.request::(method, params).await?; + let parsed = response.parsed().await?; + match parsed.payload { + jsonrpc::Payload::Success { result } => Ok(result), + jsonrpc::Payload::Error { error } => Err(error.into()), + } } } @@ -1108,7 +1115,7 @@ impl Hash for Web3Rpc { self.name.hash(state); // TODO: url does NOT include the authorization data. i think created_at should protect us if auth changes without anything else - self.http_provider.as_ref().map(|x| x.url()).hash(state); + self.http_url.hash(state); // TODO: figure out how to get the url for the ws provider // self.ws_provider.map(|x| x.url()).hash(state); diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index 2b615734..54968f4d 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -1,8 +1,8 @@ use super::one::Web3Rpc; use crate::errors::{Web3ProxyErrorContext, Web3ProxyResult}; -use crate::frontend::authorization::{Authorization, AuthorizationType}; +use crate::frontend::authorization::{Authorization, AuthorizationType, RequestMetadata}; use crate::globals::{global_db_conn, DB_CONN}; -use crate::jsonrpc::{JsonRpcParams, JsonRpcResultData}; +use crate::jsonrpc::{self, JsonRpcParams, JsonRpcResultData}; use chrono::Utc; use derive_more::From; use entities::revert_log; @@ -30,7 +30,7 @@ pub enum OpenRequestResult { /// Opening this handle checks rate limits. 
Developers, try to keep opening a handle and using it as close together as possible #[derive(Debug)] pub struct OpenRequestHandle { - authorization: Arc, + request_metadata: Arc, error_handler: RequestErrorHandler, rpc: Arc, } @@ -133,7 +133,7 @@ impl Drop for OpenRequestHandle { impl OpenRequestHandle { pub async fn new( - authorization: Arc, + request_metadata: Arc, rpc: Arc, error_handler: Option, ) -> Self { @@ -146,7 +146,7 @@ impl OpenRequestHandle { let error_handler = error_handler.unwrap_or_default(); Self { - authorization, + request_metadata, error_handler, rpc, } @@ -169,13 +169,15 @@ impl OpenRequestHandle { self, method: &str, params: &P, - ) -> Result { + ) -> Result, ProviderError> { // TODO: use tracing spans // TODO: including params in this log is way too verbose // trace!(rpc=%self.rpc, %method, "request"); trace!("requesting from {}", self.rpc); - match self.authorization.authorization_type { + let authorization = &self.request_metadata.authorization; + + match &authorization.authorization_type { AuthorizationType::Frontend => { self.rpc .external_requests @@ -194,10 +196,37 @@ impl OpenRequestHandle { // TODO: replace ethers-rs providers with our own that supports streaming the responses // TODO: replace ethers-rs providers with our own that handles "id" being null - let response: Result = if let Some(ref p) = self.rpc.http_provider { - p.request(method, params).await + let response: Result, _> = if let ( + Some(ref url), + Some(ref client), + ) = + (&self.rpc.http_url, &self.rpc.http_client) + { + let params: serde_json::Value = serde_json::to_value(params)?; + let request = jsonrpc::JsonRpcRequest::new( + // TODO: proper id + jsonrpc::JsonRpcId::Number(1), + method.to_string(), + params, + ) + .expect("request creation cannot fail"); + match client.post(url.clone()).json(&request).send().await { + // TODO: threshold from configs + Ok(response) => { + jsonrpc::SingleResponse::read_if_short( + response, + 1024, + self.request_metadata.clone(), + ) + .await + } + Err(err) => Err(err.into()), + } } else if let Some(p) = self.rpc.ws_provider.load().as_ref() { - p.request(method, params).await + p.request(method, params) + .await + // TODO: Id here + .map(|result| jsonrpc::ParsedResponse::from_result(result, None).into()) } else { return Err(ProviderError::CustomError( "no provider configured!".to_string(), @@ -227,7 +256,8 @@ impl OpenRequestHandle { // trace!(%method, "skipping save on revert"); RequestErrorHandler::TraceLevel } else if DB_CONN.read().await.is_ok() { - let log_revert_chance = self.authorization.checks.log_revert_chance; + let log_revert_chance = + self.request_metadata.authorization.checks.log_revert_chance; if log_revert_chance == 0 { // trace!(%method, "no chance. 
skipping save on revert"); @@ -385,7 +415,7 @@ impl OpenRequestHandle { Ok(params) => { // spawn saving to the database so we don't slow down the request // TODO: log if this errors - let f = self.authorization.clone().save_revert(method, params.0 .0); + let f = authorization.clone().save_revert(method, params.0 .0); tokio::spawn(f); } diff --git a/web3_proxy/src/stats/mod.rs b/web3_proxy/src/stats/mod.rs index 3e2fb071..213dab68 100644 --- a/web3_proxy/src/stats/mod.rs +++ b/web3_proxy/src/stats/mod.rs @@ -539,13 +539,8 @@ impl BufferedRpcQueryStats { /// There are often multiple copies if a request is being sent to multiple servers in parallel impl RpcQueryStats { fn try_from_metadata(mut metadata: RequestMetadata) -> Web3ProxyResult { - let mut authorization = metadata.authorization.take(); - - if authorization.is_none() { - authorization = Some(Arc::new(Authorization::internal()?)); - } - - let authorization = authorization.expect("Authorization will always be set"); + // TODO: do this without a clone + let authorization = metadata.authorization.clone(); let archive_request = metadata.archive_request.load(Ordering::Relaxed); diff --git a/web3_proxy_cli/src/bin/wait_for_sync.rs b/web3_proxy_cli/src/bin/wait_for_sync.rs index bc53132f..4ac6c3dc 100644 --- a/web3_proxy_cli/src/bin/wait_for_sync.rs +++ b/web3_proxy_cli/src/bin/wait_for_sync.rs @@ -44,8 +44,6 @@ async fn main() -> anyhow::Result<()> { std::env::set_var("RUST_LOG", "wait_for_sync=debug"); } - // todo!("set up tracing"); - // this probably won't matter for us in docker, but better safe than sorry fdlimit::raise_fd_limit(); diff --git a/web3_proxy_cli/src/sub_commands/migrate_stats_to_v2.rs b/web3_proxy_cli/src/sub_commands/migrate_stats_to_v2.rs index 8358f570..f3fe88ed 100644 --- a/web3_proxy_cli/src/sub_commands/migrate_stats_to_v2.rs +++ b/web3_proxy_cli/src/sub_commands/migrate_stats_to_v2.rs @@ -187,7 +187,7 @@ impl MigrateStatsToV2SubCommand { // Create RequestMetadata let request_metadata = RequestMetadata { archive_request: x.archive_request.into(), - authorization: Some(authorization.clone()), + authorization: authorization.clone(), backend_requests: Mutex::new(backend_rpcs), chain_id, error_response: x.error_response.into(),