From bc91bd1c6fe9ccc6f84af2fe19805b1ef16131e9 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Thu, 5 May 2022 19:06:03 +0000 Subject: [PATCH] first pass at caching i'm confused. i had it over 100k connections on friday, but now even when i go back to those commits, i can't get that high --- Cargo.lock | 390 ++++++++++++++++++++++------ Cargo.toml | 34 +-- data/config/example.toml | 29 --- data/config/production.toml | 43 --- data/wrk/getBlockNumber.lua | 3 - data/wrk/getLatestBlockByNumber.lua | 3 - examples/subscribe_blocks.rs | 29 --- examples/watch_blocks.rs | 30 --- src/config.rs | 58 ----- src/connection.rs | 310 ---------------------- src/connections.rs | 315 ---------------------- src/main.rs | 267 ------------------- 12 files changed, 315 insertions(+), 1196 deletions(-) delete mode 100644 data/config/example.toml delete mode 100644 data/config/production.toml delete mode 100644 data/wrk/getBlockNumber.lua delete mode 100644 data/wrk/getLatestBlockByNumber.lua delete mode 100644 examples/subscribe_blocks.rs delete mode 100644 examples/watch_blocks.rs delete mode 100644 src/config.rs delete mode 100644 src/connection.rs delete mode 100644 src/connections.rs delete mode 100644 src/main.rs diff --git a/Cargo.lock b/Cargo.lock index 116e2655..85ac1fd1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -24,6 +24,17 @@ dependencies = [ "opaque-debug 0.3.0", ] +[[package]] +name = "ahash" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47" +dependencies = [ + "getrandom", + "once_cell", + "version_check", +] + [[package]] name = "aho-corasick" version = "0.7.18" @@ -310,6 +321,18 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "771fe0050b883fcc3ea2359b1a96bcfbc090b7116eae7c3c512c7a083fdf23d3" +[[package]] +name = "bstr" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba3569f383e8f1598449f1a423e72e99569137b47740b1da11ef19af3d5c3223" +dependencies = [ + "lazy_static", + "memchr", + "regex-automata", + "serde", +] + [[package]] name = "buf_redux" version = "0.8.4" @@ -384,6 +407,15 @@ dependencies = [ "serde_json", ] +[[package]] +name = "cast" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c24dab4283a142afa2fdca129b80ad2c6284e073930f964c3a1293c225ee39a" +dependencies = [ + "rustc_version", +] + [[package]] name = "cc" version = "1.0.73" @@ -407,9 +439,20 @@ dependencies = [ [[package]] name = "clap" -version = "3.1.12" +version = "2.34.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7c167e37342afc5f33fd87bbc870cedd020d2a6dffa05d45ccd9241fbdd146db" +checksum = "a0610544180c38b88101fecf2dd634b174a62eef6946f84dfc6a7127512b381c" +dependencies = [ + "bitflags", + "textwrap 0.11.0", + "unicode-width", +] + +[[package]] +name = "clap" +version = "3.1.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85a35a599b11c089a7f49105658d089b8f2cf0882993c17daf6de15285c2c35d" dependencies = [ "atty", "bitflags", @@ -419,7 +462,7 @@ dependencies = [ "lazy_static", "strsim", "termcolor", - "textwrap", + "textwrap 0.15.0", ] [[package]] @@ -437,9 +480,9 @@ dependencies = [ [[package]] name = "clap_lex" -version = "0.1.1" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "189ddd3b5d32a70b35e7686054371742a937b0d99128e76dde6340210e966669" +checksum = "a37c35f1112dad5e6e0b1adaff798507497a18fceeb30cceb3bae7d1427b9213" dependencies = [ "os_str_bytes", ] @@ -583,6 +626,42 @@ dependencies = [ "libc", ] +[[package]] +name = "criterion" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1604dafd25fba2fe2d5895a9da139f8dc9b319a5fe5354ca137cbbce4e178d10" +dependencies = [ + "atty", + "cast", + "clap 2.34.0", + "criterion-plot", + "csv", + "itertools", + "lazy_static", + "num-traits", + "oorandom", + "plotters", + "rayon", + "regex", + "serde", + "serde_cbor", + "serde_derive", + "serde_json", + "tinytemplate", + "walkdir", +] + +[[package]] +name = "criterion-plot" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d00996de9f2f7559f7f4dc286073197f83e92256a59ed395f9aac01fe717da57" +dependencies = [ + "cast", + "itertools", +] + [[package]] name = "crossbeam-channel" version = "0.5.4" @@ -676,6 +755,28 @@ dependencies = [ "subtle", ] +[[package]] +name = "csv" +version = "1.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22813a6dc45b335f9bade10bf7271dc477e81113e89eb251a0bc2a8a81c536e1" +dependencies = [ + "bstr", + "csv-core", + "itoa 0.4.8", + "ryu", + "serde", +] + +[[package]] +name = "csv-core" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b2466559f260f48ad25fe6317b3c8dac77b5bdb5763ac7d9d6103530663bc90" +dependencies = [ + "memchr", +] + [[package]] name = "ctr" version = "0.8.0" @@ -687,13 +788,13 @@ dependencies = [ [[package]] name = "dashmap" -version = "5.2.0" +version = "5.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c8858831f7781322e539ea39e72449c46b059638250c14344fec8d0aa6e539c" +checksum = "391b56fbd302e585b7a9494fb70e40949567b1cf9003a8e4a6041a1687c26573" dependencies = [ "cfg-if", - "num_cpus", - "parking_lot 0.12.0", + "hashbrown 0.12.1", + "lock_api", ] [[package]] @@ -921,7 +1022,7 @@ dependencies = [ [[package]] name = "ethers" version = "0.6.0" -source = "git+https://github.com/gakonst/ethers-rs#c81254a8b61bed1e20f29a72cc2e23454930348f" +source = "git+https://github.com/gakonst/ethers-rs#2768a6b4d7dee812d7de0a1e336214bd46e9bacb" dependencies = [ "ethers-addressbook", "ethers-contract", @@ -936,7 +1037,7 @@ dependencies = [ [[package]] name = "ethers-addressbook" version = "0.1.0" -source = "git+https://github.com/gakonst/ethers-rs#c81254a8b61bed1e20f29a72cc2e23454930348f" +source = "git+https://github.com/gakonst/ethers-rs#2768a6b4d7dee812d7de0a1e336214bd46e9bacb" dependencies = [ "ethers-core", "once_cell", @@ -947,7 +1048,7 @@ dependencies = [ [[package]] name = "ethers-contract" version = "0.6.0" -source = "git+https://github.com/gakonst/ethers-rs#c81254a8b61bed1e20f29a72cc2e23454930348f" +source = "git+https://github.com/gakonst/ethers-rs#2768a6b4d7dee812d7de0a1e336214bd46e9bacb" dependencies = [ "ethers-contract-abigen", "ethers-contract-derive", @@ -965,7 +1066,7 @@ dependencies = [ [[package]] name = "ethers-contract-abigen" version = "0.6.0" -source = "git+https://github.com/gakonst/ethers-rs#c81254a8b61bed1e20f29a72cc2e23454930348f" +source = "git+https://github.com/gakonst/ethers-rs#2768a6b4d7dee812d7de0a1e336214bd46e9bacb" dependencies = [ "Inflector", "cfg-if", @@ -987,7 +1088,7 @@ dependencies = [ [[package]] name = "ethers-contract-derive" version = "0.6.0" -source = "git+https://github.com/gakonst/ethers-rs#c81254a8b61bed1e20f29a72cc2e23454930348f" +source = "git+https://github.com/gakonst/ethers-rs#2768a6b4d7dee812d7de0a1e336214bd46e9bacb" dependencies = [ "ethers-contract-abigen", "ethers-core", @@ -1001,7 +1102,7 @@ dependencies = [ [[package]] name = "ethers-core" version = "0.6.0" -source = "git+https://github.com/gakonst/ethers-rs#c81254a8b61bed1e20f29a72cc2e23454930348f" +source = "git+https://github.com/gakonst/ethers-rs#2768a6b4d7dee812d7de0a1e336214bd46e9bacb" dependencies = [ "arrayvec", "bytes", @@ -1027,7 +1128,7 @@ dependencies = [ [[package]] name = "ethers-etherscan" version = "0.2.0" -source = "git+https://github.com/gakonst/ethers-rs#c81254a8b61bed1e20f29a72cc2e23454930348f" +source = "git+https://github.com/gakonst/ethers-rs#2768a6b4d7dee812d7de0a1e336214bd46e9bacb" dependencies = [ "ethers-core", "ethers-solc", @@ -1042,7 +1143,7 @@ dependencies = [ [[package]] name = "ethers-middleware" version = "0.6.0" -source = "git+https://github.com/gakonst/ethers-rs#c81254a8b61bed1e20f29a72cc2e23454930348f" +source = "git+https://github.com/gakonst/ethers-rs#2768a6b4d7dee812d7de0a1e336214bd46e9bacb" dependencies = [ "async-trait", "ethers-contract", @@ -1065,7 +1166,7 @@ dependencies = [ [[package]] name = "ethers-providers" version = "0.6.0" -source = "git+https://github.com/gakonst/ethers-rs#c81254a8b61bed1e20f29a72cc2e23454930348f" +source = "git+https://github.com/gakonst/ethers-rs#2768a6b4d7dee812d7de0a1e336214bd46e9bacb" dependencies = [ "async-trait", "auto_impl", @@ -1100,7 +1201,7 @@ dependencies = [ [[package]] name = "ethers-signers" version = "0.6.0" -source = "git+https://github.com/gakonst/ethers-rs#c81254a8b61bed1e20f29a72cc2e23454930348f" +source = "git+https://github.com/gakonst/ethers-rs#2768a6b4d7dee812d7de0a1e336214bd46e9bacb" dependencies = [ "async-trait", "coins-bip32", @@ -1117,7 +1218,7 @@ dependencies = [ [[package]] name = "ethers-solc" version = "0.3.0" -source = "git+https://github.com/gakonst/ethers-rs#c81254a8b61bed1e20f29a72cc2e23454930348f" +source = "git+https://github.com/gakonst/ethers-rs#2768a6b4d7dee812d7de0a1e336214bd46e9bacb" dependencies = [ "colored", "dunce", @@ -1170,9 +1271,9 @@ dependencies = [ [[package]] name = "ff" -version = "0.11.0" +version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2958d04124b9f27f175eaeb9a9f383d026098aa837eadd8ba22c11f13a05b9e" +checksum = "131655483be284720a17d74ff97592b8e76576dc25563148601df2d7c9080924" dependencies = [ "rand_core", "subtle", @@ -1192,9 +1293,9 @@ dependencies = [ [[package]] name = "fixedbitset" -version = "0.2.0" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37ab347416e802de484e4d03c7316c48f1ecb56574dfd4a46a80f173ce1de04d" +checksum = "279fb028e20b3c4c320317955b77c5e0c9701f05a1d309905d6fc702cdc5053e" [[package]] name = "flume" @@ -1446,12 +1547,27 @@ dependencies = [ "tracing", ] +[[package]] +name = "half" +version = "1.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7" + [[package]] name = "hashbrown" version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e" +[[package]] +name = "hashbrown" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db0d4cf898abf0081f964436dc980e96670a0f36863e4b83aaacdb65c9d7ccc3" +dependencies = [ + "ahash", +] + [[package]] name = "hashers" version = "1.0.1" @@ -1461,6 +1577,15 @@ dependencies = [ "fxhash", ] +[[package]] +name = "hashlink" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d452c155cb93fecdfb02a73dd57b5d8e442c2063bd7aac72f1bc5e4263a43086" +dependencies = [ + "hashbrown 0.12.1", +] + [[package]] name = "headers" version = "0.3.7" @@ -1552,7 +1677,7 @@ checksum = "ff8670570af52249509a86f5e3e18a08c60b177071826898fde8997cf5f6bfbb" dependencies = [ "bytes", "fnv", - "itoa", + "itoa 1.0.1", ] [[package]] @@ -1593,7 +1718,7 @@ dependencies = [ "http-body", "httparse", "httpdate", - "itoa", + "itoa 1.0.1", "pin-project-lite", "socket2", "tokio", @@ -1690,7 +1815,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0f647032dfaa1f8b6dc29bd3edb7bbef4861b8b8007ebb118d6db284fd59f6ee" dependencies = [ "autocfg", - "hashbrown", + "hashbrown 0.11.2", ] [[package]] @@ -1732,6 +1857,12 @@ dependencies = [ "either", ] +[[package]] +name = "itoa" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4" + [[package]] name = "itoa" version = "1.0.1" @@ -1769,9 +1900,9 @@ checksum = "67c21572b4949434e4fc1e1978b99c5f77064153c59d998bf13ecd96fb5ecba7" [[package]] name = "lalrpop" -version = "0.19.7" +version = "0.19.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "852b75a095da6b69da8c5557731c3afd06525d4f655a4fc1c799e2ec8bc4dce4" +checksum = "b30455341b0e18f276fa64540aff54deafb54c589de6aca68659c63dd2d5d823" dependencies = [ "ascii-canvas", "atty", @@ -1792,9 +1923,9 @@ dependencies = [ [[package]] name = "lalrpop-util" -version = "0.19.7" +version = "0.19.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6d265705249fe209280676d8f68887859fa42e1d34f342fc05bd47726a5e188" +checksum = "bcf796c978e9b4d983414f4caedc9273aa33ee214c5b887bd55fde84c85d2dc4" dependencies = [ "regex", ] @@ -1807,9 +1938,26 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.124" +version = "0.2.125" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21a41fed9d98f27ab1c6d161da622a4fa35e8a54a8adc24bbf3ddd0ef70b0e50" +checksum = "5916d2ae698f6de9bfb891ad7a8d65c09d232dc58cc4ac433c7da3b2fd84bc2b" + +[[package]] +name = "linked-hash-map" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fb9b38af92608140b86b693604b9ffcc5824240a484d1ecd4795bacb2fe88f3" + +[[package]] +name = "linkedhashmap" +version = "0.2.0" +dependencies = [ + "criterion", + "hashbrown 0.12.1", + "hashlink", + "linked-hash-map", + "slab", +] [[package]] name = "lock_api" @@ -1823,9 +1971,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.16" +version = "0.4.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6389c490849ff5bc16be905ae24bc913a9c8892e19b2341dbc175e14c341c2b8" +checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e" dependencies = [ "cfg-if", ] @@ -1856,9 +2004,9 @@ dependencies = [ [[package]] name = "memchr" -version = "2.4.1" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a" +checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" [[package]] name = "memoffset" @@ -1993,9 +2141,9 @@ dependencies = [ [[package]] name = "num-integer" -version = "0.1.44" +version = "0.1.45" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2cc698a63b549a70bc047073d2949cce27cd1c7b0a4a862d08a8031bc2801db" +checksum = "225d3389fb3509a24c93f5c29eb6bde2586b98d9f016636dff58d7c6f7569cd9" dependencies = [ "autocfg", "num-traits", @@ -2015,9 +2163,9 @@ dependencies = [ [[package]] name = "num-traits" -version = "0.2.14" +version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290" +checksum = "578ede34cf02f8924ab9447f50c28075b4d3e5b269972345e7e0372b38c6cdcd" dependencies = [ "autocfg", ] @@ -2044,6 +2192,12 @@ version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "87f3e037eac156d1775da914196f0f37741a274155e34a0b7e427c35d2a2ecb9" +[[package]] +name = "oorandom" +version = "11.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" + [[package]] name = "opaque-debug" version = "0.2.3" @@ -2058,18 +2212,30 @@ checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" [[package]] name = "openssl" -version = "0.10.38" +version = "0.10.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c7ae222234c30df141154f159066c5093ff73b63204dcda7121eb082fc56a95" +checksum = "fb81a6430ac911acb25fe5ac8f1d2af1b4ea8a4fdfda0f1ee4292af2e2d8eb0e" dependencies = [ "bitflags", "cfg-if", "foreign-types", "libc", "once_cell", + "openssl-macros", "openssl-sys", ] +[[package]] +name = "openssl-macros" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b501e44f11665960c7e7fcf062c7d96a14ade4aa98116c004b2e37b5be7d736c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "openssl-probe" version = "0.1.5" @@ -2078,9 +2244,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" [[package]] name = "openssl-sys" -version = "0.9.72" +version = "0.9.73" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e46109c383602735fa0a2e48dd2b7c892b048e1bf69e5c3b1d804b7d9c203cb" +checksum = "9d5fd19fb3e0a8191c1e34935718976a3e70c112ab9a24af6d7cadccd9d90bc0" dependencies = [ "autocfg", "cc", @@ -2139,7 +2305,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "87f5ec2493a61ac0506c0f4199f99070cbe83857b0337006a30f3e6719b8ef58" dependencies = [ "lock_api", - "parking_lot_core 0.9.2", + "parking_lot_core 0.9.3", ] [[package]] @@ -2158,9 +2324,9 @@ dependencies = [ [[package]] name = "parking_lot_core" -version = "0.9.2" +version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "995f667a6c822200b0433ac218e05582f0e2efa1b922a3fd2fbaadc5f87bab37" +checksum = "09a279cbf25cb0757810394fbc1e359949b59e348145c643a939a525692e6929" dependencies = [ "cfg-if", "libc", @@ -2230,9 +2396,9 @@ checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" [[package]] name = "petgraph" -version = "0.5.1" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "467d164a6de56270bd7c4d070df81d07beace25012d5103ced4e9ff08d6afdb7" +checksum = "4a13a2fa9d0b63e5f22328828741e523766fff0ee9e779316902290dff3f824f" dependencies = [ "fixedbitset", "indexmap", @@ -2347,6 +2513,34 @@ version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1df8c4ec4b0627e53bdf214615ad287367e482558cf84b109250b37464dc03ae" +[[package]] +name = "plotters" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a3fd9ec30b9749ce28cd91f255d569591cdf937fe280c312143e3c4bad6f2a" +dependencies = [ + "num-traits", + "plotters-backend", + "plotters-svg", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "plotters-backend" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d88417318da0eaf0fdcdb51a0ee6c3bed624333bff8f946733049380be67ac1c" + +[[package]] +name = "plotters-svg" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "521fa9638fa597e1dc53e9412a4f9cefb01187ee1f7413076f9e6749e2885ba9" +dependencies = [ + "plotters-backend", +] + [[package]] name = "ppv-lite86" version = "0.2.16" @@ -2558,6 +2752,12 @@ dependencies = [ "regex-syntax", ] +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" + [[package]] name = "regex-syntax" version = "0.6.25" @@ -2833,9 +3033,9 @@ dependencies = [ [[package]] name = "semver" -version = "1.0.7" +version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d65bd28f48be7196d222d95b9243287f48d27aca604e08497513019ff0502cc4" +checksum = "8cb243bdfdb5936c8dc3c45762a19d12ab4550cdc753bc247637d4ec35a040fd" dependencies = [ "serde", ] @@ -2865,6 +3065,16 @@ dependencies = [ "serde_json", ] +[[package]] +name = "serde_cbor" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2bef2ebfde456fb76bbcf9f59315333decc4fda0b2b44b420243c11e0f5ec1f5" +dependencies = [ + "half", + "serde", +] + [[package]] name = "serde_derive" version = "1.0.137" @@ -2882,7 +3092,7 @@ version = "1.0.81" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b7ce2b32a1aed03c558dc61a5cd328f15aff2dbc17daad8fb8af04d2100e15c" dependencies = [ - "itoa", + "itoa 1.0.1", "ryu", "serde", ] @@ -2894,7 +3104,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" dependencies = [ "form_urlencoded", - "itoa", + "itoa 1.0.1", "ryu", "serde", ] @@ -3115,7 +3325,7 @@ source = "git+https://github.com/roynalnaruto/svm-rs#1eb7f4e2ccb549096885ded598f dependencies = [ "anyhow", "cfg-if", - "clap", + "clap 3.1.15", "console 0.14.1", "dialoguer", "fs2", @@ -3139,9 +3349,9 @@ dependencies = [ [[package]] name = "syn" -version = "1.0.91" +version = "1.0.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b683b2b825c8eef438b77c36a06dc262294da3d5a5813fac20da149241dcd44d" +checksum = "7ff7c592601f11445996a06f8ad0c27f094a58857c2f89e97974ab9235b92c52" dependencies = [ "proc-macro2", "quote", @@ -3198,6 +3408,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "textwrap" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060" +dependencies = [ + "unicode-width", +] + [[package]] name = "textwrap" version = "0.15.0" @@ -3206,18 +3425,18 @@ checksum = "b1141d4d61095b28419e22cb0bbf02755f5e54e0526f97f1e3d1d160e60885fb" [[package]] name = "thiserror" -version = "1.0.30" +version = "1.0.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "854babe52e4df1653706b98fcfc05843010039b406875930a70e4d9644e5c417" +checksum = "bd829fe32373d27f76265620b5309d0340cb8550f523c1dda251d6298069069a" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.30" +version = "1.0.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa32fd3f627f367fe16f893e2597ae3c05020f8bba2666a4e6ea73d377e5714b" +checksum = "0396bc89e626244658bef819e22d0cc459e795a5ebe878e6ec336d1674a8d79a" dependencies = [ "proc-macro2", "quote", @@ -3242,6 +3461,16 @@ dependencies = [ "crunchy", ] +[[package]] +name = "tinytemplate" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be4d6b5f19ff7664e8c98d03e2139cb510db9b0a60b55f8e8709b689d939b6bc" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "tinyvec" version = "1.6.0" @@ -3300,9 +3529,9 @@ dependencies = [ [[package]] name = "tokio-rustls" -version = "0.23.3" +version = "0.23.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4151fda0cf2798550ad0b34bcfc9b9dcc2a9d2471c895c68f3a8818e54f2389e" +checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59" dependencies = [ "rustls", "tokio", @@ -3572,9 +3801,9 @@ checksum = "3ed742d4ea2bd1176e236172c8429aaf54486e7ac098db29ffe6529e0ce50973" [[package]] name = "unicode-xid" -version = "0.2.2" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3" +checksum = "957e51f3646910546462e67d5f7599b9e4fb8acdd304b087a6494730f9eebf04" [[package]] name = "untrusted" @@ -3793,8 +4022,9 @@ dependencies = [ "ethers", "flume", "futures", - "fxhash", "governor", + "hashbrown 0.12.1", + "linkedhashmap", "parking_lot 0.12.0", "regex", "reqwest", @@ -3861,9 +4091,9 @@ checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" [[package]] name = "windows-sys" -version = "0.34.0" +version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5acdd78cb4ba54c0045ac14f62d8f94a03d10047904ae2a40afa1e99d8f70825" +checksum = "ea04155a16a59f9eab786fe12a4a450e75cdb175f9e0d80da1e17db09f55b8d2" dependencies = [ "windows_aarch64_msvc", "windows_i686_gnu", @@ -3874,33 +4104,33 @@ dependencies = [ [[package]] name = "windows_aarch64_msvc" -version = "0.34.0" +version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17cffbe740121affb56fad0fc0e421804adf0ae00891205213b5cecd30db881d" +checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47" [[package]] name = "windows_i686_gnu" -version = "0.34.0" +version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2564fde759adb79129d9b4f54be42b32c89970c18ebf93124ca8870a498688ed" +checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6" [[package]] name = "windows_i686_msvc" -version = "0.34.0" +version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cd9d32ba70453522332c14d38814bceeb747d80b3958676007acadd7e166956" +checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024" [[package]] name = "windows_x86_64_gnu" -version = "0.34.0" +version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cfce6deae227ee8d356d19effc141a509cc503dfd1f850622ec4b0f84428e1f4" +checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1" [[package]] name = "windows_x86_64_msvc" -version = "0.34.0" +version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d19538ccc21819d01deaf88d6a17eae6596a12e9aafdbb97916fb49896d89de9" +checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680" [[package]] name = "winreg" @@ -3940,6 +4170,6 @@ dependencies = [ [[package]] name = "zeroize" -version = "1.5.4" +version = "1.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7eb5728b8afd3f280a869ce1d4c554ffaed35f45c231fc41bfbd0381bef50317" +checksum = "94693807d016b2f2d2e14420eb3bfcca689311ff775dcf113d74ea624b7cdf07" diff --git a/Cargo.toml b/Cargo.toml index 9e7def07..f72f14b4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,29 +1,5 @@ -[package] -name = "web3-proxy" -version = "0.1.0" -edition = "2021" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -arc-swap = "1.5.0" -argh = "0.1.7" -anyhow = "1.0.57" -derive_more = "0.99.17" -ethers = { git = "https://github.com/gakonst/ethers-rs", features = ["rustls", "ws"] } -flume = "0.10.12" -futures = { version = "0.3.21", features = ["thread-pool"] } -fxhash = "0.2.1" -governor = { version = "0.4.2", features = ["dashmap", "std"] } -tokio = { version = "1.18.1", features = ["full"] } -parking_lot = "0.12.0" -regex = "1.5.5" -reqwest = { version = "0.11.10", features = ["json", "rustls"] } -rustc-hash = "1.1.0" -serde = { version = "1.0.137", features = [] } -serde_json = { version = "1.0.81", default-features = false, features = ["alloc"] } -toml = "0.5.9" -tracing = "0.1.34" -tracing-subscriber = "0.3.11" -url = "2.2.2" -warp = "0.3.2" +[workspace] +members = [ + "linkedhashmap", + "web3-proxy", +] diff --git a/data/config/example.toml b/data/config/example.toml deleted file mode 100644 index a4255426..00000000 --- a/data/config/example.toml +++ /dev/null @@ -1,29 +0,0 @@ -[config] -listen_port = 8445 - -[balanced_rpc_tiers] - -[balanced_rpc_tiers.0] - - [balanced_rpc_tiers.0.geth] - url = "ws://127.0.0.1:8546" - soft_limit = 200_000 - - -[private_rpcs] - - [private_rpcs.eden] - url = "https://api.edennetwork.io/v1/" - soft_limit = 1_805 - - [private_rpcs.eden_beta] - url = "https://api.edennetwork.io/v1/beta" - soft_limit = 5_861 - - [private_rpcs.ethermine] - url = "https://rpc.ethermine.org" - soft_limit = 5_861 - - [private_rpcs.flashbots] - url = "https://rpc.flashbots.net" - soft_limit = 7074 diff --git a/data/config/production.toml b/data/config/production.toml deleted file mode 100644 index ed78a28d..00000000 --- a/data/config/production.toml +++ /dev/null @@ -1,43 +0,0 @@ -[config] -listen_port = 8445 - -[balanced_rpc_tiers] - -[balanced_rpc_tiers.0] - - [balanced_rpc_tiers.0.local_erigon] - url = "ws://127.0.0.1:8545" - soft_limit = 68_000 - - [balanced_rpc_tiers.0.local_geth] - url = "ws://127.0.0.1:8946" - soft_limit = 160_000 - -[balanced_rpc_tiers.1] - - [balanced_rpc_tiers.1.linkpool] - #linkpool is slow and often offline - url = "https://main-rpc.linkpool.io" - soft_limit = 4_779 - - [balanced_rpc_tiers.1.ankr] - url = "https://rpc.ankr.com/eth" - soft_limit = 23_967 - -[private_rpcs] - - [private_rpcs.eden] - url = "https://api.edennetwork.io/v1/" - soft_limit = 1_805 - - [private_rpcs.eden_beta] - url = "https://api.edennetwork.io/v1/beta" - soft_limit = 5_861 - - [private_rpcs.ethermine] - url = "https://rpc.ethermine.org" - soft_limit = 5_861 - - [private_rpcs.flashbots] - url = "https://rpc.flashbots.net" - soft_limit = 7074 diff --git a/data/wrk/getBlockNumber.lua b/data/wrk/getBlockNumber.lua deleted file mode 100644 index e3dd78bd..00000000 --- a/data/wrk/getBlockNumber.lua +++ /dev/null @@ -1,3 +0,0 @@ -wrk.method = "POST" -wrk.body = "{\"jsonrpc\":\"2.0\",\"method\":\"eth_blockNumber\",\"params\":[],\"id\":420}" -wrk.headers["Content-Type"] = "application/json" diff --git a/data/wrk/getLatestBlockByNumber.lua b/data/wrk/getLatestBlockByNumber.lua deleted file mode 100644 index bea1576e..00000000 --- a/data/wrk/getLatestBlockByNumber.lua +++ /dev/null @@ -1,3 +0,0 @@ -wrk.method = "POST" -wrk.body = "{\"jsonrpc\":\"2.0\",\"method\":\"eth_getBlockByNumber\",\"params\":[\"latest\", false],\"id\":420}" -wrk.headers["Content-Type"] = "application/json" diff --git a/examples/subscribe_blocks.rs b/examples/subscribe_blocks.rs deleted file mode 100644 index b4602212..00000000 --- a/examples/subscribe_blocks.rs +++ /dev/null @@ -1,29 +0,0 @@ -/// subscribe to a websocket rpc -use ethers::prelude::*; -use std::time::Duration; - -#[tokio::main] -async fn main() -> anyhow::Result<()> { - // erigon - let url = "ws://10.11.12.16:8545"; - // geth - // let url = "ws://10.11.12.16:8946"; - - println!("Subscribing to blocks from {}", url); - - let provider = Ws::connect(url).await?; - - let provider = Provider::new(provider).interval(Duration::from_secs(1)); - - let mut stream = provider.subscribe_blocks().await?.take(3); - while let Some(block) = stream.next().await { - println!( - "{:?} = Ts: {:?}, block number: {}", - block.hash.unwrap(), - block.timestamp, - block.number.unwrap(), - ); - } - - Ok(()) -} diff --git a/examples/watch_blocks.rs b/examples/watch_blocks.rs deleted file mode 100644 index 7fa3ffe2..00000000 --- a/examples/watch_blocks.rs +++ /dev/null @@ -1,30 +0,0 @@ -/// poll an http rpc -use ethers::prelude::*; -use std::{str::FromStr, time::Duration}; - -#[tokio::main] -async fn main() -> anyhow::Result<()> { - // erigon does not support most filters - // let url = "http://10.11.12.16:8545"; - // geth - let url = "http://10.11.12.16:8945"; - - println!("Watching blocks from {:?}", url); - - let provider = Http::from_str(url)?; - - let provider = Provider::new(provider).interval(Duration::from_secs(1)); - - let mut stream = provider.watch_blocks().await?.take(3); - while let Some(block_number) = stream.next().await { - let block = provider.get_block(block_number).await?.unwrap(); - println!( - "{:?} = Ts: {:?}, block number: {}", - block.hash.unwrap(), - block.timestamp, - block.number.unwrap(), - ); - } - - Ok(()) -} diff --git a/src/config.rs b/src/config.rs deleted file mode 100644 index 8e391266..00000000 --- a/src/config.rs +++ /dev/null @@ -1,58 +0,0 @@ -use governor::clock::QuantaClock; -use serde::Deserialize; -use std::collections::{BTreeMap, HashMap}; -use std::sync::Arc; - -use crate::connection::Web3Connection; -use crate::Web3ProxyApp; - -#[derive(Deserialize)] -pub struct RootConfig { - pub config: Web3ProxyConfig, - // BTreeMap so that iterating keeps the same order - pub balanced_rpc_tiers: BTreeMap>, - pub private_rpcs: HashMap, -} - -#[derive(Deserialize)] -pub struct Web3ProxyConfig { - pub listen_port: u16, -} - -#[derive(Deserialize)] -pub struct Web3ConnectionConfig { - url: String, - soft_limit: u32, - hard_limit: Option, -} - -impl RootConfig { - pub async fn try_build(self) -> anyhow::Result { - let balanced_rpc_tiers = self - .balanced_rpc_tiers - .into_values() - .map(|x| x.into_values().collect()) - .collect(); - let private_rpcs = self.private_rpcs.into_values().collect(); - - Web3ProxyApp::try_new(balanced_rpc_tiers, private_rpcs).await - } -} - -impl Web3ConnectionConfig { - pub async fn try_build( - self, - clock: &QuantaClock, - http_client: Option, - ) -> anyhow::Result> { - Web3Connection::try_new( - self.url, - http_client, - self.hard_limit, - Some(clock), - self.soft_limit, - ) - .await - .map(Arc::new) - } -} diff --git a/src/connection.rs b/src/connection.rs deleted file mode 100644 index 4b9e1b4f..00000000 --- a/src/connection.rs +++ /dev/null @@ -1,310 +0,0 @@ -///! Communicate with a web3 provider -use derive_more::From; -use ethers::prelude::Middleware; -use futures::StreamExt; -use governor::clock::{QuantaClock, QuantaInstant}; -use governor::middleware::NoOpMiddleware; -use governor::state::{InMemoryState, NotKeyed}; -use governor::NotUntil; -use governor::RateLimiter; -use serde::{Deserialize, Serialize}; -use serde_json::value::RawValue; -use std::fmt; -use std::num::NonZeroU32; -use std::sync::atomic::{self, AtomicU32, AtomicU64}; -use std::time::Duration; -use std::{cmp::Ordering, sync::Arc}; -use tokio::time::interval; -use tracing::{info, warn}; - -use crate::connections::Web3Connections; - -type Web3RateLimiter = - RateLimiter>; - -/// TODO: instead of an enum, I tried to use Box, but hit https://github.com/gakonst/ethers-rs/issues/592 -#[derive(From)] -pub enum Web3Provider { - Http(ethers::providers::Provider), - Ws(ethers::providers::Provider), -} - -impl fmt::Debug for Web3Provider { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - // TODO: the default Debug takes forever to write. this is too quiet though. we at least need the url - f.debug_struct("Web3Provider").finish_non_exhaustive() - } -} - -/// An active connection to a Web3Rpc -pub struct Web3Connection { - /// TODO: can we get this from the provider? do we even need it? - url: String, - /// keep track of currently open requests. We sort on this - active_requests: AtomicU32, - provider: Web3Provider, - ratelimiter: Option, - /// used for load balancing to the least loaded server - soft_limit: u32, - head_block_number: AtomicU64, -} - -impl fmt::Debug for Web3Connection { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Web3Connection") - .field("url", &self.url) - .finish_non_exhaustive() - } -} - -impl fmt::Display for Web3Connection { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}", &self.url) - } -} - -impl Web3Connection { - /// Connect to a web3 rpc and subscribe to new heads - pub async fn try_new( - url_str: String, - http_client: Option, - hard_rate_limit: Option, - clock: Option<&QuantaClock>, - // TODO: think more about this type - soft_limit: u32, - ) -> anyhow::Result { - let hard_rate_limiter = if let Some(hard_rate_limit) = hard_rate_limit { - let quota = governor::Quota::per_second(NonZeroU32::new(hard_rate_limit).unwrap()); - - let rate_limiter = governor::RateLimiter::direct_with_clock(quota, clock.unwrap()); - - Some(rate_limiter) - } else { - None - }; - - let provider = if url_str.starts_with("http") { - let url: url::Url = url_str.parse()?; - - let http_client = http_client.ok_or_else(|| anyhow::anyhow!("no http_client"))?; - - let provider = ethers::providers::Http::new_with_client(url, http_client); - - // TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592) - ethers::providers::Provider::new(provider) - .interval(Duration::from_secs(1)) - .into() - } else if url_str.starts_with("ws") { - let provider = ethers::providers::Ws::connect(url_str.clone()).await?; - - // TODO: make sure this automatically reconnects - - // TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592) - ethers::providers::Provider::new(provider) - .interval(Duration::from_secs(1)) - .into() - } else { - return Err(anyhow::anyhow!("only http and ws servers are supported")); - }; - - Ok(Web3Connection { - url: url_str.clone(), - active_requests: Default::default(), - provider, - ratelimiter: hard_rate_limiter, - soft_limit, - head_block_number: 0.into(), - }) - } - - pub fn active_requests(&self) -> u32 { - self.active_requests.load(atomic::Ordering::Acquire) - } - - pub fn url(&self) -> &str { - &self.url - } - - /// Subscribe to new blocks - // #[instrument] - pub async fn new_heads( - self: Arc, - connections: Option>, - ) -> anyhow::Result<()> { - info!("Watching new_heads on {}", self); - - match &self.provider { - Web3Provider::Http(provider) => { - // there is a "watch_blocks" function, but a lot of public nodes do not support the necessary rpc endpoints - // TODO: what should this interval be? probably some fraction of block time - // TODO: maybe it would be better to have one interval for all of the http providers, but this works for now - let mut interval = interval(Duration::from_secs(2)); - - loop { - // wait for the interval - interval.tick().await; - - let block_number = provider.get_block_number().await.map(|x| x.as_u64())?; - - // TODO: only store if this isn't already stored? - // TODO: also send something to the provider_tier so it can sort? - let old_block_number = self - .head_block_number - .swap(block_number, atomic::Ordering::AcqRel); - - if old_block_number != block_number { - info!("new block on {}: {}", self, block_number); - - if let Some(connections) = &connections { - connections.update_synced_rpcs(&self, block_number)?; - } - } - } - } - Web3Provider::Ws(provider) => { - // TODO: automatically reconnect? - // TODO: it would be faster to get the block number, but subscriptions don't provide that - // TODO: maybe we can do provider.subscribe("newHeads") and then parse into a custom struct that only gets the number out? - let mut stream = provider.subscribe_blocks().await?; - - // query the block once since the subscription doesn't send the current block - // there is a very small race condition here where the stream could send us a new block right now - // all it does is print "new block" for the same block as current block - let block_number = provider.get_block_number().await.map(|x| x.as_u64())?; - - info!("current block on {}: {}", self, block_number); - - self.head_block_number - .store(block_number, atomic::Ordering::Release); - - if let Some(connections) = &connections { - connections.update_synced_rpcs(&self, block_number)?; - } - - while let Some(block) = stream.next().await { - let block_number = block.number.unwrap().as_u64(); - - // TODO: only store if this isn't already stored? - // TODO: also send something to the provider_tier so it can sort? - // TODO: do we need this old block number check? its helpful on http, but here it shouldn't dupe except maybe on the first run - self.head_block_number - .store(block_number, atomic::Ordering::Release); - - info!("new block on {}: {}", self, block_number); - - if let Some(connections) = &connections { - connections.update_synced_rpcs(&self, block_number)?; - } - } - } - } - - info!("Done watching new_heads on {}", self); - - Ok(()) - } - - /// Send a web3 request - pub async fn request( - &self, - method: &str, - params: &serde_json::value::RawValue, - ) -> Result { - match &self.provider { - Web3Provider::Http(provider) => provider.request(method, params).await, - Web3Provider::Ws(provider) => provider.request(method, params).await, - } - } - - pub fn try_inc_active_requests(&self) -> Result<(), NotUntil> { - // check rate limits - if let Some(ratelimiter) = self.ratelimiter.as_ref() { - match ratelimiter.check() { - Ok(_) => { - // rate limit succeeded - } - Err(not_until) => { - // rate limit failed - // save the smallest not_until. if nothing succeeds, return an Err with not_until in it - // TODO: use tracing better - warn!("Exhausted rate limit on {:?}: {}", self, not_until); - - return Err(not_until); - } - } - }; - - // TODO: what ordering?! - self.active_requests.fetch_add(1, atomic::Ordering::AcqRel); - - Ok(()) - } - - pub fn dec_active_requests(&self) { - // TODO: what ordering?! - self.active_requests.fetch_sub(1, atomic::Ordering::AcqRel); - } -} - -impl Eq for Web3Connection {} - -impl Ord for Web3Connection { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - // TODO: what atomic ordering?! - let a = self.active_requests.load(atomic::Ordering::Acquire); - let b = other.active_requests.load(atomic::Ordering::Acquire); - - // TODO: how should we include the soft limit? floats are slower than integer math - let a = a as f32 / self.soft_limit as f32; - let b = b as f32 / other.soft_limit as f32; - - a.partial_cmp(&b).unwrap() - } -} - -impl PartialOrd for Web3Connection { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -/// note that this is just comparing the active requests. two providers with different rpc urls are equal! -impl PartialEq for Web3Connection { - fn eq(&self, other: &Self) -> bool { - // TODO: what ordering?! - self.active_requests.load(atomic::Ordering::Acquire) - == other.active_requests.load(atomic::Ordering::Acquire) - } -} - -#[derive(Clone, Deserialize)] -pub struct JsonRpcRequest { - pub id: Box, - pub method: String, - pub params: Box, -} - -impl fmt::Debug for JsonRpcRequest { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - // TODO: the default formatter takes forever to write. this is too quiet though - f.debug_struct("JsonRpcRequest") - .field("id", &self.id) - .finish_non_exhaustive() - } -} - -// TODO: check for errors too! -#[derive(Clone, Deserialize, Serialize)] -pub struct JsonRpcForwardedResponse { - pub id: Box, - pub result: Box, -} - -impl fmt::Debug for JsonRpcForwardedResponse { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - // TODO: the default formatter takes forever to write. this is too quiet though - f.debug_struct("JsonRpcForwardedResponse") - .field("id", &self.id) - .finish_non_exhaustive() - } -} diff --git a/src/connections.rs b/src/connections.rs deleted file mode 100644 index 5f3ab44c..00000000 --- a/src/connections.rs +++ /dev/null @@ -1,315 +0,0 @@ -///! Communicate with a group of web3 providers -use arc_swap::ArcSwap; -use derive_more::From; -use futures::stream::FuturesUnordered; -use futures::StreamExt; -use fxhash::FxHashMap; -use governor::clock::{QuantaClock, QuantaInstant}; -use governor::NotUntil; -use serde_json::value::RawValue; -use std::cmp; -use std::fmt; -use std::sync::Arc; -use tracing::warn; - -use crate::config::Web3ConnectionConfig; -use crate::connection::{JsonRpcForwardedResponse, Web3Connection}; - -#[derive(Clone, Default)] -struct SyncedConnections { - head_block_number: u64, - inner: Vec, -} - -impl fmt::Debug for SyncedConnections { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - // TODO: the default formatter takes forever to write. this is too quiet though - f.debug_struct("SyncedConnections").finish_non_exhaustive() - } -} - -impl SyncedConnections { - fn new(max_connections: usize) -> Self { - let inner = Vec::with_capacity(max_connections); - - Self { - head_block_number: 0, - inner, - } - } -} - -/// A collection of web3 connections. Sends requests either the current best server or all servers. -#[derive(From)] -pub struct Web3Connections { - inner: Vec>, - /// TODO: what is the best type for this? Heavy reads with writes every few seconds. When writes happen, there is a burst of them - /// TODO: arcswap was a lot faster, but i think we need a lock for proper logic - synced_connections: ArcSwap, -} - -impl fmt::Debug for Web3Connections { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - // TODO: the default formatter takes forever to write. this is too quiet though - f.debug_struct("Web3Connections") - .field("inner", &self.inner) - .finish_non_exhaustive() - } -} - -impl Web3Connections { - pub async fn try_new( - // TODO: servers should be a Web3ConnectionBuilder struct - servers: Vec, - http_client: Option, - clock: &QuantaClock, - ) -> anyhow::Result> { - let mut connections = vec![]; - - let num_connections = servers.len(); - - for server_config in servers.into_iter() { - let connection = server_config.try_build(clock, http_client.clone()).await?; - - connections.push(connection); - } - - let connections = Arc::new(Self { - inner: connections, - synced_connections: ArcSwap::new(Arc::new(SyncedConnections::new(num_connections))), - }); - - for connection in connections.inner.iter() { - // subscribe to new heads in a spawned future - // TODO: channel instead. then we can have one future with write access to a left-right - let connection = Arc::clone(connection); - let connections = connections.clone(); - tokio::spawn(async move { - // TODO: instead of passing Some(connections), pass Some(channel_sender). Then listen on the receiver below to keep local heads up-to-date - if let Err(e) = connection.new_heads(Some(connections)).await { - warn!("new_heads error: {:?}", e); - } - }); - } - - Ok(connections) - } - - pub async fn try_send_request( - &self, - connection: &Web3Connection, - method: &str, - params: &RawValue, - ) -> anyhow::Result { - // connection.in_active_requests was called when this rpc was selected - - let response = connection.request(method, params).await; - - connection.dec_active_requests(); - - // TODO: if "no block with that header" or some other jsonrpc errors, skip this response - - response.map_err(Into::into) - } - - pub async fn try_send_requests( - self: Arc, - connections: Vec>, - method: String, - params: Box, - response_sender: flume::Sender>, - ) -> anyhow::Result<()> { - let mut unordered_futures = FuturesUnordered::new(); - - for connection in connections { - // clone things so we can pass them to a future - let connections = self.clone(); - let method = method.clone(); - let params = params.clone(); - let response_sender = response_sender.clone(); - - let handle = tokio::spawn(async move { - // get the client for this rpc server - let response = connections - .try_send_request(connection.as_ref(), &method, ¶ms) - .await?; - - // send the first good response to a one shot channel. that way we respond quickly - // drop the result because errors are expected after the first send - response_sender.send(Ok(response)).map_err(Into::into) - }); - - unordered_futures.push(handle); - } - - // TODO: use iterators instead of pushing into a vec - let mut errs = vec![]; - if let Some(x) = unordered_futures.next().await { - match x.unwrap() { - Ok(_) => {} - Err(e) => { - // TODO: better errors - warn!("Got an error sending request: {}", e); - errs.push(e); - } - } - } - - // get the first error (if any) - // TODO: why collect multiple errors if we only pop one? - let e = if !errs.is_empty() { - Err(errs.pop().unwrap()) - } else { - Err(anyhow::anyhow!("no successful responses")) - }; - - // send the error to the channel - if response_sender.send(e).is_ok() { - // if we were able to send an error, then we never sent a success - return Err(anyhow::anyhow!("no successful responses")); - } else { - // if sending the error failed. the other side must be closed (which means we sent a success earlier) - Ok(()) - } - } - - pub fn update_synced_rpcs( - &self, - rpc: &Arc, - new_block: u64, - ) -> anyhow::Result<()> { - // TODO: try a left_right instead of an ArcSwap. - let synced_connections = self.synced_connections.load(); - - // should we load new_block here? - - let mut new_synced_connections: SyncedConnections = - match synced_connections.head_block_number.cmp(&new_block) { - cmp::Ordering::Equal => { - // this rpc is synced, but it isn't the first to this block - (**synced_connections).to_owned() - } - cmp::Ordering::Less => { - // this is a new head block. clear the current synced connections - // TODO: this is too verbose with a bunch of tiers. include the tier - // info!("new head block from {:?}: {}", rpc, new_block); - - let mut new_synced_connections = SyncedConnections::new(self.inner.len()); - - // synced_connections.inner.clear(); - - new_synced_connections.head_block_number = new_block; - - new_synced_connections - } - cmp::Ordering::Greater => { - // not the latest block. return now - return Ok(()); - } - }; - - let rpc_index = self - .inner - .iter() - .position(|x| x.url() == rpc.url()) - .unwrap(); - - new_synced_connections.inner.push(rpc_index); - - self.synced_connections - .swap(Arc::new(new_synced_connections)); - - Ok(()) - } - - /// get the best available rpc server - pub async fn next_upstream_server( - &self, - ) -> Result, Option>> { - let mut earliest_not_until = None; - - // TODO: this clone is probably not the best way to do this - let mut synced_rpc_indexes = self.synced_connections.load().inner.clone(); - - let cache: FxHashMap = synced_rpc_indexes - .iter() - .map(|synced_index| { - ( - *synced_index, - self.inner.get(*synced_index).unwrap().active_requests(), - ) - }) - .collect(); - - // TODO: i think we might need to load active connections and then - synced_rpc_indexes.sort_unstable_by(|a, b| { - let a = cache.get(a).unwrap(); - let b = cache.get(b).unwrap(); - - a.cmp(b) - }); - - for selected_rpc in synced_rpc_indexes.into_iter() { - let selected_rpc = self.inner.get(selected_rpc).unwrap(); - - // increment our connection counter - if let Err(not_until) = selected_rpc.try_inc_active_requests() { - earliest_possible(&mut earliest_not_until, not_until); - - continue; - } - - // return the selected RPC - return Ok(selected_rpc.clone()); - } - - // this might be None - Err(earliest_not_until) - } - - /// get all rpc servers that are not rate limited - /// even fetches if they aren't in sync. This is useful for broadcasting signed transactions - pub fn get_upstream_servers( - &self, - ) -> Result>, Option>> { - let mut earliest_not_until = None; - // TODO: with capacity? - let mut selected_rpcs = vec![]; - - for connection in self.inner.iter() { - // check rate limits and increment our connection counter - if let Err(not_until) = connection.try_inc_active_requests() { - earliest_possible(&mut earliest_not_until, not_until); - - // this rpc is not available. skip it - continue; - } - - selected_rpcs.push(connection.clone()); - } - - if !selected_rpcs.is_empty() { - return Ok(selected_rpcs); - } - - // return the earliest not_until (if no rpcs are synced, this will be None) - Err(earliest_not_until) - } -} - -fn earliest_possible( - earliest_not_until_option: &mut Option>, - new_not_until: NotUntil, -) { - match earliest_not_until_option.as_ref() { - None => *earliest_not_until_option = Some(new_not_until), - Some(earliest_not_until) => { - let earliest_possible = earliest_not_until.earliest_possible(); - let new_earliest_possible = new_not_until.earliest_possible(); - - if earliest_possible > new_earliest_possible { - *earliest_not_until_option = Some(new_not_until); - } - } - } -} diff --git a/src/main.rs b/src/main.rs deleted file mode 100644 index 6d78fc3f..00000000 --- a/src/main.rs +++ /dev/null @@ -1,267 +0,0 @@ -mod config; -mod connection; -mod connections; - -use config::Web3ConnectionConfig; -use futures::future; -use governor::clock::{Clock, QuantaClock}; -use serde_json::json; -use std::fmt; -use std::fs; -use std::sync::Arc; -use std::time::Duration; -use tokio::time::sleep; -use tracing::warn; -use warp::Filter; -use warp::Reply; - -use crate::config::RootConfig; -use crate::connection::JsonRpcRequest; -use crate::connections::Web3Connections; - -static APP_USER_AGENT: &str = concat!( - "satoshiandkin/", - env!("CARGO_PKG_NAME"), - "/", - env!("CARGO_PKG_VERSION"), -); - -/// The application -// TODO: this debug impl is way too verbose. make something smaller -pub struct Web3ProxyApp { - /// clock used for rate limiting - /// TODO: use tokio's clock (will require a different ratelimiting crate) - clock: QuantaClock, - /// Send requests to the best server available - balanced_rpc_tiers: Vec>, - /// Send private requests (like eth_sendRawTransaction) to all these servers - private_rpcs: Option>, -} - -impl fmt::Debug for Web3ProxyApp { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - // TODO: the default formatter takes forever to write. this is too quiet though - f.debug_struct("Web3ProxyApp").finish_non_exhaustive() - } -} - -impl Web3ProxyApp { - async fn try_new( - balanced_rpc_tiers: Vec>, - private_rpcs: Vec, - ) -> anyhow::Result { - let clock = QuantaClock::default(); - - // make a http shared client - // TODO: how should we configure the connection pool? - // TODO: 5 minutes is probably long enough. unlimited is a bad idea if something is wrong with the remote server - let http_client = reqwest::ClientBuilder::new() - .connect_timeout(Duration::from_secs(5)) - .timeout(Duration::from_secs(300)) - .user_agent(APP_USER_AGENT) - .build()?; - - let balanced_rpc_tiers = - future::join_all(balanced_rpc_tiers.into_iter().map(|balanced_rpc_tier| { - Web3Connections::try_new(balanced_rpc_tier, Some(http_client.clone()), &clock) - })) - .await - .into_iter() - .collect::>>>()?; - - let private_rpcs = if private_rpcs.is_empty() { - warn!("No private relays configured. Any transactions will be broadcast to the public mempool!"); - // TODO: instead of None, set it to a list of all the rpcs from balanced_rpc_tiers. that way we broadcast very loudly - None - } else { - Some(Web3Connections::try_new(private_rpcs, Some(http_client), &clock).await?) - }; - - Ok(Web3ProxyApp { - clock, - balanced_rpc_tiers, - private_rpcs, - }) - } - - /// send the request to the approriate RPCs - /// TODO: dry this up - async fn proxy_web3_rpc( - self: Arc, - json_body: JsonRpcRequest, - ) -> anyhow::Result { - if self.private_rpcs.is_some() && json_body.method == "eth_sendRawTransaction" { - let private_rpcs = self.private_rpcs.as_ref().unwrap(); - - // there are private rpcs configured and the request is eth_sendSignedTransaction. send to all private rpcs - loop { - // TODO: think more about this lock. i think it won't actually help the herd. it probably makes it worse if we have a tight lag_limit - match private_rpcs.get_upstream_servers() { - Ok(upstream_servers) => { - let (tx, rx) = flume::unbounded(); - - let connections = private_rpcs.clone(); - let method = json_body.method.clone(); - let params = json_body.params.clone(); - - tokio::spawn(async move { - connections - .try_send_requests(upstream_servers, method, params, tx) - .await - }); - - // wait for the first response - let response = rx.recv_async().await?; - - if let Ok(partial_response) = response { - let response = json!({ - "jsonrpc": "2.0", - "id": json_body.id, - "result": partial_response - }); - return Ok(warp::reply::json(&response)); - } - } - Err(not_until) => { - // TODO: move this to a helper function - // sleep (with a lock) until our rate limits should be available - if let Some(not_until) = not_until { - let deadline = not_until.wait_time_from(self.clock.now()); - - sleep(deadline).await; - } - } - }; - } - } else { - // this is not a private transaction (or no private relays are configured) - // try to send to each tier, stopping at the first success - loop { - // there are multiple tiers. save the earliest not_until (if any). if we don't return, we will sleep until then and then try again - let mut earliest_not_until = None; - - for balanced_rpcs in self.balanced_rpc_tiers.iter() { - // TODO: what allowed lag? - match balanced_rpcs.next_upstream_server().await { - Ok(upstream_server) => { - let response = balanced_rpcs - .try_send_request( - &upstream_server, - &json_body.method, - &json_body.params, - ) - .await; - - let response = match response { - Ok(partial_response) => { - // TODO: trace here was really slow with millions of requests. - // info!("forwarding request from {}", upstream_server); - - json!({ - // TODO: re-use their jsonrpc? - "jsonrpc": "2.0", - "id": json_body.id, - "result": partial_response - }) - } - Err(e) => { - // TODO: what is the proper format for an error? - json!({ - "jsonrpc": "2.0", - "id": json_body.id, - "error": format!("{}", e) - }) - } - }; - - return Ok(warp::reply::json(&response)); - } - Err(None) => { - // TODO: this is too verbose. if there are other servers in other tiers, use those! - warn!("No servers in sync!"); - } - Err(Some(not_until)) => { - // save the smallest not_until. if nothing succeeds, return an Err with not_until in it - // TODO: helper function for this - if earliest_not_until.is_none() { - earliest_not_until.replace(not_until); - } else { - let earliest_possible = - earliest_not_until.as_ref().unwrap().earliest_possible(); - - let new_earliest_possible = not_until.earliest_possible(); - - if earliest_possible > new_earliest_possible { - earliest_not_until = Some(not_until); - } - } - } - } - } - - // we haven't returned an Ok, sleep and try again - if let Some(earliest_not_until) = earliest_not_until { - let deadline = earliest_not_until.wait_time_from(self.clock.now()); - - sleep(deadline).await; - } else { - // TODO: how long should we wait? - // TODO: max wait time? - sleep(Duration::from_millis(500)).await; - }; - } - } - } -} - -#[tokio::main] -async fn main() -> anyhow::Result<()> { - // install global collector configured based on RUST_LOG env var. - tracing_subscriber::fmt::init(); - - // TODO: use flags for the config path - let config = "./data/config/example.toml"; - - let config: String = fs::read_to_string(config)?; - - let config: RootConfig = toml::from_str(&config)?; - - // TODO: load the config from yaml instead of hard coding - // TODO: support multiple chains in one process? then we could just point "chain.stytt.com" at this and caddy wouldn't need anything else - // TODO: be smart about about using archive nodes? have a set that doesn't use archive nodes since queries to them are more valuable - let listen_port = config.config.listen_port; - - let app = config.try_build().await?; - - let app: Arc = Arc::new(app); - - let proxy_rpc_filter = warp::any() - .and(warp::post()) - .and(warp::body::json()) - .then(move |json_body| app.clone().proxy_web3_rpc(json_body)); - - // TODO: filter for displaying connections and their block heights - - // TODO: warp trace is super verbose. how do we make this more readable? - // let routes = proxy_rpc_filter.with(warp::trace::request()); - let routes = proxy_rpc_filter.map(handle_anyhow_errors); - - warp::serve(routes).run(([0, 0, 0, 0], listen_port)).await; - - Ok(()) -} - -/// convert result into an http response. use this at the end of your warp filter -/// TODO: using boxes can't be the best way. think about this more -fn handle_anyhow_errors( - res: anyhow::Result, -) -> warp::http::Response { - match res { - Ok(r) => r.into_response(), - Err(e) => warp::reply::with_status( - format!("{}", e), - warp::http::StatusCode::INTERNAL_SERVER_ERROR, - ) - .into_response(), - } -}