From e5e137f76cdd3bbdeda5696225074f60ada49cb1 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Tue, 30 Aug 2022 20:01:42 +0000 Subject: [PATCH] fix deadlock --- .cargo/config.toml | 9 +- Cargo.lock | 319 ++++++------------ TODO.md | 6 +- redis-rate-limit/src/lib.rs | 29 +- web3_proxy/Cargo.toml | 5 +- web3_proxy/src/app.rs | 7 +- web3_proxy/src/config.rs | 12 +- web3_proxy/src/frontend/mod.rs | 12 +- .../{http_proxy.rs => rpc_proxy_http.rs} | 0 .../frontend/{ws_proxy.rs => rpc_proxy_ws.rs} | 0 web3_proxy/src/rpcs/blockchain.rs | 87 +++-- web3_proxy/src/rpcs/connection.rs | 92 +++-- web3_proxy/src/rpcs/connections.rs | 17 +- web3_proxy/src/rpcs/request.rs | 18 +- web3_proxy/src/rpcs/synced_connections.rs | 2 +- 15 files changed, 300 insertions(+), 315 deletions(-) rename web3_proxy/src/frontend/{http_proxy.rs => rpc_proxy_http.rs} (100%) rename web3_proxy/src/frontend/{ws_proxy.rs => rpc_proxy_ws.rs} (100%) diff --git a/.cargo/config.toml b/.cargo/config.toml index 4c1262bc..f4ad2dbf 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,3 +1,8 @@ [build] -# potentially faster. https://nnethercote.github.io/perf-book/build-configuration.html -rustflags = ["-C", "target-cpu=native", "--cfg", "tokio_unstable"] +rustflags = [ + # potentially faster. https://nnethercote.github.io/perf-book/build-configuration.html + "-C", "target-cpu=native", + # tokio unstable is needed for tokio-console + "--cfg", "tokio_unstable" +] +rustdocflags = ["--cfg", "tokio_unstable"] diff --git a/Cargo.lock b/Cargo.lock index 62e38d10..dcfd12b0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -33,7 +33,7 @@ version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e8b47f52ea9bae42228d07ec09eb676433d7c4ed1ebdf0f1d1c29ed446f1ab8" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "cipher", "cpufeatures", "opaque-debug 0.3.0", @@ -80,7 +80,7 @@ version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d52a9bb7ec0cf484c551830a7ce27bd20d67eac647e1befb56b0be4ee39a55d2" dependencies = [ - "winapi 0.3.9", + "winapi", ] [[package]] @@ -210,7 +210,7 @@ dependencies = [ "slab", "socket2", "waker-fn", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -321,7 +321,7 @@ checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" dependencies = [ "hermit-abi", "libc", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -442,7 +442,7 @@ checksum = "11a17d453482a265fd5f8479f2a3f405566e6ca627837aaddb85af8b1ab8ef61" dependencies = [ "addr2line", "cc", - "cfg-if 1.0.0", + "cfg-if", "libc", "miniz_oxide", "object", @@ -786,12 +786,6 @@ dependencies = [ "jobserver", ] -[[package]] -name = "cfg-if" -version = "0.1.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" - [[package]] name = "cfg-if" version = "1.0.0" @@ -811,7 +805,7 @@ dependencies = [ "serde", "time 0.1.43", "wasm-bindgen", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -947,7 +941,7 @@ checksum = "b3616f750b84d8f0de8a58bda93e08e2a81ad3f523089b05f1dffecab48c6cbd" dependencies = [ "atty", "lazy_static", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -985,7 +979,7 @@ dependencies = [ "regex", "terminal_size", "unicode-width", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -998,7 +992,7 @@ dependencies = [ "libc", "once_cell", "terminal_size", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -1076,7 +1070,7 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", ] [[package]] @@ -1121,7 +1115,7 @@ version = "0.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5aaa7bd5fb665c6864b5f963dd9097905c54125909c7aa94c9e18507cdbe6c53" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "crossbeam-utils", ] @@ -1131,7 +1125,7 @@ version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6455c0ca19f0d2fbf751b908d5c55c1f5cbc65e03c4225427254b46890bdde1e" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "crossbeam-epoch", "crossbeam-utils", ] @@ -1143,7 +1137,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1145cf131a2c6ba0615079ab6a638f7e1973ac9c2634fcbeaaad6114246efe8c" dependencies = [ "autocfg 1.1.0", - "cfg-if 1.0.0", + "cfg-if", "crossbeam-utils", "lazy_static", "memoffset", @@ -1156,7 +1150,7 @@ version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1cd42583b04998a5363558e5f9291ee5a5ff6b49944332103f251e7479a82aa7" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "crossbeam-utils", ] @@ -1166,7 +1160,7 @@ version = "0.8.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0bf124c720b7686e3c2663cf54062ab0f68a88af2fb6a030e87e30bf721fcb38" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "lazy_static", ] @@ -1255,7 +1249,7 @@ version = "5.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3495912c9c1ccf2e18976439f4443f3fee0fd61f424ff99fde6a66b15ecb448f" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "hashbrown", "lock_api", "parking_lot_core 0.9.3", @@ -1348,7 +1342,7 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b98cf8ebf19c3d1b223e151f99a4f9f0690dca41414773390fc824184ac833e1" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "dirs-sys-next", ] @@ -1360,7 +1354,7 @@ checksum = "4ebda144c4fe02d1f7ea1a7d9641b6fc6b580adcfa024ae48797ecdeb6825b4d" dependencies = [ "libc", "redox_users", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -1446,7 +1440,7 @@ version = "0.8.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9852635589dc9f9ea1b6fe9f05b50ef208c85c834a562f0c6abb1c475736ec2b" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", ] [[package]] @@ -1577,7 +1571,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bda76ce804d524f693a898dc5857d08f4db443f3da64d0c36237fa05c0ecef30" dependencies = [ "Inflector", - "cfg-if 1.0.0", + "cfg-if", "dunce", "ethers-core", "eyre", @@ -1728,7 +1722,7 @@ version = "0.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ebe5db405d0e584aa8dae154ffebb90f2305cae588fd11d9f6b857ebe3a79294" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "colored", "dunce", "ethers-core", @@ -1848,7 +1842,7 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e94a7bbaa59354bc20dd75b67f23e2797b4490e9d6928203fb105c79e448c86c" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "libc", "redox_syscall", "windows-sys", @@ -1928,24 +1922,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9564fc758e15025b46aa6643b1b77d047d1a56a1aea6e01002ac0c7026876213" dependencies = [ "libc", - "winapi 0.3.9", -] - -[[package]] -name = "fsevent" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ab7d1bd1bd33cc98b0889831b72da23c0aa4df9cec7e0702f46ecea04b35db6" -dependencies = [ - "bitflags", - "fsevent-sys", + "winapi", ] [[package]] name = "fsevent-sys" -version = "2.0.1" +version = "4.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f41b048a94555da0f42f1d632e2e19510084fb8e303b0daa2816e733fb3644a0" +checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2" dependencies = [ "libc", ] @@ -1956,22 +1940,6 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba" -[[package]] -name = "fuchsia-zircon" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82" -dependencies = [ - "bitflags", - "fuchsia-zircon-sys", -] - -[[package]] -name = "fuchsia-zircon-sys" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" - [[package]] name = "funty" version = "2.0.0" @@ -1980,9 +1948,9 @@ checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c" [[package]] name = "futures" -version = "0.3.23" +version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab30e97ab6aacfe635fad58f22c2bb06c8b685f7421eb1e064a729e2a5f481fa" +checksum = "7f21eda599937fba36daeb58a22e8f5cee2d14c4a17b5b7739c7c8e5e3b8230c" dependencies = [ "futures-channel", "futures-core", @@ -1995,9 +1963,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.23" +version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2bfc52cbddcfd745bf1740338492bb0bd83d76c67b445f91c5fb29fae29ecaa1" +checksum = "30bdd20c28fadd505d0fd6712cdfcb0d4b5648baf45faef7f852afb2399bb050" dependencies = [ "futures-core", "futures-sink", @@ -2005,15 +1973,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.23" +version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2acedae88d38235936c3922476b10fced7b2b68136f5e3c03c2d5be348a1115" +checksum = "4e5aa3de05362c3fb88de6531e6296e85cde7739cccad4b9dfeeb7f6ebce56bf" [[package]] name = "futures-executor" -version = "0.3.23" +version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d11aa21b5b587a64682c0094c2bdd4df0076c5324961a40cc3abd7f37930528" +checksum = "9ff63c23854bee61b6e9cd331d523909f238fc7636290b96826e9cfa5faa00ab" dependencies = [ "futures-core", "futures-task", @@ -2034,9 +2002,9 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.23" +version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93a66fc6d035a26a3ae255a6d2bca35eda63ae4c5512bef54449113f7a1228e5" +checksum = "bbf4d2a7a308fd4578637c0b17c7e1c7ba127b8f6ba00b29f717e9655d85eb68" [[package]] name = "futures-lite" @@ -2066,9 +2034,9 @@ dependencies = [ [[package]] name = "futures-macro" -version = "0.3.23" +version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0db9cce532b0eae2ccf2766ab246f114b56b9cf6d445e00c2549fbc100ca045d" +checksum = "42cd15d1c7456c04dbdf7e88bcd69760d74f3a798d6444e16974b505b0e62f17" dependencies = [ "proc-macro2", "quote", @@ -2077,15 +2045,15 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.23" +version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca0bae1fe9752cf7fd9b0064c674ae63f97b37bc714d745cbde0afb7ec4e6765" +checksum = "21b20ba5a92e727ba30e72834706623d94ac93a725410b6a6b6fbc1b07f7ba56" [[package]] name = "futures-task" -version = "0.3.23" +version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "842fc63b931f4056a24d59de13fb1272134ce261816e063e634ad0c15cdc5306" +checksum = "a6508c467c73851293f390476d4491cf4d227dbabcd4170f3bb6044959b294f1" [[package]] name = "futures-timer" @@ -2095,9 +2063,9 @@ checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" [[package]] name = "futures-util" -version = "0.3.23" +version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0828a5471e340229c11c77ca80017937ce3c58cb788a17e5f1c2d5c485a9577" +checksum = "44fb6cb1be61cc1d2e43b262516aafcf63b241cffdb1d3fa115f91d9c7b09c90" dependencies = [ "futures-channel", "futures-core", @@ -2145,7 +2113,7 @@ version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9be70c98951c83b8d2f8f60d7065fa6d5146873094452a1008da8c2f1e4205ad" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "js-sys", "libc", "wasi 0.10.2+wasi-snapshot-preview1", @@ -2327,7 +2295,7 @@ version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2456aef2e6b6a9784192ae780c0f15bc57df0e918585282325e8c8ac27737654" dependencies = [ - "winapi 0.3.9", + "winapi", ] [[package]] @@ -2417,7 +2385,7 @@ dependencies = [ "core-foundation-sys", "js-sys", "wasm-bindgen", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -2499,9 +2467,9 @@ dependencies = [ [[package]] name = "inotify" -version = "0.7.1" +version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4816c66d2c8ae673df83366c18341538f234a26d65a9ecea5c348b453ac1d02f" +checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff" dependencies = [ "bitflags", "inotify-sys", @@ -2523,21 +2491,12 @@ version = "0.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "js-sys", "wasm-bindgen", "web-sys", ] -[[package]] -name = "iovec" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e" -dependencies = [ - "libc", -] - [[package]] name = "ipnet" version = "2.5.0" @@ -2598,7 +2557,7 @@ version = "0.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2c8a5a96d92d849c4499d99461da81c9cdc1467418a8ed2aaeb407e8d85940ed" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "ecdsa", "elliptic-curve", "sha2 0.10.2", @@ -2612,13 +2571,23 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f9b7d56ba4a8344d6be9729995e6b06f928af29998cdf79fe390cbf6b1fee838" [[package]] -name = "kernel32-sys" -version = "0.2.2" +name = "kqueue" +version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" +checksum = "4d6112e8f37b59803ac47a42d14f1f3a59bbf72fc6857ffc5be455e28a691f8e" dependencies = [ - "winapi 0.2.8", - "winapi-build", + "kqueue-sys", + "libc", +] + +[[package]] +name = "kqueue-sys" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8367585489f01bc55dd27404dcf56b95e6da061a256a666ab23be9ba96a2e587" +dependencies = [ + "bitflags", + "libc", ] [[package]] @@ -2671,12 +2640,6 @@ dependencies = [ "spin 0.5.2", ] -[[package]] -name = "lazycell" -version = "1.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" - [[package]] name = "libc" version = "0.2.126" @@ -2722,7 +2685,7 @@ version = "0.4.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "value-bag", ] @@ -2800,25 +2763,6 @@ dependencies = [ "adler", ] -[[package]] -name = "mio" -version = "0.6.23" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4afd66f5b91bf2a3bc13fad0e21caedac168ca4c707504e75585648ae80e4cc4" -dependencies = [ - "cfg-if 0.1.10", - "fuchsia-zircon", - "fuchsia-zircon-sys", - "iovec", - "kernel32-sys", - "libc", - "log", - "miow", - "net2", - "slab", - "winapi 0.2.8", -] - [[package]] name = "mio" version = "0.8.3" @@ -2831,30 +2775,6 @@ dependencies = [ "windows-sys", ] -[[package]] -name = "mio-extras" -version = "2.0.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52403fe290012ce777c4626790c8951324a2b9e3316b3143779c72b029742f19" -dependencies = [ - "lazycell", - "log", - "mio 0.6.23", - "slab", -] - -[[package]] -name = "miow" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ebd808424166322d4a38da87083bfddd3ac4c131334ed55856112eb06d46944d" -dependencies = [ - "kernel32-sys", - "net2", - "winapi 0.2.8", - "ws2_32-sys", -] - [[package]] name = "nanorand" version = "0.7.0" @@ -2864,17 +2784,6 @@ dependencies = [ "getrandom", ] -[[package]] -name = "net2" -version = "0.2.37" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "391630d12b68002ae1e25e8f974306474966550ad82dac6886fb8910c19568ae" -dependencies = [ - "cfg-if 0.1.10", - "libc", - "winapi 0.3.9", -] - [[package]] name = "new_debug_unreachable" version = "1.0.4" @@ -2899,20 +2808,20 @@ checksum = "e9e591e719385e6ebaeb5ce5d3887f7d5676fceca6411d1925ccc95745f3d6f7" [[package]] name = "notify" -version = "4.0.17" +version = "5.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae03c8c853dba7bfd23e571ff0cff7bc9dceb40a4cd684cd1681824183f45257" +checksum = "ed2c66da08abae1c024c01d635253e402341b4060a12e99b31c7594063bf490a" dependencies = [ "bitflags", + "crossbeam-channel", "filetime", - "fsevent", "fsevent-sys", "inotify", + "kqueue", "libc", - "mio 0.6.23", - "mio-extras", + "mio", "walkdir", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -3156,12 +3065,12 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d76e8e1493bcac0d2766c42737f34458f1c8c50c0d23bcb24ea953affb273216" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "instant", "libc", "redox_syscall", "smallvec", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -3171,7 +3080,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09a279cbf25cb0757810394fbc1e359949b59e348145c643a939a525692e6929" dependencies = [ "backtrace", - "cfg-if 1.0.0", + "cfg-if", "libc", "petgraph", "redox_syscall", @@ -3471,11 +3380,11 @@ version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "685404d509889fade3e86fe3a5803bca2ec09b0c0778d5ada6ec8bf7a8de5259" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "libc", "log", "wepoll-ffi", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -3594,7 +3503,7 @@ checksum = "924cd8a0de90723d63fed19c5035ea129913a0bc998b37686a67f1eaf6a2aab5" dependencies = [ "lazy_static", "libc", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -3657,7 +3566,7 @@ dependencies = [ "rand_os", "rand_pcg", "rand_xorshift", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -3741,7 +3650,7 @@ checksum = "1166d5c91dc97b88d1decc3285bb0a99ed84b05cfd0bc2341bdf2d43fc41e39b" dependencies = [ "libc", "rand_core 0.4.2", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -3756,7 +3665,7 @@ dependencies = [ "rand_core 0.4.2", "rdrand", "wasm-bindgen", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -3891,7 +3800,7 @@ version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7" dependencies = [ - "winapi 0.3.9", + "winapi", ] [[package]] @@ -3956,7 +3865,7 @@ dependencies = [ "spin 0.5.2", "untrusted", "web-sys", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -4383,7 +4292,7 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "028f48d513f9678cda28f6e4064755b3fbb2af6acd672f2c209b62323f7aea0f" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "cpufeatures", "digest 0.10.3", ] @@ -4394,7 +4303,7 @@ version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c77f4e7f65455545c2153c1253d25056825e77ee2533f0e41deb65a93a34852f" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "cpufeatures", "digest 0.10.3", ] @@ -4418,7 +4327,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4d58a1e1bf39749807d89cf2d98ac2dfa0ff1cb3faa38fbb64dd88ac8013d800" dependencies = [ "block-buffer 0.9.0", - "cfg-if 1.0.0", + "cfg-if", "cpufeatures", "digest 0.9.0", "opaque-debug 0.3.0", @@ -4430,7 +4339,7 @@ version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55deaec60f81eefe3cce0dc50bda92d6d8e88f2a27df7c5033b42afeb1ed2676" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "cpufeatures", "digest 0.10.3", ] @@ -4529,7 +4438,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "66d72b759436ae32898a2af0a14218dbf55efde3feeb170eb623637db85ee1e0" dependencies = [ "libc", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -4762,7 +4671,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a55e00b6f95abd889ce398bd7eab2a9c62cd27281cf1bfba70847340557434cf" dependencies = [ "anyhow", - "cfg-if 1.0.0", + "cfg-if", "clap 3.2.15", "console 0.14.1", "dialoguer", @@ -4815,12 +4724,12 @@ version = "3.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5cdb1ef4eaeeaddc8fbd371e5017057064af0911902ef36b39801f67cc6d79e4" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "fastrand", "libc", "redox_syscall", "remove_dir_all", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -4831,7 +4740,7 @@ checksum = "c59df8ac95d96ff9bede18eb7300b0fda5e5d8d90960e76f8e14ae765eedbf1f" dependencies = [ "dirs-next", "rustversion", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -4850,7 +4759,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "633c1a546cee861a1a6d0dc69ebeca693bf4296661ba7852b9d21d159e0506df" dependencies = [ "libc", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -4896,7 +4805,7 @@ checksum = "5fdfe0627923f7411a43ec9ec9c39c3a9b4151be313e0922042581fb6c9b717f" dependencies = [ "libc", "redox_syscall", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -4915,7 +4824,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca8a50ef2360fbd1eeb0ecd46795a87a19024eb4b53c5dc916ca1fd95fe62438" dependencies = [ "libc", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -4980,7 +4889,7 @@ dependencies = [ "bytes", "libc", "memchr", - "mio 0.8.3", + "mio", "num_cpus", "once_cell", "parking_lot 0.12.1", @@ -4989,7 +4898,7 @@ dependencies = [ "socket2", "tokio-macros", "tracing", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -5145,7 +5054,7 @@ version = "0.1.36" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2fce9567bd60a67d08a16488756721ba392f24f29006402881e43b19aac64307" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "log", "pin-project-lite", "tracing-attributes", @@ -5409,7 +5318,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "808cf2735cd4b6866113f648b791c6adc5714537bc222d9347bb203386ffda56" dependencies = [ "same-file", - "winapi 0.3.9", + "winapi", "winapi-util", ] @@ -5441,7 +5350,7 @@ version = "0.2.82" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc7652e3f6c4706c8d9cd54832c4a4ccb9b5336e2c3bd154d5cccfbf1c1f5f7d" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "wasm-bindgen-macro", ] @@ -5466,7 +5375,7 @@ version = "0.4.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6f741de44b75e14c35df886aff5f1eb73aa114fa5d4d00dcd37b5e01259bf3b2" dependencies = [ - "cfg-if 1.0.0", + "cfg-if", "js-sys", "wasm-bindgen", "web-sys", @@ -5609,12 +5518,6 @@ dependencies = [ "cc", ] -[[package]] -name = "winapi" -version = "0.2.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a" - [[package]] name = "winapi" version = "0.3.9" @@ -5625,12 +5528,6 @@ dependencies = [ "winapi-x86_64-pc-windows-gnu", ] -[[package]] -name = "winapi-build" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc" - [[package]] name = "winapi-i686-pc-windows-gnu" version = "0.4.0" @@ -5643,7 +5540,7 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" dependencies = [ - "winapi 0.3.9", + "winapi", ] [[package]] @@ -5701,17 +5598,7 @@ version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "80d0f4e272c85def139476380b12f9ac60926689dd2e01d4923222f40580869d" dependencies = [ - "winapi 0.3.9", -] - -[[package]] -name = "ws2_32-sys" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d59cefebd0c892fa2dd6de581e937301d8552cb44489cdff035c6187cb63fa5e" -dependencies = [ - "winapi 0.2.8", - "winapi-build", + "winapi", ] [[package]] diff --git a/TODO.md b/TODO.md index c25bbe10..aa23b2e6 100644 --- a/TODO.md +++ b/TODO.md @@ -114,6 +114,7 @@ - [x] now that we have a consensus head with enough soft limit (or an empty set), update SyncedConnections - [x] send the block through new head_block_sender - [x] rewrite cannonical_block +- [ ] bug around eth_getBlockByHash sometimes causes tokio to lock up - [-] use siwe messages and signatures for sign up and login - [-] requests for "Get transactions receipts" are routed to the private_rpcs and not the balanced_rpcs. do this better. - [x] quick fix, send to balanced_rpcs for now. we will just live with errors on new transactions. @@ -123,7 +124,10 @@ - [-] basic request method stats (using the user_id and other fields that are in the tracing frame) - [ ] "chain is forked" message is wrong. it includes nodes just being on different heights of the same chain. need a smarter check - i think there is also a bug because i've seen "server not synced" a couple times - - [ ] i saw a fork of like 300 blocks. probably just because a node was restarted and had fallen behind. need some checks to ignore things that are far behind + - [x] i saw a fork of like 300 blocks. probably just because a node was restarted and had fallen behind. need some checks to ignore things that are far behind +- [ ] todo!("pick the block on the current consensus chain") +- [ ] web3connection3.block(...) might wait forever. be sure to do it safely +- [ ] search for all "todo!" ## V1 diff --git a/redis-rate-limit/src/lib.rs b/redis-rate-limit/src/lib.rs index 84c2ec92..efdc754f 100644 --- a/redis-rate-limit/src/lib.rs +++ b/redis-rate-limit/src/lib.rs @@ -5,6 +5,7 @@ use anyhow::Context; use bb8_redis::redis::pipe; use std::ops::Add; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; +use tracing::trace; pub use crate::errors::{RedisError, RedisErrorSink}; pub use bb8_redis::{bb8, redis, RedisConnectionManager}; @@ -14,8 +15,10 @@ pub type RedisPool = bb8::Pool; pub struct RedisRateLimit { pool: RedisPool, key_prefix: String, - default_max_per_period: u64, - period: u64, + /// The default maximum requests allowed in a period. + max_requests_per_period: u64, + /// seconds + period: f32, } pub enum ThrottleResult { @@ -29,27 +32,29 @@ impl RedisRateLimit { pool: RedisPool, app: &str, label: &str, - default_max_per_period: u64, - period: u64, + max_requests_per_period: u64, + period: f32, ) -> Self { let key_prefix = format!("{}:rrl:{}", app, label); Self { pool, key_prefix, - default_max_per_period, + max_requests_per_period, period, } } - /// label might be an ip address or a user_key id + /// label might be an ip address or a user_key id. + /// if setting max_per_period, be sure to keep the period the same for all requests to this label + /// TODO: pub async fn throttle_label( &self, label: &str, max_per_period: Option, count: u64, ) -> anyhow::Result { - let max_per_period = max_per_period.unwrap_or(self.default_max_per_period); + let max_per_period = max_per_period.unwrap_or(self.max_requests_per_period); if max_per_period == 0 { return Ok(ThrottleResult::RetryNever); @@ -58,7 +63,7 @@ impl RedisRateLimit { let now = SystemTime::now() .duration_since(UNIX_EPOCH) .context("cannot tell the time")? - .as_secs(); + .as_secs_f32(); // if self.period is 60, period_id will be the minute of the current time let period_id = (now / self.period) % self.period; @@ -71,7 +76,7 @@ impl RedisRateLimit { // we could get the key first, but that means an extra redis call for every check. this seems better .incr(&throttle_key, count) // set expiration the first time we set the key. ignore the result - .expire(&throttle_key, self.period.try_into().unwrap()) + .expire(&throttle_key, self.period as usize) // .arg("NX") // TODO: this works in redis, but not elasticache .ignore() // do the query @@ -84,9 +89,11 @@ impl RedisRateLimit { .ok_or_else(|| anyhow::anyhow!("check rate limit result"))?; if new_count > &max_per_period { - let seconds_left_in_period = self.period - now / self.period; + let seconds_left_in_period = self.period - (now % self.period); - let retry_at = Instant::now().add(Duration::from_secs(seconds_left_in_period)); + let retry_at = Instant::now().add(Duration::from_secs_f32(seconds_left_in_period)); + + trace!(%label, ?retry_at, "rate limited"); return Ok(ThrottleResult::RetryAt(retry_at)); } diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index a2ef2205..d8153d67 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -23,6 +23,7 @@ axum = { version = "0.5.15", features = ["headers", "serde_json", "tokio-tungste axum-auth = "0.3.0" axum-client-ip = "0.2.0" axum-macros = "0.2.3" +# TODO: turn tokio-console on with a feature. console-subscriber = { version = "0.1.7" } counter = "0.5.6" dashmap = "5.3.4" derive_more = "0.99.17" @@ -30,12 +31,12 @@ dotenv = "0.15.0" ethers = { version = "0.17.0", features = ["rustls", "ws"] } fdlimit = "0.2.1" flume = "0.10.14" -futures = { version = "0.3.23", features = ["thread-pool"] } +futures = { version = "0.3.24", features = ["thread-pool"] } hashbrown = { version = "0.12.3", features = ["serde"] } http = "0.2.8" indexmap = "1.9.1" fifomap = { path = "../fifomap" } -notify = "4.0.17" +notify = "5.0.0" num = "0.4.0" parking_lot = { version = "0.12.1", features = ["arc_lock"] } petgraph = "0.6.2" diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs index 2e36d5fc..90e345b2 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app.rs @@ -6,7 +6,8 @@ use crate::jsonrpc::JsonRpcForwardedResponse; use crate::jsonrpc::JsonRpcForwardedResponseEnum; use crate::jsonrpc::JsonRpcRequest; use crate::jsonrpc::JsonRpcRequestEnum; -use crate::rpcs::connections::{BlockHashesMap, Web3Connections}; +use crate::rpcs::blockchain::{ArcBlock, BlockHashesMap}; +use crate::rpcs::connections::Web3Connections; use crate::rpcs::transactions::TxStatus; use crate::stats::AppStats; use anyhow::Context; @@ -82,7 +83,7 @@ pub struct Web3ProxyApp { response_cache: ResponseLrcCache, // don't drop this or the sender will stop working // TODO: broadcast channel instead? - head_block_receiver: watch::Receiver>>, + head_block_receiver: watch::Receiver, pending_tx_sender: broadcast::Sender, pub config: AppConfig, pub db_conn: Option, @@ -299,7 +300,7 @@ impl Web3ProxyApp { "web3_proxy", "frontend", top_config.app.public_rate_limit_per_minute, - 60, + 60.0, ) }); diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index e4a4f7fb..afad3c26 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -1,15 +1,16 @@ -use crate::app::AnyhowJoinHandle; +use crate::rpcs::blockchain::BlockHashesMap; use crate::rpcs::connection::Web3Connection; -use crate::rpcs::connections::BlockHashesMap; +use crate::{app::AnyhowJoinHandle, rpcs::blockchain::ArcBlock}; use argh::FromArgs; use derive_more::Constructor; -use ethers::prelude::{Block, TxHash}; +use ethers::prelude::TxHash; use hashbrown::HashMap; use serde::Deserialize; use std::sync::Arc; use tokio::sync::broadcast; -pub type BlockAndRpc = (Arc>, Arc); +pub type BlockAndRpc = (ArcBlock, Arc); +pub type TxHashAndRpc = (TxHash, Arc); #[derive(Debug, FromArgs)] /// Web3_proxy is a fast caching and load balancing proxy for web3 (Ethereum or similar) JsonRPC servers. @@ -96,6 +97,7 @@ pub struct Web3ConnectionConfig { impl Web3ConnectionConfig { /// Create a Web3Connection from config + /// TODO: move this into Web3Connection (just need to make things pub(crate)) // #[instrument(name = "try_build_Web3ConnectionConfig", skip_all)] #[allow(clippy::too_many_arguments)] pub async fn spawn( @@ -107,7 +109,7 @@ impl Web3ConnectionConfig { http_interval_sender: Option>>, block_map: BlockHashesMap, block_sender: Option>, - tx_id_sender: Option)>>, + tx_id_sender: Option>, ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { let hard_limit = match (self.hard_limit, redis_pool) { (None, None) => None, diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs index 007e5654..34074aab 100644 --- a/web3_proxy/src/frontend/mod.rs +++ b/web3_proxy/src/frontend/mod.rs @@ -1,9 +1,9 @@ mod errors; mod http; -mod http_proxy; mod rate_limit; +mod rpc_proxy_http; +mod rpc_proxy_ws; mod users; -mod ws_proxy; use crate::app::Web3ProxyApp; use ::http::Request; @@ -45,10 +45,10 @@ pub async fn serve(port: u16, proxy_app: Arc) -> anyhow::Result<() // build our axum Router let app = Router::new() // routes should be order most to least common - .route("/", post(http_proxy::public_proxy_web3_rpc)) - .route("/", get(ws_proxy::public_websocket_handler)) - .route("/u/:user_key", post(http_proxy::user_proxy_web3_rpc)) - .route("/u/:user_key", get(ws_proxy::user_websocket_handler)) + .route("/", post(rpc_proxy_http::public_proxy_web3_rpc)) + .route("/", get(rpc_proxy_ws::public_websocket_handler)) + .route("/u/:user_key", post(rpc_proxy_http::user_proxy_web3_rpc)) + .route("/u/:user_key", get(rpc_proxy_ws::user_websocket_handler)) .route("/health", get(http::health)) // TODO: we probably want to remove /status in favor of the separate prometheus thread .route("/status", get(http::status)) diff --git a/web3_proxy/src/frontend/http_proxy.rs b/web3_proxy/src/frontend/rpc_proxy_http.rs similarity index 100% rename from web3_proxy/src/frontend/http_proxy.rs rename to web3_proxy/src/frontend/rpc_proxy_http.rs diff --git a/web3_proxy/src/frontend/ws_proxy.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs similarity index 100% rename from web3_proxy/src/frontend/ws_proxy.rs rename to web3_proxy/src/frontend/rpc_proxy_ws.rs diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index f9c59a39..8850bb54 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -5,7 +5,10 @@ use super::transactions::TxStatus; use crate::{ config::BlockAndRpc, jsonrpc::JsonRpcRequest, rpcs::synced_connections::SyncedConnections, }; -use dashmap::mapref::{entry::Entry, one::Ref}; +use dashmap::{ + mapref::{entry::Entry, one::Ref}, + DashMap, +}; use derive_more::From; use ethers::prelude::{Block, TxHash, H256, U64}; use hashbrown::{HashMap, HashSet}; @@ -15,6 +18,10 @@ use std::sync::Arc; use tokio::sync::{broadcast, watch}; use tracing::{debug, info, trace, warn}; +pub type ArcBlock = Arc>; + +pub type BlockHashesMap = Arc>; + /// A block's hash and number. #[derive(Default, From)] pub struct BlockId { @@ -24,51 +31,59 @@ pub struct BlockId { impl Web3Connections { /// add a block to our map and it's hash to our graphmap of the blockchain - pub fn save_block(&self, block: &Arc>) -> anyhow::Result<()> { - let block_hash = block.hash.ok_or_else(|| anyhow::anyhow!("no block hash"))?; + pub fn save_block(&self, block: &ArcBlock) -> anyhow::Result<()> { + let block_hash = block + .hash + .as_ref() + .ok_or_else(|| anyhow::anyhow!("no block hash"))?; let block_num = block .number + .as_ref() .ok_or_else(|| anyhow::anyhow!("no block num"))?; + let _block_td = block + .total_difficulty + .as_ref() + .ok_or_else(|| anyhow::anyhow!("no block total difficulty"))?; - if self.block_hashes.contains_key(&block_hash) { + if self.block_hashes.contains_key(block_hash) { // this block is already included. no need to continue return Ok(()); } let mut blockchain = self.blockchain_graphmap.write(); - if blockchain.contains_node(block_hash) { + if blockchain.contains_node(*block_hash) { // this hash is already included. we must have hit that race condition // return now since this work was already done. return Ok(()); } // TODO: theres a small race between contains_key and insert - if let Some(_overwritten) = self.block_hashes.insert(block_hash, block.clone()) { + if let Some(_overwritten) = self.block_hashes.insert(*block_hash, block.clone()) { // there was a race and another thread wrote this block // i don't think this will happen. the blockchain.conains_node above should be enough // no need to continue because that other thread would have written (or soon will) write the return Ok(()); } - match self.block_numbers.entry(block_num) { + match self.block_numbers.entry(*block_num) { Entry::Occupied(mut x) => { - x.get_mut().push(block_hash); + x.get_mut().push(*block_hash); } Entry::Vacant(x) => { - x.insert(vec![block_hash]); + x.insert(vec![*block_hash]); } } // TODO: prettier log? or probably move the log somewhere else trace!(%block_hash, "new block"); - blockchain.add_node(block_hash); + blockchain.add_node(*block_hash); // what should edge weight be? and should the nodes be the blocks instead? // TODO: maybe the weight should be the block? // we store parent_hash -> hash because the block already stores the parent_hash - blockchain.add_edge(block.parent_hash, block_hash, 0); + blockchain.add_edge(block.parent_hash, *block_hash, 0); // TODO: prune block_numbers and block_map to only keep a configurable (256 on ETH?) number of blocks? @@ -77,11 +92,12 @@ impl Web3Connections { /// Get a block from caches with fallback. /// Will query a specific node or the best available. + /// WARNING! This may wait forever. be sure this runs with your own timeout pub async fn block( &self, hash: &H256, rpc: Option<&Arc>, - ) -> anyhow::Result>> { + ) -> anyhow::Result { // first, try to get the hash from our cache if let Some(block) = self.block_hashes.get(hash) { return Ok(block.clone()); @@ -118,6 +134,7 @@ impl Web3Connections { let block = Arc::new(block); + // the block was fetched using eth_getBlockByHash, so it should have all fields self.save_block(&block)?; Ok(block) @@ -133,9 +150,9 @@ impl Web3Connections { } /// Get the heaviest chain's block from cache or backend rpc - pub async fn cannonical_block(&self, num: &U64) -> anyhow::Result>> { + pub async fn cannonical_block(&self, num: &U64) -> anyhow::Result { // we only have blocks by hash now - // maybe save them during save_block in a blocks_by_number DashMap>>> + // maybe save them during save_block in a blocks_by_number DashMap> // if theres multiple, use petgraph to find the one on the main chain (and remove the others if they have enough confirmations) // first, try to get the hash from our cache @@ -189,6 +206,7 @@ impl Web3Connections { let block = Arc::new(block); + // the block was fetched using eth_getBlockByNumber, so it should have all fields self.save_block(&block)?; Ok(block) @@ -198,7 +216,7 @@ impl Web3Connections { &self, block_receiver: flume::Receiver, // TODO: head_block_sender should be a broadcast_sender like pending_tx_sender - head_block_sender: watch::Sender>>, + head_block_sender: watch::Sender, pending_tx_sender: Option>, ) -> anyhow::Result<()> { // TODO: indexmap or hashmap? what hasher? with_capacity? @@ -228,9 +246,9 @@ impl Web3Connections { async fn process_block_from_rpc( &self, connection_heads: &mut HashMap, - rpc_head_block: Arc>, + rpc_head_block: ArcBlock, rpc: Arc, - head_block_sender: &watch::Sender>>, + head_block_sender: &watch::Sender, pending_tx_sender: &Option>, ) -> anyhow::Result<()> { // add the block to connection_heads @@ -257,7 +275,7 @@ impl Web3Connections { // iterate the rpc_map to find the highest_work_block let mut checked_heads = HashSet::new(); - let mut highest_work_block: Option>>> = None; + let mut highest_work_block: Option> = None; for (_rpc_name, rpc_head_hash) in connection_heads.iter() { if checked_heads.contains(rpc_head_hash) { @@ -268,11 +286,26 @@ impl Web3Connections { let rpc_head_block = self.block_hashes.get(rpc_head_hash).unwrap(); - if highest_work_block.is_none() - || rpc_head_block.total_difficulty - > highest_work_block.as_ref().unwrap().total_difficulty - { - highest_work_block = Some(rpc_head_block); + match &rpc_head_block.total_difficulty { + None => { + // no total difficulty + // TODO: should we fetch the block here? I think this shouldn't happen + warn!(?rpc, %rpc_head_hash, "block is missing total difficulty"); + continue; + } + Some(td) => { + if highest_work_block.is_none() + || td + > highest_work_block + .as_ref() + .expect("there should always be a block here") + .total_difficulty + .as_ref() + .expect("there should always be total difficulty here") + { + highest_work_block = Some(rpc_head_block); + } + } } } @@ -422,16 +455,18 @@ impl Web3Connections { } if consensus_block_hash == old_head_hash { - debug!(hash=%consensus_block_hash, num=%consensus_block_num, limit=%consensus_sum_soft_limit, "cur consensus head {}/{}/{}", num_synced_rpcs, num_connection_heads, total_rpcs); + debug!(hash=%consensus_block_hash, num=%consensus_block_num, limit=%consensus_sum_soft_limit, %rpc, "cur consensus head {}/{}/{}", num_synced_rpcs, num_connection_heads, total_rpcs); } else if soft_limit_met { // TODO: if new's parent is not old, warn? - debug!(hash=%consensus_block_hash, num=%consensus_block_num, limit=%consensus_sum_soft_limit, "new consensus head {}/{}/{}", num_synced_rpcs, num_connection_heads, total_rpcs); + debug!(hash=%consensus_block_hash, num=%consensus_block_num, limit=%consensus_sum_soft_limit, %rpc, "NEW consensus head {}/{}/{}", num_synced_rpcs, num_connection_heads, total_rpcs); // the head hash changed. forward to any subscribers head_block_sender.send(highest_work_block)?; + + // TODO: do something with pending_tx_sender } else { - warn!(?soft_limit_met, %consensus_block_hash, %old_head_hash, "no consensus head {}/{}/{}", num_synced_rpcs, num_connection_heads, total_rpcs) + warn!(?soft_limit_met, %consensus_block_hash, %old_head_hash, %rpc, "NO consensus head {}/{}/{}", num_synced_rpcs, num_connection_heads, total_rpcs) } Ok(()) diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs index 97c78e44..f22b1a50 100644 --- a/web3_proxy/src/rpcs/connection.rs +++ b/web3_proxy/src/rpcs/connection.rs @@ -1,9 +1,7 @@ ///! Rate-limited communication with a web3 provider. -use super::blockchain::BlockId; -use super::connections::BlockHashesMap; +use super::blockchain::{ArcBlock, BlockHashesMap, BlockId}; use super::provider::Web3Provider; -use super::request::OpenRequestHandle; -use super::request::OpenRequestResult; +use super::request::{OpenRequestHandle, OpenRequestResult}; use crate::app::{flatten_handle, AnyhowJoinHandle}; use crate::config::BlockAndRpc; use anyhow::Context; @@ -69,13 +67,12 @@ impl Web3Connection { ) -> anyhow::Result<(Arc, AnyhowJoinHandle<()>)> { let hard_limit = hard_limit.map(|(hard_rate_limit, redis_conection)| { // TODO: allow configurable period and max_burst - let period = 1; RedisRateLimit::new( redis_conection, "web3_proxy", &format!("{}:{}", chain_id, url_str), hard_rate_limit, - period, + 60.0, ) }); @@ -273,34 +270,68 @@ impl Web3Connection { #[instrument(skip_all)] async fn send_head_block_result( self: &Arc, - new_head_block: Result>, ProviderError>, + new_head_block: Result, block_sender: &flume::Sender, block_map: BlockHashesMap, ) -> anyhow::Result<()> { match new_head_block { - Ok(new_head_block) => { + Ok(mut new_head_block) => { // TODO: is unwrap_or_default ok? we might have an empty block let new_hash = new_head_block.hash.unwrap_or_default(); + let mut td_is_needed = new_head_block.total_difficulty.is_none(); + // if we already have this block saved, we don't need to store this copy - let new_head_block = match block_map.entry(new_hash) { - Entry::Occupied(x) => x.get().clone(), + // be careful with the entry api! awaits during this are a very bad idea. + new_head_block = match block_map.entry(new_hash) { Entry::Vacant(x) => { - // TODO: remove this once https://github.com/ledgerwatch/erigon/issues/5190 is closed - // TODO: include transactions? - let new_head_block = if new_head_block.total_difficulty.is_none() { - self.wait_for_request_handle() - .await? - .request("eth_getBlockByHash", (new_hash, false)) - .await? + // only save the block if it has a total difficulty! + if !td_is_needed { + x.insert(new_head_block).clone() } else { new_head_block - }; + } + } + Entry::Occupied(x) => { + let existing_block = x.get().clone(); - x.insert(new_head_block).clone() + // we only save blocks with a total difficulty + debug_assert!(existing_block.total_difficulty.is_some()); + td_is_needed = false; + + existing_block } }; + if td_is_needed { + // self got the head block first. unfortunately its missing a necessary field + // keep this even after https://github.com/ledgerwatch/erigon/issues/5190 is closed. + // there are other clients and we might have to use a third party without the td fix. + trace!(rpc=?self, ?new_hash, "total_difficulty missing"); + // todo: this can wait forever + let complete_head_block: Block = self + .wait_for_request_handle() + .await? + .request("eth_getBlockByHash", (new_hash, false)) + .await?; + + new_head_block = match block_map.entry(new_hash) { + Entry::Vacant(x) => { + // still vacant! self is still the leader + // now we definitely have total difficulty, so save + x.insert(Arc::new(complete_head_block)).clone() + } + Entry::Occupied(x) => { + let existing_block = x.get().clone(); + + // we only save blocks with a total difficulty + debug_assert!(existing_block.total_difficulty.is_some()); + + existing_block + } + }; + } + let new_num = new_head_block.number.unwrap_or_default(); // save the block so we don't send the same one multiple times @@ -483,6 +514,7 @@ impl Web3Connection { } } Web3Provider::Ws(provider) => { + // todo: move subscribe_blocks onto the request handle? let active_request_handle = self.wait_for_request_handle().await; let mut stream = provider.subscribe_blocks().await?; drop(active_request_handle); @@ -490,7 +522,7 @@ impl Web3Connection { // query the block once since the subscription doesn't send the current block // there is a very small race condition here where the stream could send us a new block right now // all it does is print "new block" for the same block as current block - let block: Result>, _> = self + let block: Result = self .wait_for_request_handle() .await? .request("eth_getBlockByNumber", ("latest", false)) @@ -580,42 +612,48 @@ impl Web3Connection { Ok(()) } - /// be careful with this; it will wait forever! + /// be careful with this; it might wait forever! // TODO: maximum wait time? - #[instrument(skip_all)] + #[instrument] pub async fn wait_for_request_handle(self: &Arc) -> anyhow::Result { // TODO: maximum wait time? i think timeouts in other parts of the code are probably best loop { - match self.try_request_handle().await { + let x = self.try_request_handle().await; + + trace!(?x, "try_request_handle"); + + match x { Ok(OpenRequestResult::Handle(handle)) => return Ok(handle), Ok(OpenRequestResult::RetryAt(retry_at)) => { // TODO: emit a stat? + trace!(?retry_at); sleep_until(retry_at).await; } - Ok(OpenRequestResult::None) => { + Ok(OpenRequestResult::RetryNever) => { // TODO: when can this happen? log? emit a stat? // TODO: subscribe to the head block on this // TODO: sleep how long? maybe just error? - sleep(Duration::from_secs(1)).await; + return Err(anyhow::anyhow!("unable to retry")); } Err(err) => return Err(err), } } } + #[instrument] pub async fn try_request_handle(self: &Arc) -> anyhow::Result { // check that we are connected if !self.has_provider().await { // TODO: emit a stat? - return Ok(OpenRequestResult::None); + return Ok(OpenRequestResult::RetryNever); } // check rate limits if let Some(ratelimiter) = self.hard_limit.as_ref() { match ratelimiter.throttle().await { Ok(ThrottleResult::Allowed) => { - // rate limit succeeded + trace!("rate limit succeeded") } Ok(ThrottleResult::RetryAt(retry_at)) => { // rate limit failed diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs index 5162ccf3..221991f0 100644 --- a/web3_proxy/src/rpcs/connections.rs +++ b/web3_proxy/src/rpcs/connections.rs @@ -1,9 +1,10 @@ ///! Load balanced communication with a group of web3 providers +use super::blockchain::{ArcBlock, BlockHashesMap}; use super::connection::Web3Connection; use super::request::{OpenRequestHandle, OpenRequestResult}; use super::synced_connections::SyncedConnections; use crate::app::{flatten_handle, AnyhowJoinHandle}; -use crate::config::{BlockAndRpc, Web3ConnectionConfig}; +use crate::config::{BlockAndRpc, TxHashAndRpc, Web3ConnectionConfig}; use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest}; use crate::rpcs::transactions::TxStatus; use arc_swap::ArcSwap; @@ -30,8 +31,6 @@ use tokio::time::{interval, sleep, sleep_until, MissedTickBehavior}; use tokio::time::{Duration, Instant}; use tracing::{error, info, instrument, trace, warn}; -pub type BlockHashesMap = Arc>>>; - /// A collection of web3 connections. Sends requests either the current best server or all servers. #[derive(From)] pub struct Web3Connections { @@ -59,7 +58,7 @@ impl Web3Connections { http_client: Option, redis_client_pool: Option, block_map: BlockHashesMap, - head_block_sender: Option>>>, + head_block_sender: Option>, min_sum_soft_limit: u32, min_synced_rpcs: u32, pending_tx_sender: Option>, @@ -207,9 +206,9 @@ impl Web3Connections { /// transaction ids from all the `Web3Connection`s are deduplicated and forwarded to `pending_tx_sender` async fn subscribe( self: Arc, - pending_tx_id_receiver: flume::Receiver<(TxHash, Arc)>, + pending_tx_id_receiver: flume::Receiver, block_receiver: flume::Receiver, - head_block_sender: Option>>>, + head_block_sender: Option>, pending_tx_sender: Option>, ) -> anyhow::Result<()> { let mut futures = vec![]; @@ -407,7 +406,7 @@ impl Web3Connections { Ok(OpenRequestResult::RetryAt(retry_at)) => { earliest_retry_at = earliest_retry_at.min(Some(retry_at)); } - Ok(OpenRequestResult::None) => { + Ok(OpenRequestResult::RetryNever) => { // TODO: log a warning? } Err(err) => { @@ -450,7 +449,7 @@ impl Web3Connections { earliest_retry_at = earliest_retry_at.min(Some(retry_at)); } Ok(OpenRequestResult::Handle(handle)) => selected_rpcs.push(handle), - Ok(OpenRequestResult::None) => { + Ok(OpenRequestResult::RetryNever) => { warn!("no request handle for {}", connection) } Err(err) => { @@ -547,7 +546,7 @@ impl Web3Connections { continue; } - Ok(OpenRequestResult::None) => { + Ok(OpenRequestResult::RetryNever) => { warn!(?self, "No server handles!"); // TODO: subscribe to something on synced connections. maybe it should just be a watch channel diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index 82190ed6..168f9057 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -3,19 +3,21 @@ use super::provider::Web3Provider; use std::fmt; use std::sync::atomic; use std::sync::Arc; -use tokio::time::Instant; +use tokio::time::{sleep, Duration, Instant}; +use tracing::warn; use tracing::{instrument, trace}; -// TODO: rename this +#[derive(Debug)] pub enum OpenRequestResult { Handle(OpenRequestHandle), /// Unable to start a request. Retry at the given time. RetryAt(Instant), /// Unable to start a request. Retrying will not succeed. - None, + RetryNever, } /// Make RPC requests through this handle and drop it when you are done. +#[derive(Debug)] pub struct OpenRequestHandle(Arc); impl OpenRequestHandle { @@ -47,16 +49,20 @@ impl OpenRequestHandle { R: serde::Serialize + serde::de::DeserializeOwned + fmt::Debug, { // TODO: use tracing spans properly - // TODO: it would be nice to have the request id on this + // TODO: requests from customers have request ids, but we should add // TODO: including params in this is way too verbose trace!("Sending {} to {}", method, self.0); let mut provider = None; while provider.is_none() { - // TODO: if no provider, don't unwrap. wait until there is one. match self.0.provider.read().await.as_ref() { - None => {} + None => { + warn!(rpc=?self.0, "no provider!"); + // TODO: how should this work? a reconnect should be in progress. but maybe force one now? + // TODO: sleep how long? subscribe to something instead? + sleep(Duration::from_millis(100)).await + } Some(found_provider) => provider = Some(found_provider.clone()), } } diff --git a/web3_proxy/src/rpcs/synced_connections.rs b/web3_proxy/src/rpcs/synced_connections.rs index 8e4a75f9..7dc61be7 100644 --- a/web3_proxy/src/rpcs/synced_connections.rs +++ b/web3_proxy/src/rpcs/synced_connections.rs @@ -10,7 +10,7 @@ use std::sync::Arc; /// Serialize is so we can print it on our debug endpoint #[derive(Clone, Default, Serialize)] pub struct SyncedConnections { - // TODO: store Arc> instead? + // TODO: store ArcBlock instead? pub(super) head_block_num: U64, pub(super) head_block_hash: H256, // TODO: this should be able to serialize, but it isn't