From 0e1cf5767c04eaa4cc9322a6c69506cff385ed28 Mon Sep 17 00:00:00 2001 From: Bryan Stitt Date: Sat, 12 Nov 2022 08:24:32 +0000 Subject: [PATCH] cut out tracing for now --- Cargo.lock | 317 +++++------------- README.md | 3 +- TODO.md | 6 +- deferred-rate-limiter/Cargo.toml | 2 +- deferred-rate-limiter/src/lib.rs | 15 +- docs/faster perf.txt | 8 + redis-rate-limiter/Cargo.toml | 1 - redis-rate-limiter/src/lib.rs | 5 - web3_proxy/Cargo.toml | 18 +- web3_proxy/examples/subscribe_blocks.rs | 5 - web3_proxy/examples/watch_blocks.rs | 6 - web3_proxy/src/app.rs | 40 +-- web3_proxy/src/app_stats.rs | 10 +- web3_proxy/src/bin/web3_proxy.rs | 46 ++- .../src/bin/web3_proxy_cli/check_config.rs | 2 +- .../src/bin/web3_proxy_cli/create_user.rs | 2 +- web3_proxy/src/bin/web3_proxy_cli/main.rs | 6 +- web3_proxy/src/block_number.rs | 8 +- web3_proxy/src/config.rs | 2 - web3_proxy/src/frontend/authorization.rs | 31 +- web3_proxy/src/frontend/errors.rs | 26 +- web3_proxy/src/frontend/mod.rs | 33 +- web3_proxy/src/frontend/rpc_proxy_http.rs | 25 +- web3_proxy/src/frontend/rpc_proxy_ws.rs | 40 +-- web3_proxy/src/frontend/status.rs | 7 +- web3_proxy/src/frontend/users.rs | 28 +- web3_proxy/src/jsonrpc.rs | 7 - web3_proxy/src/metrics_frontend.rs | 5 +- web3_proxy/src/rpcs/blockchain.rs | 76 +++-- web3_proxy/src/rpcs/connection.rs | 73 ++-- web3_proxy/src/rpcs/connections.rs | 36 +- web3_proxy/src/rpcs/provider.rs | 7 +- web3_proxy/src/rpcs/request.rs | 39 ++- web3_proxy/src/rpcs/transactions.rs | 24 +- web3_proxy/src/user_queries.rs | 9 +- 35 files changed, 356 insertions(+), 612 deletions(-) create mode 100644 docs/faster perf.txt diff --git a/Cargo.lock b/Cargo.lock index c2e7326e..0d69b884 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -263,7 +263,7 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b88d82667eca772c4aa12f0f1348b3ae643424c8876448f3f7bd5787032e234c" dependencies = [ - "autocfg 1.1.0", + "autocfg", ] [[package]] @@ -304,7 +304,7 @@ version = "0.7.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "41aed1da83ecdc799503b7cb94da1b45a34d72b49caf40a61d9cf5b88ec07cfd" dependencies = [ - "autocfg 1.1.0", + "autocfg", "derive_utils", "proc-macro2", "quote", @@ -335,15 +335,6 @@ dependencies = [ "syn", ] -[[package]] -name = "autocfg" -version = "0.1.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0dde43e75fd43e8a1bf86103336bc699aa8d17ad1be60c76c0bdfd4828e19b78" -dependencies = [ - "autocfg 1.1.0", -] - [[package]] name = "autocfg" version = "1.1.0" @@ -730,9 +721,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.22" +version = "0.4.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfd4d1b31faaa3a89d7934dbded3111da0d2ef28e3ebccdb4f0179f5929d1ef1" +checksum = "16b0a3d9ed01224b22057780a37bb8c5dbfe1be8ba48678e7bf57ec4b385411f" dependencies = [ "iana-time-zone", "js-sys", @@ -808,15 +799,6 @@ dependencies = [ "os_str_bytes", ] -[[package]] -name = "cloudabi" -version = "0.0.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f" -dependencies = [ - "bitflags", -] - [[package]] name = "coins-bip32" version = "0.7.0" @@ -848,7 +830,7 @@ dependencies = [ "hex", "hmac", "pbkdf2 0.11.0", - "rand 0.8.5", + "rand", "sha2 0.10.6", "thiserror", ] @@ -1033,7 +1015,7 @@ version = "0.9.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "045ebe27666471bb549370b4b0b3e51b07f56325befa4284db65fc89c02511b1" dependencies = [ - "autocfg 1.1.0", + "autocfg", "cfg-if", "crossbeam-utils", "memoffset", @@ -1084,7 +1066,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9f2b443d17d49dad5ef0ede301c3179cc923b8822f3393b4d2c28c269dd4a122" dependencies = [ "generic-array 0.14.6", - "rand_core 0.6.3", + "rand_core", "subtle", "zeroize", ] @@ -1158,10 +1140,10 @@ version = "0.2.0" dependencies = [ "anyhow", "hashbrown 0.13.1", + "log", "moka", "redis-rate-limiter", "tokio", - "tracing", ] [[package]] @@ -1333,7 +1315,7 @@ dependencies = [ "generic-array 0.14.6", "group", "pkcs8 0.9.0", - "rand_core 0.6.3", + "rand_core", "sec1", "subtle", "zeroize", @@ -1372,6 +1354,19 @@ dependencies = [ "uuid 1.2.1", ] +[[package]] +name = "env_logger" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a12e6657c4c97ebab115a42dcee77225f7f482cdd841cf7088c657a42e9e00e7" +dependencies = [ + "atty", + "humantime", + "log", + "regex", + "termcolor", +] + [[package]] name = "error-chain" version = "0.12.4" @@ -1393,7 +1388,7 @@ dependencies = [ "hex", "hmac", "pbkdf2 0.11.0", - "rand 0.8.5", + "rand", "scrypt", "serde", "serde_json", @@ -1486,7 +1481,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "002a0d58a7d921b496f5f19b5b9508d01d25fbe25078286b1fcb6f4e7562acf7" dependencies = [ "ethers-contract-abigen", - "ethers-contract-derive", "ethers-core", "ethers-providers", "futures-util", @@ -1522,21 +1516,6 @@ dependencies = [ "walkdir", ] -[[package]] -name = "ethers-contract-derive" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "445276414690c97d88638d22dd5f89ba919d7dcea36de4825896d52280c704c7" -dependencies = [ - "ethers-contract-abigen", - "ethers-core", - "hex", - "proc-macro2", - "quote", - "serde_json", - "syn", -] - [[package]] name = "ethers-core" version = "1.0.0" @@ -1556,7 +1535,7 @@ dependencies = [ "once_cell", "open-fastrlp", "proc-macro2", - "rand 0.8.5", + "rand", "rlp", "rlp-derive", "rust_decimal", @@ -1662,7 +1641,7 @@ dependencies = [ "eth-keystore", "ethers-core", "hex", - "rand 0.8.5", + "rand", "sha2 0.10.6", "thiserror", ] @@ -1745,7 +1724,7 @@ version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df689201f395c6b90dfe87127685f8dbfc083a5e779e613575d8bd7314300c3e" dependencies = [ - "rand_core 0.6.3", + "rand_core", "subtle", ] @@ -1768,7 +1747,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cfcf0ed7fe52a17a03854ec54a9f76d6d84508d1c0e66bc1793301c73fc8493c" dependencies = [ "byteorder", - "rand 0.8.5", + "rand", "rustc-hex", "static_assertions", ] @@ -1852,12 +1831,6 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca45aac12b6c561b6289bc68957cb1db3dccf870e1951d590202de5e24f1dd35" -[[package]] -name = "fuchsia-cprng" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba" - [[package]] name = "funty" version = "2.0.0" @@ -2057,7 +2030,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7391856def869c1c81063a03457c676fbcd419709c3dfb33d8d319de484b154d" dependencies = [ "ff", - "rand_core 0.6.3", + "rand_core", "subtle", ] @@ -2272,6 +2245,12 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + [[package]] name = "hyper" version = "0.14.19" @@ -2382,7 +2361,7 @@ version = "1.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "10a35a97730320ffe8e2d410b5d3b69279b98d2c14bdb8b70ea89ecf7888d41e" dependencies = [ - "autocfg 1.1.0", + "autocfg", "hashbrown 0.12.3", "serde", ] @@ -2593,7 +2572,7 @@ version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9f80bf5aacaf25cbfc8210d1cfb718f2bf3b11c4c54e5afe36c236853a8ec390" dependencies = [ - "autocfg 1.1.0", + "autocfg", "scopeguard", ] @@ -2648,7 +2627,7 @@ version = "0.6.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5aa361d4faea93603064a027415f07bd8e1d5c88c9fbf68bf56a285428fd79ce" dependencies = [ - "autocfg 1.1.0", + "autocfg", ] [[package]] @@ -2826,7 +2805,7 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f93ab6289c7b344a8a9f60f88d80aa20032336fe78da341afc91c8a2341fc75f" dependencies = [ - "autocfg 1.1.0", + "autocfg", "num-integer", "num-traits", ] @@ -2843,7 +2822,7 @@ dependencies = [ "num-integer", "num-iter", "num-traits", - "rand 0.8.5", + "rand", "smallvec", "zeroize", ] @@ -2863,7 +2842,7 @@ version = "0.1.45" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "225d3389fb3509a24c93f5c29eb6bde2586b98d9f016636dff58d7c6f7569cd9" dependencies = [ - "autocfg 1.1.0", + "autocfg", "num-traits", ] @@ -2873,7 +2852,7 @@ version = "0.1.43" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7d03e6c028c5dc5cac6e2dec0efda81fc887605bb3d884578bb6d6bf7514e252" dependencies = [ - "autocfg 1.1.0", + "autocfg", "num-integer", "num-traits", ] @@ -2884,7 +2863,7 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0638a1c9d0a3c0914158145bc76cff373a75a627e6ecbfb71cbe6f453a5a19b0" dependencies = [ - "autocfg 1.1.0", + "autocfg", "num-bigint", "num-integer", "num-traits", @@ -2896,7 +2875,7 @@ version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "578ede34cf02f8924ab9447f50c28075b4d3e5b269972345e7e0372b38c6cdcd" dependencies = [ - "autocfg 1.1.0", + "autocfg", "libm", ] @@ -3105,7 +3084,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d791538a6dcc1e7cb7fe6f6b58aca40e7f79403c45b2bc274008b5e647af1d8" dependencies = [ "base64ct", - "rand_core 0.6.3", + "rand_core", "subtle", ] @@ -3116,7 +3095,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7676374caaee8a325c9e7a2ae557f216c5563a171d6997b0ef8a65af35147700" dependencies = [ "base64ct", - "rand_core 0.6.3", + "rand_core", "subtle", ] @@ -3253,7 +3232,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5d5285893bb5eb82e6aaf5d59ee909a06a16737a8970984dd7746ba9283498d6" dependencies = [ "phf_shared", - "rand 0.8.5", + "rand", ] [[package]] @@ -3486,25 +3465,6 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc33ff2d4973d518d823d61aa239014831e521c75da58e3df4840d3f47749d09" -[[package]] -name = "rand" -version = "0.6.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d71dacdc3c88c1fde3885a3be3fbab9f35724e6ce99467f7d9c5026132184ca" -dependencies = [ - "autocfg 0.1.8", - "libc", - "rand_chacha 0.1.1", - "rand_core 0.4.2", - "rand_hc", - "rand_isaac", - "rand_jitter", - "rand_os", - "rand_pcg", - "rand_xorshift", - "winapi", -] - [[package]] name = "rand" version = "0.8.5" @@ -3512,18 +3472,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ "libc", - "rand_chacha 0.3.1", - "rand_core 0.6.3", -] - -[[package]] -name = "rand_chacha" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "556d3a1ca6600bfcbab7c7c91ccb085ac7fbbcd70e008a98742e7847f4f7bcef" -dependencies = [ - "autocfg 0.1.8", - "rand_core 0.3.1", + "rand_chacha", + "rand_core", ] [[package]] @@ -3533,24 +3483,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" dependencies = [ "ppv-lite86", - "rand_core 0.6.3", + "rand_core", ] -[[package]] -name = "rand_core" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a6fdeb83b075e8266dcc8762c22776f6877a63111121f5f8c7411e5be7eed4b" -dependencies = [ - "rand_core 0.4.2", -] - -[[package]] -name = "rand_core" -version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c33a3c44ca05fa6f1807d8e6743f3824e8509beca625669633be0acbdf509dc" - [[package]] name = "rand_core" version = "0.6.3" @@ -3560,76 +3495,13 @@ dependencies = [ "getrandom", ] -[[package]] -name = "rand_hc" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b40677c7be09ae76218dc623efbf7b18e34bced3f38883af07bb75630a21bc4" -dependencies = [ - "rand_core 0.3.1", -] - -[[package]] -name = "rand_isaac" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ded997c9d5f13925be2a6fd7e66bf1872597f759fd9dd93513dd7e92e5a5ee08" -dependencies = [ - "rand_core 0.3.1", -] - -[[package]] -name = "rand_jitter" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1166d5c91dc97b88d1decc3285bb0a99ed84b05cfd0bc2341bdf2d43fc41e39b" -dependencies = [ - "libc", - "rand_core 0.4.2", - "winapi", -] - -[[package]] -name = "rand_os" -version = "0.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b75f676a1e053fc562eafbb47838d67c84801e38fc1ba459e8f180deabd5071" -dependencies = [ - "cloudabi", - "fuchsia-cprng", - "libc", - "rand_core 0.4.2", - "rdrand", - "wasm-bindgen", - "winapi", -] - -[[package]] -name = "rand_pcg" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "abf9b09b01790cfe0364f52bf32995ea3c39f4d2dd011eac241d2914146d0b44" -dependencies = [ - "autocfg 0.1.8", - "rand_core 0.4.2", -] - -[[package]] -name = "rand_xorshift" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cbf7e9e623549b0e21f6e97cf8ecf247c1a8fd2e8a992ae265314300b2455d5c" -dependencies = [ - "rand_core 0.3.1", -] - [[package]] name = "rand_xoshiro" version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6f97cdb2a36ed4183de61b2f824cc45c9f1037f28afe0a322e9fff4c108b5aaa" dependencies = [ - "rand_core 0.6.3", + "rand_core", ] [[package]] @@ -3638,7 +3510,7 @@ version = "1.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bd99e5772ead8baa5215278c9b15bf92087709e9c1b2d1f97cdb5a183c933a7d" dependencies = [ - "autocfg 1.1.0", + "autocfg", "crossbeam-deque", "either", "rayon-core", @@ -3656,15 +3528,6 @@ dependencies = [ "num_cpus", ] -[[package]] -name = "rdrand" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "678054eb77286b51581ba43620cc911abf02758c91f93f479767aed0f90458b2" -dependencies = [ - "rand_core 0.3.1", -] - [[package]] name = "redis" version = "0.22.1" @@ -3691,7 +3554,6 @@ dependencies = [ "anyhow", "deadpool-redis", "tokio", - "tracing", ] [[package]] @@ -3864,7 +3726,7 @@ dependencies = [ "num-traits", "pkcs1", "pkcs8 0.8.0", - "rand_core 0.6.3", + "rand_core", "smallvec", "subtle", "zeroize", @@ -4223,6 +4085,7 @@ dependencies = [ "sentry-backtrace", "sentry-contexts", "sentry-core", + "sentry-log", "sentry-panic", "tokio", "ureq", @@ -4273,12 +4136,22 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff58433a7ad557b586a09c42c4298d5f3ddb0c777e1a79d950e510d7b93fce0e" dependencies = [ "once_cell", - "rand 0.8.5", + "rand", "sentry-types", "serde", "serde_json", ] +[[package]] +name = "sentry-log" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8da361d15c09707d3bd8a2742132b8af54a697994dd396942556aef01d2432de" +dependencies = [ + "log", + "sentry-core", +] + [[package]] name = "sentry-panic" version = "0.28.0" @@ -4289,17 +4162,6 @@ dependencies = [ "sentry-core", ] -[[package]] -name = "sentry-tracing" -version = "0.28.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc8d5bae1e1c06d96a966efc425bf1479a90464de99757d40601ce449f91fbed" -dependencies = [ - "sentry-core", - "tracing-core", - "tracing-subscriber", -] - [[package]] name = "sentry-types" version = "0.28.0" @@ -4479,7 +4341,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f054c6c1a6e95179d6f23ed974060dcefb2d9388bb7256900badad682c499de4" dependencies = [ "digest 0.10.5", - "rand_core 0.6.3", + "rand_core", ] [[package]] @@ -4498,7 +4360,7 @@ dependencies = [ "http", "iri-string", "k256", - "rand 0.8.5", + "rand", "sha3", "thiserror", "time 0.3.17", @@ -4525,7 +4387,7 @@ version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4614a76b2a8be0058caa9dbbaf66d988527d86d003c11a94fbd335d7661edcef" dependencies = [ - "autocfg 1.1.0", + "autocfg", ] [[package]] @@ -4668,7 +4530,7 @@ dependencies = [ "once_cell", "paste", "percent-encoding", - "rand 0.8.5", + "rand", "rsa", "rust_decimal", "rustls", @@ -4806,7 +4668,7 @@ dependencies = [ "indicatif", "itertools", "once_cell", - "rand 0.8.5", + "rand", "reqwest", "semver", "serde", @@ -4934,7 +4796,7 @@ dependencies = [ name = "thread-fast-rng" version = "0.1.0" dependencies = [ - "rand 0.8.5", + "rand", "rand_xoshiro", ] @@ -5025,7 +4887,7 @@ version = "1.21.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a9e03c497dc955702ba729190dc4aac6f2a0ce97f913e5b1b5912fc5039d9099" dependencies = [ - "autocfg 1.1.0", + "autocfg", "bytes", "libc", "memchr", @@ -5146,7 +5008,6 @@ dependencies = [ "tower", "tower-layer", "tower-service", - "tracing", ] [[package]] @@ -5155,18 +5016,6 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "343bc9466d3fe6b0f960ef45960509f84480bf4fd96f92901afe7ff3df9d3a62" -[[package]] -name = "tower-request-id" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10b702a9ce17a8bd4c2f0e6acdcf75debaf14e4125cee76d4c5ac730f3ed9520" -dependencies = [ - "http", - "tower-layer", - "tower-service", - "ulid 0.4.1", -] - [[package]] name = "tower-service" version = "0.3.1" @@ -5237,7 +5086,6 @@ dependencies = [ "matchers", "nu-ansi-term", "once_cell", - "parking_lot 0.12.1", "regex", "sharded-slab", "smallvec", @@ -5271,7 +5119,7 @@ dependencies = [ "http", "httparse", "log", - "rand 0.8.5", + "rand", "rustls", "sha-1", "thiserror", @@ -5304,24 +5152,13 @@ dependencies = [ "static_assertions", ] -[[package]] -name = "ulid" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7e95a59b292ca0cf9b45be2e52294d1ca6cb24eb11b08ef4376f73f1a00c549" -dependencies = [ - "chrono", - "lazy_static", - "rand 0.6.5", -] - [[package]] name = "ulid" version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13a3aaa69b04e5b66cc27309710a569ea23593612387d67daaf102e73aa974fd" dependencies = [ - "rand 0.8.5", + "rand", "serde", ] @@ -5606,6 +5443,7 @@ dependencies = [ "derive_more", "dotenv", "entities", + "env_logger", "ethers", "fdlimit", "flume", @@ -5616,6 +5454,7 @@ dependencies = [ "http", "ipnet", "itertools", + "log", "metered", "migration", "moka", @@ -5631,7 +5470,6 @@ dependencies = [ "rustc-hash", "sea-orm", "sentry", - "sentry-tracing", "serde", "serde_json", "serde_prometheus", @@ -5643,10 +5481,7 @@ dependencies = [ "toml", "tower", "tower-http", - "tower-request-id", - "tracing", - "tracing-subscriber", - "ulid 1.0.0", + "ulid", "url", "uuid 1.2.1", ] diff --git a/README.md b/README.md index 9f2f961e..f8f13637 100644 --- a/README.md +++ b/README.md @@ -98,8 +98,9 @@ Flame graphs make a developer's join of finding slow code painless: 4 $ echo -1 | sudo tee /proc/sys/kernel/perf_event_paranoid -1 - $ CARGO_PROFILE_RELEASE_DEBUG=true cargo flamegraph --bin web3_proxy + $ CARGO_PROFILE_RELEASE_DEBUG=true cargo flamegraph --bin web3_proxy --no-inline +Be sure to use `--no-inline` or perf will be VERY slow ## GDB diff --git a/TODO.md b/TODO.md index a0b7167a..ba9d639d 100644 --- a/TODO.md +++ b/TODO.md @@ -236,8 +236,11 @@ These are roughly in order of completition - [-] add configurable size limits to all the Caches - instead of configuring each cache with MB sizes, have one value for total memory footprint and then percentages for each cache - [x] improve sorting servers by weight. don't force to lower weights, still have a probability that smaller weights might be +- [ ] flamegraphs show 52% of the time to be in tracing. figure out how to improve that - [ ] add block timestamp to the /status page - [ ] cache the status page for a second +- [ ] probably need to turn more sentry log integrations (like anyhow) on! +- [ ] tests should use `test-env-log = "0.2.8"` - [ ] actually block unauthenticated requests instead of emitting warning of "allowing without auth during development!" @@ -245,6 +248,7 @@ These are roughly in order of completition These are not yet ordered. There might be duplicates. We might not actually need all of these. +- [ ] flamegraphs show 25% of the time to be in moka-housekeeper. tune that - [ ] remove the "metered" crate now that we save aggregate queries? - [ ] remove/change the "active_requests" counter? maybe only once we have dynamic soft limits? - [ ] refactor so configs can change while running @@ -410,8 +414,6 @@ in another repo: event subscriber - [ ] test /api/getGaugesmethod - usually times out after vercel's 60 second timeout - one time got: Error invalid Json response "" -- [ ] i think all the async methods in ethers need tracing instrument. something like `cfgif(tracing, tracing::instrument)` - - if they do that, i think my request_id will show up on their logs - [ ] page that prints a graphviz dotfile of the blockchain - [ ] search for all the "TODO" and `todo!(...)` items in the code and move them here - [ ] add the backend server to the header? diff --git a/deferred-rate-limiter/Cargo.toml b/deferred-rate-limiter/Cargo.toml index 5b2e1ead..a178bd0b 100644 --- a/deferred-rate-limiter/Cargo.toml +++ b/deferred-rate-limiter/Cargo.toml @@ -9,6 +9,6 @@ redis-rate-limiter = { path = "../redis-rate-limiter" } anyhow = "1.0.66" hashbrown = "0.13.1" +log = "0.4.17" moka = { version = "0.9.6", default-features = false, features = ["future"] } tokio = "1.21.2" -tracing = "0.1.37" diff --git a/deferred-rate-limiter/src/lib.rs b/deferred-rate-limiter/src/lib.rs index 3d671585..276b759d 100644 --- a/deferred-rate-limiter/src/lib.rs +++ b/deferred-rate-limiter/src/lib.rs @@ -1,4 +1,5 @@ //#![warn(missing_docs)] +use log::error; use moka::future::Cache; use redis_rate_limiter::{RedisRateLimitResult, RedisRateLimiter}; use std::cmp::Eq; @@ -8,7 +9,6 @@ use std::sync::atomic::Ordering; use std::sync::{atomic::AtomicU64, Arc}; use tokio::sync::Mutex; use tokio::time::{Duration, Instant}; -use tracing::{error, info_span, Instrument}; /// A local cache that sits in front of a RedisRateLimiter /// Generic accross the key so it is simple to use with IPs or user keys @@ -118,7 +118,7 @@ where // if we get a redis error, just let the user through. // if users are sticky on a server, local caches will work well enough // though now that we do this, we need to reset rate limits every minute! cache must have ttl! - error!(?err, "unable to rate limit! creating empty cache"); + error!("unable to rate limit! creating empty cache. err={:?}", err); 0 } }; @@ -177,9 +177,9 @@ where Err(err) => { // don't let redis errors block our users! error!( - ?key, - ?err, - "unable to query rate limits, but local cache is available" + "unable to query rate limits, but local cache is available. key={:?} err={:?}", + key, + err, ); // TODO: we need to start a timer that resets this count every minute DeferredRateLimitResult::Allowed @@ -194,11 +194,8 @@ where // close to period. don't risk it. wait on redis Ok(rate_limit_f.await) } else { - // TODO: pass the frontend request id through - let span = info_span!("deferred rate limit"); - // rate limit has enough headroom that it should be safe to do this in the background - tokio::spawn(rate_limit_f.instrument(span)); + tokio::spawn(rate_limit_f); Ok(DeferredRateLimitResult::Allowed) } diff --git a/docs/faster perf.txt b/docs/faster perf.txt new file mode 100644 index 00000000..4a6f2073 --- /dev/null +++ b/docs/faster perf.txt @@ -0,0 +1,8 @@ +sudo apt install bison flex +wget https://eighty-twenty.org/files/0001-tools-perf-Use-long-running-addr2line-per-dso.patch +git clone https://github.com/torvalds/linux.git +cd linux +git checkout v5.15 +git apply ../0001-tools-perf-Use-long-running-addr2line-per-dso.patch +cd tools/perf +make prefix=$HOME/.local VERSION=5.15 install-bin diff --git a/redis-rate-limiter/Cargo.toml b/redis-rate-limiter/Cargo.toml index 48ecb4a4..4c92d638 100644 --- a/redis-rate-limiter/Cargo.toml +++ b/redis-rate-limiter/Cargo.toml @@ -7,5 +7,4 @@ edition = "2021" [dependencies] anyhow = "1.0.66" deadpool-redis = { version = "0.11.0", features = ["rt_tokio_1", "serde"] } -tracing = "0.1.37" tokio = "1.21.2" diff --git a/redis-rate-limiter/src/lib.rs b/redis-rate-limiter/src/lib.rs index b8c83e0c..ced90004 100644 --- a/redis-rate-limiter/src/lib.rs +++ b/redis-rate-limiter/src/lib.rs @@ -3,7 +3,6 @@ use anyhow::Context; use std::ops::Add; use std::time::{SystemTime, UNIX_EPOCH}; use tokio::time::{Duration, Instant}; -use tracing::{debug, trace}; pub use deadpool_redis::redis; pub use deadpool_redis::{ @@ -97,7 +96,6 @@ impl RedisRateLimiter { // TODO: at high concurency, this gives "connection reset by peer" errors. at least they are off the hot path // TODO: only set expire if this is a new key - trace!("redis incr+expire"); // TODO: automatic retry let x: Vec<_> = redis::pipe() .atomic() @@ -119,11 +117,8 @@ impl RedisRateLimiter { // TODO: this might actually be early if we are way over the count let retry_at = self.next_period(now); - debug!(%label, ?retry_at, "rate limited: {}/{}", new_count, max_per_period); - Ok(RedisRateLimitResult::RetryAt(retry_at, new_count)) } else { - trace!(%label, "NOT rate limited: {}/{}", new_count, max_per_period); Ok(RedisRateLimitResult::Allowed(new_count)) } } diff --git a/web3_proxy/Cargo.toml b/web3_proxy/Cargo.toml index 6b261324..d9a412e1 100644 --- a/web3_proxy/Cargo.toml +++ b/web3_proxy/Cargo.toml @@ -27,11 +27,12 @@ axum = { version = "0.5.17", features = ["headers", "serde_json", "tokio-tungste axum-client-ip = "0.2.0" axum-macros = "0.2.3" # TODO: import chrono from sea-orm so we always have the same version -chrono = "0.4.22" +chrono = "0.4.23" counter = "0.5.7" derive_more = "0.99.17" dotenv = "0.15.0" -ethers = { version = "1.0.0", features = ["rustls", "ws"] } +ethers = { version = "1.0.0", default-features = false, features = ["rustls", "ws"] } +env_logger = "0.9.3" fdlimit = "0.2.1" flume = "0.10.14" futures = { version = "0.3.25", features = ["thread-pool"] } @@ -39,6 +40,7 @@ hashbrown = { version = "0.13.1", features = ["serde"] } hdrhistogram = "7.5.2" http = "0.2.8" ipnet = "2.5.1" +log = "0.4.17" metered = { version = "0.9.0", features = ["serialize"] } moka = { version = "0.9.6", default-features = false, features = ["future"] } notify = "5.0.0" @@ -55,24 +57,18 @@ handlebars = "4.3.5" rustc-hash = "1.1.0" siwe = "0.5.0" sea-orm = { version = "0.10.2", features = ["macros"] } -sentry = { version = "0.28.0", default-features = false, features = ["backtrace", "contexts", "panic", "anyhow", "reqwest", "rustls"] } -sentry-tracing = "0.28.0" +sentry = { version = "0.28.0", default-features = false, features = ["backtrace", "contexts", "panic", "anyhow", "reqwest", "rustls", "log", "sentry-log"] } serde = { version = "1.0.147", features = [] } serde_json = { version = "1.0.87", default-features = false, features = ["alloc", "raw_value"] } serde_prometheus = "0.1.6" # TODO: make sure this time version matches siwe. PR to put this in their prelude time = "0.3.17" -tokio = { version = "1.21.2", features = ["full", "tracing"] } +tokio = { version = "1.21.2", features = ["full"] } # TODO: make sure this uuid version matches sea-orm. PR to put this in their prelude tokio-stream = { version = "0.1.11", features = ["sync"] } toml = "0.5.9" tower = "0.4.13" -# TODO: i don't think we need this. we can use it from tower-http instead. though this seems to use ulid and not uuid? -tower-request-id = "0.2.0" -tower-http = { version = "0.3.4", features = ["cors", "sensitive-headers", "trace"] } -tracing = "0.1.37" -# TODO: tracing-subscriber has serde and serde_json features that we might want to use -tracing-subscriber = { version = "0.3.16", features = ["env-filter", "parking_lot"] } +tower-http = { version = "0.3.4", features = ["cors", "sensitive-headers"] } ulid = { version = "1.0.0", features = ["serde"] } url = "2.3.1" uuid = "1.2.1" diff --git a/web3_proxy/examples/subscribe_blocks.rs b/web3_proxy/examples/subscribe_blocks.rs index a8c4375f..72345209 100644 --- a/web3_proxy/examples/subscribe_blocks.rs +++ b/web3_proxy/examples/subscribe_blocks.rs @@ -5,11 +5,6 @@ use std::time::Duration; #[tokio::main] async fn main() -> anyhow::Result<()> { // install global collector configured based on RUST_LOG env var. - tracing_subscriber::fmt() - .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) - .compact() - .init(); - fdlimit::raise_fd_limit(); // erigon diff --git a/web3_proxy/examples/watch_blocks.rs b/web3_proxy/examples/watch_blocks.rs index 1c4c8994..04595975 100644 --- a/web3_proxy/examples/watch_blocks.rs +++ b/web3_proxy/examples/watch_blocks.rs @@ -4,12 +4,6 @@ use std::{str::FromStr, time::Duration}; #[tokio::main] async fn main() -> anyhow::Result<()> { - // install global collector configured based on RUST_LOG env var. - tracing_subscriber::fmt() - .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) - .compact() - .init(); - fdlimit::raise_fd_limit(); // erigon does not support most filters diff --git a/web3_proxy/src/app.rs b/web3_proxy/src/app.rs index ac7f8c1f..5a34b228 100644 --- a/web3_proxy/src/app.rs +++ b/web3_proxy/src/app.rs @@ -25,6 +25,7 @@ use futures::stream::FuturesUnordered; use futures::stream::StreamExt; use hashbrown::HashMap; use ipnet::IpNet; +use log::{error, info, warn}; use metered::{metered, ErrorCount, HitCount, ResponseTime, Throughput}; use migration::{Migrator, MigratorTrait}; use moka::future::Cache; @@ -43,7 +44,6 @@ use tokio::sync::{broadcast, watch, Semaphore}; use tokio::task::JoinHandle; use tokio::time::timeout; use tokio_stream::wrappers::{BroadcastStream, WatchStream}; -use tracing::{error, info, instrument, trace, warn}; use ulid::Ulid; // TODO: make this customizable? @@ -124,7 +124,6 @@ pub struct Web3ProxyApp { /// flatten a JoinError into an anyhow error /// Useful when joining multiple futures. -#[instrument(skip_all)] pub async fn flatten_handle(handle: AnyhowJoinHandle) -> anyhow::Result { match handle.await { Ok(Ok(result)) => Ok(result), @@ -134,7 +133,7 @@ pub async fn flatten_handle(handle: AnyhowJoinHandle) -> anyhow::Result } /// return the first error or okay if everything worked -#[instrument(skip_all)] + pub async fn flatten_handles( mut handles: FuturesUnordered>, ) -> anyhow::Result<()> { @@ -150,7 +149,6 @@ pub async fn flatten_handles( } /// Connect to the database and run migrations -#[instrument(level = "trace")] pub async fn get_migrated_db( db_url: String, min_connections: u32, @@ -191,7 +189,6 @@ pub struct Web3ProxyAppSpawn { #[metered(registry = Web3ProxyAppMetrics, registry_expr = self.app_metrics, visibility = pub)] impl Web3ProxyApp { /// The main entrypoint. - #[instrument(level = "trace")] pub async fn spawn( top_config: TopConfig, num_workers: usize, @@ -272,8 +269,8 @@ impl Web3ProxyApp { // test the redis pool if let Err(err) = redis_pool.get().await { error!( - ?err, - "failed to connect to vredis. some features will be disabled" + "failed to connect to vredis. some features will be disabled. err={:?}", + err ); }; @@ -504,7 +501,6 @@ impl Web3ProxyApp { Ok((app, cancellable_handles, important_background_handles).into()) } - #[instrument(level = "trace")] pub fn prometheus_metrics(&self) -> String { let globals = HashMap::new(); // TODO: what globals? should this be the hostname or what? @@ -526,7 +522,6 @@ impl Web3ProxyApp { } #[measure([ErrorCount, HitCount, ResponseTime, Throughput])] - #[instrument(level = "trace")] pub async fn eth_subscribe<'a>( self: &'a Arc, authorization: Arc, @@ -550,7 +545,7 @@ impl Web3ProxyApp { Some(x) if x == json!(["newHeads"]) => { let head_block_receiver = self.head_block_receiver.clone(); - trace!(?subscription_id, "new heads subscription"); + // trace!("new heads subscription. id={:?}", subscription_id); tokio::spawn(async move { let mut head_block_receiver = Abortable::new( WatchStream::new(head_block_receiver), @@ -580,7 +575,7 @@ impl Web3ProxyApp { }; } - trace!(?subscription_id, "closed new heads subscription"); + // trace!("closed new heads subscription. id={:?}", subscription_id); }); } Some(x) if x == json!(["newPendingTransactions"]) => { @@ -591,7 +586,7 @@ impl Web3ProxyApp { subscription_registration, ); - trace!(?subscription_id, "pending transactions subscription"); + // // trace!(?subscription_id, "pending transactions subscription"); tokio::spawn(async move { while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await { let new_tx = match new_tx_state { @@ -619,7 +614,7 @@ impl Web3ProxyApp { }; } - trace!(?subscription_id, "closed new heads subscription"); + // // trace!(?subscription_id, "closed new heads subscription"); }); } Some(x) if x == json!(["newPendingFullTransactions"]) => { @@ -631,7 +626,7 @@ impl Web3ProxyApp { subscription_registration, ); - trace!(?subscription_id, "pending transactions subscription"); + // // trace!(?subscription_id, "pending transactions subscription"); // TODO: do something with this handle? tokio::spawn(async move { @@ -663,7 +658,7 @@ impl Web3ProxyApp { }; } - trace!(?subscription_id, "closed new heads subscription"); + // // trace!(?subscription_id, "closed new heads subscription"); }); } Some(x) if x == json!(["newPendingRawTransactions"]) => { @@ -675,7 +670,7 @@ impl Web3ProxyApp { subscription_registration, ); - trace!(?subscription_id, "pending transactions subscription"); + // // trace!(?subscription_id, "pending transactions subscription"); // TODO: do something with this handle? tokio::spawn(async move { @@ -707,7 +702,7 @@ impl Web3ProxyApp { }; } - trace!(?subscription_id, "closed new heads subscription"); + // // trace!(?subscription_id, "closed new heads subscription"); }); } _ => return Err(anyhow::anyhow!("unimplemented")), @@ -723,14 +718,13 @@ impl Web3ProxyApp { } /// send the request or batch of requests to the approriate RPCs - #[instrument(level = "trace")] pub async fn proxy_web3_rpc( self: &Arc, authorization: Arc, request: JsonRpcRequestEnum, ) -> anyhow::Result { // TODO: this should probably be trace level - trace!(?request, "proxy_web3_rpc"); + // // trace!(?request, "proxy_web3_rpc"); // even though we have timeouts on the requests to our backend providers, // we need a timeout for the incoming request so that retries don't run forever @@ -755,14 +749,13 @@ impl Web3ProxyApp { }; // TODO: this should probably be trace level - trace!(?response, "Forwarding"); + // // trace!(?response, "Forwarding"); Ok(response) } /// cut up the request and send to potentually different servers /// TODO: make sure this isn't a problem - #[instrument(level = "trace")] async fn proxy_web3_rpc_requests( self: &Arc, authorization: &Arc, @@ -792,12 +785,10 @@ impl Web3ProxyApp { } /// TODO: i don't think we want or need this. just use app.db_conn, or maybe app.db_conn.clone() or app.db_conn.as_ref() - #[instrument(level = "trace")] pub fn db_conn(&self) -> Option { self.db_conn.clone() } - #[instrument(level = "trace")] pub async fn redis_conn(&self) -> anyhow::Result { match self.vredis_pool.as_ref() { None => Err(anyhow::anyhow!("no redis server configured")), @@ -810,13 +801,12 @@ impl Web3ProxyApp { } #[measure([ErrorCount, HitCount, ResponseTime, Throughput])] - #[instrument(level = "trace")] async fn proxy_web3_rpc_request( self: &Arc, authorization: &Arc, mut request: JsonRpcRequest, ) -> anyhow::Result { - trace!("Received request: {:?}", request); + // trace!("Received request: {:?}", request); // TODO: allow customizing the period? let request_metadata = Arc::new(RequestMetadata::new(60, &request)?); diff --git a/web3_proxy/src/app_stats.rs b/web3_proxy/src/app_stats.rs index 42a20e55..303b5b59 100644 --- a/web3_proxy/src/app_stats.rs +++ b/web3_proxy/src/app_stats.rs @@ -6,6 +6,7 @@ use derive_more::From; use entities::rpc_accounting; use hashbrown::HashMap; use hdrhistogram::{Histogram, RecordError}; +use log::{error, info}; use sea_orm::{ActiveModelTrait, DatabaseConnection, DbErr}; use std::num::NonZeroU64; use std::sync::atomic::Ordering; @@ -14,7 +15,6 @@ use std::time::{Duration, SystemTime}; use tokio::sync::broadcast; use tokio::task::JoinHandle; use tokio::time::{interval_at, Instant}; -use tracing::{error, info}; /// TODO: where should this be defined? /// TODO: can we use something inside sea_orm instead? @@ -351,7 +351,7 @@ impl StatEmitter { if let Some(value) = response_aggregate_map.get_mut(&key) { if let Err(err) = value.add(stat) { - error!(?err, "unable to aggregate stats!"); + error!( "unable to aggregate stats! err={:?}", err); }; } else { unimplemented!(); @@ -364,7 +364,7 @@ impl StatEmitter { // TODO: batch these saves for (key, aggregate) in response_aggregate_map.drain() { if let Err(err) = aggregate.save(self.chain_id, &self.db_conn, key, period_timestamp).await { - error!(?err, "Unable to save stat while shutting down!"); + error!("Unable to save stat while shutting down! {:?}", err); }; } // advance to the next period @@ -377,7 +377,7 @@ impl StatEmitter { info!("aggregate stat_loop shutting down"); // TODO: call aggregate_stat for all the }, - Err(err) => error!(?err, "shutdown receiver"), + Err(err) => error!("shutdown receiver. err={:?}", err), } break; } @@ -391,7 +391,7 @@ impl StatEmitter { .save(self.chain_id, &self.db_conn, key, period_timestamp) .await { - error!(?err, "Unable to save stat while shutting down!"); + error!("Unable to save stat while shutting down! err={:?}", err); }; } diff --git a/web3_proxy/src/bin/web3_proxy.rs b/web3_proxy/src/bin/web3_proxy.rs index 625f1e98..69d3e786 100644 --- a/web3_proxy/src/bin/web3_proxy.rs +++ b/web3_proxy/src/bin/web3_proxy.rs @@ -9,6 +9,7 @@ #![forbid(unsafe_code)] use futures::StreamExt; +use log::{debug, info, warn, LevelFilter}; use parking_lot::deadlock; use std::fs; use std::sync::atomic::{self, AtomicUsize}; @@ -16,9 +17,6 @@ use std::thread; use tokio::runtime; use tokio::sync::broadcast; use tokio::time::Duration; -use tracing::{debug, info, warn}; -use tracing_subscriber::prelude::*; -use tracing_subscriber::EnvFilter; use web3_proxy::app::{flatten_handle, flatten_handles, Web3ProxyApp}; use web3_proxy::config::{CliConfig, TopConfig}; use web3_proxy::{frontend, metrics_frontend}; @@ -28,7 +26,8 @@ fn run( cli_config: CliConfig, top_config: TopConfig, ) -> anyhow::Result<()> { - debug!(?cli_config, ?top_config); + debug!("{:?}", cli_config); + debug!("{:?}", top_config); let mut shutdown_receiver = shutdown_sender.subscribe(); @@ -70,7 +69,7 @@ fn run( let rt = rt_builder.build()?; let num_workers = rt.metrics().num_workers(); - debug!(?num_workers); + debug!("num_workers: {}", num_workers); rt.block_on(async { let app_frontend_port = cli_config.port; @@ -134,7 +133,7 @@ fn run( // one of the handles stopped. send a value so the others know to shut down if let Err(err) = shutdown_sender.send(()) { - warn!(?err, "shutdown sender"); + warn!("shutdown sender err={:?}", err); }; // wait on all the important background tasks (like saving stats to the database) to complete @@ -175,6 +174,12 @@ fn main() -> anyhow::Result<()> { // TODO: this doesn't seem to do anything proctitle::set_title(format!("web3_proxy-{}", top_config.app.chain_id)); + let mut log_builder = env_logger::builder(); + + log_builder + .filter_level(LevelFilter::Off) + .parse_env("RUST_LOG"); + // connect to sentry for error reporting // if no sentry, only log to stdout let _sentry_guard = if let Some(sentry_url) = top_config.app.sentry_url.clone() { @@ -188,24 +193,16 @@ fn main() -> anyhow::Result<()> { }, )); - // TODO: how do we put the EnvFilter on this? - tracing_subscriber::registry() - .with( - tracing_subscriber::fmt::layer() - .compact() - .with_filter(EnvFilter::from_default_env()), - ) - .with(sentry_tracing::layer()) - .init(); + let logger = sentry::integrations::log::SentryLogger::with_dest(log_builder.build()); + + log::set_boxed_logger(Box::new(logger)).unwrap(); Some(guard) } else { // install global collector configured based on RUST_LOG env var. - // TODO: attach sentry here - tracing_subscriber::fmt() - .compact() - .with_env_filter(EnvFilter::from_default_env()) - .init(); + let logger = log_builder.build(); + + log::set_boxed_logger(Box::new(logger)).unwrap(); None }; @@ -243,13 +240,8 @@ mod tests { // TODO: how should we handle logs in this? // TODO: option for super verbose logs std::env::set_var("RUST_LOG", "info,web3_proxy=debug"); - // install global collector configured based on RUST_LOG env var. - // TODO: sentry is needed here! - tracing_subscriber::fmt() - .with_env_filter(EnvFilter::from_default_env()) - .compact() - .with_test_writer() - .init(); + + let _ = env_logger::builder().is_test(true).try_init(); let anvil = Anvil::new().spawn(); diff --git a/web3_proxy/src/bin/web3_proxy_cli/check_config.rs b/web3_proxy/src/bin/web3_proxy_cli/check_config.rs index 4491fdf6..fa31ba5b 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/check_config.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/check_config.rs @@ -1,6 +1,6 @@ use argh::FromArgs; +use log::{error, info, warn}; use std::fs; -use tracing::{error, info, warn}; use web3_proxy::config::TopConfig; #[derive(FromArgs, PartialEq, Eq, Debug)] diff --git a/web3_proxy/src/bin/web3_proxy_cli/create_user.rs b/web3_proxy/src/bin/web3_proxy_cli/create_user.rs index 13371b14..db14eac2 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/create_user.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/create_user.rs @@ -2,8 +2,8 @@ use anyhow::Context; use argh::FromArgs; use entities::{rpc_key, user}; use ethers::prelude::Address; +use log::info; use sea_orm::{ActiveModelTrait, TransactionTrait}; -use tracing::info; use ulid::Ulid; use uuid::Uuid; use web3_proxy::frontend::authorization::RpcSecretKey; diff --git a/web3_proxy/src/bin/web3_proxy_cli/main.rs b/web3_proxy/src/bin/web3_proxy_cli/main.rs index 4cdb3593..49d3a1b6 100644 --- a/web3_proxy/src/bin/web3_proxy_cli/main.rs +++ b/web3_proxy/src/bin/web3_proxy_cli/main.rs @@ -37,11 +37,7 @@ async fn main() -> anyhow::Result<()> { std::env::set_var("RUST_LOG", "info,web3_proxy=debug,web3_proxy_cli=debug"); } - // install global collector configured based on RUST_LOG env var. - tracing_subscriber::fmt() - .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) - .compact() - .init(); + env_logger::init(); // this probably won't matter for us in docker, but better safe than sorry fdlimit::raise_fd_limit(); diff --git a/web3_proxy/src/block_number.rs b/web3_proxy/src/block_number.rs index be854c79..932dfff5 100644 --- a/web3_proxy/src/block_number.rs +++ b/web3_proxy/src/block_number.rs @@ -4,8 +4,8 @@ use ethers::{ prelude::{BlockNumber, U64}, types::H256, }; +use log::{warn}; use std::sync::Arc; -use tracing::{instrument, warn}; use crate::{frontend::authorization::Authorization, rpcs::connections::Web3Connections}; @@ -39,7 +39,7 @@ pub fn block_num_to_u64(block_num: BlockNumber, latest_block: U64) -> U64 { } /// modify params to always have a block number and not "latest" -#[instrument(level = "trace")] + pub async fn clean_block_number( authorization: &Arc, params: &mut serde_json::Value, @@ -98,7 +98,7 @@ pub async fn clean_block_number( } // TODO: change this to also return the hash needed? -#[instrument(level = "trace")] + pub async fn block_needed( authorization: &Arc, method: &str, @@ -210,7 +210,7 @@ pub async fn block_needed( Ok(block) => Ok(Some(block)), Err(err) => { // TODO: seems unlikely that we will get here - warn!(?err, "could not get block from params"); + warn!("could not get block from params. err={:?}", err); Ok(None) } } diff --git a/web3_proxy/src/config.rs b/web3_proxy/src/config.rs index bca68f6d..6967236b 100644 --- a/web3_proxy/src/config.rs +++ b/web3_proxy/src/config.rs @@ -10,7 +10,6 @@ use sea_orm::DatabaseConnection; use serde::Deserialize; use std::sync::Arc; use tokio::sync::broadcast; -use tracing::instrument; pub type BlockAndRpc = (Option, Arc); pub type TxHashAndRpc = (TxHash, Arc); @@ -201,7 +200,6 @@ impl Web3ConnectionConfig { /// Create a Web3Connection from config /// TODO: move this into Web3Connection (just need to make things pub(crate)) #[allow(clippy::too_many_arguments)] - #[instrument(level = "trace", skip(redis_pool))] pub async fn spawn( self, name: String, diff --git a/web3_proxy/src/frontend/authorization.rs b/web3_proxy/src/frontend/authorization.rs index 0c5a0247..5da0c230 100644 --- a/web3_proxy/src/frontend/authorization.rs +++ b/web3_proxy/src/frontend/authorization.rs @@ -13,6 +13,7 @@ use entities::{rpc_key, user, user_tier}; use hashbrown::HashMap; use http::HeaderValue; use ipnet::IpNet; +use log::error; use redis_rate_limiter::redis::AsyncCommands; use redis_rate_limiter::RedisRateLimitResult; use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter}; @@ -21,7 +22,6 @@ use std::sync::atomic::{AtomicBool, AtomicU64}; use std::{net::IpAddr, str::FromStr, sync::Arc}; use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use tokio::time::Instant; -use tracing::{error, instrument, trace}; use ulid::Ulid; use uuid::Uuid; @@ -348,7 +348,6 @@ pub async fn key_is_authorized( impl Web3ProxyApp { /// Limit the number of concurrent requests from the given ip address. - #[instrument(level = "trace")] pub async fn ip_semaphore(&self, ip: IpAddr) -> anyhow::Result> { if let Some(max_concurrent_requests) = self.config.public_max_concurrent_requests { let semaphore = self @@ -374,7 +373,6 @@ impl Web3ProxyApp { } /// Limit the number of concurrent requests from the given rpc key. - #[instrument(level = "trace")] pub async fn rpc_key_semaphore( &self, authorization_checks: &AuthorizationChecks, @@ -386,7 +384,7 @@ impl Web3ProxyApp { .rpc_key_semaphores .get_with(rpc_key_id, async move { let s = Semaphore::new(max_concurrent_requests as usize); - trace!("new semaphore for rpc_key_id {}", rpc_key_id); + // // trace!("new semaphore for rpc_key_id {}", rpc_key_id); Arc::new(s) }) .await; @@ -407,7 +405,6 @@ impl Web3ProxyApp { /// Verify that the given bearer token and address are allowed to take the specified action. /// This includes concurrent request limiting. - #[instrument(level = "trace")] pub async fn bearer_is_authorized( &self, bearer: Bearer, @@ -447,7 +444,6 @@ impl Web3ProxyApp { Ok((user, semaphore_permit)) } - #[instrument(level = "trace")] pub async fn rate_limit_login(&self, ip: IpAddr) -> anyhow::Result { // TODO: dry this up with rate_limit_by_rpc_key? @@ -474,19 +470,19 @@ impl Web3ProxyApp { // TODO: set headers so they know when they can retry // TODO: debug or trace? // this is too verbose, but a stat might be good - trace!(?ip, "login rate limit exceeded until {:?}", retry_at); + // // trace!(?ip, "login rate limit exceeded until {:?}", retry_at); Ok(RateLimitResult::RateLimited(authorization, Some(retry_at))) } Ok(RedisRateLimitResult::RetryNever) => { // TODO: i don't think we'll get here. maybe if we ban an IP forever? seems unlikely - trace!(?ip, "login rate limit is 0"); + // // trace!(?ip, "login rate limit is 0"); Ok(RateLimitResult::RateLimited(authorization, None)) } Err(err) => { // internal error, not rate limit being hit // TODO: i really want axum to do this for us in a single place. - error!(?err, "login rate limiter is unhappy. allowing ip"); + error!("login rate limiter is unhappy. allowing ip. err={:?}", err); Ok(RateLimitResult::Allowed(authorization, None)) } @@ -498,7 +494,6 @@ impl Web3ProxyApp { } /// origin is included because it can override the default rate limits - #[instrument(level = "trace")] pub async fn rate_limit_by_ip( &self, allowed_origin_requests_per_period: &HashMap, @@ -529,18 +524,18 @@ impl Web3ProxyApp { } Ok(DeferredRateLimitResult::RetryAt(retry_at)) => { // TODO: set headers so they know when they can retry - trace!(?ip, "rate limit exceeded until {:?}", retry_at); + // // trace!(?ip, "rate limit exceeded until {:?}", retry_at); Ok(RateLimitResult::RateLimited(authorization, Some(retry_at))) } Ok(DeferredRateLimitResult::RetryNever) => { // TODO: i don't think we'll get here. maybe if we ban an IP forever? seems unlikely - trace!(?ip, "rate limit is 0"); + // // trace!(?ip, "rate limit is 0"); Ok(RateLimitResult::RateLimited(authorization, None)) } Err(err) => { // this an internal error of some kind, not the rate limit being hit // TODO: i really want axum to do this for us in a single place. - error!(?err, "rate limiter is unhappy. allowing ip"); + error!("rate limiter is unhappy. allowing ip. err={:?}", err); // at least we can still check the semaphore let semaphore = self.ip_semaphore(ip).await?; @@ -558,7 +553,6 @@ impl Web3ProxyApp { } // check the local cache for user data, or query the database - #[instrument(level = "trace")] pub(crate) async fn authorization_checks( &self, rpc_secret_key: RpcSecretKey, @@ -566,7 +560,7 @@ impl Web3ProxyApp { let authorization_checks: Result<_, Arc> = self .rpc_secret_key_cache .try_get_with(rpc_secret_key.into(), async move { - trace!(?rpc_secret_key, "user cache miss"); + // // trace!(?rpc_secret_key, "user cache miss"); let db_conn = self.db_conn().context("Getting database connection")?; @@ -671,7 +665,6 @@ impl Web3ProxyApp { } /// Authorized the ip/origin/referer/useragent and rate limit and concurrency - #[instrument(level = "trace")] pub async fn rate_limit_by_rpc_key( &self, ip: IpAddr, @@ -722,19 +715,19 @@ impl Web3ProxyApp { // this is too verbose, but a stat might be good // TODO: keys are secrets! use the id instead // TODO: emit a stat - trace!(?rpc_key, "rate limit exceeded until {:?}", retry_at); + // // trace!(?rpc_key, "rate limit exceeded until {:?}", retry_at); Ok(RateLimitResult::RateLimited(authorization, Some(retry_at))) } Ok(DeferredRateLimitResult::RetryNever) => { // TODO: keys are secret. don't log them! - trace!(?rpc_key, "rate limit is 0"); + // // trace!(?rpc_key, "rate limit is 0"); // TODO: emit a stat Ok(RateLimitResult::RateLimited(authorization, None)) } Err(err) => { // internal error, not rate limit being hit // TODO: i really want axum to do this for us in a single place. - error!(?err, "rate limiter is unhappy. allowing ip"); + error!("rate limiter is unhappy. allowing ip. err={:?}", err); Ok(RateLimitResult::Allowed(authorization, semaphore)) } diff --git a/web3_proxy/src/frontend/errors.rs b/web3_proxy/src/frontend/errors.rs index e825a7ee..6b2fcb08 100644 --- a/web3_proxy/src/frontend/errors.rs +++ b/web3_proxy/src/frontend/errors.rs @@ -11,12 +11,12 @@ use axum::{ use derive_more::From; use http::header::InvalidHeaderValue; use ipnet::AddrParseError; +use log::warn; use redis_rate_limiter::redis::RedisError; use reqwest::header::ToStrError; use sea_orm::DbErr; use std::error::Error; use tokio::{task::JoinError, time::Instant}; -use tracing::{instrument, trace, warn}; // TODO: take "IntoResponse" instead of Response? pub type FrontendResult = Result; @@ -43,12 +43,11 @@ pub enum FrontendErrorResponse { } impl IntoResponse for FrontendErrorResponse { - #[instrument(level = "trace")] fn into_response(self) -> Response { // TODO: include the request id in these so that users can give us something that will point to logs let (status_code, response) = match self { Self::Anyhow(err) => { - warn!(?err, "anyhow"); + warn!("anyhow. err={:?}", err); ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcForwardedResponse::from_string( @@ -60,7 +59,7 @@ impl IntoResponse for FrontendErrorResponse { ) } Self::Box(err) => { - warn!(?err, "boxed"); + warn!("boxed err={:?}", err); ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcForwardedResponse::from_str( @@ -72,7 +71,7 @@ impl IntoResponse for FrontendErrorResponse { ) } Self::Database(err) => { - warn!(?err, "database"); + warn!("database err={:?}", err); ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcForwardedResponse::from_str( @@ -83,7 +82,7 @@ impl IntoResponse for FrontendErrorResponse { ) } Self::HeadersError(err) => { - warn!(?err, "HeadersError"); + warn!("HeadersError {:?}", err); ( StatusCode::BAD_REQUEST, JsonRpcForwardedResponse::from_str( @@ -94,7 +93,7 @@ impl IntoResponse for FrontendErrorResponse { ) } Self::IpAddrParse(err) => { - warn!(?err, "IpAddrParse"); + warn!("IpAddrParse err={:?}", err); ( StatusCode::BAD_REQUEST, JsonRpcForwardedResponse::from_str( @@ -105,7 +104,7 @@ impl IntoResponse for FrontendErrorResponse { ) } Self::InvalidHeaderValue(err) => { - warn!(?err, "InvalidHeaderValue"); + warn!("InvalidHeaderValue err={:?}", err); ( StatusCode::BAD_REQUEST, JsonRpcForwardedResponse::from_str( @@ -116,7 +115,7 @@ impl IntoResponse for FrontendErrorResponse { ) } Self::JoinError(err) => { - warn!(?err, "JoinError. likely shutting down"); + warn!("JoinError. likely shutting down. err={:?}", err); ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcForwardedResponse::from_str( @@ -171,7 +170,7 @@ impl IntoResponse for FrontendErrorResponse { ) } Self::Redis(err) => { - warn!(?err, "redis"); + warn!("redis err={:?}", err); ( StatusCode::INTERNAL_SERVER_ERROR, JsonRpcForwardedResponse::from_str( @@ -187,7 +186,7 @@ impl IntoResponse for FrontendErrorResponse { } Self::StatusCode(status_code, err_msg, err) => { // TODO: warn is way too loud. different status codes should get different error levels. 500s should warn. 400s should stat - trace!(?status_code, ?err_msg, ?err); + // trace!(?status_code, ?err_msg, ?err); ( status_code, JsonRpcForwardedResponse::from_str( @@ -198,7 +197,7 @@ impl IntoResponse for FrontendErrorResponse { ) } Self::HeaderToString(err) => { - trace!(?err, "HeaderToString"); + // // trace!(?err, "HeaderToString"); ( StatusCode::BAD_REQUEST, JsonRpcForwardedResponse::from_str( @@ -209,7 +208,7 @@ impl IntoResponse for FrontendErrorResponse { ) } Self::UlidDecodeError(err) => { - trace!(?err, "UlidDecodeError"); + // // trace!(?err, "UlidDecodeError"); ( StatusCode::BAD_REQUEST, JsonRpcForwardedResponse::from_str( @@ -234,7 +233,6 @@ impl IntoResponse for FrontendErrorResponse { } } -#[instrument(level = "trace")] pub async fn handler_404() -> Response { FrontendErrorResponse::NotFound.into_response() } diff --git a/web3_proxy/src/frontend/mod.rs b/web3_proxy/src/frontend/mod.rs index a636e1ba..da55808d 100644 --- a/web3_proxy/src/frontend/mod.rs +++ b/web3_proxy/src/frontend/mod.rs @@ -10,46 +10,21 @@ pub mod users; use crate::app::Web3ProxyApp; use axum::{ - body::Body, handler::Handler, routing::{get, post, put}, Extension, Router, }; use http::header::AUTHORIZATION; -use http::Request; +use log::info; use std::iter::once; use std::net::SocketAddr; use std::sync::Arc; use tower_http::cors::CorsLayer; use tower_http::sensitive_headers::SetSensitiveRequestHeadersLayer; -use tower_http::trace::TraceLayer; -use tower_request_id::{RequestId, RequestIdLayer}; -use tracing::{error_span, info, instrument}; /// Start the frontend server. -#[instrument(level = "trace")] -pub async fn serve(port: u16, proxy_app: Arc) -> anyhow::Result<()> { - // create a tracing span for each request with a random request id and the method - // GET: websocket or static pages - // POST: http rpc or login - let request_tracing_layer = - TraceLayer::new_for_http().make_span_with(|request: &Request| { - // We get the request id from the extensions - let request_id = request - .extensions() - .get::() - .map(ToString::to_string) - .unwrap_or_else(|| "unknown".into()); - // And then we put it along with other information into the `request` span - error_span!( - "http_request", - id = %request_id, - // TODO: do we want these? - method = %request.method(), - // uri = %request.uri(), - ) - }); +pub async fn serve(port: u16, proxy_app: Arc) -> anyhow::Result<()> { // build our axum Router let app = Router::new() // routes should be ordered most to least common @@ -91,12 +66,8 @@ pub async fn serve(port: u16, proxy_app: Arc) -> anyhow::Result<() // the last layer is first for requests and last for responses // Mark the `Authorization` request header as sensitive so it doesn't show in logs .layer(SetSensitiveRequestHeadersLayer::new(once(AUTHORIZATION))) - // add the request id to our tracing logs - .layer(request_tracing_layer) // handle cors .layer(CorsLayer::very_permissive()) - // create a unique id for each request - .layer(RequestIdLayer) // application state .layer(Extension(proxy_app.clone())) // 404 for any unknown routes diff --git a/web3_proxy/src/frontend/rpc_proxy_http.rs b/web3_proxy/src/frontend/rpc_proxy_http.rs index 77e10eb1..b6722332 100644 --- a/web3_proxy/src/frontend/rpc_proxy_http.rs +++ b/web3_proxy/src/frontend/rpc_proxy_http.rs @@ -10,38 +10,27 @@ use axum::{response::IntoResponse, Extension, Json}; use axum_client_ip::ClientIp; use axum_macros::debug_handler; use std::sync::Arc; -use tracing::{error_span, instrument, Instrument}; /// POST /rpc -- Public entrypoint for HTTP JSON-RPC requests. Web3 wallets use this. /// Defaults to rate limiting by IP address, but can also read the Authorization header for a bearer token. /// If possible, please use a WebSocket instead. #[debug_handler] -#[instrument(level = "trace")] + pub async fn proxy_web3_rpc( Extension(app): Extension>, ClientIp(ip): ClientIp, origin: Option>, Json(payload): Json, ) -> FrontendResult { - let request_span = error_span!("request", %ip); - // TODO: do we care about keeping the TypedHeader wrapper? let origin = origin.map(|x| x.0); - let (authorization, _semaphore) = ip_is_authorized(&app, ip, origin) - .instrument(request_span) - .await?; - - let request_span = error_span!("request", ?authorization); + let (authorization, _semaphore) = ip_is_authorized(&app, ip, origin).await?; let authorization = Arc::new(authorization); // TODO: spawn earlier? i think we want ip_is_authorized in this future - let f = tokio::spawn(async move { - app.proxy_web3_rpc(authorization, payload) - .instrument(request_span) - .await - }); + let f = tokio::spawn(async move { app.proxy_web3_rpc(authorization, payload).await }); let response = f.await.expect("joinhandle should always work")?; @@ -53,7 +42,7 @@ pub async fn proxy_web3_rpc( /// Can optionally authorized based on origin, referer, or user agent. /// If possible, please use a WebSocket instead. #[debug_handler] -#[instrument(level = "trace")] + pub async fn proxy_web3_rpc_with_key( Extension(app): Extension>, ClientIp(ip): ClientIp, @@ -65,8 +54,6 @@ pub async fn proxy_web3_rpc_with_key( ) -> FrontendResult { let rpc_key = rpc_key.parse()?; - let request_span = error_span!("request", %ip, ?referer, ?user_agent); - // keep the semaphore until the end of the response let (authorization, _semaphore) = key_is_authorized( &app, @@ -76,18 +63,14 @@ pub async fn proxy_web3_rpc_with_key( referer.map(|x| x.0), user_agent.map(|x| x.0), ) - .instrument(request_span.clone()) .await?; - let request_span = error_span!("request", ?authorization); - let authorization = Arc::new(authorization); // the request can take a while, so we spawn so that we can start serving another request // TODO: spawn even earlier? let f = tokio::spawn(async move { app.proxy_web3_rpc(authorization, payload) - .instrument(request_span) .await }); diff --git a/web3_proxy/src/frontend/rpc_proxy_ws.rs b/web3_proxy/src/frontend/rpc_proxy_ws.rs index bc89b343..496ed682 100644 --- a/web3_proxy/src/frontend/rpc_proxy_ws.rs +++ b/web3_proxy/src/frontend/rpc_proxy_ws.rs @@ -21,10 +21,10 @@ use futures::{ use handlebars::Handlebars; use hashbrown::HashMap; use http::StatusCode; +use log::{error, info}; use serde_json::{json, value::RawValue}; use std::sync::Arc; use std::{str::from_utf8_mut, sync::atomic::AtomicUsize}; -use tracing::{error, error_span, info, instrument, trace, Instrument}; use crate::{ app::Web3ProxyApp, @@ -34,7 +34,7 @@ use crate::{ /// Public entrypoint for WebSocket JSON-RPC requests. /// Defaults to rate limiting by IP address, but can also read the Authorization header for a bearer token. #[debug_handler] -#[instrument(level = "trace")] + pub async fn websocket_handler( Extension(app): Extension>, ClientIp(ip): ClientIp, @@ -42,23 +42,16 @@ pub async fn websocket_handler( ws_upgrade: Option, ) -> FrontendResult { // TODO: i don't like logging ips. move this to trace level? - let request_span = error_span!("request", %ip, ?origin); let origin = origin.map(|x| x.0); - let (authorization, _semaphore) = ip_is_authorized(&app, ip, origin) - .instrument(request_span) - .await?; - - let request_span = error_span!("request", ?authorization); + let (authorization, _semaphore) = ip_is_authorized(&app, ip, origin).await?; let authorization = Arc::new(authorization); match ws_upgrade { Some(ws) => Ok(ws - .on_upgrade(|socket| { - proxy_web3_socket(app, authorization, socket).instrument(request_span) - }) + .on_upgrade(|socket| proxy_web3_socket(app, authorization, socket)) .into_response()), None => { if let Some(redirect) = &app.config.redirect_public_url { @@ -79,7 +72,7 @@ pub async fn websocket_handler( /// Rate limit and billing based on the api key in the url. /// Can optionally authorized based on origin, referer, or user agent. #[debug_handler] -#[instrument(level = "trace")] + pub async fn websocket_handler_with_key( Extension(app): Extension>, ClientIp(ip): ClientIp, @@ -91,8 +84,6 @@ pub async fn websocket_handler_with_key( ) -> FrontendResult { let rpc_key = rpc_key.parse()?; - let request_span = error_span!("request", %ip, ?referer, ?user_agent); - let (authorization, _semaphore) = key_is_authorized( &app, rpc_key, @@ -101,18 +92,14 @@ pub async fn websocket_handler_with_key( referer.map(|x| x.0), user_agent.map(|x| x.0), ) - .instrument(request_span.clone()) .await?; - // TODO: type that wraps Address and have it censor? would protect us from accidently logging addresses or other user info - let request_span = error_span!("request", ?authorization); - let authorization = Arc::new(authorization); match ws_upgrade { - Some(ws_upgrade) => Ok(ws_upgrade.on_upgrade(move |socket| { - proxy_web3_socket(app, authorization, socket).instrument(request_span) - })), + Some(ws_upgrade) => { + Ok(ws_upgrade.on_upgrade(move |socket| proxy_web3_socket(app, authorization, socket))) + } None => { // if no websocket upgrade, this is probably a user loading the url with their browser match ( @@ -156,7 +143,6 @@ pub async fn websocket_handler_with_key( } } -#[instrument(level = "trace")] async fn proxy_web3_socket( app: Arc, authorization: Arc, @@ -173,7 +159,7 @@ async fn proxy_web3_socket( } /// websockets support a few more methods than http clients -#[instrument(level = "trace")] + async fn handle_socket_payload( app: Arc, authorization: &Arc, @@ -193,7 +179,6 @@ async fn handle_socket_payload( { "eth_subscribe" => { // TODO: what should go in this span? - let span = error_span!("eth_subscribe"); let response = app .eth_subscribe( @@ -202,7 +187,6 @@ async fn handle_socket_payload( subscription_count, response_sender.clone(), ) - .instrument(span) .await; match response { @@ -269,7 +253,6 @@ async fn handle_socket_payload( Message::Text(response_str) } -#[instrument(level = "trace")] async fn read_web3_socket( app: Arc, authorization: Arc, @@ -295,7 +278,7 @@ async fn read_web3_socket( } Message::Ping(x) => Message::Pong(x), Message::Pong(x) => { - trace!("pong: {:?}", x); + // // trace!("pong: {:?}", x); continue; } Message::Close(_) => { @@ -328,7 +311,6 @@ async fn read_web3_socket( } } -#[instrument(level = "trace")] async fn write_web3_socket( response_rx: flume::Receiver, mut ws_tx: SplitSink, @@ -343,7 +325,7 @@ async fn write_web3_socket( // forward the response to through the websocket if let Err(err) = ws_tx.send(msg).await { // this isn't a problem. this is common and happens whenever a client disconnects - trace!(?err, "unable to write to websocket"); + // trace!(?err, "unable to write to websocket"); break; }; } diff --git a/web3_proxy/src/frontend/status.rs b/web3_proxy/src/frontend/status.rs index f1d4d046..5d1fb2e3 100644 --- a/web3_proxy/src/frontend/status.rs +++ b/web3_proxy/src/frontend/status.rs @@ -9,11 +9,10 @@ use axum_macros::debug_handler; use moka::future::ConcurrentCacheExt; use serde_json::json; use std::sync::Arc; -use tracing::instrument; /// Health check page for load balancers to use. #[debug_handler] -#[instrument(level = "trace")] + pub async fn health(Extension(app): Extension>) -> impl IntoResponse { // TODO: also check that the head block is not too old if app.balanced_rpcs.synced() { @@ -27,7 +26,7 @@ pub async fn health(Extension(app): Extension>) -> impl IntoRe /// /// TODO: when done debugging, remove this and only allow access on a different port #[debug_handler] -#[instrument(level = "trace")] + pub async fn prometheus(Extension(app): Extension>) -> impl IntoResponse { app.prometheus_metrics() } @@ -36,7 +35,7 @@ pub async fn prometheus(Extension(app): Extension>) -> impl In /// /// TODO: replace this with proper stats and monitoring #[debug_handler] -#[instrument(level = "trace")] + pub async fn status(Extension(app): Extension>) -> impl IntoResponse { app.pending_transactions.sync(); app.rpc_secret_key_cache.sync(); diff --git a/web3_proxy/src/frontend/users.rs b/web3_proxy/src/frontend/users.rs index e989f602..9ac78824 100644 --- a/web3_proxy/src/frontend/users.rs +++ b/web3_proxy/src/frontend/users.rs @@ -24,6 +24,7 @@ use hashbrown::HashMap; use http::{HeaderValue, StatusCode}; use ipnet::IpNet; use itertools::Itertools; +use log::warn; use redis_rate_limiter::redis::AsyncCommands; use sea_orm::{ ActiveModelTrait, ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder, @@ -36,7 +37,6 @@ use std::ops::Add; use std::str::FromStr; use std::sync::Arc; use time::{Duration, OffsetDateTime}; -use tracing::{instrument, warn}; use ulid::Ulid; /// `GET /user/login/:user_address` or `GET /user/login/:user_address/:message_eip` -- Start the "Sign In with Ethereum" (siwe) login flow. @@ -57,7 +57,7 @@ use ulid::Ulid; /// It is a better UX to just click "login with ethereum" and have the account created if it doesn't exist. /// We can prompt for an email and and payment after they log in. #[debug_handler] -#[instrument(level = "trace")] + pub async fn user_login_get( Extension(app): Extension>, ClientIp(ip): ClientIp, @@ -154,7 +154,7 @@ pub struct PostLogin { /// It is recommended to save the returned bearer token in a cookie. /// The bearer token can be used to authenticate other requests, such as getting the user's stats or modifying the user's profile. #[debug_handler] -#[instrument(level = "trace")] + pub async fn user_login_post( Extension(app): Extension>, ClientIp(ip): ClientIp, @@ -333,7 +333,7 @@ pub async fn user_login_post( /// `POST /user/logout` - Forget the bearer token in the `Authentication` header. #[debug_handler] -#[instrument(level = "trace")] + pub async fn user_logout_post( Extension(app): Extension>, TypedHeader(Authorization(bearer)): TypedHeader>, @@ -355,7 +355,7 @@ pub async fn user_logout_post( /// /// TODO: this will change as we add better support for secondary users. #[debug_handler] -#[instrument(level = "trace")] + pub async fn user_get( Extension(app): Extension>, TypedHeader(Authorization(bearer_token)): TypedHeader>, @@ -373,7 +373,7 @@ pub struct UserPost { /// `POST /user` -- modify the account connected to the bearer token in the `Authentication` header. #[debug_handler] -#[instrument(level = "trace")] + pub async fn user_post( Extension(app): Extension>, TypedHeader(Authorization(bearer_token)): TypedHeader>, @@ -420,7 +420,7 @@ pub async fn user_post( /// TODO: one key per request? maybe /user/balance/:rpc_key? /// TODO: this will change as we add better support for secondary users. #[debug_handler] -#[instrument(level = "trace")] + pub async fn user_balance_get( Extension(app): Extension>, TypedHeader(Authorization(bearer)): TypedHeader>, @@ -438,7 +438,7 @@ pub async fn user_balance_get( /// TODO: one key per request? maybe /user/balance/:rpc_key? /// TODO: this will change as we add better support for secondary users. #[debug_handler] -#[instrument(level = "trace")] + pub async fn user_balance_post( Extension(app): Extension>, TypedHeader(Authorization(bearer)): TypedHeader>, @@ -452,7 +452,7 @@ pub async fn user_balance_post( /// /// TODO: one key per request? maybe /user/keys/:rpc_key? #[debug_handler] -#[instrument(level = "trace")] + pub async fn rpc_keys_get( Extension(app): Extension>, TypedHeader(Authorization(bearer)): TypedHeader>, @@ -481,7 +481,7 @@ pub async fn rpc_keys_get( /// `DELETE /user/keys` -- Use a bearer token to delete an existing key. #[debug_handler] -#[instrument(level = "trace")] + pub async fn rpc_keys_delete( Extension(app): Extension>, TypedHeader(Authorization(bearer)): TypedHeader>, @@ -511,7 +511,7 @@ pub struct UserKeyManagement { /// `POST /user/keys` or `PUT /user/keys` -- Use a bearer token to create or update an existing key. #[debug_handler] -#[instrument(level = "trace")] + pub async fn rpc_keys_management( Extension(app): Extension>, TypedHeader(Authorization(bearer)): TypedHeader>, @@ -677,7 +677,7 @@ pub async fn rpc_keys_management( /// `GET /user/revert_logs` -- Use a bearer token to get the user's revert logs. #[debug_handler] -#[instrument(level = "trace")] + pub async fn user_revert_logs_get( Extension(app): Extension>, TypedHeader(Authorization(bearer)): TypedHeader>, @@ -733,7 +733,7 @@ pub async fn user_revert_logs_get( /// `GET /user/stats/aggregate` -- Public endpoint for aggregate stats such as bandwidth used and methods requested. #[debug_handler] -#[instrument(level = "trace")] + pub async fn user_stats_aggregate_get( Extension(app): Extension>, bearer: Option>>, @@ -754,7 +754,7 @@ pub async fn user_stats_aggregate_get( /// /// TODO: this will change as we add better support for secondary users. #[debug_handler] -#[instrument(level = "trace")] + pub async fn user_stats_detailed_get( Extension(app): Extension>, bearer: Option>>, diff --git a/web3_proxy/src/jsonrpc.rs b/web3_proxy/src/jsonrpc.rs index 8a9feed4..5cd2706a 100644 --- a/web3_proxy/src/jsonrpc.rs +++ b/web3_proxy/src/jsonrpc.rs @@ -4,7 +4,6 @@ use serde::de::{self, Deserializer, MapAccess, SeqAccess, Visitor}; use serde::{Deserialize, Serialize}; use serde_json::value::RawValue; use std::fmt; -use tracing::instrument; // this is used by serde #[allow(dead_code)] @@ -194,12 +193,10 @@ impl JsonRpcForwardedResponse { Self::from_string(message, code, id) } - #[instrument(level = "trace")] pub fn from_str(message: &str, code: Option, id: Option>) -> Self { Self::from_string(message.to_string(), code, id) } - #[instrument(level = "trace")] pub fn from_string(message: String, code: Option, id: Option>) -> Self { // TODO: this is too verbose. plenty of errors are valid, like users giving an invalid address. no need to log that // TODO: can we somehow get the initial request here? if we put that into a tracing span, will things slow down a ton? @@ -217,7 +214,6 @@ impl JsonRpcForwardedResponse { } } - #[instrument(level = "trace")] pub fn from_response(partial_response: Box, id: Box) -> Self { JsonRpcForwardedResponse { jsonrpc: "2.0".to_string(), @@ -228,7 +224,6 @@ impl JsonRpcForwardedResponse { } } - #[instrument(level = "trace")] pub fn from_value(partial_response: serde_json::Value, id: Box) -> Self { let partial_response = serde_json::to_string(&partial_response).expect("this should always work"); @@ -244,7 +239,6 @@ impl JsonRpcForwardedResponse { } } - #[instrument(level = "trace")] pub fn from_ethers_error(e: ProviderError, id: Box) -> anyhow::Result { // TODO: move turning ClientError into json to a helper function? let code; @@ -304,7 +298,6 @@ impl JsonRpcForwardedResponse { }) } - #[instrument(level = "trace")] pub fn try_from_response_result( result: Result, ProviderError>, id: Box, diff --git a/web3_proxy/src/metrics_frontend.rs b/web3_proxy/src/metrics_frontend.rs index 823d8442..48b47553 100644 --- a/web3_proxy/src/metrics_frontend.rs +++ b/web3_proxy/src/metrics_frontend.rs @@ -2,14 +2,14 @@ use axum::headers::HeaderName; use axum::http::HeaderValue; use axum::response::{IntoResponse, Response}; use axum::{routing::get, Extension, Router}; +use log::info; use std::net::SocketAddr; use std::sync::Arc; -use tracing::{info, instrument}; use crate::app::Web3ProxyApp; /// Run a prometheus metrics server on the given port. -#[instrument(level = "trace")] + pub async fn serve(app: Arc, port: u16) -> anyhow::Result<()> { // build our application with a route // order most to least common @@ -42,7 +42,6 @@ pub async fn serve(app: Arc, port: u16) -> anyhow::Result<()> { .map_err(Into::into) } -#[instrument(level = "trace")] async fn root(Extension(app): Extension>) -> Response { let serialized = app.prometheus_metrics(); diff --git a/web3_proxy/src/rpcs/blockchain.rs b/web3_proxy/src/rpcs/blockchain.rs index 6ab91c2d..267174fd 100644 --- a/web3_proxy/src/rpcs/blockchain.rs +++ b/web3_proxy/src/rpcs/blockchain.rs @@ -10,13 +10,13 @@ use anyhow::Context; use derive_more::From; use ethers::prelude::{Block, TxHash, H256, U64}; use hashbrown::{HashMap, HashSet}; +use log::{debug, warn, Level}; use moka::future::Cache; use serde::Serialize; use serde_json::json; use std::{cmp::Ordering, fmt::Display, sync::Arc}; use tokio::sync::{broadcast, watch}; use tokio::time::Duration; -use tracing::{debug, instrument, trace, warn, Level}; // TODO: type for Hydrated Blocks with their full transactions? pub type ArcBlock = Arc>; @@ -38,7 +38,7 @@ impl Display for BlockId { impl Web3Connections { /// add a block to our map and it's hash to our graphmap of the blockchain - #[instrument] + pub async fn save_block(&self, block: &ArcBlock, heaviest_chain: bool) -> anyhow::Result<()> { // TODO: i think we can rearrange this function to make it faster on the hot path let block_hash = block.hash.as_ref().context("no block hash")?; @@ -62,11 +62,11 @@ impl Web3Connections { } if blockchain.contains_node(*block_hash) { - trace!(%block_hash, %block_num, "block already saved"); + // // trace!(%block_hash, %block_num, "block already saved"); return Ok(()); } - trace!(%block_hash, %block_num, "saving new block"); + // // trace!(%block_hash, %block_num, "saving new block"); self.block_hashes .insert(*block_hash, block.to_owned()) @@ -85,7 +85,6 @@ impl Web3Connections { /// Get a block from caches with fallback. /// Will query a specific node or the best available. - #[instrument(level = "trace")] pub async fn block( &self, authorization: &Arc, @@ -108,7 +107,7 @@ impl Web3Connections { .request( "eth_getBlockByHash", &json!(get_block_params), - Level::ERROR.into(), + Level::Error.into(), ) .await? } @@ -237,7 +236,7 @@ impl Web3Connections { ) .await { - warn!(rpc=%rpc_name, ?err, "unable to process block from rpc"); + warn!("unable to process block from rpc {}: {:?}", rpc_name, err); } } @@ -267,7 +266,7 @@ impl Web3Connections { if rpc_head_num.is_zero() { // TODO: i don't think we can get to this anymore now that we use Options - debug!(%rpc, "still syncing"); + debug!("{} still syncing", rpc); connection_heads.remove(&rpc.name); @@ -286,7 +285,7 @@ impl Web3Connections { } None => { // TODO: warn is too verbose. this is expected if a node disconnects and has to reconnect - trace!(%rpc, "Block without number or hash!"); + // // trace!(%rpc, "Block without number or hash!"); connection_heads.remove(&rpc.name); @@ -311,7 +310,7 @@ impl Web3Connections { // TODO: why does this happen?!?! seems to only happen with uncled blocks // TODO: maybe we should do get_with? // TODO: maybe we should just continue. this only seems to happen when an older block is received - warn!(%connection_head_hash, %conn_name, %rpc, "Missing connection_head_block in block_hashes. Fetching now"); + warn!("Missing connection_head_block in block_hashes. Fetching now. hash={}. other={}. rpc={}", connection_head_hash, conn_name, rpc); // this option should always be populated let conn_rpc = self.conns.get(conn_name); @@ -322,7 +321,7 @@ impl Web3Connections { { Ok(block) => block, Err(err) => { - warn!(%connection_head_hash, %conn_name, %rpc, ?err, "Failed fetching connection_head_block for block_hashes"); + warn!("Processing {}. Failed fetching connection_head_block for block_hashes. {} head hash={}. err={:?}", rpc, conn_name, connection_head_hash, err); continue; } } @@ -393,9 +392,9 @@ impl Web3Connections { // not enough rpcs yet. check the parent if let Some(parent_block) = self.block_hashes.get(&maybe_head_block.parent_hash) { - trace!( - child=%maybe_head_hash, parent=%parent_block.hash.unwrap(), "avoiding thundering herd", - ); + // // trace!( + // child=%maybe_head_hash, parent=%parent_block.hash.unwrap(), "avoiding thundering herd", + // ); maybe_head_block = parent_block; continue; @@ -428,9 +427,12 @@ impl Web3Connections { .swap(Arc::new(empty_synced_connections)); // TODO: log different things depending on old_synced_connections - warn!(%rpc, "no consensus head! {}/{}/{}", 0, num_connection_heads, total_conns); + warn!( + "Processing {}. no consensus head! {}/{}/{}", + rpc, 0, num_connection_heads, total_conns + ); } else { - trace!(?highest_rpcs); + // // trace!(?highest_rpcs); // TODO: if maybe_head_block.time() is old, ignore it @@ -470,7 +472,14 @@ impl Web3Connections { // TODO: if the rpc_head_block != consensus_head_block, log something? match &old_synced_connections.head_block_id { None => { - debug!(block=%consensus_head_block_id, %rpc, "first {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns); + debug!( + "first {}/{}/{} block={}, rpc={}", + num_consensus_rpcs, + num_connection_heads, + total_conns, + consensus_head_block_id, + rpc + ); self.save_block(&consensus_head_block, true).await?; @@ -491,10 +500,27 @@ impl Web3Connections { // multiple blocks with the same fork! if consensus_head_block_id.hash == old_block_id.hash { // no change in hash. no need to use head_block_sender - debug!(con_head=%consensus_head_block_id, rpc_head=%rpc_head_str, %rpc, "con {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns) + debug!( + "con {}/{}/{}. con head={}. rpc={}. rpc head={}", + num_consensus_rpcs, + num_connection_heads, + total_conns, + consensus_head_block_id, + rpc, + rpc_head_str + ) } else { // hash changed - debug!(con_head=%consensus_head_block_id, old=%old_block_id, rpc_head=%rpc_head_str, %rpc, "unc {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns); + debug!( + "unc {}/{}/{} con_head={}. old={}. rpc_head={}. rpc={}", + num_consensus_rpcs, + num_connection_heads, + total_conns, + consensus_head_block_id, + old_block_id, + rpc_head_str, + rpc + ); self.save_block(&consensus_head_block, true) .await @@ -508,7 +534,7 @@ impl Web3Connections { Ordering::Less => { // this is unlikely but possible // TODO: better log - warn!(con_head=%consensus_head_block_id, old_head=%old_block_id, rpc_head=%rpc_head_str, %rpc, "chain rolled back {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns); + warn!("chain rolled back {}/{}/{}. con_head={} old_head={}. rpc_head={}. rpc={}", num_consensus_rpcs, num_connection_heads, total_conns, consensus_head_block_id, old_block_id, rpc_head_str, rpc); // TODO: tell save_block to remove any higher block numbers from the cache. not needed because we have other checks on requested blocks being > head, but still seems slike a good idea self.save_block(&consensus_head_block, true).await.context( @@ -520,7 +546,15 @@ impl Web3Connections { .context("head_block_sender sending consensus_head_block")?; } Ordering::Greater => { - debug!(con_head=%consensus_head_block_id, rpc_head=%rpc_head_str, %rpc, "new {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns); + debug!( + "new {}/{}/{} conn_head={}. rpc_head={}. rpc={}", + num_consensus_rpcs, + num_connection_heads, + total_conns, + consensus_head_block_id, + rpc_head_str, + rpc + ); self.save_block(&consensus_head_block, true).await?; diff --git a/web3_proxy/src/rpcs/connection.rs b/web3_proxy/src/rpcs/connection.rs index b999fe8a..d6aa2260 100644 --- a/web3_proxy/src/rpcs/connection.rs +++ b/web3_proxy/src/rpcs/connection.rs @@ -9,6 +9,7 @@ use anyhow::Context; use ethers::prelude::{Bytes, Middleware, ProviderError, TxHash, H256, U64}; use futures::future::try_join_all; use futures::StreamExt; +use log::{debug, error, info, warn, Level}; use parking_lot::RwLock; use redis_rate_limiter::{RedisPool, RedisRateLimitResult, RedisRateLimiter}; use sea_orm::DatabaseConnection; @@ -25,7 +26,6 @@ use thread_fast_rng::thread_fast_rng; use tokio::sync::broadcast; use tokio::sync::RwLock as AsyncRwLock; use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior}; -use tracing::{debug, error, info, instrument, trace, warn, Level}; /// An active connection to a Web3 RPC server like geth or erigon. pub struct Web3Connection { @@ -58,7 +58,6 @@ pub struct Web3Connection { impl Web3Connection { /// Connect to a web3 rpc - // #[instrument(name = "spawn_Web3Connection", skip(hard_limit, http_client))] // TODO: have this take a builder (which will have channels attached) #[allow(clippy::too_many_arguments)] pub async fn spawn( @@ -128,7 +127,7 @@ impl Web3Connection { .request( "eth_chainId", &json!(Option::None::<()>), - Level::ERROR.into(), + Level::Error.into(), ) .await; @@ -203,7 +202,7 @@ impl Web3Connection { // TODO: subscribe to a channel instead of polling. subscribe to http_interval_sender? while head_block_id.is_none() { - warn!(rpc=%self, "no head block yet. retrying"); + warn!("no head block yet. retrying rpc {}", self); sleep(Duration::from_secs(13)).await; @@ -230,11 +229,11 @@ impl Web3Connection { maybe_archive_block, )), // error here are expected, so keep the level low - tracing::Level::DEBUG.into(), + Level::Debug.into(), ) .await; - trace!(?archive_result, rpc=%self); + // // trace!(?archive_result, rpc=%self); if archive_result.is_ok() { limit = Some(block_data_limit); @@ -300,7 +299,7 @@ impl Web3Connection { ); let reconnect_in = Duration::from_millis(first_sleep_ms); - warn!(rpc=%self, ?reconnect_in, "Reconnect in"); + warn!("Reconnect to {} in {}ms", self, reconnect_in.as_millis()); sleep(reconnect_in).await; @@ -318,7 +317,12 @@ impl Web3Connection { ); let retry_in = Duration::from_millis(sleep_ms); - warn!(rpc=%self, ?retry_in, ?err, "Failed to reconnect!"); + warn!( + "Failed reconnect to {}! Retry in {}ms. err={:?}", + self, + retry_in.as_millis(), + err, + ); sleep(retry_in).await; } @@ -346,7 +350,7 @@ impl Web3Connection { Web3Provider::Ws(_) => {} } - info!(rpc=%self, "reconnecting"); + info!("Reconnecting to {}", self); // disconnect the current provider *provider_option = None; @@ -365,7 +369,7 @@ impl Web3Connection { .context("block_sender during connect")?; } } else { - info!(rpc=%self, "connecting"); + info!("connecting to {}", self); } // TODO: if this fails, keep retrying! otherwise it crashes and doesn't try again! @@ -373,7 +377,7 @@ impl Web3Connection { *provider_option = Some(Arc::new(new_provider)); - info!(rpc=%self, "successfully connected"); + info!("successfully connected to {}", self); Ok(()) } @@ -446,7 +450,7 @@ impl Web3Connection { .context("block_sender")?; } Err(err) => { - warn!(?err, "unable to get block from {}", self); + warn!("unable to get block from {}. err={:?}", self, err); { let mut head_block_id = self.head_block_id.write(); @@ -477,7 +481,7 @@ impl Web3Connection { reconnect: bool, ) -> anyhow::Result<()> { loop { - debug!(rpc=%self, "subscribing"); + debug!("subscribing to {}", self); let http_interval_receiver = http_interval_sender.as_ref().map(|x| x.subscribe()); @@ -510,9 +514,9 @@ impl Web3Connection { loop { if let Some(provider) = conn.provider.read().await.as_ref() { if provider.ready() { - trace!(rpc=%conn, "provider is ready"); + // // trace!(rpc=%conn, "provider is ready"); } else { - warn!(rpc=%conn, "provider is NOT ready"); + warn!("rpc {} is NOT ready", conn); // returning error will trigger a reconnect // TODO: what if we just happened to have this check line up with another restart? return Err(anyhow::anyhow!("provider is not ready")); @@ -535,22 +539,18 @@ impl Web3Connection { } Err(err) => { if reconnect { - warn!( - rpc=%self, - ?err, - "subscription exited", - ); + warn!("{} subscription exited. err={:?}", self, err); self.retrying_reconnect(block_sender.as_ref(), true).await?; } else { - error!(rpc=%self, ?err, "subscription exited"); + error!("{} subscription exited. err={:?}", self, err); return Err(err); } } } } - info!(rpc=%self, "all subscriptions complete"); + info!("all subscriptions on {} completed", self); Ok(()) } @@ -563,7 +563,7 @@ impl Web3Connection { block_sender: flume::Sender, block_map: BlockHashesCache, ) -> anyhow::Result<()> { - info!(%self, "watching new heads"); + info!("watching new heads on {}", self); // TODO: is a RwLock of an Option the right thing here? if let Some(provider) = self.provider.read().await.clone() { @@ -587,7 +587,7 @@ impl Web3Connection { .request( "eth_getBlockByNumber", &json!(("latest", false)), - tracing::Level::ERROR.into(), + Level::Error.into(), ) .await; @@ -632,7 +632,7 @@ impl Web3Connection { } } Err(err) => { - warn!(?err, "Internal error on latest block from {}", self); + warn!("Internal error on latest block from {}. {:?}", self, err); self.send_head_block_result( Ok(None), @@ -656,12 +656,12 @@ impl Web3Connection { broadcast::error::RecvError::Lagged(lagged) => { // querying the block was delayed // this can happen if tokio is very busy or waiting for requests limits took too long - warn!(?err, rpc=%self, "http interval lagging by {}!", lagged); + warn!("http interval on {} lagging by {}!", self, lagged); } } } - trace!(rpc=%self, "ok http interval"); + // // trace!(rpc=%self, "ok http interval"); } } Web3Provider::Ws(provider) => { @@ -682,7 +682,7 @@ impl Web3Connection { .request( "eth_getBlockByNumber", &json!(("latest", false)), - tracing::Level::ERROR.into(), + Level::Error.into(), ) .await; @@ -723,7 +723,7 @@ impl Web3Connection { // TODO: is this always an error? // TODO: we probably don't want a warn and to return error - warn!(rpc=%self, "new_heads subscription ended"); + warn!("new_heads subscription to {} ended", self); return Err(anyhow::anyhow!("new_heads subscription ended")); } } @@ -737,7 +737,7 @@ impl Web3Connection { authorization: Arc, tx_id_sender: flume::Sender<(TxHash, Arc)>, ) -> anyhow::Result<()> { - info!(%self, "watching pending transactions"); + info!( "watching pending transactions on {}", self); // TODO: is a RwLock of an Option the right thing here? if let Some(provider) = self.provider.read().await.clone() { @@ -790,7 +790,7 @@ impl Web3Connection { // TODO: is this always an error? // TODO: we probably don't want a warn and to return error - warn!(rpc=%self, "pending_transactions subscription ended"); + warn!( "pending_transactions subscription ended on {}", self); return Err(anyhow::anyhow!("pending_transactions subscription ended")); } } @@ -800,7 +800,7 @@ impl Web3Connection { } /// be careful with this; it might wait forever! - #[instrument] + pub async fn wait_for_request_handle( self: &Arc, authorization: &Arc, @@ -811,13 +811,13 @@ impl Web3Connection { loop { let x = self.try_request_handle(authorization).await; - trace!(?x, "try_request_handle"); + // // trace!(?x, "try_request_handle"); match x { Ok(OpenRequestResult::Handle(handle)) => return Ok(handle), Ok(OpenRequestResult::RetryAt(retry_at)) => { // TODO: emit a stat? - trace!(?retry_at); + // // trace!(?retry_at); if retry_at > max_wait { // break now since we will wait past our maximum wait time @@ -836,7 +836,6 @@ impl Web3Connection { } } - #[instrument] pub async fn try_request_handle( self: &Arc, authorization: &Arc, @@ -853,14 +852,14 @@ impl Web3Connection { // TODO: how should we know if we should set expire or not? match ratelimiter.throttle().await? { RedisRateLimitResult::Allowed(_) => { - trace!("rate limit succeeded") + // // trace!("rate limit succeeded") } RedisRateLimitResult::RetryAt(retry_at, _) => { // rate limit failed // save the smallest retry_after. if nothing succeeds, return an Err with retry_after in it // TODO: use tracing better // TODO: i'm seeing "Exhausted rate limit on moralis: 0ns". How is it getting 0? - warn!(?retry_at, rpc=%self, "Exhausted rate limit"); + warn!( "Exhausted rate limit on {}. Retry at {:?}", self, retry_at); return Ok(OpenRequestResult::RetryAt(retry_at)); } diff --git a/web3_proxy/src/rpcs/connections.rs b/web3_proxy/src/rpcs/connections.rs index 9ddf7e6d..b3544d62 100644 --- a/web3_proxy/src/rpcs/connections.rs +++ b/web3_proxy/src/rpcs/connections.rs @@ -19,6 +19,7 @@ use futures::future::{join_all, try_join_all}; use futures::stream::FuturesUnordered; use futures::StreamExt; use hashbrown::HashMap; +use log::{error, info, warn, Level}; use moka::future::{Cache, ConcurrentCacheExt}; use petgraph::graphmap::DiGraphMap; use sea_orm::DatabaseConnection; @@ -34,7 +35,6 @@ use tokio::sync::RwLock as AsyncRwLock; use tokio::sync::{broadcast, watch}; use tokio::task; use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior}; -use tracing::{error, info, instrument, trace, warn}; /// A collection of web3 connections. Sends requests either the current best server or all servers. #[derive(From)] @@ -95,7 +95,7 @@ impl Web3Connections { // TODO: every time a head_block arrives (with a small delay for known slow servers), or on the interval. interval.tick().await; - trace!("http interval ready"); + // // trace!("http interval ready"); // errors are okay. they mean that all receivers have been dropped let _ = sender.send(()); @@ -171,7 +171,7 @@ impl Web3Connections { Ok(Err(err)) => { // if we got an error here, it is not retryable // TODO: include context about which connection failed - error!(?err, "Unable to create connection"); + error!("Unable to create connection. err={:?}", err); } Err(err) => { return Err(err.into()); @@ -337,7 +337,7 @@ impl Web3Connections { .into_iter() .map(|active_request_handle| async move { let result: Result, _> = active_request_handle - .request(method, &json!(¶ms), tracing::Level::ERROR.into()) + .request(method, &json!(¶ms), Level::Error.into()) .await; result }) @@ -472,7 +472,7 @@ impl Web3Connections { // increment our connection counter match rpc.try_request_handle(authorization).await { Ok(OpenRequestResult::Handle(handle)) => { - trace!("next server on {:?}: {:?}", self, rpc); + // // trace!("next server on {:?}: {:?}", self, rpc); return Ok(OpenRequestResult::Handle(handle)); } Ok(OpenRequestResult::RetryAt(retry_at)) => { @@ -483,7 +483,7 @@ impl Web3Connections { } Err(err) => { // TODO: log a warning? - warn!(?err, "No request handle for {}", rpc) + warn!("No request handle for {}. err={:?}", rpc, err) } } } @@ -539,7 +539,10 @@ impl Web3Connections { warn!("no request handle for {}", connection) } Err(err) => { - warn!(?err, "error getting request handle for {}", connection) + warn!( + "error getting request handle for {}. err={:?}", + connection, err + ) } } } @@ -602,7 +605,7 @@ impl Web3Connections { ) { Ok(response) => { if let Some(error) = &response.error { - trace!(?response, "rpc error"); + // // trace!(?response, "rpc error"); if let Some(request_metadata) = request_metadata { request_metadata @@ -629,17 +632,20 @@ impl Web3Connections { } } } else { - trace!(?response, "rpc success"); + // // trace!(?response, "rpc success"); } return Ok(response); } - Err(e) => { + Err(err) => { let rpc = skip_rpcs .last() .expect("there must have been a provider if we got an error"); - warn!(%rpc, ?e, "Backend server error! Retrying on another"); + warn!( + "Backend server error on {}! Retrying on another. err={:?}", + rpc, err + ); // TODO: sleep how long? until synced_connections changes or rate limits are available // sleep(Duration::from_millis(100)).await; @@ -652,7 +658,7 @@ impl Web3Connections { // TODO: move this to a helper function // sleep (TODO: with a lock?) until our rate limits should be available // TODO: if a server catches up sync while we are waiting, we could stop waiting - warn!(?retry_at, "All rate limits exceeded. Sleeping"); + warn!("All rate limits exceeded. Sleeping untill {:?}", retry_at); // TODO: have a separate column for rate limited? if let Some(request_metadata) = request_metadata { @@ -664,7 +670,7 @@ impl Web3Connections { continue; } OpenRequestResult::RetryNever => { - warn!(?self, "No server handles!"); + warn!("No server handles! {:?}", self); if let Some(request_metadata) = request_metadata { request_metadata.no_servers.fetch_add(1, Ordering::Release); @@ -690,7 +696,7 @@ impl Web3Connections { } /// be sure there is a timeout on this or it might loop forever - #[instrument] + pub async fn try_send_all_upstream_servers( &self, authorization: &Arc, @@ -729,7 +735,7 @@ impl Web3Connections { return Ok(response); } Err(None) => { - warn!(?self, "No servers in sync! Retrying"); + warn!("No servers in sync on {:?}! Retrying", self); if let Some(request_metadata) = &request_metadata { request_metadata.no_servers.fetch_add(1, Ordering::Release); diff --git a/web3_proxy/src/rpcs/provider.rs b/web3_proxy/src/rpcs/provider.rs index f02f5551..c4850534 100644 --- a/web3_proxy/src/rpcs/provider.rs +++ b/web3_proxy/src/rpcs/provider.rs @@ -1,7 +1,6 @@ use anyhow::Context; use derive_more::From; use std::time::Duration; -use tracing::{info_span, instrument, Instrument}; /// Use HTTP and WS providers. // TODO: instead of an enum, I tried to use Box, but hit @@ -20,7 +19,6 @@ impl Web3Provider { } } - #[instrument] pub async fn from_str( url_str: &str, http_client: Option, @@ -38,10 +36,7 @@ impl Web3Provider { .interval(Duration::from_secs(13)) .into() } else if url_str.starts_with("ws") { - // TODO: i dont think this instrument does much of anything. what level should it be? - let provider = ethers::providers::Ws::connect(url_str) - .instrument(info_span!("Web3Provider", %url_str)) - .await?; + let provider = ethers::providers::Ws::connect(url_str).await?; // TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592) // TODO: i don't think this interval matters diff --git a/web3_proxy/src/rpcs/request.rs b/web3_proxy/src/rpcs/request.rs index 1021d170..a7dc6224 100644 --- a/web3_proxy/src/rpcs/request.rs +++ b/web3_proxy/src/rpcs/request.rs @@ -8,6 +8,7 @@ use entities::revert_log; use entities::sea_orm_active_enums::Method; use ethers::providers::{HttpClientError, ProviderError, WsClientError}; use ethers::types::{Address, Bytes}; +use log::{debug, error, warn, Level}; use metered::metered; use metered::HitCount; use metered::ResponseTime; @@ -20,8 +21,6 @@ use std::sync::atomic::{self, AtomicBool, Ordering}; use std::sync::Arc; use thread_fast_rng::rand::Rng; use tokio::time::{sleep, Duration, Instant}; -use tracing::Level; -use tracing::{debug, error, trace, warn}; #[derive(Debug)] pub enum OpenRequestResult { @@ -67,9 +66,9 @@ struct EthCallFirstParams { impl From for RequestErrorHandler { fn from(level: Level) -> Self { match level { - Level::DEBUG => RequestErrorHandler::DebugLevel, - Level::ERROR => RequestErrorHandler::ErrorLevel, - Level::WARN => RequestErrorHandler::WarnLevel, + Level::Debug => RequestErrorHandler::DebugLevel, + Level::Error => RequestErrorHandler::ErrorLevel, + Level::Warn => RequestErrorHandler::WarnLevel, _ => unimplemented!("unexpected tracing Level"), } } @@ -85,7 +84,7 @@ impl Authorization { let rpc_key_id = match self.checks.rpc_key_id { Some(rpc_key_id) => rpc_key_id.into(), None => { - trace!(?self, "cannot save revert without rpc_key_id"); + // // trace!(?self, "cannot save revert without rpc_key_id"); return Ok(()); } }; @@ -119,7 +118,7 @@ impl Authorization { // TODO: what log level? // TODO: better format - trace!(?rl); + // trace!(?rl); // TODO: return something useful Ok(()) @@ -181,14 +180,14 @@ impl OpenRequestHandle { // TODO: requests from customers have request ids, but we should add // TODO: including params in this is way too verbose // the authorization field is already on a parent span - trace!(rpc=%self.conn, %method, "request"); + // trace!(rpc=%self.conn, %method, "request"); let mut provider = None; while provider.is_none() { match self.conn.provider.read().await.clone() { None => { - warn!(rpc=%self.conn, "no provider!"); + warn!("no provider for {}!", self.conn); // TODO: how should this work? a reconnect should be in progress. but maybe force one now? // TODO: sleep how long? subscribe to something instead? maybe use a watch handle? // TODO: this is going to be way too verbose! @@ -211,29 +210,29 @@ impl OpenRequestHandle { // TODO: do something special for eth_sendRawTransaction too let error_handler = if let RequestErrorHandler::SaveReverts = error_handler { if !["eth_call", "eth_estimateGas"].contains(&method) { - trace!(%method, "skipping save on revert"); + // trace!(%method, "skipping save on revert"); RequestErrorHandler::DebugLevel } else if self.authorization.db_conn.is_some() { let log_revert_chance = self.authorization.checks.log_revert_chance; if log_revert_chance == 0.0 { - trace!(%method, "no chance. skipping save on revert"); + // trace!(%method, "no chance. skipping save on revert"); RequestErrorHandler::DebugLevel } else if log_revert_chance == 1.0 { - trace!(%method, "gaurenteed chance. SAVING on revert"); + // trace!(%method, "gaurenteed chance. SAVING on revert"); error_handler } else if thread_fast_rng::thread_fast_rng().gen_range(0.0f64..=1.0) < log_revert_chance { - trace!(%method, "missed chance. skipping save on revert"); + // trace!(%method, "missed chance. skipping save on revert"); RequestErrorHandler::DebugLevel } else { - trace!("Saving on revert"); + // trace!("Saving on revert"); // TODO: is always logging at debug level fine? error_handler } } else { - trace!(%method, "no database. skipping save on revert"); + // trace!(%method, "no database. skipping save on revert"); RequestErrorHandler::DebugLevel } } else { @@ -277,14 +276,14 @@ impl OpenRequestHandle { RequestErrorHandler::DebugLevel => { // TODO: think about this revert check more. sometimes we might want reverts logged so this needs a flag if !is_revert { - debug!(?err, %method, ?params, rpc=%self.conn, "bad response!"); + debug!("bad response from {}! method={} params={:?} err={:?}", self.conn, method, params, err); } } RequestErrorHandler::ErrorLevel => { - error!(?err, %method, ?params, rpc=%self.conn, "bad response!"); + error!("bad response from {}! method={} params={:?} err={:?}", self.conn, method, params, err); } RequestErrorHandler::WarnLevel => { - warn!(?err, %method, ?params, rpc=%self.conn, "bad response!"); + warn!("bad response from {}! method={} params={:?} err={:?}", self.conn, method, params, err); } RequestErrorHandler::SaveReverts => { // TODO: do not unwrap! (doesn't matter much since we check method as a string above) @@ -304,8 +303,8 @@ impl OpenRequestHandle { } else { // TODO: i think ethers already has trace logging (and does it much more fancy) // TODO: opt-in response inspection to log reverts with their request. put into redis or what? - // trace!(rpc=%self.conn, %method, ?response); - trace!(%method, rpc=%self.conn, "response"); + // // trace!(rpc=%self.conn, %method, ?response); + // trace!(%method, rpc=%self.conn, "response"); } response diff --git a/web3_proxy/src/rpcs/transactions.rs b/web3_proxy/src/rpcs/transactions.rs index e5c0fb52..868ec855 100644 --- a/web3_proxy/src/rpcs/transactions.rs +++ b/web3_proxy/src/rpcs/transactions.rs @@ -5,9 +5,9 @@ use super::connection::Web3Connection; use super::connections::Web3Connections; use super::request::OpenRequestResult; use ethers::prelude::{ProviderError, Transaction, TxHash}; +use log::{debug, Level}; use std::sync::Arc; use tokio::sync::broadcast; -use tracing::{debug, trace, Level}; // TODO: think more about TxState #[derive(Clone)] @@ -34,7 +34,7 @@ impl Web3Connections { .request( "eth_getTransactionByHash", &(pending_tx_id,), - Level::ERROR.into(), + Level::Error.into(), ) .await? } @@ -43,12 +43,12 @@ impl Web3Connections { return Ok(None); } Err(err) => { - trace!( - ?pending_tx_id, - ?rpc, - ?err, - "cancelled funneling transaction" - ); + // trace!( + // ?pending_tx_id, + // ?rpc, + // ?err, + // "cancelled funneling transaction" + // ); return Ok(None); } }; @@ -79,7 +79,7 @@ impl Web3Connections { return Ok(()); } - trace!(?pending_tx_id, "checking pending_transactions on {}", rpc); + // trace!(?pending_tx_id, "checking pending_transactions on {}", rpc); if self.pending_transactions.contains_key(&pending_tx_id) { // this transaction has already been processed return Ok(()); @@ -94,14 +94,14 @@ impl Web3Connections { Ok(Some(tx_state)) => { let _ = pending_tx_sender.send(tx_state); - trace!(?pending_tx_id, "sent"); + // trace!(?pending_tx_id, "sent"); // we sent the transaction. return now. don't break looping because that gives a warning return Ok(()); } Ok(None) => {} Err(err) => { - trace!(?err, ?pending_tx_id, "failed fetching transaction"); + // trace!(?err, ?pending_tx_id, "failed fetching transaction"); // unable to update the entry. sleep and try again soon // TODO: retry with exponential backoff with jitter starting from a much smaller time // sleep(Duration::from_millis(100)).await; @@ -112,7 +112,7 @@ impl Web3Connections { // "There is a Pending txn with a lower account nonce. This txn can only be executed after confirmation of the earlier Txn Hash#" // sometimes it's been pending for many hours // sometimes it's maybe something else? - debug!(?pending_tx_id, "not found on {}", rpc); + debug!("txid {} not found on {}", pending_tx_id, rpc); Ok(()) } } diff --git a/web3_proxy/src/user_queries.rs b/web3_proxy/src/user_queries.rs index 8ae6a473..55e7cc95 100644 --- a/web3_proxy/src/user_queries.rs +++ b/web3_proxy/src/user_queries.rs @@ -9,16 +9,15 @@ use chrono::NaiveDateTime; use entities::{rpc_accounting, rpc_key}; use hashbrown::HashMap; use http::StatusCode; +use log::warn; use migration::{Condition, Expr, SimpleExpr}; use redis_rate_limiter::{redis::AsyncCommands, RedisConnection}; use sea_orm::{ ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder, QuerySelect, Select, }; -use tracing::{instrument, warn}; /// get the attached address from redis for the given auth_token. /// 0 means all users -#[instrument(level = "trace", skip(redis_conn))] pub async fn get_user_id_from_params( mut redis_conn: RedisConnection, // this is a long type. should we strip it down? @@ -68,7 +67,7 @@ pub async fn get_user_id_from_params( /// only allow rpc_key to be set if user_id is also set. /// this will keep people from reading someone else's keys. /// 0 means none. -#[instrument(level = "trace")] + pub fn get_rpc_key_id_from_params( user_id: u64, params: &HashMap, @@ -87,7 +86,6 @@ pub fn get_rpc_key_id_from_params( } } -#[instrument(level = "trace")] pub fn get_chain_id_from_params( app: &Web3ProxyApp, params: &HashMap, @@ -102,7 +100,6 @@ pub fn get_chain_id_from_params( ) } -#[instrument(level = "trace")] pub fn get_query_start_from_params( params: &HashMap, ) -> anyhow::Result { @@ -126,7 +123,6 @@ pub fn get_query_start_from_params( ) } -#[instrument(level = "trace")] pub fn get_page_from_params(params: &HashMap) -> anyhow::Result { params.get("page").map_or_else::, _, _>( || { @@ -143,7 +139,6 @@ pub fn get_page_from_params(params: &HashMap) -> anyhow::Result< ) } -#[instrument(level = "trace")] pub fn get_query_window_seconds_from_params( params: &HashMap, ) -> Result {