fix deadlock

This commit is contained in:
Bryan Stitt 2022-08-30 20:01:42 +00:00
parent 11c66636bb
commit e5e137f76c
15 changed files with 300 additions and 315 deletions

@ -1,3 +1,8 @@
[build]
# potentially faster. https://nnethercote.github.io/perf-book/build-configuration.html
rustflags = ["-C", "target-cpu=native", "--cfg", "tokio_unstable"]
rustflags = [
# potentially faster. https://nnethercote.github.io/perf-book/build-configuration.html
"-C", "target-cpu=native",
# tokio unstable is needed for tokio-console
"--cfg", "tokio_unstable"
]
rustdocflags = ["--cfg", "tokio_unstable"]

319
Cargo.lock generated

@ -33,7 +33,7 @@ version = "0.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e8b47f52ea9bae42228d07ec09eb676433d7c4ed1ebdf0f1d1c29ed446f1ab8"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"cipher",
"cpufeatures",
"opaque-debug 0.3.0",
@ -80,7 +80,7 @@ version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d52a9bb7ec0cf484c551830a7ce27bd20d67eac647e1befb56b0be4ee39a55d2"
dependencies = [
"winapi 0.3.9",
"winapi",
]
[[package]]
@ -210,7 +210,7 @@ dependencies = [
"slab",
"socket2",
"waker-fn",
"winapi 0.3.9",
"winapi",
]
[[package]]
@ -321,7 +321,7 @@ checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8"
dependencies = [
"hermit-abi",
"libc",
"winapi 0.3.9",
"winapi",
]
[[package]]
@ -442,7 +442,7 @@ checksum = "11a17d453482a265fd5f8479f2a3f405566e6ca627837aaddb85af8b1ab8ef61"
dependencies = [
"addr2line",
"cc",
"cfg-if 1.0.0",
"cfg-if",
"libc",
"miniz_oxide",
"object",
@ -786,12 +786,6 @@ dependencies = [
"jobserver",
]
[[package]]
name = "cfg-if"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
[[package]]
name = "cfg-if"
version = "1.0.0"
@ -811,7 +805,7 @@ dependencies = [
"serde",
"time 0.1.43",
"wasm-bindgen",
"winapi 0.3.9",
"winapi",
]
[[package]]
@ -947,7 +941,7 @@ checksum = "b3616f750b84d8f0de8a58bda93e08e2a81ad3f523089b05f1dffecab48c6cbd"
dependencies = [
"atty",
"lazy_static",
"winapi 0.3.9",
"winapi",
]
[[package]]
@ -985,7 +979,7 @@ dependencies = [
"regex",
"terminal_size",
"unicode-width",
"winapi 0.3.9",
"winapi",
]
[[package]]
@ -998,7 +992,7 @@ dependencies = [
"libc",
"once_cell",
"terminal_size",
"winapi 0.3.9",
"winapi",
]
[[package]]
@ -1076,7 +1070,7 @@ version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
]
[[package]]
@ -1121,7 +1115,7 @@ version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5aaa7bd5fb665c6864b5f963dd9097905c54125909c7aa94c9e18507cdbe6c53"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"crossbeam-utils",
]
@ -1131,7 +1125,7 @@ version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6455c0ca19f0d2fbf751b908d5c55c1f5cbc65e03c4225427254b46890bdde1e"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"crossbeam-epoch",
"crossbeam-utils",
]
@ -1143,7 +1137,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1145cf131a2c6ba0615079ab6a638f7e1973ac9c2634fcbeaaad6114246efe8c"
dependencies = [
"autocfg 1.1.0",
"cfg-if 1.0.0",
"cfg-if",
"crossbeam-utils",
"lazy_static",
"memoffset",
@ -1156,7 +1150,7 @@ version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1cd42583b04998a5363558e5f9291ee5a5ff6b49944332103f251e7479a82aa7"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"crossbeam-utils",
]
@ -1166,7 +1160,7 @@ version = "0.8.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bf124c720b7686e3c2663cf54062ab0f68a88af2fb6a030e87e30bf721fcb38"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"lazy_static",
]
@ -1255,7 +1249,7 @@ version = "5.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3495912c9c1ccf2e18976439f4443f3fee0fd61f424ff99fde6a66b15ecb448f"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"hashbrown",
"lock_api",
"parking_lot_core 0.9.3",
@ -1348,7 +1342,7 @@ version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b98cf8ebf19c3d1b223e151f99a4f9f0690dca41414773390fc824184ac833e1"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"dirs-sys-next",
]
@ -1360,7 +1354,7 @@ checksum = "4ebda144c4fe02d1f7ea1a7d9641b6fc6b580adcfa024ae48797ecdeb6825b4d"
dependencies = [
"libc",
"redox_users",
"winapi 0.3.9",
"winapi",
]
[[package]]
@ -1446,7 +1440,7 @@ version = "0.8.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9852635589dc9f9ea1b6fe9f05b50ef208c85c834a562f0c6abb1c475736ec2b"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
]
[[package]]
@ -1577,7 +1571,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bda76ce804d524f693a898dc5857d08f4db443f3da64d0c36237fa05c0ecef30"
dependencies = [
"Inflector",
"cfg-if 1.0.0",
"cfg-if",
"dunce",
"ethers-core",
"eyre",
@ -1728,7 +1722,7 @@ version = "0.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ebe5db405d0e584aa8dae154ffebb90f2305cae588fd11d9f6b857ebe3a79294"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"colored",
"dunce",
"ethers-core",
@ -1848,7 +1842,7 @@ version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e94a7bbaa59354bc20dd75b67f23e2797b4490e9d6928203fb105c79e448c86c"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"libc",
"redox_syscall",
"windows-sys",
@ -1928,24 +1922,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9564fc758e15025b46aa6643b1b77d047d1a56a1aea6e01002ac0c7026876213"
dependencies = [
"libc",
"winapi 0.3.9",
]
[[package]]
name = "fsevent"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ab7d1bd1bd33cc98b0889831b72da23c0aa4df9cec7e0702f46ecea04b35db6"
dependencies = [
"bitflags",
"fsevent-sys",
"winapi",
]
[[package]]
name = "fsevent-sys"
version = "2.0.1"
version = "4.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f41b048a94555da0f42f1d632e2e19510084fb8e303b0daa2816e733fb3644a0"
checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2"
dependencies = [
"libc",
]
@ -1956,22 +1940,6 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba"
[[package]]
name = "fuchsia-zircon"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82"
dependencies = [
"bitflags",
"fuchsia-zircon-sys",
]
[[package]]
name = "fuchsia-zircon-sys"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7"
[[package]]
name = "funty"
version = "2.0.0"
@ -1980,9 +1948,9 @@ checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c"
[[package]]
name = "futures"
version = "0.3.23"
version = "0.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab30e97ab6aacfe635fad58f22c2bb06c8b685f7421eb1e064a729e2a5f481fa"
checksum = "7f21eda599937fba36daeb58a22e8f5cee2d14c4a17b5b7739c7c8e5e3b8230c"
dependencies = [
"futures-channel",
"futures-core",
@ -1995,9 +1963,9 @@ dependencies = [
[[package]]
name = "futures-channel"
version = "0.3.23"
version = "0.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2bfc52cbddcfd745bf1740338492bb0bd83d76c67b445f91c5fb29fae29ecaa1"
checksum = "30bdd20c28fadd505d0fd6712cdfcb0d4b5648baf45faef7f852afb2399bb050"
dependencies = [
"futures-core",
"futures-sink",
@ -2005,15 +1973,15 @@ dependencies = [
[[package]]
name = "futures-core"
version = "0.3.23"
version = "0.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2acedae88d38235936c3922476b10fced7b2b68136f5e3c03c2d5be348a1115"
checksum = "4e5aa3de05362c3fb88de6531e6296e85cde7739cccad4b9dfeeb7f6ebce56bf"
[[package]]
name = "futures-executor"
version = "0.3.23"
version = "0.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d11aa21b5b587a64682c0094c2bdd4df0076c5324961a40cc3abd7f37930528"
checksum = "9ff63c23854bee61b6e9cd331d523909f238fc7636290b96826e9cfa5faa00ab"
dependencies = [
"futures-core",
"futures-task",
@ -2034,9 +2002,9 @@ dependencies = [
[[package]]
name = "futures-io"
version = "0.3.23"
version = "0.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "93a66fc6d035a26a3ae255a6d2bca35eda63ae4c5512bef54449113f7a1228e5"
checksum = "bbf4d2a7a308fd4578637c0b17c7e1c7ba127b8f6ba00b29f717e9655d85eb68"
[[package]]
name = "futures-lite"
@ -2066,9 +2034,9 @@ dependencies = [
[[package]]
name = "futures-macro"
version = "0.3.23"
version = "0.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0db9cce532b0eae2ccf2766ab246f114b56b9cf6d445e00c2549fbc100ca045d"
checksum = "42cd15d1c7456c04dbdf7e88bcd69760d74f3a798d6444e16974b505b0e62f17"
dependencies = [
"proc-macro2",
"quote",
@ -2077,15 +2045,15 @@ dependencies = [
[[package]]
name = "futures-sink"
version = "0.3.23"
version = "0.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca0bae1fe9752cf7fd9b0064c674ae63f97b37bc714d745cbde0afb7ec4e6765"
checksum = "21b20ba5a92e727ba30e72834706623d94ac93a725410b6a6b6fbc1b07f7ba56"
[[package]]
name = "futures-task"
version = "0.3.23"
version = "0.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "842fc63b931f4056a24d59de13fb1272134ce261816e063e634ad0c15cdc5306"
checksum = "a6508c467c73851293f390476d4491cf4d227dbabcd4170f3bb6044959b294f1"
[[package]]
name = "futures-timer"
@ -2095,9 +2063,9 @@ checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c"
[[package]]
name = "futures-util"
version = "0.3.23"
version = "0.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0828a5471e340229c11c77ca80017937ce3c58cb788a17e5f1c2d5c485a9577"
checksum = "44fb6cb1be61cc1d2e43b262516aafcf63b241cffdb1d3fa115f91d9c7b09c90"
dependencies = [
"futures-channel",
"futures-core",
@ -2145,7 +2113,7 @@ version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9be70c98951c83b8d2f8f60d7065fa6d5146873094452a1008da8c2f1e4205ad"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"js-sys",
"libc",
"wasi 0.10.2+wasi-snapshot-preview1",
@ -2327,7 +2295,7 @@ version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2456aef2e6b6a9784192ae780c0f15bc57df0e918585282325e8c8ac27737654"
dependencies = [
"winapi 0.3.9",
"winapi",
]
[[package]]
@ -2417,7 +2385,7 @@ dependencies = [
"core-foundation-sys",
"js-sys",
"wasm-bindgen",
"winapi 0.3.9",
"winapi",
]
[[package]]
@ -2499,9 +2467,9 @@ dependencies = [
[[package]]
name = "inotify"
version = "0.7.1"
version = "0.9.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4816c66d2c8ae673df83366c18341538f234a26d65a9ecea5c348b453ac1d02f"
checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff"
dependencies = [
"bitflags",
"inotify-sys",
@ -2523,21 +2491,12 @@ version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"js-sys",
"wasm-bindgen",
"web-sys",
]
[[package]]
name = "iovec"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e"
dependencies = [
"libc",
]
[[package]]
name = "ipnet"
version = "2.5.0"
@ -2598,7 +2557,7 @@ version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c8a5a96d92d849c4499d99461da81c9cdc1467418a8ed2aaeb407e8d85940ed"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"ecdsa",
"elliptic-curve",
"sha2 0.10.2",
@ -2612,13 +2571,23 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9b7d56ba4a8344d6be9729995e6b06f928af29998cdf79fe390cbf6b1fee838"
[[package]]
name = "kernel32-sys"
version = "0.2.2"
name = "kqueue"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d"
checksum = "4d6112e8f37b59803ac47a42d14f1f3a59bbf72fc6857ffc5be455e28a691f8e"
dependencies = [
"winapi 0.2.8",
"winapi-build",
"kqueue-sys",
"libc",
]
[[package]]
name = "kqueue-sys"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8367585489f01bc55dd27404dcf56b95e6da061a256a666ab23be9ba96a2e587"
dependencies = [
"bitflags",
"libc",
]
[[package]]
@ -2671,12 +2640,6 @@ dependencies = [
"spin 0.5.2",
]
[[package]]
name = "lazycell"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
[[package]]
name = "libc"
version = "0.2.126"
@ -2722,7 +2685,7 @@ version = "0.4.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"value-bag",
]
@ -2800,25 +2763,6 @@ dependencies = [
"adler",
]
[[package]]
name = "mio"
version = "0.6.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4afd66f5b91bf2a3bc13fad0e21caedac168ca4c707504e75585648ae80e4cc4"
dependencies = [
"cfg-if 0.1.10",
"fuchsia-zircon",
"fuchsia-zircon-sys",
"iovec",
"kernel32-sys",
"libc",
"log",
"miow",
"net2",
"slab",
"winapi 0.2.8",
]
[[package]]
name = "mio"
version = "0.8.3"
@ -2831,30 +2775,6 @@ dependencies = [
"windows-sys",
]
[[package]]
name = "mio-extras"
version = "2.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52403fe290012ce777c4626790c8951324a2b9e3316b3143779c72b029742f19"
dependencies = [
"lazycell",
"log",
"mio 0.6.23",
"slab",
]
[[package]]
name = "miow"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ebd808424166322d4a38da87083bfddd3ac4c131334ed55856112eb06d46944d"
dependencies = [
"kernel32-sys",
"net2",
"winapi 0.2.8",
"ws2_32-sys",
]
[[package]]
name = "nanorand"
version = "0.7.0"
@ -2864,17 +2784,6 @@ dependencies = [
"getrandom",
]
[[package]]
name = "net2"
version = "0.2.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "391630d12b68002ae1e25e8f974306474966550ad82dac6886fb8910c19568ae"
dependencies = [
"cfg-if 0.1.10",
"libc",
"winapi 0.3.9",
]
[[package]]
name = "new_debug_unreachable"
version = "1.0.4"
@ -2899,20 +2808,20 @@ checksum = "e9e591e719385e6ebaeb5ce5d3887f7d5676fceca6411d1925ccc95745f3d6f7"
[[package]]
name = "notify"
version = "4.0.17"
version = "5.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae03c8c853dba7bfd23e571ff0cff7bc9dceb40a4cd684cd1681824183f45257"
checksum = "ed2c66da08abae1c024c01d635253e402341b4060a12e99b31c7594063bf490a"
dependencies = [
"bitflags",
"crossbeam-channel",
"filetime",
"fsevent",
"fsevent-sys",
"inotify",
"kqueue",
"libc",
"mio 0.6.23",
"mio-extras",
"mio",
"walkdir",
"winapi 0.3.9",
"winapi",
]
[[package]]
@ -3156,12 +3065,12 @@ version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d76e8e1493bcac0d2766c42737f34458f1c8c50c0d23bcb24ea953affb273216"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"instant",
"libc",
"redox_syscall",
"smallvec",
"winapi 0.3.9",
"winapi",
]
[[package]]
@ -3171,7 +3080,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09a279cbf25cb0757810394fbc1e359949b59e348145c643a939a525692e6929"
dependencies = [
"backtrace",
"cfg-if 1.0.0",
"cfg-if",
"libc",
"petgraph",
"redox_syscall",
@ -3471,11 +3380,11 @@ version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "685404d509889fade3e86fe3a5803bca2ec09b0c0778d5ada6ec8bf7a8de5259"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"libc",
"log",
"wepoll-ffi",
"winapi 0.3.9",
"winapi",
]
[[package]]
@ -3594,7 +3503,7 @@ checksum = "924cd8a0de90723d63fed19c5035ea129913a0bc998b37686a67f1eaf6a2aab5"
dependencies = [
"lazy_static",
"libc",
"winapi 0.3.9",
"winapi",
]
[[package]]
@ -3657,7 +3566,7 @@ dependencies = [
"rand_os",
"rand_pcg",
"rand_xorshift",
"winapi 0.3.9",
"winapi",
]
[[package]]
@ -3741,7 +3650,7 @@ checksum = "1166d5c91dc97b88d1decc3285bb0a99ed84b05cfd0bc2341bdf2d43fc41e39b"
dependencies = [
"libc",
"rand_core 0.4.2",
"winapi 0.3.9",
"winapi",
]
[[package]]
@ -3756,7 +3665,7 @@ dependencies = [
"rand_core 0.4.2",
"rdrand",
"wasm-bindgen",
"winapi 0.3.9",
"winapi",
]
[[package]]
@ -3891,7 +3800,7 @@ version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7"
dependencies = [
"winapi 0.3.9",
"winapi",
]
[[package]]
@ -3956,7 +3865,7 @@ dependencies = [
"spin 0.5.2",
"untrusted",
"web-sys",
"winapi 0.3.9",
"winapi",
]
[[package]]
@ -4383,7 +4292,7 @@ version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "028f48d513f9678cda28f6e4064755b3fbb2af6acd672f2c209b62323f7aea0f"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"cpufeatures",
"digest 0.10.3",
]
@ -4394,7 +4303,7 @@ version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c77f4e7f65455545c2153c1253d25056825e77ee2533f0e41deb65a93a34852f"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"cpufeatures",
"digest 0.10.3",
]
@ -4418,7 +4327,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d58a1e1bf39749807d89cf2d98ac2dfa0ff1cb3faa38fbb64dd88ac8013d800"
dependencies = [
"block-buffer 0.9.0",
"cfg-if 1.0.0",
"cfg-if",
"cpufeatures",
"digest 0.9.0",
"opaque-debug 0.3.0",
@ -4430,7 +4339,7 @@ version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55deaec60f81eefe3cce0dc50bda92d6d8e88f2a27df7c5033b42afeb1ed2676"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"cpufeatures",
"digest 0.10.3",
]
@ -4529,7 +4438,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "66d72b759436ae32898a2af0a14218dbf55efde3feeb170eb623637db85ee1e0"
dependencies = [
"libc",
"winapi 0.3.9",
"winapi",
]
[[package]]
@ -4762,7 +4671,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a55e00b6f95abd889ce398bd7eab2a9c62cd27281cf1bfba70847340557434cf"
dependencies = [
"anyhow",
"cfg-if 1.0.0",
"cfg-if",
"clap 3.2.15",
"console 0.14.1",
"dialoguer",
@ -4815,12 +4724,12 @@ version = "3.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5cdb1ef4eaeeaddc8fbd371e5017057064af0911902ef36b39801f67cc6d79e4"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"fastrand",
"libc",
"redox_syscall",
"remove_dir_all",
"winapi 0.3.9",
"winapi",
]
[[package]]
@ -4831,7 +4740,7 @@ checksum = "c59df8ac95d96ff9bede18eb7300b0fda5e5d8d90960e76f8e14ae765eedbf1f"
dependencies = [
"dirs-next",
"rustversion",
"winapi 0.3.9",
"winapi",
]
[[package]]
@ -4850,7 +4759,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "633c1a546cee861a1a6d0dc69ebeca693bf4296661ba7852b9d21d159e0506df"
dependencies = [
"libc",
"winapi 0.3.9",
"winapi",
]
[[package]]
@ -4896,7 +4805,7 @@ checksum = "5fdfe0627923f7411a43ec9ec9c39c3a9b4151be313e0922042581fb6c9b717f"
dependencies = [
"libc",
"redox_syscall",
"winapi 0.3.9",
"winapi",
]
[[package]]
@ -4915,7 +4824,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca8a50ef2360fbd1eeb0ecd46795a87a19024eb4b53c5dc916ca1fd95fe62438"
dependencies = [
"libc",
"winapi 0.3.9",
"winapi",
]
[[package]]
@ -4980,7 +4889,7 @@ dependencies = [
"bytes",
"libc",
"memchr",
"mio 0.8.3",
"mio",
"num_cpus",
"once_cell",
"parking_lot 0.12.1",
@ -4989,7 +4898,7 @@ dependencies = [
"socket2",
"tokio-macros",
"tracing",
"winapi 0.3.9",
"winapi",
]
[[package]]
@ -5145,7 +5054,7 @@ version = "0.1.36"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2fce9567bd60a67d08a16488756721ba392f24f29006402881e43b19aac64307"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"log",
"pin-project-lite",
"tracing-attributes",
@ -5409,7 +5318,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "808cf2735cd4b6866113f648b791c6adc5714537bc222d9347bb203386ffda56"
dependencies = [
"same-file",
"winapi 0.3.9",
"winapi",
"winapi-util",
]
@ -5441,7 +5350,7 @@ version = "0.2.82"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc7652e3f6c4706c8d9cd54832c4a4ccb9b5336e2c3bd154d5cccfbf1c1f5f7d"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"wasm-bindgen-macro",
]
@ -5466,7 +5375,7 @@ version = "0.4.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6f741de44b75e14c35df886aff5f1eb73aa114fa5d4d00dcd37b5e01259bf3b2"
dependencies = [
"cfg-if 1.0.0",
"cfg-if",
"js-sys",
"wasm-bindgen",
"web-sys",
@ -5609,12 +5518,6 @@ dependencies = [
"cc",
]
[[package]]
name = "winapi"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a"
[[package]]
name = "winapi"
version = "0.3.9"
@ -5625,12 +5528,6 @@ dependencies = [
"winapi-x86_64-pc-windows-gnu",
]
[[package]]
name = "winapi-build"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc"
[[package]]
name = "winapi-i686-pc-windows-gnu"
version = "0.4.0"
@ -5643,7 +5540,7 @@ version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178"
dependencies = [
"winapi 0.3.9",
"winapi",
]
[[package]]
@ -5701,17 +5598,7 @@ version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "80d0f4e272c85def139476380b12f9ac60926689dd2e01d4923222f40580869d"
dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "ws2_32-sys"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d59cefebd0c892fa2dd6de581e937301d8552cb44489cdff035c6187cb63fa5e"
dependencies = [
"winapi 0.2.8",
"winapi-build",
"winapi",
]
[[package]]

@ -114,6 +114,7 @@
- [x] now that we have a consensus head with enough soft limit (or an empty set), update SyncedConnections
- [x] send the block through new head_block_sender
- [x] rewrite cannonical_block
- [ ] bug around eth_getBlockByHash sometimes causes tokio to lock up
- [-] use siwe messages and signatures for sign up and login
- [-] requests for "Get transactions receipts" are routed to the private_rpcs and not the balanced_rpcs. do this better.
- [x] quick fix, send to balanced_rpcs for now. we will just live with errors on new transactions.
@ -123,7 +124,10 @@
- [-] basic request method stats (using the user_id and other fields that are in the tracing frame)
- [ ] "chain is forked" message is wrong. it includes nodes just being on different heights of the same chain. need a smarter check
- i think there is also a bug because i've seen "server not synced" a couple times
- [ ] i saw a fork of like 300 blocks. probably just because a node was restarted and had fallen behind. need some checks to ignore things that are far behind
- [x] i saw a fork of like 300 blocks. probably just because a node was restarted and had fallen behind. need some checks to ignore things that are far behind
- [ ] todo!("pick the block on the current consensus chain")
- [ ] web3connection3.block(...) might wait forever. be sure to do it safely
- [ ] search for all "todo!"
## V1

@ -5,6 +5,7 @@ use anyhow::Context;
use bb8_redis::redis::pipe;
use std::ops::Add;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tracing::trace;
pub use crate::errors::{RedisError, RedisErrorSink};
pub use bb8_redis::{bb8, redis, RedisConnectionManager};
@ -14,8 +15,10 @@ pub type RedisPool = bb8::Pool<RedisConnectionManager>;
pub struct RedisRateLimit {
pool: RedisPool,
key_prefix: String,
default_max_per_period: u64,
period: u64,
/// The default maximum requests allowed in a period.
max_requests_per_period: u64,
/// seconds
period: f32,
}
pub enum ThrottleResult {
@ -29,27 +32,29 @@ impl RedisRateLimit {
pool: RedisPool,
app: &str,
label: &str,
default_max_per_period: u64,
period: u64,
max_requests_per_period: u64,
period: f32,
) -> Self {
let key_prefix = format!("{}:rrl:{}", app, label);
Self {
pool,
key_prefix,
default_max_per_period,
max_requests_per_period,
period,
}
}
/// label might be an ip address or a user_key id
/// label might be an ip address or a user_key id.
/// if setting max_per_period, be sure to keep the period the same for all requests to this label
/// TODO:
pub async fn throttle_label(
&self,
label: &str,
max_per_period: Option<u64>,
count: u64,
) -> anyhow::Result<ThrottleResult> {
let max_per_period = max_per_period.unwrap_or(self.default_max_per_period);
let max_per_period = max_per_period.unwrap_or(self.max_requests_per_period);
if max_per_period == 0 {
return Ok(ThrottleResult::RetryNever);
@ -58,7 +63,7 @@ impl RedisRateLimit {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.context("cannot tell the time")?
.as_secs();
.as_secs_f32();
// if self.period is 60, period_id will be the minute of the current time
let period_id = (now / self.period) % self.period;
@ -71,7 +76,7 @@ impl RedisRateLimit {
// we could get the key first, but that means an extra redis call for every check. this seems better
.incr(&throttle_key, count)
// set expiration the first time we set the key. ignore the result
.expire(&throttle_key, self.period.try_into().unwrap())
.expire(&throttle_key, self.period as usize)
// .arg("NX") // TODO: this works in redis, but not elasticache
.ignore()
// do the query
@ -84,9 +89,11 @@ impl RedisRateLimit {
.ok_or_else(|| anyhow::anyhow!("check rate limit result"))?;
if new_count > &max_per_period {
let seconds_left_in_period = self.period - now / self.period;
let seconds_left_in_period = self.period - (now % self.period);
let retry_at = Instant::now().add(Duration::from_secs(seconds_left_in_period));
let retry_at = Instant::now().add(Duration::from_secs_f32(seconds_left_in_period));
trace!(%label, ?retry_at, "rate limited");
return Ok(ThrottleResult::RetryAt(retry_at));
}

@ -23,6 +23,7 @@ axum = { version = "0.5.15", features = ["headers", "serde_json", "tokio-tungste
axum-auth = "0.3.0"
axum-client-ip = "0.2.0"
axum-macros = "0.2.3"
# TODO: turn tokio-console on with a feature. console-subscriber = { version = "0.1.7" }
counter = "0.5.6"
dashmap = "5.3.4"
derive_more = "0.99.17"
@ -30,12 +31,12 @@ dotenv = "0.15.0"
ethers = { version = "0.17.0", features = ["rustls", "ws"] }
fdlimit = "0.2.1"
flume = "0.10.14"
futures = { version = "0.3.23", features = ["thread-pool"] }
futures = { version = "0.3.24", features = ["thread-pool"] }
hashbrown = { version = "0.12.3", features = ["serde"] }
http = "0.2.8"
indexmap = "1.9.1"
fifomap = { path = "../fifomap" }
notify = "4.0.17"
notify = "5.0.0"
num = "0.4.0"
parking_lot = { version = "0.12.1", features = ["arc_lock"] }
petgraph = "0.6.2"

@ -6,7 +6,8 @@ use crate::jsonrpc::JsonRpcForwardedResponse;
use crate::jsonrpc::JsonRpcForwardedResponseEnum;
use crate::jsonrpc::JsonRpcRequest;
use crate::jsonrpc::JsonRpcRequestEnum;
use crate::rpcs::connections::{BlockHashesMap, Web3Connections};
use crate::rpcs::blockchain::{ArcBlock, BlockHashesMap};
use crate::rpcs::connections::Web3Connections;
use crate::rpcs::transactions::TxStatus;
use crate::stats::AppStats;
use anyhow::Context;
@ -82,7 +83,7 @@ pub struct Web3ProxyApp {
response_cache: ResponseLrcCache,
// don't drop this or the sender will stop working
// TODO: broadcast channel instead?
head_block_receiver: watch::Receiver<Arc<Block<TxHash>>>,
head_block_receiver: watch::Receiver<ArcBlock>,
pending_tx_sender: broadcast::Sender<TxStatus>,
pub config: AppConfig,
pub db_conn: Option<sea_orm::DatabaseConnection>,
@ -299,7 +300,7 @@ impl Web3ProxyApp {
"web3_proxy",
"frontend",
top_config.app.public_rate_limit_per_minute,
60,
60.0,
)
});

@ -1,15 +1,16 @@
use crate::app::AnyhowJoinHandle;
use crate::rpcs::blockchain::BlockHashesMap;
use crate::rpcs::connection::Web3Connection;
use crate::rpcs::connections::BlockHashesMap;
use crate::{app::AnyhowJoinHandle, rpcs::blockchain::ArcBlock};
use argh::FromArgs;
use derive_more::Constructor;
use ethers::prelude::{Block, TxHash};
use ethers::prelude::TxHash;
use hashbrown::HashMap;
use serde::Deserialize;
use std::sync::Arc;
use tokio::sync::broadcast;
pub type BlockAndRpc = (Arc<Block<TxHash>>, Arc<Web3Connection>);
pub type BlockAndRpc = (ArcBlock, Arc<Web3Connection>);
pub type TxHashAndRpc = (TxHash, Arc<Web3Connection>);
#[derive(Debug, FromArgs)]
/// Web3_proxy is a fast caching and load balancing proxy for web3 (Ethereum or similar) JsonRPC servers.
@ -96,6 +97,7 @@ pub struct Web3ConnectionConfig {
impl Web3ConnectionConfig {
/// Create a Web3Connection from config
/// TODO: move this into Web3Connection (just need to make things pub(crate))
// #[instrument(name = "try_build_Web3ConnectionConfig", skip_all)]
#[allow(clippy::too_many_arguments)]
pub async fn spawn(
@ -107,7 +109,7 @@ impl Web3ConnectionConfig {
http_interval_sender: Option<Arc<broadcast::Sender<()>>>,
block_map: BlockHashesMap,
block_sender: Option<flume::Sender<BlockAndRpc>>,
tx_id_sender: Option<flume::Sender<(TxHash, Arc<Web3Connection>)>>,
tx_id_sender: Option<flume::Sender<TxHashAndRpc>>,
) -> anyhow::Result<(Arc<Web3Connection>, AnyhowJoinHandle<()>)> {
let hard_limit = match (self.hard_limit, redis_pool) {
(None, None) => None,

@ -1,9 +1,9 @@
mod errors;
mod http;
mod http_proxy;
mod rate_limit;
mod rpc_proxy_http;
mod rpc_proxy_ws;
mod users;
mod ws_proxy;
use crate::app::Web3ProxyApp;
use ::http::Request;
@ -45,10 +45,10 @@ pub async fn serve(port: u16, proxy_app: Arc<Web3ProxyApp>) -> anyhow::Result<()
// build our axum Router
let app = Router::new()
// routes should be order most to least common
.route("/", post(http_proxy::public_proxy_web3_rpc))
.route("/", get(ws_proxy::public_websocket_handler))
.route("/u/:user_key", post(http_proxy::user_proxy_web3_rpc))
.route("/u/:user_key", get(ws_proxy::user_websocket_handler))
.route("/", post(rpc_proxy_http::public_proxy_web3_rpc))
.route("/", get(rpc_proxy_ws::public_websocket_handler))
.route("/u/:user_key", post(rpc_proxy_http::user_proxy_web3_rpc))
.route("/u/:user_key", get(rpc_proxy_ws::user_websocket_handler))
.route("/health", get(http::health))
// TODO: we probably want to remove /status in favor of the separate prometheus thread
.route("/status", get(http::status))

@ -5,7 +5,10 @@ use super::transactions::TxStatus;
use crate::{
config::BlockAndRpc, jsonrpc::JsonRpcRequest, rpcs::synced_connections::SyncedConnections,
};
use dashmap::mapref::{entry::Entry, one::Ref};
use dashmap::{
mapref::{entry::Entry, one::Ref},
DashMap,
};
use derive_more::From;
use ethers::prelude::{Block, TxHash, H256, U64};
use hashbrown::{HashMap, HashSet};
@ -15,6 +18,10 @@ use std::sync::Arc;
use tokio::sync::{broadcast, watch};
use tracing::{debug, info, trace, warn};
pub type ArcBlock = Arc<Block<TxHash>>;
pub type BlockHashesMap = Arc<DashMap<H256, ArcBlock>>;
/// A block's hash and number.
#[derive(Default, From)]
pub struct BlockId {
@ -24,51 +31,59 @@ pub struct BlockId {
impl Web3Connections {
/// add a block to our map and it's hash to our graphmap of the blockchain
pub fn save_block(&self, block: &Arc<Block<TxHash>>) -> anyhow::Result<()> {
let block_hash = block.hash.ok_or_else(|| anyhow::anyhow!("no block hash"))?;
pub fn save_block(&self, block: &ArcBlock) -> anyhow::Result<()> {
let block_hash = block
.hash
.as_ref()
.ok_or_else(|| anyhow::anyhow!("no block hash"))?;
let block_num = block
.number
.as_ref()
.ok_or_else(|| anyhow::anyhow!("no block num"))?;
let _block_td = block
.total_difficulty
.as_ref()
.ok_or_else(|| anyhow::anyhow!("no block total difficulty"))?;
if self.block_hashes.contains_key(&block_hash) {
if self.block_hashes.contains_key(block_hash) {
// this block is already included. no need to continue
return Ok(());
}
let mut blockchain = self.blockchain_graphmap.write();
if blockchain.contains_node(block_hash) {
if blockchain.contains_node(*block_hash) {
// this hash is already included. we must have hit that race condition
// return now since this work was already done.
return Ok(());
}
// TODO: theres a small race between contains_key and insert
if let Some(_overwritten) = self.block_hashes.insert(block_hash, block.clone()) {
if let Some(_overwritten) = self.block_hashes.insert(*block_hash, block.clone()) {
// there was a race and another thread wrote this block
// i don't think this will happen. the blockchain.conains_node above should be enough
// no need to continue because that other thread would have written (or soon will) write the
return Ok(());
}
match self.block_numbers.entry(block_num) {
match self.block_numbers.entry(*block_num) {
Entry::Occupied(mut x) => {
x.get_mut().push(block_hash);
x.get_mut().push(*block_hash);
}
Entry::Vacant(x) => {
x.insert(vec![block_hash]);
x.insert(vec![*block_hash]);
}
}
// TODO: prettier log? or probably move the log somewhere else
trace!(%block_hash, "new block");
blockchain.add_node(block_hash);
blockchain.add_node(*block_hash);
// what should edge weight be? and should the nodes be the blocks instead?
// TODO: maybe the weight should be the block?
// we store parent_hash -> hash because the block already stores the parent_hash
blockchain.add_edge(block.parent_hash, block_hash, 0);
blockchain.add_edge(block.parent_hash, *block_hash, 0);
// TODO: prune block_numbers and block_map to only keep a configurable (256 on ETH?) number of blocks?
@ -77,11 +92,12 @@ impl Web3Connections {
/// Get a block from caches with fallback.
/// Will query a specific node or the best available.
/// WARNING! This may wait forever. be sure this runs with your own timeout
pub async fn block(
&self,
hash: &H256,
rpc: Option<&Arc<Web3Connection>>,
) -> anyhow::Result<Arc<Block<TxHash>>> {
) -> anyhow::Result<ArcBlock> {
// first, try to get the hash from our cache
if let Some(block) = self.block_hashes.get(hash) {
return Ok(block.clone());
@ -118,6 +134,7 @@ impl Web3Connections {
let block = Arc::new(block);
// the block was fetched using eth_getBlockByHash, so it should have all fields
self.save_block(&block)?;
Ok(block)
@ -133,9 +150,9 @@ impl Web3Connections {
}
/// Get the heaviest chain's block from cache or backend rpc
pub async fn cannonical_block(&self, num: &U64) -> anyhow::Result<Arc<Block<TxHash>>> {
pub async fn cannonical_block(&self, num: &U64) -> anyhow::Result<ArcBlock> {
// we only have blocks by hash now
// maybe save them during save_block in a blocks_by_number DashMap<U64, Vec<Arc<Block<TxHash>>>>
// maybe save them during save_block in a blocks_by_number DashMap<U64, Vec<ArcBlock>>
// if theres multiple, use petgraph to find the one on the main chain (and remove the others if they have enough confirmations)
// first, try to get the hash from our cache
@ -189,6 +206,7 @@ impl Web3Connections {
let block = Arc::new(block);
// the block was fetched using eth_getBlockByNumber, so it should have all fields
self.save_block(&block)?;
Ok(block)
@ -198,7 +216,7 @@ impl Web3Connections {
&self,
block_receiver: flume::Receiver<BlockAndRpc>,
// TODO: head_block_sender should be a broadcast_sender like pending_tx_sender
head_block_sender: watch::Sender<Arc<Block<TxHash>>>,
head_block_sender: watch::Sender<ArcBlock>,
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
) -> anyhow::Result<()> {
// TODO: indexmap or hashmap? what hasher? with_capacity?
@ -228,9 +246,9 @@ impl Web3Connections {
async fn process_block_from_rpc(
&self,
connection_heads: &mut HashMap<String, H256>,
rpc_head_block: Arc<Block<TxHash>>,
rpc_head_block: ArcBlock,
rpc: Arc<Web3Connection>,
head_block_sender: &watch::Sender<Arc<Block<TxHash>>>,
head_block_sender: &watch::Sender<ArcBlock>,
pending_tx_sender: &Option<broadcast::Sender<TxStatus>>,
) -> anyhow::Result<()> {
// add the block to connection_heads
@ -257,7 +275,7 @@ impl Web3Connections {
// iterate the rpc_map to find the highest_work_block
let mut checked_heads = HashSet::new();
let mut highest_work_block: Option<Ref<H256, Arc<Block<TxHash>>>> = None;
let mut highest_work_block: Option<Ref<H256, ArcBlock>> = None;
for (_rpc_name, rpc_head_hash) in connection_heads.iter() {
if checked_heads.contains(rpc_head_hash) {
@ -268,11 +286,26 @@ impl Web3Connections {
let rpc_head_block = self.block_hashes.get(rpc_head_hash).unwrap();
if highest_work_block.is_none()
|| rpc_head_block.total_difficulty
> highest_work_block.as_ref().unwrap().total_difficulty
{
highest_work_block = Some(rpc_head_block);
match &rpc_head_block.total_difficulty {
None => {
// no total difficulty
// TODO: should we fetch the block here? I think this shouldn't happen
warn!(?rpc, %rpc_head_hash, "block is missing total difficulty");
continue;
}
Some(td) => {
if highest_work_block.is_none()
|| td
> highest_work_block
.as_ref()
.expect("there should always be a block here")
.total_difficulty
.as_ref()
.expect("there should always be total difficulty here")
{
highest_work_block = Some(rpc_head_block);
}
}
}
}
@ -422,16 +455,18 @@ impl Web3Connections {
}
if consensus_block_hash == old_head_hash {
debug!(hash=%consensus_block_hash, num=%consensus_block_num, limit=%consensus_sum_soft_limit, "cur consensus head {}/{}/{}", num_synced_rpcs, num_connection_heads, total_rpcs);
debug!(hash=%consensus_block_hash, num=%consensus_block_num, limit=%consensus_sum_soft_limit, %rpc, "cur consensus head {}/{}/{}", num_synced_rpcs, num_connection_heads, total_rpcs);
} else if soft_limit_met {
// TODO: if new's parent is not old, warn?
debug!(hash=%consensus_block_hash, num=%consensus_block_num, limit=%consensus_sum_soft_limit, "new consensus head {}/{}/{}", num_synced_rpcs, num_connection_heads, total_rpcs);
debug!(hash=%consensus_block_hash, num=%consensus_block_num, limit=%consensus_sum_soft_limit, %rpc, "NEW consensus head {}/{}/{}", num_synced_rpcs, num_connection_heads, total_rpcs);
// the head hash changed. forward to any subscribers
head_block_sender.send(highest_work_block)?;
// TODO: do something with pending_tx_sender
} else {
warn!(?soft_limit_met, %consensus_block_hash, %old_head_hash, "no consensus head {}/{}/{}", num_synced_rpcs, num_connection_heads, total_rpcs)
warn!(?soft_limit_met, %consensus_block_hash, %old_head_hash, %rpc, "NO consensus head {}/{}/{}", num_synced_rpcs, num_connection_heads, total_rpcs)
}
Ok(())

@ -1,9 +1,7 @@
///! Rate-limited communication with a web3 provider.
use super::blockchain::BlockId;
use super::connections::BlockHashesMap;
use super::blockchain::{ArcBlock, BlockHashesMap, BlockId};
use super::provider::Web3Provider;
use super::request::OpenRequestHandle;
use super::request::OpenRequestResult;
use super::request::{OpenRequestHandle, OpenRequestResult};
use crate::app::{flatten_handle, AnyhowJoinHandle};
use crate::config::BlockAndRpc;
use anyhow::Context;
@ -69,13 +67,12 @@ impl Web3Connection {
) -> anyhow::Result<(Arc<Web3Connection>, AnyhowJoinHandle<()>)> {
let hard_limit = hard_limit.map(|(hard_rate_limit, redis_conection)| {
// TODO: allow configurable period and max_burst
let period = 1;
RedisRateLimit::new(
redis_conection,
"web3_proxy",
&format!("{}:{}", chain_id, url_str),
hard_rate_limit,
period,
60.0,
)
});
@ -273,34 +270,68 @@ impl Web3Connection {
#[instrument(skip_all)]
async fn send_head_block_result(
self: &Arc<Self>,
new_head_block: Result<Arc<Block<TxHash>>, ProviderError>,
new_head_block: Result<ArcBlock, ProviderError>,
block_sender: &flume::Sender<BlockAndRpc>,
block_map: BlockHashesMap,
) -> anyhow::Result<()> {
match new_head_block {
Ok(new_head_block) => {
Ok(mut new_head_block) => {
// TODO: is unwrap_or_default ok? we might have an empty block
let new_hash = new_head_block.hash.unwrap_or_default();
let mut td_is_needed = new_head_block.total_difficulty.is_none();
// if we already have this block saved, we don't need to store this copy
let new_head_block = match block_map.entry(new_hash) {
Entry::Occupied(x) => x.get().clone(),
// be careful with the entry api! awaits during this are a very bad idea.
new_head_block = match block_map.entry(new_hash) {
Entry::Vacant(x) => {
// TODO: remove this once https://github.com/ledgerwatch/erigon/issues/5190 is closed
// TODO: include transactions?
let new_head_block = if new_head_block.total_difficulty.is_none() {
self.wait_for_request_handle()
.await?
.request("eth_getBlockByHash", (new_hash, false))
.await?
// only save the block if it has a total difficulty!
if !td_is_needed {
x.insert(new_head_block).clone()
} else {
new_head_block
};
}
}
Entry::Occupied(x) => {
let existing_block = x.get().clone();
x.insert(new_head_block).clone()
// we only save blocks with a total difficulty
debug_assert!(existing_block.total_difficulty.is_some());
td_is_needed = false;
existing_block
}
};
if td_is_needed {
// self got the head block first. unfortunately its missing a necessary field
// keep this even after https://github.com/ledgerwatch/erigon/issues/5190 is closed.
// there are other clients and we might have to use a third party without the td fix.
trace!(rpc=?self, ?new_hash, "total_difficulty missing");
// todo: this can wait forever
let complete_head_block: Block<TxHash> = self
.wait_for_request_handle()
.await?
.request("eth_getBlockByHash", (new_hash, false))
.await?;
new_head_block = match block_map.entry(new_hash) {
Entry::Vacant(x) => {
// still vacant! self is still the leader
// now we definitely have total difficulty, so save
x.insert(Arc::new(complete_head_block)).clone()
}
Entry::Occupied(x) => {
let existing_block = x.get().clone();
// we only save blocks with a total difficulty
debug_assert!(existing_block.total_difficulty.is_some());
existing_block
}
};
}
let new_num = new_head_block.number.unwrap_or_default();
// save the block so we don't send the same one multiple times
@ -483,6 +514,7 @@ impl Web3Connection {
}
}
Web3Provider::Ws(provider) => {
// todo: move subscribe_blocks onto the request handle?
let active_request_handle = self.wait_for_request_handle().await;
let mut stream = provider.subscribe_blocks().await?;
drop(active_request_handle);
@ -490,7 +522,7 @@ impl Web3Connection {
// query the block once since the subscription doesn't send the current block
// there is a very small race condition here where the stream could send us a new block right now
// all it does is print "new block" for the same block as current block
let block: Result<Arc<Block<TxHash>>, _> = self
let block: Result<ArcBlock, _> = self
.wait_for_request_handle()
.await?
.request("eth_getBlockByNumber", ("latest", false))
@ -580,42 +612,48 @@ impl Web3Connection {
Ok(())
}
/// be careful with this; it will wait forever!
/// be careful with this; it might wait forever!
// TODO: maximum wait time?
#[instrument(skip_all)]
#[instrument]
pub async fn wait_for_request_handle(self: &Arc<Self>) -> anyhow::Result<OpenRequestHandle> {
// TODO: maximum wait time? i think timeouts in other parts of the code are probably best
loop {
match self.try_request_handle().await {
let x = self.try_request_handle().await;
trace!(?x, "try_request_handle");
match x {
Ok(OpenRequestResult::Handle(handle)) => return Ok(handle),
Ok(OpenRequestResult::RetryAt(retry_at)) => {
// TODO: emit a stat?
trace!(?retry_at);
sleep_until(retry_at).await;
}
Ok(OpenRequestResult::None) => {
Ok(OpenRequestResult::RetryNever) => {
// TODO: when can this happen? log? emit a stat?
// TODO: subscribe to the head block on this
// TODO: sleep how long? maybe just error?
sleep(Duration::from_secs(1)).await;
return Err(anyhow::anyhow!("unable to retry"));
}
Err(err) => return Err(err),
}
}
}
#[instrument]
pub async fn try_request_handle(self: &Arc<Self>) -> anyhow::Result<OpenRequestResult> {
// check that we are connected
if !self.has_provider().await {
// TODO: emit a stat?
return Ok(OpenRequestResult::None);
return Ok(OpenRequestResult::RetryNever);
}
// check rate limits
if let Some(ratelimiter) = self.hard_limit.as_ref() {
match ratelimiter.throttle().await {
Ok(ThrottleResult::Allowed) => {
// rate limit succeeded
trace!("rate limit succeeded")
}
Ok(ThrottleResult::RetryAt(retry_at)) => {
// rate limit failed

@ -1,9 +1,10 @@
///! Load balanced communication with a group of web3 providers
use super::blockchain::{ArcBlock, BlockHashesMap};
use super::connection::Web3Connection;
use super::request::{OpenRequestHandle, OpenRequestResult};
use super::synced_connections::SyncedConnections;
use crate::app::{flatten_handle, AnyhowJoinHandle};
use crate::config::{BlockAndRpc, Web3ConnectionConfig};
use crate::config::{BlockAndRpc, TxHashAndRpc, Web3ConnectionConfig};
use crate::jsonrpc::{JsonRpcForwardedResponse, JsonRpcRequest};
use crate::rpcs::transactions::TxStatus;
use arc_swap::ArcSwap;
@ -30,8 +31,6 @@ use tokio::time::{interval, sleep, sleep_until, MissedTickBehavior};
use tokio::time::{Duration, Instant};
use tracing::{error, info, instrument, trace, warn};
pub type BlockHashesMap = Arc<DashMap<H256, Arc<Block<TxHash>>>>;
/// A collection of web3 connections. Sends requests either the current best server or all servers.
#[derive(From)]
pub struct Web3Connections {
@ -59,7 +58,7 @@ impl Web3Connections {
http_client: Option<reqwest::Client>,
redis_client_pool: Option<redis_rate_limit::RedisPool>,
block_map: BlockHashesMap,
head_block_sender: Option<watch::Sender<Arc<Block<TxHash>>>>,
head_block_sender: Option<watch::Sender<ArcBlock>>,
min_sum_soft_limit: u32,
min_synced_rpcs: u32,
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
@ -207,9 +206,9 @@ impl Web3Connections {
/// transaction ids from all the `Web3Connection`s are deduplicated and forwarded to `pending_tx_sender`
async fn subscribe(
self: Arc<Self>,
pending_tx_id_receiver: flume::Receiver<(TxHash, Arc<Web3Connection>)>,
pending_tx_id_receiver: flume::Receiver<TxHashAndRpc>,
block_receiver: flume::Receiver<BlockAndRpc>,
head_block_sender: Option<watch::Sender<Arc<Block<TxHash>>>>,
head_block_sender: Option<watch::Sender<ArcBlock>>,
pending_tx_sender: Option<broadcast::Sender<TxStatus>>,
) -> anyhow::Result<()> {
let mut futures = vec![];
@ -407,7 +406,7 @@ impl Web3Connections {
Ok(OpenRequestResult::RetryAt(retry_at)) => {
earliest_retry_at = earliest_retry_at.min(Some(retry_at));
}
Ok(OpenRequestResult::None) => {
Ok(OpenRequestResult::RetryNever) => {
// TODO: log a warning?
}
Err(err) => {
@ -450,7 +449,7 @@ impl Web3Connections {
earliest_retry_at = earliest_retry_at.min(Some(retry_at));
}
Ok(OpenRequestResult::Handle(handle)) => selected_rpcs.push(handle),
Ok(OpenRequestResult::None) => {
Ok(OpenRequestResult::RetryNever) => {
warn!("no request handle for {}", connection)
}
Err(err) => {
@ -547,7 +546,7 @@ impl Web3Connections {
continue;
}
Ok(OpenRequestResult::None) => {
Ok(OpenRequestResult::RetryNever) => {
warn!(?self, "No server handles!");
// TODO: subscribe to something on synced connections. maybe it should just be a watch channel

@ -3,19 +3,21 @@ use super::provider::Web3Provider;
use std::fmt;
use std::sync::atomic;
use std::sync::Arc;
use tokio::time::Instant;
use tokio::time::{sleep, Duration, Instant};
use tracing::warn;
use tracing::{instrument, trace};
// TODO: rename this
#[derive(Debug)]
pub enum OpenRequestResult {
Handle(OpenRequestHandle),
/// Unable to start a request. Retry at the given time.
RetryAt(Instant),
/// Unable to start a request. Retrying will not succeed.
None,
RetryNever,
}
/// Make RPC requests through this handle and drop it when you are done.
#[derive(Debug)]
pub struct OpenRequestHandle(Arc<Web3Connection>);
impl OpenRequestHandle {
@ -47,16 +49,20 @@ impl OpenRequestHandle {
R: serde::Serialize + serde::de::DeserializeOwned + fmt::Debug,
{
// TODO: use tracing spans properly
// TODO: it would be nice to have the request id on this
// TODO: requests from customers have request ids, but we should add
// TODO: including params in this is way too verbose
trace!("Sending {} to {}", method, self.0);
let mut provider = None;
while provider.is_none() {
// TODO: if no provider, don't unwrap. wait until there is one.
match self.0.provider.read().await.as_ref() {
None => {}
None => {
warn!(rpc=?self.0, "no provider!");
// TODO: how should this work? a reconnect should be in progress. but maybe force one now?
// TODO: sleep how long? subscribe to something instead?
sleep(Duration::from_millis(100)).await
}
Some(found_provider) => provider = Some(found_provider.clone()),
}
}

@ -10,7 +10,7 @@ use std::sync::Arc;
/// Serialize is so we can print it on our debug endpoint
#[derive(Clone, Default, Serialize)]
pub struct SyncedConnections {
// TODO: store Arc<Block<TxHash>> instead?
// TODO: store ArcBlock instead?
pub(super) head_block_num: U64,
pub(super) head_block_hash: H256,
// TODO: this should be able to serialize, but it isn't