cut out tracing for now

This commit is contained in:
Bryan Stitt 2022-11-12 08:24:32 +00:00
parent 9ae2337d1d
commit 0e1cf5767c
35 changed files with 356 additions and 612 deletions

317
Cargo.lock generated

@ -263,7 +263,7 @@ version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b88d82667eca772c4aa12f0f1348b3ae643424c8876448f3f7bd5787032e234c"
dependencies = [
"autocfg 1.1.0",
"autocfg",
]
[[package]]
@ -304,7 +304,7 @@ version = "0.7.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41aed1da83ecdc799503b7cb94da1b45a34d72b49caf40a61d9cf5b88ec07cfd"
dependencies = [
"autocfg 1.1.0",
"autocfg",
"derive_utils",
"proc-macro2",
"quote",
@ -335,15 +335,6 @@ dependencies = [
"syn",
]
[[package]]
name = "autocfg"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0dde43e75fd43e8a1bf86103336bc699aa8d17ad1be60c76c0bdfd4828e19b78"
dependencies = [
"autocfg 1.1.0",
]
[[package]]
name = "autocfg"
version = "1.1.0"
@ -730,9 +721,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chrono"
version = "0.4.22"
version = "0.4.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfd4d1b31faaa3a89d7934dbded3111da0d2ef28e3ebccdb4f0179f5929d1ef1"
checksum = "16b0a3d9ed01224b22057780a37bb8c5dbfe1be8ba48678e7bf57ec4b385411f"
dependencies = [
"iana-time-zone",
"js-sys",
@ -808,15 +799,6 @@ dependencies = [
"os_str_bytes",
]
[[package]]
name = "cloudabi"
version = "0.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f"
dependencies = [
"bitflags",
]
[[package]]
name = "coins-bip32"
version = "0.7.0"
@ -848,7 +830,7 @@ dependencies = [
"hex",
"hmac",
"pbkdf2 0.11.0",
"rand 0.8.5",
"rand",
"sha2 0.10.6",
"thiserror",
]
@ -1033,7 +1015,7 @@ version = "0.9.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "045ebe27666471bb549370b4b0b3e51b07f56325befa4284db65fc89c02511b1"
dependencies = [
"autocfg 1.1.0",
"autocfg",
"cfg-if",
"crossbeam-utils",
"memoffset",
@ -1084,7 +1066,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f2b443d17d49dad5ef0ede301c3179cc923b8822f3393b4d2c28c269dd4a122"
dependencies = [
"generic-array 0.14.6",
"rand_core 0.6.3",
"rand_core",
"subtle",
"zeroize",
]
@ -1158,10 +1140,10 @@ version = "0.2.0"
dependencies = [
"anyhow",
"hashbrown 0.13.1",
"log",
"moka",
"redis-rate-limiter",
"tokio",
"tracing",
]
[[package]]
@ -1333,7 +1315,7 @@ dependencies = [
"generic-array 0.14.6",
"group",
"pkcs8 0.9.0",
"rand_core 0.6.3",
"rand_core",
"sec1",
"subtle",
"zeroize",
@ -1372,6 +1354,19 @@ dependencies = [
"uuid 1.2.1",
]
[[package]]
name = "env_logger"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a12e6657c4c97ebab115a42dcee77225f7f482cdd841cf7088c657a42e9e00e7"
dependencies = [
"atty",
"humantime",
"log",
"regex",
"termcolor",
]
[[package]]
name = "error-chain"
version = "0.12.4"
@ -1393,7 +1388,7 @@ dependencies = [
"hex",
"hmac",
"pbkdf2 0.11.0",
"rand 0.8.5",
"rand",
"scrypt",
"serde",
"serde_json",
@ -1486,7 +1481,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "002a0d58a7d921b496f5f19b5b9508d01d25fbe25078286b1fcb6f4e7562acf7"
dependencies = [
"ethers-contract-abigen",
"ethers-contract-derive",
"ethers-core",
"ethers-providers",
"futures-util",
@ -1522,21 +1516,6 @@ dependencies = [
"walkdir",
]
[[package]]
name = "ethers-contract-derive"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "445276414690c97d88638d22dd5f89ba919d7dcea36de4825896d52280c704c7"
dependencies = [
"ethers-contract-abigen",
"ethers-core",
"hex",
"proc-macro2",
"quote",
"serde_json",
"syn",
]
[[package]]
name = "ethers-core"
version = "1.0.0"
@ -1556,7 +1535,7 @@ dependencies = [
"once_cell",
"open-fastrlp",
"proc-macro2",
"rand 0.8.5",
"rand",
"rlp",
"rlp-derive",
"rust_decimal",
@ -1662,7 +1641,7 @@ dependencies = [
"eth-keystore",
"ethers-core",
"hex",
"rand 0.8.5",
"rand",
"sha2 0.10.6",
"thiserror",
]
@ -1745,7 +1724,7 @@ version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df689201f395c6b90dfe87127685f8dbfc083a5e779e613575d8bd7314300c3e"
dependencies = [
"rand_core 0.6.3",
"rand_core",
"subtle",
]
@ -1768,7 +1747,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cfcf0ed7fe52a17a03854ec54a9f76d6d84508d1c0e66bc1793301c73fc8493c"
dependencies = [
"byteorder",
"rand 0.8.5",
"rand",
"rustc-hex",
"static_assertions",
]
@ -1852,12 +1831,6 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca45aac12b6c561b6289bc68957cb1db3dccf870e1951d590202de5e24f1dd35"
[[package]]
name = "fuchsia-cprng"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba"
[[package]]
name = "funty"
version = "2.0.0"
@ -2057,7 +2030,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7391856def869c1c81063a03457c676fbcd419709c3dfb33d8d319de484b154d"
dependencies = [
"ff",
"rand_core 0.6.3",
"rand_core",
"subtle",
]
@ -2272,6 +2245,12 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421"
[[package]]
name = "humantime"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]]
name = "hyper"
version = "0.14.19"
@ -2382,7 +2361,7 @@ version = "1.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10a35a97730320ffe8e2d410b5d3b69279b98d2c14bdb8b70ea89ecf7888d41e"
dependencies = [
"autocfg 1.1.0",
"autocfg",
"hashbrown 0.12.3",
"serde",
]
@ -2593,7 +2572,7 @@ version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f80bf5aacaf25cbfc8210d1cfb718f2bf3b11c4c54e5afe36c236853a8ec390"
dependencies = [
"autocfg 1.1.0",
"autocfg",
"scopeguard",
]
@ -2648,7 +2627,7 @@ version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5aa361d4faea93603064a027415f07bd8e1d5c88c9fbf68bf56a285428fd79ce"
dependencies = [
"autocfg 1.1.0",
"autocfg",
]
[[package]]
@ -2826,7 +2805,7 @@ version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f93ab6289c7b344a8a9f60f88d80aa20032336fe78da341afc91c8a2341fc75f"
dependencies = [
"autocfg 1.1.0",
"autocfg",
"num-integer",
"num-traits",
]
@ -2843,7 +2822,7 @@ dependencies = [
"num-integer",
"num-iter",
"num-traits",
"rand 0.8.5",
"rand",
"smallvec",
"zeroize",
]
@ -2863,7 +2842,7 @@ version = "0.1.45"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "225d3389fb3509a24c93f5c29eb6bde2586b98d9f016636dff58d7c6f7569cd9"
dependencies = [
"autocfg 1.1.0",
"autocfg",
"num-traits",
]
@ -2873,7 +2852,7 @@ version = "0.1.43"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d03e6c028c5dc5cac6e2dec0efda81fc887605bb3d884578bb6d6bf7514e252"
dependencies = [
"autocfg 1.1.0",
"autocfg",
"num-integer",
"num-traits",
]
@ -2884,7 +2863,7 @@ version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0638a1c9d0a3c0914158145bc76cff373a75a627e6ecbfb71cbe6f453a5a19b0"
dependencies = [
"autocfg 1.1.0",
"autocfg",
"num-bigint",
"num-integer",
"num-traits",
@ -2896,7 +2875,7 @@ version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "578ede34cf02f8924ab9447f50c28075b4d3e5b269972345e7e0372b38c6cdcd"
dependencies = [
"autocfg 1.1.0",
"autocfg",
"libm",
]
@ -3105,7 +3084,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d791538a6dcc1e7cb7fe6f6b58aca40e7f79403c45b2bc274008b5e647af1d8"
dependencies = [
"base64ct",
"rand_core 0.6.3",
"rand_core",
"subtle",
]
@ -3116,7 +3095,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7676374caaee8a325c9e7a2ae557f216c5563a171d6997b0ef8a65af35147700"
dependencies = [
"base64ct",
"rand_core 0.6.3",
"rand_core",
"subtle",
]
@ -3253,7 +3232,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d5285893bb5eb82e6aaf5d59ee909a06a16737a8970984dd7746ba9283498d6"
dependencies = [
"phf_shared",
"rand 0.8.5",
"rand",
]
[[package]]
@ -3486,25 +3465,6 @@ version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc33ff2d4973d518d823d61aa239014831e521c75da58e3df4840d3f47749d09"
[[package]]
name = "rand"
version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d71dacdc3c88c1fde3885a3be3fbab9f35724e6ce99467f7d9c5026132184ca"
dependencies = [
"autocfg 0.1.8",
"libc",
"rand_chacha 0.1.1",
"rand_core 0.4.2",
"rand_hc",
"rand_isaac",
"rand_jitter",
"rand_os",
"rand_pcg",
"rand_xorshift",
"winapi",
]
[[package]]
name = "rand"
version = "0.8.5"
@ -3512,18 +3472,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
"libc",
"rand_chacha 0.3.1",
"rand_core 0.6.3",
]
[[package]]
name = "rand_chacha"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "556d3a1ca6600bfcbab7c7c91ccb085ac7fbbcd70e008a98742e7847f4f7bcef"
dependencies = [
"autocfg 0.1.8",
"rand_core 0.3.1",
"rand_chacha",
"rand_core",
]
[[package]]
@ -3533,24 +3483,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
dependencies = [
"ppv-lite86",
"rand_core 0.6.3",
"rand_core",
]
[[package]]
name = "rand_core"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a6fdeb83b075e8266dcc8762c22776f6877a63111121f5f8c7411e5be7eed4b"
dependencies = [
"rand_core 0.4.2",
]
[[package]]
name = "rand_core"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c33a3c44ca05fa6f1807d8e6743f3824e8509beca625669633be0acbdf509dc"
[[package]]
name = "rand_core"
version = "0.6.3"
@ -3560,76 +3495,13 @@ dependencies = [
"getrandom",
]
[[package]]
name = "rand_hc"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b40677c7be09ae76218dc623efbf7b18e34bced3f38883af07bb75630a21bc4"
dependencies = [
"rand_core 0.3.1",
]
[[package]]
name = "rand_isaac"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ded997c9d5f13925be2a6fd7e66bf1872597f759fd9dd93513dd7e92e5a5ee08"
dependencies = [
"rand_core 0.3.1",
]
[[package]]
name = "rand_jitter"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1166d5c91dc97b88d1decc3285bb0a99ed84b05cfd0bc2341bdf2d43fc41e39b"
dependencies = [
"libc",
"rand_core 0.4.2",
"winapi",
]
[[package]]
name = "rand_os"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b75f676a1e053fc562eafbb47838d67c84801e38fc1ba459e8f180deabd5071"
dependencies = [
"cloudabi",
"fuchsia-cprng",
"libc",
"rand_core 0.4.2",
"rdrand",
"wasm-bindgen",
"winapi",
]
[[package]]
name = "rand_pcg"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "abf9b09b01790cfe0364f52bf32995ea3c39f4d2dd011eac241d2914146d0b44"
dependencies = [
"autocfg 0.1.8",
"rand_core 0.4.2",
]
[[package]]
name = "rand_xorshift"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cbf7e9e623549b0e21f6e97cf8ecf247c1a8fd2e8a992ae265314300b2455d5c"
dependencies = [
"rand_core 0.3.1",
]
[[package]]
name = "rand_xoshiro"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6f97cdb2a36ed4183de61b2f824cc45c9f1037f28afe0a322e9fff4c108b5aaa"
dependencies = [
"rand_core 0.6.3",
"rand_core",
]
[[package]]
@ -3638,7 +3510,7 @@ version = "1.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd99e5772ead8baa5215278c9b15bf92087709e9c1b2d1f97cdb5a183c933a7d"
dependencies = [
"autocfg 1.1.0",
"autocfg",
"crossbeam-deque",
"either",
"rayon-core",
@ -3656,15 +3528,6 @@ dependencies = [
"num_cpus",
]
[[package]]
name = "rdrand"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "678054eb77286b51581ba43620cc911abf02758c91f93f479767aed0f90458b2"
dependencies = [
"rand_core 0.3.1",
]
[[package]]
name = "redis"
version = "0.22.1"
@ -3691,7 +3554,6 @@ dependencies = [
"anyhow",
"deadpool-redis",
"tokio",
"tracing",
]
[[package]]
@ -3864,7 +3726,7 @@ dependencies = [
"num-traits",
"pkcs1",
"pkcs8 0.8.0",
"rand_core 0.6.3",
"rand_core",
"smallvec",
"subtle",
"zeroize",
@ -4223,6 +4085,7 @@ dependencies = [
"sentry-backtrace",
"sentry-contexts",
"sentry-core",
"sentry-log",
"sentry-panic",
"tokio",
"ureq",
@ -4273,12 +4136,22 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff58433a7ad557b586a09c42c4298d5f3ddb0c777e1a79d950e510d7b93fce0e"
dependencies = [
"once_cell",
"rand 0.8.5",
"rand",
"sentry-types",
"serde",
"serde_json",
]
[[package]]
name = "sentry-log"
version = "0.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8da361d15c09707d3bd8a2742132b8af54a697994dd396942556aef01d2432de"
dependencies = [
"log",
"sentry-core",
]
[[package]]
name = "sentry-panic"
version = "0.28.0"
@ -4289,17 +4162,6 @@ dependencies = [
"sentry-core",
]
[[package]]
name = "sentry-tracing"
version = "0.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc8d5bae1e1c06d96a966efc425bf1479a90464de99757d40601ce449f91fbed"
dependencies = [
"sentry-core",
"tracing-core",
"tracing-subscriber",
]
[[package]]
name = "sentry-types"
version = "0.28.0"
@ -4479,7 +4341,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f054c6c1a6e95179d6f23ed974060dcefb2d9388bb7256900badad682c499de4"
dependencies = [
"digest 0.10.5",
"rand_core 0.6.3",
"rand_core",
]
[[package]]
@ -4498,7 +4360,7 @@ dependencies = [
"http",
"iri-string",
"k256",
"rand 0.8.5",
"rand",
"sha3",
"thiserror",
"time 0.3.17",
@ -4525,7 +4387,7 @@ version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4614a76b2a8be0058caa9dbbaf66d988527d86d003c11a94fbd335d7661edcef"
dependencies = [
"autocfg 1.1.0",
"autocfg",
]
[[package]]
@ -4668,7 +4530,7 @@ dependencies = [
"once_cell",
"paste",
"percent-encoding",
"rand 0.8.5",
"rand",
"rsa",
"rust_decimal",
"rustls",
@ -4806,7 +4668,7 @@ dependencies = [
"indicatif",
"itertools",
"once_cell",
"rand 0.8.5",
"rand",
"reqwest",
"semver",
"serde",
@ -4934,7 +4796,7 @@ dependencies = [
name = "thread-fast-rng"
version = "0.1.0"
dependencies = [
"rand 0.8.5",
"rand",
"rand_xoshiro",
]
@ -5025,7 +4887,7 @@ version = "1.21.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9e03c497dc955702ba729190dc4aac6f2a0ce97f913e5b1b5912fc5039d9099"
dependencies = [
"autocfg 1.1.0",
"autocfg",
"bytes",
"libc",
"memchr",
@ -5146,7 +5008,6 @@ dependencies = [
"tower",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
@ -5155,18 +5016,6 @@ version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "343bc9466d3fe6b0f960ef45960509f84480bf4fd96f92901afe7ff3df9d3a62"
[[package]]
name = "tower-request-id"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10b702a9ce17a8bd4c2f0e6acdcf75debaf14e4125cee76d4c5ac730f3ed9520"
dependencies = [
"http",
"tower-layer",
"tower-service",
"ulid 0.4.1",
]
[[package]]
name = "tower-service"
version = "0.3.1"
@ -5237,7 +5086,6 @@ dependencies = [
"matchers",
"nu-ansi-term",
"once_cell",
"parking_lot 0.12.1",
"regex",
"sharded-slab",
"smallvec",
@ -5271,7 +5119,7 @@ dependencies = [
"http",
"httparse",
"log",
"rand 0.8.5",
"rand",
"rustls",
"sha-1",
"thiserror",
@ -5304,24 +5152,13 @@ dependencies = [
"static_assertions",
]
[[package]]
name = "ulid"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b7e95a59b292ca0cf9b45be2e52294d1ca6cb24eb11b08ef4376f73f1a00c549"
dependencies = [
"chrono",
"lazy_static",
"rand 0.6.5",
]
[[package]]
name = "ulid"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13a3aaa69b04e5b66cc27309710a569ea23593612387d67daaf102e73aa974fd"
dependencies = [
"rand 0.8.5",
"rand",
"serde",
]
@ -5606,6 +5443,7 @@ dependencies = [
"derive_more",
"dotenv",
"entities",
"env_logger",
"ethers",
"fdlimit",
"flume",
@ -5616,6 +5454,7 @@ dependencies = [
"http",
"ipnet",
"itertools",
"log",
"metered",
"migration",
"moka",
@ -5631,7 +5470,6 @@ dependencies = [
"rustc-hash",
"sea-orm",
"sentry",
"sentry-tracing",
"serde",
"serde_json",
"serde_prometheus",
@ -5643,10 +5481,7 @@ dependencies = [
"toml",
"tower",
"tower-http",
"tower-request-id",
"tracing",
"tracing-subscriber",
"ulid 1.0.0",
"ulid",
"url",
"uuid 1.2.1",
]

@ -98,8 +98,9 @@ Flame graphs make a developer's join of finding slow code painless:
4
$ echo -1 | sudo tee /proc/sys/kernel/perf_event_paranoid
-1
$ CARGO_PROFILE_RELEASE_DEBUG=true cargo flamegraph --bin web3_proxy
$ CARGO_PROFILE_RELEASE_DEBUG=true cargo flamegraph --bin web3_proxy --no-inline
Be sure to use `--no-inline` or perf will be VERY slow
## GDB

@ -236,8 +236,11 @@ These are roughly in order of completition
- [-] add configurable size limits to all the Caches
- instead of configuring each cache with MB sizes, have one value for total memory footprint and then percentages for each cache
- [x] improve sorting servers by weight. don't force to lower weights, still have a probability that smaller weights might be
- [ ] flamegraphs show 52% of the time to be in tracing. figure out how to improve that
- [ ] add block timestamp to the /status page
- [ ] cache the status page for a second
- [ ] probably need to turn more sentry log integrations (like anyhow) on!
- [ ] tests should use `test-env-log = "0.2.8"`
- [ ] actually block unauthenticated requests instead of emitting warning of "allowing without auth during development!"
@ -245,6 +248,7 @@ These are roughly in order of completition
These are not yet ordered. There might be duplicates. We might not actually need all of these.
- [ ] flamegraphs show 25% of the time to be in moka-housekeeper. tune that
- [ ] remove the "metered" crate now that we save aggregate queries?
- [ ] remove/change the "active_requests" counter? maybe only once we have dynamic soft limits?
- [ ] refactor so configs can change while running
@ -410,8 +414,6 @@ in another repo: event subscriber
- [ ] test /api/getGaugesmethod
- usually times out after vercel's 60 second timeout
- one time got: Error invalid Json response ""
- [ ] i think all the async methods in ethers need tracing instrument. something like `cfgif(tracing, tracing::instrument)`
- if they do that, i think my request_id will show up on their logs
- [ ] page that prints a graphviz dotfile of the blockchain
- [ ] search for all the "TODO" and `todo!(...)` items in the code and move them here
- [ ] add the backend server to the header?

@ -9,6 +9,6 @@ redis-rate-limiter = { path = "../redis-rate-limiter" }
anyhow = "1.0.66"
hashbrown = "0.13.1"
log = "0.4.17"
moka = { version = "0.9.6", default-features = false, features = ["future"] }
tokio = "1.21.2"
tracing = "0.1.37"

@ -1,4 +1,5 @@
//#![warn(missing_docs)]
use log::error;
use moka::future::Cache;
use redis_rate_limiter::{RedisRateLimitResult, RedisRateLimiter};
use std::cmp::Eq;
@ -8,7 +9,6 @@ use std::sync::atomic::Ordering;
use std::sync::{atomic::AtomicU64, Arc};
use tokio::sync::Mutex;
use tokio::time::{Duration, Instant};
use tracing::{error, info_span, Instrument};
/// A local cache that sits in front of a RedisRateLimiter
/// Generic accross the key so it is simple to use with IPs or user keys
@ -118,7 +118,7 @@ where
// if we get a redis error, just let the user through.
// if users are sticky on a server, local caches will work well enough
// though now that we do this, we need to reset rate limits every minute! cache must have ttl!
error!(?err, "unable to rate limit! creating empty cache");
error!("unable to rate limit! creating empty cache. err={:?}", err);
0
}
};
@ -177,9 +177,9 @@ where
Err(err) => {
// don't let redis errors block our users!
error!(
?key,
?err,
"unable to query rate limits, but local cache is available"
"unable to query rate limits, but local cache is available. key={:?} err={:?}",
key,
err,
);
// TODO: we need to start a timer that resets this count every minute
DeferredRateLimitResult::Allowed
@ -194,11 +194,8 @@ where
// close to period. don't risk it. wait on redis
Ok(rate_limit_f.await)
} else {
// TODO: pass the frontend request id through
let span = info_span!("deferred rate limit");
// rate limit has enough headroom that it should be safe to do this in the background
tokio::spawn(rate_limit_f.instrument(span));
tokio::spawn(rate_limit_f);
Ok(DeferredRateLimitResult::Allowed)
}

8
docs/faster perf.txt Normal file

@ -0,0 +1,8 @@
sudo apt install bison flex
wget https://eighty-twenty.org/files/0001-tools-perf-Use-long-running-addr2line-per-dso.patch
git clone https://github.com/torvalds/linux.git
cd linux
git checkout v5.15
git apply ../0001-tools-perf-Use-long-running-addr2line-per-dso.patch
cd tools/perf
make prefix=$HOME/.local VERSION=5.15 install-bin

@ -7,5 +7,4 @@ edition = "2021"
[dependencies]
anyhow = "1.0.66"
deadpool-redis = { version = "0.11.0", features = ["rt_tokio_1", "serde"] }
tracing = "0.1.37"
tokio = "1.21.2"

@ -3,7 +3,6 @@ use anyhow::Context;
use std::ops::Add;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::time::{Duration, Instant};
use tracing::{debug, trace};
pub use deadpool_redis::redis;
pub use deadpool_redis::{
@ -97,7 +96,6 @@ impl RedisRateLimiter {
// TODO: at high concurency, this gives "connection reset by peer" errors. at least they are off the hot path
// TODO: only set expire if this is a new key
trace!("redis incr+expire");
// TODO: automatic retry
let x: Vec<_> = redis::pipe()
.atomic()
@ -119,11 +117,8 @@ impl RedisRateLimiter {
// TODO: this might actually be early if we are way over the count
let retry_at = self.next_period(now);
debug!(%label, ?retry_at, "rate limited: {}/{}", new_count, max_per_period);
Ok(RedisRateLimitResult::RetryAt(retry_at, new_count))
} else {
trace!(%label, "NOT rate limited: {}/{}", new_count, max_per_period);
Ok(RedisRateLimitResult::Allowed(new_count))
}
}

@ -27,11 +27,12 @@ axum = { version = "0.5.17", features = ["headers", "serde_json", "tokio-tungste
axum-client-ip = "0.2.0"
axum-macros = "0.2.3"
# TODO: import chrono from sea-orm so we always have the same version
chrono = "0.4.22"
chrono = "0.4.23"
counter = "0.5.7"
derive_more = "0.99.17"
dotenv = "0.15.0"
ethers = { version = "1.0.0", features = ["rustls", "ws"] }
ethers = { version = "1.0.0", default-features = false, features = ["rustls", "ws"] }
env_logger = "0.9.3"
fdlimit = "0.2.1"
flume = "0.10.14"
futures = { version = "0.3.25", features = ["thread-pool"] }
@ -39,6 +40,7 @@ hashbrown = { version = "0.13.1", features = ["serde"] }
hdrhistogram = "7.5.2"
http = "0.2.8"
ipnet = "2.5.1"
log = "0.4.17"
metered = { version = "0.9.0", features = ["serialize"] }
moka = { version = "0.9.6", default-features = false, features = ["future"] }
notify = "5.0.0"
@ -55,24 +57,18 @@ handlebars = "4.3.5"
rustc-hash = "1.1.0"
siwe = "0.5.0"
sea-orm = { version = "0.10.2", features = ["macros"] }
sentry = { version = "0.28.0", default-features = false, features = ["backtrace", "contexts", "panic", "anyhow", "reqwest", "rustls"] }
sentry-tracing = "0.28.0"
sentry = { version = "0.28.0", default-features = false, features = ["backtrace", "contexts", "panic", "anyhow", "reqwest", "rustls", "log", "sentry-log"] }
serde = { version = "1.0.147", features = [] }
serde_json = { version = "1.0.87", default-features = false, features = ["alloc", "raw_value"] }
serde_prometheus = "0.1.6"
# TODO: make sure this time version matches siwe. PR to put this in their prelude
time = "0.3.17"
tokio = { version = "1.21.2", features = ["full", "tracing"] }
tokio = { version = "1.21.2", features = ["full"] }
# TODO: make sure this uuid version matches sea-orm. PR to put this in their prelude
tokio-stream = { version = "0.1.11", features = ["sync"] }
toml = "0.5.9"
tower = "0.4.13"
# TODO: i don't think we need this. we can use it from tower-http instead. though this seems to use ulid and not uuid?
tower-request-id = "0.2.0"
tower-http = { version = "0.3.4", features = ["cors", "sensitive-headers", "trace"] }
tracing = "0.1.37"
# TODO: tracing-subscriber has serde and serde_json features that we might want to use
tracing-subscriber = { version = "0.3.16", features = ["env-filter", "parking_lot"] }
tower-http = { version = "0.3.4", features = ["cors", "sensitive-headers"] }
ulid = { version = "1.0.0", features = ["serde"] }
url = "2.3.1"
uuid = "1.2.1"

@ -5,11 +5,6 @@ use std::time::Duration;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// install global collector configured based on RUST_LOG env var.
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.compact()
.init();
fdlimit::raise_fd_limit();
// erigon

@ -4,12 +4,6 @@ use std::{str::FromStr, time::Duration};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// install global collector configured based on RUST_LOG env var.
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.compact()
.init();
fdlimit::raise_fd_limit();
// erigon does not support most filters

@ -25,6 +25,7 @@ use futures::stream::FuturesUnordered;
use futures::stream::StreamExt;
use hashbrown::HashMap;
use ipnet::IpNet;
use log::{error, info, warn};
use metered::{metered, ErrorCount, HitCount, ResponseTime, Throughput};
use migration::{Migrator, MigratorTrait};
use moka::future::Cache;
@ -43,7 +44,6 @@ use tokio::sync::{broadcast, watch, Semaphore};
use tokio::task::JoinHandle;
use tokio::time::timeout;
use tokio_stream::wrappers::{BroadcastStream, WatchStream};
use tracing::{error, info, instrument, trace, warn};
use ulid::Ulid;
// TODO: make this customizable?
@ -124,7 +124,6 @@ pub struct Web3ProxyApp {
/// flatten a JoinError into an anyhow error
/// Useful when joining multiple futures.
#[instrument(skip_all)]
pub async fn flatten_handle<T>(handle: AnyhowJoinHandle<T>) -> anyhow::Result<T> {
match handle.await {
Ok(Ok(result)) => Ok(result),
@ -134,7 +133,7 @@ pub async fn flatten_handle<T>(handle: AnyhowJoinHandle<T>) -> anyhow::Result<T>
}
/// return the first error or okay if everything worked
#[instrument(skip_all)]
pub async fn flatten_handles<T>(
mut handles: FuturesUnordered<AnyhowJoinHandle<T>>,
) -> anyhow::Result<()> {
@ -150,7 +149,6 @@ pub async fn flatten_handles<T>(
}
/// Connect to the database and run migrations
#[instrument(level = "trace")]
pub async fn get_migrated_db(
db_url: String,
min_connections: u32,
@ -191,7 +189,6 @@ pub struct Web3ProxyAppSpawn {
#[metered(registry = Web3ProxyAppMetrics, registry_expr = self.app_metrics, visibility = pub)]
impl Web3ProxyApp {
/// The main entrypoint.
#[instrument(level = "trace")]
pub async fn spawn(
top_config: TopConfig,
num_workers: usize,
@ -272,8 +269,8 @@ impl Web3ProxyApp {
// test the redis pool
if let Err(err) = redis_pool.get().await {
error!(
?err,
"failed to connect to vredis. some features will be disabled"
"failed to connect to vredis. some features will be disabled. err={:?}",
err
);
};
@ -504,7 +501,6 @@ impl Web3ProxyApp {
Ok((app, cancellable_handles, important_background_handles).into())
}
#[instrument(level = "trace")]
pub fn prometheus_metrics(&self) -> String {
let globals = HashMap::new();
// TODO: what globals? should this be the hostname or what?
@ -526,7 +522,6 @@ impl Web3ProxyApp {
}
#[measure([ErrorCount, HitCount, ResponseTime, Throughput])]
#[instrument(level = "trace")]
pub async fn eth_subscribe<'a>(
self: &'a Arc<Self>,
authorization: Arc<Authorization>,
@ -550,7 +545,7 @@ impl Web3ProxyApp {
Some(x) if x == json!(["newHeads"]) => {
let head_block_receiver = self.head_block_receiver.clone();
trace!(?subscription_id, "new heads subscription");
// trace!("new heads subscription. id={:?}", subscription_id);
tokio::spawn(async move {
let mut head_block_receiver = Abortable::new(
WatchStream::new(head_block_receiver),
@ -580,7 +575,7 @@ impl Web3ProxyApp {
};
}
trace!(?subscription_id, "closed new heads subscription");
// trace!("closed new heads subscription. id={:?}", subscription_id);
});
}
Some(x) if x == json!(["newPendingTransactions"]) => {
@ -591,7 +586,7 @@ impl Web3ProxyApp {
subscription_registration,
);
trace!(?subscription_id, "pending transactions subscription");
// // trace!(?subscription_id, "pending transactions subscription");
tokio::spawn(async move {
while let Some(Ok(new_tx_state)) = pending_tx_receiver.next().await {
let new_tx = match new_tx_state {
@ -619,7 +614,7 @@ impl Web3ProxyApp {
};
}
trace!(?subscription_id, "closed new heads subscription");
// // trace!(?subscription_id, "closed new heads subscription");
});
}
Some(x) if x == json!(["newPendingFullTransactions"]) => {
@ -631,7 +626,7 @@ impl Web3ProxyApp {
subscription_registration,
);
trace!(?subscription_id, "pending transactions subscription");
// // trace!(?subscription_id, "pending transactions subscription");
// TODO: do something with this handle?
tokio::spawn(async move {
@ -663,7 +658,7 @@ impl Web3ProxyApp {
};
}
trace!(?subscription_id, "closed new heads subscription");
// // trace!(?subscription_id, "closed new heads subscription");
});
}
Some(x) if x == json!(["newPendingRawTransactions"]) => {
@ -675,7 +670,7 @@ impl Web3ProxyApp {
subscription_registration,
);
trace!(?subscription_id, "pending transactions subscription");
// // trace!(?subscription_id, "pending transactions subscription");
// TODO: do something with this handle?
tokio::spawn(async move {
@ -707,7 +702,7 @@ impl Web3ProxyApp {
};
}
trace!(?subscription_id, "closed new heads subscription");
// // trace!(?subscription_id, "closed new heads subscription");
});
}
_ => return Err(anyhow::anyhow!("unimplemented")),
@ -723,14 +718,13 @@ impl Web3ProxyApp {
}
/// send the request or batch of requests to the approriate RPCs
#[instrument(level = "trace")]
pub async fn proxy_web3_rpc(
self: &Arc<Self>,
authorization: Arc<Authorization>,
request: JsonRpcRequestEnum,
) -> anyhow::Result<JsonRpcForwardedResponseEnum> {
// TODO: this should probably be trace level
trace!(?request, "proxy_web3_rpc");
// // trace!(?request, "proxy_web3_rpc");
// even though we have timeouts on the requests to our backend providers,
// we need a timeout for the incoming request so that retries don't run forever
@ -755,14 +749,13 @@ impl Web3ProxyApp {
};
// TODO: this should probably be trace level
trace!(?response, "Forwarding");
// // trace!(?response, "Forwarding");
Ok(response)
}
/// cut up the request and send to potentually different servers
/// TODO: make sure this isn't a problem
#[instrument(level = "trace")]
async fn proxy_web3_rpc_requests(
self: &Arc<Self>,
authorization: &Arc<Authorization>,
@ -792,12 +785,10 @@ impl Web3ProxyApp {
}
/// TODO: i don't think we want or need this. just use app.db_conn, or maybe app.db_conn.clone() or app.db_conn.as_ref()
#[instrument(level = "trace")]
pub fn db_conn(&self) -> Option<DatabaseConnection> {
self.db_conn.clone()
}
#[instrument(level = "trace")]
pub async fn redis_conn(&self) -> anyhow::Result<redis_rate_limiter::RedisConnection> {
match self.vredis_pool.as_ref() {
None => Err(anyhow::anyhow!("no redis server configured")),
@ -810,13 +801,12 @@ impl Web3ProxyApp {
}
#[measure([ErrorCount, HitCount, ResponseTime, Throughput])]
#[instrument(level = "trace")]
async fn proxy_web3_rpc_request(
self: &Arc<Self>,
authorization: &Arc<Authorization>,
mut request: JsonRpcRequest,
) -> anyhow::Result<JsonRpcForwardedResponse> {
trace!("Received request: {:?}", request);
// trace!("Received request: {:?}", request);
// TODO: allow customizing the period?
let request_metadata = Arc::new(RequestMetadata::new(60, &request)?);

@ -6,6 +6,7 @@ use derive_more::From;
use entities::rpc_accounting;
use hashbrown::HashMap;
use hdrhistogram::{Histogram, RecordError};
use log::{error, info};
use sea_orm::{ActiveModelTrait, DatabaseConnection, DbErr};
use std::num::NonZeroU64;
use std::sync::atomic::Ordering;
@ -14,7 +15,6 @@ use std::time::{Duration, SystemTime};
use tokio::sync::broadcast;
use tokio::task::JoinHandle;
use tokio::time::{interval_at, Instant};
use tracing::{error, info};
/// TODO: where should this be defined?
/// TODO: can we use something inside sea_orm instead?
@ -351,7 +351,7 @@ impl StatEmitter {
if let Some(value) = response_aggregate_map.get_mut(&key) {
if let Err(err) = value.add(stat) {
error!(?err, "unable to aggregate stats!");
error!( "unable to aggregate stats! err={:?}", err);
};
} else {
unimplemented!();
@ -364,7 +364,7 @@ impl StatEmitter {
// TODO: batch these saves
for (key, aggregate) in response_aggregate_map.drain() {
if let Err(err) = aggregate.save(self.chain_id, &self.db_conn, key, period_timestamp).await {
error!(?err, "Unable to save stat while shutting down!");
error!("Unable to save stat while shutting down! {:?}", err);
};
}
// advance to the next period
@ -377,7 +377,7 @@ impl StatEmitter {
info!("aggregate stat_loop shutting down");
// TODO: call aggregate_stat for all the
},
Err(err) => error!(?err, "shutdown receiver"),
Err(err) => error!("shutdown receiver. err={:?}", err),
}
break;
}
@ -391,7 +391,7 @@ impl StatEmitter {
.save(self.chain_id, &self.db_conn, key, period_timestamp)
.await
{
error!(?err, "Unable to save stat while shutting down!");
error!("Unable to save stat while shutting down! err={:?}", err);
};
}

@ -9,6 +9,7 @@
#![forbid(unsafe_code)]
use futures::StreamExt;
use log::{debug, info, warn, LevelFilter};
use parking_lot::deadlock;
use std::fs;
use std::sync::atomic::{self, AtomicUsize};
@ -16,9 +17,6 @@ use std::thread;
use tokio::runtime;
use tokio::sync::broadcast;
use tokio::time::Duration;
use tracing::{debug, info, warn};
use tracing_subscriber::prelude::*;
use tracing_subscriber::EnvFilter;
use web3_proxy::app::{flatten_handle, flatten_handles, Web3ProxyApp};
use web3_proxy::config::{CliConfig, TopConfig};
use web3_proxy::{frontend, metrics_frontend};
@ -28,7 +26,8 @@ fn run(
cli_config: CliConfig,
top_config: TopConfig,
) -> anyhow::Result<()> {
debug!(?cli_config, ?top_config);
debug!("{:?}", cli_config);
debug!("{:?}", top_config);
let mut shutdown_receiver = shutdown_sender.subscribe();
@ -70,7 +69,7 @@ fn run(
let rt = rt_builder.build()?;
let num_workers = rt.metrics().num_workers();
debug!(?num_workers);
debug!("num_workers: {}", num_workers);
rt.block_on(async {
let app_frontend_port = cli_config.port;
@ -134,7 +133,7 @@ fn run(
// one of the handles stopped. send a value so the others know to shut down
if let Err(err) = shutdown_sender.send(()) {
warn!(?err, "shutdown sender");
warn!("shutdown sender err={:?}", err);
};
// wait on all the important background tasks (like saving stats to the database) to complete
@ -175,6 +174,12 @@ fn main() -> anyhow::Result<()> {
// TODO: this doesn't seem to do anything
proctitle::set_title(format!("web3_proxy-{}", top_config.app.chain_id));
let mut log_builder = env_logger::builder();
log_builder
.filter_level(LevelFilter::Off)
.parse_env("RUST_LOG");
// connect to sentry for error reporting
// if no sentry, only log to stdout
let _sentry_guard = if let Some(sentry_url) = top_config.app.sentry_url.clone() {
@ -188,24 +193,16 @@ fn main() -> anyhow::Result<()> {
},
));
// TODO: how do we put the EnvFilter on this?
tracing_subscriber::registry()
.with(
tracing_subscriber::fmt::layer()
.compact()
.with_filter(EnvFilter::from_default_env()),
)
.with(sentry_tracing::layer())
.init();
let logger = sentry::integrations::log::SentryLogger::with_dest(log_builder.build());
log::set_boxed_logger(Box::new(logger)).unwrap();
Some(guard)
} else {
// install global collector configured based on RUST_LOG env var.
// TODO: attach sentry here
tracing_subscriber::fmt()
.compact()
.with_env_filter(EnvFilter::from_default_env())
.init();
let logger = log_builder.build();
log::set_boxed_logger(Box::new(logger)).unwrap();
None
};
@ -243,13 +240,8 @@ mod tests {
// TODO: how should we handle logs in this?
// TODO: option for super verbose logs
std::env::set_var("RUST_LOG", "info,web3_proxy=debug");
// install global collector configured based on RUST_LOG env var.
// TODO: sentry is needed here!
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.compact()
.with_test_writer()
.init();
let _ = env_logger::builder().is_test(true).try_init();
let anvil = Anvil::new().spawn();

@ -1,6 +1,6 @@
use argh::FromArgs;
use log::{error, info, warn};
use std::fs;
use tracing::{error, info, warn};
use web3_proxy::config::TopConfig;
#[derive(FromArgs, PartialEq, Eq, Debug)]

@ -2,8 +2,8 @@ use anyhow::Context;
use argh::FromArgs;
use entities::{rpc_key, user};
use ethers::prelude::Address;
use log::info;
use sea_orm::{ActiveModelTrait, TransactionTrait};
use tracing::info;
use ulid::Ulid;
use uuid::Uuid;
use web3_proxy::frontend::authorization::RpcSecretKey;

@ -37,11 +37,7 @@ async fn main() -> anyhow::Result<()> {
std::env::set_var("RUST_LOG", "info,web3_proxy=debug,web3_proxy_cli=debug");
}
// install global collector configured based on RUST_LOG env var.
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.compact()
.init();
env_logger::init();
// this probably won't matter for us in docker, but better safe than sorry
fdlimit::raise_fd_limit();

@ -4,8 +4,8 @@ use ethers::{
prelude::{BlockNumber, U64},
types::H256,
};
use log::{warn};
use std::sync::Arc;
use tracing::{instrument, warn};
use crate::{frontend::authorization::Authorization, rpcs::connections::Web3Connections};
@ -39,7 +39,7 @@ pub fn block_num_to_u64(block_num: BlockNumber, latest_block: U64) -> U64 {
}
/// modify params to always have a block number and not "latest"
#[instrument(level = "trace")]
pub async fn clean_block_number(
authorization: &Arc<Authorization>,
params: &mut serde_json::Value,
@ -98,7 +98,7 @@ pub async fn clean_block_number(
}
// TODO: change this to also return the hash needed?
#[instrument(level = "trace")]
pub async fn block_needed(
authorization: &Arc<Authorization>,
method: &str,
@ -210,7 +210,7 @@ pub async fn block_needed(
Ok(block) => Ok(Some(block)),
Err(err) => {
// TODO: seems unlikely that we will get here
warn!(?err, "could not get block from params");
warn!("could not get block from params. err={:?}", err);
Ok(None)
}
}

@ -10,7 +10,6 @@ use sea_orm::DatabaseConnection;
use serde::Deserialize;
use std::sync::Arc;
use tokio::sync::broadcast;
use tracing::instrument;
pub type BlockAndRpc = (Option<ArcBlock>, Arc<Web3Connection>);
pub type TxHashAndRpc = (TxHash, Arc<Web3Connection>);
@ -201,7 +200,6 @@ impl Web3ConnectionConfig {
/// Create a Web3Connection from config
/// TODO: move this into Web3Connection (just need to make things pub(crate))
#[allow(clippy::too_many_arguments)]
#[instrument(level = "trace", skip(redis_pool))]
pub async fn spawn(
self,
name: String,

@ -13,6 +13,7 @@ use entities::{rpc_key, user, user_tier};
use hashbrown::HashMap;
use http::HeaderValue;
use ipnet::IpNet;
use log::error;
use redis_rate_limiter::redis::AsyncCommands;
use redis_rate_limiter::RedisRateLimitResult;
use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter};
@ -21,7 +22,6 @@ use std::sync::atomic::{AtomicBool, AtomicU64};
use std::{net::IpAddr, str::FromStr, sync::Arc};
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio::time::Instant;
use tracing::{error, instrument, trace};
use ulid::Ulid;
use uuid::Uuid;
@ -348,7 +348,6 @@ pub async fn key_is_authorized(
impl Web3ProxyApp {
/// Limit the number of concurrent requests from the given ip address.
#[instrument(level = "trace")]
pub async fn ip_semaphore(&self, ip: IpAddr) -> anyhow::Result<Option<OwnedSemaphorePermit>> {
if let Some(max_concurrent_requests) = self.config.public_max_concurrent_requests {
let semaphore = self
@ -374,7 +373,6 @@ impl Web3ProxyApp {
}
/// Limit the number of concurrent requests from the given rpc key.
#[instrument(level = "trace")]
pub async fn rpc_key_semaphore(
&self,
authorization_checks: &AuthorizationChecks,
@ -386,7 +384,7 @@ impl Web3ProxyApp {
.rpc_key_semaphores
.get_with(rpc_key_id, async move {
let s = Semaphore::new(max_concurrent_requests as usize);
trace!("new semaphore for rpc_key_id {}", rpc_key_id);
// // trace!("new semaphore for rpc_key_id {}", rpc_key_id);
Arc::new(s)
})
.await;
@ -407,7 +405,6 @@ impl Web3ProxyApp {
/// Verify that the given bearer token and address are allowed to take the specified action.
/// This includes concurrent request limiting.
#[instrument(level = "trace")]
pub async fn bearer_is_authorized(
&self,
bearer: Bearer,
@ -447,7 +444,6 @@ impl Web3ProxyApp {
Ok((user, semaphore_permit))
}
#[instrument(level = "trace")]
pub async fn rate_limit_login(&self, ip: IpAddr) -> anyhow::Result<RateLimitResult> {
// TODO: dry this up with rate_limit_by_rpc_key?
@ -474,19 +470,19 @@ impl Web3ProxyApp {
// TODO: set headers so they know when they can retry
// TODO: debug or trace?
// this is too verbose, but a stat might be good
trace!(?ip, "login rate limit exceeded until {:?}", retry_at);
// // trace!(?ip, "login rate limit exceeded until {:?}", retry_at);
Ok(RateLimitResult::RateLimited(authorization, Some(retry_at)))
}
Ok(RedisRateLimitResult::RetryNever) => {
// TODO: i don't think we'll get here. maybe if we ban an IP forever? seems unlikely
trace!(?ip, "login rate limit is 0");
// // trace!(?ip, "login rate limit is 0");
Ok(RateLimitResult::RateLimited(authorization, None))
}
Err(err) => {
// internal error, not rate limit being hit
// TODO: i really want axum to do this for us in a single place.
error!(?err, "login rate limiter is unhappy. allowing ip");
error!("login rate limiter is unhappy. allowing ip. err={:?}", err);
Ok(RateLimitResult::Allowed(authorization, None))
}
@ -498,7 +494,6 @@ impl Web3ProxyApp {
}
/// origin is included because it can override the default rate limits
#[instrument(level = "trace")]
pub async fn rate_limit_by_ip(
&self,
allowed_origin_requests_per_period: &HashMap<String, u64>,
@ -529,18 +524,18 @@ impl Web3ProxyApp {
}
Ok(DeferredRateLimitResult::RetryAt(retry_at)) => {
// TODO: set headers so they know when they can retry
trace!(?ip, "rate limit exceeded until {:?}", retry_at);
// // trace!(?ip, "rate limit exceeded until {:?}", retry_at);
Ok(RateLimitResult::RateLimited(authorization, Some(retry_at)))
}
Ok(DeferredRateLimitResult::RetryNever) => {
// TODO: i don't think we'll get here. maybe if we ban an IP forever? seems unlikely
trace!(?ip, "rate limit is 0");
// // trace!(?ip, "rate limit is 0");
Ok(RateLimitResult::RateLimited(authorization, None))
}
Err(err) => {
// this an internal error of some kind, not the rate limit being hit
// TODO: i really want axum to do this for us in a single place.
error!(?err, "rate limiter is unhappy. allowing ip");
error!("rate limiter is unhappy. allowing ip. err={:?}", err);
// at least we can still check the semaphore
let semaphore = self.ip_semaphore(ip).await?;
@ -558,7 +553,6 @@ impl Web3ProxyApp {
}
// check the local cache for user data, or query the database
#[instrument(level = "trace")]
pub(crate) async fn authorization_checks(
&self,
rpc_secret_key: RpcSecretKey,
@ -566,7 +560,7 @@ impl Web3ProxyApp {
let authorization_checks: Result<_, Arc<anyhow::Error>> = self
.rpc_secret_key_cache
.try_get_with(rpc_secret_key.into(), async move {
trace!(?rpc_secret_key, "user cache miss");
// // trace!(?rpc_secret_key, "user cache miss");
let db_conn = self.db_conn().context("Getting database connection")?;
@ -671,7 +665,6 @@ impl Web3ProxyApp {
}
/// Authorized the ip/origin/referer/useragent and rate limit and concurrency
#[instrument(level = "trace")]
pub async fn rate_limit_by_rpc_key(
&self,
ip: IpAddr,
@ -722,19 +715,19 @@ impl Web3ProxyApp {
// this is too verbose, but a stat might be good
// TODO: keys are secrets! use the id instead
// TODO: emit a stat
trace!(?rpc_key, "rate limit exceeded until {:?}", retry_at);
// // trace!(?rpc_key, "rate limit exceeded until {:?}", retry_at);
Ok(RateLimitResult::RateLimited(authorization, Some(retry_at)))
}
Ok(DeferredRateLimitResult::RetryNever) => {
// TODO: keys are secret. don't log them!
trace!(?rpc_key, "rate limit is 0");
// // trace!(?rpc_key, "rate limit is 0");
// TODO: emit a stat
Ok(RateLimitResult::RateLimited(authorization, None))
}
Err(err) => {
// internal error, not rate limit being hit
// TODO: i really want axum to do this for us in a single place.
error!(?err, "rate limiter is unhappy. allowing ip");
error!("rate limiter is unhappy. allowing ip. err={:?}", err);
Ok(RateLimitResult::Allowed(authorization, semaphore))
}

@ -11,12 +11,12 @@ use axum::{
use derive_more::From;
use http::header::InvalidHeaderValue;
use ipnet::AddrParseError;
use log::warn;
use redis_rate_limiter::redis::RedisError;
use reqwest::header::ToStrError;
use sea_orm::DbErr;
use std::error::Error;
use tokio::{task::JoinError, time::Instant};
use tracing::{instrument, trace, warn};
// TODO: take "IntoResponse" instead of Response?
pub type FrontendResult = Result<Response, FrontendErrorResponse>;
@ -43,12 +43,11 @@ pub enum FrontendErrorResponse {
}
impl IntoResponse for FrontendErrorResponse {
#[instrument(level = "trace")]
fn into_response(self) -> Response {
// TODO: include the request id in these so that users can give us something that will point to logs
let (status_code, response) = match self {
Self::Anyhow(err) => {
warn!(?err, "anyhow");
warn!("anyhow. err={:?}", err);
(
StatusCode::INTERNAL_SERVER_ERROR,
JsonRpcForwardedResponse::from_string(
@ -60,7 +59,7 @@ impl IntoResponse for FrontendErrorResponse {
)
}
Self::Box(err) => {
warn!(?err, "boxed");
warn!("boxed err={:?}", err);
(
StatusCode::INTERNAL_SERVER_ERROR,
JsonRpcForwardedResponse::from_str(
@ -72,7 +71,7 @@ impl IntoResponse for FrontendErrorResponse {
)
}
Self::Database(err) => {
warn!(?err, "database");
warn!("database err={:?}", err);
(
StatusCode::INTERNAL_SERVER_ERROR,
JsonRpcForwardedResponse::from_str(
@ -83,7 +82,7 @@ impl IntoResponse for FrontendErrorResponse {
)
}
Self::HeadersError(err) => {
warn!(?err, "HeadersError");
warn!("HeadersError {:?}", err);
(
StatusCode::BAD_REQUEST,
JsonRpcForwardedResponse::from_str(
@ -94,7 +93,7 @@ impl IntoResponse for FrontendErrorResponse {
)
}
Self::IpAddrParse(err) => {
warn!(?err, "IpAddrParse");
warn!("IpAddrParse err={:?}", err);
(
StatusCode::BAD_REQUEST,
JsonRpcForwardedResponse::from_str(
@ -105,7 +104,7 @@ impl IntoResponse for FrontendErrorResponse {
)
}
Self::InvalidHeaderValue(err) => {
warn!(?err, "InvalidHeaderValue");
warn!("InvalidHeaderValue err={:?}", err);
(
StatusCode::BAD_REQUEST,
JsonRpcForwardedResponse::from_str(
@ -116,7 +115,7 @@ impl IntoResponse for FrontendErrorResponse {
)
}
Self::JoinError(err) => {
warn!(?err, "JoinError. likely shutting down");
warn!("JoinError. likely shutting down. err={:?}", err);
(
StatusCode::INTERNAL_SERVER_ERROR,
JsonRpcForwardedResponse::from_str(
@ -171,7 +170,7 @@ impl IntoResponse for FrontendErrorResponse {
)
}
Self::Redis(err) => {
warn!(?err, "redis");
warn!("redis err={:?}", err);
(
StatusCode::INTERNAL_SERVER_ERROR,
JsonRpcForwardedResponse::from_str(
@ -187,7 +186,7 @@ impl IntoResponse for FrontendErrorResponse {
}
Self::StatusCode(status_code, err_msg, err) => {
// TODO: warn is way too loud. different status codes should get different error levels. 500s should warn. 400s should stat
trace!(?status_code, ?err_msg, ?err);
// trace!(?status_code, ?err_msg, ?err);
(
status_code,
JsonRpcForwardedResponse::from_str(
@ -198,7 +197,7 @@ impl IntoResponse for FrontendErrorResponse {
)
}
Self::HeaderToString(err) => {
trace!(?err, "HeaderToString");
// // trace!(?err, "HeaderToString");
(
StatusCode::BAD_REQUEST,
JsonRpcForwardedResponse::from_str(
@ -209,7 +208,7 @@ impl IntoResponse for FrontendErrorResponse {
)
}
Self::UlidDecodeError(err) => {
trace!(?err, "UlidDecodeError");
// // trace!(?err, "UlidDecodeError");
(
StatusCode::BAD_REQUEST,
JsonRpcForwardedResponse::from_str(
@ -234,7 +233,6 @@ impl IntoResponse for FrontendErrorResponse {
}
}
#[instrument(level = "trace")]
pub async fn handler_404() -> Response {
FrontendErrorResponse::NotFound.into_response()
}

@ -10,46 +10,21 @@ pub mod users;
use crate::app::Web3ProxyApp;
use axum::{
body::Body,
handler::Handler,
routing::{get, post, put},
Extension, Router,
};
use http::header::AUTHORIZATION;
use http::Request;
use log::info;
use std::iter::once;
use std::net::SocketAddr;
use std::sync::Arc;
use tower_http::cors::CorsLayer;
use tower_http::sensitive_headers::SetSensitiveRequestHeadersLayer;
use tower_http::trace::TraceLayer;
use tower_request_id::{RequestId, RequestIdLayer};
use tracing::{error_span, info, instrument};
/// Start the frontend server.
#[instrument(level = "trace")]
pub async fn serve(port: u16, proxy_app: Arc<Web3ProxyApp>) -> anyhow::Result<()> {
// create a tracing span for each request with a random request id and the method
// GET: websocket or static pages
// POST: http rpc or login
let request_tracing_layer =
TraceLayer::new_for_http().make_span_with(|request: &Request<Body>| {
// We get the request id from the extensions
let request_id = request
.extensions()
.get::<RequestId>()
.map(ToString::to_string)
.unwrap_or_else(|| "unknown".into());
// And then we put it along with other information into the `request` span
error_span!(
"http_request",
id = %request_id,
// TODO: do we want these?
method = %request.method(),
// uri = %request.uri(),
)
});
pub async fn serve(port: u16, proxy_app: Arc<Web3ProxyApp>) -> anyhow::Result<()> {
// build our axum Router
let app = Router::new()
// routes should be ordered most to least common
@ -91,12 +66,8 @@ pub async fn serve(port: u16, proxy_app: Arc<Web3ProxyApp>) -> anyhow::Result<()
// the last layer is first for requests and last for responses
// Mark the `Authorization` request header as sensitive so it doesn't show in logs
.layer(SetSensitiveRequestHeadersLayer::new(once(AUTHORIZATION)))
// add the request id to our tracing logs
.layer(request_tracing_layer)
// handle cors
.layer(CorsLayer::very_permissive())
// create a unique id for each request
.layer(RequestIdLayer)
// application state
.layer(Extension(proxy_app.clone()))
// 404 for any unknown routes

@ -10,38 +10,27 @@ use axum::{response::IntoResponse, Extension, Json};
use axum_client_ip::ClientIp;
use axum_macros::debug_handler;
use std::sync::Arc;
use tracing::{error_span, instrument, Instrument};
/// POST /rpc -- Public entrypoint for HTTP JSON-RPC requests. Web3 wallets use this.
/// Defaults to rate limiting by IP address, but can also read the Authorization header for a bearer token.
/// If possible, please use a WebSocket instead.
#[debug_handler]
#[instrument(level = "trace")]
pub async fn proxy_web3_rpc(
Extension(app): Extension<Arc<Web3ProxyApp>>,
ClientIp(ip): ClientIp,
origin: Option<TypedHeader<Origin>>,
Json(payload): Json<JsonRpcRequestEnum>,
) -> FrontendResult {
let request_span = error_span!("request", %ip);
// TODO: do we care about keeping the TypedHeader wrapper?
let origin = origin.map(|x| x.0);
let (authorization, _semaphore) = ip_is_authorized(&app, ip, origin)
.instrument(request_span)
.await?;
let request_span = error_span!("request", ?authorization);
let (authorization, _semaphore) = ip_is_authorized(&app, ip, origin).await?;
let authorization = Arc::new(authorization);
// TODO: spawn earlier? i think we want ip_is_authorized in this future
let f = tokio::spawn(async move {
app.proxy_web3_rpc(authorization, payload)
.instrument(request_span)
.await
});
let f = tokio::spawn(async move { app.proxy_web3_rpc(authorization, payload).await });
let response = f.await.expect("joinhandle should always work")?;
@ -53,7 +42,7 @@ pub async fn proxy_web3_rpc(
/// Can optionally authorized based on origin, referer, or user agent.
/// If possible, please use a WebSocket instead.
#[debug_handler]
#[instrument(level = "trace")]
pub async fn proxy_web3_rpc_with_key(
Extension(app): Extension<Arc<Web3ProxyApp>>,
ClientIp(ip): ClientIp,
@ -65,8 +54,6 @@ pub async fn proxy_web3_rpc_with_key(
) -> FrontendResult {
let rpc_key = rpc_key.parse()?;
let request_span = error_span!("request", %ip, ?referer, ?user_agent);
// keep the semaphore until the end of the response
let (authorization, _semaphore) = key_is_authorized(
&app,
@ -76,18 +63,14 @@ pub async fn proxy_web3_rpc_with_key(
referer.map(|x| x.0),
user_agent.map(|x| x.0),
)
.instrument(request_span.clone())
.await?;
let request_span = error_span!("request", ?authorization);
let authorization = Arc::new(authorization);
// the request can take a while, so we spawn so that we can start serving another request
// TODO: spawn even earlier?
let f = tokio::spawn(async move {
app.proxy_web3_rpc(authorization, payload)
.instrument(request_span)
.await
});

@ -21,10 +21,10 @@ use futures::{
use handlebars::Handlebars;
use hashbrown::HashMap;
use http::StatusCode;
use log::{error, info};
use serde_json::{json, value::RawValue};
use std::sync::Arc;
use std::{str::from_utf8_mut, sync::atomic::AtomicUsize};
use tracing::{error, error_span, info, instrument, trace, Instrument};
use crate::{
app::Web3ProxyApp,
@ -34,7 +34,7 @@ use crate::{
/// Public entrypoint for WebSocket JSON-RPC requests.
/// Defaults to rate limiting by IP address, but can also read the Authorization header for a bearer token.
#[debug_handler]
#[instrument(level = "trace")]
pub async fn websocket_handler(
Extension(app): Extension<Arc<Web3ProxyApp>>,
ClientIp(ip): ClientIp,
@ -42,23 +42,16 @@ pub async fn websocket_handler(
ws_upgrade: Option<WebSocketUpgrade>,
) -> FrontendResult {
// TODO: i don't like logging ips. move this to trace level?
let request_span = error_span!("request", %ip, ?origin);
let origin = origin.map(|x| x.0);
let (authorization, _semaphore) = ip_is_authorized(&app, ip, origin)
.instrument(request_span)
.await?;
let request_span = error_span!("request", ?authorization);
let (authorization, _semaphore) = ip_is_authorized(&app, ip, origin).await?;
let authorization = Arc::new(authorization);
match ws_upgrade {
Some(ws) => Ok(ws
.on_upgrade(|socket| {
proxy_web3_socket(app, authorization, socket).instrument(request_span)
})
.on_upgrade(|socket| proxy_web3_socket(app, authorization, socket))
.into_response()),
None => {
if let Some(redirect) = &app.config.redirect_public_url {
@ -79,7 +72,7 @@ pub async fn websocket_handler(
/// Rate limit and billing based on the api key in the url.
/// Can optionally authorized based on origin, referer, or user agent.
#[debug_handler]
#[instrument(level = "trace")]
pub async fn websocket_handler_with_key(
Extension(app): Extension<Arc<Web3ProxyApp>>,
ClientIp(ip): ClientIp,
@ -91,8 +84,6 @@ pub async fn websocket_handler_with_key(
) -> FrontendResult {
let rpc_key = rpc_key.parse()?;
let request_span = error_span!("request", %ip, ?referer, ?user_agent);
let (authorization, _semaphore) = key_is_authorized(
&app,
rpc_key,
@ -101,18 +92,14 @@ pub async fn websocket_handler_with_key(
referer.map(|x| x.0),
user_agent.map(|x| x.0),
)
.instrument(request_span.clone())
.await?;
// TODO: type that wraps Address and have it censor? would protect us from accidently logging addresses or other user info
let request_span = error_span!("request", ?authorization);
let authorization = Arc::new(authorization);
match ws_upgrade {
Some(ws_upgrade) => Ok(ws_upgrade.on_upgrade(move |socket| {
proxy_web3_socket(app, authorization, socket).instrument(request_span)
})),
Some(ws_upgrade) => {
Ok(ws_upgrade.on_upgrade(move |socket| proxy_web3_socket(app, authorization, socket)))
}
None => {
// if no websocket upgrade, this is probably a user loading the url with their browser
match (
@ -156,7 +143,6 @@ pub async fn websocket_handler_with_key(
}
}
#[instrument(level = "trace")]
async fn proxy_web3_socket(
app: Arc<Web3ProxyApp>,
authorization: Arc<Authorization>,
@ -173,7 +159,7 @@ async fn proxy_web3_socket(
}
/// websockets support a few more methods than http clients
#[instrument(level = "trace")]
async fn handle_socket_payload(
app: Arc<Web3ProxyApp>,
authorization: &Arc<Authorization>,
@ -193,7 +179,6 @@ async fn handle_socket_payload(
{
"eth_subscribe" => {
// TODO: what should go in this span?
let span = error_span!("eth_subscribe");
let response = app
.eth_subscribe(
@ -202,7 +187,6 @@ async fn handle_socket_payload(
subscription_count,
response_sender.clone(),
)
.instrument(span)
.await;
match response {
@ -269,7 +253,6 @@ async fn handle_socket_payload(
Message::Text(response_str)
}
#[instrument(level = "trace")]
async fn read_web3_socket(
app: Arc<Web3ProxyApp>,
authorization: Arc<Authorization>,
@ -295,7 +278,7 @@ async fn read_web3_socket(
}
Message::Ping(x) => Message::Pong(x),
Message::Pong(x) => {
trace!("pong: {:?}", x);
// // trace!("pong: {:?}", x);
continue;
}
Message::Close(_) => {
@ -328,7 +311,6 @@ async fn read_web3_socket(
}
}
#[instrument(level = "trace")]
async fn write_web3_socket(
response_rx: flume::Receiver<Message>,
mut ws_tx: SplitSink<WebSocket, Message>,
@ -343,7 +325,7 @@ async fn write_web3_socket(
// forward the response to through the websocket
if let Err(err) = ws_tx.send(msg).await {
// this isn't a problem. this is common and happens whenever a client disconnects
trace!(?err, "unable to write to websocket");
// trace!(?err, "unable to write to websocket");
break;
};
}

@ -9,11 +9,10 @@ use axum_macros::debug_handler;
use moka::future::ConcurrentCacheExt;
use serde_json::json;
use std::sync::Arc;
use tracing::instrument;
/// Health check page for load balancers to use.
#[debug_handler]
#[instrument(level = "trace")]
pub async fn health(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse {
// TODO: also check that the head block is not too old
if app.balanced_rpcs.synced() {
@ -27,7 +26,7 @@ pub async fn health(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoRe
///
/// TODO: when done debugging, remove this and only allow access on a different port
#[debug_handler]
#[instrument(level = "trace")]
pub async fn prometheus(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse {
app.prometheus_metrics()
}
@ -36,7 +35,7 @@ pub async fn prometheus(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl In
///
/// TODO: replace this with proper stats and monitoring
#[debug_handler]
#[instrument(level = "trace")]
pub async fn status(Extension(app): Extension<Arc<Web3ProxyApp>>) -> impl IntoResponse {
app.pending_transactions.sync();
app.rpc_secret_key_cache.sync();

@ -24,6 +24,7 @@ use hashbrown::HashMap;
use http::{HeaderValue, StatusCode};
use ipnet::IpNet;
use itertools::Itertools;
use log::warn;
use redis_rate_limiter::redis::AsyncCommands;
use sea_orm::{
ActiveModelTrait, ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder,
@ -36,7 +37,6 @@ use std::ops::Add;
use std::str::FromStr;
use std::sync::Arc;
use time::{Duration, OffsetDateTime};
use tracing::{instrument, warn};
use ulid::Ulid;
/// `GET /user/login/:user_address` or `GET /user/login/:user_address/:message_eip` -- Start the "Sign In with Ethereum" (siwe) login flow.
@ -57,7 +57,7 @@ use ulid::Ulid;
/// It is a better UX to just click "login with ethereum" and have the account created if it doesn't exist.
/// We can prompt for an email and and payment after they log in.
#[debug_handler]
#[instrument(level = "trace")]
pub async fn user_login_get(
Extension(app): Extension<Arc<Web3ProxyApp>>,
ClientIp(ip): ClientIp,
@ -154,7 +154,7 @@ pub struct PostLogin {
/// It is recommended to save the returned bearer token in a cookie.
/// The bearer token can be used to authenticate other requests, such as getting the user's stats or modifying the user's profile.
#[debug_handler]
#[instrument(level = "trace")]
pub async fn user_login_post(
Extension(app): Extension<Arc<Web3ProxyApp>>,
ClientIp(ip): ClientIp,
@ -333,7 +333,7 @@ pub async fn user_login_post(
/// `POST /user/logout` - Forget the bearer token in the `Authentication` header.
#[debug_handler]
#[instrument(level = "trace")]
pub async fn user_logout_post(
Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
@ -355,7 +355,7 @@ pub async fn user_logout_post(
///
/// TODO: this will change as we add better support for secondary users.
#[debug_handler]
#[instrument(level = "trace")]
pub async fn user_get(
Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer_token)): TypedHeader<Authorization<Bearer>>,
@ -373,7 +373,7 @@ pub struct UserPost {
/// `POST /user` -- modify the account connected to the bearer token in the `Authentication` header.
#[debug_handler]
#[instrument(level = "trace")]
pub async fn user_post(
Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer_token)): TypedHeader<Authorization<Bearer>>,
@ -420,7 +420,7 @@ pub async fn user_post(
/// TODO: one key per request? maybe /user/balance/:rpc_key?
/// TODO: this will change as we add better support for secondary users.
#[debug_handler]
#[instrument(level = "trace")]
pub async fn user_balance_get(
Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
@ -438,7 +438,7 @@ pub async fn user_balance_get(
/// TODO: one key per request? maybe /user/balance/:rpc_key?
/// TODO: this will change as we add better support for secondary users.
#[debug_handler]
#[instrument(level = "trace")]
pub async fn user_balance_post(
Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
@ -452,7 +452,7 @@ pub async fn user_balance_post(
///
/// TODO: one key per request? maybe /user/keys/:rpc_key?
#[debug_handler]
#[instrument(level = "trace")]
pub async fn rpc_keys_get(
Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
@ -481,7 +481,7 @@ pub async fn rpc_keys_get(
/// `DELETE /user/keys` -- Use a bearer token to delete an existing key.
#[debug_handler]
#[instrument(level = "trace")]
pub async fn rpc_keys_delete(
Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
@ -511,7 +511,7 @@ pub struct UserKeyManagement {
/// `POST /user/keys` or `PUT /user/keys` -- Use a bearer token to create or update an existing key.
#[debug_handler]
#[instrument(level = "trace")]
pub async fn rpc_keys_management(
Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
@ -677,7 +677,7 @@ pub async fn rpc_keys_management(
/// `GET /user/revert_logs` -- Use a bearer token to get the user's revert logs.
#[debug_handler]
#[instrument(level = "trace")]
pub async fn user_revert_logs_get(
Extension(app): Extension<Arc<Web3ProxyApp>>,
TypedHeader(Authorization(bearer)): TypedHeader<Authorization<Bearer>>,
@ -733,7 +733,7 @@ pub async fn user_revert_logs_get(
/// `GET /user/stats/aggregate` -- Public endpoint for aggregate stats such as bandwidth used and methods requested.
#[debug_handler]
#[instrument(level = "trace")]
pub async fn user_stats_aggregate_get(
Extension(app): Extension<Arc<Web3ProxyApp>>,
bearer: Option<TypedHeader<Authorization<Bearer>>>,
@ -754,7 +754,7 @@ pub async fn user_stats_aggregate_get(
///
/// TODO: this will change as we add better support for secondary users.
#[debug_handler]
#[instrument(level = "trace")]
pub async fn user_stats_detailed_get(
Extension(app): Extension<Arc<Web3ProxyApp>>,
bearer: Option<TypedHeader<Authorization<Bearer>>>,

@ -4,7 +4,6 @@ use serde::de::{self, Deserializer, MapAccess, SeqAccess, Visitor};
use serde::{Deserialize, Serialize};
use serde_json::value::RawValue;
use std::fmt;
use tracing::instrument;
// this is used by serde
#[allow(dead_code)]
@ -194,12 +193,10 @@ impl JsonRpcForwardedResponse {
Self::from_string(message, code, id)
}
#[instrument(level = "trace")]
pub fn from_str(message: &str, code: Option<i64>, id: Option<Box<RawValue>>) -> Self {
Self::from_string(message.to_string(), code, id)
}
#[instrument(level = "trace")]
pub fn from_string(message: String, code: Option<i64>, id: Option<Box<RawValue>>) -> Self {
// TODO: this is too verbose. plenty of errors are valid, like users giving an invalid address. no need to log that
// TODO: can we somehow get the initial request here? if we put that into a tracing span, will things slow down a ton?
@ -217,7 +214,6 @@ impl JsonRpcForwardedResponse {
}
}
#[instrument(level = "trace")]
pub fn from_response(partial_response: Box<RawValue>, id: Box<RawValue>) -> Self {
JsonRpcForwardedResponse {
jsonrpc: "2.0".to_string(),
@ -228,7 +224,6 @@ impl JsonRpcForwardedResponse {
}
}
#[instrument(level = "trace")]
pub fn from_value(partial_response: serde_json::Value, id: Box<RawValue>) -> Self {
let partial_response =
serde_json::to_string(&partial_response).expect("this should always work");
@ -244,7 +239,6 @@ impl JsonRpcForwardedResponse {
}
}
#[instrument(level = "trace")]
pub fn from_ethers_error(e: ProviderError, id: Box<RawValue>) -> anyhow::Result<Self> {
// TODO: move turning ClientError into json to a helper function?
let code;
@ -304,7 +298,6 @@ impl JsonRpcForwardedResponse {
})
}
#[instrument(level = "trace")]
pub fn try_from_response_result(
result: Result<Box<RawValue>, ProviderError>,
id: Box<RawValue>,

@ -2,14 +2,14 @@ use axum::headers::HeaderName;
use axum::http::HeaderValue;
use axum::response::{IntoResponse, Response};
use axum::{routing::get, Extension, Router};
use log::info;
use std::net::SocketAddr;
use std::sync::Arc;
use tracing::{info, instrument};
use crate::app::Web3ProxyApp;
/// Run a prometheus metrics server on the given port.
#[instrument(level = "trace")]
pub async fn serve(app: Arc<Web3ProxyApp>, port: u16) -> anyhow::Result<()> {
// build our application with a route
// order most to least common
@ -42,7 +42,6 @@ pub async fn serve(app: Arc<Web3ProxyApp>, port: u16) -> anyhow::Result<()> {
.map_err(Into::into)
}
#[instrument(level = "trace")]
async fn root(Extension(app): Extension<Arc<Web3ProxyApp>>) -> Response {
let serialized = app.prometheus_metrics();

@ -10,13 +10,13 @@ use anyhow::Context;
use derive_more::From;
use ethers::prelude::{Block, TxHash, H256, U64};
use hashbrown::{HashMap, HashSet};
use log::{debug, warn, Level};
use moka::future::Cache;
use serde::Serialize;
use serde_json::json;
use std::{cmp::Ordering, fmt::Display, sync::Arc};
use tokio::sync::{broadcast, watch};
use tokio::time::Duration;
use tracing::{debug, instrument, trace, warn, Level};
// TODO: type for Hydrated Blocks with their full transactions?
pub type ArcBlock = Arc<Block<TxHash>>;
@ -38,7 +38,7 @@ impl Display for BlockId {
impl Web3Connections {
/// add a block to our map and it's hash to our graphmap of the blockchain
#[instrument]
pub async fn save_block(&self, block: &ArcBlock, heaviest_chain: bool) -> anyhow::Result<()> {
// TODO: i think we can rearrange this function to make it faster on the hot path
let block_hash = block.hash.as_ref().context("no block hash")?;
@ -62,11 +62,11 @@ impl Web3Connections {
}
if blockchain.contains_node(*block_hash) {
trace!(%block_hash, %block_num, "block already saved");
// // trace!(%block_hash, %block_num, "block already saved");
return Ok(());
}
trace!(%block_hash, %block_num, "saving new block");
// // trace!(%block_hash, %block_num, "saving new block");
self.block_hashes
.insert(*block_hash, block.to_owned())
@ -85,7 +85,6 @@ impl Web3Connections {
/// Get a block from caches with fallback.
/// Will query a specific node or the best available.
#[instrument(level = "trace")]
pub async fn block(
&self,
authorization: &Arc<Authorization>,
@ -108,7 +107,7 @@ impl Web3Connections {
.request(
"eth_getBlockByHash",
&json!(get_block_params),
Level::ERROR.into(),
Level::Error.into(),
)
.await?
}
@ -237,7 +236,7 @@ impl Web3Connections {
)
.await
{
warn!(rpc=%rpc_name, ?err, "unable to process block from rpc");
warn!("unable to process block from rpc {}: {:?}", rpc_name, err);
}
}
@ -267,7 +266,7 @@ impl Web3Connections {
if rpc_head_num.is_zero() {
// TODO: i don't think we can get to this anymore now that we use Options
debug!(%rpc, "still syncing");
debug!("{} still syncing", rpc);
connection_heads.remove(&rpc.name);
@ -286,7 +285,7 @@ impl Web3Connections {
}
None => {
// TODO: warn is too verbose. this is expected if a node disconnects and has to reconnect
trace!(%rpc, "Block without number or hash!");
// // trace!(%rpc, "Block without number or hash!");
connection_heads.remove(&rpc.name);
@ -311,7 +310,7 @@ impl Web3Connections {
// TODO: why does this happen?!?! seems to only happen with uncled blocks
// TODO: maybe we should do get_with?
// TODO: maybe we should just continue. this only seems to happen when an older block is received
warn!(%connection_head_hash, %conn_name, %rpc, "Missing connection_head_block in block_hashes. Fetching now");
warn!("Missing connection_head_block in block_hashes. Fetching now. hash={}. other={}. rpc={}", connection_head_hash, conn_name, rpc);
// this option should always be populated
let conn_rpc = self.conns.get(conn_name);
@ -322,7 +321,7 @@ impl Web3Connections {
{
Ok(block) => block,
Err(err) => {
warn!(%connection_head_hash, %conn_name, %rpc, ?err, "Failed fetching connection_head_block for block_hashes");
warn!("Processing {}. Failed fetching connection_head_block for block_hashes. {} head hash={}. err={:?}", rpc, conn_name, connection_head_hash, err);
continue;
}
}
@ -393,9 +392,9 @@ impl Web3Connections {
// not enough rpcs yet. check the parent
if let Some(parent_block) = self.block_hashes.get(&maybe_head_block.parent_hash)
{
trace!(
child=%maybe_head_hash, parent=%parent_block.hash.unwrap(), "avoiding thundering herd",
);
// // trace!(
// child=%maybe_head_hash, parent=%parent_block.hash.unwrap(), "avoiding thundering herd",
// );
maybe_head_block = parent_block;
continue;
@ -428,9 +427,12 @@ impl Web3Connections {
.swap(Arc::new(empty_synced_connections));
// TODO: log different things depending on old_synced_connections
warn!(%rpc, "no consensus head! {}/{}/{}", 0, num_connection_heads, total_conns);
warn!(
"Processing {}. no consensus head! {}/{}/{}",
rpc, 0, num_connection_heads, total_conns
);
} else {
trace!(?highest_rpcs);
// // trace!(?highest_rpcs);
// TODO: if maybe_head_block.time() is old, ignore it
@ -470,7 +472,14 @@ impl Web3Connections {
// TODO: if the rpc_head_block != consensus_head_block, log something?
match &old_synced_connections.head_block_id {
None => {
debug!(block=%consensus_head_block_id, %rpc, "first {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns);
debug!(
"first {}/{}/{} block={}, rpc={}",
num_consensus_rpcs,
num_connection_heads,
total_conns,
consensus_head_block_id,
rpc
);
self.save_block(&consensus_head_block, true).await?;
@ -491,10 +500,27 @@ impl Web3Connections {
// multiple blocks with the same fork!
if consensus_head_block_id.hash == old_block_id.hash {
// no change in hash. no need to use head_block_sender
debug!(con_head=%consensus_head_block_id, rpc_head=%rpc_head_str, %rpc, "con {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns)
debug!(
"con {}/{}/{}. con head={}. rpc={}. rpc head={}",
num_consensus_rpcs,
num_connection_heads,
total_conns,
consensus_head_block_id,
rpc,
rpc_head_str
)
} else {
// hash changed
debug!(con_head=%consensus_head_block_id, old=%old_block_id, rpc_head=%rpc_head_str, %rpc, "unc {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns);
debug!(
"unc {}/{}/{} con_head={}. old={}. rpc_head={}. rpc={}",
num_consensus_rpcs,
num_connection_heads,
total_conns,
consensus_head_block_id,
old_block_id,
rpc_head_str,
rpc
);
self.save_block(&consensus_head_block, true)
.await
@ -508,7 +534,7 @@ impl Web3Connections {
Ordering::Less => {
// this is unlikely but possible
// TODO: better log
warn!(con_head=%consensus_head_block_id, old_head=%old_block_id, rpc_head=%rpc_head_str, %rpc, "chain rolled back {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns);
warn!("chain rolled back {}/{}/{}. con_head={} old_head={}. rpc_head={}. rpc={}", num_consensus_rpcs, num_connection_heads, total_conns, consensus_head_block_id, old_block_id, rpc_head_str, rpc);
// TODO: tell save_block to remove any higher block numbers from the cache. not needed because we have other checks on requested blocks being > head, but still seems slike a good idea
self.save_block(&consensus_head_block, true).await.context(
@ -520,7 +546,15 @@ impl Web3Connections {
.context("head_block_sender sending consensus_head_block")?;
}
Ordering::Greater => {
debug!(con_head=%consensus_head_block_id, rpc_head=%rpc_head_str, %rpc, "new {}/{}/{}", num_consensus_rpcs, num_connection_heads, total_conns);
debug!(
"new {}/{}/{} conn_head={}. rpc_head={}. rpc={}",
num_consensus_rpcs,
num_connection_heads,
total_conns,
consensus_head_block_id,
rpc_head_str,
rpc
);
self.save_block(&consensus_head_block, true).await?;

@ -9,6 +9,7 @@ use anyhow::Context;
use ethers::prelude::{Bytes, Middleware, ProviderError, TxHash, H256, U64};
use futures::future::try_join_all;
use futures::StreamExt;
use log::{debug, error, info, warn, Level};
use parking_lot::RwLock;
use redis_rate_limiter::{RedisPool, RedisRateLimitResult, RedisRateLimiter};
use sea_orm::DatabaseConnection;
@ -25,7 +26,6 @@ use thread_fast_rng::thread_fast_rng;
use tokio::sync::broadcast;
use tokio::sync::RwLock as AsyncRwLock;
use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior};
use tracing::{debug, error, info, instrument, trace, warn, Level};
/// An active connection to a Web3 RPC server like geth or erigon.
pub struct Web3Connection {
@ -58,7 +58,6 @@ pub struct Web3Connection {
impl Web3Connection {
/// Connect to a web3 rpc
// #[instrument(name = "spawn_Web3Connection", skip(hard_limit, http_client))]
// TODO: have this take a builder (which will have channels attached)
#[allow(clippy::too_many_arguments)]
pub async fn spawn(
@ -128,7 +127,7 @@ impl Web3Connection {
.request(
"eth_chainId",
&json!(Option::None::<()>),
Level::ERROR.into(),
Level::Error.into(),
)
.await;
@ -203,7 +202,7 @@ impl Web3Connection {
// TODO: subscribe to a channel instead of polling. subscribe to http_interval_sender?
while head_block_id.is_none() {
warn!(rpc=%self, "no head block yet. retrying");
warn!("no head block yet. retrying rpc {}", self);
sleep(Duration::from_secs(13)).await;
@ -230,11 +229,11 @@ impl Web3Connection {
maybe_archive_block,
)),
// error here are expected, so keep the level low
tracing::Level::DEBUG.into(),
Level::Debug.into(),
)
.await;
trace!(?archive_result, rpc=%self);
// // trace!(?archive_result, rpc=%self);
if archive_result.is_ok() {
limit = Some(block_data_limit);
@ -300,7 +299,7 @@ impl Web3Connection {
);
let reconnect_in = Duration::from_millis(first_sleep_ms);
warn!(rpc=%self, ?reconnect_in, "Reconnect in");
warn!("Reconnect to {} in {}ms", self, reconnect_in.as_millis());
sleep(reconnect_in).await;
@ -318,7 +317,12 @@ impl Web3Connection {
);
let retry_in = Duration::from_millis(sleep_ms);
warn!(rpc=%self, ?retry_in, ?err, "Failed to reconnect!");
warn!(
"Failed reconnect to {}! Retry in {}ms. err={:?}",
self,
retry_in.as_millis(),
err,
);
sleep(retry_in).await;
}
@ -346,7 +350,7 @@ impl Web3Connection {
Web3Provider::Ws(_) => {}
}
info!(rpc=%self, "reconnecting");
info!("Reconnecting to {}", self);
// disconnect the current provider
*provider_option = None;
@ -365,7 +369,7 @@ impl Web3Connection {
.context("block_sender during connect")?;
}
} else {
info!(rpc=%self, "connecting");
info!("connecting to {}", self);
}
// TODO: if this fails, keep retrying! otherwise it crashes and doesn't try again!
@ -373,7 +377,7 @@ impl Web3Connection {
*provider_option = Some(Arc::new(new_provider));
info!(rpc=%self, "successfully connected");
info!("successfully connected to {}", self);
Ok(())
}
@ -446,7 +450,7 @@ impl Web3Connection {
.context("block_sender")?;
}
Err(err) => {
warn!(?err, "unable to get block from {}", self);
warn!("unable to get block from {}. err={:?}", self, err);
{
let mut head_block_id = self.head_block_id.write();
@ -477,7 +481,7 @@ impl Web3Connection {
reconnect: bool,
) -> anyhow::Result<()> {
loop {
debug!(rpc=%self, "subscribing");
debug!("subscribing to {}", self);
let http_interval_receiver = http_interval_sender.as_ref().map(|x| x.subscribe());
@ -510,9 +514,9 @@ impl Web3Connection {
loop {
if let Some(provider) = conn.provider.read().await.as_ref() {
if provider.ready() {
trace!(rpc=%conn, "provider is ready");
// // trace!(rpc=%conn, "provider is ready");
} else {
warn!(rpc=%conn, "provider is NOT ready");
warn!("rpc {} is NOT ready", conn);
// returning error will trigger a reconnect
// TODO: what if we just happened to have this check line up with another restart?
return Err(anyhow::anyhow!("provider is not ready"));
@ -535,22 +539,18 @@ impl Web3Connection {
}
Err(err) => {
if reconnect {
warn!(
rpc=%self,
?err,
"subscription exited",
);
warn!("{} subscription exited. err={:?}", self, err);
self.retrying_reconnect(block_sender.as_ref(), true).await?;
} else {
error!(rpc=%self, ?err, "subscription exited");
error!("{} subscription exited. err={:?}", self, err);
return Err(err);
}
}
}
}
info!(rpc=%self, "all subscriptions complete");
info!("all subscriptions on {} completed", self);
Ok(())
}
@ -563,7 +563,7 @@ impl Web3Connection {
block_sender: flume::Sender<BlockAndRpc>,
block_map: BlockHashesCache,
) -> anyhow::Result<()> {
info!(%self, "watching new heads");
info!("watching new heads on {}", self);
// TODO: is a RwLock of an Option<Arc> the right thing here?
if let Some(provider) = self.provider.read().await.clone() {
@ -587,7 +587,7 @@ impl Web3Connection {
.request(
"eth_getBlockByNumber",
&json!(("latest", false)),
tracing::Level::ERROR.into(),
Level::Error.into(),
)
.await;
@ -632,7 +632,7 @@ impl Web3Connection {
}
}
Err(err) => {
warn!(?err, "Internal error on latest block from {}", self);
warn!("Internal error on latest block from {}. {:?}", self, err);
self.send_head_block_result(
Ok(None),
@ -656,12 +656,12 @@ impl Web3Connection {
broadcast::error::RecvError::Lagged(lagged) => {
// querying the block was delayed
// this can happen if tokio is very busy or waiting for requests limits took too long
warn!(?err, rpc=%self, "http interval lagging by {}!", lagged);
warn!("http interval on {} lagging by {}!", self, lagged);
}
}
}
trace!(rpc=%self, "ok http interval");
// // trace!(rpc=%self, "ok http interval");
}
}
Web3Provider::Ws(provider) => {
@ -682,7 +682,7 @@ impl Web3Connection {
.request(
"eth_getBlockByNumber",
&json!(("latest", false)),
tracing::Level::ERROR.into(),
Level::Error.into(),
)
.await;
@ -723,7 +723,7 @@ impl Web3Connection {
// TODO: is this always an error?
// TODO: we probably don't want a warn and to return error
warn!(rpc=%self, "new_heads subscription ended");
warn!("new_heads subscription to {} ended", self);
return Err(anyhow::anyhow!("new_heads subscription ended"));
}
}
@ -737,7 +737,7 @@ impl Web3Connection {
authorization: Arc<Authorization>,
tx_id_sender: flume::Sender<(TxHash, Arc<Self>)>,
) -> anyhow::Result<()> {
info!(%self, "watching pending transactions");
info!( "watching pending transactions on {}", self);
// TODO: is a RwLock of an Option<Arc> the right thing here?
if let Some(provider) = self.provider.read().await.clone() {
@ -790,7 +790,7 @@ impl Web3Connection {
// TODO: is this always an error?
// TODO: we probably don't want a warn and to return error
warn!(rpc=%self, "pending_transactions subscription ended");
warn!( "pending_transactions subscription ended on {}", self);
return Err(anyhow::anyhow!("pending_transactions subscription ended"));
}
}
@ -800,7 +800,7 @@ impl Web3Connection {
}
/// be careful with this; it might wait forever!
#[instrument]
pub async fn wait_for_request_handle(
self: &Arc<Self>,
authorization: &Arc<Authorization>,
@ -811,13 +811,13 @@ impl Web3Connection {
loop {
let x = self.try_request_handle(authorization).await;
trace!(?x, "try_request_handle");
// // trace!(?x, "try_request_handle");
match x {
Ok(OpenRequestResult::Handle(handle)) => return Ok(handle),
Ok(OpenRequestResult::RetryAt(retry_at)) => {
// TODO: emit a stat?
trace!(?retry_at);
// // trace!(?retry_at);
if retry_at > max_wait {
// break now since we will wait past our maximum wait time
@ -836,7 +836,6 @@ impl Web3Connection {
}
}
#[instrument]
pub async fn try_request_handle(
self: &Arc<Self>,
authorization: &Arc<Authorization>,
@ -853,14 +852,14 @@ impl Web3Connection {
// TODO: how should we know if we should set expire or not?
match ratelimiter.throttle().await? {
RedisRateLimitResult::Allowed(_) => {
trace!("rate limit succeeded")
// // trace!("rate limit succeeded")
}
RedisRateLimitResult::RetryAt(retry_at, _) => {
// rate limit failed
// save the smallest retry_after. if nothing succeeds, return an Err with retry_after in it
// TODO: use tracing better
// TODO: i'm seeing "Exhausted rate limit on moralis: 0ns". How is it getting 0?
warn!(?retry_at, rpc=%self, "Exhausted rate limit");
warn!( "Exhausted rate limit on {}. Retry at {:?}", self, retry_at);
return Ok(OpenRequestResult::RetryAt(retry_at));
}

@ -19,6 +19,7 @@ use futures::future::{join_all, try_join_all};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use hashbrown::HashMap;
use log::{error, info, warn, Level};
use moka::future::{Cache, ConcurrentCacheExt};
use petgraph::graphmap::DiGraphMap;
use sea_orm::DatabaseConnection;
@ -34,7 +35,6 @@ use tokio::sync::RwLock as AsyncRwLock;
use tokio::sync::{broadcast, watch};
use tokio::task;
use tokio::time::{interval, sleep, sleep_until, Duration, Instant, MissedTickBehavior};
use tracing::{error, info, instrument, trace, warn};
/// A collection of web3 connections. Sends requests either the current best server or all servers.
#[derive(From)]
@ -95,7 +95,7 @@ impl Web3Connections {
// TODO: every time a head_block arrives (with a small delay for known slow servers), or on the interval.
interval.tick().await;
trace!("http interval ready");
// // trace!("http interval ready");
// errors are okay. they mean that all receivers have been dropped
let _ = sender.send(());
@ -171,7 +171,7 @@ impl Web3Connections {
Ok(Err(err)) => {
// if we got an error here, it is not retryable
// TODO: include context about which connection failed
error!(?err, "Unable to create connection");
error!("Unable to create connection. err={:?}", err);
}
Err(err) => {
return Err(err.into());
@ -337,7 +337,7 @@ impl Web3Connections {
.into_iter()
.map(|active_request_handle| async move {
let result: Result<Box<RawValue>, _> = active_request_handle
.request(method, &json!(&params), tracing::Level::ERROR.into())
.request(method, &json!(&params), Level::Error.into())
.await;
result
})
@ -472,7 +472,7 @@ impl Web3Connections {
// increment our connection counter
match rpc.try_request_handle(authorization).await {
Ok(OpenRequestResult::Handle(handle)) => {
trace!("next server on {:?}: {:?}", self, rpc);
// // trace!("next server on {:?}: {:?}", self, rpc);
return Ok(OpenRequestResult::Handle(handle));
}
Ok(OpenRequestResult::RetryAt(retry_at)) => {
@ -483,7 +483,7 @@ impl Web3Connections {
}
Err(err) => {
// TODO: log a warning?
warn!(?err, "No request handle for {}", rpc)
warn!("No request handle for {}. err={:?}", rpc, err)
}
}
}
@ -539,7 +539,10 @@ impl Web3Connections {
warn!("no request handle for {}", connection)
}
Err(err) => {
warn!(?err, "error getting request handle for {}", connection)
warn!(
"error getting request handle for {}. err={:?}",
connection, err
)
}
}
}
@ -602,7 +605,7 @@ impl Web3Connections {
) {
Ok(response) => {
if let Some(error) = &response.error {
trace!(?response, "rpc error");
// // trace!(?response, "rpc error");
if let Some(request_metadata) = request_metadata {
request_metadata
@ -629,17 +632,20 @@ impl Web3Connections {
}
}
} else {
trace!(?response, "rpc success");
// // trace!(?response, "rpc success");
}
return Ok(response);
}
Err(e) => {
Err(err) => {
let rpc = skip_rpcs
.last()
.expect("there must have been a provider if we got an error");
warn!(%rpc, ?e, "Backend server error! Retrying on another");
warn!(
"Backend server error on {}! Retrying on another. err={:?}",
rpc, err
);
// TODO: sleep how long? until synced_connections changes or rate limits are available
// sleep(Duration::from_millis(100)).await;
@ -652,7 +658,7 @@ impl Web3Connections {
// TODO: move this to a helper function
// sleep (TODO: with a lock?) until our rate limits should be available
// TODO: if a server catches up sync while we are waiting, we could stop waiting
warn!(?retry_at, "All rate limits exceeded. Sleeping");
warn!("All rate limits exceeded. Sleeping untill {:?}", retry_at);
// TODO: have a separate column for rate limited?
if let Some(request_metadata) = request_metadata {
@ -664,7 +670,7 @@ impl Web3Connections {
continue;
}
OpenRequestResult::RetryNever => {
warn!(?self, "No server handles!");
warn!("No server handles! {:?}", self);
if let Some(request_metadata) = request_metadata {
request_metadata.no_servers.fetch_add(1, Ordering::Release);
@ -690,7 +696,7 @@ impl Web3Connections {
}
/// be sure there is a timeout on this or it might loop forever
#[instrument]
pub async fn try_send_all_upstream_servers(
&self,
authorization: &Arc<Authorization>,
@ -729,7 +735,7 @@ impl Web3Connections {
return Ok(response);
}
Err(None) => {
warn!(?self, "No servers in sync! Retrying");
warn!("No servers in sync on {:?}! Retrying", self);
if let Some(request_metadata) = &request_metadata {
request_metadata.no_servers.fetch_add(1, Ordering::Release);

@ -1,7 +1,6 @@
use anyhow::Context;
use derive_more::From;
use std::time::Duration;
use tracing::{info_span, instrument, Instrument};
/// Use HTTP and WS providers.
// TODO: instead of an enum, I tried to use Box<dyn Provider>, but hit <https://github.com/gakonst/ethers-rs/issues/592>
@ -20,7 +19,6 @@ impl Web3Provider {
}
}
#[instrument]
pub async fn from_str(
url_str: &str,
http_client: Option<reqwest::Client>,
@ -38,10 +36,7 @@ impl Web3Provider {
.interval(Duration::from_secs(13))
.into()
} else if url_str.starts_with("ws") {
// TODO: i dont think this instrument does much of anything. what level should it be?
let provider = ethers::providers::Ws::connect(url_str)
.instrument(info_span!("Web3Provider", %url_str))
.await?;
let provider = ethers::providers::Ws::connect(url_str).await?;
// TODO: dry this up (needs https://github.com/gakonst/ethers-rs/issues/592)
// TODO: i don't think this interval matters

@ -8,6 +8,7 @@ use entities::revert_log;
use entities::sea_orm_active_enums::Method;
use ethers::providers::{HttpClientError, ProviderError, WsClientError};
use ethers::types::{Address, Bytes};
use log::{debug, error, warn, Level};
use metered::metered;
use metered::HitCount;
use metered::ResponseTime;
@ -20,8 +21,6 @@ use std::sync::atomic::{self, AtomicBool, Ordering};
use std::sync::Arc;
use thread_fast_rng::rand::Rng;
use tokio::time::{sleep, Duration, Instant};
use tracing::Level;
use tracing::{debug, error, trace, warn};
#[derive(Debug)]
pub enum OpenRequestResult {
@ -67,9 +66,9 @@ struct EthCallFirstParams {
impl From<Level> for RequestErrorHandler {
fn from(level: Level) -> Self {
match level {
Level::DEBUG => RequestErrorHandler::DebugLevel,
Level::ERROR => RequestErrorHandler::ErrorLevel,
Level::WARN => RequestErrorHandler::WarnLevel,
Level::Debug => RequestErrorHandler::DebugLevel,
Level::Error => RequestErrorHandler::ErrorLevel,
Level::Warn => RequestErrorHandler::WarnLevel,
_ => unimplemented!("unexpected tracing Level"),
}
}
@ -85,7 +84,7 @@ impl Authorization {
let rpc_key_id = match self.checks.rpc_key_id {
Some(rpc_key_id) => rpc_key_id.into(),
None => {
trace!(?self, "cannot save revert without rpc_key_id");
// // trace!(?self, "cannot save revert without rpc_key_id");
return Ok(());
}
};
@ -119,7 +118,7 @@ impl Authorization {
// TODO: what log level?
// TODO: better format
trace!(?rl);
// trace!(?rl);
// TODO: return something useful
Ok(())
@ -181,14 +180,14 @@ impl OpenRequestHandle {
// TODO: requests from customers have request ids, but we should add
// TODO: including params in this is way too verbose
// the authorization field is already on a parent span
trace!(rpc=%self.conn, %method, "request");
// trace!(rpc=%self.conn, %method, "request");
let mut provider = None;
while provider.is_none() {
match self.conn.provider.read().await.clone() {
None => {
warn!(rpc=%self.conn, "no provider!");
warn!("no provider for {}!", self.conn);
// TODO: how should this work? a reconnect should be in progress. but maybe force one now?
// TODO: sleep how long? subscribe to something instead? maybe use a watch handle?
// TODO: this is going to be way too verbose!
@ -211,29 +210,29 @@ impl OpenRequestHandle {
// TODO: do something special for eth_sendRawTransaction too
let error_handler = if let RequestErrorHandler::SaveReverts = error_handler {
if !["eth_call", "eth_estimateGas"].contains(&method) {
trace!(%method, "skipping save on revert");
// trace!(%method, "skipping save on revert");
RequestErrorHandler::DebugLevel
} else if self.authorization.db_conn.is_some() {
let log_revert_chance = self.authorization.checks.log_revert_chance;
if log_revert_chance == 0.0 {
trace!(%method, "no chance. skipping save on revert");
// trace!(%method, "no chance. skipping save on revert");
RequestErrorHandler::DebugLevel
} else if log_revert_chance == 1.0 {
trace!(%method, "gaurenteed chance. SAVING on revert");
// trace!(%method, "gaurenteed chance. SAVING on revert");
error_handler
} else if thread_fast_rng::thread_fast_rng().gen_range(0.0f64..=1.0)
< log_revert_chance
{
trace!(%method, "missed chance. skipping save on revert");
// trace!(%method, "missed chance. skipping save on revert");
RequestErrorHandler::DebugLevel
} else {
trace!("Saving on revert");
// trace!("Saving on revert");
// TODO: is always logging at debug level fine?
error_handler
}
} else {
trace!(%method, "no database. skipping save on revert");
// trace!(%method, "no database. skipping save on revert");
RequestErrorHandler::DebugLevel
}
} else {
@ -277,14 +276,14 @@ impl OpenRequestHandle {
RequestErrorHandler::DebugLevel => {
// TODO: think about this revert check more. sometimes we might want reverts logged so this needs a flag
if !is_revert {
debug!(?err, %method, ?params, rpc=%self.conn, "bad response!");
debug!("bad response from {}! method={} params={:?} err={:?}", self.conn, method, params, err);
}
}
RequestErrorHandler::ErrorLevel => {
error!(?err, %method, ?params, rpc=%self.conn, "bad response!");
error!("bad response from {}! method={} params={:?} err={:?}", self.conn, method, params, err);
}
RequestErrorHandler::WarnLevel => {
warn!(?err, %method, ?params, rpc=%self.conn, "bad response!");
warn!("bad response from {}! method={} params={:?} err={:?}", self.conn, method, params, err);
}
RequestErrorHandler::SaveReverts => {
// TODO: do not unwrap! (doesn't matter much since we check method as a string above)
@ -304,8 +303,8 @@ impl OpenRequestHandle {
} else {
// TODO: i think ethers already has trace logging (and does it much more fancy)
// TODO: opt-in response inspection to log reverts with their request. put into redis or what?
// trace!(rpc=%self.conn, %method, ?response);
trace!(%method, rpc=%self.conn, "response");
// // trace!(rpc=%self.conn, %method, ?response);
// trace!(%method, rpc=%self.conn, "response");
}
response

@ -5,9 +5,9 @@ use super::connection::Web3Connection;
use super::connections::Web3Connections;
use super::request::OpenRequestResult;
use ethers::prelude::{ProviderError, Transaction, TxHash};
use log::{debug, Level};
use std::sync::Arc;
use tokio::sync::broadcast;
use tracing::{debug, trace, Level};
// TODO: think more about TxState
#[derive(Clone)]
@ -34,7 +34,7 @@ impl Web3Connections {
.request(
"eth_getTransactionByHash",
&(pending_tx_id,),
Level::ERROR.into(),
Level::Error.into(),
)
.await?
}
@ -43,12 +43,12 @@ impl Web3Connections {
return Ok(None);
}
Err(err) => {
trace!(
?pending_tx_id,
?rpc,
?err,
"cancelled funneling transaction"
);
// trace!(
// ?pending_tx_id,
// ?rpc,
// ?err,
// "cancelled funneling transaction"
// );
return Ok(None);
}
};
@ -79,7 +79,7 @@ impl Web3Connections {
return Ok(());
}
trace!(?pending_tx_id, "checking pending_transactions on {}", rpc);
// trace!(?pending_tx_id, "checking pending_transactions on {}", rpc);
if self.pending_transactions.contains_key(&pending_tx_id) {
// this transaction has already been processed
return Ok(());
@ -94,14 +94,14 @@ impl Web3Connections {
Ok(Some(tx_state)) => {
let _ = pending_tx_sender.send(tx_state);
trace!(?pending_tx_id, "sent");
// trace!(?pending_tx_id, "sent");
// we sent the transaction. return now. don't break looping because that gives a warning
return Ok(());
}
Ok(None) => {}
Err(err) => {
trace!(?err, ?pending_tx_id, "failed fetching transaction");
// trace!(?err, ?pending_tx_id, "failed fetching transaction");
// unable to update the entry. sleep and try again soon
// TODO: retry with exponential backoff with jitter starting from a much smaller time
// sleep(Duration::from_millis(100)).await;
@ -112,7 +112,7 @@ impl Web3Connections {
// "There is a Pending txn with a lower account nonce. This txn can only be executed after confirmation of the earlier Txn Hash#"
// sometimes it's been pending for many hours
// sometimes it's maybe something else?
debug!(?pending_tx_id, "not found on {}", rpc);
debug!("txid {} not found on {}", pending_tx_id, rpc);
Ok(())
}
}

@ -9,16 +9,15 @@ use chrono::NaiveDateTime;
use entities::{rpc_accounting, rpc_key};
use hashbrown::HashMap;
use http::StatusCode;
use log::warn;
use migration::{Condition, Expr, SimpleExpr};
use redis_rate_limiter::{redis::AsyncCommands, RedisConnection};
use sea_orm::{
ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder, QuerySelect, Select,
};
use tracing::{instrument, warn};
/// get the attached address from redis for the given auth_token.
/// 0 means all users
#[instrument(level = "trace", skip(redis_conn))]
pub async fn get_user_id_from_params(
mut redis_conn: RedisConnection,
// this is a long type. should we strip it down?
@ -68,7 +67,7 @@ pub async fn get_user_id_from_params(
/// only allow rpc_key to be set if user_id is also set.
/// this will keep people from reading someone else's keys.
/// 0 means none.
#[instrument(level = "trace")]
pub fn get_rpc_key_id_from_params(
user_id: u64,
params: &HashMap<String, String>,
@ -87,7 +86,6 @@ pub fn get_rpc_key_id_from_params(
}
}
#[instrument(level = "trace")]
pub fn get_chain_id_from_params(
app: &Web3ProxyApp,
params: &HashMap<String, String>,
@ -102,7 +100,6 @@ pub fn get_chain_id_from_params(
)
}
#[instrument(level = "trace")]
pub fn get_query_start_from_params(
params: &HashMap<String, String>,
) -> anyhow::Result<chrono::NaiveDateTime> {
@ -126,7 +123,6 @@ pub fn get_query_start_from_params(
)
}
#[instrument(level = "trace")]
pub fn get_page_from_params(params: &HashMap<String, String>) -> anyhow::Result<u64> {
params.get("page").map_or_else::<anyhow::Result<u64>, _, _>(
|| {
@ -143,7 +139,6 @@ pub fn get_page_from_params(params: &HashMap<String, String>) -> anyhow::Result<
)
}
#[instrument(level = "trace")]
pub fn get_query_window_seconds_from_params(
params: &HashMap<String, String>,
) -> Result<u64, FrontendErrorResponse> {