From 56fdf481298df7fa7d4e3678ccadae20418e01f1 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Mon, 24 Apr 2023 11:00:12 -0700 Subject: [PATCH] Send transactions to protected and public rpcs (#57) * move protected transactions into their own function and dry stats sending * cargo upgrade * comments * time to live instead of time to idle * minor workaround for eth_chainId * cargo upgrade --- Cargo.lock | 125 +++++----- entities/Cargo.toml | 2 +- migration/Cargo.toml | 2 +- redis-rate-limiter/Cargo.toml | 2 +- web3_proxy/Cargo.toml | 4 +- web3_proxy/src/app/mod.rs | 349 ++++++++++++++-------------- web3_proxy/src/bin/wait_for_sync.rs | 2 + web3_proxy/src/block_number.rs | 7 +- web3_proxy/src/rpcs/many.rs | 8 +- web3_proxy/src/rpcs/one.rs | 4 +- web3_proxy/src/rpcs/request.rs | 11 +- 11 files changed, 262 insertions(+), 254 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ae05a0fb..e1f6cfa2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -74,9 +74,9 @@ dependencies = [ [[package]] name = "aho-corasick" -version = "0.7.20" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc936419f96fa211c1b9166887b38e5e40b19958e5b895be7c1f93adec7071ac" +checksum = "67fc08ce920c31afb70f013dcce1bfc3a3195de6a228474e45e1f145b36f8d04" dependencies = [ "memchr", ] @@ -408,9 +408,9 @@ checksum = "a4a4ddaa51a5bc52a6948f74c06d20aaaddb71924eab79b8c97a8c556e942d6a" [[package]] name = "base64ct" -version = "1.5.3" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b645a089122eccb6111b4f81cbc1a49f5900ac4666bb93ac027feaecf15607bf" +checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" [[package]] name = "bech32" @@ -492,9 +492,9 @@ dependencies = [ [[package]] name = "block-buffer" -version = "0.10.3" +version = "0.10.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69cce20737498f97b993470a6e536b8523f0af7892a4f928cceb1ac5e52ebe7e" +checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" dependencies = [ "generic-array", ] @@ -555,9 +555,9 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.12.0" +version = "3.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d261e256854913907f67ed06efbc3338dfe6179796deefc1ff763fc1aee5535" +checksum = "9b1ce199063694f33ffb7dd4e0ee620741495c32833cde5aa08f02a0bf96f0c8" [[package]] name = "byte-slice-cast" @@ -567,19 +567,20 @@ checksum = "c3ac9f8b63eca6fd385229b3675f6cc0dc5c8a5c8a54a59d4f52ffd670d87b0c" [[package]] name = "bytecheck" -version = "0.6.9" +version = "0.6.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d11cac2c12b5adc6570dad2ee1b87eff4955dac476fe12d81e5fdd352e52406f" +checksum = "13fe11640a23eb24562225322cd3e452b93a3d4091d62fab69c70542fcd17d1f" dependencies = [ "bytecheck_derive", "ptr_meta", + "simdutf8", ] [[package]] name = "bytecheck_derive" -version = "0.6.9" +version = "0.6.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13e576ebe98e605500b3c8041bb888e966653577172df6dd97398714eb30b9bf" +checksum = "e31225543cb46f81a7e224762764f4a6a0f097b1db0b175f69e8065efaa42de5" dependencies = [ "proc-macro2", "quote", @@ -817,21 +818,19 @@ dependencies = [ [[package]] name = "coins-bip39" -version = "0.8.5" +version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad2a68a46b9d8cc90484f0689adc0e4c890eb215bf698ae52e5235bb88f40be7" +checksum = "84f4d04ee18e58356accd644896aeb2094ddeafb6a713e056cef0c0a8e468c15" dependencies = [ "bitvec 0.17.4", "coins-bip32", "getrandom", - "hex", "hmac", "once_cell", "pbkdf2 0.12.1", "rand", "sha2 0.10.6", "thiserror", - "tracing", ] [[package]] @@ -965,7 +964,7 @@ dependencies = [ "tonic", "tracing", "tracing-core", - "tracing-subscriber 0.3.16", + "tracing-subscriber 0.3.17", ] [[package]] @@ -1019,9 +1018,9 @@ dependencies = [ [[package]] name = "cpufeatures" -version = "0.2.5" +version = "0.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28d997bd5e24a5928dd43e46dc529867e207907fe0b239c3477d924f7f2ca320" +checksum = "3e4c1eaa2012c47becbbad2ab175484c2a84d1185b566fb2cc5b8707343dfe58" dependencies = [ "libc", ] @@ -1087,9 +1086,9 @@ dependencies = [ [[package]] name = "crossbeam-utils" -version = "0.8.14" +version = "0.8.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fb766fa798726286dbbb842f174001dab8abc7b627a1dd86e0b7222a95d929f" +checksum = "3c063cd8cc95f5c377ed0d4b49a4b21f632396ff690e8470c29b3359b346984b" dependencies = [ "cfg-if", ] @@ -1270,9 +1269,9 @@ dependencies = [ [[package]] name = "deadpool-redis" -version = "0.11.1" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b8bde44cbfdf17ae5baa45c9f43073b320f1a19955389315629304a23909ad2" +checksum = "5f1760f60ffc6653b4afd924c5792098d8c00d9a3deb6b3d989eac17949dc422" dependencies = [ "deadpool", "redis", @@ -1387,7 +1386,7 @@ version = "0.10.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8168378f4e5023e7218c89c891c0fd8ecdb5e5e4f18cb78f38cf245dd021e76f" dependencies = [ - "block-buffer 0.10.3", + "block-buffer 0.10.4", "const-oid 0.9.2", "crypto-common", "subtle", @@ -3004,7 +3003,7 @@ dependencies = [ "petgraph", "pico-args", "regex", - "regex-syntax", + "regex-syntax 0.6.29", "string_cache", "term", "tiny-keccak", @@ -3031,9 +3030,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.141" +version = "0.2.142" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3304a64d199bb964be99741b7a14d26972741915b3649639149b2479bb46f4b5" +checksum = "6a987beff54b60ffa6d51982e1aa1146bc42f19bd26be28b0586f252fccf5317" [[package]] name = "libm" @@ -3576,9 +3575,9 @@ dependencies = [ [[package]] name = "ordered-float" -version = "3.6.0" +version = "3.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13a384337e997e6860ffbaa83708b2ef329fd8c54cb67a5f64d421e0f943254f" +checksum = "2fc2dbde8f8a79f2102cc474ceb0ad68e3b80b85289ea62389b60e66777e4213" dependencies = [ "num-traits", ] @@ -3596,9 +3595,9 @@ dependencies = [ [[package]] name = "os_str_bytes" -version = "6.4.1" +version = "6.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b7820b9daea5457c9f21c69448905d723fbd21136ccf521748f23fd49e723ee" +checksum = "ceedf44fb00f2d1984b0bc98102627ce622e083e49a5bacdb3e514fa4238e267" [[package]] name = "ouroboros" @@ -4312,9 +4311,9 @@ dependencies = [ [[package]] name = "redis" -version = "0.22.3" +version = "0.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa8455fa3621f6b41c514946de66ea0531f57ca017b2e6c7cc368035ea5b46df" +checksum = "3ea8c51b5dc1d8e5fd3350ec8167f464ec0995e79f2e90a075b63371500d557f" dependencies = [ "async-trait", "bytes", @@ -4370,13 +4369,13 @@ dependencies = [ [[package]] name = "regex" -version = "1.7.3" +version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b1f693b24f6ac912f4893ef08244d70b6067480d2f1a46e950c9691e6749d1d" +checksum = "af83e617f331cc6ae2da5443c602dfa5af81e517212d9d611a5b3ba1777b5370" dependencies = [ "aho-corasick", "memchr", - "regex-syntax", + "regex-syntax 0.7.1", ] [[package]] @@ -4385,7 +4384,7 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" dependencies = [ - "regex-syntax", + "regex-syntax 0.6.29", ] [[package]] @@ -4394,6 +4393,12 @@ version = "0.6.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" +[[package]] +name = "regex-syntax" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5996294f19bd3aae0453a862ad728f60e6600695733dd5df01da90c54363a3c" + [[package]] name = "rend" version = "0.4.0" @@ -4683,9 +4688,9 @@ dependencies = [ [[package]] name = "rustversion" -version = "1.0.11" +version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5583e89e108996506031660fe09baa5011b9dd0341b89029313006d1fb508d70" +checksum = "4f3208ce4d8448b3f3e7d168a73f5e0c43a61e32930de3bceeccedb388b6bf06" [[package]] name = "ryu" @@ -4795,9 +4800,9 @@ dependencies = [ [[package]] name = "sea-orm" -version = "0.11.2" +version = "0.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c5d875e2fcd965320e50066028ac0b4877ff07edbb734a6ddfeff48a87dbab38" +checksum = "fade86e8d41fd1a4721f84cb834f4ca2783f973cc30e6212b7fafc134f169214" dependencies = [ "async-stream", "async-trait", @@ -4823,9 +4828,9 @@ dependencies = [ [[package]] name = "sea-orm-cli" -version = "0.11.2" +version = "0.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ead9f7dac975f10447f17d08edbb2046daa087b5e0b50bbf8211f303459078c" +checksum = "efbf34a2caf70c2e3be9bb1e674e9540f6dfd7c8f40f6f05daf3b9740e476005" dependencies = [ "chrono", "clap", @@ -4833,15 +4838,15 @@ dependencies = [ "regex", "sea-schema", "tracing", - "tracing-subscriber 0.3.16", + "tracing-subscriber 0.3.17", "url", ] [[package]] name = "sea-orm-macros" -version = "0.11.2" +version = "0.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9b593e9c0cdbb18cafd4da7b92e67a9c2d9892934f3a2d8bbac73d5ba4a98a1" +checksum = "28936f26d62234ff0be16f80115dbdeb3237fe9c25cf18fbcd1e3b3592360f20" dependencies = [ "bae", "heck 0.3.3", @@ -4852,9 +4857,9 @@ dependencies = [ [[package]] name = "sea-orm-migration" -version = "0.11.2" +version = "0.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "edba7a6123c1035b0530deb713820688f0234431ab6c1893b14dce493ade76af" +checksum = "278d3adfd0832b6ffc17d3cfbc574d3695a5c1b38814e0bc8ac238d33f3d87cf" dependencies = [ "async-trait", "clap", @@ -4864,7 +4869,7 @@ dependencies = [ "sea-orm-cli", "sea-schema", "tracing", - "tracing-subscriber 0.3.16", + "tracing-subscriber 0.3.17", ] [[package]] @@ -5339,6 +5344,12 @@ dependencies = [ "rand_core", ] +[[package]] +name = "simdutf8" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f27f6278552951f1f2b8cf9da965d10969b2efdea95a6ec47987ab46edfe263a" + [[package]] name = "siphasher" version = "0.3.10" @@ -5932,7 +5943,7 @@ dependencies = [ "toml 0.5.11", "tonic", "tracing", - "tracing-subscriber 0.3.16", + "tracing-subscriber 0.3.17", "tui", ] @@ -6191,13 +6202,13 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.23" +version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4017f8f45139870ca7e672686113917c71c7a6e02d4924eda67186083c03081a" +checksum = "0f57e3ca2a01450b1a921183a9c9cbfda207fd822cef4ccb00a65402cbba7a74" dependencies = [ "proc-macro2", "quote", - "syn 1.0.109", + "syn 2.0.15", ] [[package]] @@ -6275,9 +6286,9 @@ dependencies = [ [[package]] name = "tracing-subscriber" -version = "0.3.16" +version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6176eae26dd70d0c919749377897b54a9276bd7061339665dd68777926b5a70" +checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77" dependencies = [ "matchers 0.1.0", "nu-ansi-term", @@ -6978,9 +6989,9 @@ checksum = "09041cd90cf85f7f8b2df60c646f853b7f535ce68f85244eb6731cf89fa498ec" [[package]] name = "zeroize" -version = "1.5.7" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c394b5bd0c6f669e7275d9c20aa90ae064cb22e75a1cad54e1b34088034b149f" +checksum = "2a0956f1ba7c7909bfb66c2e9e4124ab6f6482560f6628b5aaeba39207c9aad9" [[package]] name = "zip" diff --git a/entities/Cargo.toml b/entities/Cargo.toml index 8fe7206e..1d9a1d54 100644 --- a/entities/Cargo.toml +++ b/entities/Cargo.toml @@ -10,7 +10,7 @@ path = "src/mod.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -sea-orm = "0.11.2" +sea-orm = "0.11.3" serde = "1.0.160" uuid = "1.3.1" ethers = "2.0.3" diff --git a/migration/Cargo.toml b/migration/Cargo.toml index 22d5c752..b6fa5de2 100644 --- a/migration/Cargo.toml +++ b/migration/Cargo.toml @@ -12,7 +12,7 @@ path = "src/lib.rs" tokio = { version = "1.27.0", features = ["full", "tracing"] } [dependencies.sea-orm-migration] -version = "0.11.2" +version = "0.11.3" features = [ # Enable at least one `ASYNC_RUNTIME` and `DATABASE_DRIVER` feature if you want to run migration via CLI. # View the list of supported features at https://www.sea-ql.org/SeaORM/docs/install-and-config/database-and-async-runtime. diff --git a/redis-rate-limiter/Cargo.toml b/redis-rate-limiter/Cargo.toml index 5f0480e8..7c7c4c71 100644 --- a/redis-rate-limiter/Cargo.toml +++ b/redis-rate-limiter/Cargo.toml @@ -7,5 +7,5 @@ edition = "2021" [dependencies] anyhow = "1.0.70" chrono = "0.4.24" -deadpool-redis = { version = "0.11.1", features = ["rt_tokio_1", "serde"] } +deadpool-redis = { version = "0.12.0", features = ["rt_tokio_1", "serde"] } tokio = "1.27.0" diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index 1136ad26..7db6e8f4 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -58,13 +58,13 @@ moka = { version = "0.10.2", default-features = false, features = ["future"] } num = "0.4.0" num-traits = "0.2.15" once_cell = { version = "1.17.1" } -ordered-float = "3.6.0" +ordered-float = "3.7.0" pagerduty-rs = { version = "0.1.6", default-features = false, features = ["async", "rustls", "sync"] } parking_lot = { version = "0.12.1", features = ["arc_lock"] } prettytable = "*" proctitle = "0.1.1" rdkafka = { version = "0.29.0" } -regex = "1.7.3" +regex = "1.8.1" reqwest = { version = "0.11.16", default-features = false, features = ["json", "tokio-rustls"] } rmp-serde = "1.1.1" rustc-hash = "1.1.0" diff --git a/web3_proxy/src/app/mod.rs b/web3_proxy/src/app/mod.rs index eec2883f..0f4b30fa 100644 --- a/web3_proxy/src/app/mod.rs +++ b/web3_proxy/src/app/mod.rs @@ -670,12 +670,13 @@ impl Web3ProxyApp { // TODO: capacity from configs // all these are the same size, so no need for a weigher - // TODO: ttl on this? or is max_capacity fine? + // TODO: this used to have a time_to_idle let pending_transactions = Cache::builder() .max_capacity(10_000) // TODO: different chains might handle this differently // TODO: what should we set? 5 minutes is arbitrary. the nodes themselves hold onto transactions for much longer - .time_to_idle(Duration::from_secs(300)) + // TODO: this used to be time_to_update, but + .time_to_live(Duration::from_secs(300)) .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); // responses can be very different in sizes, so this is a cache with a max capacity and a weigher @@ -695,7 +696,7 @@ impl Web3ProxyApp { } }) // TODO: what should we set? 10 minutes is arbitrary. the nodes themselves hold onto transactions for much longer - .time_to_idle(Duration::from_secs(600)) + .time_to_live(Duration::from_secs(600)) .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); // all the users are the same size, so no need for a weigher @@ -1208,14 +1209,101 @@ impl Web3ProxyApp { } } + /// try to send transactions to the best available rpcs with private mempools + /// if no private rpcs are configured, then some public rpcs are used instead + async fn try_send_protected( + self: &Arc, + authorization: &Arc, + request: &JsonRpcRequest, + request_metadata: Arc, + num_public_rpcs: Option, + head_block_num: Option, + ) -> Web3ProxyResult { + // TODO: error/wait if no head block? + // TODO: configurable lag + let min_block_needed = head_block_num + .or(self.balanced_rpcs.head_block_num()) + .ok_or_else(|| Web3ProxyError::NoServersSynced)? + .saturating_sub(3.into()); + + if let Some(protected_rpcs) = self.private_rpcs.as_ref() { + if !protected_rpcs.is_empty() { + // send to protected and public rpcs at the same time + // TODO: send to tier 0 of private, wait a block, ..., tier N of private, wait a block, public + // TODO: allow premium users to choose specifically where they want transactions to go + let public_f = { + let authorization = authorization.clone(); + let clone = self.clone(); + // TODO: should request be in an arc? inside the request metadata? + let request = request.clone(); + let request_metadata = Some(request_metadata.clone()); + + async move { + clone + .balanced_rpcs + .try_send_all_synced_connections( + &authorization, + &request, + request_metadata, + Some(&min_block_needed), + None, + Level::Trace, + num_public_rpcs, + true, + ) + .await + } + }; + + let public_handle = tokio::spawn(public_f); + + let protected_response = protected_rpcs + .try_send_all_synced_connections( + authorization, + request, + Some(request_metadata), + None, + None, + Level::Trace, + None, + true, + ) + .await?; + + // wait for sending to the public rpcs to finish + // TODO: let this run in the background instead? + public_handle.await??; + + return Ok(protected_response); + } + } + + // no private rpcs to send to. send to a few public rpcs + // try_send_all_upstream_servers puts the request id into the response. no need to do that ourselves here. + self.balanced_rpcs + .try_send_all_synced_connections( + authorization, + request, + Some(request_metadata), + Some(&min_block_needed), + None, + Level::Trace, + num_public_rpcs, + true, + ) + .await + } + // TODO: more robust stats and kafka logic! if we use the try operator, they aren't saved! + // TODO: move this to another module async fn proxy_cached_request( self: &Arc, authorization: &Arc, mut request: JsonRpcRequest, head_block_num: Option, ) -> Web3ProxyResult<(JsonRpcForwardedResponse, Vec>)> { - // trace!("Received request: {:?}", request); + // TODO: move this code to another module so that its easy to turn this trace logging on in dev + trace!("Received request: {:?}", request); let request_metadata = Arc::new(RequestMetadata::new(request.num_bytes())); @@ -1287,7 +1375,7 @@ impl Web3ProxyApp { // TODO: if eth_chainId or net_version, serve those without querying the backend // TODO: don't clone? - let partial_response: serde_json::Value = match request_method.as_ref() { + let response: JsonRpcForwardedResponse = match request_method.as_ref() { // lots of commands are blocked method @ ("db_getHex" | "db_getString" @@ -1357,14 +1445,11 @@ impl Web3ProxyApp { | "shh_version") => { // i don't think we will ever support these methods // TODO: what error code? - return Ok(( - JsonRpcForwardedResponse::from_string( - format!("method unsupported: {}", method), - None, - Some(request_id), - ), - vec![], - )); + JsonRpcForwardedResponse::from_string( + format!("method unsupported: {}", method), + None, + Some(request_id), + ) } // TODO: implement these commands method @ ("eth_getFilterChanges" @@ -1376,36 +1461,29 @@ impl Web3ProxyApp { | "eth_uninstallFilter") => { // TODO: unsupported command stat. use the count to prioritize new features // TODO: what error code? - return Ok(( - // TODO: what code? - JsonRpcForwardedResponse::from_string( - format!("not yet implemented: {}", method), - None, - Some(request_id), - ), - vec![], - )); + JsonRpcForwardedResponse::from_string( + format!("not yet implemented: {}", method), + None, + Some(request_id), + ) } method @ ("debug_bundler_sendBundleNow" | "debug_bundler_clearState" | "debug_bundler_dumpMempool") => { - return Ok(( - JsonRpcForwardedResponse::from_string( - // TODO: we should probably have some escaping on this. but maybe serde will protect us enough - format!("method unsupported: {}", method), - None, - Some(request_id), - ), - vec![], - )); + JsonRpcForwardedResponse::from_string( + // TODO: we should probably have some escaping on this. but maybe serde will protect us enough + format!("method unsupported: {}", method), + None, + Some(request_id), + ) } - method @ ("eth_sendUserOperation" + _method @ ("eth_sendUserOperation" | "eth_estimateUserOperationGas" | "eth_getUserOperationByHash" | "eth_getUserOperationReceipt" | "eth_supportedEntryPoints") => match self.bundler_4337_rpcs.as_ref() { Some(bundler_4337_rpcs) => { - let response = bundler_4337_rpcs + bundler_4337_rpcs .try_proxy_connection( authorization, request, @@ -1413,56 +1491,39 @@ impl Web3ProxyApp { None, None, ) - .await?; - - // TODO: DRY - let rpcs = request_metadata.backend_requests.lock().clone(); - - if let Some(stat_sender) = self.stat_sender.as_ref() { - let response_stat = RpcQueryStats::new( - Some(method.to_string()), - authorization.clone(), - request_metadata, - response.num_bytes(), - ); - - stat_sender - .send_async(response_stat.into()) - .await - .map_err(Web3ProxyError::SendAppStatError)?; - } - - return Ok((response, rpcs)); + .await? } None => { // TODO: stats! + // TODO: not synced error? return Err(anyhow::anyhow!("no bundler_4337_rpcs available").into()); } }, - // some commands can use local data or caches "eth_accounts" => { - // no stats on this. its cheap - serde_json::Value::Array(vec![]) + JsonRpcForwardedResponse::from_value(serde_json::Value::Array(vec![]), request_id) } "eth_blockNumber" => { match head_block_num.or(self.balanced_rpcs.head_block_num()) { Some(head_block_num) => { - json!(head_block_num) + JsonRpcForwardedResponse::from_value(json!(head_block_num), request_id) } None => { // TODO: what does geth do if this happens? - return Err(Web3ProxyError::UnknownBlockNumber); + // TODO: standard not synced error + return Err(Web3ProxyError::NoServersSynced); } } } - "eth_chainId" => json!(U64::from(self.config.chain_id)), + "eth_chainId" => JsonRpcForwardedResponse::from_value( + json!(U64::from(self.config.chain_id)), + request_id, + ), // TODO: eth_callBundle (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_callbundle) // TODO: eth_cancelPrivateTransaction (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_cancelprivatetransaction, but maybe just reject) // TODO: eth_sendPrivateTransaction (https://docs.flashbots.net/flashbots-auction/searchers/advanced/rpc-endpoint#eth_sendprivatetransaction) "eth_coinbase" => { // no need for serving coinbase - // no stats on this. its cheap - json!(Address::zero()) + JsonRpcForwardedResponse::from_value(json!(Address::zero()), request_id) } "eth_estimateGas" => { let mut response = self @@ -1501,22 +1562,17 @@ impl Web3ProxyApp { gas_estimate += gas_increase; - json!(gas_estimate) + JsonRpcForwardedResponse::from_value(json!(gas_estimate), request_id) } // TODO: eth_gasPrice that does awesome magic to predict the future - "eth_hashrate" => { - // no stats on this. its cheap - json!(U64::zero()) - } + "eth_hashrate" => JsonRpcForwardedResponse::from_value(json!(U64::zero()), request_id), "eth_mining" => { - // no stats on this. its cheap - serde_json::Value::Bool(false) + JsonRpcForwardedResponse::from_value(serde_json::Value::Bool(false), request_id) } - // TODO: eth_sendBundle (flashbots command) + // TODO: eth_sendBundle (flashbots/eden command) // broadcast transactions to all private rpcs at once "eth_sendRawTransaction" => { - // TODO: how should we handle private_mode here? - let default_num = match authorization.checks.proxy_mode { + let num_public_rpcs = match authorization.checks.proxy_mode { // TODO: how many balanced rpcs should we send to? configurable? percentage of total? ProxyMode::Best | ProxyMode::Debug => Some(4), ProxyMode::Fastest(0) => None, @@ -1527,47 +1583,26 @@ impl Web3ProxyApp { ProxyMode::Versus => None, }; - let (private_rpcs, num) = if let Some(private_rpcs) = self.private_rpcs.as_ref() { - if !private_rpcs.is_empty() && authorization.checks.private_txs { - // if we are sending the transaction privately, no matter the proxy_mode, we send to ALL private rpcs - (private_rpcs, None) - } else { - // TODO: send to balanced_rpcs AND private_rpcs - (&self.balanced_rpcs, default_num) - } - } else { - (&self.balanced_rpcs, default_num) - }; - - let head_block_num = head_block_num - .or(self.balanced_rpcs.head_block_num()) - .ok_or_else(|| Web3ProxyError::NoServersSynced)?; - - // TODO: error/wait if no head block! - - // try_send_all_upstream_servers puts the request id into the response. no need to do that ourselves here. - // TODO: what lag should we allow? - let mut response = private_rpcs - .try_send_all_synced_connections( + let mut response = self + .try_send_protected( authorization, &request, - Some(request_metadata.clone()), - Some(&head_block_num.saturating_sub(2.into())), - None, - Level::Trace, - num, - true, + request_metadata.clone(), + num_public_rpcs, + head_block_num, ) .await?; // sometimes we get an error that the transaction is already known by our nodes, - // that's not really an error. Just return the hash like a successful response would. + // that's not really an error. Return the hash like a successful response would. + // TODO: move this to a helper function if let Some(response_error) = response.error.as_ref() { if response_error.code == -32000 && (response_error.message == "ALREADY_EXISTS: already known" || response_error.message == "INTERNAL_ERROR: existing tx with same hash") { + // TODO: expect instead of web3_context? let params = request .params .web3_context("there must be params if we got this far")?; @@ -1598,9 +1633,7 @@ impl Web3ProxyApp { } } - let rpcs = request_metadata.backend_requests.lock().clone(); - - // emit stats + // emit transaction count stats if let Some(salt) = self.config.public_recent_ips_salt.as_ref() { if let Some(tx_hash) = response.result.clone() { let now = Utc::now().timestamp(); @@ -1638,47 +1671,35 @@ impl Web3ProxyApp { } } - return Ok((response, rpcs)); + response } "eth_syncing" => { // no stats on this. its cheap // TODO: return a real response if all backends are syncing or if no servers in sync - serde_json::Value::Bool(false) - } - "eth_subscribe" => { - return Ok(( - JsonRpcForwardedResponse::from_str( - "notifications not supported. eth_subscribe is only available over a websocket", - Some(-32601), - Some(request_id), - ), - vec![], - )); - } - "eth_unsubscribe" => { - return Ok(( - JsonRpcForwardedResponse::from_str( - "notifications not supported. eth_unsubscribe is only available over a websocket", - Some(-32601), - Some(request_id), - ), - vec![], - )); + JsonRpcForwardedResponse::from_value(serde_json::Value::Bool(false), request_id) } + "eth_subscribe" => JsonRpcForwardedResponse::from_str( + "notifications not supported. eth_subscribe is only available over a websocket", + Some(-32601), + Some(request_id), + ), + "eth_unsubscribe" => JsonRpcForwardedResponse::from_str( + "notifications not supported. eth_unsubscribe is only available over a websocket", + Some(-32601), + Some(request_id), + ), "net_listening" => { - // no stats on this. its cheap // TODO: only if there are some backends on balanced_rpcs? - serde_json::Value::Bool(true) - } - "net_peerCount" => { - // no stats on this. its cheap - // TODO: do something with proxy_mode here? - json!(U64::from(self.balanced_rpcs.num_synced_rpcs())) - } - "web3_clientVersion" => { - // no stats on this. its cheap - serde_json::Value::String(APP_USER_AGENT.to_string()) + JsonRpcForwardedResponse::from_value(serde_json::Value::Bool(true), request_id) } + "net_peerCount" => JsonRpcForwardedResponse::from_value( + json!(U64::from(self.balanced_rpcs.num_synced_rpcs())), + request_id, + ), + "web3_clientVersion" => JsonRpcForwardedResponse::from_value( + serde_json::Value::String(APP_USER_AGENT.to_string()), + request_id, + ), "web3_sha3" => { // returns Keccak-256 (not the standardized SHA3-256) of the given data. match &request.params { @@ -1713,32 +1734,23 @@ impl Web3ProxyApp { let hash = H256::from(keccak256(param)); - json!(hash) + JsonRpcForwardedResponse::from_value(json!(hash), request_id) } _ => { // TODO: this needs the correct error code in the response - // TODO: emit stat? - return Ok(( - JsonRpcForwardedResponse::from_str( - "invalid request", - Some(StatusCode::BAD_REQUEST.as_u16().into()), - Some(request_id), - ), - vec![], - )); + JsonRpcForwardedResponse::from_str( + "invalid request", + Some(StatusCode::BAD_REQUEST.as_u16().into()), + Some(request_id), + ) } } } - "test" => { - return Ok(( - JsonRpcForwardedResponse::from_str( - "The method test does not exist/is not available.", - Some(-32601), - Some(request_id), - ), - vec![], - )); - } + "test" => JsonRpcForwardedResponse::from_str( + "The method test does not exist/is not available.", + Some(-32601), + Some(request_id), + ), // anything else gets sent to backend rpcs and cached method => { if method.starts_with("admin_") { @@ -1893,32 +1905,14 @@ impl Web3ProxyApp { // replace the id with our request's id. response.id = request_id; - // TODO: DRY! - let rpcs = request_metadata.backend_requests.lock().clone(); - - if let Some(stat_sender) = self.stat_sender.as_ref() { - let response_stat = RpcQueryStats::new( - Some(method.to_string()), - authorization.clone(), - request_metadata, - response.num_bytes(), - ); - - stat_sender - .send_async(response_stat.into()) - .await - .map_err(Web3ProxyError::SendAppStatError)?; - } - - return Ok((response, rpcs)); + response } }; - let response = JsonRpcForwardedResponse::from_value(partial_response, request_id); - - // TODO: DRY + // save the rpcs so they can be included in a response header let rpcs = request_metadata.backend_requests.lock().clone(); + // send stats used for accounting and graphs if let Some(stat_sender) = self.stat_sender.as_ref() { let response_stat = RpcQueryStats::new( Some(request_method), @@ -1933,6 +1927,7 @@ impl Web3ProxyApp { .map_err(Web3ProxyError::SendAppStatError)?; } + // send debug info as a kafka log if let Some((kafka_topic, kafka_key, kafka_headers)) = kafka_stuff { let kafka_producer = self .kafka_producer diff --git a/web3_proxy/src/bin/wait_for_sync.rs b/web3_proxy/src/bin/wait_for_sync.rs index c13d5fe5..a44a377c 100644 --- a/web3_proxy/src/bin/wait_for_sync.rs +++ b/web3_proxy/src/bin/wait_for_sync.rs @@ -131,10 +131,12 @@ struct JsonRpcChainIdResult { } async fn get_chain_id(rpc: &str, client: &reqwest::Client) -> anyhow::Result { + // empty params aren't required by the spec, but some rpc providers require them let get_chain_id_request = json!({ "id": "1", "jsonrpc": "2.0", "method": "eth_chainId", + "params": [], }); // TODO: loop until chain id is found? diff --git a/web3_proxy/src/block_number.rs b/web3_proxy/src/block_number.rs index a32392e1..729e36e0 100644 --- a/web3_proxy/src/block_number.rs +++ b/web3_proxy/src/block_number.rs @@ -177,12 +177,13 @@ pub async fn block_needed( "eth_getLogs" => { // TODO: think about this more // TODO: jsonrpc has a specific code for this - // TODO: this shouldn't be a 500. this should be a 400. 500 will make haproxy retry a bunch let obj = params .get_mut(0) - .ok_or_else(|| anyhow::anyhow!("invalid format. no params"))? + .ok_or_else(|| Web3ProxyError::BadRequest("invalid format. no params".to_string()))? .as_object_mut() - .ok_or_else(|| Web3ProxyError::BadRequest("invalid format".to_string()))?; + .ok_or_else(|| { + Web3ProxyError::BadRequest("invalid format. params not object".to_string()) + })?; if obj.contains_key("blockHash") { return Ok(BlockNeeded::CacheSuccessForever); diff --git a/web3_proxy/src/rpcs/many.rs b/web3_proxy/src/rpcs/many.rs index dbead8a8..7a48cdf0 100644 --- a/web3_proxy/src/rpcs/many.rs +++ b/web3_proxy/src/rpcs/many.rs @@ -157,19 +157,21 @@ impl Web3Rpcs { // these blocks don't have full transactions, but they do have rather variable amounts of transaction hashes // TODO: how can we do the weigher better? need to know actual allocated size + // TODO: time_to_idle instead? // TODO: limits from config let blocks_by_hash: BlocksByHashCache = Cache::builder() .max_capacity(1024 * 1024 * 1024) .weigher(|_k, v: &Web3ProxyBlock| { 1 + v.block.transactions.len().try_into().unwrap_or(u32::MAX) }) - .time_to_idle(Duration::from_secs(600)) + .time_to_live(Duration::from_secs(30 * 60)) .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); // all block numbers are the same size, so no need for weigher // TODO: limits from config + // TODO: time_to_idle instead? let blocks_by_number = Cache::builder() - .time_to_idle(Duration::from_secs(600)) + .time_to_live(Duration::from_secs(30 * 60)) .max_capacity(10_000) .build_with_hasher(hashbrown::hash_map::DefaultHashBuilder::default()); @@ -1099,7 +1101,7 @@ impl Web3Rpcs { /// be sure there is a timeout on this or it might loop forever #[allow(clippy::too_many_arguments)] pub async fn try_send_all_synced_connections( - &self, + self: &Arc, authorization: &Arc, request: &JsonRpcRequest, request_metadata: Option>, diff --git a/web3_proxy/src/rpcs/one.rs b/web3_proxy/src/rpcs/one.rs index 1b7246e2..09a4af4a 100644 --- a/web3_proxy/src/rpcs/one.rs +++ b/web3_proxy/src/rpcs/one.rs @@ -560,12 +560,12 @@ impl Web3Rpc { .context(format!("waiting for request handle on {}", self))? .request( "eth_chainId", - &json!(Option::None::<()>), + &json!(Vec::<()>::new()), Level::Trace.into(), unlocked_provider.clone(), ) .await; - // trace!("found_chain_id: {:?}", found_chain_id); + trace!("found_chain_id: {:#?}", found_chain_id); match found_chain_id { Ok(found_chain_id) => { diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index 4c6b2dbf..ffcfbcee 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -219,13 +219,10 @@ impl OpenRequestHandle { .fetch_sub(1, std::sync::atomic::Ordering::Relaxed); // // TODO: i think ethers already has trace logging (and does it much more fancy) - // trace!( - // "response from {} for {} {:?}: {:?}", - // self.rpc, - // method, - // params, - // response, - // ); + debug!( + "response from {} for {} {:?}: {:?}", + self.rpc, method, params, response, + ); if let Err(err) = &response { // only save reverts for some types of calls